fixing bug in the case of pause
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #include "limits.h"
15 #include <vector>
16
17 #define  DEBUGF(x)       // CmiPrintf x;
18 #define  DEBUG(x)        // x;
19
20 #if CMK_MEM_CHECKPOINT
21    /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #elif defined(_FAULT_MLOG_)
25 /* can not handle reduction in inmem FT */
26 #define USE_REDUCTION         0
27 #define USE_LDB_SPANNING_TREE 0
28 #else
29 #define USE_REDUCTION         1
30 #define USE_LDB_SPANNING_TREE 1
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 extern int _restartFlag;
35 extern void getGlobalStep(CkGroupID );
36 extern void initMlogLBStep(CkGroupID );
37 extern int globalResumeCount;
38 extern void sendDummyMigrationCounts(int *);
39 #endif
40
41 #if CMK_GRID_QUEUE_AVAILABLE
42 CpvExtern(void *, CkGridObject);
43 #endif
44
45 CkGroupID loadbalancer;
46 int * lb_ptr;
47 int load_balancer_created;
48
49 struct AdaptiveData {
50   int iteration;
51   double max_load;
52   double avg_load;
53 };
54
55 struct AdaptiveLBDatabase {
56   std::vector<AdaptiveData> history_data;
57 } adaptive_lbdb;
58
59 enum state {
60   OFF,
61   ON,
62   PAUSE,
63   DECIDED,
64   LOAD_BALANCE
65 } local_state;
66
67 struct AdaptiveLBStructure {
68   int lb_ideal_period;
69   int lb_calculated_period;
70   int lb_no_iterations;
71   int global_max_iter_no;
72   int global_recv_iter_counter;
73   bool in_progress;
74   double prev_load;
75   double lb_strategy_cost;
76   double lb_migration_cost;
77   bool lb_period_informed;
78   int lb_msg_send_no;
79   int lb_msg_recv_no;
80 } adaptive_struct;
81
82 CreateLBFunc_Def(CentralLB, "CentralLB base class")
83
84 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
85                              LBMigrateMsg *, LBInfo &info, int considerComm);
86
87 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
88   double lb_data[4];
89   lb_data[0] = 0;
90   lb_data[1] = 0;
91   lb_data[2] = 0;
92   for (int i = 0; i < nMsg; i++) {
93     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
94     double* m = (double *)msgs[i]->getData();
95     lb_data[0] += m[0];
96     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
97     lb_data[2] += m[2];
98     if (i == 0) {
99       lb_data[3] = m[3];
100     }
101   }
102   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
103 }
104
105 /*global*/ CkReduction::reducerType lbDataCollectionType;
106 /*initcall*/ void registerLBDataCollection(void) {
107   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
108 }
109
110 /*
111 void CreateCentralLB()
112 {
113   CProxy_CentralLB::ckNew(0);
114 }
115 */
116
117 void CentralLB::staticStartLB(void* data)
118 {
119   CentralLB *me = (CentralLB*)(data);
120   me->StartLB();
121 }
122
123 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
124 {
125   CentralLB *me = (CentralLB*)(data);
126   me->Migrated(h, waitBarrier);
127 }
128
129 void CentralLB::staticAtSync(void* data)
130 {
131   CentralLB *me = (CentralLB*)(data);
132   me->AtSync();
133 }
134
135 void CentralLB::initLB(const CkLBOptions &opt)
136 {
137 #if CMK_LBDB_ON
138   lbname = "CentralLB";
139   thisProxy = CProxy_CentralLB(thisgroup);
140   //  CkPrintf("Construct in %d\n",CkMyPe());
141
142   // create and turn on by default
143   receiver = theLbdb->
144     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
145   notifier = theLbdb->getLBDB()->
146     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
147   startLbFnHdl = theLbdb->getLBDB()->
148     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
149
150   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
151   if (opt.getSeqNo() > 0) turnOff();
152
153   stats_msg_count = 0;
154   statsMsgsList = NULL;
155   statsData = NULL;
156
157   storedMigrateMsg = NULL;
158   reduction_started = 0;
159
160   // for future predictor
161   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
162   else predicted_model=0;
163   // register user interface callbacks
164   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
165
166   myspeed = theLbdb->ProcessorSpeed();
167
168   migrates_completed = 0;
169   future_migrates_completed = 0;
170   migrates_expected = -1;
171   future_migrates_expected = -1;
172   cur_ld_balancer = _lb_args.central_pe();      // 0 default
173   lbdone = 0;
174   count_msgs=0;
175   statsMsg = NULL;
176
177   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
178
179   load_balancer_created = 1;
180
181   // If metabalancer enabled, initialize the variables
182   adaptive_struct.lb_ideal_period =  INT_MAX;
183   adaptive_struct.lb_calculated_period = INT_MAX;
184   adaptive_struct.lb_no_iterations = -1;
185   adaptive_struct.global_max_iter_no = 0;
186   adaptive_struct.global_recv_iter_counter = 0;
187   adaptive_struct.in_progress = false;
188   adaptive_struct.prev_load = 0.0;
189   adaptive_struct.lb_strategy_cost = 0.0;
190   adaptive_struct.lb_migration_cost = 0.0;
191   adaptive_struct.lb_msg_send_no = 0;
192   adaptive_struct.lb_msg_recv_no = 0;
193   local_state = OFF;
194 #endif
195 }
196
197 CentralLB::~CentralLB()
198 {
199 #if CMK_LBDB_ON
200   delete [] statsMsgsList;
201   delete statsData;
202   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
203   if (theLbdb) {
204     theLbdb->getLBDB()->
205       RemoveNotifyMigrated(notifier);
206     theLbdb->
207       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
208   }
209 #endif
210 }
211
212 void CentralLB::turnOn() 
213 {
214 #if CMK_LBDB_ON
215   theLbdb->getLBDB()->
216     TurnOnBarrierReceiver(receiver);
217   theLbdb->getLBDB()->
218     TurnOnNotifyMigrated(notifier);
219   theLbdb->getLBDB()->
220     TurnOnStartLBFn(startLbFnHdl);
221 #endif
222 }
223
224 void CentralLB::turnOff() 
225 {
226 #if CMK_LBDB_ON
227   theLbdb->getLBDB()->
228     TurnOffBarrierReceiver(receiver);
229   theLbdb->getLBDB()->
230     TurnOffNotifyMigrated(notifier);
231   theLbdb->getLBDB()->
232     TurnOffStartLBFn(startLbFnHdl);
233 #endif
234 }
235
236 void CentralLB::AtSync()
237 {
238 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
239 #if CMK_LBDB_ON
240 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
241
242 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
243         CpvAccess(_currentObj)=this;
244 #endif
245
246   // if num of processor is only 1, nothing should happen
247   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
248     MigrationDone(0);
249     return;
250   }
251   if(CmiNodeAlive(CkMyPe())){
252     thisProxy [CkMyPe()].ProcessAtSyncMin();
253   }
254 #endif
255 }
256
257 #include "ComlibStrategy.h"
258
259 void CentralLB::ProcessAtSync()
260 {
261
262
263
264 #if CMK_LBDB_ON
265   if (reduction_started) return;              // reducton in progress
266
267   CmiAssert(CmiNodeAlive(CkMyPe()));
268   if (CkMyPe() == cur_ld_balancer) {
269     start_lb_time = CkWallTimer();
270   }
271
272
273 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
274         initMlogLBStep(thisgroup);
275 #endif
276
277   // build message
278   BuildStatsMsg();
279
280 #if USE_REDUCTION
281     // reduction to get total number of objects and comm
282     // so that processor 0 can pre-allocate load balancing database
283   int counts[2];
284   counts[0] = theLbdb->GetObjDataSz();
285   counts[1] = theLbdb->GetCommDataSz();
286
287   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
288                   thisProxy[0]);
289   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
290   reduction_started = 1;
291 #else
292   SendStats();
293 #endif
294 #endif
295 }
296
297 void CentralLB::ProcessAtSyncMin()
298 {
299 #if CMK_LBDB_ON
300   if (reduction_started) return;              // reducton in progress
301
302   CmiAssert(CmiNodeAlive(CkMyPe()));
303   if (CkMyPe() == cur_ld_balancer) {
304     start_lb_time = CkWallTimer();
305   }
306
307
308 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
309         initMlogLBStep(thisgroup);
310 #endif
311   
312   adaptive_struct.lb_no_iterations++;
313  // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
314  //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
315
316   // If decision has been made and has reached the lb_period, then do load
317   // balancing, else if hasn't reached ideal_period, then resume.
318   if (local_state == DECIDED) {
319     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
320  //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
321       SendMinStats();
322       ResumeClients(0);
323     } else {
324       local_state = LOAD_BALANCE;
325  //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
326       ProcessAtSync();
327     }
328     return;
329   }
330    
331   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
332   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
333   // dont resume client.
334   if (local_state == ON) {
335     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
336       SendMinStats();
337       ResumeClients(0);
338     } else {
339       local_state = PAUSE;
340     }
341     return;
342   }
343
344   SendMinStats();
345   ResumeClients(0);
346 #endif
347 }
348
349 void CentralLB::SendMinStats() {
350
351  double total_load;
352  double idle_time;
353  double bg_walltime;
354   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
355  // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
356
357   // Since the total_load is cumulative since the last load balancing stage,
358   // Hence it is subtracted from the previous load.
359   total_load -= idle_time;
360   double tmp = total_load;
361   total_load -= adaptive_struct.prev_load;
362   adaptive_struct.prev_load = tmp; 
363
364   double lb_data[4];
365   lb_data[0] = total_load;
366   lb_data[1] = total_load;
367   lb_data[2] = 1;
368   lb_data[3] = adaptive_struct.lb_no_iterations;
369   CkPrintf("[%d] sends total load %lf at iter %d\n", CkMyPe(), total_load, adaptive_struct.lb_no_iterations);
370
371   if (adaptive_struct.lb_no_iterations != 0) {
372     CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
373         thisProxy[0]);
374     contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
375   }
376
377 //    int tmp1 = adaptive_struct.lb_no_iterations;
378 //    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
379 //    // Send the current iteration no
380 //    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
381 //        thisProxy[0]);
382 //    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
383 }
384
385 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
386   CmiAssert(CkMyPe() == 0);
387   double* load = (LBRealType *) msg->getData();
388   double max = load[1];
389   double avg = load[0]/load[2];
390   int iteration_n = load[3];
391   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
392   CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
393   delete msg;
394
395   // Store the data for this iteration
396   AdaptiveData data;
397   data.iteration = adaptive_struct.lb_no_iterations;
398   data.max_load = max;
399   data.avg_load = avg;
400   adaptive_lbdb.history_data.push_back(data);
401
402   // If lb period inform is in progress, dont inform again
403   if (adaptive_struct.in_progress) {
404     return;
405   }
406
407 //  if (adaptive_struct.lb_period_informed) {
408 //    return;
409 //  }
410
411   // If the max/avg ratio is greater than the threshold and also this is not the
412   // step immediately after load balancing, carry out load balancing
413   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
414   if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
415     CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
416 //    if (!adaptive_struct.lb_period_informed) {
417 //      // Just for testing
418 //      adaptive_struct.lb_calculated_period = 40;
419 //      adaptive_struct.lb_period_informed = true;
420 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
421 //      return;
422 //    }
423
424     // If the new lb period is less than current set lb period
425     if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
426       adaptive_struct.lb_calculated_period = iteration_n + 1;
427       adaptive_struct.lb_period_informed = true;
428       adaptive_struct.in_progress = true;
429       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
430     }
431     return;
432   }
433
434   // Generate the plan for the adaptive strategy
435   int period;
436   if (generatePlan(period)) {
437     //CkPrintf("Carry out load balancing step at iter\n");
438
439     // If the new lb period is less than current set lb period
440     if (adaptive_struct.lb_calculated_period > period) {
441       adaptive_struct.lb_calculated_period = period;
442       adaptive_struct.in_progress = true;
443       adaptive_struct.lb_period_informed = true;
444       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
445     }
446   }
447 }
448
449 bool CentralLB::generatePlan(int& period) {
450   if (adaptive_lbdb.history_data.size() <= 8) {
451     return false;
452   }
453
454   // Some heuristics for lbperiod
455   // If constant load or almost constant,
456   // then max * new_lb_period > avg * new_lb_period + lb_cost
457   double max = 0.0;
458   double avg = 0.0;
459   AdaptiveData data;
460   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
461     data = adaptive_lbdb.history_data[i];
462     max += data.max_load;
463     avg += data.avg_load;
464     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
465   }
466 //  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
467 //  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
468 //
469 //  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
470 //  adaptive_struct.lb_migration_cost) / (max - avg);
471 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
472 //      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
473 //
474   // If linearly varying load, then find lb_period
475   // area between the max and avg curve 
476   double mslope, aslope, mc, ac;
477   getLineEq(aslope, ac, mslope, mc);
478   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
479   double a = (mslope - aslope)/2;
480   double b = (mc - ac);
481   double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
482   //c = -2.5;
483   bool got_period = getPeriodForLinear(a, b, c, period);
484   if (!got_period) {
485     return false;
486   }
487   
488   if (mslope < 0) {
489     if (period > (-mc/mslope)) {
490       CkPrintf("Max < 0 Period set when max load is -ve\n");
491       return false;
492     }
493   }
494
495   if (aslope < 0) {
496     if (period > (-ac/aslope)) {
497       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
498       return false;
499     }
500   }
501
502   if (period > ((mc - ac)/(aslope - mslope))) {
503     CkPrintf("Avg | Max Period set when curves intersect\n");
504     return false;
505   }
506   return true;
507 }
508
509 bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
510   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
511   if (a == 0.0) {
512     period = (-c / b);
513     CkPrintf("Ideal period for linear load %d\n", period);
514     return true;
515   }
516   int x;
517   double t = (b * b) - (4*a*c);
518   if (t < 0) {
519     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
520     return false;
521   }
522   t = (-b + sqrt(t)) / (2*a);
523   x = t;
524   if (x < 0) {
525     CkPrintf("boo!!! x (%d) < 0\n", x);
526     x = 0;
527     return false;
528   }
529   period = x;
530   CkPrintf("Ideal period for linear load %d\n", period);
531   return true;
532 }
533
534 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
535   int total = adaptive_lbdb.history_data.size();
536   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
537       adaptive_lbdb.history_data[0].iteration;
538   double a1 = 0;
539   double m1 = 0;
540   double a2 = 0;
541   double m2 = 0;
542   AdaptiveData data;
543   int i = 0;
544   for (i = 0; i <= total/2; i++) {
545     data = adaptive_lbdb.history_data[i];
546     m1 += data.max_load;
547     a1 += data.avg_load;
548   }
549   m1 /= i;
550   a1 /= i;
551
552   for (i = total/2; i < total; i++) {
553     data = adaptive_lbdb.history_data[i];
554     m2 += data.max_load;
555     a2 += data.avg_load;
556   }
557   m2 /= (i - total/2);
558   a2 /= (i - total/2);
559
560   aslope = 2 * (a2 - a1) / iterations;
561   mslope = 2 * (m2 - m1) / iterations;
562   ac = adaptive_lbdb.history_data[0].avg_load;
563   mc = adaptive_lbdb.history_data[0].max_load;
564   return true;
565 }
566
567 void CentralLB::LoadBalanceDecision(int req_no, int period) {
568   if (req_no < adaptive_struct.lb_msg_recv_no) {
569     return;
570   }
571   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
572   adaptive_struct.lb_ideal_period = period;
573
574   if (local_state == ON) {
575     CkPrintf("lb will happen at %d\n", adaptive_struct.lb_ideal_period);
576     local_state = DECIDED;
577     return;
578   }
579
580   // If the state is PAUSE, then its waiting for the final decision from central
581   // processor. If the decision is that the ideal period is in the future,
582   // resume. If the ideal period is now, then carry out load balancing.
583   if (local_state == PAUSE) {
584     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
585       local_state = DECIDED;
586       SendMinStats();
587       ResumeClients(0);
588     } else {
589       local_state = LOAD_BALANCE;
590       ProcessAtSync();
591     }
592     return;
593   }
594
595 //  if (local_state == OFF) {
596     local_state = ON;
597     adaptive_struct.lb_msg_recv_no = req_no;
598     thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
599 //    return;
600 //  }
601 }
602
603 void CentralLB::ReceiveIterationNo(int req_no, int local_iter_no) {
604   CmiAssert(CkMyPe() == 0);
605
606   adaptive_struct.global_recv_iter_counter++;
607   if (local_iter_no > adaptive_struct.global_max_iter_no) {
608     adaptive_struct.global_max_iter_no = local_iter_no;
609   }
610   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
611     adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
612     thisProxy.LoadBalanceDecision(req_no, adaptive_struct.lb_ideal_period);
613     adaptive_struct.in_progress = false;
614     adaptive_struct.global_max_iter_no = 0;
615     adaptive_struct.global_recv_iter_counter = 0;
616   }
617 }
618
619 // called only on 0
620 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
621 {
622   CmiAssert(CkMyPe() == 0);
623   if (statsData == NULL) statsData = new LDStats;
624
625   int *counts = (int *)msg->getData();
626   int n_objs = counts[0];
627   int n_comm = counts[1];
628
629     // resize database
630   statsData->objData.resize(n_objs);
631   statsData->from_proc.resize(n_objs);
632   statsData->to_proc.resize(n_objs);
633   statsData->commData.resize(n_comm);
634
635   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
636         
637     // broadcast call to let everybody start to send stats
638   thisProxy.SendStats();
639 }
640
641 void CentralLB::BuildStatsMsg()
642 {
643 #if CMK_LBDB_ON
644   // build and send stats
645   const int osz = theLbdb->GetObjDataSz();
646   const int csz = theLbdb->GetCommDataSz();
647
648   int npes = CkNumPes();
649   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
650   _MEMCHECK(msg);
651   msg->from_pe = CkMyPe();
652 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
653         msg->step = step();
654 #endif
655   //msg->serial = CrnRand();
656
657 /*
658   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
659   theLbdb->IdleTime(&msg->idletime);
660   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
661 */
662 #if CMK_LB_CPUTIMER
663   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
664                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
665 #else
666   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
667                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
668 #endif
669
670   msg->pe_speed = myspeed;
671   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
672
673   msg->n_objs = osz;
674   theLbdb->GetObjData(msg->objData);
675   msg->n_comm = csz;
676   theLbdb->GetCommData(msg->commData);
677 //  theLbdb->ClearLoads();
678   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
679
680   if(CkMyPe() == cur_ld_balancer) {
681     msg->avail_vector = new char[CkNumPes()];
682     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
683     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
684   }
685
686   CmiAssert(statsMsg == NULL);
687   statsMsg = msg;
688 #endif
689 }
690
691
692 // called on every processor
693 void CentralLB::SendStats()
694 {
695 #if CMK_LBDB_ON
696   CmiAssert(statsMsg != NULL);
697   reduction_started = 0;
698
699 #if USE_LDB_SPANNING_TREE
700   if(CkNumPes()>1024)
701   {
702     if (CkMyPe() == cur_ld_balancer)
703       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
704     else
705       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
706   }
707   else
708 #endif
709   {
710     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
711     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
712   }
713
714   statsMsg = NULL;
715
716 #ifdef __BIGSIM__
717   BgEndStreaming();
718 #endif
719
720   {
721   // enfore the barrier to wait until centralLB says no
722   LDOMHandle h;
723   h.id.id.idx = 0;
724   theLbdb->getLBDB()->RegisteringObjects(h);
725   }
726 #endif
727 }
728
729 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
730 extern int donotCountMigration;
731 #endif
732
733 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
734 {
735 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
736     if(donotCountMigration){
737         return ;
738     }
739 #endif
740
741 #if CMK_LBDB_ON
742   if (waitBarrier) {
743             migrates_completed++;
744       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
745     if (migrates_completed == migrates_expected) {
746       MigrationDone(1);
747     }
748   }
749   else {
750     future_migrates_completed ++;
751     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
752     if (future_migrates_completed == future_migrates_expected)  {
753         CheckMigrationComplete();
754     }
755   }
756 #endif
757 }
758
759 void CentralLB::MissMigrate(int waitForBarrier)
760 {
761   LDObjHandle h;
762   Migrated(h, waitForBarrier);
763 }
764
765 // build a complete data from bufferred messages
766 // not used when USE_REDUCTION = 1
767 void CentralLB::buildStats()
768 {
769     statsData->nprocs() = stats_msg_count;
770     // allocate space
771     statsData->objData.resize(statsData->n_objs);
772     statsData->from_proc.resize(statsData->n_objs);
773     statsData->to_proc.resize(statsData->n_objs);
774     statsData->commData.resize(statsData->n_comm);
775
776     int nobj = 0;
777     int ncom = 0;
778     int nmigobj = 0;
779     // copy all data in individule message to this big structure
780     for (int pe=0; pe<CkNumPes(); pe++) {
781        int i;
782        CLBStatsMsg *msg = statsMsgsList[pe];
783        if(msg == NULL) continue;
784        for (i=0; i<msg->n_objs; i++) {
785          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
786          statsData->objData[nobj] = msg->objData[i];
787          if (msg->objData[i].migratable) nmigobj++;
788          nobj++;
789        }
790        for (i=0; i<msg->n_comm; i++) {
791          statsData->commData[ncom] = msg->commData[i];
792          ncom++;
793        }
794        // free the memory
795        delete msg;
796        statsMsgsList[pe]=0;
797     }
798     statsData->n_migrateobjs = nmigobj;
799 }
800
801 // deposit one processor data at a time, note database is pre-allocated
802 // to have enough space
803 // used when USE_REDUCTION = 1
804 void CentralLB::depositData(CLBStatsMsg *m)
805 {
806   int i;
807   if (m == NULL) return;
808
809   const int pe = m->from_pe;
810   struct ProcStats &procStat = statsData->procs[pe];
811   procStat.pe = pe;
812   procStat.total_walltime = m->total_walltime;
813   procStat.idletime = m->idletime;
814   procStat.bg_walltime = m->bg_walltime;
815 #if CMK_LB_CPUTIMER
816   procStat.total_cputime = m->total_cputime;
817   procStat.bg_cputime = m->bg_cputime;
818 #endif
819   procStat.pe_speed = m->pe_speed;
820   //procStat.utilization = 1.0;
821   procStat.available = CmiTrue;
822   procStat.n_objs = m->n_objs;
823
824   int &nobj = statsData->n_objs;
825   int &nmigobj = statsData->n_migrateobjs;
826   for (i=0; i<m->n_objs; i++) {
827       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
828       statsData->objData[nobj] = m->objData[i];
829       if (m->objData[i].migratable) nmigobj++;
830       nobj++;
831       CmiAssert(nobj <= statsData->objData.capacity());
832   }
833   int &n_comm = statsData->n_comm;
834   for (i=0; i<m->n_comm; i++) {
835       statsData->commData[n_comm] = m->commData[i];
836       n_comm++;
837       CmiAssert(n_comm <= statsData->commData.capacity());
838   }
839   delete m;
840 }
841
842 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
843 {
844 #if CMK_LBDB_ON
845   if (statsMsgsList == NULL) {
846     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
847     CmiAssert(statsMsgsList != NULL);
848     for(int i=0; i < CkNumPes(); i++)
849       statsMsgsList[i] = 0;
850   }
851   if (statsData == NULL) statsData = new LDStats;
852
853     //  loop through all CLBStatsMsg in the incoming msg
854   int count = msg.getCount();
855   for (int num = 0; num < count; num++) 
856   {
857     CLBStatsMsg *m = msg.getMessage(num);
858     CmiAssert(m!=NULL);
859     const int pe = m->from_pe;
860     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
861 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
862 /*      
863  *  if(m->step < step()){
864  *    //TODO: if a processor is redoing an old load balance step..
865  *    //tell it that the step is done and that it should not perform any migrations
866  *      thisProxy[pe].ReceiveDummyMigration();
867  *  }*/
868 #endif
869         
870     if(!CmiNodeAlive(pe)){
871         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
872         continue;
873     }
874         
875     if (m->avail_vector!=NULL) {
876       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
877     }
878
879     if (statsMsgsList[pe] != 0) {
880       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
881              pe);
882     } else {
883       statsMsgsList[pe] = m;
884 #if USE_REDUCTION
885       depositData(m);
886 #else
887       // store per processor data right away
888       struct ProcStats &procStat = statsData->procs[pe];
889       procStat.pe = pe;
890       procStat.total_walltime = m->total_walltime;
891       procStat.idletime = m->idletime;
892       procStat.bg_walltime = m->bg_walltime;
893 #if CMK_LB_CPUTIMER
894       procStat.total_cputime = m->total_cputime;
895       procStat.bg_cputime = m->bg_cputime;
896 #endif
897       procStat.pe_speed = m->pe_speed;
898       //procStat.utilization = 1.0;
899       procStat.available = CmiTrue;
900       procStat.n_objs = m->n_objs;
901
902       statsData->n_objs += m->n_objs;
903       statsData->n_comm += m->n_comm;
904 #endif
905       stats_msg_count++;
906     }
907   }    // end of for
908
909   const int clients = CkNumValidPes();
910   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
911  
912   if (stats_msg_count == clients) {
913         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
914     statsData->nprocs() = stats_msg_count;
915     thisProxy[CkMyPe()].LoadBalance();
916   }
917 #endif
918 }
919
920 /** added by Abhinav for receiving msgs via spanning tree */
921 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
922 {
923 #if CMK_LBDB_ON
924         CmiAssert(CkMyPe() != 0);
925         bufMsg.add(msg);         // buffer messages
926         count_msgs++;
927         //CkPrintf("here %d\n", CkMyPe());
928         if (count_msgs == st.numChildren+1) {
929                 if(st.parent == 0)
930                 {
931                         thisProxy[0].ReceiveStats(bufMsg);
932                         //CkPrintf("from %d\n", CkMyPe());
933                 }
934                 else
935                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
936                 count_msgs = 0;
937                 bufMsg.free();
938         } 
939 #endif
940 }
941
942 void CentralLB::LoadBalance()
943 {
944 #if CMK_LBDB_ON
945   int proc;
946   const int clients = CkNumPes();
947
948 #if ! USE_REDUCTION
949   // build data
950   buildStats();
951 #else
952   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
953 #endif
954
955   if (_lb_args.debug()) 
956       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
957                   lbname, cur_ld_balancer, step(), start_lb_time,
958                   CmiMemoryUsage()/(1024.0*1024.0));
959
960   // if we are in simulation mode read data
961   if (LBSimulation::doSimulation) simulationRead();
962
963   char *availVector = LBDatabaseObj()->availVector();
964   for(proc = 0; proc < clients; proc++)
965       statsData->procs[proc].available = (CmiBool)availVector[proc];
966
967   preprocess(statsData);
968
969 //    CkPrintf("Before Calling Strategy\n");
970
971   if (_lb_args.printSummary()) {
972       LBInfo info(clients);
973         // not take comm data
974       info.getInfo(statsData, clients, 0);
975       LBRealType mLoad, mCpuLoad, totalLoad;
976       info.getSummary(mLoad, mCpuLoad, totalLoad);
977       int nmsgs, nbytes;
978       statsData->computeNonlocalComm(nmsgs, nbytes);
979       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
980 //      if (_lb_args.debug() > 1) {
981 //        for (int i=0; i<statsData->n_objs; i++)
982 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
983 //      }
984   }
985
986 #if CMK_REPLAYSYSTEM
987   LDHandle *loadBalancer_pointers;
988   if (_replaySystem) {
989     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
990     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
991   }
992 #endif
993   
994   LBMigrateMsg* migrateMsg = Strategy(statsData);
995 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
996         migrateMsg->step = step();
997 #endif
998
999 #if CMK_REPLAYSYSTEM
1000   CpdHandleLBMessage(&migrateMsg);
1001   if (_replaySystem) {
1002     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
1003     free(loadBalancer_pointers);
1004   }
1005 #endif
1006   
1007   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
1008   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
1009
1010   // if this is the step at which we need to dump the database
1011   simulationWrite();
1012
1013 //  calculate predicted load
1014 //  very time consuming though, so only happen when debugging is on
1015   if (_lb_args.printSummary()) {
1016       LBInfo info(clients);
1017         // not take comm data
1018       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
1019       LBRealType mLoad, mCpuLoad, totalLoad;
1020       info.getSummary(mLoad, mCpuLoad, totalLoad);
1021       int nmsgs, nbytes;
1022       statsData->computeNonlocalComm(nmsgs, nbytes);
1023       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
1024       for (int i=0; i<clients; i++)
1025         migrateMsg->expectedLoad[i] = info.peLoads[i];
1026   }
1027
1028   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
1029 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1030     lbDecisionCount++;
1031     migrateMsg->lbDecisionCount = lbDecisionCount;
1032 #endif
1033
1034   envelope *env = UsrToEnv(migrateMsg);
1035   if (1) {
1036       // broadcast
1037     thisProxy.ReceiveMigration(migrateMsg);
1038   }
1039   else {
1040     // split the migration for each processor
1041     for (int p=0; p<CkNumPes(); p++) {
1042       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
1043       thisProxy[p].ReceiveMigration(m);
1044     }
1045     delete migrateMsg;
1046   }
1047
1048   // Zero out data structures for next cycle
1049   // CkPrintf("zeroing out data\n");
1050   statsData->clear();
1051   stats_msg_count=0;
1052 #endif
1053 }
1054
1055 // test if sender and receiver in a commData is nonmigratable.
1056 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
1057 {
1058 #if CMK_LBDB_ON
1059   for (int pe=0 ; pe<count; pe++)
1060   {
1061     for (int i=0; i<len[pe]; i++)
1062       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
1063           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
1064       return 0;
1065   }
1066 #endif
1067   return 1;
1068 }
1069
1070 // rebuild LDStats and remove all non-migratble objects and related things
1071 void CentralLB::removeNonMigratable(LDStats* stats, int count)
1072 {
1073   int i;
1074
1075   // check if we have non-migratable objects
1076   int have = 0;
1077   for (i=0; i<stats->n_objs; i++) 
1078   {
1079     LDObjData &odata = stats->objData[i];
1080     if (!odata.migratable) {
1081       have = 1; break;
1082     }
1083   }
1084   if (have == 0) return;
1085
1086   CkVec<LDObjData> nonmig;
1087   CkVec<int> new_from_proc, new_to_proc;
1088   nonmig.resize(stats->n_migrateobjs);
1089   new_from_proc.resize(stats->n_migrateobjs);
1090   new_to_proc.resize(stats->n_migrateobjs);
1091   int n_objs = 0;
1092   for (i=0; i<stats->n_objs; i++) 
1093   {
1094     LDObjData &odata = stats->objData[i];
1095     if (odata.migratable) {
1096       nonmig[n_objs] = odata;
1097       new_from_proc[n_objs] = stats->from_proc[i];
1098       new_to_proc[n_objs] = stats->to_proc[i];
1099       n_objs ++;
1100     }
1101     else {
1102       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1103 #if CMK_LB_CPUTIMER
1104       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1105 #endif
1106     }
1107   }
1108   CmiAssert(stats->n_migrateobjs == n_objs);
1109
1110   stats->makeCommHash();
1111   
1112   CkVec<LDCommData> newCommData;
1113   newCommData.resize(stats->n_comm);
1114   int n_comm = 0;
1115   for (i=0; i<stats->n_comm; i++) 
1116   {
1117     LDCommData& cdata = stats->commData[i];
1118     if (!cdata.from_proc()) 
1119     {
1120       int idx = stats->getSendHash(cdata);
1121       CmiAssert(idx != -1);
1122       if (!stats->objData[idx].migratable) continue;
1123     }
1124     switch (cdata.receiver.get_type()) {
1125     case LD_PROC_MSG:
1126       break;
1127     case LD_OBJ_MSG:  {
1128       int idx = stats->getRecvHash(cdata);
1129       if (stats->complete_flag)
1130         CmiAssert(idx != -1);
1131       else if (idx == -1) continue;          // receiver not in this group
1132       if (!stats->objData[idx].migratable) continue;
1133       break;
1134       }
1135     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1136       break;
1137     }
1138     newCommData[n_comm] = cdata;
1139     n_comm ++;
1140   }
1141
1142   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1143
1144   // swap to new data
1145   stats->objData = nonmig;
1146   stats->from_proc = new_from_proc;
1147   stats->to_proc = new_to_proc;
1148   stats->n_objs = n_objs;
1149
1150   stats->commData = newCommData;
1151   stats->n_comm = n_comm;
1152
1153   stats->deleteCommHash();
1154   stats->makeCommHash();
1155
1156 }
1157
1158
1159 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1160 extern int restarted;
1161 #endif
1162
1163 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1164 {
1165   storedMigrateMsg = m;
1166   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1167                   thisProxy);
1168   contribute(0, NULL, CkReduction::max_int, cb);
1169
1170   // Reset all adaptive lb related fields since load balancing is being done.
1171   adaptive_struct.lb_no_iterations = -1;
1172   adaptive_lbdb.history_data.clear();
1173   adaptive_struct.prev_load = 0.0;
1174   local_state = OFF;
1175   adaptive_struct.lb_period_informed = false;
1176   adaptive_struct.lb_ideal_period = INT_MAX;
1177   adaptive_struct.lb_calculated_period = INT_MAX;
1178   adaptive_struct.lb_msg_send_no = 0;
1179   adaptive_struct.lb_msg_recv_no = 0;
1180 }
1181
1182 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1183 {
1184 #if CMK_LBDB_ON
1185         int i;
1186         LBMigrateMsg *m = storedMigrateMsg;
1187         CmiAssert(m!=NULL);
1188         delete msg;
1189
1190 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1191         int *dummyCounts;
1192
1193         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1194         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1195         if(step() > m->step){
1196                 char str[100];
1197                 envelope *env = UsrToEnv(m);
1198                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1199                 return;
1200         }
1201         lbDecisionCount = m->lbDecisionCount;
1202 #endif
1203
1204   if (_lb_args.debug() > 1) 
1205     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1206
1207   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1208   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1209 /*FAULT_EVAC*/
1210   if(!CmiNodeAlive(CkMyPe())){
1211         delete m;
1212         return;
1213   }
1214   migrates_expected = 0;
1215   future_migrates_expected = 0;
1216 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1217         int sending=0;
1218     int dummy=0;
1219         LBDB *_myLBDB = theLbdb->getLBDB();
1220         if(_restartFlag){
1221         dummyCounts = new int[CmiNumPes()];
1222         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1223     }
1224 #endif
1225   for(i=0; i < m->n_moves; i++) {
1226     MigrateInfo& move = m->moves[i];
1227     const int me = CkMyPe();
1228     if (move.from_pe == me && move.to_pe != me) {
1229       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1230       // migrate object, in case it is already gone, inform toPe
1231 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1232       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1233          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1234 #else
1235             if(_restartFlag == 0){
1236                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1237                 theLbdb->Migrate(move.obj,move.to_pe);
1238                 sending++;
1239             }else{
1240                 if(_myLBDB->validObjHandle(move.obj)){
1241                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1242                     theLbdb->Migrate(move.obj,move.to_pe);
1243                     sending++;
1244                 }else{
1245                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1246                     dummyCounts[move.to_pe]++;
1247                     dummy++;
1248                 }
1249             }
1250 #endif
1251     } else if (move.from_pe != me && move.to_pe == me) {
1252        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1253       if (!move.async_arrival) migrates_expected++;
1254       else future_migrates_expected++;
1255     }
1256   }
1257   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1258   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1259
1260 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1261         if(_restartFlag){
1262                 sendDummyMigrationCounts(dummyCounts);
1263                 _restartFlag  =0;
1264         delete []dummyCounts;
1265         }
1266 #endif
1267
1268
1269 #if 0
1270   if (m->n_moves ==0) {
1271     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1272   }
1273 #endif
1274   cur_ld_balancer = m->next_lb;
1275   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1276       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1277   }
1278
1279   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1280     MigrationDone(1);
1281   delete m;
1282
1283 //      CkEvacuatedElement();
1284 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1285 //  migrates_expected = 0;
1286 //  //  ResumeClients(1);
1287 #endif
1288 #endif
1289 }
1290
1291 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1292 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1293     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1294     //TODO: this is gonna be important when a crash happens during checkpoint
1295     //the globalDecisionCount would have to be saved and compared against
1296     //a future recvMigration
1297                 
1298         thisProxy[CkMyPe()].ResumeClients(1);
1299 }
1300 #endif
1301
1302 void CentralLB::MigrationDone(int balancing)
1303 {
1304 #if CMK_LBDB_ON
1305   migrates_completed = 0;
1306   migrates_expected = -1;
1307   // clear load stats
1308   if (balancing) theLbdb->ClearLoads();
1309   // Increment to next step
1310   theLbdb->incStep();
1311         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1312   // if sync resume, invoke a barrier
1313
1314 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1315     savedBalancing = balancing;
1316     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1317 #endif
1318
1319   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1320
1321   LoadbalanceDone(balancing);        // callback
1322 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1323   // if sync resume invoke a barrier
1324   if (balancing && _lb_args.syncResume()) {
1325     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1326                   thisProxy);
1327     contribute(0, NULL, CkReduction::sum_int, cb);
1328   }
1329   else{ 
1330     if(CmiNodeAlive(CkMyPe())){
1331         thisProxy [CkMyPe()].ResumeClients(balancing);
1332     }   
1333   }     
1334 #if CMK_GRID_QUEUE_AVAILABLE
1335   CmiGridQueueDeregisterAll ();
1336   CpvAccess(CkGridObject) = NULL;
1337 #endif
1338 #endif 
1339 #endif
1340 }
1341
1342 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1343 void CentralLB::endMigrationDone(int balancing){
1344     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1345
1346
1347   if (balancing && _lb_args.syncResume()) {
1348     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1349                   thisProxy);
1350     contribute(0, NULL, CkReduction::sum_int, cb);
1351   }
1352   else{
1353     if(CmiNodeAlive(CkMyPe())){
1354     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1355     thisProxy [CkMyPe()].ResumeClients(balancing);
1356     }
1357   }
1358
1359 }
1360 #endif
1361
1362 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1363 void resumeCentralLbAfterChkpt(void *_lb){
1364     CentralLB *lb= (CentralLB *)_lb;
1365     CpvAccess(_currentObj)=lb;
1366     lb->endMigrationDone(lb->savedBalancing);
1367 }
1368 #endif
1369
1370
1371 void CentralLB::ResumeClients(CkReductionMsg *msg)
1372 {
1373   ResumeClients(1);
1374   delete msg;
1375 }
1376
1377 void CentralLB::ResumeClients(int balancing)
1378 {
1379 #if CMK_LBDB_ON
1380 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1381     resumeCount++;
1382     globalResumeCount = resumeCount;
1383 #endif
1384   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1385   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1386     double end_lb_time = CkWallTimer();
1387   }
1388
1389 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1390   if (balancing) ComlibNotifyMigrationDone();  
1391 #endif
1392
1393   theLbdb->ResumeClients();
1394   if (balancing)  {
1395
1396     CheckMigrationComplete();
1397     if (future_migrates_expected == 0 || 
1398             future_migrates_expected == future_migrates_completed) {
1399       CheckMigrationComplete();
1400     }
1401   }
1402 #endif
1403 }
1404
1405 /*
1406   migration of objects contains two different kinds:
1407   (1) objects want to make a barrier for migration completion
1408       (waitForBarrier is true)
1409       migrationDone() to finish and resumeClients
1410   (2) objects don't need a barrier
1411   However, next load balancing can only happen when both migrations complete
1412 */ 
1413 void CentralLB::CheckMigrationComplete()
1414 {
1415 #if CMK_LBDB_ON
1416   lbdone ++;
1417   if (lbdone == 2) {
1418     if (_lb_args.debug() && CkMyPe()==0) {
1419       double end_lb_time = CkWallTimer();
1420       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1421                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1422                 end_lb_time-start_lb_time);
1423     }
1424
1425     adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
1426
1427     lbdone = 0;
1428     future_migrates_expected = -1;
1429     future_migrates_completed = 0;
1430
1431
1432     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1433     // release local barrier  so that the next load balancer can go
1434     LDOMHandle h;
1435     h.id.id.idx = 0;
1436     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1437     // switch to the next load balancer in the list
1438     // subtle: called from Migrated() may result in Migrated() called in next LB
1439     theLbdb->nextLoadbalancer(seqno);
1440   }
1441 #endif
1442 }
1443
1444 void CentralLB::preprocess(LDStats* stats)
1445 {
1446   if (_lb_args.ignoreBgLoad())
1447     stats->clearBgLoad();
1448
1449   // Call the predictor for the future
1450   if (_lb_predict) FuturePredictor(statsData);
1451 }
1452
1453 // default load balancing strategy
1454 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1455 {
1456 #if CMK_LBDB_ON
1457   double strat_start_time = CkWallTimer();
1458   if (_lb_args.debug())
1459     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1460
1461   work(stats);
1462
1463   if (_lb_args.debug()>1)  {
1464     CkPrintf("CharmLB> Obj Map:\n");
1465     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1466     CkPrintf("\n");
1467   }
1468
1469   LBMigrateMsg *msg = createMigrateMsg(stats);
1470
1471   if (_lb_args.debug()) {
1472     double strat_end_time = CkWallTimer();
1473     envelope *env = UsrToEnv(msg);
1474
1475     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1476     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1477               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1478     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1479     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1480               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1481     adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
1482     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1483   }
1484   return msg;
1485 #else
1486   return NULL;
1487 #endif
1488 }
1489
1490 void CentralLB::work(LDStats* stats)
1491 {
1492   // does nothing but print the database
1493   stats->print();
1494 }
1495
1496 // generate migrate message from stats->from_proc and to_proc
1497 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1498 {
1499   int i;
1500   CkVec<MigrateInfo*> migrateInfo;
1501   for (i=0; i<stats->n_objs; i++) {
1502     LDObjData &objData = stats->objData[i];
1503     int frompe = stats->from_proc[i];
1504     int tope = stats->to_proc[i];
1505     if (frompe != tope) {
1506       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1507       //         CkMyPe(),obj,pe,dest);
1508       MigrateInfo *migrateMe = new MigrateInfo;
1509       migrateMe->obj = objData.handle;
1510       migrateMe->from_pe = frompe;
1511       migrateMe->to_pe = tope;
1512       migrateMe->async_arrival = objData.asyncArrival;
1513       migrateInfo.insertAtEnd(migrateMe);
1514     }
1515   }
1516
1517   int migrate_count=migrateInfo.length();
1518   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1519   msg->n_moves = migrate_count;
1520   for(i=0; i < migrate_count; i++) {
1521     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1522     msg->moves[i] = *item;
1523     delete item;
1524     migrateInfo[i] = 0;
1525   }
1526   return msg;
1527 }
1528
1529 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1530 {
1531   int nmoves = 0;
1532   int nunavail = 0;
1533   int i;
1534   for (i=0; i<m->n_moves; i++) {
1535     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1536     if (item->from_pe == p || item->to_pe == p) nmoves++;
1537   }
1538   for (i=0; i<CkNumPes();i++) {
1539     if (!m->avail_vector[i]) nunavail++;
1540   }
1541   LBMigrateMsg* msg;
1542   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1543   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1544   msg->n_moves = nmoves;
1545   msg->level = m->level;
1546   msg->next_lb = m->next_lb;
1547   for (i=0,nmoves=0; i<m->n_moves; i++) {
1548     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1549     if (item->from_pe == p || item->to_pe == p) {
1550       msg->moves[nmoves] = *item;
1551       nmoves++;
1552     }
1553   }
1554   // copy processor data
1555   if (nunavail)
1556   for (i=0; i<CkNumPes();i++) {
1557     msg->avail_vector[i] = m->avail_vector[i];
1558     msg->expectedLoad[i] = m->expectedLoad[i];
1559   }
1560   return msg;
1561 }
1562
1563 void CentralLB::simulationWrite() {
1564   if(step() == LBSimulation::dumpStep)
1565   {
1566     // here we are supposed to dump the database
1567     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1568     char *dumpFileName = (char *)malloc(dumpFileSize);
1569     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1570       free(dumpFileName);
1571       dumpFileSize+=3;
1572       dumpFileName = (char *)malloc(dumpFileSize);
1573     }
1574     writeStatsMsgs(dumpFileName);
1575     free(dumpFileName);
1576     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1577     ++LBSimulation::dumpStep;
1578     --LBSimulation::dumpStepSize;
1579     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1580       CmiPrintf("Charm++> Exiting...\n");
1581       CkExit();
1582     }
1583     return;
1584   }
1585 }
1586
1587 void CentralLB::simulationRead() {
1588   LBSimulation *simResults = NULL, *realResults;
1589   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1590   voidMessage->n_moves=0;
1591   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1592     // here we are supposed to read the data from the dump database
1593     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1594     char *simFileName = (char *)malloc(simFileSize);
1595     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1596       free(simFileName);
1597       simFileSize+=3;
1598       simFileName = (char *)malloc(simFileSize);
1599     }
1600     readStatsMsgs(simFileName);
1601
1602     // allocate simResults (only the first step)
1603     if (simResults == NULL) {
1604       simResults = new LBSimulation(LBSimulation::simProcs);
1605       realResults = new LBSimulation(LBSimulation::simProcs);
1606     }
1607     else {
1608       // should be the same number of procs of the original simulation!
1609       if (!LBSimulation::procsChanged) {
1610         // it means we have a previous step, so in simResults there is data.
1611         // we can now print the real effects of the load balancer during the simulation
1612         // or print the difference between the predicted data and the real one.
1613         realResults->reset();
1614         // reset to_proc of statsData to be equal to from_proc
1615         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1616         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1617         simResults->PrintDifferences(realResults,statsData);
1618       }
1619       simResults->reset();
1620     }
1621
1622     // now pass it to the strategy routine
1623     double startT = CkWallTimer();
1624     preprocess(statsData);
1625     CmiPrintf("%s> Strategy starts ... \n", lbname);
1626     LBMigrateMsg* migrateMsg = Strategy(statsData);
1627     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1628                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1629
1630     // now calculate the results of the load balancing simulation
1631     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1632
1633     // now we have the simulation data, so print it and loop
1634     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1635     // **CWL** Officially recording my disdain here for using ints for bool
1636     if (LBSimulation::showDecisionsOnly) {
1637       simResults->PrintDecisions(migrateMsg, simFileName, 
1638                                  LBSimulation::simProcs);
1639     } else {
1640       simResults->PrintSimulationResults();
1641     }
1642
1643     free(simFileName);
1644     delete migrateMsg;
1645     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1646   }
1647   // deallocate simResults
1648   delete simResults;
1649   CmiPrintf("Charm++> Exiting...\n");
1650   CkExit();
1651 }
1652
1653 void CentralLB::readStatsMsgs(const char* filename) 
1654 {
1655 #if CMK_LBDB_ON
1656   int i;
1657   FILE *f = fopen(filename, "r");
1658   if (f==NULL) {
1659     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1660     CmiAbort("");
1661   }
1662
1663   // at this stage, we need to rebuild the statsMsgList and
1664   // statsDataList structures. For that first deallocate the
1665   // old structures
1666   if (statsMsgsList) {
1667     for(i = 0; i < stats_msg_count; i++)
1668       delete statsMsgsList[i];
1669     delete[] statsMsgsList;
1670     statsMsgsList=0;
1671   }
1672
1673   PUP::fromDisk pd(f);
1674   PUP::machineInfo machInfo;
1675
1676   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1677   PUP::xlater p(machInfo, pd);
1678
1679   if (_lb_args.lbversion() > 1) {
1680     p|_lb_args.lbversion();             // write version number
1681     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1682     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1683   }
1684   p|stats_msg_count;
1685
1686   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1687   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1688   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1689
1690   // LBSimulation::simProcs must be set
1691   statsData->pup(p);
1692
1693   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1694   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1695
1696   // file f is closed in the destructor of PUP::fromDisk
1697   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1698 #endif
1699 }
1700
1701 void CentralLB::writeStatsMsgs(const char* filename) 
1702 {
1703 #if CMK_LBDB_ON
1704   FILE *f = fopen(filename, "w");
1705   if (f==NULL) {
1706     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1707     CmiAbort("");
1708   }
1709
1710   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1711   PUP::toDisk p(f);
1712   p((char *)&machInfo, sizeof(machInfo));       // machine info
1713
1714   p|_lb_args.lbversion();               // write version number
1715   p|stats_msg_count;
1716   statsData->pup(p);
1717
1718   fclose(f);
1719
1720   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1721 #endif
1722 }
1723
1724 // calculate the predicted wallclock/cpu load for every processors
1725 // considering communication overhead if considerComm is true
1726 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1727                       LBMigrateMsg *msg, LBInfo &info, 
1728                       int considerComm)
1729 {
1730 #if CMK_LBDB_ON
1731         stats->makeCommHash();
1732
1733         // update to_proc according to migration msgs
1734         for(int i = 0; i < msg->n_moves; i++) {
1735           MigrateInfo &mInfo = msg->moves[i];
1736           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1737           CmiAssert(idx != -1);
1738           stats->to_proc[idx] = mInfo.to_pe;
1739         }
1740
1741         info.getInfo(stats, count, considerComm);
1742 #endif
1743 }
1744
1745
1746 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1747 {
1748     CkAssert(simResults != NULL && count == simResults->numPes);
1749     // estimate the new loads of the processors. As a first approximation, this is the
1750     // sum of the cpu times of the objects on that processor
1751     double startT = CkWallTimer();
1752     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1753     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1754 }
1755
1756 void CentralLB::pup(PUP::er &p) { 
1757   BaseLB::pup(p); 
1758   if (p.isUnpacking())  {
1759     initLB(CkLBOptions(seqno)); 
1760   }
1761   p|reduction_started;
1762   int has_statsMsg=0;
1763   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1764   p|has_statsMsg;
1765   if (has_statsMsg) {
1766     if (p.isUnpacking())
1767       statsMsg = new CLBStatsMsg;
1768     statsMsg->pup(p);
1769   }
1770 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1771   p | lbDecisionCount;
1772   p | resumeCount;
1773 #endif
1774         
1775 }
1776
1777 int CentralLB::useMem() { 
1778   return sizeof(CentralLB) + statsData->useMem() + 
1779          CkNumPes() * sizeof(CLBStatsMsg *);
1780 }
1781
1782
1783 /**
1784   CLBStatsMsg is not a real message now.
1785   CLBStatsMsg is used for all processors to fill in their local load and comm
1786   statistics and send to processor 0
1787 */
1788
1789 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1790   n_objs = osz;
1791   n_comm = csz;
1792   objData = new LDObjData[osz];
1793   commData = new LDCommData[csz];
1794   avail_vector = NULL;
1795 }
1796
1797 CLBStatsMsg::~CLBStatsMsg() {
1798   delete [] objData;
1799   delete [] commData;
1800   delete [] avail_vector;
1801 }
1802
1803 void CLBStatsMsg::pup(PUP::er &p) {
1804   int i;
1805   p|from_pe;
1806   p|pe_speed;
1807   p|total_walltime;
1808   p|idletime;
1809   p|bg_walltime;
1810 #if CMK_LB_CPUTIMER
1811   p|total_cputime;
1812   p|bg_cputime;
1813 #endif
1814 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1815   p | step;
1816 #endif
1817   p|n_objs;
1818   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1819   for (i=0; i<n_objs; i++) p|objData[i];
1820   p|n_comm;
1821   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1822   for (i=0; i<n_comm; i++) p|commData[i];
1823
1824   int has_avail_vector;
1825   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1826   p|has_avail_vector;
1827   if (p.isUnpacking()) {
1828     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1829     else avail_vector = NULL;
1830   }
1831   if (has_avail_vector) p(avail_vector, CkNumPes());
1832
1833   p(next_lb);
1834 }
1835
1836 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1837 // the entry function, it is just used to use to pup.
1838 // I don't use CLBStatsMsg directly as marshalled parameter because
1839 // I want the data pointer stored and not to be freed by the Charm++.
1840 void CkMarshalledCLBStatsMessage::free() { 
1841   int count = msgs.size();
1842   for  (int i=0; i<count; i++) {
1843     delete msgs[i];
1844     msgs[i] = NULL;
1845   }
1846   msgs.free();
1847 }
1848
1849 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1850 {
1851   int count = m.getCount();
1852   for (int i=0; i<count; i++) add(m.getMessage(i));
1853 }
1854
1855 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1856 {
1857   int count = msgs.size();
1858   p|count;
1859   for (int i=0; i<count; i++) {
1860     CLBStatsMsg *msg;
1861     if (p.isUnpacking()) msg = new CLBStatsMsg;
1862     else { 
1863       msg = msgs[i]; CmiAssert(msg!=NULL);
1864     }
1865     msg->pup(p);
1866     if (p.isUnpacking()) add(msg);
1867   }
1868 }
1869
1870 SpanningTree::SpanningTree()
1871 {
1872         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1873         arity = (int)ceil(sq/2);
1874         calcParent(CkMyPe());
1875         calcNumChildren(CkMyPe());
1876 }
1877
1878 void SpanningTree::calcParent(int n)
1879 {
1880         parent=-1;
1881         if(n != 0  && arity > 0)
1882                 parent = (n-1)/arity;
1883 }
1884
1885 void SpanningTree::calcNumChildren(int n)
1886 {
1887         numChildren = 0;
1888         if (arity == 0) return;
1889         int fullNode=(CkNumPes()-1-arity)/arity;
1890         if(n <= fullNode)
1891                 numChildren = arity;
1892         if(n == fullNode+1)
1893                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1894         if(n > fullNode+1)
1895                 numChildren = 0;
1896 }
1897
1898 #include "CentralLB.def.h"
1899  
1900 /*@}*/