Changing the way line is plotted
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #include "limits.h"
15 #include <vector>
16
17 #define  DEBUGF(x)       // CmiPrintf x;
18 #define  DEBUG(x)        // x;
19
20 #if CMK_MEM_CHECKPOINT
21    /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #elif defined(_FAULT_MLOG_)
25 /* can not handle reduction in inmem FT */
26 #define USE_REDUCTION         0
27 #define USE_LDB_SPANNING_TREE 0
28 #else
29 #define USE_REDUCTION         1
30 #define USE_LDB_SPANNING_TREE 1
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 extern int _restartFlag;
35 extern void getGlobalStep(CkGroupID );
36 extern void initMlogLBStep(CkGroupID );
37 extern int globalResumeCount;
38 extern void sendDummyMigrationCounts(int *);
39 #endif
40
41 #if CMK_GRID_QUEUE_AVAILABLE
42 CpvExtern(void *, CkGridObject);
43 #endif
44
45 CkGroupID loadbalancer;
46 int * lb_ptr;
47 int load_balancer_created;
48
49 struct AdaptiveData {
50   int iteration;
51   double max_load;
52   double avg_load;
53 };
54
55 struct AdaptiveLBDatabase {
56   std::vector<AdaptiveData> history_data;
57 } adaptive_lbdb;
58
59 enum state {
60   OFF,
61   ON,
62   PAUSE,
63   DECIDED,
64   LOAD_BALANCE
65 } local_state;
66
67 struct AdaptiveLBStructure {
68   int lb_ideal_period;
69   int lb_calculated_period;
70   int lb_no_iterations;
71   int global_max_iter_no;
72   int global_recv_iter_counter;
73   bool in_progress;
74   double prev_load;
75   double lb_strategy_cost;
76   double lb_migration_cost;
77   bool lb_period_informed;
78   int lb_msg_send_no;
79   int lb_msg_recv_no;
80 } adaptive_struct;
81
82 CreateLBFunc_Def(CentralLB, "CentralLB base class")
83
84 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
85                              LBMigrateMsg *, LBInfo &info, int considerComm);
86
87 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
88   double lb_data[4];
89   lb_data[0] = 0;
90   lb_data[1] = 0;
91   lb_data[2] = 0;
92   for (int i = 0; i < nMsg; i++) {
93     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
94     double* m = (double *)msgs[i]->getData();
95     lb_data[0] += m[0];
96     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
97     lb_data[2] += m[2];
98     if (i == 0) {
99       lb_data[3] = m[3];
100     }
101     if (m[3] != lb_data[3]) {
102       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
103       %lf\n", lb_data[3], m[3]);
104     }
105   }
106   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
107 }
108
109 /*global*/ CkReduction::reducerType lbDataCollectionType;
110 /*initcall*/ void registerLBDataCollection(void) {
111   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
112 }
113
114 /*
115 void CreateCentralLB()
116 {
117   CProxy_CentralLB::ckNew(0);
118 }
119 */
120
121 void CentralLB::staticStartLB(void* data)
122 {
123   CentralLB *me = (CentralLB*)(data);
124   me->StartLB();
125 }
126
127 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
128 {
129   CentralLB *me = (CentralLB*)(data);
130   me->Migrated(h, waitBarrier);
131 }
132
133 void CentralLB::staticAtSync(void* data)
134 {
135   CentralLB *me = (CentralLB*)(data);
136   me->AtSync();
137 }
138
139 void CentralLB::initLB(const CkLBOptions &opt)
140 {
141 #if CMK_LBDB_ON
142   lbname = "CentralLB";
143   thisProxy = CProxy_CentralLB(thisgroup);
144   //  CkPrintf("Construct in %d\n",CkMyPe());
145
146   // create and turn on by default
147   receiver = theLbdb->
148     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
149   notifier = theLbdb->getLBDB()->
150     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
151   startLbFnHdl = theLbdb->getLBDB()->
152     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
153
154   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
155   if (opt.getSeqNo() > 0) turnOff();
156
157   stats_msg_count = 0;
158   statsMsgsList = NULL;
159   statsData = NULL;
160
161   storedMigrateMsg = NULL;
162   reduction_started = 0;
163
164   // for future predictor
165   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
166   else predicted_model=0;
167   // register user interface callbacks
168   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
169
170   myspeed = theLbdb->ProcessorSpeed();
171
172   migrates_completed = 0;
173   future_migrates_completed = 0;
174   migrates_expected = -1;
175   future_migrates_expected = -1;
176   cur_ld_balancer = _lb_args.central_pe();      // 0 default
177   lbdone = 0;
178   count_msgs=0;
179   statsMsg = NULL;
180
181   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
182
183   load_balancer_created = 1;
184
185   // If metabalancer enabled, initialize the variables
186   adaptive_struct.lb_ideal_period =  INT_MAX;
187   adaptive_struct.lb_calculated_period = INT_MAX;
188   adaptive_struct.lb_no_iterations = -1;
189   adaptive_struct.global_max_iter_no = 0;
190   adaptive_struct.global_recv_iter_counter = 0;
191   adaptive_struct.in_progress = false;
192   adaptive_struct.prev_load = 0.0;
193   adaptive_struct.lb_strategy_cost = 0.0;
194   adaptive_struct.lb_migration_cost = 0.0;
195   adaptive_struct.lb_msg_send_no = 0;
196   adaptive_struct.lb_msg_recv_no = 0;
197   local_state = OFF;
198 #endif
199 }
200
201 CentralLB::~CentralLB()
202 {
203 #if CMK_LBDB_ON
204   delete [] statsMsgsList;
205   delete statsData;
206   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
207   if (theLbdb) {
208     theLbdb->getLBDB()->
209       RemoveNotifyMigrated(notifier);
210     theLbdb->
211       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
212   }
213 #endif
214 }
215
216 void CentralLB::turnOn() 
217 {
218 #if CMK_LBDB_ON
219   theLbdb->getLBDB()->
220     TurnOnBarrierReceiver(receiver);
221   theLbdb->getLBDB()->
222     TurnOnNotifyMigrated(notifier);
223   theLbdb->getLBDB()->
224     TurnOnStartLBFn(startLbFnHdl);
225 #endif
226 }
227
228 void CentralLB::turnOff() 
229 {
230 #if CMK_LBDB_ON
231   theLbdb->getLBDB()->
232     TurnOffBarrierReceiver(receiver);
233   theLbdb->getLBDB()->
234     TurnOffNotifyMigrated(notifier);
235   theLbdb->getLBDB()->
236     TurnOffStartLBFn(startLbFnHdl);
237 #endif
238 }
239
240 void CentralLB::AtSync()
241 {
242 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
243 #if CMK_LBDB_ON
244 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
245
246 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
247         CpvAccess(_currentObj)=this;
248 #endif
249
250   // if num of processor is only 1, nothing should happen
251   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
252     MigrationDone(0);
253     return;
254   }
255   if(CmiNodeAlive(CkMyPe())){
256     thisProxy [CkMyPe()].ProcessAtSyncMin();
257   }
258 #endif
259 }
260
261 #include "ComlibStrategy.h"
262
263 void CentralLB::ProcessAtSync()
264 {
265
266
267
268 #if CMK_LBDB_ON
269   if (reduction_started) return;              // reducton in progress
270
271   CmiAssert(CmiNodeAlive(CkMyPe()));
272   if (CkMyPe() == cur_ld_balancer) {
273     start_lb_time = CkWallTimer();
274   }
275
276
277 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
278         initMlogLBStep(thisgroup);
279 #endif
280
281   // build message
282   BuildStatsMsg();
283
284 #if USE_REDUCTION
285     // reduction to get total number of objects and comm
286     // so that processor 0 can pre-allocate load balancing database
287   int counts[2];
288   counts[0] = theLbdb->GetObjDataSz();
289   counts[1] = theLbdb->GetCommDataSz();
290
291   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
292                   thisProxy[0]);
293   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
294   reduction_started = 1;
295 #else
296   SendStats();
297 #endif
298 #endif
299 }
300
301 void CentralLB::ProcessAtSyncMin()
302 {
303 #if CMK_LBDB_ON
304   if (reduction_started) return;              // reducton in progress
305
306   CmiAssert(CmiNodeAlive(CkMyPe()));
307   if (CkMyPe() == cur_ld_balancer) {
308     start_lb_time = CkWallTimer();
309   }
310
311
312 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
313         initMlogLBStep(thisgroup);
314 #endif
315   
316   adaptive_struct.lb_no_iterations++;
317  // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
318  //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
319
320   // If decision has been made and has reached the lb_period, then do load
321   // balancing, else if hasn't reached ideal_period, then resume.
322   if (local_state == DECIDED) {
323     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
324  //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
325       SendMinStats();
326       ResumeClients(0);
327     } else {
328       local_state = LOAD_BALANCE;
329  //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
330       ProcessAtSync();
331     }
332     return;
333   }
334    
335   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
336   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
337   // dont resume client.
338   if (local_state == ON) {
339     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
340       SendMinStats();
341       ResumeClients(0);
342     } else {
343       local_state = PAUSE;
344     }
345     return;
346   }
347
348   SendMinStats();
349   ResumeClients(0);
350 #endif
351 }
352
353 void CentralLB::SendMinStats() {
354
355  double total_load;
356  double idle_time;
357  double bg_walltime;
358   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
359  // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
360
361   // Since the total_load is cumulative since the last load balancing stage,
362   // Hence it is subtracted from the previous load.
363   total_load -= idle_time;
364   double tmp = total_load;
365   total_load -= adaptive_struct.prev_load;
366   adaptive_struct.prev_load = tmp; 
367
368   double lb_data[4];
369   lb_data[0] = total_load;
370   lb_data[1] = total_load;
371   lb_data[2] = 1;
372   lb_data[3] = adaptive_struct.lb_no_iterations;
373   //CkPrintf("[%d] sends total load %lf at iter %d\n", CkMyPe(), total_load, adaptive_struct.lb_no_iterations);
374
375   if (adaptive_struct.lb_no_iterations != 0) {
376     CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
377         thisProxy[0]);
378     contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
379   }
380
381 //    int tmp1 = adaptive_struct.lb_no_iterations;
382 //    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
383 //    // Send the current iteration no
384 //    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
385 //        thisProxy[0]);
386 //    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
387 }
388
389 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
390   CmiAssert(CkMyPe() == 0);
391   double* load = (LBRealType *) msg->getData();
392   double max = load[1];
393   double avg = load[0]/load[2];
394   int iteration_n = load[3];
395   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
396   CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
397   delete msg;
398
399   // Store the data for this iteration
400   AdaptiveData data;
401   data.iteration = adaptive_struct.lb_no_iterations;
402   data.max_load = max;
403   data.avg_load = avg;
404   adaptive_lbdb.history_data.push_back(data);
405
406   // If lb period inform is in progress, dont inform again
407   if (adaptive_struct.in_progress) {
408     return;
409   }
410
411 //  if (adaptive_struct.lb_period_informed) {
412 //    return;
413 //  }
414
415   // If the max/avg ratio is greater than the threshold and also this is not the
416   // step immediately after load balancing, carry out load balancing
417   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
418   if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
419     CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
420 //    if (!adaptive_struct.lb_period_informed) {
421 //      // Just for testing
422 //      adaptive_struct.lb_calculated_period = 40;
423 //      adaptive_struct.lb_period_informed = true;
424 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
425 //      return;
426 //    }
427
428     // If the new lb period is less than current set lb period
429     if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
430       adaptive_struct.lb_calculated_period = iteration_n + 1;
431       adaptive_struct.lb_period_informed = true;
432       adaptive_struct.in_progress = true;
433       CkPrintf("Informing everyone the lb period is %d\n",
434           adaptive_struct.lb_calculated_period);
435       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
436     }
437     return;
438   }
439
440   // Generate the plan for the adaptive strategy
441   int period;
442   if (generatePlan(period)) {
443     //CkPrintf("Carry out load balancing step at iter\n");
444
445     // If the new lb period is less than current set lb period
446     if (adaptive_struct.lb_calculated_period > period) {
447       adaptive_struct.lb_calculated_period = period;
448       adaptive_struct.in_progress = true;
449       adaptive_struct.lb_period_informed = true;
450       CkPrintf("Informing everyone the lb period is %d\n",
451           adaptive_struct.lb_calculated_period);
452       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
453     }
454   }
455 }
456
457 bool CentralLB::generatePlan(int& period) {
458   if (adaptive_lbdb.history_data.size() <= 8) {
459     return false;
460   }
461
462   // Some heuristics for lbperiod
463   // If constant load or almost constant,
464   // then max * new_lb_period > avg * new_lb_period + lb_cost
465   double max = 0.0;
466   double avg = 0.0;
467   AdaptiveData data;
468   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
469     data = adaptive_lbdb.history_data[i];
470     max += data.max_load;
471     avg += data.avg_load;
472     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
473   }
474 //  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
475 //  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
476 //
477 //  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
478 //  adaptive_struct.lb_migration_cost) / (max - avg);
479 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
480 //      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
481 //
482   // If linearly varying load, then find lb_period
483   // area between the max and avg curve 
484   double mslope, aslope, mc, ac;
485   getLineEq(aslope, ac, mslope, mc);
486   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
487   double a = (mslope - aslope)/2;
488   double b = (mc - ac);
489   double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
490   //c = -2.5;
491   bool got_period = getPeriodForLinear(a, b, c, period);
492   if (!got_period) {
493     return false;
494   }
495   
496   if (mslope < 0) {
497     if (period > (-mc/mslope)) {
498       CkPrintf("Max < 0 Period set when max load is -ve\n");
499       return false;
500     }
501   }
502
503   if (aslope < 0) {
504     if (period > (-ac/aslope)) {
505       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
506       return false;
507     }
508   }
509
510   int intersection_t = (mc-ac) / (aslope - mslope);
511   if (intersection_t > 0 && period > intersection_t) {
512     CkPrintf("Avg | Max Period set when curves intersect\n");
513     return false;
514   }
515   return true;
516 }
517
518 bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
519   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
520   if (a == 0.0) {
521     period = (-c / b);
522     CkPrintf("Ideal period for linear load %d\n", period);
523     return true;
524   }
525   int x;
526   double t = (b * b) - (4*a*c);
527   if (t < 0) {
528     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
529     return false;
530   }
531   t = (-b + sqrt(t)) / (2*a);
532   x = t;
533   if (x < 0) {
534     CkPrintf("boo!!! x (%d) < 0\n", x);
535     x = 0;
536     return false;
537   }
538   period = x;
539   CkPrintf("Ideal period for linear load %d\n", period);
540   return true;
541 }
542
543 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
544   int total = adaptive_lbdb.history_data.size();
545   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
546       adaptive_lbdb.history_data[0].iteration;
547   double a1 = 0;
548   double m1 = 0;
549   double a2 = 0;
550   double m2 = 0;
551   AdaptiveData data;
552   int i = 0;
553   for (i = 0; i < total/2; i++) {
554     data = adaptive_lbdb.history_data[i];
555     m1 += data.max_load;
556     a1 += data.avg_load;
557   }
558   m1 /= i;
559   a1 /= i;
560
561   for (i = total/2; i < total; i++) {
562     data = adaptive_lbdb.history_data[i];
563     m2 += data.max_load;
564     a2 += data.avg_load;
565   }
566   m2 /= (i - total/2);
567   a2 /= (i - total/2);
568
569   aslope = 2 * (a2 - a1) / iterations;
570   mslope = 2 * (m2 - m1) / iterations;
571   ac = adaptive_lbdb.history_data[0].avg_load;
572   mc = adaptive_lbdb.history_data[0].max_load;
573   return true;
574 }
575
576 void CentralLB::LoadBalanceDecision(int req_no, int period) {
577   if (req_no < adaptive_struct.lb_msg_recv_no) {
578     CkPrintf("Error!!! Received a request which was already sent or old\n");
579     return;
580   }
581   //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
582   adaptive_struct.lb_ideal_period = period;
583   local_state = ON;
584   adaptive_struct.lb_msg_recv_no = req_no;
585   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
586 }
587
588 void CentralLB::LoadBalanceDecisionFinal(int req_no, int period) {
589   if (req_no < adaptive_struct.lb_msg_recv_no) {
590     return;
591   }
592   //CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
593   adaptive_struct.lb_ideal_period = period;
594
595   if (local_state == ON) {
596     local_state = DECIDED;
597     return;
598   }
599
600   // If the state is PAUSE, then its waiting for the final decision from central
601   // processor. If the decision is that the ideal period is in the future,
602   // resume. If the ideal period is now, then carry out load balancing.
603   if (local_state == PAUSE) {
604     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
605       local_state = DECIDED;
606       SendMinStats();
607       ResumeClients(0);
608     } else {
609       local_state = LOAD_BALANCE;
610       ProcessAtSync();
611     }
612     return;
613   }
614   CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
615 }
616
617
618 void CentralLB::ReceiveIterationNo(int req_no, int local_iter_no) {
619   CmiAssert(CkMyPe() == 0);
620
621   adaptive_struct.global_recv_iter_counter++;
622   if (local_iter_no > adaptive_struct.global_max_iter_no) {
623     adaptive_struct.global_max_iter_no = local_iter_no;
624   }
625   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
626     adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
627     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
628     CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
629     adaptive_struct.in_progress = false;
630     adaptive_struct.global_max_iter_no = 0;
631     adaptive_struct.global_recv_iter_counter = 0;
632   }
633 }
634
635 // called only on 0
636 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
637 {
638   CmiAssert(CkMyPe() == 0);
639   if (statsData == NULL) statsData = new LDStats;
640
641   int *counts = (int *)msg->getData();
642   int n_objs = counts[0];
643   int n_comm = counts[1];
644
645     // resize database
646   statsData->objData.resize(n_objs);
647   statsData->from_proc.resize(n_objs);
648   statsData->to_proc.resize(n_objs);
649   statsData->commData.resize(n_comm);
650
651   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
652         
653     // broadcast call to let everybody start to send stats
654   thisProxy.SendStats();
655 }
656
657 void CentralLB::BuildStatsMsg()
658 {
659 #if CMK_LBDB_ON
660   // build and send stats
661   const int osz = theLbdb->GetObjDataSz();
662   const int csz = theLbdb->GetCommDataSz();
663
664   int npes = CkNumPes();
665   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
666   _MEMCHECK(msg);
667   msg->from_pe = CkMyPe();
668 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
669         msg->step = step();
670 #endif
671   //msg->serial = CrnRand();
672
673 /*
674   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
675   theLbdb->IdleTime(&msg->idletime);
676   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
677 */
678 #if CMK_LB_CPUTIMER
679   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
680                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
681 #else
682   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
683                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
684 #endif
685
686   msg->pe_speed = myspeed;
687   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
688
689   msg->n_objs = osz;
690   theLbdb->GetObjData(msg->objData);
691   msg->n_comm = csz;
692   theLbdb->GetCommData(msg->commData);
693 //  theLbdb->ClearLoads();
694   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
695
696   if(CkMyPe() == cur_ld_balancer) {
697     msg->avail_vector = new char[CkNumPes()];
698     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
699     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
700   }
701
702   CmiAssert(statsMsg == NULL);
703   statsMsg = msg;
704 #endif
705 }
706
707
708 // called on every processor
709 void CentralLB::SendStats()
710 {
711 #if CMK_LBDB_ON
712   CmiAssert(statsMsg != NULL);
713   reduction_started = 0;
714
715 #if USE_LDB_SPANNING_TREE
716   if(CkNumPes()>1024)
717   {
718     if (CkMyPe() == cur_ld_balancer)
719       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
720     else
721       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
722   }
723   else
724 #endif
725   {
726     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
727     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
728   }
729
730   statsMsg = NULL;
731
732 #ifdef __BIGSIM__
733   BgEndStreaming();
734 #endif
735
736   {
737   // enfore the barrier to wait until centralLB says no
738   LDOMHandle h;
739   h.id.id.idx = 0;
740   theLbdb->getLBDB()->RegisteringObjects(h);
741   }
742 #endif
743 }
744
745 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
746 extern int donotCountMigration;
747 #endif
748
749 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
750 {
751 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
752     if(donotCountMigration){
753         return ;
754     }
755 #endif
756
757 #if CMK_LBDB_ON
758   if (waitBarrier) {
759             migrates_completed++;
760       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
761     if (migrates_completed == migrates_expected) {
762       MigrationDone(1);
763     }
764   }
765   else {
766     future_migrates_completed ++;
767     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
768     if (future_migrates_completed == future_migrates_expected)  {
769         CheckMigrationComplete();
770     }
771   }
772 #endif
773 }
774
775 void CentralLB::MissMigrate(int waitForBarrier)
776 {
777   LDObjHandle h;
778   Migrated(h, waitForBarrier);
779 }
780
781 // build a complete data from bufferred messages
782 // not used when USE_REDUCTION = 1
783 void CentralLB::buildStats()
784 {
785     statsData->nprocs() = stats_msg_count;
786     // allocate space
787     statsData->objData.resize(statsData->n_objs);
788     statsData->from_proc.resize(statsData->n_objs);
789     statsData->to_proc.resize(statsData->n_objs);
790     statsData->commData.resize(statsData->n_comm);
791
792     int nobj = 0;
793     int ncom = 0;
794     int nmigobj = 0;
795     // copy all data in individule message to this big structure
796     for (int pe=0; pe<CkNumPes(); pe++) {
797        int i;
798        CLBStatsMsg *msg = statsMsgsList[pe];
799        if(msg == NULL) continue;
800        for (i=0; i<msg->n_objs; i++) {
801          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
802          statsData->objData[nobj] = msg->objData[i];
803          if (msg->objData[i].migratable) nmigobj++;
804          nobj++;
805        }
806        for (i=0; i<msg->n_comm; i++) {
807          statsData->commData[ncom] = msg->commData[i];
808          ncom++;
809        }
810        // free the memory
811        delete msg;
812        statsMsgsList[pe]=0;
813     }
814     statsData->n_migrateobjs = nmigobj;
815 }
816
817 // deposit one processor data at a time, note database is pre-allocated
818 // to have enough space
819 // used when USE_REDUCTION = 1
820 void CentralLB::depositData(CLBStatsMsg *m)
821 {
822   int i;
823   if (m == NULL) return;
824
825   const int pe = m->from_pe;
826   struct ProcStats &procStat = statsData->procs[pe];
827   procStat.pe = pe;
828   procStat.total_walltime = m->total_walltime;
829   procStat.idletime = m->idletime;
830   procStat.bg_walltime = m->bg_walltime;
831 #if CMK_LB_CPUTIMER
832   procStat.total_cputime = m->total_cputime;
833   procStat.bg_cputime = m->bg_cputime;
834 #endif
835   procStat.pe_speed = m->pe_speed;
836   //procStat.utilization = 1.0;
837   procStat.available = CmiTrue;
838   procStat.n_objs = m->n_objs;
839
840   int &nobj = statsData->n_objs;
841   int &nmigobj = statsData->n_migrateobjs;
842   for (i=0; i<m->n_objs; i++) {
843       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
844       statsData->objData[nobj] = m->objData[i];
845       if (m->objData[i].migratable) nmigobj++;
846       nobj++;
847       CmiAssert(nobj <= statsData->objData.capacity());
848   }
849   int &n_comm = statsData->n_comm;
850   for (i=0; i<m->n_comm; i++) {
851       statsData->commData[n_comm] = m->commData[i];
852       n_comm++;
853       CmiAssert(n_comm <= statsData->commData.capacity());
854   }
855   delete m;
856 }
857
858 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
859 {
860 #if CMK_LBDB_ON
861   if (statsMsgsList == NULL) {
862     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
863     CmiAssert(statsMsgsList != NULL);
864     for(int i=0; i < CkNumPes(); i++)
865       statsMsgsList[i] = 0;
866   }
867   if (statsData == NULL) statsData = new LDStats;
868
869     //  loop through all CLBStatsMsg in the incoming msg
870   int count = msg.getCount();
871   for (int num = 0; num < count; num++) 
872   {
873     CLBStatsMsg *m = msg.getMessage(num);
874     CmiAssert(m!=NULL);
875     const int pe = m->from_pe;
876     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
877 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
878 /*      
879  *  if(m->step < step()){
880  *    //TODO: if a processor is redoing an old load balance step..
881  *    //tell it that the step is done and that it should not perform any migrations
882  *      thisProxy[pe].ReceiveDummyMigration();
883  *  }*/
884 #endif
885         
886     if(!CmiNodeAlive(pe)){
887         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
888         continue;
889     }
890         
891     if (m->avail_vector!=NULL) {
892       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
893     }
894
895     if (statsMsgsList[pe] != 0) {
896       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
897              pe);
898     } else {
899       statsMsgsList[pe] = m;
900 #if USE_REDUCTION
901       depositData(m);
902 #else
903       // store per processor data right away
904       struct ProcStats &procStat = statsData->procs[pe];
905       procStat.pe = pe;
906       procStat.total_walltime = m->total_walltime;
907       procStat.idletime = m->idletime;
908       procStat.bg_walltime = m->bg_walltime;
909 #if CMK_LB_CPUTIMER
910       procStat.total_cputime = m->total_cputime;
911       procStat.bg_cputime = m->bg_cputime;
912 #endif
913       procStat.pe_speed = m->pe_speed;
914       //procStat.utilization = 1.0;
915       procStat.available = CmiTrue;
916       procStat.n_objs = m->n_objs;
917
918       statsData->n_objs += m->n_objs;
919       statsData->n_comm += m->n_comm;
920 #endif
921       stats_msg_count++;
922     }
923   }    // end of for
924
925   const int clients = CkNumValidPes();
926   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
927  
928   if (stats_msg_count == clients) {
929         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
930     statsData->nprocs() = stats_msg_count;
931     thisProxy[CkMyPe()].LoadBalance();
932   }
933 #endif
934 }
935
936 /** added by Abhinav for receiving msgs via spanning tree */
937 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
938 {
939 #if CMK_LBDB_ON
940         CmiAssert(CkMyPe() != 0);
941         bufMsg.add(msg);         // buffer messages
942         count_msgs++;
943         //CkPrintf("here %d\n", CkMyPe());
944         if (count_msgs == st.numChildren+1) {
945                 if(st.parent == 0)
946                 {
947                         thisProxy[0].ReceiveStats(bufMsg);
948                         //CkPrintf("from %d\n", CkMyPe());
949                 }
950                 else
951                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
952                 count_msgs = 0;
953                 bufMsg.free();
954         } 
955 #endif
956 }
957
958 void CentralLB::LoadBalance()
959 {
960 #if CMK_LBDB_ON
961   int proc;
962   const int clients = CkNumPes();
963
964 #if ! USE_REDUCTION
965   // build data
966   buildStats();
967 #else
968   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
969 #endif
970
971   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
972
973   if (_lb_args.debug()) 
974       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
975                   lbname, cur_ld_balancer, step(), start_lb_time,
976                   CmiMemoryUsage()/(1024.0*1024.0));
977
978   // if we are in simulation mode read data
979   if (LBSimulation::doSimulation) simulationRead();
980
981   char *availVector = LBDatabaseObj()->availVector();
982   for(proc = 0; proc < clients; proc++)
983       statsData->procs[proc].available = (CmiBool)availVector[proc];
984
985   preprocess(statsData);
986
987 //    CkPrintf("Before Calling Strategy\n");
988
989   if (_lb_args.printSummary()) {
990       LBInfo info(clients);
991         // not take comm data
992       info.getInfo(statsData, clients, 0);
993       LBRealType mLoad, mCpuLoad, totalLoad;
994       info.getSummary(mLoad, mCpuLoad, totalLoad);
995       int nmsgs, nbytes;
996       statsData->computeNonlocalComm(nmsgs, nbytes);
997       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
998 //      if (_lb_args.debug() > 1) {
999 //        for (int i=0; i<statsData->n_objs; i++)
1000 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
1001 //      }
1002   }
1003
1004 #if CMK_REPLAYSYSTEM
1005   LDHandle *loadBalancer_pointers;
1006   if (_replaySystem) {
1007     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
1008     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
1009   }
1010 #endif
1011   
1012   LBMigrateMsg* migrateMsg = Strategy(statsData);
1013 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1014         migrateMsg->step = step();
1015 #endif
1016
1017 #if CMK_REPLAYSYSTEM
1018   CpdHandleLBMessage(&migrateMsg);
1019   if (_replaySystem) {
1020     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
1021     free(loadBalancer_pointers);
1022   }
1023 #endif
1024   
1025   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
1026   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
1027
1028   // if this is the step at which we need to dump the database
1029   simulationWrite();
1030
1031 //  calculate predicted load
1032 //  very time consuming though, so only happen when debugging is on
1033   if (_lb_args.printSummary()) {
1034       LBInfo info(clients);
1035         // not take comm data
1036       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
1037       LBRealType mLoad, mCpuLoad, totalLoad;
1038       info.getSummary(mLoad, mCpuLoad, totalLoad);
1039       int nmsgs, nbytes;
1040       statsData->computeNonlocalComm(nmsgs, nbytes);
1041       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
1042       for (int i=0; i<clients; i++)
1043         migrateMsg->expectedLoad[i] = info.peLoads[i];
1044   }
1045
1046   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
1047 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1048     lbDecisionCount++;
1049     migrateMsg->lbDecisionCount = lbDecisionCount;
1050 #endif
1051
1052   envelope *env = UsrToEnv(migrateMsg);
1053   if (1) {
1054       // broadcast
1055     thisProxy.ReceiveMigration(migrateMsg);
1056   }
1057   else {
1058     // split the migration for each processor
1059     for (int p=0; p<CkNumPes(); p++) {
1060       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
1061       thisProxy[p].ReceiveMigration(m);
1062     }
1063     delete migrateMsg;
1064   }
1065
1066   // Zero out data structures for next cycle
1067   // CkPrintf("zeroing out data\n");
1068   statsData->clear();
1069   stats_msg_count=0;
1070 #endif
1071 }
1072
1073 // test if sender and receiver in a commData is nonmigratable.
1074 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
1075 {
1076 #if CMK_LBDB_ON
1077   for (int pe=0 ; pe<count; pe++)
1078   {
1079     for (int i=0; i<len[pe]; i++)
1080       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
1081           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
1082       return 0;
1083   }
1084 #endif
1085   return 1;
1086 }
1087
1088 // rebuild LDStats and remove all non-migratble objects and related things
1089 void CentralLB::removeNonMigratable(LDStats* stats, int count)
1090 {
1091   int i;
1092
1093   // check if we have non-migratable objects
1094   int have = 0;
1095   for (i=0; i<stats->n_objs; i++) 
1096   {
1097     LDObjData &odata = stats->objData[i];
1098     if (!odata.migratable) {
1099       have = 1; break;
1100     }
1101   }
1102   if (have == 0) return;
1103
1104   CkVec<LDObjData> nonmig;
1105   CkVec<int> new_from_proc, new_to_proc;
1106   nonmig.resize(stats->n_migrateobjs);
1107   new_from_proc.resize(stats->n_migrateobjs);
1108   new_to_proc.resize(stats->n_migrateobjs);
1109   int n_objs = 0;
1110   for (i=0; i<stats->n_objs; i++) 
1111   {
1112     LDObjData &odata = stats->objData[i];
1113     if (odata.migratable) {
1114       nonmig[n_objs] = odata;
1115       new_from_proc[n_objs] = stats->from_proc[i];
1116       new_to_proc[n_objs] = stats->to_proc[i];
1117       n_objs ++;
1118     }
1119     else {
1120       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1121 #if CMK_LB_CPUTIMER
1122       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1123 #endif
1124     }
1125   }
1126   CmiAssert(stats->n_migrateobjs == n_objs);
1127
1128   stats->makeCommHash();
1129   
1130   CkVec<LDCommData> newCommData;
1131   newCommData.resize(stats->n_comm);
1132   int n_comm = 0;
1133   for (i=0; i<stats->n_comm; i++) 
1134   {
1135     LDCommData& cdata = stats->commData[i];
1136     if (!cdata.from_proc()) 
1137     {
1138       int idx = stats->getSendHash(cdata);
1139       CmiAssert(idx != -1);
1140       if (!stats->objData[idx].migratable) continue;
1141     }
1142     switch (cdata.receiver.get_type()) {
1143     case LD_PROC_MSG:
1144       break;
1145     case LD_OBJ_MSG:  {
1146       int idx = stats->getRecvHash(cdata);
1147       if (stats->complete_flag)
1148         CmiAssert(idx != -1);
1149       else if (idx == -1) continue;          // receiver not in this group
1150       if (!stats->objData[idx].migratable) continue;
1151       break;
1152       }
1153     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1154       break;
1155     }
1156     newCommData[n_comm] = cdata;
1157     n_comm ++;
1158   }
1159
1160   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1161
1162   // swap to new data
1163   stats->objData = nonmig;
1164   stats->from_proc = new_from_proc;
1165   stats->to_proc = new_to_proc;
1166   stats->n_objs = n_objs;
1167
1168   stats->commData = newCommData;
1169   stats->n_comm = n_comm;
1170
1171   stats->deleteCommHash();
1172   stats->makeCommHash();
1173
1174 }
1175
1176
1177 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1178 extern int restarted;
1179 #endif
1180
1181 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1182 {
1183   storedMigrateMsg = m;
1184   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1185                   thisProxy);
1186   contribute(0, NULL, CkReduction::max_int, cb);
1187
1188   // Reset all adaptive lb related fields since load balancing is being done.
1189   adaptive_struct.lb_no_iterations = -1;
1190   adaptive_lbdb.history_data.clear();
1191   adaptive_struct.prev_load = 0.0;
1192   local_state = OFF;
1193   adaptive_struct.lb_period_informed = false;
1194   adaptive_struct.lb_ideal_period = INT_MAX;
1195   adaptive_struct.lb_calculated_period = INT_MAX;
1196   adaptive_struct.lb_msg_send_no = 0;
1197   adaptive_struct.lb_msg_recv_no = 0;
1198 }
1199
1200 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1201 {
1202 #if CMK_LBDB_ON
1203         int i;
1204         LBMigrateMsg *m = storedMigrateMsg;
1205         CmiAssert(m!=NULL);
1206         delete msg;
1207
1208 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1209         int *dummyCounts;
1210
1211         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1212         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1213         if(step() > m->step){
1214                 char str[100];
1215                 envelope *env = UsrToEnv(m);
1216                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1217                 return;
1218         }
1219         lbDecisionCount = m->lbDecisionCount;
1220 #endif
1221
1222   if (_lb_args.debug() > 1) 
1223     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1224
1225   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1226   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1227 /*FAULT_EVAC*/
1228   if(!CmiNodeAlive(CkMyPe())){
1229         delete m;
1230         return;
1231   }
1232   migrates_expected = 0;
1233   future_migrates_expected = 0;
1234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1235         int sending=0;
1236     int dummy=0;
1237         LBDB *_myLBDB = theLbdb->getLBDB();
1238         if(_restartFlag){
1239         dummyCounts = new int[CmiNumPes()];
1240         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1241     }
1242 #endif
1243   for(i=0; i < m->n_moves; i++) {
1244     MigrateInfo& move = m->moves[i];
1245     const int me = CkMyPe();
1246     if (move.from_pe == me && move.to_pe != me) {
1247       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1248       // migrate object, in case it is already gone, inform toPe
1249 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1250       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1251          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1252 #else
1253             if(_restartFlag == 0){
1254                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1255                 theLbdb->Migrate(move.obj,move.to_pe);
1256                 sending++;
1257             }else{
1258                 if(_myLBDB->validObjHandle(move.obj)){
1259                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1260                     theLbdb->Migrate(move.obj,move.to_pe);
1261                     sending++;
1262                 }else{
1263                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1264                     dummyCounts[move.to_pe]++;
1265                     dummy++;
1266                 }
1267             }
1268 #endif
1269     } else if (move.from_pe != me && move.to_pe == me) {
1270        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1271       if (!move.async_arrival) migrates_expected++;
1272       else future_migrates_expected++;
1273     }
1274   }
1275   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1276   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1277
1278 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1279         if(_restartFlag){
1280                 sendDummyMigrationCounts(dummyCounts);
1281                 _restartFlag  =0;
1282         delete []dummyCounts;
1283         }
1284 #endif
1285
1286
1287 #if 0
1288   if (m->n_moves ==0) {
1289     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1290   }
1291 #endif
1292   cur_ld_balancer = m->next_lb;
1293   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1294       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1295   }
1296
1297   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1298     MigrationDone(1);
1299   delete m;
1300
1301 //      CkEvacuatedElement();
1302 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1303 //  migrates_expected = 0;
1304 //  //  ResumeClients(1);
1305 #endif
1306 #endif
1307 }
1308
1309 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1310 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1311     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1312     //TODO: this is gonna be important when a crash happens during checkpoint
1313     //the globalDecisionCount would have to be saved and compared against
1314     //a future recvMigration
1315                 
1316         thisProxy[CkMyPe()].ResumeClients(1);
1317 }
1318 #endif
1319
1320 void CentralLB::MigrationDone(int balancing)
1321 {
1322 #if CMK_LBDB_ON
1323   migrates_completed = 0;
1324   migrates_expected = -1;
1325   // clear load stats
1326   if (balancing) theLbdb->ClearLoads();
1327   // Increment to next step
1328   theLbdb->incStep();
1329         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1330   // if sync resume, invoke a barrier
1331
1332 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1333     savedBalancing = balancing;
1334     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1335 #endif
1336
1337   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1338
1339   LoadbalanceDone(balancing);        // callback
1340 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1341   // if sync resume invoke a barrier
1342   if (balancing && _lb_args.syncResume()) {
1343     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1344                   thisProxy);
1345     contribute(0, NULL, CkReduction::sum_int, cb);
1346   }
1347   else{ 
1348     if(CmiNodeAlive(CkMyPe())){
1349         thisProxy [CkMyPe()].ResumeClients(balancing);
1350     }   
1351   }     
1352 #if CMK_GRID_QUEUE_AVAILABLE
1353   CmiGridQueueDeregisterAll ();
1354   CpvAccess(CkGridObject) = NULL;
1355 #endif
1356 #endif 
1357 #endif
1358 }
1359
1360 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1361 void CentralLB::endMigrationDone(int balancing){
1362     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1363
1364
1365   if (balancing && _lb_args.syncResume()) {
1366     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1367                   thisProxy);
1368     contribute(0, NULL, CkReduction::sum_int, cb);
1369   }
1370   else{
1371     if(CmiNodeAlive(CkMyPe())){
1372     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1373     thisProxy [CkMyPe()].ResumeClients(balancing);
1374     }
1375   }
1376
1377 }
1378 #endif
1379
1380 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1381 void resumeCentralLbAfterChkpt(void *_lb){
1382     CentralLB *lb= (CentralLB *)_lb;
1383     CpvAccess(_currentObj)=lb;
1384     lb->endMigrationDone(lb->savedBalancing);
1385 }
1386 #endif
1387
1388
1389 void CentralLB::ResumeClients(CkReductionMsg *msg)
1390 {
1391   ResumeClients(1);
1392   delete msg;
1393 }
1394
1395 void CentralLB::ResumeClients(int balancing)
1396 {
1397 #if CMK_LBDB_ON
1398 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1399     resumeCount++;
1400     globalResumeCount = resumeCount;
1401 #endif
1402   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1403   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1404     double end_lb_time = CkWallTimer();
1405   }
1406
1407 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1408   if (balancing) ComlibNotifyMigrationDone();  
1409 #endif
1410
1411   theLbdb->ResumeClients();
1412   if (balancing)  {
1413
1414     CheckMigrationComplete();
1415     if (future_migrates_expected == 0 || 
1416             future_migrates_expected == future_migrates_completed) {
1417       CheckMigrationComplete();
1418     }
1419   }
1420 #endif
1421 }
1422
1423 /*
1424   migration of objects contains two different kinds:
1425   (1) objects want to make a barrier for migration completion
1426       (waitForBarrier is true)
1427       migrationDone() to finish and resumeClients
1428   (2) objects don't need a barrier
1429   However, next load balancing can only happen when both migrations complete
1430 */ 
1431 void CentralLB::CheckMigrationComplete()
1432 {
1433 #if CMK_LBDB_ON
1434   lbdone ++;
1435   if (lbdone == 2) {
1436     if (_lb_args.debug() && CkMyPe()==0) {
1437       double end_lb_time = CkWallTimer();
1438       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1439                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1440                 end_lb_time-start_lb_time);
1441     }
1442
1443     adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
1444
1445     lbdone = 0;
1446     future_migrates_expected = -1;
1447     future_migrates_completed = 0;
1448
1449
1450     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1451     // release local barrier  so that the next load balancer can go
1452     LDOMHandle h;
1453     h.id.id.idx = 0;
1454     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1455     // switch to the next load balancer in the list
1456     // subtle: called from Migrated() may result in Migrated() called in next LB
1457     theLbdb->nextLoadbalancer(seqno);
1458   }
1459 #endif
1460 }
1461
1462 void CentralLB::preprocess(LDStats* stats)
1463 {
1464   if (_lb_args.ignoreBgLoad())
1465     stats->clearBgLoad();
1466
1467   // Call the predictor for the future
1468   if (_lb_predict) FuturePredictor(statsData);
1469 }
1470
1471 // default load balancing strategy
1472 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1473 {
1474 #if CMK_LBDB_ON
1475   double strat_start_time = CkWallTimer();
1476   if (_lb_args.debug())
1477     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1478
1479   work(stats);
1480
1481   if (_lb_args.debug()>2)  {
1482     CkPrintf("CharmLB> Obj Map:\n");
1483     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1484     CkPrintf("\n");
1485   }
1486
1487   LBMigrateMsg *msg = createMigrateMsg(stats);
1488
1489   if (_lb_args.debug()) {
1490     double strat_end_time = CkWallTimer();
1491     envelope *env = UsrToEnv(msg);
1492
1493     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1494     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1495               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1496     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1497     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1498               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1499     adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
1500     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1501   }
1502   return msg;
1503 #else
1504   return NULL;
1505 #endif
1506 }
1507
1508 void CentralLB::work(LDStats* stats)
1509 {
1510   // does nothing but print the database
1511   stats->print();
1512 }
1513
1514 // generate migrate message from stats->from_proc and to_proc
1515 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1516 {
1517   int i;
1518   CkVec<MigrateInfo*> migrateInfo;
1519   for (i=0; i<stats->n_objs; i++) {
1520     LDObjData &objData = stats->objData[i];
1521     int frompe = stats->from_proc[i];
1522     int tope = stats->to_proc[i];
1523     if (frompe != tope) {
1524       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1525       //         CkMyPe(),obj,pe,dest);
1526       MigrateInfo *migrateMe = new MigrateInfo;
1527       migrateMe->obj = objData.handle;
1528       migrateMe->from_pe = frompe;
1529       migrateMe->to_pe = tope;
1530       migrateMe->async_arrival = objData.asyncArrival;
1531       migrateInfo.insertAtEnd(migrateMe);
1532     }
1533   }
1534
1535   int migrate_count=migrateInfo.length();
1536   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1537   msg->n_moves = migrate_count;
1538   for(i=0; i < migrate_count; i++) {
1539     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1540     msg->moves[i] = *item;
1541     delete item;
1542     migrateInfo[i] = 0;
1543   }
1544   return msg;
1545 }
1546
1547 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1548 {
1549   int nmoves = 0;
1550   int nunavail = 0;
1551   int i;
1552   for (i=0; i<m->n_moves; i++) {
1553     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1554     if (item->from_pe == p || item->to_pe == p) nmoves++;
1555   }
1556   for (i=0; i<CkNumPes();i++) {
1557     if (!m->avail_vector[i]) nunavail++;
1558   }
1559   LBMigrateMsg* msg;
1560   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1561   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1562   msg->n_moves = nmoves;
1563   msg->level = m->level;
1564   msg->next_lb = m->next_lb;
1565   for (i=0,nmoves=0; i<m->n_moves; i++) {
1566     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1567     if (item->from_pe == p || item->to_pe == p) {
1568       msg->moves[nmoves] = *item;
1569       nmoves++;
1570     }
1571   }
1572   // copy processor data
1573   if (nunavail)
1574   for (i=0; i<CkNumPes();i++) {
1575     msg->avail_vector[i] = m->avail_vector[i];
1576     msg->expectedLoad[i] = m->expectedLoad[i];
1577   }
1578   return msg;
1579 }
1580
1581 void CentralLB::simulationWrite() {
1582   if(step() == LBSimulation::dumpStep)
1583   {
1584     // here we are supposed to dump the database
1585     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1586     char *dumpFileName = (char *)malloc(dumpFileSize);
1587     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1588       free(dumpFileName);
1589       dumpFileSize+=3;
1590       dumpFileName = (char *)malloc(dumpFileSize);
1591     }
1592     writeStatsMsgs(dumpFileName);
1593     free(dumpFileName);
1594     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1595     ++LBSimulation::dumpStep;
1596     --LBSimulation::dumpStepSize;
1597     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1598       CmiPrintf("Charm++> Exiting...\n");
1599       CkExit();
1600     }
1601     return;
1602   }
1603 }
1604
1605 void CentralLB::simulationRead() {
1606   LBSimulation *simResults = NULL, *realResults;
1607   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1608   voidMessage->n_moves=0;
1609   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1610     // here we are supposed to read the data from the dump database
1611     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1612     char *simFileName = (char *)malloc(simFileSize);
1613     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1614       free(simFileName);
1615       simFileSize+=3;
1616       simFileName = (char *)malloc(simFileSize);
1617     }
1618     readStatsMsgs(simFileName);
1619
1620     // allocate simResults (only the first step)
1621     if (simResults == NULL) {
1622       simResults = new LBSimulation(LBSimulation::simProcs);
1623       realResults = new LBSimulation(LBSimulation::simProcs);
1624     }
1625     else {
1626       // should be the same number of procs of the original simulation!
1627       if (!LBSimulation::procsChanged) {
1628         // it means we have a previous step, so in simResults there is data.
1629         // we can now print the real effects of the load balancer during the simulation
1630         // or print the difference between the predicted data and the real one.
1631         realResults->reset();
1632         // reset to_proc of statsData to be equal to from_proc
1633         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1634         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1635         simResults->PrintDifferences(realResults,statsData);
1636       }
1637       simResults->reset();
1638     }
1639
1640     // now pass it to the strategy routine
1641     double startT = CkWallTimer();
1642     preprocess(statsData);
1643     CmiPrintf("%s> Strategy starts ... \n", lbname);
1644     LBMigrateMsg* migrateMsg = Strategy(statsData);
1645     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1646                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1647
1648     // now calculate the results of the load balancing simulation
1649     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1650
1651     // now we have the simulation data, so print it and loop
1652     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1653     // **CWL** Officially recording my disdain here for using ints for bool
1654     if (LBSimulation::showDecisionsOnly) {
1655       simResults->PrintDecisions(migrateMsg, simFileName, 
1656                                  LBSimulation::simProcs);
1657     } else {
1658       simResults->PrintSimulationResults();
1659     }
1660
1661     free(simFileName);
1662     delete migrateMsg;
1663     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1664   }
1665   // deallocate simResults
1666   delete simResults;
1667   CmiPrintf("Charm++> Exiting...\n");
1668   CkExit();
1669 }
1670
1671 void CentralLB::readStatsMsgs(const char* filename) 
1672 {
1673 #if CMK_LBDB_ON
1674   int i;
1675   FILE *f = fopen(filename, "r");
1676   if (f==NULL) {
1677     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1678     CmiAbort("");
1679   }
1680
1681   // at this stage, we need to rebuild the statsMsgList and
1682   // statsDataList structures. For that first deallocate the
1683   // old structures
1684   if (statsMsgsList) {
1685     for(i = 0; i < stats_msg_count; i++)
1686       delete statsMsgsList[i];
1687     delete[] statsMsgsList;
1688     statsMsgsList=0;
1689   }
1690
1691   PUP::fromDisk pd(f);
1692   PUP::machineInfo machInfo;
1693
1694   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1695   PUP::xlater p(machInfo, pd);
1696
1697   if (_lb_args.lbversion() > 1) {
1698     p|_lb_args.lbversion();             // write version number
1699     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1700     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1701   }
1702   p|stats_msg_count;
1703
1704   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1705   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1706   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1707
1708   // LBSimulation::simProcs must be set
1709   statsData->pup(p);
1710
1711   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1712   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1713
1714   // file f is closed in the destructor of PUP::fromDisk
1715   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1716 #endif
1717 }
1718
1719 void CentralLB::writeStatsMsgs(const char* filename) 
1720 {
1721 #if CMK_LBDB_ON
1722   FILE *f = fopen(filename, "w");
1723   if (f==NULL) {
1724     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1725     CmiAbort("");
1726   }
1727
1728   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1729   PUP::toDisk p(f);
1730   p((char *)&machInfo, sizeof(machInfo));       // machine info
1731
1732   p|_lb_args.lbversion();               // write version number
1733   p|stats_msg_count;
1734   statsData->pup(p);
1735
1736   fclose(f);
1737
1738   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1739 #endif
1740 }
1741
1742 // calculate the predicted wallclock/cpu load for every processors
1743 // considering communication overhead if considerComm is true
1744 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1745                       LBMigrateMsg *msg, LBInfo &info, 
1746                       int considerComm)
1747 {
1748 #if CMK_LBDB_ON
1749         stats->makeCommHash();
1750
1751         // update to_proc according to migration msgs
1752         for(int i = 0; i < msg->n_moves; i++) {
1753           MigrateInfo &mInfo = msg->moves[i];
1754           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1755           CmiAssert(idx != -1);
1756           stats->to_proc[idx] = mInfo.to_pe;
1757         }
1758
1759         info.getInfo(stats, count, considerComm);
1760 #endif
1761 }
1762
1763
1764 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1765 {
1766     CkAssert(simResults != NULL && count == simResults->numPes);
1767     // estimate the new loads of the processors. As a first approximation, this is the
1768     // sum of the cpu times of the objects on that processor
1769     double startT = CkWallTimer();
1770     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1771     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1772 }
1773
1774 void CentralLB::pup(PUP::er &p) { 
1775   BaseLB::pup(p); 
1776   if (p.isUnpacking())  {
1777     initLB(CkLBOptions(seqno)); 
1778   }
1779   p|reduction_started;
1780   int has_statsMsg=0;
1781   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1782   p|has_statsMsg;
1783   if (has_statsMsg) {
1784     if (p.isUnpacking())
1785       statsMsg = new CLBStatsMsg;
1786     statsMsg->pup(p);
1787   }
1788 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1789   p | lbDecisionCount;
1790   p | resumeCount;
1791 #endif
1792         
1793 }
1794
1795 int CentralLB::useMem() { 
1796   return sizeof(CentralLB) + statsData->useMem() + 
1797          CkNumPes() * sizeof(CLBStatsMsg *);
1798 }
1799
1800
1801 /**
1802   CLBStatsMsg is not a real message now.
1803   CLBStatsMsg is used for all processors to fill in their local load and comm
1804   statistics and send to processor 0
1805 */
1806
1807 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1808   n_objs = osz;
1809   n_comm = csz;
1810   objData = new LDObjData[osz];
1811   commData = new LDCommData[csz];
1812   avail_vector = NULL;
1813 }
1814
1815 CLBStatsMsg::~CLBStatsMsg() {
1816   delete [] objData;
1817   delete [] commData;
1818   delete [] avail_vector;
1819 }
1820
1821 void CLBStatsMsg::pup(PUP::er &p) {
1822   int i;
1823   p|from_pe;
1824   p|pe_speed;
1825   p|total_walltime;
1826   p|idletime;
1827   p|bg_walltime;
1828 #if CMK_LB_CPUTIMER
1829   p|total_cputime;
1830   p|bg_cputime;
1831 #endif
1832 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1833   p | step;
1834 #endif
1835   p|n_objs;
1836   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1837   for (i=0; i<n_objs; i++) p|objData[i];
1838   p|n_comm;
1839   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1840   for (i=0; i<n_comm; i++) p|commData[i];
1841
1842   int has_avail_vector;
1843   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1844   p|has_avail_vector;
1845   if (p.isUnpacking()) {
1846     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1847     else avail_vector = NULL;
1848   }
1849   if (has_avail_vector) p(avail_vector, CkNumPes());
1850
1851   p(next_lb);
1852 }
1853
1854 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1855 // the entry function, it is just used to use to pup.
1856 // I don't use CLBStatsMsg directly as marshalled parameter because
1857 // I want the data pointer stored and not to be freed by the Charm++.
1858 void CkMarshalledCLBStatsMessage::free() { 
1859   int count = msgs.size();
1860   for  (int i=0; i<count; i++) {
1861     delete msgs[i];
1862     msgs[i] = NULL;
1863   }
1864   msgs.free();
1865 }
1866
1867 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1868 {
1869   int count = m.getCount();
1870   for (int i=0; i<count; i++) add(m.getMessage(i));
1871 }
1872
1873 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1874 {
1875   int count = msgs.size();
1876   p|count;
1877   for (int i=0; i<count; i++) {
1878     CLBStatsMsg *msg;
1879     if (p.isUnpacking()) msg = new CLBStatsMsg;
1880     else { 
1881       msg = msgs[i]; CmiAssert(msg!=NULL);
1882     }
1883     msg->pup(p);
1884     if (p.isUnpacking()) add(msg);
1885   }
1886 }
1887
1888 SpanningTree::SpanningTree()
1889 {
1890         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1891         arity = (int)ceil(sq/2);
1892         calcParent(CkMyPe());
1893         calcNumChildren(CkMyPe());
1894 }
1895
1896 void SpanningTree::calcParent(int n)
1897 {
1898         parent=-1;
1899         if(n != 0  && arity > 0)
1900                 parent = (n-1)/arity;
1901 }
1902
1903 void SpanningTree::calcNumChildren(int n)
1904 {
1905         numChildren = 0;
1906         if (arity == 0) return;
1907         int fullNode=(CkNumPes()-1-arity)/arity;
1908         if(n <= fullNode)
1909                 numChildren = arity;
1910         if(n == fullNode+1)
1911                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1912         if(n > fullNode+1)
1913                 numChildren = 0;
1914 }
1915
1916 #include "CentralLB.def.h"
1917  
1918 /*@}*/