d5f614d1d2fc6f0edb3ac5f4126aac53645a0b47
[charm.git] / src / ck-ldb / LBDBManager.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14
15 #if CMK_LBDB_ON
16
17 #include "LBDBManager.h"
18
19 struct MigrateCB;
20
21 /*************************************************************
22  * Set up the builtin barrier-- the load balancer needs somebody
23  * to call AtSync on each PE in case there are no atSync array 
24  * elements.  The builtin-atSync caller (batsyncer) does this.
25  */
26
27 //Called periodically-- starts next load balancing cycle
28 void LBDB::batsyncer::gotoSync(void *bs)
29 {
30   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
31   s->db->AtLocalBarrier(s->BH);
32 }
33 //Called at end of each load balancing cycle
34 void LBDB::batsyncer::resumeFromSync(void *bs)
35 {
36   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
37 //  CmiPrintf("[%d] LBDB::batsyncer::resumeFromSync with %gs\n", CkMyPe(), s->period);
38   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync, (void *)s, 1000*s->period, CkMyPe());
39 }
40
41 // initPeriod in seconds
42 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
43 {
44   db=_db;
45   period=initPeriod;
46   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
47   //This just does a CcdCallFnAfter
48   resumeFromSync((void *)this);
49 }
50
51
52 /*************************************************************
53  * LBDB Code
54  *************************************************************/
55
56 LBDB::LBDB(): useBarrier(CmiTrue)
57 {
58     statsAreOn = CmiFalse;
59     omCount = objCount = oms_registering = 0;
60     obj_running = CmiFalse;
61     commTable = new LBCommTable;
62     obj_walltime = 0;
63 #if CMK_LB_CPUTIMER
64     obj_cputime = 0;
65 #endif
66     startLBFn_count = 0;
67     predictCBFn = NULL;
68     batsync.init(this, _lb_args.lbperiod());        // original 1.0 second
69 }
70
71 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
72                        LDCallbacks _callbacks)
73 {
74   LDOMHandle newhandle;
75
76   newhandle.ldb.handle = (void*)(this);
77 //  newhandle.user_ptr = _userData;
78   newhandle.id = _userID;
79
80   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
81   if (om != NULL) {
82     newhandle.handle = oms.length();
83     oms.insertAtEnd(om);
84   } else newhandle.handle = -1;
85   om->DepositHandle(newhandle);
86   omCount++;
87   return newhandle;
88 }
89
90 #if CMK_BLUEGENE_CHARM
91 #define LBOBJ_OOC_IDX 0x1
92 #endif
93
94 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
95                          void *_userData, CmiBool _migratable)
96 {
97   LDObjHandle newhandle;
98
99   newhandle.omhandle = _omh;
100 //  newhandle.user_ptr = _userData;
101   newhandle.id = _id;
102   
103 #if 1
104 #if CMK_BLUEGENE_CHARM
105   if(_BgOutOfCoreFlag==2){ //taking object into memory
106     //first find the first (LBOBJ_OOC_IDX) in objs and insert the object at that position
107     int newpos = -1;
108     for(int i=0; i<objs.length(); i++){
109         if(objs[i]==(LBObj *)LBOBJ_OOC_IDX){
110             newpos = i;
111             break;
112         }
113     }
114     if(newpos==-1) newpos = objs.length();
115     newhandle.handle = newpos;
116     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
117     objs.insert(newpos, obj);
118     //objCount is not increased since it's the original object which is pupped
119     //through out-of-core emulation. 
120     //objCount++;
121   }else
122 #endif
123   {
124     //BIGSIM_OOC DEBUGGING
125     //CkPrintf("Proc[%d]: In AddObj for real migration\n", CkMyPe());
126     newhandle.handle = objs.length();
127     LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
128     objs.insertAtEnd(obj);
129     objCount++;
130   }
131   //BIGSIM_OOC DEBUGGING
132   //CkPrintf("LBDBManager.C: New handle: %d, LBObj=%p\n", newhandle.handle, objs[newhandle.handle]);
133
134 #else
135   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
136   if (obj != NULL) {
137     newhandle.handle = objs.length();
138     objs.insertAtEnd(obj);
139   } else {
140     newhandle.handle = -1;
141   }
142   obj->DepositHandle(newhandle);
143 #endif
144   return newhandle;
145 }
146
147 void LBDB::UnregisterObj(LDObjHandle _h)
148 {
149 //  (objs[_h.handle])->registered=CmiFalse;
150 // free the memory, it is a memory leak.
151 // CmiPrintf("[%d] UnregisterObj: %d\n", CkMyPe(), _h.handle);
152   delete objs[_h.handle];
153
154 #if CMK_BLUEGENE_CHARM
155   //hack for BigSim out-of-core emulation.
156   //we want the chare array object to keep at the same
157   //position even going through the pupping routine.
158   if(_BgOutOfCoreFlag==1){ //in taking object out of memory
159     objs[_h.handle] = (LBObj *)(LBOBJ_OOC_IDX);
160   }else
161 #endif
162   {
163     objs[_h.handle] = NULL;
164   }
165 }
166
167 void LBDB::RegisteringObjects(LDOMHandle _h)
168 {
169   // for an unregistered anonymous OM to join and control the barrier
170   if (_h.id.id.idx == 0) {
171     if (oms_registering == 0)
172       localBarrier.TurnOff();
173     oms_registering++;
174   }
175   else {
176   LBOM* om = oms[_h.handle];
177   if (!om->RegisteringObjs()) {
178     if (oms_registering == 0)
179       localBarrier.TurnOff();
180     oms_registering++;
181     om->SetRegisteringObjs(CmiTrue);
182   }
183   }
184 }
185
186 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
187 {
188   // for an unregistered anonymous OM to join and control the barrier
189   if (_h.id.id.idx == 0) {
190     oms_registering--;
191     if (oms_registering == 0)
192       localBarrier.TurnOn();
193   }
194   else {
195   LBOM* om = oms[_h.handle];
196   if (om->RegisteringObjs()) {
197     oms_registering--;
198     if (oms_registering == 0)
199       localBarrier.TurnOn();
200     om->SetRegisteringObjs(CmiFalse);
201   }
202   }
203 }
204
205
206 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes, int destObjProc)
207 {
208   LBCommData* item_ptr;
209
210   if (obj_running) {
211     const LDObjHandle &runObj = RunningObj();
212
213     // Don't record self-messages from an object to an object
214     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
215          && LDObjIDEqual(runObj.id,destid) )
216       return;
217
218     // In the future, we'll have to eliminate processor to same 
219     // processor messages as well
220
221     LBCommData item(runObj,destOM.id,destid, destObjProc);
222     item_ptr = commTable->HashInsertUnique(item);
223   } else {
224     LBCommData item(CkMyPe(),destOM.id,destid, destObjProc);
225     item_ptr = commTable->HashInsertUnique(item);
226   }  
227   item_ptr->addMessage(bytes);
228 }
229
230 void LBDB::MulticastSend(const LDOMHandle &destOM, LDObjid *destids, int ndests, unsigned int bytes, int nMsgs)
231 {
232   LBCommData* item_ptr;
233
234   //CmiAssert(obj_running);
235   if (obj_running) {
236     const LDObjHandle &runObj = RunningObj();
237
238     LBCommData item(runObj,destOM.id,destids, ndests);
239     item_ptr = commTable->HashInsertUnique(item);
240     item_ptr->addMessage(bytes, nMsgs);
241   } 
242 }
243
244 void LBDB::ClearLoads(void)
245 {
246   int i;
247   for(i=0; i < objCount; i++) {
248     LBObj *obj = objs[i]; 
249     if (obj)
250     {
251       if (obj->data.wallTime>.0) {
252         obj->lastWallTime = obj->data.wallTime;
253 #if CMK_LB_CPUTIMER
254         obj->lastCpuTime = obj->data.cpuTime;
255 #endif
256       }
257       obj->data.wallTime = 0.;
258 #if CMK_LB_CPUTIMER
259       obj->data.cpuTime = 0.;
260 #endif
261     }
262   }
263   delete commTable;
264   commTable = new LBCommTable;
265   machineUtil.Clear();
266   obj_walltime = 0;
267 #if CMK_LB_CPUTIMER
268   obj_cputime = 0;
269 #endif
270 }
271
272 int LBDB::ObjDataCount()
273 {
274   int nitems=0;
275   int i;
276   if (_lb_args.migObjOnly()) {
277   for(i=0; i < objCount; i++)
278     if (objs[i] && (objs[i])->data.migratable)
279       nitems++;
280   }
281   else {
282   for(i=0; i < objCount; i++)
283     if (objs[i])
284       nitems++;
285   }
286   return nitems;
287 }
288
289 void LBDB::GetObjData(LDObjData *dp)
290 {
291   if (_lb_args.migObjOnly()) {
292   for(int i = 0; i < objs.length(); i++) {
293     LBObj* obj = objs[i];
294     if ( obj && obj->data.migratable)
295       *dp++ = obj->ObjData();
296   }
297   }
298   else {
299   for(int i = 0; i < objs.length(); i++) {
300     LBObj* obj = objs[i];
301     if (obj)
302       *dp++ = obj->ObjData();
303   }
304   }
305 }
306
307 int LBDB::Migrate(LDObjHandle h, int dest)
308 {
309     //BIGSIM_OOC DEBUGGING
310     //CmiPrintf("[%d] LBDB::Migrate: incoming handle %d with handle range 0-%d\n", CkMyPe(), h.handle, objCount);
311
312   if (h.handle > objCount)
313     CmiPrintf("[%d] LBDB::Migrate: Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
314   else if (!objs[h.handle]) {
315     CmiPrintf("[%d] LBDB::Migrate: Handle %d no longer registered, range 0-%d\n", CkMyPe(),h.handle,objCount);
316     return 0;
317   }
318
319   if ((h.handle < objCount) && objs[h.handle]) {
320     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
321     om->Migrate(h, dest);
322   }
323   return 1;
324 }
325
326 void LBDB::Migrated(LDObjHandle h, int waitBarrier)
327 {
328   // Object migrated, inform load balancers
329
330   // subtle: callback may change (on) when switching LBs
331   // call in reverse order
332   //for(int i=0; i < migrateCBList.length(); i++) {
333   for(int i=migrateCBList.length()-1; i>=0; i--) {
334     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
335     if (cb && cb->on) (cb->fn)(cb->data,h,waitBarrier);
336   }
337   
338 }
339
340 int LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
341 {
342   // Save migration function
343   MigrateCB* callbk = new MigrateCB;
344
345   callbk->fn = fn;
346   callbk->data = data;
347   callbk->on = 1;
348   migrateCBList.insertAtEnd(callbk);
349   return migrateCBList.size()-1;
350 }
351
352 void LBDB::RemoveNotifyMigrated(int handle)
353 {
354   MigrateCB* callbk = migrateCBList[handle];
355   migrateCBList[handle] = NULL;
356   delete callbk;
357 }
358
359 int LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
360 {
361   // Save startLB function
362   StartLBCB* callbk = new StartLBCB;
363
364   callbk->fn = fn;
365   callbk->data = data;
366   callbk->on = 1;
367   startLBFnList.push_back(callbk);
368   startLBFn_count++;
369   return startLBFnList.size()-1;
370 }
371
372 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
373 {
374   for (int i=0; i<startLBFnList.length(); i++) {
375     StartLBCB* callbk = startLBFnList[i];
376     if (callbk && callbk->fn == fn) {
377       delete callbk;
378       startLBFnList[i] = 0; 
379       startLBFn_count --;
380       break;
381     }
382   }
383 }
384
385 void LBDB::StartLB() 
386 {
387   if (startLBFn_count == 0) {
388     CmiAbort("StartLB is not supported in this LB");
389   }
390   for (int i=0; i<startLBFnList.length(); i++) {
391     StartLBCB *startLBFn = startLBFnList[i];
392     if (startLBFn && startLBFn->on) startLBFn->fn(startLBFn->data);
393   }
394 }
395
396 void LBDB::SetupPredictor(LDPredictModelFn on, LDPredictWindowFn onWin, LDPredictFn off, LDPredictModelFn change, void* data)
397 {
398   if (predictCBFn==NULL) predictCBFn = new PredictCB;
399   predictCBFn->on = on;
400   predictCBFn->onWin = onWin;
401   predictCBFn->off = off;
402   predictCBFn->change = change;
403   predictCBFn->data = data;
404 }
405
406 void LBDB::BackgroundLoad(LBRealType* bg_walltime, LBRealType* bg_cputime)
407 {
408   LBRealType total_walltime;
409   LBRealType total_cputime;
410   TotalTime(&total_walltime, &total_cputime);
411
412   LBRealType idletime;
413   IdleTime(&idletime);
414
415   *bg_walltime = total_walltime - idletime - obj_walltime;
416   if (*bg_walltime < 0) *bg_walltime = 0.;
417 #if CMK_LB_CPUTIMER
418   *bg_cputime = total_cputime - obj_cputime;
419 #else
420   *bg_cputime = *bg_walltime;
421 #endif
422 }
423
424 void LBDB::GetTime(LBRealType *total_walltime,LBRealType *total_cputime,
425                    LBRealType *idletime, LBRealType *bg_walltime, LBRealType *bg_cputime)
426 {
427   TotalTime(total_walltime,total_cputime);
428
429   IdleTime(idletime);
430   
431   *bg_walltime = *total_walltime - *idletime - obj_walltime;
432   if (*bg_walltime < 0) *bg_walltime = 0.;
433 #if CMK_LB_CPUTIMER
434   *bg_cputime = *total_cputime - obj_cputime;
435 #else
436   *bg_cputime = *bg_walltime;
437 #endif
438   //CkPrintf("HERE [%d] total: %f %f obj: %f %f idle: %f bg: %f\n", CkMyPe(), *total_walltime, *total_cputime, obj_walltime, obj_cputime, *idletime, *bg_walltime);
439 }
440
441 void LBDB::DumpDatabase()
442 {
443 #ifdef DEBUG  
444   CmiPrintf("Database contains %d object managers\n",omCount);
445   CmiPrintf("Database contains %d objects\n",objCount);
446 #endif
447 }
448
449 int LBDB::useMem() {
450   int size = sizeof(LBDB);
451   size += oms.length() * sizeof(LBOM);
452   size += ObjDataCount() * sizeof(LBObj);
453   size += migrateCBList.length() * sizeof(MigrateCBList);
454   size += startLBFnList.length() * sizeof(StartLBCB);
455   size += commTable->useMem();
456   return size;
457 }
458
459 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
460 {
461   client* new_client = new client;
462   new_client->fn = fn;
463   new_client->data = data;
464   new_client->refcount = cur_refcount;
465
466   LDBarrierClient ret_val;
467 #if CMK_BLUEGENE_CHARM
468   ret_val.serial = first_free_client_slot;
469   clients.insert(ret_val.serial, new_client);
470
471   //looking for the next first free client slot
472   int nextfree=-1;
473   for(int i=first_free_client_slot+1; i<clients.size(); i++)
474     if(clients[i]==NULL) { nextfree = i; break; }
475   if(nextfree==-1) nextfree = clients.size();
476   first_free_client_slot = nextfree;
477
478   if(_BgOutOfCoreFlag!=2){
479     //during out-of-core emualtion for BigSim, if taking procs from disk to mem,
480     //client_count should not be increased
481     client_count++;
482   }
483
484 #else  
485   //ret_val.serial = max_client;
486   ret_val.serial = clients.size();
487   clients.insertAtEnd(new_client);
488   //max_client++;
489   client_count++;
490 #endif
491
492   return ret_val;
493 }
494
495 void LocalBarrier::RemoveClient(LDBarrierClient c)
496 {
497   const int cnum = c.serial;
498 #if CMK_BLUEGENE_CHARM
499   if (cnum < clients.size() && clients[cnum] != 0) {
500     delete (clients[cnum]);
501     clients[cnum] = 0;
502
503     if(cnum<=first_free_client_slot) first_free_client_slot = cnum;
504
505     if(_BgOutOfCoreFlag!=1){
506         //during out-of-core emulation for BigSim, if taking procs from mem to disk,
507         //client_count should not be increased
508         client_count--;
509     }
510   }
511 #else
512   //if (cnum < max_client && clients[cnum] != 0) {
513   if (cnum < clients.size() && clients[cnum] != 0) {
514     delete (clients[cnum]);
515     clients[cnum] = 0;
516     client_count--;
517   }
518 #endif
519 }
520
521 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
522 {
523   receiver* new_receiver = new receiver;
524   new_receiver->fn = fn;
525   new_receiver->data = data;
526   new_receiver->on = 1;
527
528   LDBarrierReceiver ret_val;
529 //  ret_val.serial = max_receiver;
530   ret_val.serial = receivers.size();
531   receivers.insertAtEnd(new_receiver);
532 //  max_receiver++;
533
534   return ret_val;
535 }
536
537 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
538 {
539   const int cnum = c.serial;
540   //if (cnum < max_receiver && receivers[cnum] != 0) {
541   if (cnum < receivers.size() && receivers[cnum] != 0) {
542     delete (receivers[cnum]);
543     receivers[cnum] = 0;
544   }
545 }
546
547 void LocalBarrier::TurnOnReceiver(LDBarrierReceiver c)
548 {
549   const int cnum = c.serial;
550   //if (cnum < max_receiver && receivers[cnum] != 0) {
551   if (cnum < receivers.size() && receivers[cnum] != 0) {
552     receivers[cnum]->on = 1;
553   }
554 }
555
556 void LocalBarrier::TurnOffReceiver(LDBarrierReceiver c)
557 {
558   const int cnum = c.serial;
559   //if (cnum < max_receiver && receivers[cnum] != 0) {
560   if (cnum < receivers.size() && receivers[cnum] != 0) {
561     receivers[cnum]->on = 0;
562   }
563 }
564
565 void LocalBarrier::AtBarrier(LDBarrierClient h)
566 {
567   (clients[h.serial])->refcount++;
568   at_count++;
569   CheckBarrier();
570 }
571
572 void LocalBarrier::CheckBarrier()
573 {
574   if (!on) return;
575
576   // If there are no clients, resume as soon as we're turned on
577
578   if (client_count == 0) {
579     cur_refcount++;
580     CallReceivers();
581   }
582   if (at_count >= client_count) {
583     CmiBool at_barrier = CmiFalse;
584
585 //    for(int i=0; i < max_client; i++)
586     for(int i=0; i < clients.size(); i++)
587       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
588         at_barrier = CmiTrue;
589                 
590     if (at_barrier) {
591       at_count -= client_count;
592       cur_refcount++;
593       CallReceivers();
594     }
595   }
596 }
597
598 void LocalBarrier::CallReceivers(void)
599 {
600   CmiBool called_receiver=CmiFalse;
601
602 //  for(int i=0; i < max_receiver; i++)
603 //   for (int i=max_receiver-1; i>=0; i--) {
604    for (int i=receivers.size()-1; i>=0; i--) {
605       receiver *recv = receivers[i];
606       if (recv != 0 && recv->on) {
607         recv->fn(recv->data);
608         called_receiver = CmiTrue;
609       }
610   }
611
612   if (!called_receiver)
613     ResumeClients();
614   
615 }
616
617 void LocalBarrier::ResumeClients(void)
618 {
619 //  for(int i=0; i < max_client; i++)
620   for(int i=0; i < clients.size(); i++)
621     if (clients[i] != 0) {
622       ((client*)clients[i])->fn(((client*)clients[i])->data);
623     }   
624 }
625
626 #endif
627
628 /*@}*/