06a1cb8675e407b963f1cd7cde0157fd45fe52b9
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #define  DEBUGF(x)       // CmiPrintf x;
14 #define  DEBUG(x)        // x;
15
16 #if CMK_MEM_CHECKPOINT
17    /* can not handle reduction in inmem FT */
18 #define USE_REDUCTION         0
19 #define USE_LDB_SPANNING_TREE 0
20 #elif defined(_FAULT_MLOG_)
21 /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #else
25 #define USE_REDUCTION         1
26 #define USE_LDB_SPANNING_TREE 1
27 #endif
28
29 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
30 extern int _restartFlag;
31 extern void getGlobalStep(CkGroupID );
32 extern void initMlogLBStep(CkGroupID );
33 extern int globalResumeCount;
34 extern void sendDummyMigrationCounts(int *);
35 #endif
36
37 #if CMK_GRID_QUEUE_AVAILABLE
38 CpvExtern(void *, CkGridObject);
39 #endif
40
41 CkGroupID loadbalancer;
42 int * lb_ptr;
43 int load_balancer_created;
44
45 double lb_iteration_start_time = 0.0;
46 double lb_ideal_trigger_lb_time = 0.0;
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49 int lb_no_iterations = 0;
50 double lb_min_stat_coll_period = 0.0;
51 double cur_max_pe_load = 0.0;
52 double cur_avg_pe_load = 0.0;
53
54 CreateLBFunc_Def(CentralLB, "CentralLB base class")
55
56 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
57                              LBMigrateMsg *, LBInfo &info, int considerComm);
58
59 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
60   double lb_data[3];
61   lb_data[0] = 0;
62   lb_data[1] = 0;
63   lb_data[2] = 0;
64   for (int i = 0; i < nMsg; i++) {
65     CkAssert(msgs[i]->getSize() == 3*sizeof(double));
66     double* m = (double *)msgs[i]->getData();
67     lb_data[0] += m[0];
68     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
69     lb_data[2] += m[2];
70   }
71   return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
72 }
73
74 /*global*/ CkReduction::reducerType lbDataCollectionType;
75 /*initcall*/ void registerLBDataCollection(void) {
76   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
77 }
78
79 /*
80 void CreateCentralLB()
81 {
82   CProxy_CentralLB::ckNew(0);
83 }
84 */
85
86 void CentralLB::staticStartLB(void* data)
87 {
88   CentralLB *me = (CentralLB*)(data);
89   me->StartLB();
90 }
91
92 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
93 {
94   CentralLB *me = (CentralLB*)(data);
95   me->Migrated(h, waitBarrier);
96 }
97
98 void CentralLB::staticAtSync(void* data)
99 {
100   CentralLB *me = (CentralLB*)(data);
101   me->AtSync();
102 }
103
104 void CentralLB::initLB(const CkLBOptions &opt)
105 {
106 #if CMK_LBDB_ON
107   lbname = "CentralLB";
108   thisProxy = CProxy_CentralLB(thisgroup);
109   //  CkPrintf("Construct in %d\n",CkMyPe());
110
111   // create and turn on by default
112   receiver = theLbdb->
113     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
114   notifier = theLbdb->getLBDB()->
115     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
116   startLbFnHdl = theLbdb->getLBDB()->
117     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
118
119   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
120   if (opt.getSeqNo() > 0) turnOff();
121
122   stats_msg_count = 0;
123   statsMsgsList = NULL;
124   statsData = NULL;
125
126   storedMigrateMsg = NULL;
127   reduction_started = 0;
128
129   // for future predictor
130   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
131   else predicted_model=0;
132   // register user interface callbacks
133   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
134
135   myspeed = theLbdb->ProcessorSpeed();
136
137   migrates_completed = 0;
138   future_migrates_completed = 0;
139   migrates_expected = -1;
140   future_migrates_expected = -1;
141   cur_ld_balancer = _lb_args.central_pe();      // 0 default
142   lbdone = 0;
143   count_msgs=0;
144   statsMsg = NULL;
145
146   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
147
148   load_balancer_created = 1;
149 #endif
150 }
151
152 CentralLB::~CentralLB()
153 {
154 #if CMK_LBDB_ON
155   delete [] statsMsgsList;
156   delete statsData;
157   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
158   if (theLbdb) {
159     theLbdb->getLBDB()->
160       RemoveNotifyMigrated(notifier);
161     theLbdb->
162       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
163   }
164 #endif
165 }
166
167 void CentralLB::turnOn() 
168 {
169 #if CMK_LBDB_ON
170   theLbdb->getLBDB()->
171     TurnOnBarrierReceiver(receiver);
172   theLbdb->getLBDB()->
173     TurnOnNotifyMigrated(notifier);
174   theLbdb->getLBDB()->
175     TurnOnStartLBFn(startLbFnHdl);
176 #endif
177 }
178
179 void CentralLB::turnOff() 
180 {
181 #if CMK_LBDB_ON
182   theLbdb->getLBDB()->
183     TurnOffBarrierReceiver(receiver);
184   theLbdb->getLBDB()->
185     TurnOffNotifyMigrated(notifier);
186   theLbdb->getLBDB()->
187     TurnOffStartLBFn(startLbFnHdl);
188 #endif
189 }
190
191 void CentralLB::AtSync()
192 {
193   CkPrintf("AtSync CEntral LB\n");
194 #if CMK_LBDB_ON
195   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
196
197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
198         CpvAccess(_currentObj)=this;
199 #endif
200
201   // if num of processor is only 1, nothing should happen
202   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
203     MigrationDone(0);
204     return;
205   }
206   if(CmiNodeAlive(CkMyPe())){
207     thisProxy [CkMyPe()].ProcessAtSyncMin();
208   }
209 #endif
210 }
211
212 #include "ComlibStrategy.h"
213
214 void CentralLB::ProcessAtSync()
215 {
216 #if CMK_LBDB_ON
217   if (reduction_started) return;              // reducton in progress
218
219   CmiAssert(CmiNodeAlive(CkMyPe()));
220   if (CkMyPe() == cur_ld_balancer) {
221     start_lb_time = CkWallTimer();
222   }
223
224
225 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
226         initMlogLBStep(thisgroup);
227 #endif
228
229   // build message
230   BuildStatsMsg();
231
232 #if USE_REDUCTION
233     // reduction to get total number of objects and comm
234     // so that processor 0 can pre-allocate load balancing database
235   int counts[2];
236   counts[0] = theLbdb->GetObjDataSz();
237   counts[1] = theLbdb->GetCommDataSz();
238
239   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
240                   thisProxy[0]);
241   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
242   reduction_started = 1;
243 #else
244   SendStats();
245 #endif
246 #endif
247 }
248
249 void CentralLB::ProcessAtSyncMin()
250 {
251 #if CMK_LBDB_ON
252   if (reduction_started) return;              // reducton in progress
253
254   CmiAssert(CmiNodeAlive(CkMyPe()));
255   if (CkMyPe() == cur_ld_balancer) {
256     start_lb_time = CkWallTimer();
257   }
258
259
260 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
261         initMlogLBStep(thisgroup);
262 #endif
263   
264   CkPrintf("ProcessAtSyncMin [%d]\n", CkMyPe());
265
266 //  if (CkWallTimer() > lb_min_stat_coll_period) {
267 //    CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
268     SendMinStats();
269 //  } else {
270 //    CkPrintf("No need to send MinStats of [%d]\n", CkMyPe());
271 //    ResumeClients(1);
272 //  }
273 #endif
274 }
275
276 void CentralLB::SendMinStats() {
277
278  double total_walltime;
279  double idle_time;
280  double bg_walltime;
281   theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
282   CkPrintf("Total walltime [%d] %lf\n", CkMyPe(), total_walltime);
283
284   double lb_data[3];
285   lb_data[0] = total_walltime;
286   lb_data[1] = total_walltime;
287   lb_data[2] = 1;
288
289   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
290                   thisProxy[0]);
291   contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
292
293 //  {
294 //  // enfore the barrier to wait until centralLB says no
295 //  LDOMHandle h;
296 //  h.id.id.idx = 0;
297 //  theLbdb->getLBDB()->RegisteringObjects(h);
298 //  }
299 }
300
301 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
302   CmiAssert(CkMyPe() == 0);
303   lb_no_iterations++;
304   double* load = (LBRealType *) msg->getData();
305   double max = load[1];
306   double avg = load[0]/load[2];
307   CkPrintf("Total load : %lf Avg load: %lf Max load: %lf\n", load[0], load[0]/load[2], load[1]);
308   cur_max_pe_load = max;
309   cur_avg_pe_load = avg;
310
311   // Some heuristics for lbperiod
312   int ideal_period = (lb_strategy_cost + lb_migration_cost) * lb_no_iterations / (max - avg);
313   lb_ideal_trigger_lb_time = CkWallTimer() + (ideal_period * max/lb_no_iterations);
314   lb_min_stat_coll_period = CkWallTimer() + (ideal_period * max / lb_no_iterations);
315   CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d trigger  lb: %lf current time: %lf\n",
316       max/lb_no_iterations, avg/lb_no_iterations,
317       lb_strategy_cost, lb_migration_cost, ideal_period, lb_ideal_trigger_lb_time,
318       CkWallTimer());
319   CkPrintf("Walltime: %lf, lb trigger time %f, lb min stats coll time %f\n", CkWallTimer(), lb_ideal_trigger_lb_time, lb_min_stat_coll_period);
320   //  thisProxy.ResumeClients(0);
321   
322   thisProxy.ReceiveIdealLBPeriod(lb_ideal_trigger_lb_time, lb_min_stat_coll_period);
323
324 //  if (max/avg > 1.01) { 
325 //    CkPrintf("Carrying out loadbalancing\n");
326 //    thisProxy.ProcessAtSync();
327 //    lb_no_iterations = 0;
328 //  } else {
329     CkPrintf("Not very overloaded\n");
330           thisProxy.ResumeClients(1);
331 //  }
332 }
333
334 void CentralLB::ReceiveIdealLBPeriod(double lb_ideal_trigger_lbtime,
335     double lb_min_stat_collperiod) {
336   lb_ideal_trigger_lb_time = lb_ideal_trigger_lbtime;
337   lb_min_stat_coll_period = lb_min_stat_collperiod;
338 }
339 // called only on 0
340 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
341 {
342   CmiAssert(CkMyPe() == 0);
343   if (statsData == NULL) statsData = new LDStats;
344
345   int *counts = (int *)msg->getData();
346   int n_objs = counts[0];
347   int n_comm = counts[1];
348
349     // resize database
350   statsData->objData.resize(n_objs);
351   statsData->from_proc.resize(n_objs);
352   statsData->to_proc.resize(n_objs);
353   statsData->commData.resize(n_comm);
354
355   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
356         
357     // broadcast call to let everybody start to send stats
358   thisProxy.SendStats();
359 }
360
361 void CentralLB::BuildStatsMsg()
362 {
363 #if CMK_LBDB_ON
364   // build and send stats
365   const int osz = theLbdb->GetObjDataSz();
366   const int csz = theLbdb->GetCommDataSz();
367
368   int npes = CkNumPes();
369   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
370   _MEMCHECK(msg);
371   msg->from_pe = CkMyPe();
372 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
373         msg->step = step();
374 #endif
375   //msg->serial = CrnRand();
376
377 /*
378   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
379   theLbdb->IdleTime(&msg->idletime);
380   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
381 */
382 #if CMK_LB_CPUTIMER
383   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
384                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
385 #else
386   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
387                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
388 #endif
389
390   msg->pe_speed = myspeed;
391   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
392
393   msg->n_objs = osz;
394   theLbdb->GetObjData(msg->objData);
395   msg->n_comm = csz;
396   theLbdb->GetCommData(msg->commData);
397 //  theLbdb->ClearLoads();
398   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
399
400   if(CkMyPe() == cur_ld_balancer) {
401     msg->avail_vector = new char[CkNumPes()];
402     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
403     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
404   }
405
406   CmiAssert(statsMsg == NULL);
407   statsMsg = msg;
408 #endif
409 }
410
411
412 // called on every processor
413 void CentralLB::SendStats()
414 {
415 #if CMK_LBDB_ON
416   CmiAssert(statsMsg != NULL);
417   reduction_started = 0;
418
419 #if USE_LDB_SPANNING_TREE
420   if(CkNumPes()>1024)
421   {
422     if (CkMyPe() == cur_ld_balancer)
423       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
424     else
425       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
426   }
427   else
428 #endif
429   {
430     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
431     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
432   }
433
434   statsMsg = NULL;
435
436 #ifdef __BIGSIM__
437   BgEndStreaming();
438 #endif
439
440   {
441   // enfore the barrier to wait until centralLB says no
442   LDOMHandle h;
443   h.id.id.idx = 0;
444   theLbdb->getLBDB()->RegisteringObjects(h);
445   }
446 #endif
447 }
448
449 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
450 extern int donotCountMigration;
451 #endif
452
453 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
454 {
455 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
456     if(donotCountMigration){
457         return ;
458     }
459 #endif
460
461 #if CMK_LBDB_ON
462   if (waitBarrier) {
463             migrates_completed++;
464       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
465     if (migrates_completed == migrates_expected) {
466       MigrationDone(1);
467     }
468   }
469   else {
470     future_migrates_completed ++;
471     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
472     if (future_migrates_completed == future_migrates_expected)  {
473         CheckMigrationComplete();
474     }
475   }
476 #endif
477 }
478
479 void CentralLB::MissMigrate(int waitForBarrier)
480 {
481   LDObjHandle h;
482   Migrated(h, waitForBarrier);
483 }
484
485 // build a complete data from bufferred messages
486 // not used when USE_REDUCTION = 1
487 void CentralLB::buildStats()
488 {
489     statsData->nprocs() = stats_msg_count;
490     // allocate space
491     statsData->objData.resize(statsData->n_objs);
492     statsData->from_proc.resize(statsData->n_objs);
493     statsData->to_proc.resize(statsData->n_objs);
494     statsData->commData.resize(statsData->n_comm);
495
496     int nobj = 0;
497     int ncom = 0;
498     int nmigobj = 0;
499     // copy all data in individule message to this big structure
500     for (int pe=0; pe<CkNumPes(); pe++) {
501        int i;
502        CLBStatsMsg *msg = statsMsgsList[pe];
503        if(msg == NULL) continue;
504        for (i=0; i<msg->n_objs; i++) {
505          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
506          statsData->objData[nobj] = msg->objData[i];
507          if (msg->objData[i].migratable) nmigobj++;
508          nobj++;
509        }
510        for (i=0; i<msg->n_comm; i++) {
511          statsData->commData[ncom] = msg->commData[i];
512          ncom++;
513        }
514        // free the memory
515        delete msg;
516        statsMsgsList[pe]=0;
517     }
518     statsData->n_migrateobjs = nmigobj;
519 }
520
521 // deposit one processor data at a time, note database is pre-allocated
522 // to have enough space
523 // used when USE_REDUCTION = 1
524 void CentralLB::depositData(CLBStatsMsg *m)
525 {
526   int i;
527   if (m == NULL) return;
528
529   const int pe = m->from_pe;
530   struct ProcStats &procStat = statsData->procs[pe];
531   procStat.pe = pe;
532   procStat.total_walltime = m->total_walltime;
533   procStat.idletime = m->idletime;
534   procStat.bg_walltime = m->bg_walltime;
535 #if CMK_LB_CPUTIMER
536   procStat.total_cputime = m->total_cputime;
537   procStat.bg_cputime = m->bg_cputime;
538 #endif
539   procStat.pe_speed = m->pe_speed;
540   //procStat.utilization = 1.0;
541   procStat.available = CmiTrue;
542   procStat.n_objs = m->n_objs;
543
544   int &nobj = statsData->n_objs;
545   int &nmigobj = statsData->n_migrateobjs;
546   for (i=0; i<m->n_objs; i++) {
547       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
548       statsData->objData[nobj] = m->objData[i];
549       if (m->objData[i].migratable) nmigobj++;
550       nobj++;
551       CmiAssert(nobj <= statsData->objData.capacity());
552   }
553   int &n_comm = statsData->n_comm;
554   for (i=0; i<m->n_comm; i++) {
555       statsData->commData[n_comm] = m->commData[i];
556       n_comm++;
557       CmiAssert(n_comm <= statsData->commData.capacity());
558   }
559   delete m;
560 }
561
562 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
563 {
564 #if CMK_LBDB_ON
565   if (statsMsgsList == NULL) {
566     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
567     CmiAssert(statsMsgsList != NULL);
568     for(int i=0; i < CkNumPes(); i++)
569       statsMsgsList[i] = 0;
570   }
571   if (statsData == NULL) statsData = new LDStats;
572
573     //  loop through all CLBStatsMsg in the incoming msg
574   int count = msg.getCount();
575   for (int num = 0; num < count; num++) 
576   {
577     CLBStatsMsg *m = msg.getMessage(num);
578     CmiAssert(m!=NULL);
579     const int pe = m->from_pe;
580     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
581 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
582 /*      
583  *  if(m->step < step()){
584  *    //TODO: if a processor is redoing an old load balance step..
585  *    //tell it that the step is done and that it should not perform any migrations
586  *      thisProxy[pe].ReceiveDummyMigration();
587  *  }*/
588 #endif
589         
590     if(!CmiNodeAlive(pe)){
591         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
592         continue;
593     }
594         
595     if (m->avail_vector!=NULL) {
596       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
597     }
598
599     if (statsMsgsList[pe] != 0) {
600       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
601              pe);
602     } else {
603       statsMsgsList[pe] = m;
604 #if USE_REDUCTION
605       depositData(m);
606 #else
607       // store per processor data right away
608       struct ProcStats &procStat = statsData->procs[pe];
609       procStat.pe = pe;
610       procStat.total_walltime = m->total_walltime;
611       procStat.idletime = m->idletime;
612       procStat.bg_walltime = m->bg_walltime;
613 #if CMK_LB_CPUTIMER
614       procStat.total_cputime = m->total_cputime;
615       procStat.bg_cputime = m->bg_cputime;
616 #endif
617       procStat.pe_speed = m->pe_speed;
618       //procStat.utilization = 1.0;
619       procStat.available = CmiTrue;
620       procStat.n_objs = m->n_objs;
621
622       statsData->n_objs += m->n_objs;
623       statsData->n_comm += m->n_comm;
624 #endif
625       stats_msg_count++;
626     }
627   }    // end of for
628
629   const int clients = CkNumValidPes();
630   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
631  
632   if (stats_msg_count == clients) {
633         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
634     statsData->nprocs() = stats_msg_count;
635     thisProxy[CkMyPe()].LoadBalance();
636   }
637 #endif
638 }
639
640 /** added by Abhinav for receiving msgs via spanning tree */
641 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
642 {
643 #if CMK_LBDB_ON
644         CmiAssert(CkMyPe() != 0);
645         bufMsg.add(msg);         // buffer messages
646         count_msgs++;
647         //CkPrintf("here %d\n", CkMyPe());
648         if (count_msgs == st.numChildren+1) {
649                 if(st.parent == 0)
650                 {
651                         thisProxy[0].ReceiveStats(bufMsg);
652                         //CkPrintf("from %d\n", CkMyPe());
653                 }
654                 else
655                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
656                 count_msgs = 0;
657                 bufMsg.free();
658         } 
659 #endif
660 }
661
662 void CentralLB::LoadBalance()
663 {
664 #if CMK_LBDB_ON
665   int proc;
666   const int clients = CkNumPes();
667
668 #if ! USE_REDUCTION
669   // build data
670   buildStats();
671 #else
672   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
673 #endif
674
675   if (_lb_args.debug()) 
676       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
677                   lbname, cur_ld_balancer, step(), start_lb_time,
678                   CmiMemoryUsage()/(1024.0*1024.0));
679
680   // if we are in simulation mode read data
681   if (LBSimulation::doSimulation) simulationRead();
682
683   char *availVector = LBDatabaseObj()->availVector();
684   for(proc = 0; proc < clients; proc++)
685       statsData->procs[proc].available = (CmiBool)availVector[proc];
686
687   preprocess(statsData);
688
689 //    CkPrintf("Before Calling Strategy\n");
690
691   if (_lb_args.printSummary()) {
692       LBInfo info(clients);
693         // not take comm data
694       info.getInfo(statsData, clients, 0);
695       LBRealType mLoad, mCpuLoad, totalLoad;
696       info.getSummary(mLoad, mCpuLoad, totalLoad);
697       int nmsgs, nbytes;
698       statsData->computeNonlocalComm(nmsgs, nbytes);
699       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
700 //      if (_lb_args.debug() > 1) {
701 //        for (int i=0; i<statsData->n_objs; i++)
702 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
703 //      }
704   }
705
706 #if CMK_REPLAYSYSTEM
707   LDHandle *loadBalancer_pointers;
708   if (_replaySystem) {
709     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
710     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
711   }
712 #endif
713   
714   LBMigrateMsg* migrateMsg = Strategy(statsData);
715 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
716         migrateMsg->step = step();
717 #endif
718
719 #if CMK_REPLAYSYSTEM
720   CpdHandleLBMessage(&migrateMsg);
721   if (_replaySystem) {
722     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
723     free(loadBalancer_pointers);
724   }
725 #endif
726   
727   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
728   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
729
730   // if this is the step at which we need to dump the database
731   simulationWrite();
732
733 //  calculate predicted load
734 //  very time consuming though, so only happen when debugging is on
735   if (_lb_args.printSummary()) {
736       LBInfo info(clients);
737         // not take comm data
738       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
739       LBRealType mLoad, mCpuLoad, totalLoad;
740       info.getSummary(mLoad, mCpuLoad, totalLoad);
741       int nmsgs, nbytes;
742       statsData->computeNonlocalComm(nmsgs, nbytes);
743       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
744       for (int i=0; i<clients; i++)
745         migrateMsg->expectedLoad[i] = info.peLoads[i];
746   }
747
748   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
749 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
750     lbDecisionCount++;
751     migrateMsg->lbDecisionCount = lbDecisionCount;
752 #endif
753
754   envelope *env = UsrToEnv(migrateMsg);
755   if (1) {
756       // broadcast
757     thisProxy.ReceiveMigration(migrateMsg);
758   }
759   else {
760     // split the migration for each processor
761     for (int p=0; p<CkNumPes(); p++) {
762       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
763       thisProxy[p].ReceiveMigration(m);
764     }
765     delete migrateMsg;
766   }
767
768   // Zero out data structures for next cycle
769   // CkPrintf("zeroing out data\n");
770   statsData->clear();
771   stats_msg_count=0;
772 #endif
773 }
774
775 // test if sender and receiver in a commData is nonmigratable.
776 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
777 {
778 #if CMK_LBDB_ON
779   for (int pe=0 ; pe<count; pe++)
780   {
781     for (int i=0; i<len[pe]; i++)
782       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
783           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
784       return 0;
785   }
786 #endif
787   return 1;
788 }
789
790 // rebuild LDStats and remove all non-migratble objects and related things
791 void CentralLB::removeNonMigratable(LDStats* stats, int count)
792 {
793   int i;
794
795   // check if we have non-migratable objects
796   int have = 0;
797   for (i=0; i<stats->n_objs; i++) 
798   {
799     LDObjData &odata = stats->objData[i];
800     if (!odata.migratable) {
801       have = 1; break;
802     }
803   }
804   if (have == 0) return;
805
806   CkVec<LDObjData> nonmig;
807   CkVec<int> new_from_proc, new_to_proc;
808   nonmig.resize(stats->n_migrateobjs);
809   new_from_proc.resize(stats->n_migrateobjs);
810   new_to_proc.resize(stats->n_migrateobjs);
811   int n_objs = 0;
812   for (i=0; i<stats->n_objs; i++) 
813   {
814     LDObjData &odata = stats->objData[i];
815     if (odata.migratable) {
816       nonmig[n_objs] = odata;
817       new_from_proc[n_objs] = stats->from_proc[i];
818       new_to_proc[n_objs] = stats->to_proc[i];
819       n_objs ++;
820     }
821     else {
822       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
823 #if CMK_LB_CPUTIMER
824       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
825 #endif
826     }
827   }
828   CmiAssert(stats->n_migrateobjs == n_objs);
829
830   stats->makeCommHash();
831   
832   CkVec<LDCommData> newCommData;
833   newCommData.resize(stats->n_comm);
834   int n_comm = 0;
835   for (i=0; i<stats->n_comm; i++) 
836   {
837     LDCommData& cdata = stats->commData[i];
838     if (!cdata.from_proc()) 
839     {
840       int idx = stats->getSendHash(cdata);
841       CmiAssert(idx != -1);
842       if (!stats->objData[idx].migratable) continue;
843     }
844     switch (cdata.receiver.get_type()) {
845     case LD_PROC_MSG:
846       break;
847     case LD_OBJ_MSG:  {
848       int idx = stats->getRecvHash(cdata);
849       if (stats->complete_flag)
850         CmiAssert(idx != -1);
851       else if (idx == -1) continue;          // receiver not in this group
852       if (!stats->objData[idx].migratable) continue;
853       break;
854       }
855     case LD_OBJLIST_MSG:    // object message FIXME add multicast
856       break;
857     }
858     newCommData[n_comm] = cdata;
859     n_comm ++;
860   }
861
862   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
863
864   // swap to new data
865   stats->objData = nonmig;
866   stats->from_proc = new_from_proc;
867   stats->to_proc = new_to_proc;
868   stats->n_objs = n_objs;
869
870   stats->commData = newCommData;
871   stats->n_comm = n_comm;
872
873   stats->deleteCommHash();
874   stats->makeCommHash();
875
876 }
877
878
879 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
880 extern int restarted;
881 #endif
882
883 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
884 {
885   storedMigrateMsg = m;
886   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
887                   thisProxy);
888   contribute(0, NULL, CkReduction::max_int, cb);
889 }
890
891 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
892 {
893 #if CMK_LBDB_ON
894         int i;
895         LBMigrateMsg *m = storedMigrateMsg;
896         CmiAssert(m!=NULL);
897         delete msg;
898
899 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
900         int *dummyCounts;
901
902         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
903         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
904         if(step() > m->step){
905                 char str[100];
906                 envelope *env = UsrToEnv(m);
907                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
908                 return;
909         }
910         lbDecisionCount = m->lbDecisionCount;
911 #endif
912
913   if (_lb_args.debug() > 1) 
914     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
915
916   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
917   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
918 /*FAULT_EVAC*/
919   if(!CmiNodeAlive(CkMyPe())){
920         delete m;
921         return;
922   }
923   migrates_expected = 0;
924   future_migrates_expected = 0;
925 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
926         int sending=0;
927     int dummy=0;
928         LBDB *_myLBDB = theLbdb->getLBDB();
929         if(_restartFlag){
930         dummyCounts = new int[CmiNumPes()];
931         bzero(dummyCounts,sizeof(int)*CmiNumPes());
932     }
933 #endif
934   for(i=0; i < m->n_moves; i++) {
935     MigrateInfo& move = m->moves[i];
936     const int me = CkMyPe();
937     if (move.from_pe == me && move.to_pe != me) {
938       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
939       // migrate object, in case it is already gone, inform toPe
940 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
941       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
942          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
943 #else
944             if(_restartFlag == 0){
945                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
946                 theLbdb->Migrate(move.obj,move.to_pe);
947                 sending++;
948             }else{
949                 if(_myLBDB->validObjHandle(move.obj)){
950                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
951                     theLbdb->Migrate(move.obj,move.to_pe);
952                     sending++;
953                 }else{
954                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
955                     dummyCounts[move.to_pe]++;
956                     dummy++;
957                 }
958             }
959 #endif
960     } else if (move.from_pe != me && move.to_pe == me) {
961        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
962       if (!move.async_arrival) migrates_expected++;
963       else future_migrates_expected++;
964     }
965   }
966   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
967   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
968
969 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
970         if(_restartFlag){
971                 sendDummyMigrationCounts(dummyCounts);
972                 _restartFlag  =0;
973         delete []dummyCounts;
974         }
975 #endif
976
977
978 #if 0
979   if (m->n_moves ==0) {
980     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
981   }
982 #endif
983   CkPrintf("Can set the LBPeriod here [%d]\n", CkMyPe());
984   cur_ld_balancer = m->next_lb;
985   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
986       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
987   }
988
989   if (migrates_expected == 0 || migrates_completed == migrates_expected)
990     MigrationDone(1);
991   delete m;
992
993 //      CkEvacuatedElement();
994 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
995 //  migrates_expected = 0;
996 //  //  ResumeClients(1);
997 #endif
998 #endif
999 }
1000
1001 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1002 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1003     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1004     //TODO: this is gonna be important when a crash happens during checkpoint
1005     //the globalDecisionCount would have to be saved and compared against
1006     //a future recvMigration
1007                 
1008         thisProxy[CkMyPe()].ResumeClients(1);
1009 }
1010 #endif
1011
1012 void CentralLB::MigrationDone(int balancing)
1013 {
1014 #if CMK_LBDB_ON
1015   migrates_completed = 0;
1016   migrates_expected = -1;
1017   // clear load stats
1018   if (balancing) theLbdb->ClearLoads();
1019   // Increment to next step
1020   theLbdb->incStep();
1021         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1022   // if sync resume, invoke a barrier
1023
1024 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1025     savedBalancing = balancing;
1026     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1027 #endif
1028
1029   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1030
1031   LoadbalanceDone(balancing);        // callback
1032 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1033   // if sync resume invoke a barrier
1034   if (balancing && _lb_args.syncResume()) {
1035     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1036                   thisProxy);
1037     contribute(0, NULL, CkReduction::sum_int, cb);
1038   }
1039   else{ 
1040     if(CmiNodeAlive(CkMyPe())){
1041         thisProxy [CkMyPe()].ResumeClients(balancing);
1042     }   
1043   }     
1044 #if CMK_GRID_QUEUE_AVAILABLE
1045   CmiGridQueueDeregisterAll ();
1046   CpvAccess(CkGridObject) = NULL;
1047 #endif
1048 #endif 
1049 #endif
1050 }
1051
1052 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1053 void CentralLB::endMigrationDone(int balancing){
1054     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1055
1056
1057   if (balancing && _lb_args.syncResume()) {
1058     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1059                   thisProxy);
1060     contribute(0, NULL, CkReduction::sum_int, cb);
1061   }
1062   else{
1063     if(CmiNodeAlive(CkMyPe())){
1064     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1065     thisProxy [CkMyPe()].ResumeClients(balancing);
1066     }
1067   }
1068
1069 }
1070 #endif
1071
1072 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1073 void resumeCentralLbAfterChkpt(void *_lb){
1074     CentralLB *lb= (CentralLB *)_lb;
1075     CpvAccess(_currentObj)=lb;
1076     lb->endMigrationDone(lb->savedBalancing);
1077 }
1078 #endif
1079
1080
1081 void CentralLB::ResumeClients(CkReductionMsg *msg)
1082 {
1083   ResumeClients(1);
1084   delete msg;
1085 }
1086
1087 void CentralLB::ResumeClients(int balancing)
1088 {
1089 #if CMK_LBDB_ON
1090 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1091     resumeCount++;
1092     globalResumeCount = resumeCount;
1093 #endif
1094   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1095   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1096     double end_lb_time = CkWallTimer();
1097   }
1098
1099 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1100   if (balancing) ComlibNotifyMigrationDone();  
1101 #endif
1102
1103   theLbdb->ResumeClients();
1104   if (balancing)  {
1105     CheckMigrationComplete();
1106     if (future_migrates_expected == 0 || 
1107             future_migrates_expected == future_migrates_completed) {
1108       CheckMigrationComplete();
1109     }
1110   }
1111 #endif
1112 }
1113
1114 /*
1115   migration of objects contains two different kinds:
1116   (1) objects want to make a barrier for migration completion
1117       (waitForBarrier is true)
1118       migrationDone() to finish and resumeClients
1119   (2) objects don't need a barrier
1120   However, next load balancing can only happen when both migrations complete
1121 */ 
1122 void CentralLB::CheckMigrationComplete()
1123 {
1124 #if CMK_LBDB_ON
1125   lbdone ++;
1126   if (lbdone == 2) {
1127     if (_lb_args.debug() && CkMyPe()==0) {
1128       double end_lb_time = CkWallTimer();
1129       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1130                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1131                 end_lb_time-start_lb_time);
1132     }
1133
1134     lb_migration_cost = (CkWallTimer() - start_lb_time);
1135     lb_iteration_start_time = CkWallTimer();
1136     CkPrintf("lb_iteration_start_time[%d] %lf\n", CkMyPe(), lb_iteration_start_time);
1137
1138     lbdone = 0;
1139     future_migrates_expected = -1;
1140     future_migrates_completed = 0;
1141     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1142     // release local barrier  so that the next load balancer can go
1143     LDOMHandle h;
1144     h.id.id.idx = 0;
1145     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1146     // switch to the next load balancer in the list
1147     // subtle: called from Migrated() may result in Migrated() called in next LB
1148     theLbdb->nextLoadbalancer(seqno);
1149   }
1150 #endif
1151 }
1152
1153 void CentralLB::preprocess(LDStats* stats)
1154 {
1155   if (_lb_args.ignoreBgLoad())
1156     stats->clearBgLoad();
1157
1158   // Call the predictor for the future
1159   if (_lb_predict) FuturePredictor(statsData);
1160 }
1161
1162 // default load balancing strategy
1163 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1164 {
1165 #if CMK_LBDB_ON
1166   double strat_start_time = CkWallTimer();
1167   if (_lb_args.debug())
1168     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1169
1170   work(stats);
1171
1172   if (_lb_args.debug()>1)  {
1173     CkPrintf("CharmLB> Obj Map:\n");
1174     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1175     CkPrintf("\n");
1176   }
1177
1178   LBMigrateMsg *msg = createMigrateMsg(stats);
1179
1180   if (_lb_args.debug()) {
1181     double strat_end_time = CkWallTimer();
1182     envelope *env = UsrToEnv(msg);
1183
1184     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1185     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1186               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1187     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1188     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1189               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1190     lb_strategy_cost = (strat_end_time - strat_start_time);
1191     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1192   }
1193
1194   return msg;
1195 #else
1196   return NULL;
1197 #endif
1198 }
1199
1200 void CentralLB::work(LDStats* stats)
1201 {
1202   // does nothing but print the database
1203   stats->print();
1204 }
1205
1206 // generate migrate message from stats->from_proc and to_proc
1207 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1208 {
1209   int i;
1210   CkVec<MigrateInfo*> migrateInfo;
1211   for (i=0; i<stats->n_objs; i++) {
1212     LDObjData &objData = stats->objData[i];
1213     int frompe = stats->from_proc[i];
1214     int tope = stats->to_proc[i];
1215     if (frompe != tope) {
1216       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1217       //         CkMyPe(),obj,pe,dest);
1218       MigrateInfo *migrateMe = new MigrateInfo;
1219       migrateMe->obj = objData.handle;
1220       migrateMe->from_pe = frompe;
1221       migrateMe->to_pe = tope;
1222       migrateMe->async_arrival = objData.asyncArrival;
1223       migrateInfo.insertAtEnd(migrateMe);
1224     }
1225   }
1226
1227   int migrate_count=migrateInfo.length();
1228   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1229   msg->n_moves = migrate_count;
1230   for(i=0; i < migrate_count; i++) {
1231     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1232     msg->moves[i] = *item;
1233     delete item;
1234     migrateInfo[i] = 0;
1235   }
1236   return msg;
1237 }
1238
1239 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1240 {
1241   int nmoves = 0;
1242   int nunavail = 0;
1243   int i;
1244   for (i=0; i<m->n_moves; i++) {
1245     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1246     if (item->from_pe == p || item->to_pe == p) nmoves++;
1247   }
1248   for (i=0; i<CkNumPes();i++) {
1249     if (!m->avail_vector[i]) nunavail++;
1250   }
1251   LBMigrateMsg* msg;
1252   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1253   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1254   msg->n_moves = nmoves;
1255   msg->level = m->level;
1256   msg->next_lb = m->next_lb;
1257   for (i=0,nmoves=0; i<m->n_moves; i++) {
1258     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1259     if (item->from_pe == p || item->to_pe == p) {
1260       msg->moves[nmoves] = *item;
1261       nmoves++;
1262     }
1263   }
1264   // copy processor data
1265   if (nunavail)
1266   for (i=0; i<CkNumPes();i++) {
1267     msg->avail_vector[i] = m->avail_vector[i];
1268     msg->expectedLoad[i] = m->expectedLoad[i];
1269   }
1270   return msg;
1271 }
1272
1273 void CentralLB::simulationWrite() {
1274   if(step() == LBSimulation::dumpStep)
1275   {
1276     // here we are supposed to dump the database
1277     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1278     char *dumpFileName = (char *)malloc(dumpFileSize);
1279     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1280       free(dumpFileName);
1281       dumpFileSize+=3;
1282       dumpFileName = (char *)malloc(dumpFileSize);
1283     }
1284     writeStatsMsgs(dumpFileName);
1285     free(dumpFileName);
1286     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1287     ++LBSimulation::dumpStep;
1288     --LBSimulation::dumpStepSize;
1289     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1290       CmiPrintf("Charm++> Exiting...\n");
1291       CkExit();
1292     }
1293     return;
1294   }
1295 }
1296
1297 void CentralLB::simulationRead() {
1298   LBSimulation *simResults = NULL, *realResults;
1299   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1300   voidMessage->n_moves=0;
1301   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1302     // here we are supposed to read the data from the dump database
1303     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1304     char *simFileName = (char *)malloc(simFileSize);
1305     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1306       free(simFileName);
1307       simFileSize+=3;
1308       simFileName = (char *)malloc(simFileSize);
1309     }
1310     readStatsMsgs(simFileName);
1311
1312     // allocate simResults (only the first step)
1313     if (simResults == NULL) {
1314       simResults = new LBSimulation(LBSimulation::simProcs);
1315       realResults = new LBSimulation(LBSimulation::simProcs);
1316     }
1317     else {
1318       // should be the same number of procs of the original simulation!
1319       if (!LBSimulation::procsChanged) {
1320         // it means we have a previous step, so in simResults there is data.
1321         // we can now print the real effects of the load balancer during the simulation
1322         // or print the difference between the predicted data and the real one.
1323         realResults->reset();
1324         // reset to_proc of statsData to be equal to from_proc
1325         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1326         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1327         simResults->PrintDifferences(realResults,statsData);
1328       }
1329       simResults->reset();
1330     }
1331
1332     // now pass it to the strategy routine
1333     double startT = CkWallTimer();
1334     preprocess(statsData);
1335     CmiPrintf("%s> Strategy starts ... \n", lbname);
1336     LBMigrateMsg* migrateMsg = Strategy(statsData);
1337     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1338                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1339
1340     // now calculate the results of the load balancing simulation
1341     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1342
1343     // now we have the simulation data, so print it and loop
1344     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1345     // **CWL** Officially recording my disdain here for using ints for bool
1346     if (LBSimulation::showDecisionsOnly) {
1347       simResults->PrintDecisions(migrateMsg, simFileName, 
1348                                  LBSimulation::simProcs);
1349     } else {
1350       simResults->PrintSimulationResults();
1351     }
1352
1353     free(simFileName);
1354     delete migrateMsg;
1355     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1356   }
1357   // deallocate simResults
1358   delete simResults;
1359   CmiPrintf("Charm++> Exiting...\n");
1360   CkExit();
1361 }
1362
1363 void CentralLB::readStatsMsgs(const char* filename) 
1364 {
1365 #if CMK_LBDB_ON
1366   int i;
1367   FILE *f = fopen(filename, "r");
1368   if (f==NULL) {
1369     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1370     CmiAbort("");
1371   }
1372
1373   // at this stage, we need to rebuild the statsMsgList and
1374   // statsDataList structures. For that first deallocate the
1375   // old structures
1376   if (statsMsgsList) {
1377     for(i = 0; i < stats_msg_count; i++)
1378       delete statsMsgsList[i];
1379     delete[] statsMsgsList;
1380     statsMsgsList=0;
1381   }
1382
1383   PUP::fromDisk pd(f);
1384   PUP::machineInfo machInfo;
1385
1386   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1387   PUP::xlater p(machInfo, pd);
1388
1389   if (_lb_args.lbversion() > 1) {
1390     p|_lb_args.lbversion();             // write version number
1391     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1392     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1393   }
1394   p|stats_msg_count;
1395
1396   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1397   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1398   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1399
1400   // LBSimulation::simProcs must be set
1401   statsData->pup(p);
1402
1403   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1404   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1405
1406   // file f is closed in the destructor of PUP::fromDisk
1407   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1408 #endif
1409 }
1410
1411 void CentralLB::writeStatsMsgs(const char* filename) 
1412 {
1413 #if CMK_LBDB_ON
1414   FILE *f = fopen(filename, "w");
1415   if (f==NULL) {
1416     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1417     CmiAbort("");
1418   }
1419
1420   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1421   PUP::toDisk p(f);
1422   p((char *)&machInfo, sizeof(machInfo));       // machine info
1423
1424   p|_lb_args.lbversion();               // write version number
1425   p|stats_msg_count;
1426   statsData->pup(p);
1427
1428   fclose(f);
1429
1430   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1431 #endif
1432 }
1433
1434 // calculate the predicted wallclock/cpu load for every processors
1435 // considering communication overhead if considerComm is true
1436 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1437                       LBMigrateMsg *msg, LBInfo &info, 
1438                       int considerComm)
1439 {
1440 #if CMK_LBDB_ON
1441         stats->makeCommHash();
1442
1443         // update to_proc according to migration msgs
1444         for(int i = 0; i < msg->n_moves; i++) {
1445           MigrateInfo &mInfo = msg->moves[i];
1446           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1447           CmiAssert(idx != -1);
1448           stats->to_proc[idx] = mInfo.to_pe;
1449         }
1450
1451         info.getInfo(stats, count, considerComm);
1452 #endif
1453 }
1454
1455
1456 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1457 {
1458     CkAssert(simResults != NULL && count == simResults->numPes);
1459     // estimate the new loads of the processors. As a first approximation, this is the
1460     // sum of the cpu times of the objects on that processor
1461     double startT = CkWallTimer();
1462     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1463     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1464 }
1465
1466 void CentralLB::pup(PUP::er &p) { 
1467   BaseLB::pup(p); 
1468   if (p.isUnpacking())  {
1469     initLB(CkLBOptions(seqno)); 
1470   }
1471   p|reduction_started;
1472   int has_statsMsg=0;
1473   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1474   p|has_statsMsg;
1475   if (has_statsMsg) {
1476     if (p.isUnpacking())
1477       statsMsg = new CLBStatsMsg;
1478     statsMsg->pup(p);
1479   }
1480 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1481   p | lbDecisionCount;
1482   p | resumeCount;
1483 #endif
1484         
1485 }
1486
1487 int CentralLB::useMem() { 
1488   return sizeof(CentralLB) + statsData->useMem() + 
1489          CkNumPes() * sizeof(CLBStatsMsg *);
1490 }
1491
1492
1493 /**
1494   CLBStatsMsg is not a real message now.
1495   CLBStatsMsg is used for all processors to fill in their local load and comm
1496   statistics and send to processor 0
1497 */
1498
1499 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1500   n_objs = osz;
1501   n_comm = csz;
1502   objData = new LDObjData[osz];
1503   commData = new LDCommData[csz];
1504   avail_vector = NULL;
1505 }
1506
1507 CLBStatsMsg::~CLBStatsMsg() {
1508   delete [] objData;
1509   delete [] commData;
1510   delete [] avail_vector;
1511 }
1512
1513 void CLBStatsMsg::pup(PUP::er &p) {
1514   int i;
1515   p|from_pe;
1516   p|pe_speed;
1517   p|total_walltime;
1518   p|idletime;
1519   p|bg_walltime;
1520 #if CMK_LB_CPUTIMER
1521   p|total_cputime;
1522   p|bg_cputime;
1523 #endif
1524 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1525   p | step;
1526 #endif
1527   p|n_objs;
1528   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1529   for (i=0; i<n_objs; i++) p|objData[i];
1530   p|n_comm;
1531   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1532   for (i=0; i<n_comm; i++) p|commData[i];
1533
1534   int has_avail_vector;
1535   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1536   p|has_avail_vector;
1537   if (p.isUnpacking()) {
1538     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1539     else avail_vector = NULL;
1540   }
1541   if (has_avail_vector) p(avail_vector, CkNumPes());
1542
1543   p(next_lb);
1544 }
1545
1546 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1547 // the entry function, it is just used to use to pup.
1548 // I don't use CLBStatsMsg directly as marshalled parameter because
1549 // I want the data pointer stored and not to be freed by the Charm++.
1550 void CkMarshalledCLBStatsMessage::free() { 
1551   int count = msgs.size();
1552   for  (int i=0; i<count; i++) {
1553     delete msgs[i];
1554     msgs[i] = NULL;
1555   }
1556   msgs.free();
1557 }
1558
1559 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1560 {
1561   int count = m.getCount();
1562   for (int i=0; i<count; i++) add(m.getMessage(i));
1563 }
1564
1565 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1566 {
1567   int count = msgs.size();
1568   p|count;
1569   for (int i=0; i<count; i++) {
1570     CLBStatsMsg *msg;
1571     if (p.isUnpacking()) msg = new CLBStatsMsg;
1572     else { 
1573       msg = msgs[i]; CmiAssert(msg!=NULL);
1574     }
1575     msg->pup(p);
1576     if (p.isUnpacking()) add(msg);
1577   }
1578 }
1579
1580 SpanningTree::SpanningTree()
1581 {
1582         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1583         arity = (int)ceil(sq/2);
1584         calcParent(CkMyPe());
1585         calcNumChildren(CkMyPe());
1586 }
1587
1588 void SpanningTree::calcParent(int n)
1589 {
1590         parent=-1;
1591         if(n != 0  && arity > 0)
1592                 parent = (n-1)/arity;
1593 }
1594
1595 void SpanningTree::calcNumChildren(int n)
1596 {
1597         numChildren = 0;
1598         if (arity == 0) return;
1599         int fullNode=(CkNumPes()-1-arity)/arity;
1600         if(n <= fullNode)
1601                 numChildren = arity;
1602         if(n == fullNode+1)
1603                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1604         if(n > fullNode+1)
1605                 numChildren = 0;
1606 }
1607
1608 #include "CentralLB.def.h"
1609  
1610 /*@}*/