doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / HbmLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <math.h>
7 #include "HbmLB.h"
8 #include "LBDBManager.h"
9 #include "GreedyLB.h"
10 #include "GreedyCommLB.h"
11 #include "RefineCommLB.h"
12 #include "RefineLB.h"
13
14 #define  DEBUGF(x)     //  CmiPrintf x;
15
16 CreateLBFunc_Def(HbmLB, "HybridBase load balancer")
17
18 void HbmLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
19 {
20   HbmLB *me = (HbmLB*)(data);
21
22   me->Migrated(h, waitBarrier);
23 }
24
25 void HbmLB::staticAtSync(void* data)
26 {
27   HbmLB *me = (HbmLB*)(data);
28
29   me->AtSync();
30 }
31
32 HbmLB::HbmLB(const CkLBOptions &opt): BaseLB(opt)
33 {
34 #if CMK_LBDB_ON
35   lbname = (char *)"HbmLB";
36   thisProxy = CProxy_HbmLB(thisgroup);
37   receiver = theLbdb->
38     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
39                             (void*)(this));
40   notifier = theLbdb->getLBDB()->
41     NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
42
43   // defines topology
44   tree = new HypercubeTree;
45
46   currentLevel = 0;
47   foundNeighbors = 0;
48
49   maxLoad = 0.0;
50   vector_n_moves = 0;
51   maxLoad = 0.0;
52   maxCpuLoad = 0.0;
53   totalLoad = 0.0;
54   maxCommCount = 0;
55   maxCommBytes = 0.0;
56
57   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
58 #endif
59 }
60
61 HbmLB::~HbmLB()
62 {
63 #if CMK_LBDB_ON
64   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
65   if (theLbdb) {
66     theLbdb->getLBDB()->
67       RemoveNotifyMigrated(notifier);
68     //theLbdb->
69     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
70   }
71   delete tree;
72 #endif
73 }
74
75 // get tree information
76 void HbmLB::FindNeighbors()
77 {
78   if (foundNeighbors == 0) { // Neighbors never initialized, so init them
79                            // and other things that depend on the number
80                            // of neighbors
81
82     int nlevels = tree->numLevels();
83     int mype = CkMyPe();
84     for (int level=0; level<nlevels; level++) 
85     {
86       LevelData *data = new LevelData;
87       data->parent = tree->parent(mype, level);
88       if (tree->isroot(mype, level)) {
89         data->nChildren = tree->numChildren(mype, level);
90         data->children = new int[data->nChildren];
91         tree->getChildren(mype, level, data->children, data->nChildren);
92         data->statsData = new LDStats(data->nChildren+1);
93         //  a fake processor
94         ProcStats &procStat = data->statsData->procs[data->nChildren];
95         procStat.available = CmiFalse;
96       }
97       levelData.push_back(data);
98       DEBUGF(("[%d] level: %d nchildren:%d - %d %d\n", CkMyPe(), level, data->nChildren, data->nChildren>0?data->children[0]:-1, data->nChildren>1?data->children[1]:-1));
99     }
100     
101     foundNeighbors = 1;
102   }   // end if
103 }
104
105 void HbmLB::AtSync()
106 {
107 #if CMK_LBDB_ON
108   //  CkPrintf("[%d] HbmLB At Sync step %d!!!!\n",CkMyPe(),mystep);
109
110   FindNeighbors();
111
112   // if num of processor is only 1, nothing should happen
113   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
114     MigrationDone(0);
115     return;
116   }
117
118   thisProxy[CkMyPe()].ProcessAtSync();
119 #endif
120 }
121
122 void HbmLB::ProcessAtSync()
123 {
124 #if CMK_LBDB_ON
125   int i;
126   start_lb_time = 0;
127
128   if (CkMyPe() == 0) {
129     start_lb_time = CkWallTimer();
130     if (_lb_args.debug())
131       CkPrintf("[%s] Load balancing step %d starting at %f\n",
132                lbName(), step(), CkWallTimer());
133   }
134
135   // build LDStats
136   LBRealType total_walltime, total_cputime, idletime, bg_walltime, bg_cputime;
137   theLbdb->TotalTime(&total_walltime,&total_cputime);
138   theLbdb->IdleTime(&idletime);
139   theLbdb->BackgroundLoad(&bg_walltime,&bg_cputime);
140
141   myStats.n_objs = theLbdb->GetObjDataSz();
142   myStats.objData.resize(myStats.n_objs);
143   myStats.from_proc.resize(myStats.n_objs);
144   myStats.to_proc.resize(myStats.n_objs);
145   theLbdb->GetObjData(myStats.objData.getVec());
146   for (i=0; i<myStats.n_objs; i++)
147     myStats.from_proc[i] = myStats.to_proc[i] = 0;    // only one PE
148
149   myStats.n_comm = theLbdb->GetCommDataSz();
150   myStats.commData.resize(myStats.n_comm);
151   theLbdb->GetCommData(myStats.commData.getVec());
152
153   myStats.complete_flag = 0;
154
155   // send to parent
156   DEBUGF(("[%d] Send stats to parent %d\n", CkMyPe(), levelData[0]->parent));
157   double tload = 0.0;
158   for (i=0; i<myStats.n_objs; i++) tload += myStats.objData[i].wallTime;
159   thisProxy[levelData[0]->parent].ReceiveStats(tload, CkMyPe(), 0);
160 #endif
161 }
162
163 void HbmLB::ReceiveStats(double t, int frompe, int fromlevel)
164 {
165 #if CMK_LBDB_ON
166   FindNeighbors();
167
168   int atlevel = fromlevel + 1;
169   CmiAssert(tree->isroot(CkMyPe(), atlevel));
170
171   DEBUGF(("[%d] ReceiveStats from PE %d from level: %d\n", CkMyPe(), frompe, fromlevel));
172   int neighborIdx = NeighborIndex(frompe, atlevel);
173   CmiAssert(neighborIdx==0 || neighborIdx==1);
174   LevelData *lData = levelData[atlevel];
175   lData->statsList[neighborIdx] = t;
176
177   int &stats_msg_count = levelData[atlevel]->stats_msg_count;
178   stats_msg_count ++;
179
180   DEBUGF(("[%d] ReceiveStats at level: %d %d/%d\n", CkMyPe(), atlevel, stats_msg_count, levelData[atlevel]->nChildren));
181   if (stats_msg_count == levelData[atlevel]->nChildren)  
182   {
183     stats_msg_count = 0;
184     int parent = levelData[atlevel]->parent;
185
186     // load balancing
187     thisProxy[CkMyPe()].Loadbalancing(atlevel);
188   }
189
190 #endif  
191 }
192
193
194 inline double myabs(double x) { return x>0.0?x:-x; }
195 inline double mymax(double x, double y) { return x>y?x:y; }
196
197 //  LDStats data sent to parent contains real PE
198 //  LDStats in parent should contain relative PE
199 void HbmLB::Loadbalancing(int atlevel)
200 {
201
202   CmiAssert(atlevel >= 1);
203
204   LevelData *lData = levelData[atlevel];
205   LDStats *statsData = lData->statsData;
206   CmiAssert(statsData);
207
208   // at this time, all objects processor location is relative, and 
209   // all incoming objects from outside group belongs to the fake root proc.
210
211   // clear background load if needed
212   if (_lb_args.ignoreBgLoad()) statsData->clearBgLoad();
213
214   currentLevel = atlevel;
215
216   double start_lb_time(CkWallTimer());
217
218   double lload = lData->statsList[0];
219   double rload = lData->statsList[1];
220
221   double diff = myabs(lload-rload);
222   double maxl = mymax(lload, rload);
223   double avg =  (lload+rload)/2.0;
224 CkPrintf("[%d] lload: %f rload: %f atlevel: %d\n", CkMyPe(), lload, rload, atlevel);
225   if (diff/avg > 0.02) {
226     // we need to perform load balancing
227     int numpes = (int)pow(2.0, atlevel);
228     double delta = myabs(lload-rload) / numpes;
229
230     int overloaded = lData->children[0];
231     if (lload < rload) {
232       overloaded = lData->children[1];
233     }
234     DEBUGF(("[%d] branch %d is overloaded by %f... \n", CkMyPe(), overloaded, delta));
235     thisProxy[overloaded].ReceiveMigrationDelta(delta, atlevel, atlevel);
236   }
237   else {
238     LoadbalancingDone(atlevel);
239   }
240 }
241
242 // when receiving all response from underloaded pes
243 void HbmLB::LoadbalancingDone(int atlevel)
244 {
245   LevelData *lData = levelData[atlevel];
246   DEBUGF(("[%d] LoadbalancingDone at level: %d\n", CkMyPe(), atlevel));
247   if (lData->parent != -1) {
248     // send sum up
249     double lload = lData->statsList[0];
250     double rload = lData->statsList[1];
251     double totalLoad = lload + rload;
252     thisProxy[lData->parent].ReceiveStats(totalLoad, CkMyPe(), atlevel);
253   }
254   else {
255     // done now, broadcast via tree to resume all
256 //    thisProxy.ReceiveResumeClients(1, tree->numLevels()-1, lData->nChildren, lData->children);
257     thisProxy.ReceiveResumeClients(1, tree->numLevels()-1);
258   }
259 }
260
261 void HbmLB::ReceiveResumeClients(int balancing, int fromlevel){
262 #if 0
263   int atlevel = fromlevel-1;
264   LevelData *lData = levelData[atlevel];
265   if (atlevel != 0) 
266     thisProxy.ReceiveResumeClients(balancing, atlevel, lData->nChildren, lData->children);
267   else
268     ResumeClients(balancing);
269 #else
270   ResumeClients(balancing);    // it is always syncResume
271 /*
272   if (balancing && _lb_args.syncResume()) {
273     // max load of all
274     CkCallback cb(CkIndex_HbmLB::ResumeClients((CkReductionMsg*)NULL),
275                   thisProxy);
276     contribute(sizeof(double), &maxLoad, CkReduction::max_double, cb);
277   }
278   else
279     thisProxy[CkMyPe()].ResumeClients(balancing);
280   }
281 */
282 #endif
283 }
284
285 // pick objects to migrate "t" amount of work
286 void HbmLB::ReceiveMigrationDelta(double t, int lblevel, int fromlevel)
287 {
288 #if CMK_LBDB_ON
289   int i;
290   int atlevel = fromlevel-1;
291   LevelData *lData = levelData[atlevel];
292   if (atlevel != 0) {
293     thisProxy.ReceiveMigrationDelta(t, lblevel, atlevel, lData->nChildren, lData->children);
294     return;
295   }
296
297   // I am leave, find objects to migrate
298
299   CkVec<int> migs;
300   CkVec<LDObjData> &objData = myStats.objData;
301   for (i=0; i<myStats.n_objs; i++) {
302     LDObjData &oData = objData[i];
303     if (oData.wallTime < t) {
304       migs.push_back(i);
305       t -= oData.wallTime;
306       if (t == 0.0) break;
307     }
308   }
309
310   int nmigs = migs.size();
311   // send a message to 
312   int matchPE = CkMyPe() ^ (1<<(lblevel-1));
313   
314   DEBUGF(("[%d] migrating %d objs to %d at lblevel %d! \n", CkMyPe(),nmigs,matchPE,lblevel));
315   thisProxy[matchPE].ReceiveMigrationCount(nmigs, lblevel);
316
317   // migrate objects
318   for (i=0; i<nmigs; i++) {
319     int idx = migs[i]-i;
320     LDObjData &oData = objData[idx];
321     CkVec<LDCommData> comms;
322     collectCommData(idx, comms);
323     thisProxy[matchPE].ObjMigrated(oData, comms.getVec(), comms.size());
324     theLbdb->Migrate(oData.handle, matchPE);
325     // TODO modify LDStats
326     DEBUGF(("myStats.removeObject: %d, %d, %d\n", migs[i], i, objData.size()));
327     myStats.removeObject(idx);
328   }
329 #endif
330 }
331
332 // find sender comms
333 void HbmLB::collectCommData(int objIdx, CkVec<LDCommData> &comms)
334 {
335 #if CMK_LBDB_ON
336   LevelData *lData = levelData[0];
337
338   LDObjData &objData = myStats.objData[objIdx];
339
340   for (int com=0; com<myStats.n_comm; com++) {
341     LDCommData &cdata = myStats.commData[com];
342     if (cdata.from_proc()) continue;
343     if (cdata.sender.objID() == objData.objID() && cdata.sender.omID() == objData.omID())
344       comms.push_back(cdata);
345   }
346 #endif
347 }
348
349
350 // an object arrives with only objdata
351 void HbmLB::ObjMigrated(LDObjData data, LDCommData *cdata, int n)
352 {
353   LevelData *lData = levelData[0];
354   // change LDStats
355   CkVec<LDObjData> &oData = myStats.objData;
356
357     // need to update LDObjHandle later
358   lData->obj_completed++;
359   data.handle.handle = -100;
360   oData.push_back(data);
361   myStats.n_objs++;
362   if (data.migratable) myStats.n_migrateobjs++;
363   myStats.from_proc.push_back(-1);    // not important
364   myStats.to_proc.push_back(0);
365
366     // copy into comm data
367   if (n) {
368     CkVec<LDCommData> &cData = myStats.commData;
369     for (int i=0; i<n; i++) 
370         cData.push_back(cdata[i]);
371     myStats.n_comm += n;
372     myStats.deleteCommHash();
373   }
374
375   if (lData->migrationDone()) {
376     // migration done finally
377     MigrationDone(1);
378   }
379 }
380
381 void HbmLB::ReceiveMigrationCount(int count, int lblevel)
382 {
383   lbLevel = lblevel;
384
385   LevelData *lData = levelData[0];
386   lData->migrates_expected =  count;
387   if (lData->migrationDone()) {
388     // migration done finally
389     MigrationDone(1);
390   }
391 }
392
393 void HbmLB::Migrated(LDObjHandle h, int waitBarrier)
394 {
395   LevelData *lData = levelData[0];
396
397   lData->migrates_completed++;
398   newObjs.push_back(h);
399   DEBUGF(("[%d] An object migrated! %d %d\n", CkMyPe(),lData->migrates_completed,lData->migrates_expected));
400   if (lData->migrationDone()) {
401     // migration done finally
402     MigrationDone(1);
403   }
404 }
405
406 void HbmLB::NotifyObjectMigrationDone(int fromlevel, int lblevel)
407 {
408
409   int atlevel = fromlevel + 1;
410   LevelData *lData = levelData[atlevel];
411
412   lData->mig_reported ++;
413   DEBUGF(("[%d] HbmLB::NotifyObjectMigrationDone at level: %d lblevel: %d reported: %d!\n", CkMyPe(), atlevel, lblevel, lData->mig_reported));
414   if (atlevel < lblevel) {
415     if (lData->mig_reported == lData->nChildren) {
416       lData->mig_reported = 0;
417       thisProxy[lData->parent].NotifyObjectMigrationDone(atlevel, lbLevel);
418     }
419   }
420   else {
421     if (lData->mig_reported == lData->nChildren/2) {   // half tree
422       lData->mig_reported = 0;
423       // load balancing done at this level
424       LoadbalancingDone(atlevel);
425     }
426   }
427 }
428
429 // migration done at current lbLevel
430 void HbmLB::MigrationDone(int balancing)
431 {
432 #if CMK_LBDB_ON
433   int i, j;
434   LevelData *lData = levelData[0];
435
436   DEBUGF(("[%d] HbmLB::MigrationDone lbLevel:%d numLevels:%d!\n", CkMyPe(), lbLevel, tree->numLevels()));
437
438   CmiAssert(newObjs.size() == lData->migrates_expected);
439
440 #if 0
441   if (lbLevel == tree->numLevels()-1) {
442     theLbdb->incStep();
443     // reset 
444     lData->clear();
445   }
446   else {
447     lData->migrates_expected = -1;
448     lData->migrates_completed = 0;
449     lData->obj_completed = 0;
450   }
451 #else
452   lData->migrates_expected = -1;
453   lData->migrates_completed = 0;
454   lData->obj_completed = 0;
455 #endif
456
457   CkVec<LDObjData> &oData = myStats.objData;
458
459   // update
460   int count=0;
461   for (i=0; i<oData.size(); i++)
462     if (oData[i].handle.handle == -100) count++;
463   CmiAssert(count == newObjs.size());
464
465   for (i=0; i<oData.size(); i++) {
466     if (oData[i].handle.handle == -100) {
467       LDObjHandle &handle = oData[i].handle;
468       for (j=0; j<newObjs.size(); j++) {
469         if (handle.omID() == newObjs[j].omID() && 
470                   handle.objID() == newObjs[j].objID()) {
471           handle = newObjs[j];
472           break;
473         }
474       }
475       CmiAssert(j<newObjs.size());
476     }
477   }
478   newObjs.free();
479
480   thisProxy[lData->parent].NotifyObjectMigrationDone(0, lbLevel);
481 #endif
482 }
483
484 void HbmLB::ResumeClients(CkReductionMsg *msg)
485 {
486   if (CkMyPe() == 0 && _lb_args.printSummary()) {
487     double mload = *(double *)msg->getData();
488     CkPrintf("[%d] MAX Load: %f at step %d.\n", CkMyPe(), mload, step()-1);
489   }
490   ResumeClients(1);
491   delete msg;
492 }
493
494 void HbmLB::ResumeClients(int balancing)
495 {
496 #if CMK_LBDB_ON
497   DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
498
499   theLbdb->incStep();
500   // reset 
501   LevelData *lData = levelData[0];
502   lData->clear();
503
504   if (CkMyPe() == 0 && balancing) {
505     double end_lb_time = CkWallTimer();
506     if (_lb_args.debug())
507       CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
508                 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
509   }
510   if (balancing && _lb_args.printSummary()) {
511       int count = 1;
512       LBInfo info(count);
513       LDStats *stats = &myStats;
514       info.getInfo(stats, count, 0);    // no comm cost
515       LBRealType mLoad, mCpuLoad, totalLoad;
516       info.getSummary(mLoad, mCpuLoad, totalLoad);
517       int nmsgs, nbytes;
518       stats->computeNonlocalComm(nmsgs, nbytes);
519       CkPrintf("[%d] Load with %d objs: max (with comm): %f max (obj only): %f total: %f on %d processors at step %d useMem: %fKB nonlocal: %d %.2fKB.\n", CkMyPe(), stats->n_objs, mLoad, mCpuLoad, totalLoad, count, step()-1, (1.0*useMem())/1024, nmsgs, nbytes/1024.0);
520       thisProxy[0].reportLBQulity(mLoad, mCpuLoad, totalLoad, nmsgs, 1.0*nbytes/1024.0);
521   }
522
523   // zero out stats
524   theLbdb->ClearLoads();
525
526   theLbdb->ResumeClients();
527 #endif
528 }
529
530 // only called on PE 0
531 void HbmLB::reportLBQulity(double mload, double mCpuLoad, double totalload, int nmsgs, double bytes)
532 {
533   static int pecount=0;
534   CmiAssert(CkMyPe() == 0);
535   if (mload > maxLoad) maxLoad = mload;
536   if (mCpuLoad > maxCpuLoad) maxCpuLoad = mCpuLoad;
537   totalLoad += totalload;
538   maxCommCount += nmsgs;
539   maxCommBytes += bytes;   // KB
540   pecount++;
541   if (pecount == CkNumPes()) {
542     CkPrintf("[%d] Load Summary: max (with comm): %f max (obj only): %f total: %f at step %d nonlocal: %d msgs, %.2fKB reported from %d PEs.\n", CkMyPe(), maxLoad, maxCpuLoad, totalLoad, step(), maxCommCount, maxCommBytes, pecount);
543     maxLoad = 0.0;
544     maxCpuLoad = 0.0;
545     totalLoad = 0.0;
546     maxCommCount = 0;
547     maxCommBytes = 0.0;
548     pecount = 0;
549   }
550 }
551
552 void HbmLB::work(LDStats* stats)
553 {
554 #if CMK_LBDB_ON
555   CkPrintf("[%d] HbmLB::work called!\n", CkMyPe());
556 #endif
557 }
558   
559 int HbmLB::NeighborIndex(int pe, int atlevel)
560 {
561     int peslot = -1;
562     for(int i=0; i < levelData[atlevel]->nChildren; i++) {
563       if (pe == levelData[atlevel]->children[i]) {
564         peslot = i;
565         break;
566       }
567     }
568     return peslot;
569 }
570
571 int HbmLB::useMem()
572 {
573   int i;
574   int memused = 0;
575   for (i=0; i<levelData.size(); i++)
576     if (levelData[i]) memused+=levelData[i]->useMem();
577   return memused;
578 }
579
580 #include "HbmLB.def.h"
581
582 /*@{*/
583
584