Merging
[charm.git] / src / ck-ldb / LBDBManager.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7
8 #if CMK_LBDB_ON
9
10 #include "LBDBManager.h"
11
12 struct MigrateCB;
13
14 /*************************************************************
15  * Set up the builtin barrier-- the load balancer needs somebody
16  * to call AtSync on each PE in case there are no atSync array 
17  * elements.  The builtin-atSync caller (batsyncer) does this.
18  */
19
20 //Called periodically-- starts next load balancing cycle
21 void LBDB::batsyncer::gotoSync(void *bs)
22 {
23   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
24   s->db->AtLocalBarrier(s->BH);
25 }
26 //Called at end of each load balancing cycle
27 void LBDB::batsyncer::resumeFromSync(void *bs)
28 {
29   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
30 //  CmiPrintf("[%d] LBDB::batsyncer::resumeFromSync with %gs\n", CkMyPe(), s->period);
31
32 #if 0
33   double curT = CmiWallTimer();
34   if (s->nextT<curT)  s->period *= 2;
35   s->nextT = curT + s->period;
36 #endif
37
38   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
39 }
40
41 // initPeriod in seconds
42 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
43 {
44   db=_db;
45   period=initPeriod;
46   nextT = CmiWallTimer() + period;
47   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
48   //This just does a CcdCallFnAfter
49   resumeFromSync((void *)this);
50 }
51
52
53 /*************************************************************
54  * LBDB Code
55  *************************************************************/
56
57 LBDB::LBDB(): useBarrier(CmiTrue)
58 {
59     statsAreOn = CmiFalse;
60     omCount = objCount = oms_registering = 0;
61     obj_running = CmiFalse;
62     commTable = new LBCommTable;
63     obj_walltime = 0;
64 #if CMK_LB_CPUTIMER
65     obj_cputime = 0;
66 #endif
67     startLBFn_count = 0;
68     predictCBFn = NULL;
69     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
70 }
71
72 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
73                        LDCallbacks _callbacks)
74 {
75   LDOMHandle newhandle;
76
77   newhandle.ldb.handle = (void*)(this);
78 //  newhandle.user_ptr = _userData;
79   newhandle.id = _userID;
80
81   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
82   if (om != NULL) {
83     newhandle.handle = oms.length();
84     oms.insertAtEnd(om);
85   } else newhandle.handle = -1;
86   om->DepositHandle(newhandle);
87   omCount++;
88   return newhandle;
89 }
90
91 #if CMK_BIGSIM_CHARM
92 #define LBOBJ_OOC_IDX 0x1
93 #endif
94
95 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
96                          void *_userData, CmiBool _migratable)
97 {
98   LDObjHandle newhandle;
99
100   newhandle.omhandle = _omh;
101 //  newhandle.user_ptr = _userData;
102   newhandle.id = _id;
103   
104 #if 1
105 #if CMK_BIGSIM_CHARM
106   if(_BgOutOfCoreFlag==2){ //taking object into memory
107     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
108     int newpos = -1;
109     for(int i=0; i<objs.length(); i++){
110         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
111             newpos = i;
112             break;
113         }
114     }
115     if(newpos==-1) newpos = objs.length();
116     newhandle.handle = newpos;
117     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
118     objs.insert(newpos, obj);
119     //objCount is not increased since it's the original object which is pupped
120     //through out-of-core emulation. 
121     //objCount++;
122   }else
123 #endif
124   {
125     //BIGSIM_OOC DEBUGGING
126     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
127     newhandle.handle = objs.length();
128     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
129     objs.insertAtEnd(obj);
130     objCount++;
131   }
132   //BIGSIM_OOC DEBUGGING
133   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
134
135 #else
136   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
137   if (obj != NULL) {
138     newhandle.handle = objs.length();
139     objs.insertAtEnd(obj);
140   } else {
141     newhandle.handle = -1;
142   }
143   obj->DepositHandle(newhandle);
144 #endif
145   return newhandle;
146 }
147
148 void LBDB::UnregisterObj(LDObjHandle _h)
149 {
150 //  (objs[_h.handle])->registered=CmiFalse;
151 // free the memory, it is a memory leak.
152 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
153   delete objs[_h.handle];
154
155 #if CMK_BIGSIM_CHARM
156   //hack for BigSim out-of-core emulation.
157   //we want the chare array object to keep at the same
158   //position even going through the pupping routine.
159   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
160     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
161   }else
162 #endif
163   {
164     objs[_h.handle] = NULL;
165   }
166 }
167
168 void LBDB::RegisteringObjects(LDOMHandle _h)
169 {
170   // for an unregistered anonymous OM to join and control the barrier
171   if (_h.id.id.idx == 0) {
172     if (oms_registering == 0)
173       localBarrier.TurnOff();
174     oms_registering++;
175   }
176   else {
177   LBOM* om = oms[_h.handle];
178   if (!om->RegisteringObjs()) {
179     if (oms_registering == 0)
180       localBarrier.TurnOff();
181     oms_registering++;
182     om->SetRegisteringObjs(CmiTrue);
183   }
184   }
185 }
186
187 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
188 {
189   // for an unregistered anonymous OM to join and control the barrier
190   if (_h.id.id.idx == 0) {
191     oms_registering--;
192     if (oms_registering == 0)
193       localBarrier.TurnOn();
194   }
195   else {
196   LBOM* om = oms[_h.handle];
197   if (om->RegisteringObjs()) {
198     oms_registering--;
199     if (oms_registering == 0)
200       localBarrier.TurnOn();
201     om->SetRegisteringObjs(CmiFalse);
202   }
203   }
204 }
205
206
207 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
208 {
209   LBCommData* item_ptr;
210
211   if (obj_running) {
212     const LDObjHandle &runObj = RunningObj();
213
214     // Don't record self-messages from an object to an object
215     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
216          && LDObjIDEqual(runObj.id,destid) )
217       return;
218
219     // In the future, we'll have to eliminate processor to same 
220     // processor messages as well
221
222     LBCommData item(runObj,destOM.id,destid, destObjProc);
223     item_ptr = commTable->HashInsertUnique(item);
224   } else {
225     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
226     item_ptr = commTable->HashInsertUnique(item);
227   }  
228   item_ptr->addMessage(bytes);
229 }
230
231 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
232 {
233   LBCommData* item_ptr;
234
235   //CmiAssert(obj_running);
236   if (obj_running) {
237     const LDObjHandle &runObj = RunningObj();
238
239     LBCommData item(runObj,destOM.id,destids, ndests);
240     item_ptr = commTable->HashInsertUnique(item);
241     item_ptr->addMessage(bytes, nMsgs);
242   } 
243 }
244
245 void LBDB::ClearLoads(void)
246 {
247   int i;
248   for(i=0; i < objCount; i++) {
249     LBObj *obj = objs[i]; 
250     if (obj)
251     {
252       if (obj->data.wallTime>.0) {
253         obj->lastWallTime = obj->data.wallTime;
254 #if CMK_LB_CPUTIMER
255         obj->lastCpuTime = obj->data.cpuTime;
256 #endif
257       }
258       obj->data.wallTime = 0.;
259 #if CMK_LB_CPUTIMER
260       obj->data.cpuTime = 0.;
261 #endif
262     }
263   }
264   delete commTable;
265   commTable = new LBCommTable;
266   machineUtil.Clear();
267   obj_walltime = 0;
268 #if CMK_LB_CPUTIMER
269   obj_cputime = 0;
270 #endif
271 }
272
273 int LBDB::ObjDataCount()
274 {
275   int nitems=0;
276   int i;
277   if (_lb_args.migObjOnly()) {
278   for(i=0; i < objCount; i++)
279     if (objs[i] && (objs[i])->data.migratable)
280       nitems++;
281   }
282   else {
283   for(i=0; i < objCount; i++)
284     if (objs[i])
285       nitems++;
286   }
287   return nitems;
288 }
289
290 void LBDB::GetObjData(LDObjData *dp)
291 {
292   if (_lb_args.migObjOnly()) {
293   for(int i = 0; i < objs.length(); i++) {
294     LBObj* obj = objs[i];
295     if ( obj && obj->data.migratable)
296       *dp++ = obj->ObjData();
297   }
298   }
299   else {
300   for(int i = 0; i < objs.length(); i++) {
301     LBObj* obj = objs[i];
302     if (obj)
303       *dp++ = obj->ObjData();
304   }
305   }
306 }
307
308 int LBDB::Migrate(LDObjHandle h, int dest)
309 {
310     //BIGSIM_OOC DEBUGGING
311     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
312
313   if (h.handle > objCount)
314     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
315   else if (!objs[h.handle]) {
316     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
317     return 0;
318   }
319
320   if ((h.handle < objCount) && objs[h.handle]) {
321     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
322     om->Migrate(h, dest);
323   }
324   return 1;
325 }
326
327 void LBDB::AdaptResumeSync(int lb_ideal_period) {
328   for (int i = 0; i < objs.length(); i++) {
329     LBObj* obj = objs[i];
330     if (obj) {
331       LBOM *om = oms[obj->parentOM().handle];
332       LDObjHandle h = obj->GetLDObjHandle();
333       om->AdaptResumeSync(h, lb_ideal_period);
334     }
335   }
336 }
337
338 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
339 {
340   // Object migrated, inform load balancers
341
342   // subtle: callback may change (on) when switching LBs
343   // call in reverse order
344   //for(int i=0; i < migrateCBList.length(); i++) {
345   for(int i=migrateCBList.length()-1; i>=0; i--) {
346     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
347     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
348   }
349   
350 }
351
352
353 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
354 {
355   // Save migration function
356   MigrateCB* callbk = new MigrateCB;
357
358   callbk->fn = fn;
359   callbk->data = data;
360   callbk->on = 1;
361   migrateCBList.insertAtEnd(callbk);
362   return migrateCBList.size()-1;
363 }
364
365 void LBDB::RemoveNotifyMigrated(int handle)
366 {
367   MigrateCB* callbk = migrateCBList[handle];
368   migrateCBList[handle] = NULL;
369   delete callbk;
370 }
371
372 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
373 {
374   // Save startLB function
375   StartLBCB* callbk = new StartLBCB;
376
377   callbk->fn = fn;
378   callbk->data = data;
379   callbk->on = 1;
380   startLBFnList.push_back(callbk);
381   startLBFn_count++;
382   return startLBFnList.size()-1;
383 }
384
385 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
386 {
387   for (int i=0; i<startLBFnList.length(); i++) {
388     StartLBCB* callbk = startLBFnList[i];
389     if (callbk && callbk->fn == fn) {
390       delete callbk;
391       startLBFnList[i] = 0; 
392       startLBFn_count --;
393       break;
394     }
395   }
396 }
397
398 void LBDB::StartLB() 
399 {
400   if (startLBFn_count == 0) {
401     CmiAbort("StartLB is not supported in this LB");
402   }
403   for (int i=0; i<startLBFnList.length(); i++) {
404     StartLBCB *startLBFn = startLBFnList[i];
405     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
406   }
407 }
408
409 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
410   // Save migrationDone callback function
411   MigrationDoneCB* callbk = new MigrationDoneCB;
412
413   callbk->fn = fn;
414   callbk->data = data;
415   migrationDoneCBList.push_back(callbk);
416   return migrationDoneCBList.size()-1;
417 }
418
419 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
420   for (int i=0; i<migrationDoneCBList.length(); i++) {
421     MigrationDoneCB* callbk = migrationDoneCBList[i];
422     if (callbk && callbk->fn == fn) {
423       delete callbk;
424       migrationDoneCBList[i] = 0; 
425       break;
426     }
427   }
428 }
429
430 void LBDB::MigrationDone() {
431   for (int i=0; i<migrationDoneCBList.length(); i++) {
432     MigrationDoneCB *callbk = migrationDoneCBList[i];
433     if (callbk) callbk->fn(callbk->data);
434   }
435 }
436
437 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
438 {
439   if (predictCBFn==NULL) predictCBFn = new PredictCB;
440   predictCBFn->on = on;
441   predictCBFn->onWin = onWin;
442   predictCBFn->off = off;
443   predictCBFn->change = change;
444   predictCBFn->data = data;
445 }
446
447 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
448 {
449   LBRealType total_walltime;
450   LBRealType total_cputime;
451   TotalTime(&total_walltime, &total_cputime);
452
453   LBRealType idletime;
454   IdleTime(&idletime);
455
456   *bg_walltime = total_walltime - idletime - obj_walltime;
457   if (*bg_walltime < 0) *bg_walltime = 0.;
458 #if CMK_LB_CPUTIMER
459   *bg_cputime = total_cputime - obj_cputime;
460 #else
461   *bg_cputime = *bg_walltime;
462 #endif
463 }
464
465 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
466                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
467 {
468   TotalTime(total_walltime,total_cputime);
469
470   IdleTime(idletime);
471   
472   *bg_walltime = *total_walltime - *idletime - obj_walltime;
473   if (*bg_walltime < 0) *bg_walltime = 0.;
474 #if CMK_LB_CPUTIMER
475   *bg_cputime = *total_cputime - obj_cputime;
476 #else
477   *bg_cputime = *bg_walltime;
478 #endif
479   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
480 }
481
482 void LBDB::DumpDatabase()
483 {
484 #ifdef DEBUG  
485   CmiPrintf("Database contains %d object managers\n",omCount);
486   CmiPrintf("Database contains %d objects\n",objCount);
487 #endif
488 }
489
490 int LBDB::useMem() {
491   int size = sizeof(LBDB);
492   size += oms.length() * sizeof(LBOM);
493   size += ObjDataCount() * sizeof(LBObj);
494   size += migrateCBList.length() * sizeof(MigrateCBList);
495   size += startLBFnList.length() * sizeof(StartLBCB);
496   size += commTable->useMem();
497   return size;
498 }
499
500 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
501 {
502   client* new_client = new client;
503   new_client->fn = fn;
504   new_client->data = data;
505   new_client->refcount = cur_refcount;
506
507   LDBarrierClient ret_val;
508 #if CMK_BIGSIM_CHARM
509   ret_val.serial = first_free_client_slot;
510   clients.insert(ret_val.serial, new_client);
511
512   //looking for the next first free client slot
513   int nextfree=-1;
514   for(int i=first_free_client_slot+1; i<clients.size(); i++)
515     if(clients[i]==NULL) { nextfree = i; break; }
516   if(nextfree==-1) nextfree = clients.size();
517   first_free_client_slot = nextfree;
518
519   if(_BgOutOfCoreFlag!=2){
520     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
521     //client_count should not be increased
522     client_count++;
523   }
524
525 #else  
526   //ret_val.serial = max_client;
527   ret_val.serial = clients.size();
528   clients.insertAtEnd(new_client);
529   //max_client++;
530   client_count++;
531 #endif
532
533   return ret_val;
534 }
535
536 void LocalBarrier::RemoveClient(LDBarrierClient c)
537 {
538   const int cnum = c.serial;
539 #if CMK_BIGSIM_CHARM
540   if (cnum < clients.size() && clients[cnum] != 0) {
541     delete (clients[cnum]);
542     clients[cnum] = 0;
543
544     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
545
546     if(_BgOutOfCoreFlag!=1){
547         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
548         //client_count should not be increased
549         client_count--;
550     }
551   }
552 #else
553   //if (cnum < max_client && clients[cnum] != 0) {
554   if (cnum < clients.size() && clients[cnum] != 0) {
555     delete (clients[cnum]);
556     clients[cnum] = 0;
557     client_count--;
558   }
559 #endif
560 }
561
562 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
563 {
564   receiver* new_receiver = new receiver;
565   new_receiver->fn = fn;
566   new_receiver->data = data;
567   new_receiver->on = 1;
568
569   LDBarrierReceiver ret_val;
570 //  ret_val.serial = max_receiver;
571   ret_val.serial = receivers.size();
572   receivers.insertAtEnd(new_receiver);
573 //  max_receiver++;
574
575   return ret_val;
576 }
577
578 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
579 {
580   const int cnum = c.serial;
581   //if (cnum < max_receiver && receivers[cnum] != 0) {
582   if (cnum < receivers.size() && receivers[cnum] != 0) {
583     delete (receivers[cnum]);
584     receivers[cnum] = 0;
585   }
586 }
587
588 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
589 {
590   const int cnum = c.serial;
591   //if (cnum < max_receiver && receivers[cnum] != 0) {
592   if (cnum < receivers.size() && receivers[cnum] != 0) {
593     receivers[cnum]->on = 1;
594   }
595 }
596
597 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
598 {
599   const int cnum = c.serial;
600   //if (cnum < max_receiver && receivers[cnum] != 0) {
601   if (cnum < receivers.size() && receivers[cnum] != 0) {
602     receivers[cnum]->on = 0;
603   }
604 }
605
606 void LocalBarrier::AtBarrier(LDBarrierClient h)
607 {
608   (clients[h.serial])->refcount++;
609   at_count++;
610   CheckBarrier();
611 }
612
613 void LocalBarrier::CheckBarrier()
614 {
615   if (!on) return;
616
617   // If there are no clients, resume as soon as we're turned on
618
619   if (client_count == 0) {
620     cur_refcount++;
621     CallReceivers();
622   }
623   if (at_count >= client_count) {
624     CmiBool at_barrier = CmiFalse;
625
626 //    for(int i=0; i < max_client; i++)
627     for(int i=0; i < clients.size(); i++)
628       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
629         at_barrier = CmiTrue;
630                 
631     if (at_barrier) {
632       at_count -= client_count;
633       cur_refcount++;
634       CallReceivers();
635     }
636   }
637 }
638
639 void LocalBarrier::CallReceivers(void)
640 {
641   CmiBool called_receiver=CmiFalse;
642
643 //  for(int i=0; i < max_receiver; i++)
644 //   for (int i=max_receiver-1; i>=0; i--) {
645    for (int i=receivers.size()-1; i>=0; i--) {
646       receiver *recv = receivers[i];
647       if (recv != 0 && recv->on) {
648         recv->fn(recv->data);
649         called_receiver = CmiTrue;
650       }
651   }
652
653   if (!called_receiver)
654     ResumeClients();
655   
656 }
657
658 void LocalBarrier::ResumeClients(void)
659 {
660 //  for(int i=0; i < max_client; i++)
661   for(int i=0; i < clients.size(); i++)
662     if (clients[i] != 0) {
663       ((client*)clients[i])->fn(((client*)clients[i])->data);
664     }   
665 }
666
667 #endif
668
669 /*@}*/