c4ea7d1ed67651873f757796db75e8fe10c7572a
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #include <vector>
14
15 #define  DEBUGF(x)       // CmiPrintf x;
16 #define  DEBUG(x)        // x;
17
18 #if CMK_MEM_CHECKPOINT
19    /* can not handle reduction in inmem FT */
20 #define USE_REDUCTION         0
21 #define USE_LDB_SPANNING_TREE 0
22 #elif defined(_FAULT_MLOG_)
23 /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
32 extern int _restartFlag;
33 extern void getGlobalStep(CkGroupID );
34 extern void initMlogLBStep(CkGroupID );
35 extern int globalResumeCount;
36 extern void sendDummyMigrationCounts(int *);
37 #endif
38
39 #if CMK_GRID_QUEUE_AVAILABLE
40 CpvExtern(void *, CkGridObject);
41 #endif
42
43 CkGroupID loadbalancer;
44 int * lb_ptr;
45 int load_balancer_created;
46
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49
50 int lb_no_iterations = -1;
51 double cur_max_pe_load = 0.0;
52 double cur_avg_pe_load = 0.0;
53 double prev_load = 0.0;
54
55 struct AdaptiveData {
56   int iteration;
57   double max_load;
58   double avg_load;
59 };
60
61 struct AdaptiveLBDatabase {
62   std::vector<AdaptiveData> history_data;
63 } adaptive_lbdb;
64
65 enum state {
66   OFF,
67   ON,
68   PAUSE,
69   DECIDED,
70   LOAD_BALANCE
71 } local_state;
72
73 CreateLBFunc_Def(CentralLB, "CentralLB base class")
74
75 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
76                              LBMigrateMsg *, LBInfo &info, int considerComm);
77
78 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
79   double lb_data[4];
80   lb_data[0] = 0;
81   lb_data[1] = 0;
82   lb_data[2] = 0;
83   for (int i = 0; i < nMsg; i++) {
84     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
85     double* m = (double *)msgs[i]->getData();
86     lb_data[0] += m[0];
87     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
88     lb_data[2] += m[2];
89     if (i == 0) {
90       lb_data[3] = m[3];
91     }
92   }
93   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
94 }
95
96 /*global*/ CkReduction::reducerType lbDataCollectionType;
97 /*initcall*/ void registerLBDataCollection(void) {
98   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
99 }
100
101 /*
102 void CreateCentralLB()
103 {
104   CProxy_CentralLB::ckNew(0);
105 }
106 */
107
108 void CentralLB::staticStartLB(void* data)
109 {
110   CentralLB *me = (CentralLB*)(data);
111   me->StartLB();
112 }
113
114 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
115 {
116   CentralLB *me = (CentralLB*)(data);
117   me->Migrated(h, waitBarrier);
118 }
119
120 void CentralLB::staticAtSync(void* data)
121 {
122   CentralLB *me = (CentralLB*)(data);
123   me->AtSync();
124 }
125
126 void CentralLB::initLB(const CkLBOptions &opt)
127 {
128 #if CMK_LBDB_ON
129   lbname = "CentralLB";
130   thisProxy = CProxy_CentralLB(thisgroup);
131   //  CkPrintf("Construct in %d\n",CkMyPe());
132
133   // create and turn on by default
134   receiver = theLbdb->
135     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
136   notifier = theLbdb->getLBDB()->
137     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
138   startLbFnHdl = theLbdb->getLBDB()->
139     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
140
141   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
142   if (opt.getSeqNo() > 0) turnOff();
143
144   stats_msg_count = 0;
145   statsMsgsList = NULL;
146   statsData = NULL;
147
148   storedMigrateMsg = NULL;
149   reduction_started = 0;
150
151   // for future predictor
152   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
153   else predicted_model=0;
154   // register user interface callbacks
155   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
156
157   myspeed = theLbdb->ProcessorSpeed();
158
159   migrates_completed = 0;
160   future_migrates_completed = 0;
161   migrates_expected = -1;
162   future_migrates_expected = -1;
163   cur_ld_balancer = _lb_args.central_pe();      // 0 default
164   lbdone = 0;
165   count_msgs=0;
166   statsMsg = NULL;
167
168   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
169
170   load_balancer_created = 1;
171 #endif
172 }
173
174 CentralLB::~CentralLB()
175 {
176 #if CMK_LBDB_ON
177   delete [] statsMsgsList;
178   delete statsData;
179   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
180   if (theLbdb) {
181     theLbdb->getLBDB()->
182       RemoveNotifyMigrated(notifier);
183     theLbdb->
184       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
185   }
186 #endif
187 }
188
189 void CentralLB::turnOn() 
190 {
191 #if CMK_LBDB_ON
192   theLbdb->getLBDB()->
193     TurnOnBarrierReceiver(receiver);
194   theLbdb->getLBDB()->
195     TurnOnNotifyMigrated(notifier);
196   theLbdb->getLBDB()->
197     TurnOnStartLBFn(startLbFnHdl);
198 #endif
199 }
200
201 void CentralLB::turnOff() 
202 {
203 #if CMK_LBDB_ON
204   theLbdb->getLBDB()->
205     TurnOffBarrierReceiver(receiver);
206   theLbdb->getLBDB()->
207     TurnOffNotifyMigrated(notifier);
208   theLbdb->getLBDB()->
209     TurnOffStartLBFn(startLbFnHdl);
210 #endif
211 }
212
213 void CentralLB::AtSync()
214 {
215 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
216 #if CMK_LBDB_ON
217 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
218
219 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
220         CpvAccess(_currentObj)=this;
221 #endif
222
223   // if num of processor is only 1, nothing should happen
224   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
225     MigrationDone(0);
226     return;
227   }
228   if(CmiNodeAlive(CkMyPe())){
229     thisProxy [CkMyPe()].ProcessAtSyncMin();
230   }
231 #endif
232 }
233
234 #include "ComlibStrategy.h"
235
236 void CentralLB::ProcessAtSync()
237 {
238
239   // Reset all adaptive lb related fields since load balancing is being done.
240   lb_no_iterations = -1;
241   adaptive_lbdb.history_data.clear();
242   prev_load = 0.0;
243   lb_ideal_period = INT_MAX;
244   local_state = OFF;
245
246
247 #if CMK_LBDB_ON
248   if (reduction_started) return;              // reducton in progress
249
250   CmiAssert(CmiNodeAlive(CkMyPe()));
251   if (CkMyPe() == cur_ld_balancer) {
252     start_lb_time = CkWallTimer();
253   }
254
255
256 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
257         initMlogLBStep(thisgroup);
258 #endif
259
260   // build message
261   BuildStatsMsg();
262
263 #if USE_REDUCTION
264     // reduction to get total number of objects and comm
265     // so that processor 0 can pre-allocate load balancing database
266   int counts[2];
267   counts[0] = theLbdb->GetObjDataSz();
268   counts[1] = theLbdb->GetCommDataSz();
269
270   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
271                   thisProxy[0]);
272   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
273   reduction_started = 1;
274 #else
275   SendStats();
276 #endif
277 #endif
278 }
279
280 void CentralLB::ProcessAtSyncMin()
281 {
282 #if CMK_LBDB_ON
283   if (reduction_started) return;              // reducton in progress
284
285   CmiAssert(CmiNodeAlive(CkMyPe()));
286   if (CkMyPe() == cur_ld_balancer) {
287     start_lb_time = CkWallTimer();
288   }
289
290
291 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
292         initMlogLBStep(thisgroup);
293 #endif
294   
295   lb_no_iterations++;
296   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
297       lb_no_iterations, lb_ideal_period);
298
299   // If decision has been made and has reached the lb_period, then do load
300   // balancing, else if hasn't reached ideal_period, then resume.
301   if (local_state == DECIDED) {
302     if (lb_no_iterations < lb_ideal_period) {
303       ResumeClients(1);
304     } else {
305       local_state = LOAD_BALANCE;
306       ProcessAtSync();
307     }
308     return;
309   }
310    
311   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
312   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
313   // dont resume client.
314   if (local_state == ON) {
315     if (lb_no_iterations < lb_ideal_period) {
316       ResumeClients(1);
317     } else {
318       local_state = PAUSE;
319     }
320     return;
321   }
322
323   SendMinStats();
324   ResumeClients(1);
325 #endif
326 }
327
328 void CentralLB::SendMinStats() {
329
330  double total_load;
331  double idle_time;
332  double bg_walltime;
333   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
334   CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
335
336   // Since the total_load is cumulative since the last load balancing stage,
337   // Hence it is subtracted from the previous load.
338   total_load -= idle_time;
339   double tmp = total_load;
340   total_load -= prev_load;
341   prev_load = tmp; 
342
343   double lb_data[4];
344   lb_data[0] = total_load;
345   lb_data[1] = total_load;
346   lb_data[2] = 1;
347   lb_data[3] = lb_no_iterations;
348
349   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
350                   thisProxy[0]);
351   contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
352 }
353
354 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
355   CmiAssert(CkMyPe() == 0);
356   double* load = (LBRealType *) msg->getData();
357   double max = load[1];
358   double avg = load[0]/load[2];
359   int iteration_n = load[3];
360   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
361
362   // Store the data for this iteration
363   AdaptiveData data;
364   data.iteration = lb_no_iterations;
365   data.max_load = max;
366   data.avg_load = avg;
367   adaptive_lbdb.history_data.push_back(data);
368
369   // If the max/avg ratio is greater than the threshold and also this is not the
370   // step immediately after load balancing, carry out load balancing
371   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
372     CkPrintf("Carry out load balancing step\n");
373     lb_ideal_period = iteration_n + 1;
374     thisProxy.LoadBalanceDecision(lb_ideal_period);
375     return;
376   }
377
378   // Generate the plan for the adaptive strategy
379   if (generatePlan()) {
380     thisProxy.LoadBalanceDecision(lb_ideal_period);
381   }
382 }
383
384 bool CentralLB::generatePlan() {
385   if (adaptive_lbdb.history_data.size() <= 4) {
386     return false;
387   }
388
389   // Some heuristics for lbperiod
390   // If constant load or almost constant,
391   // then max * new_lb_period > avg * new_lb_period + lb_cost
392   double max = 0.0;
393   double avg = 0.0;
394   AdaptiveData data;
395   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
396     data = adaptive_lbdb.history_data[i];
397     max += data.max_load;
398     avg += data.avg_load;
399     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
400   }
401 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
402 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
403 //
404 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
405 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
406 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
407 //
408   // If linearly varying load, then find lb_period
409   // area between the max and avg curve 
410   double mslope, aslope, mc, ac;
411   getLineEq(aslope, ac, mslope, mc);
412   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
413   double a = (mslope - aslope)/2;
414   double b = (mc - ac);
415   double c = -(lb_strategy_cost + lb_migration_cost);
416   //c = -2.5;
417   lb_ideal_period = getPeriodForLinear(a, b, c);
418   CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
419
420   return true;
421 }
422
423 int CentralLB::getPeriodForLinear(double a, double b, double c) {
424   if (a == 0.0) {
425     return (-c / b);
426   }
427   int x;
428   double t = (b * b) - (4*a*c);
429   t = (-b + sqrt(t)) / (2*a);
430   x = t;
431   return x;
432 }
433
434 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
435   int total = adaptive_lbdb.history_data.size();
436   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
437       adaptive_lbdb.history_data[0].iteration;
438   double a1 = 0;
439   double m1 = 0;
440   double a2 = 0;
441   double m2 = 0;
442   AdaptiveData data;
443   int i = 0;
444   for (i = 0; i <= total/2; i++) {
445     data = adaptive_lbdb.history_data[i];
446     m1 += data.max_load;
447     a1 += data.avg_load;
448   }
449   m1 /= i;
450   a1 /= i;
451
452   for (i = total/2; i < total; i++) {
453     data = adaptive_lbdb.history_data[i];
454     m2 += data.max_load;
455     a2 += data.avg_load;
456   }
457   m2 /= (i - total/2);
458   a2 /= (i - total/2);
459
460   aslope = 2 * (a2 - a1) / iterations;
461   mslope = 2 * (m2 - m1) / iterations;
462   ac = adaptive_lbdb.history_data[0].avg_load;
463   mc = adaptive_lbdb.history_data[0].max_load;
464   return true;
465 }
466
467 void CentralLB::LoadBalanceDecision(int period) {
468   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
469   lb_ideal_period = period;
470   if (local_state == OFF) {
471     local_state = ON;
472
473     // Send the current iteration no
474     CkCallback cb(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
475         thisProxy[0]);
476     int tmp = lb_no_iterations;
477     contribute(sizeof(int), &tmp, CkReduction::max_int, cb);
478     return;
479   }
480
481   if (local_state == ON) {
482     local_state = DECIDED;
483     return;
484   }
485
486   // If the state is PAUSE, then its waiting for the final decision from central
487   // processor. If the decision is that the ideal period is in the future,
488   // resume. If the ideal period is now, then carry out load balancing.
489   if (local_state == PAUSE) {
490     if (lb_no_iterations < lb_ideal_period) {
491       local_state = DECIDED;
492       ResumeClients(1);
493     } else {
494       ProcessAtSync();
495     }
496   }
497 }
498
499 void CentralLB::ReceiveIterationNo(CkReductionMsg *msg) {
500   CmiAssert(CkMyPe() == 0);
501   int *it_no = (int *) msg->getData();
502   CkPrintf("^^^^ Received all iteration no and final decision %d^^^^\n", it_no[0]);
503   //thisProxy.LoadBalanceDecision(it_no[0]);
504   lb_ideal_period = (lb_ideal_period > it_no[0]) ? lb_ideal_period : it_no[0] + 1;
505   thisProxy.LoadBalanceDecision(lb_ideal_period);
506 }
507
508 // called only on 0
509 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
510 {
511   CmiAssert(CkMyPe() == 0);
512   if (statsData == NULL) statsData = new LDStats;
513
514   int *counts = (int *)msg->getData();
515   int n_objs = counts[0];
516   int n_comm = counts[1];
517
518     // resize database
519   statsData->objData.resize(n_objs);
520   statsData->from_proc.resize(n_objs);
521   statsData->to_proc.resize(n_objs);
522   statsData->commData.resize(n_comm);
523
524   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
525         
526     // broadcast call to let everybody start to send stats
527   thisProxy.SendStats();
528 }
529
530 void CentralLB::BuildStatsMsg()
531 {
532 #if CMK_LBDB_ON
533   // build and send stats
534   const int osz = theLbdb->GetObjDataSz();
535   const int csz = theLbdb->GetCommDataSz();
536
537   int npes = CkNumPes();
538   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
539   _MEMCHECK(msg);
540   msg->from_pe = CkMyPe();
541 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
542         msg->step = step();
543 #endif
544   //msg->serial = CrnRand();
545
546 /*
547   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
548   theLbdb->IdleTime(&msg->idletime);
549   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
550 */
551 #if CMK_LB_CPUTIMER
552   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
553                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
554 #else
555   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
556                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
557 #endif
558
559   msg->pe_speed = myspeed;
560   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
561
562   msg->n_objs = osz;
563   theLbdb->GetObjData(msg->objData);
564   msg->n_comm = csz;
565   theLbdb->GetCommData(msg->commData);
566 //  theLbdb->ClearLoads();
567   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
568
569   if(CkMyPe() == cur_ld_balancer) {
570     msg->avail_vector = new char[CkNumPes()];
571     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
572     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
573   }
574
575   CmiAssert(statsMsg == NULL);
576   statsMsg = msg;
577 #endif
578 }
579
580
581 // called on every processor
582 void CentralLB::SendStats()
583 {
584 #if CMK_LBDB_ON
585   CmiAssert(statsMsg != NULL);
586   reduction_started = 0;
587
588 #if USE_LDB_SPANNING_TREE
589   if(CkNumPes()>1024)
590   {
591     if (CkMyPe() == cur_ld_balancer)
592       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
593     else
594       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
595   }
596   else
597 #endif
598   {
599     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
600     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
601   }
602
603   statsMsg = NULL;
604
605 #ifdef __BIGSIM__
606   BgEndStreaming();
607 #endif
608
609   {
610   // enfore the barrier to wait until centralLB says no
611   LDOMHandle h;
612   h.id.id.idx = 0;
613   theLbdb->getLBDB()->RegisteringObjects(h);
614   }
615 #endif
616 }
617
618 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
619 extern int donotCountMigration;
620 #endif
621
622 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
623 {
624 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
625     if(donotCountMigration){
626         return ;
627     }
628 #endif
629
630 #if CMK_LBDB_ON
631   if (waitBarrier) {
632             migrates_completed++;
633       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
634     if (migrates_completed == migrates_expected) {
635       MigrationDone(1);
636     }
637   }
638   else {
639     future_migrates_completed ++;
640     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
641     if (future_migrates_completed == future_migrates_expected)  {
642         CheckMigrationComplete();
643     }
644   }
645 #endif
646 }
647
648 void CentralLB::MissMigrate(int waitForBarrier)
649 {
650   LDObjHandle h;
651   Migrated(h, waitForBarrier);
652 }
653
654 // build a complete data from bufferred messages
655 // not used when USE_REDUCTION = 1
656 void CentralLB::buildStats()
657 {
658     statsData->nprocs() = stats_msg_count;
659     // allocate space
660     statsData->objData.resize(statsData->n_objs);
661     statsData->from_proc.resize(statsData->n_objs);
662     statsData->to_proc.resize(statsData->n_objs);
663     statsData->commData.resize(statsData->n_comm);
664
665     int nobj = 0;
666     int ncom = 0;
667     int nmigobj = 0;
668     // copy all data in individule message to this big structure
669     for (int pe=0; pe<CkNumPes(); pe++) {
670        int i;
671        CLBStatsMsg *msg = statsMsgsList[pe];
672        if(msg == NULL) continue;
673        for (i=0; i<msg->n_objs; i++) {
674          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
675          statsData->objData[nobj] = msg->objData[i];
676          if (msg->objData[i].migratable) nmigobj++;
677          nobj++;
678        }
679        for (i=0; i<msg->n_comm; i++) {
680          statsData->commData[ncom] = msg->commData[i];
681          ncom++;
682        }
683        // free the memory
684        delete msg;
685        statsMsgsList[pe]=0;
686     }
687     statsData->n_migrateobjs = nmigobj;
688 }
689
690 // deposit one processor data at a time, note database is pre-allocated
691 // to have enough space
692 // used when USE_REDUCTION = 1
693 void CentralLB::depositData(CLBStatsMsg *m)
694 {
695   int i;
696   if (m == NULL) return;
697
698   const int pe = m->from_pe;
699   struct ProcStats &procStat = statsData->procs[pe];
700   procStat.pe = pe;
701   procStat.total_walltime = m->total_walltime;
702   procStat.idletime = m->idletime;
703   procStat.bg_walltime = m->bg_walltime;
704 #if CMK_LB_CPUTIMER
705   procStat.total_cputime = m->total_cputime;
706   procStat.bg_cputime = m->bg_cputime;
707 #endif
708   procStat.pe_speed = m->pe_speed;
709   //procStat.utilization = 1.0;
710   procStat.available = CmiTrue;
711   procStat.n_objs = m->n_objs;
712
713   int &nobj = statsData->n_objs;
714   int &nmigobj = statsData->n_migrateobjs;
715   for (i=0; i<m->n_objs; i++) {
716       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
717       statsData->objData[nobj] = m->objData[i];
718       if (m->objData[i].migratable) nmigobj++;
719       nobj++;
720       CmiAssert(nobj <= statsData->objData.capacity());
721   }
722   int &n_comm = statsData->n_comm;
723   for (i=0; i<m->n_comm; i++) {
724       statsData->commData[n_comm] = m->commData[i];
725       n_comm++;
726       CmiAssert(n_comm <= statsData->commData.capacity());
727   }
728   delete m;
729 }
730
731 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
732 {
733 #if CMK_LBDB_ON
734   if (statsMsgsList == NULL) {
735     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
736     CmiAssert(statsMsgsList != NULL);
737     for(int i=0; i < CkNumPes(); i++)
738       statsMsgsList[i] = 0;
739   }
740   if (statsData == NULL) statsData = new LDStats;
741
742     //  loop through all CLBStatsMsg in the incoming msg
743   int count = msg.getCount();
744   for (int num = 0; num < count; num++) 
745   {
746     CLBStatsMsg *m = msg.getMessage(num);
747     CmiAssert(m!=NULL);
748     const int pe = m->from_pe;
749     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
750 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
751 /*      
752  *  if(m->step < step()){
753  *    //TODO: if a processor is redoing an old load balance step..
754  *    //tell it that the step is done and that it should not perform any migrations
755  *      thisProxy[pe].ReceiveDummyMigration();
756  *  }*/
757 #endif
758         
759     if(!CmiNodeAlive(pe)){
760         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
761         continue;
762     }
763         
764     if (m->avail_vector!=NULL) {
765       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
766     }
767
768     if (statsMsgsList[pe] != 0) {
769       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
770              pe);
771     } else {
772       statsMsgsList[pe] = m;
773 #if USE_REDUCTION
774       depositData(m);
775 #else
776       // store per processor data right away
777       struct ProcStats &procStat = statsData->procs[pe];
778       procStat.pe = pe;
779       procStat.total_walltime = m->total_walltime;
780       procStat.idletime = m->idletime;
781       procStat.bg_walltime = m->bg_walltime;
782 #if CMK_LB_CPUTIMER
783       procStat.total_cputime = m->total_cputime;
784       procStat.bg_cputime = m->bg_cputime;
785 #endif
786       procStat.pe_speed = m->pe_speed;
787       //procStat.utilization = 1.0;
788       procStat.available = CmiTrue;
789       procStat.n_objs = m->n_objs;
790
791       statsData->n_objs += m->n_objs;
792       statsData->n_comm += m->n_comm;
793 #endif
794       stats_msg_count++;
795     }
796   }    // end of for
797
798   const int clients = CkNumValidPes();
799   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
800  
801   if (stats_msg_count == clients) {
802         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
803     statsData->nprocs() = stats_msg_count;
804     thisProxy[CkMyPe()].LoadBalance();
805   }
806 #endif
807 }
808
809 /** added by Abhinav for receiving msgs via spanning tree */
810 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
811 {
812 #if CMK_LBDB_ON
813         CmiAssert(CkMyPe() != 0);
814         bufMsg.add(msg);         // buffer messages
815         count_msgs++;
816         //CkPrintf("here %d\n", CkMyPe());
817         if (count_msgs == st.numChildren+1) {
818                 if(st.parent == 0)
819                 {
820                         thisProxy[0].ReceiveStats(bufMsg);
821                         //CkPrintf("from %d\n", CkMyPe());
822                 }
823                 else
824                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
825                 count_msgs = 0;
826                 bufMsg.free();
827         } 
828 #endif
829 }
830
831 void CentralLB::LoadBalance()
832 {
833 #if CMK_LBDB_ON
834   int proc;
835   const int clients = CkNumPes();
836
837 #if ! USE_REDUCTION
838   // build data
839   buildStats();
840 #else
841   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
842 #endif
843
844   if (_lb_args.debug()) 
845       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
846                   lbname, cur_ld_balancer, step(), start_lb_time,
847                   CmiMemoryUsage()/(1024.0*1024.0));
848
849   // if we are in simulation mode read data
850   if (LBSimulation::doSimulation) simulationRead();
851
852   char *availVector = LBDatabaseObj()->availVector();
853   for(proc = 0; proc < clients; proc++)
854       statsData->procs[proc].available = (CmiBool)availVector[proc];
855
856   preprocess(statsData);
857
858 //    CkPrintf("Before Calling Strategy\n");
859
860   if (_lb_args.printSummary()) {
861       LBInfo info(clients);
862         // not take comm data
863       info.getInfo(statsData, clients, 0);
864       LBRealType mLoad, mCpuLoad, totalLoad;
865       info.getSummary(mLoad, mCpuLoad, totalLoad);
866       int nmsgs, nbytes;
867       statsData->computeNonlocalComm(nmsgs, nbytes);
868       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
869 //      if (_lb_args.debug() > 1) {
870 //        for (int i=0; i<statsData->n_objs; i++)
871 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
872 //      }
873   }
874
875 #if CMK_REPLAYSYSTEM
876   LDHandle *loadBalancer_pointers;
877   if (_replaySystem) {
878     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
879     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
880   }
881 #endif
882   
883   LBMigrateMsg* migrateMsg = Strategy(statsData);
884 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
885         migrateMsg->step = step();
886 #endif
887
888 #if CMK_REPLAYSYSTEM
889   CpdHandleLBMessage(&migrateMsg);
890   if (_replaySystem) {
891     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
892     free(loadBalancer_pointers);
893   }
894 #endif
895   
896   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
897   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
898
899   // if this is the step at which we need to dump the database
900   simulationWrite();
901
902 //  calculate predicted load
903 //  very time consuming though, so only happen when debugging is on
904   if (_lb_args.printSummary()) {
905       LBInfo info(clients);
906         // not take comm data
907       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
908       LBRealType mLoad, mCpuLoad, totalLoad;
909       info.getSummary(mLoad, mCpuLoad, totalLoad);
910       int nmsgs, nbytes;
911       statsData->computeNonlocalComm(nmsgs, nbytes);
912       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
913       for (int i=0; i<clients; i++)
914         migrateMsg->expectedLoad[i] = info.peLoads[i];
915   }
916
917   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
918 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
919     lbDecisionCount++;
920     migrateMsg->lbDecisionCount = lbDecisionCount;
921 #endif
922
923   envelope *env = UsrToEnv(migrateMsg);
924   if (1) {
925       // broadcast
926     thisProxy.ReceiveMigration(migrateMsg);
927   }
928   else {
929     // split the migration for each processor
930     for (int p=0; p<CkNumPes(); p++) {
931       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
932       thisProxy[p].ReceiveMigration(m);
933     }
934     delete migrateMsg;
935   }
936
937   // Zero out data structures for next cycle
938   // CkPrintf("zeroing out data\n");
939   statsData->clear();
940   stats_msg_count=0;
941 #endif
942 }
943
944 // test if sender and receiver in a commData is nonmigratable.
945 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
946 {
947 #if CMK_LBDB_ON
948   for (int pe=0 ; pe<count; pe++)
949   {
950     for (int i=0; i<len[pe]; i++)
951       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
952           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
953       return 0;
954   }
955 #endif
956   return 1;
957 }
958
959 // rebuild LDStats and remove all non-migratble objects and related things
960 void CentralLB::removeNonMigratable(LDStats* stats, int count)
961 {
962   int i;
963
964   // check if we have non-migratable objects
965   int have = 0;
966   for (i=0; i<stats->n_objs; i++) 
967   {
968     LDObjData &odata = stats->objData[i];
969     if (!odata.migratable) {
970       have = 1; break;
971     }
972   }
973   if (have == 0) return;
974
975   CkVec<LDObjData> nonmig;
976   CkVec<int> new_from_proc, new_to_proc;
977   nonmig.resize(stats->n_migrateobjs);
978   new_from_proc.resize(stats->n_migrateobjs);
979   new_to_proc.resize(stats->n_migrateobjs);
980   int n_objs = 0;
981   for (i=0; i<stats->n_objs; i++) 
982   {
983     LDObjData &odata = stats->objData[i];
984     if (odata.migratable) {
985       nonmig[n_objs] = odata;
986       new_from_proc[n_objs] = stats->from_proc[i];
987       new_to_proc[n_objs] = stats->to_proc[i];
988       n_objs ++;
989     }
990     else {
991       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
992 #if CMK_LB_CPUTIMER
993       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
994 #endif
995     }
996   }
997   CmiAssert(stats->n_migrateobjs == n_objs);
998
999   stats->makeCommHash();
1000   
1001   CkVec<LDCommData> newCommData;
1002   newCommData.resize(stats->n_comm);
1003   int n_comm = 0;
1004   for (i=0; i<stats->n_comm; i++) 
1005   {
1006     LDCommData& cdata = stats->commData[i];
1007     if (!cdata.from_proc()) 
1008     {
1009       int idx = stats->getSendHash(cdata);
1010       CmiAssert(idx != -1);
1011       if (!stats->objData[idx].migratable) continue;
1012     }
1013     switch (cdata.receiver.get_type()) {
1014     case LD_PROC_MSG:
1015       break;
1016     case LD_OBJ_MSG:  {
1017       int idx = stats->getRecvHash(cdata);
1018       if (stats->complete_flag)
1019         CmiAssert(idx != -1);
1020       else if (idx == -1) continue;          // receiver not in this group
1021       if (!stats->objData[idx].migratable) continue;
1022       break;
1023       }
1024     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1025       break;
1026     }
1027     newCommData[n_comm] = cdata;
1028     n_comm ++;
1029   }
1030
1031   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1032
1033   // swap to new data
1034   stats->objData = nonmig;
1035   stats->from_proc = new_from_proc;
1036   stats->to_proc = new_to_proc;
1037   stats->n_objs = n_objs;
1038
1039   stats->commData = newCommData;
1040   stats->n_comm = n_comm;
1041
1042   stats->deleteCommHash();
1043   stats->makeCommHash();
1044
1045 }
1046
1047
1048 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1049 extern int restarted;
1050 #endif
1051
1052 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1053 {
1054   storedMigrateMsg = m;
1055   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1056                   thisProxy);
1057   contribute(0, NULL, CkReduction::max_int, cb);
1058 }
1059
1060 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1061 {
1062 #if CMK_LBDB_ON
1063         int i;
1064         LBMigrateMsg *m = storedMigrateMsg;
1065         CmiAssert(m!=NULL);
1066         delete msg;
1067
1068 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1069         int *dummyCounts;
1070
1071         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1072         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1073         if(step() > m->step){
1074                 char str[100];
1075                 envelope *env = UsrToEnv(m);
1076                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1077                 return;
1078         }
1079         lbDecisionCount = m->lbDecisionCount;
1080 #endif
1081
1082   if (_lb_args.debug() > 1) 
1083     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1084
1085   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1086   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1087 /*FAULT_EVAC*/
1088   if(!CmiNodeAlive(CkMyPe())){
1089         delete m;
1090         return;
1091   }
1092   migrates_expected = 0;
1093   future_migrates_expected = 0;
1094 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1095         int sending=0;
1096     int dummy=0;
1097         LBDB *_myLBDB = theLbdb->getLBDB();
1098         if(_restartFlag){
1099         dummyCounts = new int[CmiNumPes()];
1100         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1101     }
1102 #endif
1103   for(i=0; i < m->n_moves; i++) {
1104     MigrateInfo& move = m->moves[i];
1105     const int me = CkMyPe();
1106     if (move.from_pe == me && move.to_pe != me) {
1107       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1108       // migrate object, in case it is already gone, inform toPe
1109 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1110       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1111          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1112 #else
1113             if(_restartFlag == 0){
1114                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1115                 theLbdb->Migrate(move.obj,move.to_pe);
1116                 sending++;
1117             }else{
1118                 if(_myLBDB->validObjHandle(move.obj)){
1119                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1120                     theLbdb->Migrate(move.obj,move.to_pe);
1121                     sending++;
1122                 }else{
1123                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1124                     dummyCounts[move.to_pe]++;
1125                     dummy++;
1126                 }
1127             }
1128 #endif
1129     } else if (move.from_pe != me && move.to_pe == me) {
1130        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1131       if (!move.async_arrival) migrates_expected++;
1132       else future_migrates_expected++;
1133     }
1134   }
1135   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1136   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1137
1138 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1139         if(_restartFlag){
1140                 sendDummyMigrationCounts(dummyCounts);
1141                 _restartFlag  =0;
1142         delete []dummyCounts;
1143         }
1144 #endif
1145
1146
1147 #if 0
1148   if (m->n_moves ==0) {
1149     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1150   }
1151 #endif
1152   cur_ld_balancer = m->next_lb;
1153   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1154       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1155   }
1156
1157   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1158     MigrationDone(1);
1159   delete m;
1160
1161 //      CkEvacuatedElement();
1162 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1163 //  migrates_expected = 0;
1164 //  //  ResumeClients(1);
1165 #endif
1166 #endif
1167 }
1168
1169 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1170 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1171     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1172     //TODO: this is gonna be important when a crash happens during checkpoint
1173     //the globalDecisionCount would have to be saved and compared against
1174     //a future recvMigration
1175                 
1176         thisProxy[CkMyPe()].ResumeClients(1);
1177 }
1178 #endif
1179
1180 void CentralLB::MigrationDone(int balancing)
1181 {
1182 #if CMK_LBDB_ON
1183   migrates_completed = 0;
1184   migrates_expected = -1;
1185   // clear load stats
1186   if (balancing) theLbdb->ClearLoads();
1187   // Increment to next step
1188   theLbdb->incStep();
1189         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1190   // if sync resume, invoke a barrier
1191
1192 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1193     savedBalancing = balancing;
1194     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1195 #endif
1196
1197   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1198
1199   LoadbalanceDone(balancing);        // callback
1200 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1201   // if sync resume invoke a barrier
1202   if (balancing && _lb_args.syncResume()) {
1203     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1204                   thisProxy);
1205     contribute(0, NULL, CkReduction::sum_int, cb);
1206   }
1207   else{ 
1208     if(CmiNodeAlive(CkMyPe())){
1209         thisProxy [CkMyPe()].ResumeClients(balancing);
1210     }   
1211   }     
1212 #if CMK_GRID_QUEUE_AVAILABLE
1213   CmiGridQueueDeregisterAll ();
1214   CpvAccess(CkGridObject) = NULL;
1215 #endif
1216 #endif 
1217 #endif
1218 }
1219
1220 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1221 void CentralLB::endMigrationDone(int balancing){
1222     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1223
1224
1225   if (balancing && _lb_args.syncResume()) {
1226     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1227                   thisProxy);
1228     contribute(0, NULL, CkReduction::sum_int, cb);
1229   }
1230   else{
1231     if(CmiNodeAlive(CkMyPe())){
1232     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1233     thisProxy [CkMyPe()].ResumeClients(balancing);
1234     }
1235   }
1236
1237 }
1238 #endif
1239
1240 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1241 void resumeCentralLbAfterChkpt(void *_lb){
1242     CentralLB *lb= (CentralLB *)_lb;
1243     CpvAccess(_currentObj)=lb;
1244     lb->endMigrationDone(lb->savedBalancing);
1245 }
1246 #endif
1247
1248
1249 void CentralLB::ResumeClients(CkReductionMsg *msg)
1250 {
1251   ResumeClients(1);
1252   delete msg;
1253 }
1254
1255 void CentralLB::ResumeClients(int balancing)
1256 {
1257 #if CMK_LBDB_ON
1258 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1259     resumeCount++;
1260     globalResumeCount = resumeCount;
1261 #endif
1262   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1263   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1264     double end_lb_time = CkWallTimer();
1265   }
1266
1267 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1268   if (balancing) ComlibNotifyMigrationDone();  
1269 #endif
1270
1271   theLbdb->ResumeClients();
1272   if (balancing)  {
1273     CheckMigrationComplete();
1274     if (future_migrates_expected == 0 || 
1275             future_migrates_expected == future_migrates_completed) {
1276       CheckMigrationComplete();
1277     }
1278   }
1279 #endif
1280 }
1281
1282 /*
1283   migration of objects contains two different kinds:
1284   (1) objects want to make a barrier for migration completion
1285       (waitForBarrier is true)
1286       migrationDone() to finish and resumeClients
1287   (2) objects don't need a barrier
1288   However, next load balancing can only happen when both migrations complete
1289 */ 
1290 void CentralLB::CheckMigrationComplete()
1291 {
1292 #if CMK_LBDB_ON
1293   lbdone ++;
1294   if (lbdone == 2) {
1295     if (_lb_args.debug() && CkMyPe()==0) {
1296       double end_lb_time = CkWallTimer();
1297       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1298                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1299                 end_lb_time-start_lb_time);
1300     }
1301
1302     lb_migration_cost = (CkWallTimer() - start_lb_time);
1303
1304     lbdone = 0;
1305     future_migrates_expected = -1;
1306     future_migrates_completed = 0;
1307     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1308     // release local barrier  so that the next load balancer can go
1309     LDOMHandle h;
1310     h.id.id.idx = 0;
1311     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1312     // switch to the next load balancer in the list
1313     // subtle: called from Migrated() may result in Migrated() called in next LB
1314     theLbdb->nextLoadbalancer(seqno);
1315   }
1316 #endif
1317 }
1318
1319 void CentralLB::preprocess(LDStats* stats)
1320 {
1321   if (_lb_args.ignoreBgLoad())
1322     stats->clearBgLoad();
1323
1324   // Call the predictor for the future
1325   if (_lb_predict) FuturePredictor(statsData);
1326 }
1327
1328 // default load balancing strategy
1329 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1330 {
1331 #if CMK_LBDB_ON
1332   double strat_start_time = CkWallTimer();
1333   if (_lb_args.debug())
1334     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1335
1336   work(stats);
1337
1338   if (_lb_args.debug()>1)  {
1339     CkPrintf("CharmLB> Obj Map:\n");
1340     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1341     CkPrintf("\n");
1342   }
1343
1344   LBMigrateMsg *msg = createMigrateMsg(stats);
1345
1346   if (_lb_args.debug()) {
1347     double strat_end_time = CkWallTimer();
1348     envelope *env = UsrToEnv(msg);
1349
1350     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1351     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1352               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1353     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1354     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1355               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1356     lb_strategy_cost = (strat_end_time - strat_start_time);
1357     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1358   }
1359
1360   return msg;
1361 #else
1362   return NULL;
1363 #endif
1364 }
1365
1366 void CentralLB::work(LDStats* stats)
1367 {
1368   // does nothing but print the database
1369   stats->print();
1370 }
1371
1372 // generate migrate message from stats->from_proc and to_proc
1373 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1374 {
1375   int i;
1376   CkVec<MigrateInfo*> migrateInfo;
1377   for (i=0; i<stats->n_objs; i++) {
1378     LDObjData &objData = stats->objData[i];
1379     int frompe = stats->from_proc[i];
1380     int tope = stats->to_proc[i];
1381     if (frompe != tope) {
1382       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1383       //         CkMyPe(),obj,pe,dest);
1384       MigrateInfo *migrateMe = new MigrateInfo;
1385       migrateMe->obj = objData.handle;
1386       migrateMe->from_pe = frompe;
1387       migrateMe->to_pe = tope;
1388       migrateMe->async_arrival = objData.asyncArrival;
1389       migrateInfo.insertAtEnd(migrateMe);
1390     }
1391   }
1392
1393   int migrate_count=migrateInfo.length();
1394   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1395   msg->n_moves = migrate_count;
1396   for(i=0; i < migrate_count; i++) {
1397     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1398     msg->moves[i] = *item;
1399     delete item;
1400     migrateInfo[i] = 0;
1401   }
1402   return msg;
1403 }
1404
1405 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1406 {
1407   int nmoves = 0;
1408   int nunavail = 0;
1409   int i;
1410   for (i=0; i<m->n_moves; i++) {
1411     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1412     if (item->from_pe == p || item->to_pe == p) nmoves++;
1413   }
1414   for (i=0; i<CkNumPes();i++) {
1415     if (!m->avail_vector[i]) nunavail++;
1416   }
1417   LBMigrateMsg* msg;
1418   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1419   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1420   msg->n_moves = nmoves;
1421   msg->level = m->level;
1422   msg->next_lb = m->next_lb;
1423   for (i=0,nmoves=0; i<m->n_moves; i++) {
1424     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1425     if (item->from_pe == p || item->to_pe == p) {
1426       msg->moves[nmoves] = *item;
1427       nmoves++;
1428     }
1429   }
1430   // copy processor data
1431   if (nunavail)
1432   for (i=0; i<CkNumPes();i++) {
1433     msg->avail_vector[i] = m->avail_vector[i];
1434     msg->expectedLoad[i] = m->expectedLoad[i];
1435   }
1436   return msg;
1437 }
1438
1439 void CentralLB::simulationWrite() {
1440   if(step() == LBSimulation::dumpStep)
1441   {
1442     // here we are supposed to dump the database
1443     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1444     char *dumpFileName = (char *)malloc(dumpFileSize);
1445     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1446       free(dumpFileName);
1447       dumpFileSize+=3;
1448       dumpFileName = (char *)malloc(dumpFileSize);
1449     }
1450     writeStatsMsgs(dumpFileName);
1451     free(dumpFileName);
1452     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1453     ++LBSimulation::dumpStep;
1454     --LBSimulation::dumpStepSize;
1455     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1456       CmiPrintf("Charm++> Exiting...\n");
1457       CkExit();
1458     }
1459     return;
1460   }
1461 }
1462
1463 void CentralLB::simulationRead() {
1464   LBSimulation *simResults = NULL, *realResults;
1465   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1466   voidMessage->n_moves=0;
1467   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1468     // here we are supposed to read the data from the dump database
1469     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1470     char *simFileName = (char *)malloc(simFileSize);
1471     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1472       free(simFileName);
1473       simFileSize+=3;
1474       simFileName = (char *)malloc(simFileSize);
1475     }
1476     readStatsMsgs(simFileName);
1477
1478     // allocate simResults (only the first step)
1479     if (simResults == NULL) {
1480       simResults = new LBSimulation(LBSimulation::simProcs);
1481       realResults = new LBSimulation(LBSimulation::simProcs);
1482     }
1483     else {
1484       // should be the same number of procs of the original simulation!
1485       if (!LBSimulation::procsChanged) {
1486         // it means we have a previous step, so in simResults there is data.
1487         // we can now print the real effects of the load balancer during the simulation
1488         // or print the difference between the predicted data and the real one.
1489         realResults->reset();
1490         // reset to_proc of statsData to be equal to from_proc
1491         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1492         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1493         simResults->PrintDifferences(realResults,statsData);
1494       }
1495       simResults->reset();
1496     }
1497
1498     // now pass it to the strategy routine
1499     double startT = CkWallTimer();
1500     preprocess(statsData);
1501     CmiPrintf("%s> Strategy starts ... \n", lbname);
1502     LBMigrateMsg* migrateMsg = Strategy(statsData);
1503     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1504                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1505
1506     // now calculate the results of the load balancing simulation
1507     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1508
1509     // now we have the simulation data, so print it and loop
1510     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1511     // **CWL** Officially recording my disdain here for using ints for bool
1512     if (LBSimulation::showDecisionsOnly) {
1513       simResults->PrintDecisions(migrateMsg, simFileName, 
1514                                  LBSimulation::simProcs);
1515     } else {
1516       simResults->PrintSimulationResults();
1517     }
1518
1519     free(simFileName);
1520     delete migrateMsg;
1521     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1522   }
1523   // deallocate simResults
1524   delete simResults;
1525   CmiPrintf("Charm++> Exiting...\n");
1526   CkExit();
1527 }
1528
1529 void CentralLB::readStatsMsgs(const char* filename) 
1530 {
1531 #if CMK_LBDB_ON
1532   int i;
1533   FILE *f = fopen(filename, "r");
1534   if (f==NULL) {
1535     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1536     CmiAbort("");
1537   }
1538
1539   // at this stage, we need to rebuild the statsMsgList and
1540   // statsDataList structures. For that first deallocate the
1541   // old structures
1542   if (statsMsgsList) {
1543     for(i = 0; i < stats_msg_count; i++)
1544       delete statsMsgsList[i];
1545     delete[] statsMsgsList;
1546     statsMsgsList=0;
1547   }
1548
1549   PUP::fromDisk pd(f);
1550   PUP::machineInfo machInfo;
1551
1552   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1553   PUP::xlater p(machInfo, pd);
1554
1555   if (_lb_args.lbversion() > 1) {
1556     p|_lb_args.lbversion();             // write version number
1557     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1558     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1559   }
1560   p|stats_msg_count;
1561
1562   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1563   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1564   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1565
1566   // LBSimulation::simProcs must be set
1567   statsData->pup(p);
1568
1569   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1570   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1571
1572   // file f is closed in the destructor of PUP::fromDisk
1573   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1574 #endif
1575 }
1576
1577 void CentralLB::writeStatsMsgs(const char* filename) 
1578 {
1579 #if CMK_LBDB_ON
1580   FILE *f = fopen(filename, "w");
1581   if (f==NULL) {
1582     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1583     CmiAbort("");
1584   }
1585
1586   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1587   PUP::toDisk p(f);
1588   p((char *)&machInfo, sizeof(machInfo));       // machine info
1589
1590   p|_lb_args.lbversion();               // write version number
1591   p|stats_msg_count;
1592   statsData->pup(p);
1593
1594   fclose(f);
1595
1596   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1597 #endif
1598 }
1599
1600 // calculate the predicted wallclock/cpu load for every processors
1601 // considering communication overhead if considerComm is true
1602 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1603                       LBMigrateMsg *msg, LBInfo &info, 
1604                       int considerComm)
1605 {
1606 #if CMK_LBDB_ON
1607         stats->makeCommHash();
1608
1609         // update to_proc according to migration msgs
1610         for(int i = 0; i < msg->n_moves; i++) {
1611           MigrateInfo &mInfo = msg->moves[i];
1612           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1613           CmiAssert(idx != -1);
1614           stats->to_proc[idx] = mInfo.to_pe;
1615         }
1616
1617         info.getInfo(stats, count, considerComm);
1618 #endif
1619 }
1620
1621
1622 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1623 {
1624     CkAssert(simResults != NULL && count == simResults->numPes);
1625     // estimate the new loads of the processors. As a first approximation, this is the
1626     // sum of the cpu times of the objects on that processor
1627     double startT = CkWallTimer();
1628     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1629     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1630 }
1631
1632 void CentralLB::pup(PUP::er &p) { 
1633   BaseLB::pup(p); 
1634   if (p.isUnpacking())  {
1635     initLB(CkLBOptions(seqno)); 
1636   }
1637   p|reduction_started;
1638   int has_statsMsg=0;
1639   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1640   p|has_statsMsg;
1641   if (has_statsMsg) {
1642     if (p.isUnpacking())
1643       statsMsg = new CLBStatsMsg;
1644     statsMsg->pup(p);
1645   }
1646   p|lb_ideal_period;
1647 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1648   p | lbDecisionCount;
1649   p | resumeCount;
1650 #endif
1651         
1652 }
1653
1654 int CentralLB::useMem() { 
1655   return sizeof(CentralLB) + statsData->useMem() + 
1656          CkNumPes() * sizeof(CLBStatsMsg *);
1657 }
1658
1659
1660 /**
1661   CLBStatsMsg is not a real message now.
1662   CLBStatsMsg is used for all processors to fill in their local load and comm
1663   statistics and send to processor 0
1664 */
1665
1666 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1667   n_objs = osz;
1668   n_comm = csz;
1669   objData = new LDObjData[osz];
1670   commData = new LDCommData[csz];
1671   avail_vector = NULL;
1672 }
1673
1674 CLBStatsMsg::~CLBStatsMsg() {
1675   delete [] objData;
1676   delete [] commData;
1677   delete [] avail_vector;
1678 }
1679
1680 void CLBStatsMsg::pup(PUP::er &p) {
1681   int i;
1682   p|from_pe;
1683   p|pe_speed;
1684   p|total_walltime;
1685   p|idletime;
1686   p|bg_walltime;
1687 #if CMK_LB_CPUTIMER
1688   p|total_cputime;
1689   p|bg_cputime;
1690 #endif
1691 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1692   p | step;
1693 #endif
1694   p|n_objs;
1695   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1696   for (i=0; i<n_objs; i++) p|objData[i];
1697   p|n_comm;
1698   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1699   for (i=0; i<n_comm; i++) p|commData[i];
1700
1701   int has_avail_vector;
1702   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1703   p|has_avail_vector;
1704   if (p.isUnpacking()) {
1705     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1706     else avail_vector = NULL;
1707   }
1708   if (has_avail_vector) p(avail_vector, CkNumPes());
1709
1710   p(next_lb);
1711 }
1712
1713 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1714 // the entry function, it is just used to use to pup.
1715 // I don't use CLBStatsMsg directly as marshalled parameter because
1716 // I want the data pointer stored and not to be freed by the Charm++.
1717 void CkMarshalledCLBStatsMessage::free() { 
1718   int count = msgs.size();
1719   for  (int i=0; i<count; i++) {
1720     delete msgs[i];
1721     msgs[i] = NULL;
1722   }
1723   msgs.free();
1724 }
1725
1726 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1727 {
1728   int count = m.getCount();
1729   for (int i=0; i<count; i++) add(m.getMessage(i));
1730 }
1731
1732 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1733 {
1734   int count = msgs.size();
1735   p|count;
1736   for (int i=0; i<count; i++) {
1737     CLBStatsMsg *msg;
1738     if (p.isUnpacking()) msg = new CLBStatsMsg;
1739     else { 
1740       msg = msgs[i]; CmiAssert(msg!=NULL);
1741     }
1742     msg->pup(p);
1743     if (p.isUnpacking()) add(msg);
1744   }
1745 }
1746
1747 SpanningTree::SpanningTree()
1748 {
1749         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1750         arity = (int)ceil(sq/2);
1751         calcParent(CkMyPe());
1752         calcNumChildren(CkMyPe());
1753 }
1754
1755 void SpanningTree::calcParent(int n)
1756 {
1757         parent=-1;
1758         if(n != 0  && arity > 0)
1759                 parent = (n-1)/arity;
1760 }
1761
1762 void SpanningTree::calcNumChildren(int n)
1763 {
1764         numChildren = 0;
1765         if (arity == 0) return;
1766         int fullNode=(CkNumPes()-1-arity)/arity;
1767         if(n <= fullNode)
1768                 numChildren = arity;
1769         if(n == fullNode+1)
1770                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1771         if(n > fullNode+1)
1772                 numChildren = 0;
1773 }
1774
1775 #include "CentralLB.def.h"
1776  
1777 /*@}*/