01d0be4d59e251323797868bfd337867e6c37fcf
[charm.git] / src / ck-ldb / LBDBManager.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7
8 #if CMK_LBDB_ON
9
10 #include "LBDBManager.h"
11
12 struct MigrateCB;
13
14 /*************************************************************
15  * Set up the builtin barrier-- the load balancer needs somebody
16  * to call AtSync on each PE in case there are no atSync array 
17  * elements.  The builtin-atSync caller (batsyncer) does this.
18  */
19
20 //Called periodically-- starts next load balancing cycle
21 void LBDB::batsyncer::gotoSync(void *bs)
22 {
23         CkPrintf("[%d][%d] go to sync\n",CmiMyPartition(),CkMyPe());
24   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
25   s->db->AtLocalBarrier(s->BH);
26 }
27 //Called at end of each load balancing cycle
28 void LBDB::batsyncer::resumeFromSync(void *bs)
29 {
30   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
31 //  CmiPrintf("[%d] LBDB::batsyncer::resumeFromSync with %gs\n", CkMyPe(), s->period);
32
33 #if 0
34   double curT = CmiWallTimer();
35   if (s->nextT<curT)  s->period *= 2;
36   s->nextT = curT + s->period;
37 #endif
38
39   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 10000000000*s->period, CkMyPe());
40 }
41
42 // initPeriod in seconds
43 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
44 {
45   db=_db;
46   period=initPeriod;
47   nextT = CmiWallTimer() + period;
48   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
49   //This just does a CcdCallFnAfter
50   resumeFromSync((void *)this);
51 }
52
53
54 /*************************************************************
55  * LBDB Code
56  *************************************************************/
57
58 LBDB::LBDB(): useBarrier(CmiTrue)
59 {
60     statsAreOn = CmiFalse;
61     omCount = objCount = oms_registering = 0;
62     obj_running = CmiFalse;
63     commTable = new LBCommTable;
64     obj_walltime = 0;
65 #if CMK_LB_CPUTIMER
66     obj_cputime = 0;
67 #endif
68     startLBFn_count = 0;
69     predictCBFn = NULL;
70     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
71 }
72
73 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
74                        LDCallbacks _callbacks)
75 {
76   LDOMHandle newhandle;
77
78   newhandle.ldb.handle = (void*)(this);
79 //  newhandle.user_ptr = _userData;
80   newhandle.id = _userID;
81
82   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
83   if (om != NULL) {
84     newhandle.handle = oms.length();
85     oms.insertAtEnd(om);
86   } else newhandle.handle = -1;
87   om->DepositHandle(newhandle);
88   omCount++;
89   return newhandle;
90 }
91
92 #if CMK_BIGSIM_CHARM
93 #define LBOBJ_OOC_IDX 0x1
94 #endif
95
96 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
97                          void *_userData, CmiBool _migratable)
98 {
99   LDObjHandle newhandle;
100
101   newhandle.omhandle = _omh;
102 //  newhandle.user_ptr = _userData;
103   newhandle.id = _id;
104   
105 #if 1
106 #if CMK_BIGSIM_CHARM
107   if(_BgOutOfCoreFlag==2){ //taking object into memory
108     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
109     int newpos = -1;
110     for(int i=0; i<objs.length(); i++){
111         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
112             newpos = i;
113             break;
114         }
115     }
116     if(newpos==-1) newpos = objs.length();
117     newhandle.handle = newpos;
118     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
119     objs.insert(newpos, obj);
120     //objCount is not increased since it's the original object which is pupped
121     //through out-of-core emulation. 
122     //objCount++;
123   }else
124 #endif
125   {
126     //BIGSIM_OOC DEBUGGING
127     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
128     newhandle.handle = objs.length();
129     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
130     objs.insertAtEnd(obj);
131     objCount++;
132   }
133   //BIGSIM_OOC DEBUGGING
134   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
135
136 #else
137   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
138   if (obj != NULL) {
139     newhandle.handle = objs.length();
140     objs.insertAtEnd(obj);
141   } else {
142     newhandle.handle = -1;
143   }
144   obj->DepositHandle(newhandle);
145 #endif
146   return newhandle;
147 }
148
149 void LBDB::UnregisterObj(LDObjHandle _h)
150 {
151 //  (objs[_h.handle])->registered=CmiFalse;
152 // free the memory, it is a memory leak.
153 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
154   delete objs[_h.handle];
155
156 #if CMK_BIGSIM_CHARM
157   //hack for BigSim out-of-core emulation.
158   //we want the chare array object to keep at the same
159   //position even going through the pupping routine.
160   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
161     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
162   }else
163 #endif
164   {
165     objs[_h.handle] = NULL;
166   }
167 }
168
169 void LBDB::RegisteringObjects(LDOMHandle _h)
170 {
171   // for an unregistered anonymous OM to join and control the barrier
172   if (_h.id.id.idx == 0) {
173     if (oms_registering == 0)
174       localBarrier.TurnOff();
175     oms_registering++;
176   }
177   else {
178   LBOM* om = oms[_h.handle];
179   if (!om->RegisteringObjs()) {
180     if (oms_registering == 0)
181       localBarrier.TurnOff();
182     oms_registering++;
183     om->SetRegisteringObjs(CmiTrue);
184   }
185   }
186 }
187
188 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
189 {
190   // for an unregistered anonymous OM to join and control the barrier
191   if (_h.id.id.idx == 0) {
192     oms_registering--;
193     if (oms_registering == 0)
194       localBarrier.TurnOn();
195   }
196   else {
197   LBOM* om = oms[_h.handle];
198   if (om->RegisteringObjs()) {
199     oms_registering--;
200     if (oms_registering == 0)
201       localBarrier.TurnOn();
202     om->SetRegisteringObjs(CmiFalse);
203   }
204   }
205 }
206
207
208 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
209 {
210   LBCommData* item_ptr;
211
212   if (obj_running) {
213     const LDObjHandle &runObj = RunningObj();
214
215     // Don't record self-messages from an object to an object
216     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
217          && LDObjIDEqual(runObj.id,destid) )
218       return;
219
220     // In the future, we'll have to eliminate processor to same 
221     // processor messages as well
222
223     LBCommData item(runObj,destOM.id,destid, destObjProc);
224     item_ptr = commTable->HashInsertUnique(item);
225   } else {
226     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
227     item_ptr = commTable->HashInsertUnique(item);
228   }  
229   item_ptr->addMessage(bytes);
230 }
231
232 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
233 {
234   LBCommData* item_ptr;
235
236   //CmiAssert(obj_running);
237   if (obj_running) {
238     const LDObjHandle &runObj = RunningObj();
239
240     LBCommData item(runObj,destOM.id,destids, ndests);
241     item_ptr = commTable->HashInsertUnique(item);
242     item_ptr->addMessage(bytes, nMsgs);
243   } 
244 }
245
246 void LBDB::ClearLoads(void)
247 {
248   int i;
249   for(i=0; i < objCount; i++) {
250     LBObj *obj = objs[i]; 
251     if (obj)
252     {
253       if (obj->data.wallTime>.0) {
254         obj->lastWallTime = obj->data.wallTime;
255 #if CMK_LB_CPUTIMER
256         obj->lastCpuTime = obj->data.cpuTime;
257 #endif
258       }
259       obj->data.wallTime = 0.;
260 #if CMK_LB_CPUTIMER
261       obj->data.cpuTime = 0.;
262 #endif
263     }
264   }
265   delete commTable;
266   commTable = new LBCommTable;
267   machineUtil.Clear();
268   obj_walltime = 0;
269 #if CMK_LB_CPUTIMER
270   obj_cputime = 0;
271 #endif
272 }
273
274 int LBDB::ObjDataCount()
275 {
276   int nitems=0;
277   int i;
278   if (_lb_args.migObjOnly()) {
279   for(i=0; i < objCount; i++)
280     if (objs[i] && (objs[i])->data.migratable)
281       nitems++;
282   }
283   else {
284   for(i=0; i < objCount; i++)
285     if (objs[i])
286       nitems++;
287   }
288   return nitems;
289 }
290
291 void LBDB::GetObjData(LDObjData *dp)
292 {
293   if (_lb_args.migObjOnly()) {
294   for(int i = 0; i < objs.length(); i++) {
295     LBObj* obj = objs[i];
296     if ( obj && obj->data.migratable)
297       *dp++ = obj->ObjData();
298   }
299   }
300   else {
301   for(int i = 0; i < objs.length(); i++) {
302     LBObj* obj = objs[i];
303     if (obj)
304       *dp++ = obj->ObjData();
305   }
306   }
307 }
308
309 int LBDB::Migrate(LDObjHandle h, int dest)
310 {
311     //BIGSIM_OOC DEBUGGING
312     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
313
314   if (h.handle > objCount)
315     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
316   else if (!objs[h.handle]) {
317     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
318     return 0;
319   }
320
321   if ((h.handle < objCount) && objs[h.handle]) {
322     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
323     om->Migrate(h, dest);
324   }
325   return 1;
326 }
327
328 void LBDB::MetaLBResumeWaitingChares(int lb_ideal_period) {
329   for (int i = 0; i < objs.length(); i++) {
330     LBObj* obj = objs[i];
331     if (obj) {
332       LBOM *om = oms[obj->parentOM().handle];
333       LDObjHandle h = obj->GetLDObjHandle();
334       om->MetaLBResumeWaitingChares(h, lb_ideal_period);
335     }
336   }
337 }
338
339 void LBDB::MetaLBCallLBOnChares() {
340   for (int i = 0; i < objs.length(); i++) {
341     LBObj* obj = objs[i];
342     if (obj) {
343       LBOM *om = oms[obj->parentOM().handle];
344       LDObjHandle h = obj->GetLDObjHandle();
345       om->MetaLBCallLBOnChares(h);
346     }
347   }
348 }
349
350 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
351 {
352   // Object migrated, inform load balancers
353
354   // subtle: callback may change (on) when switching LBs
355   // call in reverse order
356   //for(int i=0; i < migrateCBList.length(); i++) {
357   for(int i=migrateCBList.length()-1; i>=0; i--) {
358     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
359     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
360   }
361   
362 }
363
364
365 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
366 {
367   // Save migration function
368   MigrateCB* callbk = new MigrateCB;
369
370   callbk->fn = fn;
371   callbk->data = data;
372   callbk->on = 1;
373   migrateCBList.insertAtEnd(callbk);
374   return migrateCBList.size()-1;
375 }
376
377 void LBDB::RemoveNotifyMigrated(int handle)
378 {
379   MigrateCB* callbk = migrateCBList[handle];
380   migrateCBList[handle] = NULL;
381   delete callbk;
382 }
383
384 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
385 {
386   // Save startLB function
387   StartLBCB* callbk = new StartLBCB;
388
389   callbk->fn = fn;
390   callbk->data = data;
391   callbk->on = 1;
392   startLBFnList.push_back(callbk);
393   startLBFn_count++;
394   return startLBFnList.size()-1;
395 }
396
397 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
398 {
399   for (int i=0; i<startLBFnList.length(); i++) {
400     StartLBCB* callbk = startLBFnList[i];
401     if (callbk && callbk->fn == fn) {
402       delete callbk;
403       startLBFnList[i] = 0; 
404       startLBFn_count --;
405       break;
406     }
407   }
408 }
409
410 void LBDB::StartLB() 
411 {
412   if (startLBFn_count == 0) {
413     CmiAbort("StartLB is not supported in this LB");
414   }
415   for (int i=0; i<startLBFnList.length(); i++) {
416     StartLBCB *startLBFn = startLBFnList[i];
417     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
418   }
419 }
420
421 int LBDB::AddMigrationDoneFn(LDMigrationDoneFn fn, void* data) {
422   // Save migrationDone callback function
423   MigrationDoneCB* callbk = new MigrationDoneCB;
424
425   callbk->fn = fn;
426   callbk->data = data;
427   migrationDoneCBList.push_back(callbk);
428   return migrationDoneCBList.size()-1;
429 }
430
431 void LBDB::RemoveMigrationDoneFn(LDMigrationDoneFn fn) {
432   for (int i=0; i<migrationDoneCBList.length(); i++) {
433     MigrationDoneCB* callbk = migrationDoneCBList[i];
434     if (callbk && callbk->fn == fn) {
435       delete callbk;
436       migrationDoneCBList[i] = 0; 
437       break;
438     }
439   }
440 }
441
442 void LBDB::MigrationDone() {
443   for (int i=0; i<migrationDoneCBList.length(); i++) {
444     MigrationDoneCB *callbk = migrationDoneCBList[i];
445     if (callbk) callbk->fn(callbk->data);
446   }
447 }
448
449 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
450 {
451   if (predictCBFn==NULL) predictCBFn = new PredictCB;
452   predictCBFn->on = on;
453   predictCBFn->onWin = onWin;
454   predictCBFn->off = off;
455   predictCBFn->change = change;
456   predictCBFn->data = data;
457 }
458
459 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
460 {
461   LBRealType total_walltime;
462   LBRealType total_cputime;
463   TotalTime(&total_walltime, &total_cputime);
464
465   LBRealType idletime;
466   IdleTime(&idletime);
467
468   *bg_walltime = total_walltime - idletime - obj_walltime;
469   if (*bg_walltime < 0) *bg_walltime = 0.;
470 #if CMK_LB_CPUTIMER
471   *bg_cputime = total_cputime - obj_cputime;
472 #else
473   *bg_cputime = *bg_walltime;
474 #endif
475 }
476
477 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
478                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
479 {
480   TotalTime(total_walltime,total_cputime);
481
482   IdleTime(idletime);
483   
484   *bg_walltime = *total_walltime - *idletime - obj_walltime;
485   if (*bg_walltime < 0) *bg_walltime = 0.;
486 #if CMK_LB_CPUTIMER
487   *bg_cputime = *total_cputime - obj_cputime;
488 #else
489   *bg_cputime = *bg_walltime;
490 #endif
491   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
492 }
493
494 void LBDB::DumpDatabase()
495 {
496 #ifdef DEBUG  
497   CmiPrintf("Database contains %d object managers\n",omCount);
498   CmiPrintf("Database contains %d objects\n",objCount);
499 #endif
500 }
501
502 int LBDB::useMem() {
503   int size = sizeof(LBDB);
504   size += oms.length() * sizeof(LBOM);
505   size += ObjDataCount() * sizeof(LBObj);
506   size += migrateCBList.length() * sizeof(MigrateCBList);
507   size += startLBFnList.length() * sizeof(StartLBCB);
508   size += commTable->useMem();
509   return size;
510 }
511
512 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
513 {
514   client* new_client = new client;
515   new_client->fn = fn;
516   new_client->data = data;
517   new_client->refcount = cur_refcount;
518
519   LDBarrierClient ret_val;
520 #if CMK_BIGSIM_CHARM
521   ret_val.serial = first_free_client_slot;
522   clients.insert(ret_val.serial, new_client);
523
524   //looking for the next first free client slot
525   int nextfree=-1;
526   for(int i=first_free_client_slot+1; i<clients.size(); i++)
527     if(clients[i]==NULL) { nextfree = i; break; }
528   if(nextfree==-1) nextfree = clients.size();
529   first_free_client_slot = nextfree;
530
531   if(_BgOutOfCoreFlag!=2){
532     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
533     //client_count should not be increased
534     client_count++;
535   }
536
537 #else  
538   //ret_val.serial = max_client;
539   ret_val.serial = clients.size();
540   clients.insertAtEnd(new_client);
541   //max_client++;
542   client_count++;
543 #endif
544
545   return ret_val;
546 }
547
548 void LocalBarrier::RemoveClient(LDBarrierClient c)
549 {
550   const int cnum = c.serial;
551 #if CMK_BIGSIM_CHARM
552   if (cnum < clients.size() && clients[cnum] != 0) {
553     delete (clients[cnum]);
554     clients[cnum] = 0;
555
556     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
557
558     if(_BgOutOfCoreFlag!=1){
559         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
560         //client_count should not be increased
561         client_count--;
562     }
563   }
564 #else
565   //if (cnum < max_client && clients[cnum] != 0) {
566   if (cnum < clients.size() && clients[cnum] != 0) {
567     delete (clients[cnum]);
568     clients[cnum] = 0;
569     client_count--;
570   }
571 #endif
572 }
573
574 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
575 {
576   receiver* new_receiver = new receiver;
577   new_receiver->fn = fn;
578   new_receiver->data = data;
579   new_receiver->on = 1;
580
581   LDBarrierReceiver ret_val;
582 //  ret_val.serial = max_receiver;
583   ret_val.serial = receivers.size();
584   receivers.insertAtEnd(new_receiver);
585 //  max_receiver++;
586
587   return ret_val;
588 }
589
590 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
591 {
592   const int cnum = c.serial;
593   //if (cnum < max_receiver && receivers[cnum] != 0) {
594   if (cnum < receivers.size() && receivers[cnum] != 0) {
595     delete (receivers[cnum]);
596     receivers[cnum] = 0;
597   }
598 }
599
600 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
601 {
602   const int cnum = c.serial;
603   //if (cnum < max_receiver && receivers[cnum] != 0) {
604   if (cnum < receivers.size() && receivers[cnum] != 0) {
605     receivers[cnum]->on = 1;
606   }
607 }
608
609 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
610 {
611   const int cnum = c.serial;
612   //if (cnum < max_receiver && receivers[cnum] != 0) {
613   if (cnum < receivers.size() && receivers[cnum] != 0) {
614     receivers[cnum]->on = 0;
615   }
616 }
617
618 void LocalBarrier::AtBarrier(LDBarrierClient h)
619 {
620   (clients[h.serial])->refcount++;
621   at_count++;
622   //if(CmiMyPartition()==1)
623  //CkPrintf("[%d][%d] at barrier\n",CmiMyPartition(),CkMyPe());
624   CheckBarrier();
625 }
626
627 void LocalBarrier::CheckBarrier()
628 {
629   if (!on) return;
630
631   // If there are no clients, resume as soon as we're turned on
632
633   //if(CmiMyPartition()==1){
634  //     CkPrintf("[%d] at_count %d client_count %d\n",CkMyPe(),at_count,client_count);
635   //}
636   if (client_count == 0) {
637     cur_refcount++;
638     CallReceivers();
639   }
640   if (at_count >= client_count) {
641     CmiBool at_barrier = CmiFalse;
642
643 //    for(int i=0; i < max_client; i++)
644     for(int i=0; i < clients.size(); i++)
645       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
646         at_barrier = CmiTrue;
647                 
648     if (at_barrier) {
649       at_count -= client_count;
650       cur_refcount++;
651       CallReceivers();
652     }
653   }
654 }
655
656 void LocalBarrier::CallReceivers(void)
657 {
658   CmiBool called_receiver=CmiFalse;
659 //  if(CmiMyPartition()==1)
660         //CkPrintf("[%d][%d] call receiver\n",CmiMyPartition(),CkMyPe());
661         CkPrintf("[%d] call receiver\n",CkMyPe());
662 //  for(int i=0; i < max_receiver; i++)
663 //   for (int i=max_receiver-1; i>=0; i--) {
664    for (int i=receivers.size()-1; i>=0; i--) {
665       receiver *recv = receivers[i];
666       if (recv != 0 && recv->on) {
667         recv->fn(recv->data);
668         called_receiver = CmiTrue;
669       }
670   }
671
672   if (!called_receiver)
673     ResumeClients();
674   
675 }
676
677 void LocalBarrier::ResumeClients(void)
678 {
679 //  for(int i=0; i < max_client; i++)
680   for(int i=0; i < clients.size(); i++)
681     if (clients[i] != 0) {
682       ((client*)clients[i])->fn(((client*)clients[i])->data);
683     }   
684 }
685
686 #endif
687
688 /*@}*/