reset state for CmiReduce after crash; remove print
[charm.git] / src / ck-ldb / LBDBManager.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7
8 #if CMK_LBDB_ON
9
10 #include "LBDBManager.h"
11
12 struct MigrateCB;
13
14 /*************************************************************
15  * Set up the builtin barrier-- the load balancer needs somebody
16  * to call AtSync on each PE in case there are no atSync array 
17  * elements.  The builtin-atSync caller (batsyncer) does this.
18  */
19
20 //Called periodically-- starts next load balancing cycle
21 void LBDB::batsyncer::gotoSync(void *bs)
22 {
23   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
24   s->db->AtLocalBarrier(s->BH);
25 }
26 //Called at end of each load balancing cycle
27 void LBDB::batsyncer::resumeFromSync(void *bs)
28 {
29   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
30
31 #if 0
32   double curT = CmiWallTimer();
33   if (s->nextT<curT)  s->period *= 2;
34   s->nextT = curT + s->period;
35 #endif
36
37   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
38 }
39
40 // initPeriod in seconds
41 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
42 {
43   db=_db;
44   period=initPeriod;
45   nextT = CmiWallTimer() + period;
46   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
47   //This just does a CcdCallFnAfter
48   resumeFromSync((void *)this);
49 }
50
51
52 /*************************************************************
53  * LBDB Code
54  *************************************************************/
55
56 LBDB::LBDB(): useBarrier(CmiTrue)
57 {
58     statsAreOn = CmiFalse;
59     omCount = objCount = oms_registering = 0;
60     obj_running = CmiFalse;
61     commTable = new LBCommTable;
62     obj_walltime = 0;
63 #if CMK_LB_CPUTIMER
64     obj_cputime = 0;
65 #endif
66     startLBFn_count = 0;
67     predictCBFn = NULL;
68     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
69 }
70
71 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
72                        LDCallbacks _callbacks)
73 {
74   LDOMHandle newhandle;
75
76   newhandle.ldb.handle = (void*)(this);
77 //  newhandle.user_ptr = _userData;
78   newhandle.id = _userID;
79
80   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
81   if (om != NULL) {
82     newhandle.handle = oms.length();
83     oms.insertAtEnd(om);
84   } else newhandle.handle = -1;
85   om->DepositHandle(newhandle);
86   omCount++;
87   return newhandle;
88 }
89
90 #if CMK_BIGSIM_CHARM
91 #define LBOBJ_OOC_IDX 0x1
92 #endif
93
94 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
95                          void *_userData, CmiBool _migratable)
96 {
97   LDObjHandle newhandle;
98
99   newhandle.omhandle = _omh;
100 //  newhandle.user_ptr = _userData;
101   newhandle.id = _id;
102   
103 #if 1
104 #if CMK_BIGSIM_CHARM
105   if(_BgOutOfCoreFlag==2){ //taking object into memory
106     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
107     int newpos = -1;
108     for(int i=0; i<objs.length(); i++){
109         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
110             newpos = i;
111             break;
112         }
113     }
114     if(newpos==-1) newpos = objs.length();
115     newhandle.handle = newpos;
116     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
117     objs.insert(newpos, obj);
118     //objCount is not increased since it's the original object which is pupped
119     //through out-of-core emulation. 
120     //objCount++;
121   }else
122 #endif
123   {
124     //BIGSIM_OOC DEBUGGING
125     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
126     newhandle.handle = objs.length();
127     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
128     objs.insertAtEnd(obj);
129     objCount++;
130   }
131   //BIGSIM_OOC DEBUGGING
132   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
133
134 #else
135   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
136   if (obj != NULL) {
137     newhandle.handle = objs.length();
138     objs.insertAtEnd(obj);
139   } else {
140     newhandle.handle = -1;
141   }
142   obj->DepositHandle(newhandle);
143 #endif
144   return newhandle;
145 }
146
147 void LBDB::UnregisterObj(LDObjHandle _h)
148 {
149 //  (objs[_h.handle])->registered=CmiFalse;
150 // free the memory, it is a memory leak.
151 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
152   delete objs[_h.handle];
153
154 #if CMK_BIGSIM_CHARM
155   //hack for BigSim out-of-core emulation.
156   //we want the chare array object to keep at the same
157   //position even going through the pupping routine.
158   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
159     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
160   }else
161 #endif
162   {
163     objs[_h.handle] = NULL;
164   }
165 }
166
167 void LBDB::RegisteringObjects(LDOMHandle _h)
168 {
169   // for an unregistered anonymous OM to join and control the barrier
170   if (_h.id.id.idx == 0) {
171     if (oms_registering == 0)
172       localBarrier.TurnOff();
173     oms_registering++;
174   }
175   else {
176   LBOM* om = oms[_h.handle];
177   if (!om->RegisteringObjs()) {
178     if (oms_registering == 0)
179       localBarrier.TurnOff();
180     oms_registering++;
181     om->SetRegisteringObjs(CmiTrue);
182   }
183   }
184 }
185
186 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
187 {
188   // for an unregistered anonymous OM to join and control the barrier
189   if (_h.id.id.idx == 0) {
190     oms_registering--;
191     if (oms_registering == 0)
192       localBarrier.TurnOn();
193   }
194   else {
195   LBOM* om = oms[_h.handle];
196   if (om->RegisteringObjs()) {
197     oms_registering--;
198     if (oms_registering == 0)
199       localBarrier.TurnOn();
200     om->SetRegisteringObjs(CmiFalse);
201   }
202   }
203 }
204
205
206 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
207 {
208   LBCommData* item_ptr;
209
210   if (obj_running) {
211     const LDObjHandle &runObj = RunningObj();
212
213     // Don't record self-messages from an object to an object
214     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
215          && LDObjIDEqual(runObj.id,destid) )
216       return;
217
218     // In the future, we'll have to eliminate processor to same 
219     // processor messages as well
220
221     LBCommData item(runObj,destOM.id,destid, destObjProc);
222     item_ptr = commTable->HashInsertUnique(item);
223   } else {
224     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
225     item_ptr = commTable->HashInsertUnique(item);
226   }  
227   item_ptr->addMessage(bytes);
228 }
229
230 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
231 {
232   LBCommData* item_ptr;
233
234   //CmiAssert(obj_running);
235   if (obj_running) {
236     const LDObjHandle &runObj = RunningObj();
237
238     LBCommData item(runObj,destOM.id,destids, ndests);
239     item_ptr = commTable->HashInsertUnique(item);
240     item_ptr->addMessage(bytes, nMsgs);
241   } 
242 }
243
244 void LBDB::ClearLoads(void)
245 {
246   int i;
247   for(i=0; i < objCount; i++) {
248     LBObj *obj = objs[i]; 
249     if (obj)
250     {
251       if (obj->data.wallTime>.0) {
252         obj->lastWallTime = obj->data.wallTime;
253 #if CMK_LB_CPUTIMER
254         obj->lastCpuTime = obj->data.cpuTime;
255 #endif
256       }
257       obj->data.wallTime = 0.;
258 #if CMK_LB_CPUTIMER
259       obj->data.cpuTime = 0.;
260 #endif
261     }
262   }
263   delete commTable;
264   commTable = new LBCommTable;
265   machineUtil.Clear();
266   obj_walltime = 0;
267 #if CMK_LB_CPUTIMER
268   obj_cputime = 0;
269 #endif
270 }
271
272 int LBDB::ObjDataCount()
273 {
274   int nitems=0;
275   int i;
276   if (_lb_args.migObjOnly()) {
277   for(i=0; i < objCount; i++)
278     if (objs[i] && (objs[i])->data.migratable)
279       nitems++;
280   }
281   else {
282   for(i=0; i < objCount; i++)
283     if (objs[i])
284       nitems++;
285   }
286   return nitems;
287 }
288
289 void LBDB::GetObjData(LDObjData *dp)
290 {
291   if (_lb_args.migObjOnly()) {
292   for(int i = 0; i < objs.length(); i++) {
293     LBObj* obj = objs[i];
294     if ( obj && obj->data.migratable)
295       *dp++ = obj->ObjData();
296   }
297   }
298   else {
299   for(int i = 0; i < objs.length(); i++) {
300     LBObj* obj = objs[i];
301     if (obj)
302       *dp++ = obj->ObjData();
303   }
304   }
305 }
306
307 int LBDB::Migrate(LDObjHandle h, int dest)
308 {
309     //BIGSIM_OOC DEBUGGING
310     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
311
312   if (h.handle > objCount)
313     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
314   else if (!objs[h.handle]) {
315     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
316     return 0;
317   }
318
319   if ((h.handle < objCount) && objs[h.handle]) {
320     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
321     om->Migrate(h, dest);
322   }
323   return 1;
324 }
325
326 void LBDB::MetaLBResumeWaitingChares(int lb_ideal_period) {
327   for (int i = 0; i < objs.length(); i++) {
328     LBObj* obj = objs[i];
329     if (obj) {
330       LBOM *om = oms[obj->parentOM().handle];
331       LDObjHandle h = obj->GetLDObjHandle();
332       om->MetaLBResumeWaitingChares(h, lb_ideal_period);
333     }
334   }
335 }
336
337 void LBDB::MetaLBCallLBOnChares() {
338   for (int i = 0; i < objs.length(); i++) {
339     LBObj* obj = objs[i];
340     if (obj) {
341       LBOM *om = oms[obj->parentOM().handle];
342       LDObjHandle h = obj->GetLDObjHandle();
343       om->MetaLBCallLBOnChares(h);
344     }
345   }
346 }
347
348 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
349 {
350   // Object migrated, inform load balancers
351
352   // subtle: callback may change (on) when switching LBs
353   // call in reverse order
354   //for(int i=0; i < migrateCBList.length(); i++) {
355   for(int i=migrateCBList.length()-1; i>=0; i--) {
356     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
357     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
358   }
359   
360 }
361
362
363 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
364 {
365   // Save migration function
366   MigrateCB* callbk = new MigrateCB;
367
368   callbk->fn = fn;
369   callbk->data = data;
370   callbk->on = 1;
371   migrateCBList.insertAtEnd(callbk);
372   return migrateCBList.size()-1;
373 }
374
375 void LBDB::RemoveNotifyMigrated(int handle)
376 {
377   MigrateCB* callbk = migrateCBList[handle];
378   migrateCBList[handle] = NULL;
379   delete callbk;
380 }
381
382 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
383 {
384   // Save startLB function
385   StartLBCB* callbk = new StartLBCB;
386
387   callbk->fn = fn;
388   callbk->data = data;
389   callbk->on = 1;
390   startLBFnList.push_back(callbk);
391   startLBFn_count++;
392   return startLBFnList.size()-1;
393 }
394
395 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
396 {
397   for (int i=0; i<startLBFnList.length(); i++) {
398     StartLBCB* callbk = startLBFnList[i];
399     if (callbk && callbk->fn == fn) {
400       delete callbk;
401       startLBFnList[i] = 0; 
402       startLBFn_count --;
403       break;
404     }
405   }
406 }
407
408 void LBDB::StartLB() 
409 {
410   if (startLBFn_count == 0) {
411     CmiAbort("StartLB is not supported in this LB");
412   }
413   for (int i=0; i<startLBFnList.length(); i++) {
414     StartLBCB *startLBFn = startLBFnList[i];
415     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
416   }
417 }
418
419 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
420   // Save migrationDone callback function
421   MigrationDoneCB* callbk = new MigrationDoneCB;
422
423   callbk->fn = fn;
424   callbk->data = data;
425   migrationDoneCBList.push_back(callbk);
426   return migrationDoneCBList.size()-1;
427 }
428
429 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
430   for (int i=0; i<migrationDoneCBList.length(); i++) {
431     MigrationDoneCB* callbk = migrationDoneCBList[i];
432     if (callbk && callbk->fn == fn) {
433       delete callbk;
434       migrationDoneCBList[i] = 0; 
435       break;
436     }
437   }
438 }
439
440 void LBDB::MigrationDone() {
441   for (int i=0; i<migrationDoneCBList.length(); i++) {
442     MigrationDoneCB *callbk = migrationDoneCBList[i];
443     if (callbk) callbk->fn(callbk->data);
444   }
445 }
446
447 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
448 {
449   if (predictCBFn==NULL) predictCBFn = new PredictCB;
450   predictCBFn->on = on;
451   predictCBFn->onWin = onWin;
452   predictCBFn->off = off;
453   predictCBFn->change = change;
454   predictCBFn->data = data;
455 }
456
457 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
458 {
459   LBRealType total_walltime;
460   LBRealType total_cputime;
461   TotalTime(&total_walltime, &total_cputime);
462
463   LBRealType idletime;
464   IdleTime(&idletime);
465
466   *bg_walltime = total_walltime - idletime - obj_walltime;
467   if (*bg_walltime < 0) *bg_walltime = 0.;
468 #if CMK_LB_CPUTIMER
469   *bg_cputime = total_cputime - obj_cputime;
470 #else
471   *bg_cputime = *bg_walltime;
472 #endif
473 }
474
475 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
476                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
477 {
478   TotalTime(total_walltime,total_cputime);
479
480   IdleTime(idletime);
481   
482   *bg_walltime = *total_walltime - *idletime - obj_walltime;
483   if (*bg_walltime < 0) *bg_walltime = 0.;
484 #if CMK_LB_CPUTIMER
485   *bg_cputime = *total_cputime - obj_cputime;
486 #else
487   *bg_cputime = *bg_walltime;
488 #endif
489   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
490 }
491
492 void LBDB::DumpDatabase()
493 {
494 #ifdef DEBUG  
495   CmiPrintf("Database contains %d object managers\n",omCount);
496   CmiPrintf("Database contains %d objects\n",objCount);
497 #endif
498 }
499
500 int LBDB::useMem() {
501   int size = sizeof(LBDB);
502   size += oms.length() * sizeof(LBOM);
503   size += ObjDataCount() * sizeof(LBObj);
504   size += migrateCBList.length() * sizeof(MigrateCBList);
505   size += startLBFnList.length() * sizeof(StartLBCB);
506   size += commTable->useMem();
507   return size;
508 }
509
510 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
511 {
512   client* new_client = new client;
513   new_client->fn = fn;
514   new_client->data = data;
515   new_client->refcount = cur_refcount;
516
517   LDBarrierClient ret_val;
518 #if CMK_BIGSIM_CHARM
519   ret_val.serial = first_free_client_slot;
520   clients.insert(ret_val.serial, new_client);
521
522   //looking for the next first free client slot
523   int nextfree=-1;
524   for(int i=first_free_client_slot+1; i<clients.size(); i++)
525     if(clients[i]==NULL) { nextfree = i; break; }
526   if(nextfree==-1) nextfree = clients.size();
527   first_free_client_slot = nextfree;
528
529   if(_BgOutOfCoreFlag!=2){
530     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
531     //client_count should not be increased
532     client_count++;
533   }
534
535 #else  
536   //ret_val.serial = max_client;
537   ret_val.serial = clients.size();
538   clients.insertAtEnd(new_client);
539   //max_client++;
540   client_count++;
541 #endif
542
543   return ret_val;
544 }
545
546 void LocalBarrier::RemoveClient(LDBarrierClient c)
547 {
548   const int cnum = c.serial;
549 #if CMK_BIGSIM_CHARM
550   if (cnum < clients.size() && clients[cnum] != 0) {
551     delete (clients[cnum]);
552     clients[cnum] = 0;
553
554     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
555
556     if(_BgOutOfCoreFlag!=1){
557         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
558         //client_count should not be increased
559         client_count--;
560     }
561   }
562 #else
563   //if (cnum < max_client && clients[cnum] != 0) {
564   if (cnum < clients.size() && clients[cnum] != 0) {
565     delete (clients[cnum]);
566     clients[cnum] = 0;
567     client_count--;
568   }
569 #endif
570 }
571
572 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
573 {
574   receiver* new_receiver = new receiver;
575   new_receiver->fn = fn;
576   new_receiver->data = data;
577   new_receiver->on = 1;
578
579   LDBarrierReceiver ret_val;
580 //  ret_val.serial = max_receiver;
581   ret_val.serial = receivers.size();
582   receivers.insertAtEnd(new_receiver);
583 //  max_receiver++;
584
585   return ret_val;
586 }
587
588 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
589 {
590   const int cnum = c.serial;
591   //if (cnum < max_receiver && receivers[cnum] != 0) {
592   if (cnum < receivers.size() && receivers[cnum] != 0) {
593     delete (receivers[cnum]);
594     receivers[cnum] = 0;
595   }
596 }
597
598 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
599 {
600   const int cnum = c.serial;
601   //if (cnum < max_receiver && receivers[cnum] != 0) {
602   if (cnum < receivers.size() && receivers[cnum] != 0) {
603     receivers[cnum]->on = 1;
604   }
605 }
606
607 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
608 {
609   const int cnum = c.serial;
610   //if (cnum < max_receiver && receivers[cnum] != 0) {
611   if (cnum < receivers.size() && receivers[cnum] != 0) {
612     receivers[cnum]->on = 0;
613   }
614 }
615
616 void LocalBarrier::AtBarrier(LDBarrierClient h)
617 {
618   (clients[h.serial])->refcount++;
619   at_count++;
620   //if(CmiMyPartition()==1)
621  //CkPrintf("[%d][%d] at barrier\n",CmiMyPartition(),CkMyPe());
622   CheckBarrier();
623 }
624
625 void LocalBarrier::CheckBarrier()
626 {
627   if (!on) return;
628
629   // If there are no clients, resume as soon as we're turned on
630
631   //if(CmiMyPartition()==1){
632  //     CkPrintf("[%d] at_count %d client_count %d\n",CkMyPe(),at_count,client_count);
633   //}
634   if (client_count == 0) {
635     cur_refcount++;
636     CallReceivers();
637   }
638   if (at_count >= client_count) {
639     CmiBool at_barrier = CmiFalse;
640
641 //    for(int i=0; i < max_client; i++)
642     for(int i=0; i < clients.size(); i++)
643       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
644         at_barrier = CmiTrue;
645                 
646     if (at_barrier) {
647       at_count -= client_count;
648       cur_refcount++;
649       CallReceivers();
650     }
651   }
652 }
653
654 void LocalBarrier::CallReceivers(void)
655 {
656   CmiBool called_receiver=CmiFalse;
657 //  if(CmiMyPartition()==1)
658         //CkPrintf("[%d][%d] call receiver\n",CmiMyPartition(),CkMyPe());
659         CkPrintf("[%d] call receiver\n",CkMyPe());
660 //  for(int i=0; i < max_receiver; i++)
661 //   for (int i=max_receiver-1; i>=0; i--) {
662    for (int i=receivers.size()-1; i>=0; i--) {
663       receiver *recv = receivers[i];
664       if (recv != 0 && recv->on) {
665         recv->fn(recv->data);
666         called_receiver = CmiTrue;
667       }
668   }
669
670   if (!called_receiver)
671     ResumeClients();
672   
673 }
674
675 void LocalBarrier::ResumeClients(void)
676 {
677 //  for(int i=0; i < max_client; i++)
678   for(int i=0; i < clients.size(); i++)
679     if (clients[i] != 0) {
680       ((client*)clients[i])->fn(((client*)clients[i])->data);
681     }   
682 }
683
684 #endif
685
686 /*@}*/