minor changes, removed warnings.
[charm.git] / src / ck-ldb / LBDBManager.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14
15 #if CMK_LBDB_ON
16
17 #include <iostream.h>
18 #include "LBDBManager.h"
19
20 struct MigrateCB;
21
22 /*************************************************************
23  * Set up the builtin barrier-- the load balancer needs somebody
24  * to call AtSync on each PE in case there are no atSync array 
25  * elements.  The builtin-atSync caller (batsyncer) does this.
26  */
27
28 //Called periodically-- starts next load balancing cycle
29 void LBDB::batsyncer::gotoSync(void *bs)
30 {
31   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
32   s->db->AtLocalBarrier(s->BH);
33 }
34 //Called at end of each load balancing cycle
35 void LBDB::batsyncer::resumeFromSync(void *bs)
36 {
37   LBDB::batsyncer *s=(LBDB::batsyncer *)bs;
38   CcdCallFnAfterOnPE((CcdVoidFn)gotoSync,(void *)s,(int)(1000*s->period), CkMyPe());
39 }
40
41 void LBDB::batsyncer::init(LBDB *_db,double initPeriod)
42 {
43   db=_db;
44   period=initPeriod;
45   BH = db->AddLocalBarrierClient((LDResumeFn)resumeFromSync,(void*)(this));
46   //This just does a CcdCallFnAfter
47   resumeFromSync((void *)this);
48 }
49
50
51 /*************************************************************
52  * LBDB Code
53  *************************************************************/
54
55 LBDB::LBDB(): useBarrier(CmiTrue)
56 {
57     statsAreOn = CmiFalse;
58     omCount = objCount = oms_registering = 0;
59     obj_running = CmiFalse;
60     commTable = new LBCommTable;
61     obj_walltime = obj_cputime = 0;
62     batsync.init(this,1.0);
63     startLBFn_count = 0;
64 }
65
66 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
67                        LDCallbacks _callbacks)
68 {
69   LDOMHandle newhandle;
70
71   newhandle.ldb.handle = (void*)(this);
72 //  newhandle.user_ptr = _userData;
73   newhandle.id = _userID;
74
75   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
76   if (om != NULL) {
77     newhandle.handle = oms.length();
78     oms.insertAtEnd(om);
79   } else newhandle.handle = -1;
80   om->DepositHandle(newhandle);
81   omCount++;
82   return newhandle;
83 }
84
85 LDObjHandle LBDB::AddObj(LDOMHandle _omh, LDObjid _id,
86                          void *_userData, CmiBool _migratable)
87 {
88   LDObjHandle newhandle;
89
90   newhandle.omhandle = _omh;
91 //  newhandle.user_ptr = _userData;
92   newhandle.id = _id;
93   
94 #if 1
95   newhandle.handle = objs.length();
96   LBObj *obj = new LBObj(this, newhandle, _userData, _migratable);
97   objs.insertAtEnd(obj);
98 #else
99   LBObj *obj = new LBObj(this,_omh,_id,_userData,_migratable);
100   if (obj != NULL) {
101     newhandle.handle = objs.length();
102     objs.insertAtEnd(obj);
103   } else {
104     newhandle.handle = -1;
105   }
106   obj->DepositHandle(newhandle);
107 #endif
108   objCount++;
109   return newhandle;
110 }
111
112 void LBDB::UnregisterObj(LDObjHandle _h)
113 {
114   (objs[_h.handle])->registered=CmiFalse;
115 }
116
117 void LBDB::RegisteringObjects(LDOMHandle _h)
118 {
119   LBOM* om = oms[_h.handle];
120   if (!om->RegisteringObjs()) {
121     if (oms_registering == 0)
122       localBarrier.TurnOff();
123     oms_registering++;
124     om->SetRegisteringObjs(CmiTrue);
125   }
126 }
127
128 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
129 {
130   LBOM* om = oms[_h.handle];
131   if (om->RegisteringObjs()) {
132     oms_registering--;
133     if (oms_registering == 0)
134       localBarrier.TurnOn();
135     om->SetRegisteringObjs(CmiFalse);
136   }
137 }
138
139
140 void LBDB::Send(const LDOMHandle &destOM, const LDObjid &destid, unsigned int bytes)
141 {
142   LBCommData* item_ptr;
143
144   if (obj_running) {
145     const LDObjHandle &runObj = RunningObj();
146
147     // Don't record self-messages from an object to an object
148     if ( LDOMidEqual(runObj.omhandle.id,destOM.id)
149          && LDObjIDEqual(runObj.id,destid) )
150       return;
151
152     // In the future, we'll have to eliminate processor to same 
153     // processor messages as well
154
155     LBCommData item(runObj,destOM.id,destid);
156     item_ptr = commTable->HashInsertUnique(item);
157   } else {
158     LBCommData item(CkMyPe(),destOM.id,destid);
159     item_ptr = commTable->HashInsertUnique(item);
160   }  
161   item_ptr->addMessage(bytes);
162 }
163
164 void LBDB::ClearLoads(void)
165 {
166   int i;
167   for(i=0; i < objCount; i++) {
168     LBObj *obj = objs[i];
169     if (obj->registered)
170     {
171       if (obj->data.cpuTime>.0) {
172         obj->lastCpuTime = obj->data.cpuTime;
173         obj->lastWallTime = obj->data.wallTime;
174       }
175       obj->data.wallTime = 
176         obj->data.cpuTime = 0.;
177     }
178   }
179   delete commTable;
180   commTable = new LBCommTable;
181   machineUtil.Clear();
182   obj_walltime = obj_cputime = 0;
183 }
184
185 int LBDB::ObjDataCount()
186 {
187   int nitems=0;
188   int i;
189   if (lb_ignoreBgLoad) {
190   for(i=0; i < objCount; i++)
191     if ((objs[i])->registered && (objs[i])->data.migratable)
192       nitems++;
193   }
194   else {
195   for(i=0; i < objCount; i++)
196     if ((objs[i])->registered)
197       nitems++;
198   }
199   return nitems;
200 }
201
202 void LBDB::GetObjData(LDObjData *dp)
203 {
204   if (lb_ignoreBgLoad) {
205   for(int i = 0; i < objs.length(); i++) {
206     LBObj* obj = objs[i];
207     if ( obj->registered && obj->data.migratable)
208       *dp++ = obj->ObjData();
209   }
210   }
211   else {
212   for(int i = 0; i < objs.length(); i++) {
213     LBObj* obj = objs[i];
214     if ( obj->registered )
215       *dp++ = obj->ObjData();
216   }
217   }
218 }
219
220 void LBDB::Migrate(LDObjHandle h, int dest)
221 {
222   if (h.handle > objCount)
223     CmiPrintf("[%d] Handle %d out of range 0-%d\n",CkMyPe(),h.handle,objCount);
224   else if (!(objs[h.handle])->registered)
225     CmiPrintf("[%d] Handle %d no longer registered, range 0-%d\n",
226             CkMyPe(),h.handle,objCount);
227
228   if ((h.handle < objCount) && ((objs[h.handle])->registered)) {
229     LBOM *const om = oms[(objs[h.handle])->parentOM().handle];
230     om->Migrate(h, dest);
231   }
232   return;
233 }
234
235 void LBDB::Migrated(LDObjHandle h)
236 {
237   // Object migrated, inform load balancers
238
239   for(int i=0; i < migrateCBList.length(); i++) {
240     MigrateCB* cb = (MigrateCB*)migrateCBList[i];
241     (cb->fn)(cb->data,h);
242   }
243   
244 }
245
246 void LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
247 {
248   // Save migration function
249   MigrateCB* callbk = new MigrateCB;
250
251   callbk->fn = fn;
252   callbk->data = data;
253   migrateCBList.insertAtEnd(callbk);
254 }
255
256 void LBDB::AddStartLBFn(LDStartLBFn fn, void* data)
257 {
258   // Save startLB function
259   StartLBCB* callbk = new StartLBCB;
260
261   callbk->fn = fn;
262   callbk->data = data;
263   startLBFnList.push_back(callbk);
264   startLBFn_count++;
265 }
266
267 void LBDB::RemoveStartLBFn(LDStartLBFn fn)
268 {
269   for (int i=0; i<startLBFnList.length(); i++) {
270     if (startLBFnList[i]->fn == fn) {
271       startLBFnList[i] = 0; 
272       startLBFn_count --;
273       break;
274     }
275   }
276 }
277
278 void LBDB::StartLB() 
279 {
280   if (startLBFn_count == 0) {
281     CmiAbort("StartLB is not supported in this LB");
282   }
283   for (int i=0; i<startLBFnList.length(); i++) {
284     StartLBCB *startLBFn = startLBFnList[i];
285     if (startLBFn) startLBFn->fn(startLBFn->data);
286   }
287 }
288
289 void LBDB::BackgroundLoad(double* walltime, double* cputime)
290 {
291   double totalwall;
292   double totalcpu;
293   TotalTime(&totalwall,&totalcpu);
294
295   double idle;
296   IdleTime(&idle);
297   
298   *walltime = totalwall - idle - obj_walltime;
299   *cputime = totalcpu - obj_cputime;
300 }
301
302 void LBDB::DumpDatabase()
303 {
304 #ifdef DEBUG  
305   CmiPrintf("Database contains %d object managers\n",omCount);
306   CmiPrintf("Database contains %d objects\n",objCount);
307 #endif
308 }
309
310 int LBDB::useMem() {
311   int size = sizeof(LBDB);
312   size += oms.length() * sizeof(LBOM);
313   size += objs.length() * sizeof(LBObj);
314   size += migrateCBList.length() * sizeof(MigrateCBList);
315   size += startLBFnList.length() * sizeof(StartLBCB);
316   size += commTable->useMem();
317   return size;
318 }
319
320 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
321 {
322   client* new_client = new client;
323   new_client->fn = fn;
324   new_client->data = data;
325   new_client->refcount = cur_refcount;
326
327   LDBarrierClient ret_val;
328   ret_val.serial = max_client;
329   clients.insertAtEnd(new_client);
330   max_client++;
331
332   client_count++;
333
334   return ret_val;
335 }
336
337 void LocalBarrier::RemoveClient(LDBarrierClient c)
338 {
339   const int cnum = c.serial;
340   if (cnum < max_client && clients[cnum] != 0) {
341     delete (clients[cnum]);
342     clients[cnum] = 0;
343     client_count--;
344   }
345 }
346
347 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
348 {
349   receiver* new_receiver = new receiver;
350   new_receiver->fn = fn;
351   new_receiver->data = data;
352
353   LDBarrierReceiver ret_val;
354   ret_val.serial = max_receiver;
355   receivers.insertAtEnd(new_receiver);
356   max_receiver++;
357
358   return ret_val;
359 }
360
361 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
362 {
363   const int cnum = c.serial;
364   if (cnum < max_receiver && receivers[cnum] != 0) {
365     delete (receivers[cnum]);
366     receivers[cnum] = 0;
367   }
368 }
369
370 void LocalBarrier::AtBarrier(LDBarrierClient h)
371 {
372   (clients[h.serial])->refcount++;
373   at_count++;
374   CheckBarrier();
375 }
376
377 void LocalBarrier::CheckBarrier()
378 {
379   if (!on) return;
380
381   // If there are no clients, resume as soon as we're turned on
382
383   if (client_count == 0) {
384     cur_refcount++;
385     CallReceivers();
386   }
387   if (at_count >= client_count) {
388     CmiBool at_barrier = CmiFalse;
389
390     for(int i=0; i < max_client; i++)
391       if (clients[i] != 0 && ((client*)clients[i])->refcount >= cur_refcount)
392         at_barrier = CmiTrue;
393
394     if (at_barrier) {
395       at_count -= client_count;
396       cur_refcount++;
397       CallReceivers();
398     }
399   }
400 }
401
402 void LocalBarrier::CallReceivers(void)
403 {
404   CmiBool called_receiver=CmiFalse;
405
406 //  for(int i=0; i < max_receiver; i++)
407     for (int i=max_receiver-1; i>=0; i--)
408       if (receivers[i] != 0) {
409         ((receiver*)receivers[i])->fn(((receiver*)receivers[i])->data);
410         called_receiver = CmiTrue;
411       }
412
413   if (!called_receiver)
414     ResumeClients();
415   
416 }
417
418 void LocalBarrier::ResumeClients(void)
419 {
420   for(int i=0; i < max_client; i++)
421     if (clients[i] != 0) 
422       ((client*)clients[i])->fn(((client*)clients[i])->data);
423 }
424
425 #endif
426
427 /*@}*/