adaptively adjust LB period time
[charm.git] / src / ck-ldb / LBDBManager.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14
15 #if CMK_LBDB_ON
16
17 #include "LBDBManager.h"
18
19 struct MigrateCB;
20
21 /*************************************************************
22  * Set up the builtin barrier-- the load balancer needs somebody
23  * to call AtSync on each PE in case there are no atSync array 
24  * elements.  The builtin-atSync caller (batsyncer) does this.
25  */
26
27 //Called periodically-- starts next load balancing cycle
28 void LBDB::batsyncer::gotoSync(void *bs)
29 {
30   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
31   s->db->AtLocalBarrier(s->BH);
32 }
33 //Called at end of each load balancing cycle
34 void LBDB::batsyncer::resumeFromSync(void *bs)
35 {
36   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
37 //  CmiPrintf("[%d] LBDB::batsyncer::resumeFromSync with %gs\n", CkMyPe(), s->period);
38   double curT = CmiWallTimer();
39   if (s->nextT<curT)  s->period *= 2;
40   s->nextT = curT + s->period;
41
42   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
43 }
44
45 // initPeriod in seconds
46 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
47 {
48   db=_db;
49   period=initPeriod;
50   nextT = CmiWallTimer() + period;
51   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
52   //This just does a CcdCallFnAfter
53   resumeFromSync((void *)this);
54 }
55
56
57 /*************************************************************
58  * LBDB Code
59  *************************************************************/
60
61 LBDB::LBDB(): useBarrier(CmiTrue)
62 {
63     statsAreOn = CmiFalse;
64     omCount = objCount = oms_registering = 0;
65     obj_running = CmiFalse;
66     commTable = new LBCommTable;
67     obj_walltime = 0;
68 #if CMK_LB_CPUTIMER
69     obj_cputime = 0;
70 #endif
71     startLBFn_count = 0;
72     predictCBFn = NULL;
73     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
74 }
75
76 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
77                        LDCallbacks _callbacks)
78 {
79   LDOMHandle newhandle;
80
81   newhandle.ldb.handle = (void*)(this);
82 //  newhandle.user_ptr = _userData;
83   newhandle.id = _userID;
84
85   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
86   if (om != NULL) {
87     newhandle.handle = oms.length();
88     oms.insertAtEnd(om);
89   } else newhandle.handle = -1;
90   om->DepositHandle(newhandle);
91   omCount++;
92   return newhandle;
93 }
94
95 #if CMK_BLUEGENE_CHARM
96 #define LBOBJ_OOC_IDX 0x1
97 #endif
98
99 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
100                          void *_userData, CmiBool _migratable)
101 {
102   LDObjHandle newhandle;
103
104   newhandle.omhandle = _omh;
105 //  newhandle.user_ptr = _userData;
106   newhandle.id = _id;
107   
108 #if 1
109 #if CMK_BLUEGENE_CHARM
110   if(_BgOutOfCoreFlag==2){ //taking object into memory
111     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
112     int newpos = -1;
113     for(int i=0; i<objs.length(); i++){
114         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
115             newpos = i;
116             break;
117         }
118     }
119     if(newpos==-1) newpos = objs.length();
120     newhandle.handle = newpos;
121     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
122     objs.insert(newpos, obj);
123     //objCount is not increased since it's the original object which is pupped
124     //through out-of-core emulation. 
125     //objCount++;
126   }else
127 #endif
128   {
129     //BIGSIM_OOC DEBUGGING
130     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
131     newhandle.handle = objs.length();
132     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
133     objs.insertAtEnd(obj);
134     objCount++;
135   }
136   //BIGSIM_OOC DEBUGGING
137   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
138
139 #else
140   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
141   if (obj != NULL) {
142     newhandle.handle = objs.length();
143     objs.insertAtEnd(obj);
144   } else {
145     newhandle.handle = -1;
146   }
147   obj->DepositHandle(newhandle);
148 #endif
149   return newhandle;
150 }
151
152 void LBDB::UnregisterObj(LDObjHandle _h)
153 {
154 //  (objs[_h.handle])->registered=CmiFalse;
155 // free the memory, it is a memory leak.
156 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
157   delete objs[_h.handle];
158
159 #if CMK_BLUEGENE_CHARM
160   //hack for BigSim out-of-core emulation.
161   //we want the chare array object to keep at the same
162   //position even going through the pupping routine.
163   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
164     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
165   }else
166 #endif
167   {
168     objs[_h.handle] = NULL;
169   }
170 }
171
172 void LBDB::RegisteringObjects(LDOMHandle _h)
173 {
174   // for an unregistered anonymous OM to join and control the barrier
175   if (_h.id.id.idx == 0) {
176     if (oms_registering == 0)
177       localBarrier.TurnOff();
178     oms_registering++;
179   }
180   else {
181   LBOM* om = oms[_h.handle];
182   if (!om->RegisteringObjs()) {
183     if (oms_registering == 0)
184       localBarrier.TurnOff();
185     oms_registering++;
186     om->SetRegisteringObjs(CmiTrue);
187   }
188   }
189 }
190
191 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
192 {
193   // for an unregistered anonymous OM to join and control the barrier
194   if (_h.id.id.idx == 0) {
195     oms_registering--;
196     if (oms_registering == 0)
197       localBarrier.TurnOn();
198   }
199   else {
200   LBOM* om = oms[_h.handle];
201   if (om->RegisteringObjs()) {
202     oms_registering--;
203     if (oms_registering == 0)
204       localBarrier.TurnOn();
205     om->SetRegisteringObjs(CmiFalse);
206   }
207   }
208 }
209
210
211 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
212 {
213   LBCommData* item_ptr;
214
215   if (obj_running) {
216     const LDObjHandle &runObj = RunningObj();
217
218     // Don't record self-messages from an object to an object
219     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
220          && LDObjIDEqual(runObj.id,destid) )
221       return;
222
223     // In the future, we'll have to eliminate processor to same 
224     // processor messages as well
225
226     LBCommData item(runObj,destOM.id,destid, destObjProc);
227     item_ptr = commTable->HashInsertUnique(item);
228   } else {
229     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
230     item_ptr = commTable->HashInsertUnique(item);
231   }  
232   item_ptr->addMessage(bytes);
233 }
234
235 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
236 {
237   LBCommData* item_ptr;
238
239   //CmiAssert(obj_running);
240   if (obj_running) {
241     const LDObjHandle &runObj = RunningObj();
242
243     LBCommData item(runObj,destOM.id,destids, ndests);
244     item_ptr = commTable->HashInsertUnique(item);
245     item_ptr->addMessage(bytes, nMsgs);
246   } 
247 }
248
249 void LBDB::ClearLoads(void)
250 {
251   int i;
252   for(i=0; i < objCount; i++) {
253     LBObj *obj = objs[i]; 
254     if (obj)
255     {
256       if (obj->data.wallTime>.0) {
257         obj->lastWallTime = obj->data.wallTime;
258 #if CMK_LB_CPUTIMER
259         obj->lastCpuTime = obj->data.cpuTime;
260 #endif
261       }
262       obj->data.wallTime = 0.;
263 #if CMK_LB_CPUTIMER
264       obj->data.cpuTime = 0.;
265 #endif
266     }
267   }
268   delete commTable;
269   commTable = new LBCommTable;
270   machineUtil.Clear();
271   obj_walltime = 0;
272 #if CMK_LB_CPUTIMER
273   obj_cputime = 0;
274 #endif
275 }
276
277 int LBDB::ObjDataCount()
278 {
279   int nitems=0;
280   int i;
281   if (_lb_args.migObjOnly()) {
282   for(i=0; i < objCount; i++)
283     if (objs[i] && (objs[i])->data.migratable)
284       nitems++;
285   }
286   else {
287   for(i=0; i < objCount; i++)
288     if (objs[i])
289       nitems++;
290   }
291   return nitems;
292 }
293
294 void LBDB::GetObjData(LDObjData *dp)
295 {
296   if (_lb_args.migObjOnly()) {
297   for(int i = 0; i < objs.length(); i++) {
298     LBObj* obj = objs[i];
299     if ( obj && obj->data.migratable)
300       *dp++ = obj->ObjData();
301   }
302   }
303   else {
304   for(int i = 0; i < objs.length(); i++) {
305     LBObj* obj = objs[i];
306     if (obj)
307       *dp++ = obj->ObjData();
308   }
309   }
310 }
311
312 int LBDB::Migrate(LDObjHandle h, int dest)
313 {
314     //BIGSIM_OOC DEBUGGING
315     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
316
317   if (h.handle > objCount)
318     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
319   else if (!objs[h.handle]) {
320     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
321     return 0;
322   }
323
324   if ((h.handle < objCount) && objs[h.handle]) {
325     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
326     om->Migrate(h, dest);
327   }
328   return 1;
329 }
330
331 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
332 {
333   // Object migrated, inform load balancers
334
335   // subtle: callback may change (on) when switching LBs
336   // call in reverse order
337   //for(int i=0; i < migrateCBList.length(); i++) {
338   for(int i=migrateCBList.length()-1; i>=0; i--) {
339     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
340     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
341   }
342   
343 }
344
345 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
346 {
347   // Save migration function
348   MigrateCB* callbk = new MigrateCB;
349
350   callbk->fn = fn;
351   callbk->data = data;
352   callbk->on = 1;
353   migrateCBList.insertAtEnd(callbk);
354   return migrateCBList.size()-1;
355 }
356
357 void LBDB::RemoveNotifyMigrated(int handle)
358 {
359   MigrateCB* callbk = migrateCBList[handle];
360   migrateCBList[handle] = NULL;
361   delete callbk;
362 }
363
364 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
365 {
366   // Save startLB function
367   StartLBCB* callbk = new StartLBCB;
368
369   callbk->fn = fn;
370   callbk->data = data;
371   callbk->on = 1;
372   startLBFnList.push_back(callbk);
373   startLBFn_count++;
374   return startLBFnList.size()-1;
375 }
376
377 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
378 {
379   for (int i=0; i<startLBFnList.length(); i++) {
380     StartLBCB* callbk = startLBFnList[i];
381     if (callbk && callbk->fn == fn) {
382       delete callbk;
383       startLBFnList[i] = 0; 
384       startLBFn_count --;
385       break;
386     }
387   }
388 }
389
390 void LBDB::StartLB() 
391 {
392   if (startLBFn_count == 0) {
393     CmiAbort("StartLB is not supported in this LB");
394   }
395   for (int i=0; i<startLBFnList.length(); i++) {
396     StartLBCB *startLBFn = startLBFnList[i];
397     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
398   }
399 }
400
401 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
402 {
403   if (predictCBFn==NULL) predictCBFn = new PredictCB;
404   predictCBFn->on = on;
405   predictCBFn->onWin = onWin;
406   predictCBFn->off = off;
407   predictCBFn->change = change;
408   predictCBFn->data = data;
409 }
410
411 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
412 {
413   LBRealType total_walltime;
414   LBRealType total_cputime;
415   TotalTime(&total_walltime, &total_cputime);
416
417   LBRealType idletime;
418   IdleTime(&idletime);
419
420   *bg_walltime = total_walltime - idletime - obj_walltime;
421   if (*bg_walltime < 0) *bg_walltime = 0.;
422 #if CMK_LB_CPUTIMER
423   *bg_cputime = total_cputime - obj_cputime;
424 #else
425   *bg_cputime = *bg_walltime;
426 #endif
427 }
428
429 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
430                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
431 {
432   TotalTime(total_walltime,total_cputime);
433
434   IdleTime(idletime);
435   
436   *bg_walltime = *total_walltime - *idletime - obj_walltime;
437   if (*bg_walltime < 0) *bg_walltime = 0.;
438 #if CMK_LB_CPUTIMER
439   *bg_cputime = *total_cputime - obj_cputime;
440 #else
441   *bg_cputime = *bg_walltime;
442 #endif
443   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
444 }
445
446 void LBDB::DumpDatabase()
447 {
448 #ifdef DEBUG  
449   CmiPrintf("Database contains %d object managers\n",omCount);
450   CmiPrintf("Database contains %d objects\n",objCount);
451 #endif
452 }
453
454 int LBDB::useMem() {
455   int size = sizeof(LBDB);
456   size += oms.length() * sizeof(LBOM);
457   size += ObjDataCount() * sizeof(LBObj);
458   size += migrateCBList.length() * sizeof(MigrateCBList);
459   size += startLBFnList.length() * sizeof(StartLBCB);
460   size += commTable->useMem();
461   return size;
462 }
463
464 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
465 {
466   client* new_client = new client;
467   new_client->fn = fn;
468   new_client->data = data;
469   new_client->refcount = cur_refcount;
470
471   LDBarrierClient ret_val;
472 #if CMK_BLUEGENE_CHARM
473   ret_val.serial = first_free_client_slot;
474   clients.insert(ret_val.serial, new_client);
475
476   //looking for the next first free client slot
477   int nextfree=-1;
478   for(int i=first_free_client_slot+1; i<clients.size(); i++)
479     if(clients[i]==NULL) { nextfree = i; break; }
480   if(nextfree==-1) nextfree = clients.size();
481   first_free_client_slot = nextfree;
482
483   if(_BgOutOfCoreFlag!=2){
484     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
485     //client_count should not be increased
486     client_count++;
487   }
488
489 #else  
490   //ret_val.serial = max_client;
491   ret_val.serial = clients.size();
492   clients.insertAtEnd(new_client);
493   //max_client++;
494   client_count++;
495 #endif
496
497   return ret_val;
498 }
499
500 void LocalBarrier::RemoveClient(LDBarrierClient c)
501 {
502   const int cnum = c.serial;
503 #if CMK_BLUEGENE_CHARM
504   if (cnum < clients.size() && clients[cnum] != 0) {
505     delete (clients[cnum]);
506     clients[cnum] = 0;
507
508     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
509
510     if(_BgOutOfCoreFlag!=1){
511         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
512         //client_count should not be increased
513         client_count--;
514     }
515   }
516 #else
517   //if (cnum < max_client && clients[cnum] != 0) {
518   if (cnum < clients.size() && clients[cnum] != 0) {
519     delete (clients[cnum]);
520     clients[cnum] = 0;
521     client_count--;
522   }
523 #endif
524 }
525
526 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
527 {
528   receiver* new_receiver = new receiver;
529   new_receiver->fn = fn;
530   new_receiver->data = data;
531   new_receiver->on = 1;
532
533   LDBarrierReceiver ret_val;
534 //  ret_val.serial = max_receiver;
535   ret_val.serial = receivers.size();
536   receivers.insertAtEnd(new_receiver);
537 //  max_receiver++;
538
539   return ret_val;
540 }
541
542 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
543 {
544   const int cnum = c.serial;
545   //if (cnum < max_receiver && receivers[cnum] != 0) {
546   if (cnum < receivers.size() && receivers[cnum] != 0) {
547     delete (receivers[cnum]);
548     receivers[cnum] = 0;
549   }
550 }
551
552 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
553 {
554   const int cnum = c.serial;
555   //if (cnum < max_receiver && receivers[cnum] != 0) {
556   if (cnum < receivers.size() && receivers[cnum] != 0) {
557     receivers[cnum]->on = 1;
558   }
559 }
560
561 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
562 {
563   const int cnum = c.serial;
564   //if (cnum < max_receiver && receivers[cnum] != 0) {
565   if (cnum < receivers.size() && receivers[cnum] != 0) {
566     receivers[cnum]->on = 0;
567   }
568 }
569
570 void LocalBarrier::AtBarrier(LDBarrierClient h)
571 {
572   (clients[h.serial])->refcount++;
573   at_count++;
574   CheckBarrier();
575 }
576
577 void LocalBarrier::CheckBarrier()
578 {
579   if (!on) return;
580
581   // If there are no clients, resume as soon as we're turned on
582
583   if (client_count == 0) {
584     cur_refcount++;
585     CallReceivers();
586   }
587   if (at_count >= client_count) {
588     CmiBool at_barrier = CmiFalse;
589
590 //    for(int i=0; i < max_client; i++)
591     for(int i=0; i < clients.size(); i++)
592       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
593         at_barrier = CmiTrue;
594                 
595     if (at_barrier) {
596       at_count -= client_count;
597       cur_refcount++;
598       CallReceivers();
599     }
600   }
601 }
602
603 void LocalBarrier::CallReceivers(void)
604 {
605   CmiBool called_receiver=CmiFalse;
606
607 //  for(int i=0; i < max_receiver; i++)
608 //   for (int i=max_receiver-1; i>=0; i--) {
609    for (int i=receivers.size()-1; i>=0; i--) {
610       receiver *recv = receivers[i];
611       if (recv != 0 && recv->on) {
612         recv->fn(recv->data);
613         called_receiver = CmiTrue;
614       }
615   }
616
617   if (!called_receiver)
618     ResumeClients();
619   
620 }
621
622 void LocalBarrier::ResumeClients(void)
623 {
624 //  for(int i=0; i < max_client; i++)
625   for(int i=0; i < clients.size(); i++)
626     if (clients[i] != 0) {
627       ((client*)clients[i])->fn(((client*)clients[i])->data);
628     }   
629 }
630
631 #endif
632
633 /*@}*/