258b922c7484097c0f25011dfd3a4b7667e40107
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #include <vector>
15
16 #define  DEBUGF(x)       // CmiPrintf x;
17 #define  DEBUG(x)        // x;
18
19 #if CMK_MEM_CHECKPOINT
20    /* can not handle reduction in inmem FT */
21 #define USE_REDUCTION         0
22 #define USE_LDB_SPANNING_TREE 0
23 #elif defined(_FAULT_MLOG_)
24 /* can not handle reduction in inmem FT */
25 #define USE_REDUCTION         0
26 #define USE_LDB_SPANNING_TREE 0
27 #else
28 #define USE_REDUCTION         1
29 #define USE_LDB_SPANNING_TREE 1
30 #endif
31
32 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
33 extern int _restartFlag;
34 extern void getGlobalStep(CkGroupID );
35 extern void initMlogLBStep(CkGroupID );
36 extern int globalResumeCount;
37 extern void sendDummyMigrationCounts(int *);
38 #endif
39
40 #if CMK_GRID_QUEUE_AVAILABLE
41 CpvExtern(void *, CkGridObject);
42 #endif
43
44 CkGroupID loadbalancer;
45 int * lb_ptr;
46 int load_balancer_created;
47
48 double lb_migration_cost = 0.0;
49 double lb_strategy_cost = 0.0;
50
51 int lb_no_iterations = -1;
52 double cur_max_pe_load = 0.0;
53 double cur_avg_pe_load = 0.0;
54 double prev_load = 0.0;
55
56 bool lb_period_informed = false;
57
58 int global_max_iter_no = 0;
59 int global_recv_iter_counter = 0;
60
61 struct AdaptiveData {
62   int iteration;
63   double max_load;
64   double avg_load;
65 };
66
67 struct AdaptiveLBDatabase {
68   std::vector<AdaptiveData> history_data;
69 } adaptive_lbdb;
70
71 enum state {
72   OFF,
73   ON,
74   PAUSE,
75   DECIDED,
76   LOAD_BALANCE
77 } local_state;
78
79 CreateLBFunc_Def(CentralLB, "CentralLB base class")
80
81 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
82                              LBMigrateMsg *, LBInfo &info, int considerComm);
83
84 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
85   double lb_data[4];
86   lb_data[0] = 0;
87   lb_data[1] = 0;
88   lb_data[2] = 0;
89   for (int i = 0; i < nMsg; i++) {
90     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
91     double* m = (double *)msgs[i]->getData();
92     lb_data[0] += m[0];
93     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
94     lb_data[2] += m[2];
95     if (i == 0) {
96       lb_data[3] = m[3];
97     }
98   }
99   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
100 }
101
102 CkReductionMsg* lbIterMax(int nMsg, CkReductionMsg** msgs) {
103   int max[1];
104   //CkPrintf("\tReduction type no of msg %d\n", nMsg);
105   max[0] = 0;
106   for (int i = 0; i < nMsg; i++) {
107     int *m = (int *)msgs[i]->getData();
108     max[0] = ((m[0] > max[0]) ? m[0] : max[0]);
109   //  CkPrintf("\tReduction type msg %d\n", *m);
110   }
111   return CkReductionMsg::buildNew(sizeof(int), max);
112 }
113
114 /*global*/ CkReduction::reducerType lbDataCollectionType;
115 /*global*/ CkReduction::reducerType lbIterMaxType;
116 /*initcall*/ void registerLBDataCollection(void) {
117   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
118   lbIterMaxType = CkReduction::addReducer(lbIterMax);
119 }
120
121 /*
122 void CreateCentralLB()
123 {
124   CProxy_CentralLB::ckNew(0);
125 }
126 */
127
128 void CentralLB::staticStartLB(void* data)
129 {
130   CentralLB *me = (CentralLB*)(data);
131   me->StartLB();
132 }
133
134 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
135 {
136   CentralLB *me = (CentralLB*)(data);
137   me->Migrated(h, waitBarrier);
138 }
139
140 void CentralLB::staticAtSync(void* data)
141 {
142   CentralLB *me = (CentralLB*)(data);
143   me->AtSync();
144 }
145
146 void CentralLB::initLB(const CkLBOptions &opt)
147 {
148 #if CMK_LBDB_ON
149   lbname = "CentralLB";
150   thisProxy = CProxy_CentralLB(thisgroup);
151   //  CkPrintf("Construct in %d\n",CkMyPe());
152
153   // create and turn on by default
154   receiver = theLbdb->
155     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
156   notifier = theLbdb->getLBDB()->
157     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
158   startLbFnHdl = theLbdb->getLBDB()->
159     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
160
161   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
162   if (opt.getSeqNo() > 0) turnOff();
163
164   stats_msg_count = 0;
165   statsMsgsList = NULL;
166   statsData = NULL;
167
168   storedMigrateMsg = NULL;
169   reduction_started = 0;
170
171   // for future predictor
172   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
173   else predicted_model=0;
174   // register user interface callbacks
175   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
176
177   myspeed = theLbdb->ProcessorSpeed();
178
179   migrates_completed = 0;
180   future_migrates_completed = 0;
181   migrates_expected = -1;
182   future_migrates_expected = -1;
183   cur_ld_balancer = _lb_args.central_pe();      // 0 default
184   lbdone = 0;
185   count_msgs=0;
186   statsMsg = NULL;
187
188   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
189
190   load_balancer_created = 1;
191 #endif
192 }
193
194 CentralLB::~CentralLB()
195 {
196 #if CMK_LBDB_ON
197   delete [] statsMsgsList;
198   delete statsData;
199   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
200   if (theLbdb) {
201     theLbdb->getLBDB()->
202       RemoveNotifyMigrated(notifier);
203     theLbdb->
204       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
205   }
206 #endif
207 }
208
209 void CentralLB::turnOn() 
210 {
211 #if CMK_LBDB_ON
212   theLbdb->getLBDB()->
213     TurnOnBarrierReceiver(receiver);
214   theLbdb->getLBDB()->
215     TurnOnNotifyMigrated(notifier);
216   theLbdb->getLBDB()->
217     TurnOnStartLBFn(startLbFnHdl);
218 #endif
219 }
220
221 void CentralLB::turnOff() 
222 {
223 #if CMK_LBDB_ON
224   theLbdb->getLBDB()->
225     TurnOffBarrierReceiver(receiver);
226   theLbdb->getLBDB()->
227     TurnOffNotifyMigrated(notifier);
228   theLbdb->getLBDB()->
229     TurnOffStartLBFn(startLbFnHdl);
230 #endif
231 }
232
233 void CentralLB::AtSync()
234 {
235 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
236 #if CMK_LBDB_ON
237 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
238
239 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
240         CpvAccess(_currentObj)=this;
241 #endif
242
243   // if num of processor is only 1, nothing should happen
244   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
245     MigrationDone(0);
246     return;
247   }
248   if(CmiNodeAlive(CkMyPe())){
249     thisProxy [CkMyPe()].ProcessAtSyncMin();
250   }
251 #endif
252 }
253
254 #include "ComlibStrategy.h"
255
256 void CentralLB::ProcessAtSync()
257 {
258
259
260
261 #if CMK_LBDB_ON
262   if (reduction_started) return;              // reducton in progress
263
264   CmiAssert(CmiNodeAlive(CkMyPe()));
265   if (CkMyPe() == cur_ld_balancer) {
266     start_lb_time = CkWallTimer();
267   }
268
269
270 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
271         initMlogLBStep(thisgroup);
272 #endif
273
274   // build message
275   BuildStatsMsg();
276
277 #if USE_REDUCTION
278     // reduction to get total number of objects and comm
279     // so that processor 0 can pre-allocate load balancing database
280   int counts[2];
281   counts[0] = theLbdb->GetObjDataSz();
282   counts[1] = theLbdb->GetCommDataSz();
283
284   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
285                   thisProxy[0]);
286   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
287   reduction_started = 1;
288 #else
289   SendStats();
290 #endif
291 #endif
292 }
293
294 void CentralLB::ProcessAtSyncMin()
295 {
296 #if CMK_LBDB_ON
297   if (reduction_started) return;              // reducton in progress
298
299   CmiAssert(CmiNodeAlive(CkMyPe()));
300   if (CkMyPe() == cur_ld_balancer) {
301     start_lb_time = CkWallTimer();
302   }
303
304
305 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
306         initMlogLBStep(thisgroup);
307 #endif
308   
309   lb_no_iterations++;
310  // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
311  //     lb_no_iterations, lb_ideal_period);
312
313   // If decision has been made and has reached the lb_period, then do load
314   // balancing, else if hasn't reached ideal_period, then resume.
315   if (local_state == DECIDED) {
316     if (lb_no_iterations < lb_ideal_period) {
317  //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
318       SendMinStats();
319       ResumeClients(0);
320     } else {
321       local_state = LOAD_BALANCE;
322  //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
323       ProcessAtSync();
324     }
325     return;
326   }
327    
328   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
329   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
330   // dont resume client.
331   if (local_state == ON) {
332     if (lb_no_iterations < lb_ideal_period) {
333       SendMinStats();
334       ResumeClients(0);
335     } else {
336       local_state = PAUSE;
337     }
338     return;
339   }
340
341   SendMinStats();
342   ResumeClients(0);
343 #endif
344 }
345
346 void CentralLB::SendMinStats() {
347
348  double total_load;
349  double idle_time;
350  double bg_walltime;
351   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
352  // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
353
354   // Since the total_load is cumulative since the last load balancing stage,
355   // Hence it is subtracted from the previous load.
356   total_load -= idle_time;
357   double tmp = total_load;
358   total_load -= prev_load;
359   prev_load = tmp; 
360
361   double lb_data[4];
362   lb_data[0] = total_load;
363   lb_data[1] = total_load;
364   lb_data[2] = 1;
365   lb_data[3] = lb_no_iterations;
366
367   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
368                   thisProxy[0]);
369   contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
370
371 //    int tmp1 = lb_no_iterations;
372 //    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
373 //    // Send the current iteration no
374 //    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
375 //        thisProxy[0]);
376 //    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
377 }
378
379 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
380   CmiAssert(CkMyPe() == 0);
381   double* load = (LBRealType *) msg->getData();
382   double max = load[1];
383   double avg = load[0]/load[2];
384   int iteration_n = load[3];
385   //CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
386   delete msg;
387
388   // Store the data for this iteration
389   AdaptiveData data;
390   data.iteration = lb_no_iterations;
391   data.max_load = max;
392   data.avg_load = avg;
393   adaptive_lbdb.history_data.push_back(data);
394
395   if (lb_period_informed) {
396     return;
397   }
398
399   // If the max/avg ratio is greater than the threshold and also this is not the
400   // step immediately after load balancing, carry out load balancing
401   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
402   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
403     //CkPrintf("Carry out load balancing step at iter\n");
404     lb_ideal_period = iteration_n + 1;
405     lb_period_informed = true;
406     thisProxy.LoadBalanceDecision(lb_ideal_period);
407     return;
408   }
409
410   // Generate the plan for the adaptive strategy
411   if (generatePlan()) {
412     //CkPrintf("Carry out load balancing step at iter\n");
413     lb_period_informed = true;
414     thisProxy.LoadBalanceDecision(lb_ideal_period);
415   }
416 }
417
418 bool CentralLB::generatePlan() {
419   if (adaptive_lbdb.history_data.size() <= 4) {
420     return false;
421   }
422
423   // Some heuristics for lbperiod
424   // If constant load or almost constant,
425   // then max * new_lb_period > avg * new_lb_period + lb_cost
426   double max = 0.0;
427   double avg = 0.0;
428   AdaptiveData data;
429   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
430     data = adaptive_lbdb.history_data[i];
431     max += data.max_load;
432     avg += data.avg_load;
433     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
434   }
435 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
436 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
437 //
438 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
439 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
440 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
441 //
442   // If linearly varying load, then find lb_period
443   // area between the max and avg curve 
444   double mslope, aslope, mc, ac;
445   getLineEq(aslope, ac, mslope, mc);
446   //CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
447   double a = (mslope - aslope)/2;
448   double b = (mc - ac);
449   double c = -(lb_strategy_cost + lb_migration_cost);
450   //c = -2.5;
451   lb_ideal_period = getPeriodForLinear(a, b, c);
452   //CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
453
454   return true;
455 }
456
457 int CentralLB::getPeriodForLinear(double a, double b, double c) {
458   if (a == 0.0) {
459     return (-c / b);
460   }
461   int x;
462   double t = (b * b) - (4*a*c);
463   t = (-b + sqrt(t)) / (2*a);
464   x = t;
465   return x;
466 }
467
468 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
469   int total = adaptive_lbdb.history_data.size();
470   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
471       adaptive_lbdb.history_data[0].iteration;
472   double a1 = 0;
473   double m1 = 0;
474   double a2 = 0;
475   double m2 = 0;
476   AdaptiveData data;
477   int i = 0;
478   for (i = 0; i <= total/2; i++) {
479     data = adaptive_lbdb.history_data[i];
480     m1 += data.max_load;
481     a1 += data.avg_load;
482   }
483   m1 /= i;
484   a1 /= i;
485
486   for (i = total/2; i < total; i++) {
487     data = adaptive_lbdb.history_data[i];
488     m2 += data.max_load;
489     a2 += data.avg_load;
490   }
491   m2 /= (i - total/2);
492   a2 /= (i - total/2);
493
494   aslope = 2 * (a2 - a1) / iterations;
495   mslope = 2 * (m2 - m1) / iterations;
496   ac = adaptive_lbdb.history_data[0].avg_load;
497   mc = adaptive_lbdb.history_data[0].max_load;
498   return true;
499 }
500
501 void CentralLB::LoadBalanceDecision(int period) {
502   //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
503   lb_ideal_period = period;
504   if (local_state == OFF) {
505     local_state = ON;
506     thisProxy[0].ReceiveIterationNo(lb_no_iterations);
507     return;
508   }
509
510   if (local_state == ON) {
511     CkPrintf("lb will happen at %d\n", lb_ideal_period);
512     local_state = DECIDED;
513     return;
514   }
515
516   // If the state is PAUSE, then its waiting for the final decision from central
517   // processor. If the decision is that the ideal period is in the future,
518   // resume. If the ideal period is now, then carry out load balancing.
519   if (local_state == PAUSE) {
520     if (lb_no_iterations < lb_ideal_period) {
521       local_state = DECIDED;
522       ResumeClients(0);
523     } else {
524       ProcessAtSync();
525     }
526   }
527 }
528
529 void CentralLB::ReceiveIterationNo(int local_iter_no) {
530   CmiAssert(CkMyPe() == 0);
531   global_recv_iter_counter++;
532   if (local_iter_no > global_max_iter_no) {
533     global_max_iter_no = local_iter_no;
534   }
535   if (CkNumPes() == global_recv_iter_counter) {
536     lb_ideal_period = (lb_ideal_period > global_max_iter_no) ? lb_ideal_period : global_max_iter_no + 1;
537     thisProxy.LoadBalanceDecision(lb_ideal_period);
538     global_max_iter_no = 0;
539     global_recv_iter_counter = 0;
540   }
541 }
542
543 // called only on 0
544 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
545 {
546   CmiAssert(CkMyPe() == 0);
547   if (statsData == NULL) statsData = new LDStats;
548
549   int *counts = (int *)msg->getData();
550   int n_objs = counts[0];
551   int n_comm = counts[1];
552
553     // resize database
554   statsData->objData.resize(n_objs);
555   statsData->from_proc.resize(n_objs);
556   statsData->to_proc.resize(n_objs);
557   statsData->commData.resize(n_comm);
558
559   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
560         
561     // broadcast call to let everybody start to send stats
562   thisProxy.SendStats();
563 }
564
565 void CentralLB::BuildStatsMsg()
566 {
567 #if CMK_LBDB_ON
568   // build and send stats
569   const int osz = theLbdb->GetObjDataSz();
570   const int csz = theLbdb->GetCommDataSz();
571
572   int npes = CkNumPes();
573   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
574   _MEMCHECK(msg);
575   msg->from_pe = CkMyPe();
576 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
577         msg->step = step();
578 #endif
579   //msg->serial = CrnRand();
580
581 /*
582   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
583   theLbdb->IdleTime(&msg->idletime);
584   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
585 */
586 #if CMK_LB_CPUTIMER
587   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
588                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
589 #else
590   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
591                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
592 #endif
593
594   msg->pe_speed = myspeed;
595   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
596
597   msg->n_objs = osz;
598   theLbdb->GetObjData(msg->objData);
599   msg->n_comm = csz;
600   theLbdb->GetCommData(msg->commData);
601 //  theLbdb->ClearLoads();
602   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
603
604   if(CkMyPe() == cur_ld_balancer) {
605     msg->avail_vector = new char[CkNumPes()];
606     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
607     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
608   }
609
610   CmiAssert(statsMsg == NULL);
611   statsMsg = msg;
612 #endif
613 }
614
615
616 // called on every processor
617 void CentralLB::SendStats()
618 {
619 #if CMK_LBDB_ON
620   CmiAssert(statsMsg != NULL);
621   reduction_started = 0;
622
623 #if USE_LDB_SPANNING_TREE
624   if(CkNumPes()>1024)
625   {
626     if (CkMyPe() == cur_ld_balancer)
627       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
628     else
629       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
630   }
631   else
632 #endif
633   {
634     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
635     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
636   }
637
638   statsMsg = NULL;
639
640 #ifdef __BIGSIM__
641   BgEndStreaming();
642 #endif
643
644   {
645   // enfore the barrier to wait until centralLB says no
646   LDOMHandle h;
647   h.id.id.idx = 0;
648   theLbdb->getLBDB()->RegisteringObjects(h);
649   }
650 #endif
651 }
652
653 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
654 extern int donotCountMigration;
655 #endif
656
657 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
658 {
659 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
660     if(donotCountMigration){
661         return ;
662     }
663 #endif
664
665 #if CMK_LBDB_ON
666   if (waitBarrier) {
667             migrates_completed++;
668       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
669     if (migrates_completed == migrates_expected) {
670       MigrationDone(1);
671     }
672   }
673   else {
674     future_migrates_completed ++;
675     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
676     if (future_migrates_completed == future_migrates_expected)  {
677         CheckMigrationComplete();
678     }
679   }
680 #endif
681 }
682
683 void CentralLB::MissMigrate(int waitForBarrier)
684 {
685   LDObjHandle h;
686   Migrated(h, waitForBarrier);
687 }
688
689 // build a complete data from bufferred messages
690 // not used when USE_REDUCTION = 1
691 void CentralLB::buildStats()
692 {
693     statsData->nprocs() = stats_msg_count;
694     // allocate space
695     statsData->objData.resize(statsData->n_objs);
696     statsData->from_proc.resize(statsData->n_objs);
697     statsData->to_proc.resize(statsData->n_objs);
698     statsData->commData.resize(statsData->n_comm);
699
700     int nobj = 0;
701     int ncom = 0;
702     int nmigobj = 0;
703     // copy all data in individule message to this big structure
704     for (int pe=0; pe<CkNumPes(); pe++) {
705        int i;
706        CLBStatsMsg *msg = statsMsgsList[pe];
707        if(msg == NULL) continue;
708        for (i=0; i<msg->n_objs; i++) {
709          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
710          statsData->objData[nobj] = msg->objData[i];
711          if (msg->objData[i].migratable) nmigobj++;
712          nobj++;
713        }
714        for (i=0; i<msg->n_comm; i++) {
715          statsData->commData[ncom] = msg->commData[i];
716          ncom++;
717        }
718        // free the memory
719        delete msg;
720        statsMsgsList[pe]=0;
721     }
722     statsData->n_migrateobjs = nmigobj;
723 }
724
725 // deposit one processor data at a time, note database is pre-allocated
726 // to have enough space
727 // used when USE_REDUCTION = 1
728 void CentralLB::depositData(CLBStatsMsg *m)
729 {
730   int i;
731   if (m == NULL) return;
732
733   const int pe = m->from_pe;
734   struct ProcStats &procStat = statsData->procs[pe];
735   procStat.pe = pe;
736   procStat.total_walltime = m->total_walltime;
737   procStat.idletime = m->idletime;
738   procStat.bg_walltime = m->bg_walltime;
739 #if CMK_LB_CPUTIMER
740   procStat.total_cputime = m->total_cputime;
741   procStat.bg_cputime = m->bg_cputime;
742 #endif
743   procStat.pe_speed = m->pe_speed;
744   //procStat.utilization = 1.0;
745   procStat.available = CmiTrue;
746   procStat.n_objs = m->n_objs;
747
748   int &nobj = statsData->n_objs;
749   int &nmigobj = statsData->n_migrateobjs;
750   for (i=0; i<m->n_objs; i++) {
751       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
752       statsData->objData[nobj] = m->objData[i];
753       if (m->objData[i].migratable) nmigobj++;
754       nobj++;
755       CmiAssert(nobj <= statsData->objData.capacity());
756   }
757   int &n_comm = statsData->n_comm;
758   for (i=0; i<m->n_comm; i++) {
759       statsData->commData[n_comm] = m->commData[i];
760       n_comm++;
761       CmiAssert(n_comm <= statsData->commData.capacity());
762   }
763   delete m;
764 }
765
766 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
767 {
768 #if CMK_LBDB_ON
769   if (statsMsgsList == NULL) {
770     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
771     CmiAssert(statsMsgsList != NULL);
772     for(int i=0; i < CkNumPes(); i++)
773       statsMsgsList[i] = 0;
774   }
775   if (statsData == NULL) statsData = new LDStats;
776
777     //  loop through all CLBStatsMsg in the incoming msg
778   int count = msg.getCount();
779   for (int num = 0; num < count; num++) 
780   {
781     CLBStatsMsg *m = msg.getMessage(num);
782     CmiAssert(m!=NULL);
783     const int pe = m->from_pe;
784     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
785 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
786 /*      
787  *  if(m->step < step()){
788  *    //TODO: if a processor is redoing an old load balance step..
789  *    //tell it that the step is done and that it should not perform any migrations
790  *      thisProxy[pe].ReceiveDummyMigration();
791  *  }*/
792 #endif
793         
794     if(!CmiNodeAlive(pe)){
795         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
796         continue;
797     }
798         
799     if (m->avail_vector!=NULL) {
800       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
801     }
802
803     if (statsMsgsList[pe] != 0) {
804       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
805              pe);
806     } else {
807       statsMsgsList[pe] = m;
808 #if USE_REDUCTION
809       depositData(m);
810 #else
811       // store per processor data right away
812       struct ProcStats &procStat = statsData->procs[pe];
813       procStat.pe = pe;
814       procStat.total_walltime = m->total_walltime;
815       procStat.idletime = m->idletime;
816       procStat.bg_walltime = m->bg_walltime;
817 #if CMK_LB_CPUTIMER
818       procStat.total_cputime = m->total_cputime;
819       procStat.bg_cputime = m->bg_cputime;
820 #endif
821       procStat.pe_speed = m->pe_speed;
822       //procStat.utilization = 1.0;
823       procStat.available = CmiTrue;
824       procStat.n_objs = m->n_objs;
825
826       statsData->n_objs += m->n_objs;
827       statsData->n_comm += m->n_comm;
828 #endif
829       stats_msg_count++;
830     }
831   }    // end of for
832
833   const int clients = CkNumValidPes();
834   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
835  
836   if (stats_msg_count == clients) {
837         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
838     statsData->nprocs() = stats_msg_count;
839     thisProxy[CkMyPe()].LoadBalance();
840   }
841 #endif
842 }
843
844 /** added by Abhinav for receiving msgs via spanning tree */
845 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
846 {
847 #if CMK_LBDB_ON
848         CmiAssert(CkMyPe() != 0);
849         bufMsg.add(msg);         // buffer messages
850         count_msgs++;
851         //CkPrintf("here %d\n", CkMyPe());
852         if (count_msgs == st.numChildren+1) {
853                 if(st.parent == 0)
854                 {
855                         thisProxy[0].ReceiveStats(bufMsg);
856                         //CkPrintf("from %d\n", CkMyPe());
857                 }
858                 else
859                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
860                 count_msgs = 0;
861                 bufMsg.free();
862         } 
863 #endif
864 }
865
866 void CentralLB::LoadBalance()
867 {
868 #if CMK_LBDB_ON
869   int proc;
870   const int clients = CkNumPes();
871
872 #if ! USE_REDUCTION
873   // build data
874   buildStats();
875 #else
876   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
877 #endif
878
879   if (_lb_args.debug()) 
880       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
881                   lbname, cur_ld_balancer, step(), start_lb_time,
882                   CmiMemoryUsage()/(1024.0*1024.0));
883
884   // if we are in simulation mode read data
885   if (LBSimulation::doSimulation) simulationRead();
886
887   char *availVector = LBDatabaseObj()->availVector();
888   for(proc = 0; proc < clients; proc++)
889       statsData->procs[proc].available = (CmiBool)availVector[proc];
890
891   preprocess(statsData);
892
893 //    CkPrintf("Before Calling Strategy\n");
894
895   if (_lb_args.printSummary()) {
896       LBInfo info(clients);
897         // not take comm data
898       info.getInfo(statsData, clients, 0);
899       LBRealType mLoad, mCpuLoad, totalLoad;
900       info.getSummary(mLoad, mCpuLoad, totalLoad);
901       int nmsgs, nbytes;
902       statsData->computeNonlocalComm(nmsgs, nbytes);
903       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
904 //      if (_lb_args.debug() > 1) {
905 //        for (int i=0; i<statsData->n_objs; i++)
906 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
907 //      }
908   }
909
910 #if CMK_REPLAYSYSTEM
911   LDHandle *loadBalancer_pointers;
912   if (_replaySystem) {
913     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
914     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
915   }
916 #endif
917   
918   LBMigrateMsg* migrateMsg = Strategy(statsData);
919 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
920         migrateMsg->step = step();
921 #endif
922
923 #if CMK_REPLAYSYSTEM
924   CpdHandleLBMessage(&migrateMsg);
925   if (_replaySystem) {
926     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
927     free(loadBalancer_pointers);
928   }
929 #endif
930   
931   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
932   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
933
934   // if this is the step at which we need to dump the database
935   simulationWrite();
936
937 //  calculate predicted load
938 //  very time consuming though, so only happen when debugging is on
939   if (_lb_args.printSummary()) {
940       LBInfo info(clients);
941         // not take comm data
942       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
943       LBRealType mLoad, mCpuLoad, totalLoad;
944       info.getSummary(mLoad, mCpuLoad, totalLoad);
945       int nmsgs, nbytes;
946       statsData->computeNonlocalComm(nmsgs, nbytes);
947       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
948       for (int i=0; i<clients; i++)
949         migrateMsg->expectedLoad[i] = info.peLoads[i];
950   }
951
952   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
953 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
954     lbDecisionCount++;
955     migrateMsg->lbDecisionCount = lbDecisionCount;
956 #endif
957
958   envelope *env = UsrToEnv(migrateMsg);
959   if (1) {
960       // broadcast
961     thisProxy.ReceiveMigration(migrateMsg);
962   }
963   else {
964     // split the migration for each processor
965     for (int p=0; p<CkNumPes(); p++) {
966       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
967       thisProxy[p].ReceiveMigration(m);
968     }
969     delete migrateMsg;
970   }
971
972   // Zero out data structures for next cycle
973   // CkPrintf("zeroing out data\n");
974   statsData->clear();
975   stats_msg_count=0;
976 #endif
977 }
978
979 // test if sender and receiver in a commData is nonmigratable.
980 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
981 {
982 #if CMK_LBDB_ON
983   for (int pe=0 ; pe<count; pe++)
984   {
985     for (int i=0; i<len[pe]; i++)
986       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
987           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
988       return 0;
989   }
990 #endif
991   return 1;
992 }
993
994 // rebuild LDStats and remove all non-migratble objects and related things
995 void CentralLB::removeNonMigratable(LDStats* stats, int count)
996 {
997   int i;
998
999   // check if we have non-migratable objects
1000   int have = 0;
1001   for (i=0; i<stats->n_objs; i++) 
1002   {
1003     LDObjData &odata = stats->objData[i];
1004     if (!odata.migratable) {
1005       have = 1; break;
1006     }
1007   }
1008   if (have == 0) return;
1009
1010   CkVec<LDObjData> nonmig;
1011   CkVec<int> new_from_proc, new_to_proc;
1012   nonmig.resize(stats->n_migrateobjs);
1013   new_from_proc.resize(stats->n_migrateobjs);
1014   new_to_proc.resize(stats->n_migrateobjs);
1015   int n_objs = 0;
1016   for (i=0; i<stats->n_objs; i++) 
1017   {
1018     LDObjData &odata = stats->objData[i];
1019     if (odata.migratable) {
1020       nonmig[n_objs] = odata;
1021       new_from_proc[n_objs] = stats->from_proc[i];
1022       new_to_proc[n_objs] = stats->to_proc[i];
1023       n_objs ++;
1024     }
1025     else {
1026       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1027 #if CMK_LB_CPUTIMER
1028       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1029 #endif
1030     }
1031   }
1032   CmiAssert(stats->n_migrateobjs == n_objs);
1033
1034   stats->makeCommHash();
1035   
1036   CkVec<LDCommData> newCommData;
1037   newCommData.resize(stats->n_comm);
1038   int n_comm = 0;
1039   for (i=0; i<stats->n_comm; i++) 
1040   {
1041     LDCommData& cdata = stats->commData[i];
1042     if (!cdata.from_proc()) 
1043     {
1044       int idx = stats->getSendHash(cdata);
1045       CmiAssert(idx != -1);
1046       if (!stats->objData[idx].migratable) continue;
1047     }
1048     switch (cdata.receiver.get_type()) {
1049     case LD_PROC_MSG:
1050       break;
1051     case LD_OBJ_MSG:  {
1052       int idx = stats->getRecvHash(cdata);
1053       if (stats->complete_flag)
1054         CmiAssert(idx != -1);
1055       else if (idx == -1) continue;          // receiver not in this group
1056       if (!stats->objData[idx].migratable) continue;
1057       break;
1058       }
1059     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1060       break;
1061     }
1062     newCommData[n_comm] = cdata;
1063     n_comm ++;
1064   }
1065
1066   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1067
1068   // swap to new data
1069   stats->objData = nonmig;
1070   stats->from_proc = new_from_proc;
1071   stats->to_proc = new_to_proc;
1072   stats->n_objs = n_objs;
1073
1074   stats->commData = newCommData;
1075   stats->n_comm = n_comm;
1076
1077   stats->deleteCommHash();
1078   stats->makeCommHash();
1079
1080 }
1081
1082
1083 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1084 extern int restarted;
1085 #endif
1086
1087 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1088 {
1089   storedMigrateMsg = m;
1090   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1091                   thisProxy);
1092   contribute(0, NULL, CkReduction::max_int, cb);
1093
1094   // Reset all adaptive lb related fields since load balancing is being done.
1095   lb_no_iterations = -1;
1096   adaptive_lbdb.history_data.clear();
1097   prev_load = 0.0;
1098   lb_ideal_period = INT_MAX;
1099   local_state = OFF;
1100   lb_period_informed = false;
1101 }
1102
1103 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1104 {
1105 #if CMK_LBDB_ON
1106         int i;
1107         LBMigrateMsg *m = storedMigrateMsg;
1108         CmiAssert(m!=NULL);
1109         delete msg;
1110
1111 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1112         int *dummyCounts;
1113
1114         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1115         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1116         if(step() > m->step){
1117                 char str[100];
1118                 envelope *env = UsrToEnv(m);
1119                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1120                 return;
1121         }
1122         lbDecisionCount = m->lbDecisionCount;
1123 #endif
1124
1125   if (_lb_args.debug() > 1) 
1126     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1127
1128   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1129   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1130 /*FAULT_EVAC*/
1131   if(!CmiNodeAlive(CkMyPe())){
1132         delete m;
1133         return;
1134   }
1135   migrates_expected = 0;
1136   future_migrates_expected = 0;
1137 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1138         int sending=0;
1139     int dummy=0;
1140         LBDB *_myLBDB = theLbdb->getLBDB();
1141         if(_restartFlag){
1142         dummyCounts = new int[CmiNumPes()];
1143         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1144     }
1145 #endif
1146   for(i=0; i < m->n_moves; i++) {
1147     MigrateInfo& move = m->moves[i];
1148     const int me = CkMyPe();
1149     if (move.from_pe == me && move.to_pe != me) {
1150       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1151       // migrate object, in case it is already gone, inform toPe
1152 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1153       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1154          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1155 #else
1156             if(_restartFlag == 0){
1157                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1158                 theLbdb->Migrate(move.obj,move.to_pe);
1159                 sending++;
1160             }else{
1161                 if(_myLBDB->validObjHandle(move.obj)){
1162                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1163                     theLbdb->Migrate(move.obj,move.to_pe);
1164                     sending++;
1165                 }else{
1166                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1167                     dummyCounts[move.to_pe]++;
1168                     dummy++;
1169                 }
1170             }
1171 #endif
1172     } else if (move.from_pe != me && move.to_pe == me) {
1173        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1174       if (!move.async_arrival) migrates_expected++;
1175       else future_migrates_expected++;
1176     }
1177   }
1178   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1179   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1180
1181 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1182         if(_restartFlag){
1183                 sendDummyMigrationCounts(dummyCounts);
1184                 _restartFlag  =0;
1185         delete []dummyCounts;
1186         }
1187 #endif
1188
1189
1190 #if 0
1191   if (m->n_moves ==0) {
1192     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1193   }
1194 #endif
1195   cur_ld_balancer = m->next_lb;
1196   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1197       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1198   }
1199
1200   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1201     MigrationDone(1);
1202   delete m;
1203
1204 //      CkEvacuatedElement();
1205 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1206 //  migrates_expected = 0;
1207 //  //  ResumeClients(1);
1208 #endif
1209 #endif
1210 }
1211
1212 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1213 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1214     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1215     //TODO: this is gonna be important when a crash happens during checkpoint
1216     //the globalDecisionCount would have to be saved and compared against
1217     //a future recvMigration
1218                 
1219         thisProxy[CkMyPe()].ResumeClients(1);
1220 }
1221 #endif
1222
1223 void CentralLB::MigrationDone(int balancing)
1224 {
1225 #if CMK_LBDB_ON
1226   migrates_completed = 0;
1227   migrates_expected = -1;
1228   // clear load stats
1229   if (balancing) theLbdb->ClearLoads();
1230   // Increment to next step
1231   theLbdb->incStep();
1232         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1233   // if sync resume, invoke a barrier
1234
1235 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1236     savedBalancing = balancing;
1237     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1238 #endif
1239
1240   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1241
1242   LoadbalanceDone(balancing);        // callback
1243 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1244   // if sync resume invoke a barrier
1245   if (balancing && _lb_args.syncResume()) {
1246     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1247                   thisProxy);
1248     contribute(0, NULL, CkReduction::sum_int, cb);
1249   }
1250   else{ 
1251     if(CmiNodeAlive(CkMyPe())){
1252         thisProxy [CkMyPe()].ResumeClients(balancing);
1253     }   
1254   }     
1255 #if CMK_GRID_QUEUE_AVAILABLE
1256   CmiGridQueueDeregisterAll ();
1257   CpvAccess(CkGridObject) = NULL;
1258 #endif
1259 #endif 
1260 #endif
1261 }
1262
1263 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1264 void CentralLB::endMigrationDone(int balancing){
1265     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1266
1267
1268   if (balancing && _lb_args.syncResume()) {
1269     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1270                   thisProxy);
1271     contribute(0, NULL, CkReduction::sum_int, cb);
1272   }
1273   else{
1274     if(CmiNodeAlive(CkMyPe())){
1275     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1276     thisProxy [CkMyPe()].ResumeClients(balancing);
1277     }
1278   }
1279
1280 }
1281 #endif
1282
1283 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1284 void resumeCentralLbAfterChkpt(void *_lb){
1285     CentralLB *lb= (CentralLB *)_lb;
1286     CpvAccess(_currentObj)=lb;
1287     lb->endMigrationDone(lb->savedBalancing);
1288 }
1289 #endif
1290
1291
1292 void CentralLB::ResumeClients(CkReductionMsg *msg)
1293 {
1294   ResumeClients(1);
1295   delete msg;
1296 }
1297
1298 void CentralLB::ResumeClients(int balancing)
1299 {
1300 #if CMK_LBDB_ON
1301 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1302     resumeCount++;
1303     globalResumeCount = resumeCount;
1304 #endif
1305   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1306   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1307     double end_lb_time = CkWallTimer();
1308   }
1309
1310 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1311   if (balancing) ComlibNotifyMigrationDone();  
1312 #endif
1313
1314   theLbdb->ResumeClients();
1315   if (balancing)  {
1316
1317     CheckMigrationComplete();
1318     if (future_migrates_expected == 0 || 
1319             future_migrates_expected == future_migrates_completed) {
1320       CheckMigrationComplete();
1321     }
1322   }
1323 #endif
1324 }
1325
1326 /*
1327   migration of objects contains two different kinds:
1328   (1) objects want to make a barrier for migration completion
1329       (waitForBarrier is true)
1330       migrationDone() to finish and resumeClients
1331   (2) objects don't need a barrier
1332   However, next load balancing can only happen when both migrations complete
1333 */ 
1334 void CentralLB::CheckMigrationComplete()
1335 {
1336 #if CMK_LBDB_ON
1337   lbdone ++;
1338   if (lbdone == 2) {
1339     if (_lb_args.debug() && CkMyPe()==0) {
1340       double end_lb_time = CkWallTimer();
1341       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1342                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1343                 end_lb_time-start_lb_time);
1344     }
1345
1346     lb_migration_cost = (CkWallTimer() - start_lb_time);
1347
1348     lbdone = 0;
1349     future_migrates_expected = -1;
1350     future_migrates_completed = 0;
1351
1352
1353     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1354     // release local barrier  so that the next load balancer can go
1355     LDOMHandle h;
1356     h.id.id.idx = 0;
1357     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1358     // switch to the next load balancer in the list
1359     // subtle: called from Migrated() may result in Migrated() called in next LB
1360     theLbdb->nextLoadbalancer(seqno);
1361   }
1362 #endif
1363 }
1364
1365 void CentralLB::preprocess(LDStats* stats)
1366 {
1367   if (_lb_args.ignoreBgLoad())
1368     stats->clearBgLoad();
1369
1370   // Call the predictor for the future
1371   if (_lb_predict) FuturePredictor(statsData);
1372 }
1373
1374 // default load balancing strategy
1375 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1376 {
1377 #if CMK_LBDB_ON
1378   double strat_start_time = CkWallTimer();
1379   if (_lb_args.debug())
1380     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1381
1382   work(stats);
1383
1384   if (_lb_args.debug()>1)  {
1385     CkPrintf("CharmLB> Obj Map:\n");
1386     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1387     CkPrintf("\n");
1388   }
1389
1390   LBMigrateMsg *msg = createMigrateMsg(stats);
1391
1392   if (_lb_args.debug()) {
1393     double strat_end_time = CkWallTimer();
1394     envelope *env = UsrToEnv(msg);
1395
1396     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1397     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1398               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1399     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1400     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1401               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1402     lb_strategy_cost = (strat_end_time - strat_start_time);
1403     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1404   }
1405   return msg;
1406 #else
1407   return NULL;
1408 #endif
1409 }
1410
1411 void CentralLB::work(LDStats* stats)
1412 {
1413   // does nothing but print the database
1414   stats->print();
1415 }
1416
1417 // generate migrate message from stats->from_proc and to_proc
1418 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1419 {
1420   int i;
1421   CkVec<MigrateInfo*> migrateInfo;
1422   for (i=0; i<stats->n_objs; i++) {
1423     LDObjData &objData = stats->objData[i];
1424     int frompe = stats->from_proc[i];
1425     int tope = stats->to_proc[i];
1426     if (frompe != tope) {
1427       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1428       //         CkMyPe(),obj,pe,dest);
1429       MigrateInfo *migrateMe = new MigrateInfo;
1430       migrateMe->obj = objData.handle;
1431       migrateMe->from_pe = frompe;
1432       migrateMe->to_pe = tope;
1433       migrateMe->async_arrival = objData.asyncArrival;
1434       migrateInfo.insertAtEnd(migrateMe);
1435     }
1436   }
1437
1438   int migrate_count=migrateInfo.length();
1439   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1440   msg->n_moves = migrate_count;
1441   for(i=0; i < migrate_count; i++) {
1442     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1443     msg->moves[i] = *item;
1444     delete item;
1445     migrateInfo[i] = 0;
1446   }
1447   return msg;
1448 }
1449
1450 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1451 {
1452   int nmoves = 0;
1453   int nunavail = 0;
1454   int i;
1455   for (i=0; i<m->n_moves; i++) {
1456     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1457     if (item->from_pe == p || item->to_pe == p) nmoves++;
1458   }
1459   for (i=0; i<CkNumPes();i++) {
1460     if (!m->avail_vector[i]) nunavail++;
1461   }
1462   LBMigrateMsg* msg;
1463   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1464   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1465   msg->n_moves = nmoves;
1466   msg->level = m->level;
1467   msg->next_lb = m->next_lb;
1468   for (i=0,nmoves=0; i<m->n_moves; i++) {
1469     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1470     if (item->from_pe == p || item->to_pe == p) {
1471       msg->moves[nmoves] = *item;
1472       nmoves++;
1473     }
1474   }
1475   // copy processor data
1476   if (nunavail)
1477   for (i=0; i<CkNumPes();i++) {
1478     msg->avail_vector[i] = m->avail_vector[i];
1479     msg->expectedLoad[i] = m->expectedLoad[i];
1480   }
1481   return msg;
1482 }
1483
1484 void CentralLB::simulationWrite() {
1485   if(step() == LBSimulation::dumpStep)
1486   {
1487     // here we are supposed to dump the database
1488     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1489     char *dumpFileName = (char *)malloc(dumpFileSize);
1490     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1491       free(dumpFileName);
1492       dumpFileSize+=3;
1493       dumpFileName = (char *)malloc(dumpFileSize);
1494     }
1495     writeStatsMsgs(dumpFileName);
1496     free(dumpFileName);
1497     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1498     ++LBSimulation::dumpStep;
1499     --LBSimulation::dumpStepSize;
1500     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1501       CmiPrintf("Charm++> Exiting...\n");
1502       CkExit();
1503     }
1504     return;
1505   }
1506 }
1507
1508 void CentralLB::simulationRead() {
1509   LBSimulation *simResults = NULL, *realResults;
1510   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1511   voidMessage->n_moves=0;
1512   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1513     // here we are supposed to read the data from the dump database
1514     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1515     char *simFileName = (char *)malloc(simFileSize);
1516     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1517       free(simFileName);
1518       simFileSize+=3;
1519       simFileName = (char *)malloc(simFileSize);
1520     }
1521     readStatsMsgs(simFileName);
1522
1523     // allocate simResults (only the first step)
1524     if (simResults == NULL) {
1525       simResults = new LBSimulation(LBSimulation::simProcs);
1526       realResults = new LBSimulation(LBSimulation::simProcs);
1527     }
1528     else {
1529       // should be the same number of procs of the original simulation!
1530       if (!LBSimulation::procsChanged) {
1531         // it means we have a previous step, so in simResults there is data.
1532         // we can now print the real effects of the load balancer during the simulation
1533         // or print the difference between the predicted data and the real one.
1534         realResults->reset();
1535         // reset to_proc of statsData to be equal to from_proc
1536         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1537         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1538         simResults->PrintDifferences(realResults,statsData);
1539       }
1540       simResults->reset();
1541     }
1542
1543     // now pass it to the strategy routine
1544     double startT = CkWallTimer();
1545     preprocess(statsData);
1546     CmiPrintf("%s> Strategy starts ... \n", lbname);
1547     LBMigrateMsg* migrateMsg = Strategy(statsData);
1548     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1549                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1550
1551     // now calculate the results of the load balancing simulation
1552     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1553
1554     // now we have the simulation data, so print it and loop
1555     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1556     // **CWL** Officially recording my disdain here for using ints for bool
1557     if (LBSimulation::showDecisionsOnly) {
1558       simResults->PrintDecisions(migrateMsg, simFileName, 
1559                                  LBSimulation::simProcs);
1560     } else {
1561       simResults->PrintSimulationResults();
1562     }
1563
1564     free(simFileName);
1565     delete migrateMsg;
1566     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1567   }
1568   // deallocate simResults
1569   delete simResults;
1570   CmiPrintf("Charm++> Exiting...\n");
1571   CkExit();
1572 }
1573
1574 void CentralLB::readStatsMsgs(const char* filename) 
1575 {
1576 #if CMK_LBDB_ON
1577   int i;
1578   FILE *f = fopen(filename, "r");
1579   if (f==NULL) {
1580     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1581     CmiAbort("");
1582   }
1583
1584   // at this stage, we need to rebuild the statsMsgList and
1585   // statsDataList structures. For that first deallocate the
1586   // old structures
1587   if (statsMsgsList) {
1588     for(i = 0; i < stats_msg_count; i++)
1589       delete statsMsgsList[i];
1590     delete[] statsMsgsList;
1591     statsMsgsList=0;
1592   }
1593
1594   PUP::fromDisk pd(f);
1595   PUP::machineInfo machInfo;
1596
1597   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1598   PUP::xlater p(machInfo, pd);
1599
1600   if (_lb_args.lbversion() > 1) {
1601     p|_lb_args.lbversion();             // write version number
1602     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1603     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1604   }
1605   p|stats_msg_count;
1606
1607   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1608   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1609   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1610
1611   // LBSimulation::simProcs must be set
1612   statsData->pup(p);
1613
1614   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1615   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1616
1617   // file f is closed in the destructor of PUP::fromDisk
1618   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1619 #endif
1620 }
1621
1622 void CentralLB::writeStatsMsgs(const char* filename) 
1623 {
1624 #if CMK_LBDB_ON
1625   FILE *f = fopen(filename, "w");
1626   if (f==NULL) {
1627     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1628     CmiAbort("");
1629   }
1630
1631   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1632   PUP::toDisk p(f);
1633   p((char *)&machInfo, sizeof(machInfo));       // machine info
1634
1635   p|_lb_args.lbversion();               // write version number
1636   p|stats_msg_count;
1637   statsData->pup(p);
1638
1639   fclose(f);
1640
1641   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1642 #endif
1643 }
1644
1645 // calculate the predicted wallclock/cpu load for every processors
1646 // considering communication overhead if considerComm is true
1647 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1648                       LBMigrateMsg *msg, LBInfo &info, 
1649                       int considerComm)
1650 {
1651 #if CMK_LBDB_ON
1652         stats->makeCommHash();
1653
1654         // update to_proc according to migration msgs
1655         for(int i = 0; i < msg->n_moves; i++) {
1656           MigrateInfo &mInfo = msg->moves[i];
1657           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1658           CmiAssert(idx != -1);
1659           stats->to_proc[idx] = mInfo.to_pe;
1660         }
1661
1662         info.getInfo(stats, count, considerComm);
1663 #endif
1664 }
1665
1666
1667 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1668 {
1669     CkAssert(simResults != NULL && count == simResults->numPes);
1670     // estimate the new loads of the processors. As a first approximation, this is the
1671     // sum of the cpu times of the objects on that processor
1672     double startT = CkWallTimer();
1673     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1674     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1675 }
1676
1677 void CentralLB::pup(PUP::er &p) { 
1678   BaseLB::pup(p); 
1679   if (p.isUnpacking())  {
1680     initLB(CkLBOptions(seqno)); 
1681   }
1682   p|reduction_started;
1683   int has_statsMsg=0;
1684   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1685   p|has_statsMsg;
1686   if (has_statsMsg) {
1687     if (p.isUnpacking())
1688       statsMsg = new CLBStatsMsg;
1689     statsMsg->pup(p);
1690   }
1691   p|lb_ideal_period;
1692 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1693   p | lbDecisionCount;
1694   p | resumeCount;
1695 #endif
1696         
1697 }
1698
1699 int CentralLB::useMem() { 
1700   return sizeof(CentralLB) + statsData->useMem() + 
1701          CkNumPes() * sizeof(CLBStatsMsg *);
1702 }
1703
1704
1705 /**
1706   CLBStatsMsg is not a real message now.
1707   CLBStatsMsg is used for all processors to fill in their local load and comm
1708   statistics and send to processor 0
1709 */
1710
1711 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1712   n_objs = osz;
1713   n_comm = csz;
1714   objData = new LDObjData[osz];
1715   commData = new LDCommData[csz];
1716   avail_vector = NULL;
1717 }
1718
1719 CLBStatsMsg::~CLBStatsMsg() {
1720   delete [] objData;
1721   delete [] commData;
1722   delete [] avail_vector;
1723 }
1724
1725 void CLBStatsMsg::pup(PUP::er &p) {
1726   int i;
1727   p|from_pe;
1728   p|pe_speed;
1729   p|total_walltime;
1730   p|idletime;
1731   p|bg_walltime;
1732 #if CMK_LB_CPUTIMER
1733   p|total_cputime;
1734   p|bg_cputime;
1735 #endif
1736 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1737   p | step;
1738 #endif
1739   p|n_objs;
1740   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1741   for (i=0; i<n_objs; i++) p|objData[i];
1742   p|n_comm;
1743   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1744   for (i=0; i<n_comm; i++) p|commData[i];
1745
1746   int has_avail_vector;
1747   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1748   p|has_avail_vector;
1749   if (p.isUnpacking()) {
1750     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1751     else avail_vector = NULL;
1752   }
1753   if (has_avail_vector) p(avail_vector, CkNumPes());
1754
1755   p(next_lb);
1756 }
1757
1758 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1759 // the entry function, it is just used to use to pup.
1760 // I don't use CLBStatsMsg directly as marshalled parameter because
1761 // I want the data pointer stored and not to be freed by the Charm++.
1762 void CkMarshalledCLBStatsMessage::free() { 
1763   int count = msgs.size();
1764   for  (int i=0; i<count; i++) {
1765     delete msgs[i];
1766     msgs[i] = NULL;
1767   }
1768   msgs.free();
1769 }
1770
1771 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1772 {
1773   int count = m.getCount();
1774   for (int i=0; i<count; i++) add(m.getMessage(i));
1775 }
1776
1777 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1778 {
1779   int count = msgs.size();
1780   p|count;
1781   for (int i=0; i<count; i++) {
1782     CLBStatsMsg *msg;
1783     if (p.isUnpacking()) msg = new CLBStatsMsg;
1784     else { 
1785       msg = msgs[i]; CmiAssert(msg!=NULL);
1786     }
1787     msg->pup(p);
1788     if (p.isUnpacking()) add(msg);
1789   }
1790 }
1791
1792 SpanningTree::SpanningTree()
1793 {
1794         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1795         arity = (int)ceil(sq/2);
1796         calcParent(CkMyPe());
1797         calcNumChildren(CkMyPe());
1798 }
1799
1800 void SpanningTree::calcParent(int n)
1801 {
1802         parent=-1;
1803         if(n != 0  && arity > 0)
1804                 parent = (n-1)/arity;
1805 }
1806
1807 void SpanningTree::calcNumChildren(int n)
1808 {
1809         numChildren = 0;
1810         if (arity == 0) return;
1811         int fullNode=(CkNumPes()-1-arity)/arity;
1812         if(n <= fullNode)
1813                 numChildren = arity;
1814         if(n == fullNode+1)
1815                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1816         if(n > fullNode+1)
1817                 numChildren = 0;
1818 }
1819
1820 #include "CentralLB.def.h"
1821  
1822 /*@}*/