57598e57e67d5f39047dddd203100742dc619af6
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #include "limits.h"
15 #include <vector>
16
17 #define  DEBUGF(x)       // CmiPrintf x;
18 #define  DEBUG(x)        // x;
19
20 #if CMK_MEM_CHECKPOINT
21    /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #elif defined(_FAULT_MLOG_)
25 /* can not handle reduction in inmem FT */
26 #define USE_REDUCTION         0
27 #define USE_LDB_SPANNING_TREE 0
28 #else
29 #define USE_REDUCTION         1
30 #define USE_LDB_SPANNING_TREE 1
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 extern int _restartFlag;
35 extern void getGlobalStep(CkGroupID );
36 extern void initMlogLBStep(CkGroupID );
37 extern int globalResumeCount;
38 extern void sendDummyMigrationCounts(int *);
39 #endif
40
41 #if CMK_GRID_QUEUE_AVAILABLE
42 CpvExtern(void *, CkGridObject);
43 #endif
44
45 CkGroupID loadbalancer;
46 int * lb_ptr;
47 int load_balancer_created;
48
49 struct AdaptiveData {
50   int iteration;
51   double max_load;
52   double avg_load;
53 };
54
55 struct AdaptiveLBDatabase {
56   std::vector<AdaptiveData> history_data;
57 } adaptive_lbdb;
58
59 enum state {
60   OFF,
61   ON,
62   PAUSE,
63   DECIDED,
64   LOAD_BALANCE
65 } local_state;
66
67 struct AdaptiveLBStructure {
68   int lb_ideal_period;
69   int lb_calculated_period;
70   int lb_no_iterations;
71   int global_max_iter_no;
72   int global_recv_iter_counter;
73   double prev_load;
74   double lb_strategy_cost;
75   double lb_migration_cost;
76   bool lb_period_informed;
77 } adaptive_struct;
78
79 CreateLBFunc_Def(CentralLB, "CentralLB base class")
80
81 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
82                              LBMigrateMsg *, LBInfo &info, int considerComm);
83
84 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
85   double lb_data[4];
86   lb_data[0] = 0;
87   lb_data[1] = 0;
88   lb_data[2] = 0;
89   for (int i = 0; i < nMsg; i++) {
90     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
91     double* m = (double *)msgs[i]->getData();
92     lb_data[0] += m[0];
93     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
94     lb_data[2] += m[2];
95     if (i == 0) {
96       lb_data[3] = m[3];
97     }
98   }
99   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
100 }
101
102 /*global*/ CkReduction::reducerType lbDataCollectionType;
103 /*initcall*/ void registerLBDataCollection(void) {
104   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
105 }
106
107 /*
108 void CreateCentralLB()
109 {
110   CProxy_CentralLB::ckNew(0);
111 }
112 */
113
114 void CentralLB::staticStartLB(void* data)
115 {
116   CentralLB *me = (CentralLB*)(data);
117   me->StartLB();
118 }
119
120 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
121 {
122   CentralLB *me = (CentralLB*)(data);
123   me->Migrated(h, waitBarrier);
124 }
125
126 void CentralLB::staticAtSync(void* data)
127 {
128   CentralLB *me = (CentralLB*)(data);
129   me->AtSync();
130 }
131
132 void CentralLB::initLB(const CkLBOptions &opt)
133 {
134 #if CMK_LBDB_ON
135   lbname = "CentralLB";
136   thisProxy = CProxy_CentralLB(thisgroup);
137   //  CkPrintf("Construct in %d\n",CkMyPe());
138
139   // create and turn on by default
140   receiver = theLbdb->
141     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
142   notifier = theLbdb->getLBDB()->
143     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
144   startLbFnHdl = theLbdb->getLBDB()->
145     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
146
147   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
148   if (opt.getSeqNo() > 0) turnOff();
149
150   stats_msg_count = 0;
151   statsMsgsList = NULL;
152   statsData = NULL;
153
154   storedMigrateMsg = NULL;
155   reduction_started = 0;
156
157   // for future predictor
158   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
159   else predicted_model=0;
160   // register user interface callbacks
161   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
162
163   myspeed = theLbdb->ProcessorSpeed();
164
165   migrates_completed = 0;
166   future_migrates_completed = 0;
167   migrates_expected = -1;
168   future_migrates_expected = -1;
169   cur_ld_balancer = _lb_args.central_pe();      // 0 default
170   lbdone = 0;
171   count_msgs=0;
172   statsMsg = NULL;
173
174   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
175
176   load_balancer_created = 1;
177
178   // If metabalancer enabled, initialize the variables
179   adaptive_struct.lb_ideal_period =  INT_MAX;
180   adaptive_struct.lb_calculated_period = INT_MAX;
181   adaptive_struct.lb_no_iterations = -1;
182   adaptive_struct.global_max_iter_no = 0;
183   adaptive_struct.global_recv_iter_counter = 0;
184   adaptive_struct.prev_load = 0.0;
185   adaptive_struct.lb_strategy_cost = 0.0;
186   adaptive_struct.lb_migration_cost = 0.0;
187   local_state = OFF;
188 #endif
189 }
190
191 CentralLB::~CentralLB()
192 {
193 #if CMK_LBDB_ON
194   delete [] statsMsgsList;
195   delete statsData;
196   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
197   if (theLbdb) {
198     theLbdb->getLBDB()->
199       RemoveNotifyMigrated(notifier);
200     theLbdb->
201       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
202   }
203 #endif
204 }
205
206 void CentralLB::turnOn() 
207 {
208 #if CMK_LBDB_ON
209   theLbdb->getLBDB()->
210     TurnOnBarrierReceiver(receiver);
211   theLbdb->getLBDB()->
212     TurnOnNotifyMigrated(notifier);
213   theLbdb->getLBDB()->
214     TurnOnStartLBFn(startLbFnHdl);
215 #endif
216 }
217
218 void CentralLB::turnOff() 
219 {
220 #if CMK_LBDB_ON
221   theLbdb->getLBDB()->
222     TurnOffBarrierReceiver(receiver);
223   theLbdb->getLBDB()->
224     TurnOffNotifyMigrated(notifier);
225   theLbdb->getLBDB()->
226     TurnOffStartLBFn(startLbFnHdl);
227 #endif
228 }
229
230 void CentralLB::AtSync()
231 {
232 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
233 #if CMK_LBDB_ON
234 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
235
236 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
237         CpvAccess(_currentObj)=this;
238 #endif
239
240   // if num of processor is only 1, nothing should happen
241   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
242     MigrationDone(0);
243     return;
244   }
245   if(CmiNodeAlive(CkMyPe())){
246     thisProxy [CkMyPe()].ProcessAtSyncMin();
247   }
248 #endif
249 }
250
251 #include "ComlibStrategy.h"
252
253 void CentralLB::ProcessAtSync()
254 {
255
256
257
258 #if CMK_LBDB_ON
259   if (reduction_started) return;              // reducton in progress
260
261   CmiAssert(CmiNodeAlive(CkMyPe()));
262   if (CkMyPe() == cur_ld_balancer) {
263     start_lb_time = CkWallTimer();
264   }
265
266
267 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
268         initMlogLBStep(thisgroup);
269 #endif
270
271   // build message
272   BuildStatsMsg();
273
274 #if USE_REDUCTION
275     // reduction to get total number of objects and comm
276     // so that processor 0 can pre-allocate load balancing database
277   int counts[2];
278   counts[0] = theLbdb->GetObjDataSz();
279   counts[1] = theLbdb->GetCommDataSz();
280
281   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
282                   thisProxy[0]);
283   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
284   reduction_started = 1;
285 #else
286   SendStats();
287 #endif
288 #endif
289 }
290
291 void CentralLB::ProcessAtSyncMin()
292 {
293 #if CMK_LBDB_ON
294   if (reduction_started) return;              // reducton in progress
295
296   CmiAssert(CmiNodeAlive(CkMyPe()));
297   if (CkMyPe() == cur_ld_balancer) {
298     start_lb_time = CkWallTimer();
299   }
300
301
302 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
303         initMlogLBStep(thisgroup);
304 #endif
305   
306   adaptive_struct.lb_no_iterations++;
307  // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
308  //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
309
310   // If decision has been made and has reached the lb_period, then do load
311   // balancing, else if hasn't reached ideal_period, then resume.
312   if (local_state == DECIDED) {
313     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
314  //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
315       SendMinStats();
316       ResumeClients(0);
317     } else {
318       local_state = LOAD_BALANCE;
319  //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
320       ProcessAtSync();
321     }
322     return;
323   }
324    
325   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
326   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
327   // dont resume client.
328   if (local_state == ON) {
329     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
330       SendMinStats();
331       ResumeClients(0);
332     } else {
333       local_state = PAUSE;
334     }
335     return;
336   }
337
338   SendMinStats();
339   ResumeClients(0);
340 #endif
341 }
342
343 void CentralLB::SendMinStats() {
344
345  double total_load;
346  double idle_time;
347  double bg_walltime;
348   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
349  // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
350
351   // Since the total_load is cumulative since the last load balancing stage,
352   // Hence it is subtracted from the previous load.
353   total_load -= idle_time;
354   double tmp = total_load;
355   total_load -= adaptive_struct.prev_load;
356   adaptive_struct.prev_load = tmp; 
357
358   double lb_data[4];
359   lb_data[0] = total_load;
360   lb_data[1] = total_load;
361   lb_data[2] = 1;
362   lb_data[3] = adaptive_struct.lb_no_iterations;
363
364   if (adaptive_struct.lb_no_iterations != 0) {
365     CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
366         thisProxy[0]);
367     contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
368   }
369
370 //    int tmp1 = adaptive_struct.lb_no_iterations;
371 //    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
372 //    // Send the current iteration no
373 //    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
374 //        thisProxy[0]);
375 //    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
376 }
377
378 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
379   CmiAssert(CkMyPe() == 0);
380   double* load = (LBRealType *) msg->getData();
381   double max = load[1];
382   double avg = load[0]/load[2];
383   int iteration_n = load[3];
384   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
385   CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
386   delete msg;
387
388   // Store the data for this iteration
389   AdaptiveData data;
390   data.iteration = adaptive_struct.lb_no_iterations;
391   data.max_load = max;
392   data.avg_load = avg;
393   adaptive_lbdb.history_data.push_back(data);
394
395 //  if (adaptive_struct.lb_period_informed) {
396 //    return;
397 //  }
398
399   // If the max/avg ratio is greater than the threshold and also this is not the
400   // step immediately after load balancing, carry out load balancing
401   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
402   if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
403     CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
404 //    if (!adaptive_struct.lb_period_informed) {
405 //      // Just for testing
406 //      adaptive_struct.lb_calculated_period = 40;
407 //      adaptive_struct.lb_period_informed = true;
408 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
409 //      return;
410 //    }
411
412     // If the new lb period is less than current set lb period
413     if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
414       adaptive_struct.lb_calculated_period = iteration_n + 1;
415       adaptive_struct.lb_period_informed = true;
416       thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
417     }
418     return;
419   }
420
421   // Generate the plan for the adaptive strategy
422   int period;
423   if (generatePlan(period)) {
424     //CkPrintf("Carry out load balancing step at iter\n");
425
426     // If the new lb period is less than current set lb period
427     if (adaptive_struct.lb_calculated_period > period) {
428       adaptive_struct.lb_calculated_period = period;
429       adaptive_struct.lb_period_informed = true;
430       thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
431     }
432   }
433 }
434
435 bool CentralLB::generatePlan(int& period) {
436   if (adaptive_lbdb.history_data.size() <= 8) {
437     return false;
438   }
439
440   // Some heuristics for lbperiod
441   // If constant load or almost constant,
442   // then max * new_lb_period > avg * new_lb_period + lb_cost
443   double max = 0.0;
444   double avg = 0.0;
445   AdaptiveData data;
446   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
447     data = adaptive_lbdb.history_data[i];
448     max += data.max_load;
449     avg += data.avg_load;
450     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
451   }
452 //  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
453 //  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
454 //
455 //  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
456 //  adaptive_struct.lb_migration_cost) / (max - avg);
457 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
458 //      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
459 //
460   // If linearly varying load, then find lb_period
461   // area between the max and avg curve 
462   double mslope, aslope, mc, ac;
463   getLineEq(aslope, ac, mslope, mc);
464   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
465   double a = (mslope - aslope)/2;
466   double b = (mc - ac);
467   double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
468   //c = -2.5;
469   bool got_period = getPeriodForLinear(a, b, c, period);
470   if (!got_period) {
471     return false;
472   }
473   
474   if (mslope < 0) {
475     if (period > (-mc/mslope)) {
476       return false;
477     }
478   }
479
480   if (aslope < 0) {
481     if (period > (-ac/aslope)) {
482       return false;
483     }
484   }
485
486   if (period > ((mc - ac)/(aslope - mslope))) {
487     return false;
488   }
489   return true;
490 }
491
492 bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
493   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
494   if (a == 0.0) {
495     period = (-c / b);
496     CkPrintf("Ideal period for linear load %d\n", period);
497     return true;
498   }
499   int x;
500   double t = (b * b) - (4*a*c);
501   if (t < 0) {
502     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
503     return false;
504   }
505   t = (-b + sqrt(t)) / (2*a);
506   x = t;
507   if (x < 0) {
508     CkPrintf("boo!!! x (%d) < 0\n", x);
509     x = 0;
510     return false;
511   }
512   period = x;
513   CkPrintf("Ideal period for linear load %d\n", period);
514   return true;
515 }
516
517 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
518   int total = adaptive_lbdb.history_data.size();
519   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
520       adaptive_lbdb.history_data[0].iteration;
521   double a1 = 0;
522   double m1 = 0;
523   double a2 = 0;
524   double m2 = 0;
525   AdaptiveData data;
526   int i = 0;
527   for (i = 0; i <= total/2; i++) {
528     data = adaptive_lbdb.history_data[i];
529     m1 += data.max_load;
530     a1 += data.avg_load;
531   }
532   m1 /= i;
533   a1 /= i;
534
535   for (i = total/2; i < total; i++) {
536     data = adaptive_lbdb.history_data[i];
537     m2 += data.max_load;
538     a2 += data.avg_load;
539   }
540   m2 /= (i - total/2);
541   a2 /= (i - total/2);
542
543   aslope = 2 * (a2 - a1) / iterations;
544   mslope = 2 * (m2 - m1) / iterations;
545   ac = adaptive_lbdb.history_data[0].avg_load;
546   mc = adaptive_lbdb.history_data[0].max_load;
547   return true;
548 }
549
550 void CentralLB::LoadBalanceDecision(int period) {
551   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
552   adaptive_struct.lb_ideal_period = period;
553
554   if (local_state == ON) {
555     CkPrintf("lb will happen at %d\n", adaptive_struct.lb_ideal_period);
556     local_state = DECIDED;
557     return;
558   }
559
560   // If the state is PAUSE, then its waiting for the final decision from central
561   // processor. If the decision is that the ideal period is in the future,
562   // resume. If the ideal period is now, then carry out load balancing.
563   if (local_state == PAUSE) {
564     if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
565       local_state = DECIDED;
566       ResumeClients(0);
567     } else {
568       ProcessAtSync();
569     }
570     return;
571   }
572
573 //  if (local_state == OFF) {
574     local_state = ON;
575     thisProxy[0].ReceiveIterationNo(adaptive_struct.lb_no_iterations);
576 //    return;
577 //  }
578 }
579
580 void CentralLB::ReceiveIterationNo(int local_iter_no) {
581   CmiAssert(CkMyPe() == 0);
582   adaptive_struct.global_recv_iter_counter++;
583   if (local_iter_no > adaptive_struct.global_max_iter_no) {
584     adaptive_struct.global_max_iter_no = local_iter_no;
585   }
586   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
587     adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
588     thisProxy.LoadBalanceDecision(adaptive_struct.lb_ideal_period);
589     adaptive_struct.global_max_iter_no = 0;
590     adaptive_struct.global_recv_iter_counter = 0;
591   }
592 }
593
594 // called only on 0
595 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
596 {
597   CmiAssert(CkMyPe() == 0);
598   if (statsData == NULL) statsData = new LDStats;
599
600   int *counts = (int *)msg->getData();
601   int n_objs = counts[0];
602   int n_comm = counts[1];
603
604     // resize database
605   statsData->objData.resize(n_objs);
606   statsData->from_proc.resize(n_objs);
607   statsData->to_proc.resize(n_objs);
608   statsData->commData.resize(n_comm);
609
610   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
611         
612     // broadcast call to let everybody start to send stats
613   thisProxy.SendStats();
614 }
615
616 void CentralLB::BuildStatsMsg()
617 {
618 #if CMK_LBDB_ON
619   // build and send stats
620   const int osz = theLbdb->GetObjDataSz();
621   const int csz = theLbdb->GetCommDataSz();
622
623   int npes = CkNumPes();
624   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
625   _MEMCHECK(msg);
626   msg->from_pe = CkMyPe();
627 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
628         msg->step = step();
629 #endif
630   //msg->serial = CrnRand();
631
632 /*
633   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
634   theLbdb->IdleTime(&msg->idletime);
635   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
636 */
637 #if CMK_LB_CPUTIMER
638   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
639                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
640 #else
641   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
642                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
643 #endif
644
645   msg->pe_speed = myspeed;
646   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
647
648   msg->n_objs = osz;
649   theLbdb->GetObjData(msg->objData);
650   msg->n_comm = csz;
651   theLbdb->GetCommData(msg->commData);
652 //  theLbdb->ClearLoads();
653   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
654
655   if(CkMyPe() == cur_ld_balancer) {
656     msg->avail_vector = new char[CkNumPes()];
657     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
658     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
659   }
660
661   CmiAssert(statsMsg == NULL);
662   statsMsg = msg;
663 #endif
664 }
665
666
667 // called on every processor
668 void CentralLB::SendStats()
669 {
670 #if CMK_LBDB_ON
671   CmiAssert(statsMsg != NULL);
672   reduction_started = 0;
673
674 #if USE_LDB_SPANNING_TREE
675   if(CkNumPes()>1024)
676   {
677     if (CkMyPe() == cur_ld_balancer)
678       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
679     else
680       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
681   }
682   else
683 #endif
684   {
685     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
686     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
687   }
688
689   statsMsg = NULL;
690
691 #ifdef __BIGSIM__
692   BgEndStreaming();
693 #endif
694
695   {
696   // enfore the barrier to wait until centralLB says no
697   LDOMHandle h;
698   h.id.id.idx = 0;
699   theLbdb->getLBDB()->RegisteringObjects(h);
700   }
701 #endif
702 }
703
704 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
705 extern int donotCountMigration;
706 #endif
707
708 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
709 {
710 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
711     if(donotCountMigration){
712         return ;
713     }
714 #endif
715
716 #if CMK_LBDB_ON
717   if (waitBarrier) {
718             migrates_completed++;
719       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
720     if (migrates_completed == migrates_expected) {
721       MigrationDone(1);
722     }
723   }
724   else {
725     future_migrates_completed ++;
726     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
727     if (future_migrates_completed == future_migrates_expected)  {
728         CheckMigrationComplete();
729     }
730   }
731 #endif
732 }
733
734 void CentralLB::MissMigrate(int waitForBarrier)
735 {
736   LDObjHandle h;
737   Migrated(h, waitForBarrier);
738 }
739
740 // build a complete data from bufferred messages
741 // not used when USE_REDUCTION = 1
742 void CentralLB::buildStats()
743 {
744     statsData->nprocs() = stats_msg_count;
745     // allocate space
746     statsData->objData.resize(statsData->n_objs);
747     statsData->from_proc.resize(statsData->n_objs);
748     statsData->to_proc.resize(statsData->n_objs);
749     statsData->commData.resize(statsData->n_comm);
750
751     int nobj = 0;
752     int ncom = 0;
753     int nmigobj = 0;
754     // copy all data in individule message to this big structure
755     for (int pe=0; pe<CkNumPes(); pe++) {
756        int i;
757        CLBStatsMsg *msg = statsMsgsList[pe];
758        if(msg == NULL) continue;
759        for (i=0; i<msg->n_objs; i++) {
760          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
761          statsData->objData[nobj] = msg->objData[i];
762          if (msg->objData[i].migratable) nmigobj++;
763          nobj++;
764        }
765        for (i=0; i<msg->n_comm; i++) {
766          statsData->commData[ncom] = msg->commData[i];
767          ncom++;
768        }
769        // free the memory
770        delete msg;
771        statsMsgsList[pe]=0;
772     }
773     statsData->n_migrateobjs = nmigobj;
774 }
775
776 // deposit one processor data at a time, note database is pre-allocated
777 // to have enough space
778 // used when USE_REDUCTION = 1
779 void CentralLB::depositData(CLBStatsMsg *m)
780 {
781   int i;
782   if (m == NULL) return;
783
784   const int pe = m->from_pe;
785   struct ProcStats &procStat = statsData->procs[pe];
786   procStat.pe = pe;
787   procStat.total_walltime = m->total_walltime;
788   procStat.idletime = m->idletime;
789   procStat.bg_walltime = m->bg_walltime;
790 #if CMK_LB_CPUTIMER
791   procStat.total_cputime = m->total_cputime;
792   procStat.bg_cputime = m->bg_cputime;
793 #endif
794   procStat.pe_speed = m->pe_speed;
795   //procStat.utilization = 1.0;
796   procStat.available = CmiTrue;
797   procStat.n_objs = m->n_objs;
798
799   int &nobj = statsData->n_objs;
800   int &nmigobj = statsData->n_migrateobjs;
801   for (i=0; i<m->n_objs; i++) {
802       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
803       statsData->objData[nobj] = m->objData[i];
804       if (m->objData[i].migratable) nmigobj++;
805       nobj++;
806       CmiAssert(nobj <= statsData->objData.capacity());
807   }
808   int &n_comm = statsData->n_comm;
809   for (i=0; i<m->n_comm; i++) {
810       statsData->commData[n_comm] = m->commData[i];
811       n_comm++;
812       CmiAssert(n_comm <= statsData->commData.capacity());
813   }
814   delete m;
815 }
816
817 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
818 {
819 #if CMK_LBDB_ON
820   if (statsMsgsList == NULL) {
821     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
822     CmiAssert(statsMsgsList != NULL);
823     for(int i=0; i < CkNumPes(); i++)
824       statsMsgsList[i] = 0;
825   }
826   if (statsData == NULL) statsData = new LDStats;
827
828     //  loop through all CLBStatsMsg in the incoming msg
829   int count = msg.getCount();
830   for (int num = 0; num < count; num++) 
831   {
832     CLBStatsMsg *m = msg.getMessage(num);
833     CmiAssert(m!=NULL);
834     const int pe = m->from_pe;
835     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
836 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
837 /*      
838  *  if(m->step < step()){
839  *    //TODO: if a processor is redoing an old load balance step..
840  *    //tell it that the step is done and that it should not perform any migrations
841  *      thisProxy[pe].ReceiveDummyMigration();
842  *  }*/
843 #endif
844         
845     if(!CmiNodeAlive(pe)){
846         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
847         continue;
848     }
849         
850     if (m->avail_vector!=NULL) {
851       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
852     }
853
854     if (statsMsgsList[pe] != 0) {
855       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
856              pe);
857     } else {
858       statsMsgsList[pe] = m;
859 #if USE_REDUCTION
860       depositData(m);
861 #else
862       // store per processor data right away
863       struct ProcStats &procStat = statsData->procs[pe];
864       procStat.pe = pe;
865       procStat.total_walltime = m->total_walltime;
866       procStat.idletime = m->idletime;
867       procStat.bg_walltime = m->bg_walltime;
868 #if CMK_LB_CPUTIMER
869       procStat.total_cputime = m->total_cputime;
870       procStat.bg_cputime = m->bg_cputime;
871 #endif
872       procStat.pe_speed = m->pe_speed;
873       //procStat.utilization = 1.0;
874       procStat.available = CmiTrue;
875       procStat.n_objs = m->n_objs;
876
877       statsData->n_objs += m->n_objs;
878       statsData->n_comm += m->n_comm;
879 #endif
880       stats_msg_count++;
881     }
882   }    // end of for
883
884   const int clients = CkNumValidPes();
885   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
886  
887   if (stats_msg_count == clients) {
888         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
889     statsData->nprocs() = stats_msg_count;
890     thisProxy[CkMyPe()].LoadBalance();
891   }
892 #endif
893 }
894
895 /** added by Abhinav for receiving msgs via spanning tree */
896 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
897 {
898 #if CMK_LBDB_ON
899         CmiAssert(CkMyPe() != 0);
900         bufMsg.add(msg);         // buffer messages
901         count_msgs++;
902         //CkPrintf("here %d\n", CkMyPe());
903         if (count_msgs == st.numChildren+1) {
904                 if(st.parent == 0)
905                 {
906                         thisProxy[0].ReceiveStats(bufMsg);
907                         //CkPrintf("from %d\n", CkMyPe());
908                 }
909                 else
910                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
911                 count_msgs = 0;
912                 bufMsg.free();
913         } 
914 #endif
915 }
916
917 void CentralLB::LoadBalance()
918 {
919 #if CMK_LBDB_ON
920   int proc;
921   const int clients = CkNumPes();
922
923 #if ! USE_REDUCTION
924   // build data
925   buildStats();
926 #else
927   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
928 #endif
929
930   if (_lb_args.debug()) 
931       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
932                   lbname, cur_ld_balancer, step(), start_lb_time,
933                   CmiMemoryUsage()/(1024.0*1024.0));
934
935   // if we are in simulation mode read data
936   if (LBSimulation::doSimulation) simulationRead();
937
938   char *availVector = LBDatabaseObj()->availVector();
939   for(proc = 0; proc < clients; proc++)
940       statsData->procs[proc].available = (CmiBool)availVector[proc];
941
942   preprocess(statsData);
943
944 //    CkPrintf("Before Calling Strategy\n");
945
946   if (_lb_args.printSummary()) {
947       LBInfo info(clients);
948         // not take comm data
949       info.getInfo(statsData, clients, 0);
950       LBRealType mLoad, mCpuLoad, totalLoad;
951       info.getSummary(mLoad, mCpuLoad, totalLoad);
952       int nmsgs, nbytes;
953       statsData->computeNonlocalComm(nmsgs, nbytes);
954       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
955 //      if (_lb_args.debug() > 1) {
956 //        for (int i=0; i<statsData->n_objs; i++)
957 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
958 //      }
959   }
960
961 #if CMK_REPLAYSYSTEM
962   LDHandle *loadBalancer_pointers;
963   if (_replaySystem) {
964     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
965     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
966   }
967 #endif
968   
969   LBMigrateMsg* migrateMsg = Strategy(statsData);
970 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
971         migrateMsg->step = step();
972 #endif
973
974 #if CMK_REPLAYSYSTEM
975   CpdHandleLBMessage(&migrateMsg);
976   if (_replaySystem) {
977     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
978     free(loadBalancer_pointers);
979   }
980 #endif
981   
982   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
983   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
984
985   // if this is the step at which we need to dump the database
986   simulationWrite();
987
988 //  calculate predicted load
989 //  very time consuming though, so only happen when debugging is on
990   if (_lb_args.printSummary()) {
991       LBInfo info(clients);
992         // not take comm data
993       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
994       LBRealType mLoad, mCpuLoad, totalLoad;
995       info.getSummary(mLoad, mCpuLoad, totalLoad);
996       int nmsgs, nbytes;
997       statsData->computeNonlocalComm(nmsgs, nbytes);
998       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
999       for (int i=0; i<clients; i++)
1000         migrateMsg->expectedLoad[i] = info.peLoads[i];
1001   }
1002
1003   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
1004 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1005     lbDecisionCount++;
1006     migrateMsg->lbDecisionCount = lbDecisionCount;
1007 #endif
1008
1009   envelope *env = UsrToEnv(migrateMsg);
1010   if (1) {
1011       // broadcast
1012     thisProxy.ReceiveMigration(migrateMsg);
1013   }
1014   else {
1015     // split the migration for each processor
1016     for (int p=0; p<CkNumPes(); p++) {
1017       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
1018       thisProxy[p].ReceiveMigration(m);
1019     }
1020     delete migrateMsg;
1021   }
1022
1023   // Zero out data structures for next cycle
1024   // CkPrintf("zeroing out data\n");
1025   statsData->clear();
1026   stats_msg_count=0;
1027 #endif
1028 }
1029
1030 // test if sender and receiver in a commData is nonmigratable.
1031 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
1032 {
1033 #if CMK_LBDB_ON
1034   for (int pe=0 ; pe<count; pe++)
1035   {
1036     for (int i=0; i<len[pe]; i++)
1037       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
1038           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
1039       return 0;
1040   }
1041 #endif
1042   return 1;
1043 }
1044
1045 // rebuild LDStats and remove all non-migratble objects and related things
1046 void CentralLB::removeNonMigratable(LDStats* stats, int count)
1047 {
1048   int i;
1049
1050   // check if we have non-migratable objects
1051   int have = 0;
1052   for (i=0; i<stats->n_objs; i++) 
1053   {
1054     LDObjData &odata = stats->objData[i];
1055     if (!odata.migratable) {
1056       have = 1; break;
1057     }
1058   }
1059   if (have == 0) return;
1060
1061   CkVec<LDObjData> nonmig;
1062   CkVec<int> new_from_proc, new_to_proc;
1063   nonmig.resize(stats->n_migrateobjs);
1064   new_from_proc.resize(stats->n_migrateobjs);
1065   new_to_proc.resize(stats->n_migrateobjs);
1066   int n_objs = 0;
1067   for (i=0; i<stats->n_objs; i++) 
1068   {
1069     LDObjData &odata = stats->objData[i];
1070     if (odata.migratable) {
1071       nonmig[n_objs] = odata;
1072       new_from_proc[n_objs] = stats->from_proc[i];
1073       new_to_proc[n_objs] = stats->to_proc[i];
1074       n_objs ++;
1075     }
1076     else {
1077       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1078 #if CMK_LB_CPUTIMER
1079       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1080 #endif
1081     }
1082   }
1083   CmiAssert(stats->n_migrateobjs == n_objs);
1084
1085   stats->makeCommHash();
1086   
1087   CkVec<LDCommData> newCommData;
1088   newCommData.resize(stats->n_comm);
1089   int n_comm = 0;
1090   for (i=0; i<stats->n_comm; i++) 
1091   {
1092     LDCommData& cdata = stats->commData[i];
1093     if (!cdata.from_proc()) 
1094     {
1095       int idx = stats->getSendHash(cdata);
1096       CmiAssert(idx != -1);
1097       if (!stats->objData[idx].migratable) continue;
1098     }
1099     switch (cdata.receiver.get_type()) {
1100     case LD_PROC_MSG:
1101       break;
1102     case LD_OBJ_MSG:  {
1103       int idx = stats->getRecvHash(cdata);
1104       if (stats->complete_flag)
1105         CmiAssert(idx != -1);
1106       else if (idx == -1) continue;          // receiver not in this group
1107       if (!stats->objData[idx].migratable) continue;
1108       break;
1109       }
1110     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1111       break;
1112     }
1113     newCommData[n_comm] = cdata;
1114     n_comm ++;
1115   }
1116
1117   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1118
1119   // swap to new data
1120   stats->objData = nonmig;
1121   stats->from_proc = new_from_proc;
1122   stats->to_proc = new_to_proc;
1123   stats->n_objs = n_objs;
1124
1125   stats->commData = newCommData;
1126   stats->n_comm = n_comm;
1127
1128   stats->deleteCommHash();
1129   stats->makeCommHash();
1130
1131 }
1132
1133
1134 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1135 extern int restarted;
1136 #endif
1137
1138 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1139 {
1140   storedMigrateMsg = m;
1141   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1142                   thisProxy);
1143   contribute(0, NULL, CkReduction::max_int, cb);
1144
1145   // Reset all adaptive lb related fields since load balancing is being done.
1146   adaptive_struct.lb_no_iterations = -1;
1147   adaptive_lbdb.history_data.clear();
1148   adaptive_struct.prev_load = 0.0;
1149   local_state = OFF;
1150   adaptive_struct.lb_period_informed = false;
1151   adaptive_struct.lb_ideal_period = INT_MAX;
1152   adaptive_struct.lb_calculated_period = INT_MAX;
1153 }
1154
1155 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1156 {
1157 #if CMK_LBDB_ON
1158         int i;
1159         LBMigrateMsg *m = storedMigrateMsg;
1160         CmiAssert(m!=NULL);
1161         delete msg;
1162
1163 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1164         int *dummyCounts;
1165
1166         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1167         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1168         if(step() > m->step){
1169                 char str[100];
1170                 envelope *env = UsrToEnv(m);
1171                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1172                 return;
1173         }
1174         lbDecisionCount = m->lbDecisionCount;
1175 #endif
1176
1177   if (_lb_args.debug() > 1) 
1178     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1179
1180   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1181   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1182 /*FAULT_EVAC*/
1183   if(!CmiNodeAlive(CkMyPe())){
1184         delete m;
1185         return;
1186   }
1187   migrates_expected = 0;
1188   future_migrates_expected = 0;
1189 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1190         int sending=0;
1191     int dummy=0;
1192         LBDB *_myLBDB = theLbdb->getLBDB();
1193         if(_restartFlag){
1194         dummyCounts = new int[CmiNumPes()];
1195         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1196     }
1197 #endif
1198   for(i=0; i < m->n_moves; i++) {
1199     MigrateInfo& move = m->moves[i];
1200     const int me = CkMyPe();
1201     if (move.from_pe == me && move.to_pe != me) {
1202       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1203       // migrate object, in case it is already gone, inform toPe
1204 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1205       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1206          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1207 #else
1208             if(_restartFlag == 0){
1209                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1210                 theLbdb->Migrate(move.obj,move.to_pe);
1211                 sending++;
1212             }else{
1213                 if(_myLBDB->validObjHandle(move.obj)){
1214                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1215                     theLbdb->Migrate(move.obj,move.to_pe);
1216                     sending++;
1217                 }else{
1218                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1219                     dummyCounts[move.to_pe]++;
1220                     dummy++;
1221                 }
1222             }
1223 #endif
1224     } else if (move.from_pe != me && move.to_pe == me) {
1225        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1226       if (!move.async_arrival) migrates_expected++;
1227       else future_migrates_expected++;
1228     }
1229   }
1230   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1231   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1232
1233 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1234         if(_restartFlag){
1235                 sendDummyMigrationCounts(dummyCounts);
1236                 _restartFlag  =0;
1237         delete []dummyCounts;
1238         }
1239 #endif
1240
1241
1242 #if 0
1243   if (m->n_moves ==0) {
1244     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1245   }
1246 #endif
1247   cur_ld_balancer = m->next_lb;
1248   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1249       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1250   }
1251
1252   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1253     MigrationDone(1);
1254   delete m;
1255
1256 //      CkEvacuatedElement();
1257 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1258 //  migrates_expected = 0;
1259 //  //  ResumeClients(1);
1260 #endif
1261 #endif
1262 }
1263
1264 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1265 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1266     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1267     //TODO: this is gonna be important when a crash happens during checkpoint
1268     //the globalDecisionCount would have to be saved and compared against
1269     //a future recvMigration
1270                 
1271         thisProxy[CkMyPe()].ResumeClients(1);
1272 }
1273 #endif
1274
1275 void CentralLB::MigrationDone(int balancing)
1276 {
1277 #if CMK_LBDB_ON
1278   migrates_completed = 0;
1279   migrates_expected = -1;
1280   // clear load stats
1281   if (balancing) theLbdb->ClearLoads();
1282   // Increment to next step
1283   theLbdb->incStep();
1284         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1285   // if sync resume, invoke a barrier
1286
1287 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1288     savedBalancing = balancing;
1289     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1290 #endif
1291
1292   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1293
1294   LoadbalanceDone(balancing);        // callback
1295 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1296   // if sync resume invoke a barrier
1297   if (balancing && _lb_args.syncResume()) {
1298     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1299                   thisProxy);
1300     contribute(0, NULL, CkReduction::sum_int, cb);
1301   }
1302   else{ 
1303     if(CmiNodeAlive(CkMyPe())){
1304         thisProxy [CkMyPe()].ResumeClients(balancing);
1305     }   
1306   }     
1307 #if CMK_GRID_QUEUE_AVAILABLE
1308   CmiGridQueueDeregisterAll ();
1309   CpvAccess(CkGridObject) = NULL;
1310 #endif
1311 #endif 
1312 #endif
1313 }
1314
1315 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1316 void CentralLB::endMigrationDone(int balancing){
1317     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1318
1319
1320   if (balancing && _lb_args.syncResume()) {
1321     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1322                   thisProxy);
1323     contribute(0, NULL, CkReduction::sum_int, cb);
1324   }
1325   else{
1326     if(CmiNodeAlive(CkMyPe())){
1327     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1328     thisProxy [CkMyPe()].ResumeClients(balancing);
1329     }
1330   }
1331
1332 }
1333 #endif
1334
1335 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1336 void resumeCentralLbAfterChkpt(void *_lb){
1337     CentralLB *lb= (CentralLB *)_lb;
1338     CpvAccess(_currentObj)=lb;
1339     lb->endMigrationDone(lb->savedBalancing);
1340 }
1341 #endif
1342
1343
1344 void CentralLB::ResumeClients(CkReductionMsg *msg)
1345 {
1346   ResumeClients(1);
1347   delete msg;
1348 }
1349
1350 void CentralLB::ResumeClients(int balancing)
1351 {
1352 #if CMK_LBDB_ON
1353 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1354     resumeCount++;
1355     globalResumeCount = resumeCount;
1356 #endif
1357   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1358   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1359     double end_lb_time = CkWallTimer();
1360   }
1361
1362 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1363   if (balancing) ComlibNotifyMigrationDone();  
1364 #endif
1365
1366   theLbdb->ResumeClients();
1367   if (balancing)  {
1368
1369     CheckMigrationComplete();
1370     if (future_migrates_expected == 0 || 
1371             future_migrates_expected == future_migrates_completed) {
1372       CheckMigrationComplete();
1373     }
1374   }
1375 #endif
1376 }
1377
1378 /*
1379   migration of objects contains two different kinds:
1380   (1) objects want to make a barrier for migration completion
1381       (waitForBarrier is true)
1382       migrationDone() to finish and resumeClients
1383   (2) objects don't need a barrier
1384   However, next load balancing can only happen when both migrations complete
1385 */ 
1386 void CentralLB::CheckMigrationComplete()
1387 {
1388 #if CMK_LBDB_ON
1389   lbdone ++;
1390   if (lbdone == 2) {
1391     if (_lb_args.debug() && CkMyPe()==0) {
1392       double end_lb_time = CkWallTimer();
1393       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1394                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1395                 end_lb_time-start_lb_time);
1396     }
1397
1398     adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
1399
1400     lbdone = 0;
1401     future_migrates_expected = -1;
1402     future_migrates_completed = 0;
1403
1404
1405     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1406     // release local barrier  so that the next load balancer can go
1407     LDOMHandle h;
1408     h.id.id.idx = 0;
1409     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1410     // switch to the next load balancer in the list
1411     // subtle: called from Migrated() may result in Migrated() called in next LB
1412     theLbdb->nextLoadbalancer(seqno);
1413   }
1414 #endif
1415 }
1416
1417 void CentralLB::preprocess(LDStats* stats)
1418 {
1419   if (_lb_args.ignoreBgLoad())
1420     stats->clearBgLoad();
1421
1422   // Call the predictor for the future
1423   if (_lb_predict) FuturePredictor(statsData);
1424 }
1425
1426 // default load balancing strategy
1427 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1428 {
1429 #if CMK_LBDB_ON
1430   double strat_start_time = CkWallTimer();
1431   if (_lb_args.debug())
1432     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1433
1434   work(stats);
1435
1436   if (_lb_args.debug()>1)  {
1437     CkPrintf("CharmLB> Obj Map:\n");
1438     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1439     CkPrintf("\n");
1440   }
1441
1442   LBMigrateMsg *msg = createMigrateMsg(stats);
1443
1444   if (_lb_args.debug()) {
1445     double strat_end_time = CkWallTimer();
1446     envelope *env = UsrToEnv(msg);
1447
1448     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1449     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1450               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1451     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1452     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1453               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1454     adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
1455     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1456   }
1457   return msg;
1458 #else
1459   return NULL;
1460 #endif
1461 }
1462
1463 void CentralLB::work(LDStats* stats)
1464 {
1465   // does nothing but print the database
1466   stats->print();
1467 }
1468
1469 // generate migrate message from stats->from_proc and to_proc
1470 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1471 {
1472   int i;
1473   CkVec<MigrateInfo*> migrateInfo;
1474   for (i=0; i<stats->n_objs; i++) {
1475     LDObjData &objData = stats->objData[i];
1476     int frompe = stats->from_proc[i];
1477     int tope = stats->to_proc[i];
1478     if (frompe != tope) {
1479       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1480       //         CkMyPe(),obj,pe,dest);
1481       MigrateInfo *migrateMe = new MigrateInfo;
1482       migrateMe->obj = objData.handle;
1483       migrateMe->from_pe = frompe;
1484       migrateMe->to_pe = tope;
1485       migrateMe->async_arrival = objData.asyncArrival;
1486       migrateInfo.insertAtEnd(migrateMe);
1487     }
1488   }
1489
1490   int migrate_count=migrateInfo.length();
1491   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1492   msg->n_moves = migrate_count;
1493   for(i=0; i < migrate_count; i++) {
1494     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1495     msg->moves[i] = *item;
1496     delete item;
1497     migrateInfo[i] = 0;
1498   }
1499   return msg;
1500 }
1501
1502 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1503 {
1504   int nmoves = 0;
1505   int nunavail = 0;
1506   int i;
1507   for (i=0; i<m->n_moves; i++) {
1508     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1509     if (item->from_pe == p || item->to_pe == p) nmoves++;
1510   }
1511   for (i=0; i<CkNumPes();i++) {
1512     if (!m->avail_vector[i]) nunavail++;
1513   }
1514   LBMigrateMsg* msg;
1515   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1516   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1517   msg->n_moves = nmoves;
1518   msg->level = m->level;
1519   msg->next_lb = m->next_lb;
1520   for (i=0,nmoves=0; i<m->n_moves; i++) {
1521     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1522     if (item->from_pe == p || item->to_pe == p) {
1523       msg->moves[nmoves] = *item;
1524       nmoves++;
1525     }
1526   }
1527   // copy processor data
1528   if (nunavail)
1529   for (i=0; i<CkNumPes();i++) {
1530     msg->avail_vector[i] = m->avail_vector[i];
1531     msg->expectedLoad[i] = m->expectedLoad[i];
1532   }
1533   return msg;
1534 }
1535
1536 void CentralLB::simulationWrite() {
1537   if(step() == LBSimulation::dumpStep)
1538   {
1539     // here we are supposed to dump the database
1540     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1541     char *dumpFileName = (char *)malloc(dumpFileSize);
1542     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1543       free(dumpFileName);
1544       dumpFileSize+=3;
1545       dumpFileName = (char *)malloc(dumpFileSize);
1546     }
1547     writeStatsMsgs(dumpFileName);
1548     free(dumpFileName);
1549     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1550     ++LBSimulation::dumpStep;
1551     --LBSimulation::dumpStepSize;
1552     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1553       CmiPrintf("Charm++> Exiting...\n");
1554       CkExit();
1555     }
1556     return;
1557   }
1558 }
1559
1560 void CentralLB::simulationRead() {
1561   LBSimulation *simResults = NULL, *realResults;
1562   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1563   voidMessage->n_moves=0;
1564   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1565     // here we are supposed to read the data from the dump database
1566     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1567     char *simFileName = (char *)malloc(simFileSize);
1568     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1569       free(simFileName);
1570       simFileSize+=3;
1571       simFileName = (char *)malloc(simFileSize);
1572     }
1573     readStatsMsgs(simFileName);
1574
1575     // allocate simResults (only the first step)
1576     if (simResults == NULL) {
1577       simResults = new LBSimulation(LBSimulation::simProcs);
1578       realResults = new LBSimulation(LBSimulation::simProcs);
1579     }
1580     else {
1581       // should be the same number of procs of the original simulation!
1582       if (!LBSimulation::procsChanged) {
1583         // it means we have a previous step, so in simResults there is data.
1584         // we can now print the real effects of the load balancer during the simulation
1585         // or print the difference between the predicted data and the real one.
1586         realResults->reset();
1587         // reset to_proc of statsData to be equal to from_proc
1588         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1589         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1590         simResults->PrintDifferences(realResults,statsData);
1591       }
1592       simResults->reset();
1593     }
1594
1595     // now pass it to the strategy routine
1596     double startT = CkWallTimer();
1597     preprocess(statsData);
1598     CmiPrintf("%s> Strategy starts ... \n", lbname);
1599     LBMigrateMsg* migrateMsg = Strategy(statsData);
1600     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1601                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1602
1603     // now calculate the results of the load balancing simulation
1604     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1605
1606     // now we have the simulation data, so print it and loop
1607     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1608     // **CWL** Officially recording my disdain here for using ints for bool
1609     if (LBSimulation::showDecisionsOnly) {
1610       simResults->PrintDecisions(migrateMsg, simFileName, 
1611                                  LBSimulation::simProcs);
1612     } else {
1613       simResults->PrintSimulationResults();
1614     }
1615
1616     free(simFileName);
1617     delete migrateMsg;
1618     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1619   }
1620   // deallocate simResults
1621   delete simResults;
1622   CmiPrintf("Charm++> Exiting...\n");
1623   CkExit();
1624 }
1625
1626 void CentralLB::readStatsMsgs(const char* filename) 
1627 {
1628 #if CMK_LBDB_ON
1629   int i;
1630   FILE *f = fopen(filename, "r");
1631   if (f==NULL) {
1632     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1633     CmiAbort("");
1634   }
1635
1636   // at this stage, we need to rebuild the statsMsgList and
1637   // statsDataList structures. For that first deallocate the
1638   // old structures
1639   if (statsMsgsList) {
1640     for(i = 0; i < stats_msg_count; i++)
1641       delete statsMsgsList[i];
1642     delete[] statsMsgsList;
1643     statsMsgsList=0;
1644   }
1645
1646   PUP::fromDisk pd(f);
1647   PUP::machineInfo machInfo;
1648
1649   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1650   PUP::xlater p(machInfo, pd);
1651
1652   if (_lb_args.lbversion() > 1) {
1653     p|_lb_args.lbversion();             // write version number
1654     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1655     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1656   }
1657   p|stats_msg_count;
1658
1659   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1660   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1661   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1662
1663   // LBSimulation::simProcs must be set
1664   statsData->pup(p);
1665
1666   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1667   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1668
1669   // file f is closed in the destructor of PUP::fromDisk
1670   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1671 #endif
1672 }
1673
1674 void CentralLB::writeStatsMsgs(const char* filename) 
1675 {
1676 #if CMK_LBDB_ON
1677   FILE *f = fopen(filename, "w");
1678   if (f==NULL) {
1679     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1680     CmiAbort("");
1681   }
1682
1683   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1684   PUP::toDisk p(f);
1685   p((char *)&machInfo, sizeof(machInfo));       // machine info
1686
1687   p|_lb_args.lbversion();               // write version number
1688   p|stats_msg_count;
1689   statsData->pup(p);
1690
1691   fclose(f);
1692
1693   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1694 #endif
1695 }
1696
1697 // calculate the predicted wallclock/cpu load for every processors
1698 // considering communication overhead if considerComm is true
1699 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1700                       LBMigrateMsg *msg, LBInfo &info, 
1701                       int considerComm)
1702 {
1703 #if CMK_LBDB_ON
1704         stats->makeCommHash();
1705
1706         // update to_proc according to migration msgs
1707         for(int i = 0; i < msg->n_moves; i++) {
1708           MigrateInfo &mInfo = msg->moves[i];
1709           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1710           CmiAssert(idx != -1);
1711           stats->to_proc[idx] = mInfo.to_pe;
1712         }
1713
1714         info.getInfo(stats, count, considerComm);
1715 #endif
1716 }
1717
1718
1719 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1720 {
1721     CkAssert(simResults != NULL && count == simResults->numPes);
1722     // estimate the new loads of the processors. As a first approximation, this is the
1723     // sum of the cpu times of the objects on that processor
1724     double startT = CkWallTimer();
1725     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1726     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1727 }
1728
1729 void CentralLB::pup(PUP::er &p) { 
1730   BaseLB::pup(p); 
1731   if (p.isUnpacking())  {
1732     initLB(CkLBOptions(seqno)); 
1733   }
1734   p|reduction_started;
1735   int has_statsMsg=0;
1736   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1737   p|has_statsMsg;
1738   if (has_statsMsg) {
1739     if (p.isUnpacking())
1740       statsMsg = new CLBStatsMsg;
1741     statsMsg->pup(p);
1742   }
1743 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1744   p | lbDecisionCount;
1745   p | resumeCount;
1746 #endif
1747         
1748 }
1749
1750 int CentralLB::useMem() { 
1751   return sizeof(CentralLB) + statsData->useMem() + 
1752          CkNumPes() * sizeof(CLBStatsMsg *);
1753 }
1754
1755
1756 /**
1757   CLBStatsMsg is not a real message now.
1758   CLBStatsMsg is used for all processors to fill in their local load and comm
1759   statistics and send to processor 0
1760 */
1761
1762 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1763   n_objs = osz;
1764   n_comm = csz;
1765   objData = new LDObjData[osz];
1766   commData = new LDCommData[csz];
1767   avail_vector = NULL;
1768 }
1769
1770 CLBStatsMsg::~CLBStatsMsg() {
1771   delete [] objData;
1772   delete [] commData;
1773   delete [] avail_vector;
1774 }
1775
1776 void CLBStatsMsg::pup(PUP::er &p) {
1777   int i;
1778   p|from_pe;
1779   p|pe_speed;
1780   p|total_walltime;
1781   p|idletime;
1782   p|bg_walltime;
1783 #if CMK_LB_CPUTIMER
1784   p|total_cputime;
1785   p|bg_cputime;
1786 #endif
1787 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1788   p | step;
1789 #endif
1790   p|n_objs;
1791   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1792   for (i=0; i<n_objs; i++) p|objData[i];
1793   p|n_comm;
1794   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1795   for (i=0; i<n_comm; i++) p|commData[i];
1796
1797   int has_avail_vector;
1798   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1799   p|has_avail_vector;
1800   if (p.isUnpacking()) {
1801     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1802     else avail_vector = NULL;
1803   }
1804   if (has_avail_vector) p(avail_vector, CkNumPes());
1805
1806   p(next_lb);
1807 }
1808
1809 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1810 // the entry function, it is just used to use to pup.
1811 // I don't use CLBStatsMsg directly as marshalled parameter because
1812 // I want the data pointer stored and not to be freed by the Charm++.
1813 void CkMarshalledCLBStatsMessage::free() { 
1814   int count = msgs.size();
1815   for  (int i=0; i<count; i++) {
1816     delete msgs[i];
1817     msgs[i] = NULL;
1818   }
1819   msgs.free();
1820 }
1821
1822 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1823 {
1824   int count = m.getCount();
1825   for (int i=0; i<count; i++) add(m.getMessage(i));
1826 }
1827
1828 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1829 {
1830   int count = msgs.size();
1831   p|count;
1832   for (int i=0; i<count; i++) {
1833     CLBStatsMsg *msg;
1834     if (p.isUnpacking()) msg = new CLBStatsMsg;
1835     else { 
1836       msg = msgs[i]; CmiAssert(msg!=NULL);
1837     }
1838     msg->pup(p);
1839     if (p.isUnpacking()) add(msg);
1840   }
1841 }
1842
1843 SpanningTree::SpanningTree()
1844 {
1845         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1846         arity = (int)ceil(sq/2);
1847         calcParent(CkMyPe());
1848         calcNumChildren(CkMyPe());
1849 }
1850
1851 void SpanningTree::calcParent(int n)
1852 {
1853         parent=-1;
1854         if(n != 0  && arity > 0)
1855                 parent = (n-1)/arity;
1856 }
1857
1858 void SpanningTree::calcNumChildren(int n)
1859 {
1860         numChildren = 0;
1861         if (arity == 0) return;
1862         int fullNode=(CkNumPes()-1-arity)/arity;
1863         if(n <= fullNode)
1864                 numChildren = arity;
1865         if(n == fullNode+1)
1866                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1867         if(n > fullNode+1)
1868                 numChildren = 0;
1869 }
1870
1871 #include "CentralLB.def.h"
1872  
1873 /*@}*/