add pup support
[charm.git] / src / ck-ldb / LBDBManager.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7
8 #if CMK_LBDB_ON
9
10 #include "LBDBManager.h"
11
12 struct MigrateCB;
13
14 /*************************************************************
15  * Set up the builtin barrier-- the load balancer needs somebody
16  * to call AtSync on each PE in case there are no atSync array 
17  * elements.  The builtin-atSync caller (batsyncer) does this.
18  */
19
20 //Called periodically-- starts next load balancing cycle
21 void LBDB::batsyncer::gotoSync(void *bs)
22 {
23   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
24   s->db->AtLocalBarrier(s->BH);
25 }
26 //Called at end of each load balancing cycle
27 void LBDB::batsyncer::resumeFromSync(void *bs)
28 {
29   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
30 //  CmiPrintf("[%d] LBDB::batsyncer::resumeFromSync with %gs\n", CkMyPe(), s->period);
31
32 #if 0
33   double curT = CmiWallTimer();
34   if (s->nextT<curT)  s->period *= 2;
35   s->nextT = curT + s->period;
36 #endif
37
38   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
39 }
40
41 // initPeriod in seconds
42 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
43 {
44   db=_db;
45   period=initPeriod;
46   nextT = CmiWallTimer() + period;
47   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
48   //This just does a CcdCallFnAfter
49   resumeFromSync((void *)this);
50 }
51
52
53 /*************************************************************
54  * LBDB Code
55  *************************************************************/
56
57 LBDB::LBDB(): useBarrier(CmiTrue)
58 {
59     statsAreOn = CmiFalse;
60     omCount = objCount = oms_registering = 0;
61     obj_running = CmiFalse;
62     commTable = new LBCommTable;
63     obj_walltime = 0;
64 #if CMK_LB_CPUTIMER
65     obj_cputime = 0;
66 #endif
67     startLBFn_count = 0;
68     predictCBFn = NULL;
69     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
70 }
71
72 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
73                        LDCallbacks _callbacks)
74 {
75   LDOMHandle newhandle;
76
77   newhandle.ldb.handle = (void*)(this);
78 //  newhandle.user_ptr = _userData;
79   newhandle.id = _userID;
80
81   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
82   if (om != NULL) {
83     newhandle.handle = oms.length();
84     oms.insertAtEnd(om);
85   } else newhandle.handle = -1;
86   om->DepositHandle(newhandle);
87   omCount++;
88   return newhandle;
89 }
90
91 #if CMK_BIGSIM_CHARM
92 #define LBOBJ_OOC_IDX 0x1
93 #endif
94
95 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
96                          void *_userData, CmiBool _migratable)
97 {
98   LDObjHandle newhandle;
99
100   newhandle.omhandle = _omh;
101 //  newhandle.user_ptr = _userData;
102   newhandle.id = _id;
103   
104 #if 1
105 #if CMK_BIGSIM_CHARM
106   if(_BgOutOfCoreFlag==2){ //taking object into memory
107     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
108     int newpos = -1;
109     for(int i=0; i<objs.length(); i++){
110         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
111             newpos = i;
112             break;
113         }
114     }
115     if(newpos==-1) newpos = objs.length();
116     newhandle.handle = newpos;
117     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
118     objs.insert(newpos, obj);
119     //objCount is not increased since it's the original object which is pupped
120     //through out-of-core emulation. 
121     //objCount++;
122   }else
123 #endif
124   {
125     //BIGSIM_OOC DEBUGGING
126     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
127     newhandle.handle = objs.length();
128     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
129     objs.insertAtEnd(obj);
130     objCount++;
131   }
132   //BIGSIM_OOC DEBUGGING
133   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
134
135 #else
136   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
137   if (obj != NULL) {
138     newhandle.handle = objs.length();
139     objs.insertAtEnd(obj);
140   } else {
141     newhandle.handle = -1;
142   }
143   obj->DepositHandle(newhandle);
144 #endif
145   return newhandle;
146 }
147
148 void LBDB::UnregisterObj(LDObjHandle _h)
149 {
150 //  (objs[_h.handle])->registered=CmiFalse;
151 // free the memory, it is a memory leak.
152 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
153   delete objs[_h.handle];
154
155 #if CMK_BIGSIM_CHARM
156   //hack for BigSim out-of-core emulation.
157   //we want the chare array object to keep at the same
158   //position even going through the pupping routine.
159   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
160     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
161   }else
162 #endif
163   {
164     objs[_h.handle] = NULL;
165   }
166 }
167
168 void LBDB::RegisteringObjects(LDOMHandle _h)
169 {
170   // for an unregistered anonymous OM to join and control the barrier
171   if (_h.id.id.idx == 0) {
172     if (oms_registering == 0)
173       localBarrier.TurnOff();
174     oms_registering++;
175   }
176   else {
177   LBOM* om = oms[_h.handle];
178   if (!om->RegisteringObjs()) {
179     if (oms_registering == 0)
180       localBarrier.TurnOff();
181     oms_registering++;
182     om->SetRegisteringObjs(CmiTrue);
183   }
184   }
185 }
186
187 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
188 {
189   // for an unregistered anonymous OM to join and control the barrier
190   if (_h.id.id.idx == 0) {
191     oms_registering--;
192     if (oms_registering == 0)
193       localBarrier.TurnOn();
194   }
195   else {
196   LBOM* om = oms[_h.handle];
197   if (om->RegisteringObjs()) {
198     oms_registering--;
199     if (oms_registering == 0)
200       localBarrier.TurnOn();
201     om->SetRegisteringObjs(CmiFalse);
202   }
203   }
204 }
205
206
207 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
208 {
209   LBCommData* item_ptr;
210
211   if (obj_running) {
212     const LDObjHandle &runObj = RunningObj();
213
214     // Don't record self-messages from an object to an object
215     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
216          && LDObjIDEqual(runObj.id,destid) )
217       return;
218
219     // In the future, we'll have to eliminate processor to same 
220     // processor messages as well
221
222     LBCommData item(runObj,destOM.id,destid, destObjProc);
223     item_ptr = commTable->HashInsertUnique(item);
224   } else {
225     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
226     item_ptr = commTable->HashInsertUnique(item);
227   }  
228   item_ptr->addMessage(bytes);
229 }
230
231 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
232 {
233   LBCommData* item_ptr;
234
235   //CmiAssert(obj_running);
236   if (obj_running) {
237     const LDObjHandle &runObj = RunningObj();
238
239     LBCommData item(runObj,destOM.id,destids, ndests);
240     item_ptr = commTable->HashInsertUnique(item);
241     item_ptr->addMessage(bytes, nMsgs);
242   } 
243 }
244
245 void LBDB::ClearLoads(void)
246 {
247   int i;
248   for(i=0; i < objCount; i++) {
249     LBObj *obj = objs[i]; 
250     if (obj)
251     {
252       if (obj->data.wallTime>.0) {
253         obj->lastWallTime = obj->data.wallTime;
254 #if CMK_LB_CPUTIMER
255         obj->lastCpuTime = obj->data.cpuTime;
256 #endif
257       }
258       obj->data.wallTime = 0.;
259 #if CMK_LB_CPUTIMER
260       obj->data.cpuTime = 0.;
261 #endif
262     }
263   }
264   delete commTable;
265   commTable = new LBCommTable;
266   machineUtil.Clear();
267   obj_walltime = 0;
268 #if CMK_LB_CPUTIMER
269   obj_cputime = 0;
270 #endif
271 }
272
273 int LBDB::ObjDataCount()
274 {
275   int nitems=0;
276   int i;
277   if (_lb_args.migObjOnly()) {
278   for(i=0; i < objCount; i++)
279     if (objs[i] && (objs[i])->data.migratable)
280       nitems++;
281   }
282   else {
283   for(i=0; i < objCount; i++)
284     if (objs[i])
285       nitems++;
286   }
287   return nitems;
288 }
289
290 void LBDB::GetObjData(LDObjData *dp)
291 {
292   if (_lb_args.migObjOnly()) {
293   for(int i = 0; i < objs.length(); i++) {
294     LBObj* obj = objs[i];
295     if ( obj && obj->data.migratable)
296       *dp++ = obj->ObjData();
297   }
298   }
299   else {
300   for(int i = 0; i < objs.length(); i++) {
301     LBObj* obj = objs[i];
302     if (obj)
303       *dp++ = obj->ObjData();
304   }
305   }
306 }
307
308 int LBDB::Migrate(LDObjHandle h, int dest)
309 {
310     //BIGSIM_OOC DEBUGGING
311     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
312
313   if (h.handle > objCount)
314     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
315   else if (!objs[h.handle]) {
316     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
317     return 0;
318   }
319
320   if ((h.handle < objCount) && objs[h.handle]) {
321     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
322     om->Migrate(h, dest);
323   }
324   return 1;
325 }
326
327 void LBDB::MetaLBResumeWaitingChares(int lb_ideal_period) {
328   for (int i = 0; i < objs.length(); i++) {
329     LBObj* obj = objs[i];
330     if (obj) {
331       LBOM *om = oms[obj->parentOM().handle];
332       LDObjHandle h = obj->GetLDObjHandle();
333       om->MetaLBResumeWaitingChares(h, lb_ideal_period);
334     }
335   }
336 }
337
338 void LBDB::MetaLBCallLBOnChares() {
339   for (int i = 0; i < objs.length(); i++) {
340     LBObj* obj = objs[i];
341     if (obj) {
342       LBOM *om = oms[obj->parentOM().handle];
343       LDObjHandle h = obj->GetLDObjHandle();
344       om->MetaLBCallLBOnChares(h);
345     }
346   }
347 }
348
349 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
350 {
351   // Object migrated, inform load balancers
352
353   // subtle: callback may change (on) when switching LBs
354   // call in reverse order
355   //for(int i=0; i < migrateCBList.length(); i++) {
356   for(int i=migrateCBList.length()-1; i>=0; i--) {
357     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
358     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
359   }
360   
361 }
362
363
364 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
365 {
366   // Save migration function
367   MigrateCB* callbk = new MigrateCB;
368
369   callbk->fn = fn;
370   callbk->data = data;
371   callbk->on = 1;
372   migrateCBList.insertAtEnd(callbk);
373   return migrateCBList.size()-1;
374 }
375
376 void LBDB::RemoveNotifyMigrated(int handle)
377 {
378   MigrateCB* callbk = migrateCBList[handle];
379   migrateCBList[handle] = NULL;
380   delete callbk;
381 }
382
383 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
384 {
385   // Save startLB function
386   StartLBCB* callbk = new StartLBCB;
387
388   callbk->fn = fn;
389   callbk->data = data;
390   callbk->on = 1;
391   startLBFnList.push_back(callbk);
392   startLBFn_count++;
393   return startLBFnList.size()-1;
394 }
395
396 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
397 {
398   for (int i=0; i<startLBFnList.length(); i++) {
399     StartLBCB* callbk = startLBFnList[i];
400     if (callbk && callbk->fn == fn) {
401       delete callbk;
402       startLBFnList[i] = 0; 
403       startLBFn_count --;
404       break;
405     }
406   }
407 }
408
409 void LBDB::StartLB() 
410 {
411   if (startLBFn_count == 0) {
412     CmiAbort("StartLB is not supported in this LB");
413   }
414   for (int i=0; i<startLBFnList.length(); i++) {
415     StartLBCB *startLBFn = startLBFnList[i];
416     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
417   }
418 }
419
420 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
421   // Save migrationDone callback function
422   MigrationDoneCB* callbk = new MigrationDoneCB;
423
424   callbk->fn = fn;
425   callbk->data = data;
426   migrationDoneCBList.push_back(callbk);
427   return migrationDoneCBList.size()-1;
428 }
429
430 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
431   for (int i=0; i<migrationDoneCBList.length(); i++) {
432     MigrationDoneCB* callbk = migrationDoneCBList[i];
433     if (callbk && callbk->fn == fn) {
434       delete callbk;
435       migrationDoneCBList[i] = 0; 
436       break;
437     }
438   }
439 }
440
441 void LBDB::MigrationDone() {
442   for (int i=0; i<migrationDoneCBList.length(); i++) {
443     MigrationDoneCB *callbk = migrationDoneCBList[i];
444     if (callbk) callbk->fn(callbk->data);
445   }
446 }
447
448 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
449 {
450   if (predictCBFn==NULL) predictCBFn = new PredictCB;
451   predictCBFn->on = on;
452   predictCBFn->onWin = onWin;
453   predictCBFn->off = off;
454   predictCBFn->change = change;
455   predictCBFn->data = data;
456 }
457
458 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
459 {
460   LBRealType total_walltime;
461   LBRealType total_cputime;
462   TotalTime(&total_walltime, &total_cputime);
463
464   LBRealType idletime;
465   IdleTime(&idletime);
466
467   *bg_walltime = total_walltime - idletime - obj_walltime;
468   if (*bg_walltime < 0) *bg_walltime = 0.;
469 #if CMK_LB_CPUTIMER
470   *bg_cputime = total_cputime - obj_cputime;
471 #else
472   *bg_cputime = *bg_walltime;
473 #endif
474 }
475
476 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
477                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
478 {
479   TotalTime(total_walltime,total_cputime);
480
481   IdleTime(idletime);
482   
483   *bg_walltime = *total_walltime - *idletime - obj_walltime;
484   if (*bg_walltime < 0) *bg_walltime = 0.;
485 #if CMK_LB_CPUTIMER
486   *bg_cputime = *total_cputime - obj_cputime;
487 #else
488   *bg_cputime = *bg_walltime;
489 #endif
490   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
491 }
492
493 void LBDB::DumpDatabase()
494 {
495 #ifdef DEBUG  
496   CmiPrintf("Database contains %d object managers\n",omCount);
497   CmiPrintf("Database contains %d objects\n",objCount);
498 #endif
499 }
500
501 int LBDB::useMem() {
502   int size = sizeof(LBDB);
503   size += oms.length() * sizeof(LBOM);
504   size += ObjDataCount() * sizeof(LBObj);
505   size += migrateCBList.length() * sizeof(MigrateCBList);
506   size += startLBFnList.length() * sizeof(StartLBCB);
507   size += commTable->useMem();
508   return size;
509 }
510
511 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
512 {
513   client* new_client = new client;
514   new_client->fn = fn;
515   new_client->data = data;
516   new_client->refcount = cur_refcount;
517
518   LDBarrierClient ret_val;
519 #if CMK_BIGSIM_CHARM
520   ret_val.serial = first_free_client_slot;
521   clients.insert(ret_val.serial, new_client);
522
523   //looking for the next first free client slot
524   int nextfree=-1;
525   for(int i=first_free_client_slot+1; i<clients.size(); i++)
526     if(clients[i]==NULL) { nextfree = i; break; }
527   if(nextfree==-1) nextfree = clients.size();
528   first_free_client_slot = nextfree;
529
530   if(_BgOutOfCoreFlag!=2){
531     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
532     //client_count should not be increased
533     client_count++;
534   }
535
536 #else  
537   //ret_val.serial = max_client;
538   ret_val.serial = clients.size();
539   clients.insertAtEnd(new_client);
540   //max_client++;
541   client_count++;
542 #endif
543
544   return ret_val;
545 }
546
547 void LocalBarrier::RemoveClient(LDBarrierClient c)
548 {
549   const int cnum = c.serial;
550 #if CMK_BIGSIM_CHARM
551   if (cnum < clients.size() && clients[cnum] != 0) {
552     delete (clients[cnum]);
553     clients[cnum] = 0;
554
555     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
556
557     if(_BgOutOfCoreFlag!=1){
558         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
559         //client_count should not be increased
560         client_count--;
561     }
562   }
563 #else
564   //if (cnum < max_client && clients[cnum] != 0) {
565   if (cnum < clients.size() && clients[cnum] != 0) {
566     delete (clients[cnum]);
567     clients[cnum] = 0;
568     client_count--;
569   }
570 #endif
571 }
572
573 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
574 {
575   receiver* new_receiver = new receiver;
576   new_receiver->fn = fn;
577   new_receiver->data = data;
578   new_receiver->on = 1;
579
580   LDBarrierReceiver ret_val;
581 //  ret_val.serial = max_receiver;
582   ret_val.serial = receivers.size();
583   receivers.insertAtEnd(new_receiver);
584 //  max_receiver++;
585
586   return ret_val;
587 }
588
589 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
590 {
591   const int cnum = c.serial;
592   //if (cnum < max_receiver && receivers[cnum] != 0) {
593   if (cnum < receivers.size() && receivers[cnum] != 0) {
594     delete (receivers[cnum]);
595     receivers[cnum] = 0;
596   }
597 }
598
599 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
600 {
601   const int cnum = c.serial;
602   //if (cnum < max_receiver && receivers[cnum] != 0) {
603   if (cnum < receivers.size() && receivers[cnum] != 0) {
604     receivers[cnum]->on = 1;
605   }
606 }
607
608 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
609 {
610   const int cnum = c.serial;
611   //if (cnum < max_receiver && receivers[cnum] != 0) {
612   if (cnum < receivers.size() && receivers[cnum] != 0) {
613     receivers[cnum]->on = 0;
614   }
615 }
616
617 void LocalBarrier::AtBarrier(LDBarrierClient h)
618 {
619   (clients[h.serial])->refcount++;
620   at_count++;
621   //if(CmiMyPartition()==1)
622          CkPrintf("[%d] at barrier\n",CkMyPe());
623   CheckBarrier();
624 }
625
626 void LocalBarrier::CheckBarrier()
627 {
628   if (!on) return;
629
630   // If there are no clients, resume as soon as we're turned on
631
632   //if(CmiMyPartition()==1){
633         CkPrintf("[%d] at_count %d client_count %d\n",CkMyPe(),at_count,client_count);
634   //}
635   if (client_count == 0) {
636     cur_refcount++;
637     CallReceivers();
638   }
639   if (at_count >= client_count) {
640     CmiBool at_barrier = CmiFalse;
641
642 //    for(int i=0; i < max_client; i++)
643     for(int i=0; i < clients.size(); i++)
644       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
645         at_barrier = CmiTrue;
646                 
647     if (at_barrier) {
648       at_count -= client_count;
649       cur_refcount++;
650       CallReceivers();
651     }
652   }
653 }
654
655 void LocalBarrier::CallReceivers(void)
656 {
657   CmiBool called_receiver=CmiFalse;
658 //  if(CmiMyPartition()==1)
659         //CkPrintf("[%d][%d] call receiver\n",CmiMyPartition(),CkMyPe());
660         CkPrintf("[%d] call receiver\n",CkMyPe());
661 //  for(int i=0; i < max_receiver; i++)
662 //   for (int i=max_receiver-1; i>=0; i--) {
663    for (int i=receivers.size()-1; i>=0; i--) {
664       receiver *recv = receivers[i];
665       if (recv != 0 && recv->on) {
666         recv->fn(recv->data);
667         called_receiver = CmiTrue;
668       }
669   }
670
671   if (!called_receiver)
672     ResumeClients();
673   
674 }
675
676 void LocalBarrier::ResumeClients(void)
677 {
678 //  for(int i=0; i < max_client; i++)
679   for(int i=0; i < clients.size(); i++)
680     if (clients[i] != 0) {
681       ((client*)clients[i])->fn(((client*)clients[i])->data);
682     }   
683 }
684
685 #endif
686
687 /*@}*/