453ebc90ac148607fd59e7325219fedac080ff45
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 //#include "limits.h"
15 #include <vector>
16
17 #define alpha 4.0e-6
18 #define beta 2.67e-9
19 #define percent_overhead 10
20
21
22 #define  DEBUGF(x)       // CmiPrintf x;
23 #define  DEBUG(x)        // x;
24 #define  DEBAD(x)        // CmiPrintf x
25
26 #if CMK_MEM_CHECKPOINT
27    /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #elif defined(_FAULT_MLOG_)
31 /* can not handle reduction in inmem FT */
32 #define USE_REDUCTION         0
33 #define USE_LDB_SPANNING_TREE 0
34 #else
35 #define USE_REDUCTION         1
36 #define USE_LDB_SPANNING_TREE 1
37 #endif
38
39 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
40 extern int _restartFlag;
41 extern void getGlobalStep(CkGroupID );
42 extern void initMlogLBStep(CkGroupID );
43 extern int globalResumeCount;
44 extern void sendDummyMigrationCounts(int *);
45 #endif
46
47 #if CMK_GRID_QUEUE_AVAILABLE
48 CpvExtern(void *, CkGridObject);
49 #endif
50
51 CkGroupID loadbalancer;
52 int * lb_ptr;
53 int load_balancer_created;
54
55 //struct AdaptiveData {
56 //  int iteration;
57 //  double max_load;
58 //  double avg_load;
59 //};
60 //
61 //struct AdaptiveLBDatabase {
62 //  std::vector<AdaptiveData> history_data;
63 //} adaptive_lbdb;
64 //
65 //enum state {
66 //  OFF,
67 //  ON,
68 //  PAUSE,
69 //  DECIDED,
70 //  LOAD_BALANCE
71 //} local_state;
72 //
73 //struct AdaptiveLBStructure {
74 //  int lb_ideal_period;
75 //  int lb_calculated_period;
76 //  int lb_no_iterations;
77 //  int global_max_iter_no;
78 //  int global_recv_iter_counter;
79 //  bool in_progress;
80 //  double prev_load;
81 //  double lb_strategy_cost;
82 //  double lb_migration_cost;
83 //  bool lb_period_informed;
84 //  int lb_msg_send_no;
85 //  int lb_msg_recv_no;
86 //} adaptive_struct;
87
88 CreateLBFunc_Def(CentralLB, "CentralLB base class")
89
90 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
91                              LBMigrateMsg *, LBInfo &info, int considerComm);
92
93 //CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
94 //  double lb_data[4];
95 //  lb_data[0] = 0;
96 //  lb_data[1] = 0;
97 //  lb_data[2] = 0;
98 //  for (int i = 0; i < nMsg; i++) {
99 //    CkAssert(msgs[i]->getSize() == 4*sizeof(double));
100 //    double* m = (double *)msgs[i]->getData();
101 //    lb_data[0] += m[0];
102 //    lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
103 //    lb_data[2] += m[2];
104 //    if (i == 0) {
105 //      lb_data[3] = m[3];
106 //    }
107 //    if (m[3] != lb_data[3]) {
108 //      CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
109 //      %lf\n", lb_data[3], m[3]);
110 //    }
111 //  }
112 //  return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
113 //}
114 //
115 ///*global*/ CkReduction::reducerType lbDataCollectionType;
116 ///*initcall*/ void registerLBDataCollection(void) {
117 //  lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
118 //}
119
120 /*
121 void CreateCentralLB()
122 {
123   CProxy_CentralLB::ckNew(0);
124 }
125 */
126
127 void CentralLB::staticStartLB(void* data)
128 {
129   CentralLB *me = (CentralLB*)(data);
130   me->StartLB();
131 }
132
133 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
134 {
135   CentralLB *me = (CentralLB*)(data);
136   me->Migrated(h, waitBarrier);
137 }
138
139 void CentralLB::staticAtSync(void* data)
140 {
141   CentralLB *me = (CentralLB*)(data);
142   me->AtSync();
143 }
144
145 void CentralLB::initLB(const CkLBOptions &opt)
146 {
147 #if CMK_LBDB_ON
148   lbname = "CentralLB";
149   thisProxy = CProxy_CentralLB(thisgroup);
150   //  CkPrintf("Construct in %d\n",CkMyPe());
151
152   // create and turn on by default
153   receiver = theLbdb->
154     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
155   notifier = theLbdb->getLBDB()->
156     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
157   startLbFnHdl = theLbdb->getLBDB()->
158     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
159
160   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
161   if (opt.getSeqNo() > 0) turnOff();
162
163   stats_msg_count = 0;
164   statsMsgsList = NULL;
165   statsData = NULL;
166
167   storedMigrateMsg = NULL;
168   reduction_started = 0;
169
170   // for future predictor
171   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
172   else predicted_model=0;
173   // register user interface callbacks
174   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
175
176   myspeed = theLbdb->ProcessorSpeed();
177
178   migrates_completed = 0;
179   future_migrates_completed = 0;
180   migrates_expected = -1;
181   future_migrates_expected = -1;
182   cur_ld_balancer = _lb_args.central_pe();      // 0 default
183   lbdone = 0;
184   count_msgs=0;
185   statsMsg = NULL;
186
187   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
188
189   load_balancer_created = 1;
190
191   // If metabalancer enabled, initialize the variables
192  // adaptive_struct.lb_ideal_period =  INT_MAX;
193  // adaptive_struct.lb_calculated_period = INT_MAX;
194  // adaptive_struct.lb_no_iterations = -1;
195  // adaptive_struct.global_max_iter_no = 0;
196  // adaptive_struct.global_recv_iter_counter = 0;
197  // adaptive_struct.in_progress = false;
198  // adaptive_struct.prev_load = 0.0;
199  // adaptive_struct.lb_strategy_cost = 0.0;
200  // adaptive_struct.lb_migration_cost = 0.0;
201  // adaptive_struct.lb_msg_send_no = 0;
202  // adaptive_struct.lb_msg_recv_no = 0;
203  // local_state = OFF;
204 #endif
205 }
206
207 CentralLB::~CentralLB()
208 {
209 #if CMK_LBDB_ON
210   delete [] statsMsgsList;
211   delete statsData;
212   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
213   if (theLbdb) {
214     theLbdb->getLBDB()->
215       RemoveNotifyMigrated(notifier);
216     theLbdb->
217       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
218   }
219 #endif
220 }
221
222 void CentralLB::turnOn() 
223 {
224 #if CMK_LBDB_ON
225   theLbdb->getLBDB()->
226     TurnOnBarrierReceiver(receiver);
227   theLbdb->getLBDB()->
228     TurnOnNotifyMigrated(notifier);
229   theLbdb->getLBDB()->
230     TurnOnStartLBFn(startLbFnHdl);
231 #endif
232 }
233
234 void CentralLB::turnOff() 
235 {
236 #if CMK_LBDB_ON
237   theLbdb->getLBDB()->
238     TurnOffBarrierReceiver(receiver);
239   theLbdb->getLBDB()->
240     TurnOffNotifyMigrated(notifier);
241   theLbdb->getLBDB()->
242     TurnOffStartLBFn(startLbFnHdl);
243 #endif
244 }
245
246 void CentralLB::AtSync()
247 {
248 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
249 #if CMK_LBDB_ON
250 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
251
252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
253         CpvAccess(_currentObj)=this;
254 #endif
255
256   // if num of processor is only 1, nothing should happen
257   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
258     MigrationDone(0);
259     return;
260   }
261   if(CmiNodeAlive(CkMyPe())){
262     thisProxy [CkMyPe()].ProcessAtSync();
263   }
264 #endif
265 }
266
267 #include "ComlibStrategy.h"
268
269 void CentralLB::ProcessAtSync()
270 {
271
272
273
274 #if CMK_LBDB_ON
275   if (reduction_started) return;              // reducton in progress
276
277   CmiAssert(CmiNodeAlive(CkMyPe()));
278   if (CkMyPe() == cur_ld_balancer) {
279     start_lb_time = CkWallTimer();
280   }
281  double total_load;
282  double idle_time;
283  double bg_walltime;
284  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
285  theLbdb->IdleTime(&idle_time);
286  DEBAD(("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(),
287     total_load, idle_time, bg_walltime, (total_load - idle_time - bg_walltime)));
288
289 // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(),
290 //    total_load, idle_time, bg_walltime, (total_load - idle_time - bg_walltime));
291
292
293 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
294         initMlogLBStep(thisgroup);
295 #endif
296
297   // build message
298   BuildStatsMsg();
299
300 #if USE_REDUCTION
301     // reduction to get total number of objects and comm
302     // so that processor 0 can pre-allocate load balancing database
303   int counts[2];
304   counts[0] = theLbdb->GetObjDataSz();
305   counts[1] = theLbdb->GetCommDataSz();
306
307   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
308                   thisProxy[0]);
309   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
310   reduction_started = 1;
311 #else
312   SendStats();
313 #endif
314 #endif
315 }
316
317 //void CentralLB::ProcessAtSyncMin()
318 //{
319 //#if CMK_LBDB_ON
320 //  if (reduction_started) return;              // reducton in progress
321 //
322 //  CmiAssert(CmiNodeAlive(CkMyPe()));
323 //  if (CkMyPe() == cur_ld_balancer) {
324 //    start_lb_time = CkWallTimer();
325 //  }
326 //
327 //
328 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
329 //      initMlogLBStep(thisgroup);
330 //#endif
331 //  
332 //  adaptive_struct.lb_no_iterations++;
333 // // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
334 // //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
335 //
336 //  // If decision has been made and has reached the lb_period, then do load
337 //  // balancing, else if hasn't reached ideal_period, then resume.
338 //  if (local_state == DECIDED) {
339 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
340 // //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
341 //      SendMinStats();
342 //      ResumeClients(0);
343 //    } else {
344 //      local_state = LOAD_BALANCE;
345 // //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
346 //      ProcessAtSync();
347 //    }
348 //    return;
349 //  }
350 //   
351 //  // If the state is ON and not DECIDED, then if havn't reached lb_period, then
352 //  // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
353 //  // dont resume client.
354 //  if (local_state == ON) {
355 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
356 //      SendMinStats();
357 //      ResumeClients(0);
358 //    } else {
359 //      local_state = PAUSE;
360 //    }
361 //    return;
362 //  }
363 //
364 //  SendMinStats();
365 //  ResumeClients(0);
366 //#endif
367 //}
368 //
369 //void CentralLB::SendMinStats() {
370 //
371 // double total_load;
372 // double idle_time;
373 // double bg_walltime;
374 //  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
375 // // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
376 //
377 //  // Since the total_load is cumulative since the last load balancing stage,
378 //  // Hence it is subtracted from the previous load.
379 //  total_load -= idle_time;
380 //  double tmp = total_load;
381 //  total_load -= adaptive_struct.prev_load;
382 //  adaptive_struct.prev_load = tmp; 
383 //
384 //  double lb_data[4];
385 //  lb_data[0] = total_load;
386 //  lb_data[1] = total_load;
387 //  lb_data[2] = 1;
388 //  lb_data[3] = adaptive_struct.lb_no_iterations;
389 //  //CkPrintf("[%d] sends total load %lf at iter %d\n", CkMyPe(), total_load, adaptive_struct.lb_no_iterations);
390 //
391 //  if (adaptive_struct.lb_no_iterations != 0) {
392 //    CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
393 //        thisProxy[0]);
394 ////    contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
395 //  }
396 //
397 ////    int tmp1 = adaptive_struct.lb_no_iterations;
398 ////    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
399 ////    // Send the current iteration no
400 ////    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
401 ////        thisProxy[0]);
402 ////    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
403 //}
404 //
405 //void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
406 //  CmiAssert(CkMyPe() == 0);
407 //  double* load = (LBRealType *) msg->getData();
408 //  double max = load[1];
409 //  double avg = load[0]/load[2];
410 //  int iteration_n = load[3];
411 //  CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
412 //  CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
413 //  delete msg;
414 //
415 //  // Store the data for this iteration
416 //  AdaptiveData data;
417 //  data.iteration = adaptive_struct.lb_no_iterations;
418 //  data.max_load = max;
419 //  data.avg_load = avg;
420 //  adaptive_lbdb.history_data.push_back(data);
421 //
422 //  // If lb period inform is in progress, dont inform again
423 //  if (adaptive_struct.in_progress) {
424 //    return;
425 //  }
426 //
427 ////  if (adaptive_struct.lb_period_informed) {
428 ////    return;
429 ////  }
430 //
431 //  // If the max/avg ratio is greater than the threshold and also this is not the
432 //  // step immediately after load balancing, carry out load balancing
433 //  //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
434 //  if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
435 //    CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
436 ////    if (!adaptive_struct.lb_period_informed) {
437 ////      // Just for testing
438 ////      adaptive_struct.lb_calculated_period = 40;
439 ////      adaptive_struct.lb_period_informed = true;
440 ////      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
441 ////      return;
442 ////    }
443 //
444 //    // If the new lb period is less than current set lb period
445 //    if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
446 //      adaptive_struct.lb_calculated_period = iteration_n + 1;
447 //      adaptive_struct.lb_period_informed = true;
448 //      adaptive_struct.in_progress = true;
449 //      CkPrintf("Informing everyone the lb period is %d\n",
450 //          adaptive_struct.lb_calculated_period);
451 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
452 //    }
453 //    return;
454 //  }
455 //
456 //  // Generate the plan for the adaptive strategy
457 //  int period;
458 //  if (generatePlan(period)) {
459 //    //CkPrintf("Carry out load balancing step at iter\n");
460 //
461 //    // If the new lb period is less than current set lb period
462 //    if (adaptive_struct.lb_calculated_period > period) {
463 //      adaptive_struct.lb_calculated_period = period;
464 //      adaptive_struct.in_progress = true;
465 //      adaptive_struct.lb_period_informed = true;
466 //      CkPrintf("Informing everyone the lb period is %d\n",
467 //          adaptive_struct.lb_calculated_period);
468 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
469 //    }
470 //  }
471 //}
472 //
473 //bool CentralLB::generatePlan(int& period) {
474 //  if (adaptive_lbdb.history_data.size() <= 8) {
475 //    return false;
476 //  }
477 //
478 //  // Some heuristics for lbperiod
479 //  // If constant load or almost constant,
480 //  // then max * new_lb_period > avg * new_lb_period + lb_cost
481 //  double max = 0.0;
482 //  double avg = 0.0;
483 //  AdaptiveData data;
484 //  for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
485 //    data = adaptive_lbdb.history_data[i];
486 //    max += data.max_load;
487 //    avg += data.avg_load;
488 //    CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
489 //  }
490 ////  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
491 ////  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
492 ////
493 ////  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
494 ////  adaptive_struct.lb_migration_cost) / (max - avg);
495 ////  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
496 ////      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
497 ////
498 //  // If linearly varying load, then find lb_period
499 //  // area between the max and avg curve 
500 //  double mslope, aslope, mc, ac;
501 //  getLineEq(aslope, ac, mslope, mc);
502 //  CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
503 //  double a = (mslope - aslope)/2;
504 //  double b = (mc - ac);
505 //  double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
506 //  //c = -2.5;
507 //  bool got_period = getPeriodForLinear(a, b, c, period);
508 //  if (!got_period) {
509 //    return false;
510 //  }
511 //  
512 //  if (mslope < 0) {
513 //    if (period > (-mc/mslope)) {
514 //      CkPrintf("Max < 0 Period set when max load is -ve\n");
515 //      return false;
516 //    }
517 //  }
518 //
519 //  if (aslope < 0) {
520 //    if (period > (-ac/aslope)) {
521 //      CkPrintf("Avg < 0 Period set when avg load is -ve\n");
522 //      return false;
523 //    }
524 //  }
525 //
526 //  int intersection_t = (mc-ac) / (aslope - mslope);
527 //  if (intersection_t > 0 && period > intersection_t) {
528 //    CkPrintf("Avg | Max Period set when curves intersect\n");
529 //    return false;
530 //  }
531 //  return true;
532 //}
533 //
534 //bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
535 //  CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
536 //  if (a == 0.0) {
537 //    period = (-c / b);
538 //    CkPrintf("Ideal period for linear load %d\n", period);
539 //    return true;
540 //  }
541 //  int x;
542 //  double t = (b * b) - (4*a*c);
543 //  if (t < 0) {
544 //    CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
545 //    return false;
546 //  }
547 //  t = (-b + sqrt(t)) / (2*a);
548 //  x = t;
549 //  if (x < 0) {
550 //    CkPrintf("boo!!! x (%d) < 0\n", x);
551 //    x = 0;
552 //    return false;
553 //  }
554 //  period = x;
555 //  CkPrintf("Ideal period for linear load %d\n", period);
556 //  return true;
557 //}
558 //
559 //bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
560 //  int total = adaptive_lbdb.history_data.size();
561 //  int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
562 //      adaptive_lbdb.history_data[0].iteration;
563 //  double a1 = 0;
564 //  double m1 = 0;
565 //  double a2 = 0;
566 //  double m2 = 0;
567 //  AdaptiveData data;
568 //  int i = 0;
569 //  for (i = 0; i < total/2; i++) {
570 //    data = adaptive_lbdb.history_data[i];
571 //    m1 += data.max_load;
572 //    a1 += data.avg_load;
573 //  }
574 //  m1 /= i;
575 //  a1 /= i;
576 //
577 //  for (i = total/2; i < total; i++) {
578 //    data = adaptive_lbdb.history_data[i];
579 //    m2 += data.max_load;
580 //    a2 += data.avg_load;
581 //  }
582 //  m2 /= (i - total/2);
583 //  a2 /= (i - total/2);
584 //
585 //  aslope = 2 * (a2 - a1) / iterations;
586 //  mslope = 2 * (m2 - m1) / iterations;
587 //  ac = adaptive_lbdb.history_data[0].avg_load;
588 //  mc = adaptive_lbdb.history_data[0].max_load;
589 //  return true;
590 //}
591 //
592 //void CentralLB::LoadBalanceDecision(int req_no, int period) {
593 //  if (req_no < adaptive_struct.lb_msg_recv_no) {
594 //    CkPrintf("Error!!! Received a request which was already sent or old\n");
595 //    return;
596 //  }
597 //  //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
598 //  adaptive_struct.lb_ideal_period = period;
599 //  local_state = ON;
600 //  adaptive_struct.lb_msg_recv_no = req_no;
601 //  thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
602 //}
603 //
604 //void CentralLB::LoadBalanceDecisionFinal(int req_no, int period) {
605 //  if (req_no < adaptive_struct.lb_msg_recv_no) {
606 //    return;
607 //  }
608 //  //CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
609 //  adaptive_struct.lb_ideal_period = period;
610 //
611 //  if (local_state == ON) {
612 //    local_state = DECIDED;
613 //    return;
614 //  }
615 //
616 //  // If the state is PAUSE, then its waiting for the final decision from central
617 //  // processor. If the decision is that the ideal period is in the future,
618 //  // resume. If the ideal period is now, then carry out load balancing.
619 //  if (local_state == PAUSE) {
620 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
621 //      local_state = DECIDED;
622 //      SendMinStats();
623 //      ResumeClients(0);
624 //    } else {
625 //      local_state = LOAD_BALANCE;
626 //      ProcessAtSync();
627 //    }
628 //    return;
629 //  }
630 //  CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
631 //}
632 //
633 //
634 //void CentralLB::ReceiveIterationNo(int req_no, int local_iter_no) {
635 //  CmiAssert(CkMyPe() == 0);
636 //
637 //  adaptive_struct.global_recv_iter_counter++;
638 //  if (local_iter_no > adaptive_struct.global_max_iter_no) {
639 //    adaptive_struct.global_max_iter_no = local_iter_no;
640 //  }
641 //  if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
642 //    adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
643 //    thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
644 //    CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
645 //    adaptive_struct.in_progress = false;
646 //    adaptive_struct.global_max_iter_no = 0;
647 //    adaptive_struct.global_recv_iter_counter = 0;
648 //  }
649 //}
650
651 // called only on 0
652 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
653 {
654   CmiAssert(CkMyPe() == 0);
655   if (statsData == NULL) statsData = new LDStats;
656
657   int *counts = (int *)msg->getData();
658   int n_objs = counts[0];
659   int n_comm = counts[1];
660
661     // resize database
662   statsData->objData.resize(n_objs);
663   statsData->from_proc.resize(n_objs);
664   statsData->to_proc.resize(n_objs);
665   statsData->commData.resize(n_comm);
666
667   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
668         
669     // broadcast call to let everybody start to send stats
670   thisProxy.SendStats();
671 }
672
673 void CentralLB::BuildStatsMsg()
674 {
675 #if CMK_LBDB_ON
676   // build and send stats
677   const int osz = theLbdb->GetObjDataSz();
678   const int csz = theLbdb->GetCommDataSz();
679
680   int npes = CkNumPes();
681   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
682   _MEMCHECK(msg);
683   msg->from_pe = CkMyPe();
684 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
685         msg->step = step();
686 #endif
687   //msg->serial = CrnRand();
688
689 /*
690   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
691   theLbdb->IdleTime(&msg->idletime);
692   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
693 */
694 #if CMK_LB_CPUTIMER
695   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
696                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
697 #else
698   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
699                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
700 #endif
701
702   msg->pe_speed = myspeed;
703   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
704
705   msg->n_objs = osz;
706   theLbdb->GetObjData(msg->objData);
707   msg->n_comm = csz;
708   theLbdb->GetCommData(msg->commData);
709 //  theLbdb->ClearLoads();
710   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
711
712   if(CkMyPe() == cur_ld_balancer) {
713     msg->avail_vector = new char[CkNumPes()];
714     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
715     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
716   }
717
718   CmiAssert(statsMsg == NULL);
719   statsMsg = msg;
720 #endif
721 }
722
723
724 // called on every processor
725 void CentralLB::SendStats()
726 {
727 #if CMK_LBDB_ON
728   CmiAssert(statsMsg != NULL);
729   reduction_started = 0;
730
731 #if USE_LDB_SPANNING_TREE
732   if(CkNumPes()>1024)
733   {
734     if (CkMyPe() == cur_ld_balancer)
735       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
736     else
737       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
738   }
739   else
740 #endif
741   {
742     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
743     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
744   }
745
746   statsMsg = NULL;
747
748 #ifdef __BIGSIM__
749   BgEndStreaming();
750 #endif
751
752   {
753   // enfore the barrier to wait until centralLB says no
754   LDOMHandle h;
755   h.id.id.idx = 0;
756   theLbdb->getLBDB()->RegisteringObjects(h);
757   }
758 #endif
759 }
760
761 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
762 extern int donotCountMigration;
763 #endif
764
765 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
766 {
767 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
768     if(donotCountMigration){
769         return ;
770     }
771 #endif
772
773 #if CMK_LBDB_ON
774   if (waitBarrier) {
775             migrates_completed++;
776       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
777     if (migrates_completed == migrates_expected) {
778       MigrationDone(1);
779     }
780   }
781   else {
782     future_migrates_completed ++;
783     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
784     if (future_migrates_completed == future_migrates_expected)  {
785         CheckMigrationComplete();
786     }
787   }
788 #endif
789 }
790
791 void CentralLB::MissMigrate(int waitForBarrier)
792 {
793   LDObjHandle h;
794   Migrated(h, waitForBarrier);
795 }
796
797 // build a complete data from bufferred messages
798 // not used when USE_REDUCTION = 1
799 void CentralLB::buildStats()
800 {
801     statsData->nprocs() = stats_msg_count;
802     // allocate space
803     statsData->objData.resize(statsData->n_objs);
804     statsData->from_proc.resize(statsData->n_objs);
805     statsData->to_proc.resize(statsData->n_objs);
806     statsData->commData.resize(statsData->n_comm);
807
808     int nobj = 0;
809     int ncom = 0;
810     int nmigobj = 0;
811     // copy all data in individule message to this big structure
812     for (int pe=0; pe<CkNumPes(); pe++) {
813        int i;
814        CLBStatsMsg *msg = statsMsgsList[pe];
815        if(msg == NULL) continue;
816        for (i=0; i<msg->n_objs; i++) {
817          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
818          statsData->objData[nobj] = msg->objData[i];
819          if (msg->objData[i].migratable) nmigobj++;
820          nobj++;
821        }
822        for (i=0; i<msg->n_comm; i++) {
823          statsData->commData[ncom] = msg->commData[i];
824          ncom++;
825        }
826        // free the memory
827        delete msg;
828        statsMsgsList[pe]=0;
829     }
830     statsData->n_migrateobjs = nmigobj;
831 }
832
833 // deposit one processor data at a time, note database is pre-allocated
834 // to have enough space
835 // used when USE_REDUCTION = 1
836 void CentralLB::depositData(CLBStatsMsg *m)
837 {
838   int i;
839   if (m == NULL) return;
840
841   const int pe = m->from_pe;
842   struct ProcStats &procStat = statsData->procs[pe];
843   procStat.pe = pe;
844   procStat.total_walltime = m->total_walltime;
845   procStat.idletime = m->idletime;
846   procStat.bg_walltime = m->bg_walltime;
847 #if CMK_LB_CPUTIMER
848   procStat.total_cputime = m->total_cputime;
849   procStat.bg_cputime = m->bg_cputime;
850 #endif
851   procStat.pe_speed = m->pe_speed;
852   //procStat.utilization = 1.0;
853   procStat.available = CmiTrue;
854   procStat.n_objs = m->n_objs;
855
856   int &nobj = statsData->n_objs;
857   int &nmigobj = statsData->n_migrateobjs;
858   for (i=0; i<m->n_objs; i++) {
859       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
860       statsData->objData[nobj] = m->objData[i];
861       if (m->objData[i].migratable) nmigobj++;
862       nobj++;
863       CmiAssert(nobj <= statsData->objData.capacity());
864   }
865   int &n_comm = statsData->n_comm;
866   for (i=0; i<m->n_comm; i++) {
867       statsData->commData[n_comm] = m->commData[i];
868       n_comm++;
869       CmiAssert(n_comm <= statsData->commData.capacity());
870   }
871   delete m;
872 }
873
874 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
875 {
876 #if CMK_LBDB_ON
877   if (statsMsgsList == NULL) {
878     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
879     CmiAssert(statsMsgsList != NULL);
880     for(int i=0; i < CkNumPes(); i++)
881       statsMsgsList[i] = 0;
882   }
883   if (statsData == NULL) statsData = new LDStats;
884
885     //  loop through all CLBStatsMsg in the incoming msg
886   int count = msg.getCount();
887   for (int num = 0; num < count; num++) 
888   {
889     CLBStatsMsg *m = msg.getMessage(num);
890     CmiAssert(m!=NULL);
891     const int pe = m->from_pe;
892     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
893 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
894 /*      
895  *  if(m->step < step()){
896  *    //TODO: if a processor is redoing an old load balance step..
897  *    //tell it that the step is done and that it should not perform any migrations
898  *      thisProxy[pe].ReceiveDummyMigration();
899  *  }*/
900 #endif
901         
902     if(!CmiNodeAlive(pe)){
903         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
904         continue;
905     }
906         
907     if (m->avail_vector!=NULL) {
908       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
909     }
910
911     if (statsMsgsList[pe] != 0) {
912       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
913              pe);
914     } else {
915       statsMsgsList[pe] = m;
916 #if USE_REDUCTION
917       depositData(m);
918 #else
919       // store per processor data right away
920       struct ProcStats &procStat = statsData->procs[pe];
921       procStat.pe = pe;
922       procStat.total_walltime = m->total_walltime;
923       procStat.idletime = m->idletime;
924       procStat.bg_walltime = m->bg_walltime;
925 #if CMK_LB_CPUTIMER
926       procStat.total_cputime = m->total_cputime;
927       procStat.bg_cputime = m->bg_cputime;
928 #endif
929       procStat.pe_speed = m->pe_speed;
930       //procStat.utilization = 1.0;
931       procStat.available = CmiTrue;
932       procStat.n_objs = m->n_objs;
933
934       statsData->n_objs += m->n_objs;
935       statsData->n_comm += m->n_comm;
936 #endif
937       stats_msg_count++;
938     }
939   }    // end of for
940
941   const int clients = CkNumValidPes();
942   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
943  
944   if (stats_msg_count == clients) {
945         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
946     statsData->nprocs() = stats_msg_count;
947     thisProxy[CkMyPe()].LoadBalance();
948   }
949 #endif
950 }
951
952 /** added by Abhinav for receiving msgs via spanning tree */
953 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
954 {
955 #if CMK_LBDB_ON
956         CmiAssert(CkMyPe() != 0);
957         bufMsg.add(msg);         // buffer messages
958         count_msgs++;
959         //CkPrintf("here %d\n", CkMyPe());
960         if (count_msgs == st.numChildren+1) {
961                 if(st.parent == 0)
962                 {
963                         thisProxy[0].ReceiveStats(bufMsg);
964                         //CkPrintf("from %d\n", CkMyPe());
965                 }
966                 else
967                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
968                 count_msgs = 0;
969                 bufMsg.free();
970         } 
971 #endif
972 }
973
974 void CentralLB::LoadBalance()
975 {
976 #if CMK_LBDB_ON
977   int proc;
978   const int clients = CkNumPes();
979
980 #if ! USE_REDUCTION
981   // build data
982   buildStats();
983 #else
984   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
985 #endif
986
987 //NOTE  theLbdb->ResetAdaptive();
988   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
989
990   if (_lb_args.debug()) 
991       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
992                   lbname, cur_ld_balancer, step(), start_lb_time,
993                   CmiMemoryUsage()/(1024.0*1024.0));
994
995   // if we are in simulation mode read data
996   if (LBSimulation::doSimulation) simulationRead();
997
998   char *availVector = LBDatabaseObj()->availVector();
999   for(proc = 0; proc < clients; proc++)
1000       statsData->procs[proc].available = (CmiBool)availVector[proc];
1001
1002   preprocess(statsData);
1003
1004 //    CkPrintf("Before Calling Strategy\n");
1005
1006   if (_lb_args.printSummary()) {
1007       LBInfo info(clients);
1008         // not take comm data
1009       info.getInfo(statsData, clients, 0);
1010       LBRealType mLoad, mCpuLoad, totalLoad;
1011       info.getSummary(mLoad, mCpuLoad, totalLoad);
1012       int nmsgs, nbytes;
1013       statsData->computeNonlocalComm(nmsgs, nbytes);
1014       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
1015 //      if (_lb_args.debug() > 1) {
1016 //        for (int i=0; i<statsData->n_objs; i++)
1017 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
1018 //      }
1019   }
1020
1021 #if CMK_REPLAYSYSTEM
1022   LDHandle *loadBalancer_pointers;
1023   if (_replaySystem) {
1024     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
1025     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
1026   }
1027 #endif
1028   
1029   LBMigrateMsg* migrateMsg = Strategy(statsData);
1030 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1031         migrateMsg->step = step();
1032 #endif
1033
1034 #if CMK_REPLAYSYSTEM
1035   CpdHandleLBMessage(&migrateMsg);
1036   if (_replaySystem) {
1037     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
1038     free(loadBalancer_pointers);
1039   }
1040 #endif
1041   
1042   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
1043   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
1044
1045   // if this is the step at which we need to dump the database
1046   simulationWrite();
1047
1048 //  calculate predicted load
1049 //  very time consuming though, so only happen when debugging is on
1050   if (_lb_args.printSummary()) {
1051       LBInfo info(clients);
1052         // not take comm data
1053       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
1054       LBRealType mLoad, mCpuLoad, totalLoad;
1055       info.getSummary(mLoad, mCpuLoad, totalLoad);
1056       int nmsgs, nbytes;
1057       statsData->computeNonlocalComm(nmsgs, nbytes);
1058       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
1059       for (int i=0; i<clients; i++)
1060         migrateMsg->expectedLoad[i] = info.peLoads[i];
1061   }
1062
1063   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
1064 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1065     lbDecisionCount++;
1066     migrateMsg->lbDecisionCount = lbDecisionCount;
1067 #endif
1068
1069   envelope *env = UsrToEnv(migrateMsg);
1070   if (1) {
1071       // broadcast
1072     thisProxy.ReceiveMigration(migrateMsg);
1073   }
1074   else {
1075     // split the migration for each processor
1076     for (int p=0; p<CkNumPes(); p++) {
1077       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
1078       thisProxy[p].ReceiveMigration(m);
1079     }
1080     delete migrateMsg;
1081   }
1082
1083   // Zero out data structures for next cycle
1084   // CkPrintf("zeroing out data\n");
1085   statsData->clear();
1086   stats_msg_count=0;
1087 #endif
1088 }
1089
1090 // test if sender and receiver in a commData is nonmigratable.
1091 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
1092 {
1093 #if CMK_LBDB_ON
1094   for (int pe=0 ; pe<count; pe++)
1095   {
1096     for (int i=0; i<len[pe]; i++)
1097       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
1098           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
1099       return 0;
1100   }
1101 #endif
1102   return 1;
1103 }
1104
1105 // rebuild LDStats and remove all non-migratble objects and related things
1106 void CentralLB::removeNonMigratable(LDStats* stats, int count)
1107 {
1108   int i;
1109
1110   // check if we have non-migratable objects
1111   int have = 0;
1112   for (i=0; i<stats->n_objs; i++) 
1113   {
1114     LDObjData &odata = stats->objData[i];
1115     if (!odata.migratable) {
1116       have = 1; break;
1117     }
1118   }
1119   if (have == 0) return;
1120
1121   CkVec<LDObjData> nonmig;
1122   CkVec<int> new_from_proc, new_to_proc;
1123   nonmig.resize(stats->n_migrateobjs);
1124   new_from_proc.resize(stats->n_migrateobjs);
1125   new_to_proc.resize(stats->n_migrateobjs);
1126   int n_objs = 0;
1127   for (i=0; i<stats->n_objs; i++) 
1128   {
1129     LDObjData &odata = stats->objData[i];
1130     if (odata.migratable) {
1131       nonmig[n_objs] = odata;
1132       new_from_proc[n_objs] = stats->from_proc[i];
1133       new_to_proc[n_objs] = stats->to_proc[i];
1134       n_objs ++;
1135     }
1136     else {
1137       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1138 #if CMK_LB_CPUTIMER
1139       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1140 #endif
1141     }
1142   }
1143   CmiAssert(stats->n_migrateobjs == n_objs);
1144
1145   stats->makeCommHash();
1146   
1147   CkVec<LDCommData> newCommData;
1148   newCommData.resize(stats->n_comm);
1149   int n_comm = 0;
1150   for (i=0; i<stats->n_comm; i++) 
1151   {
1152     LDCommData& cdata = stats->commData[i];
1153     if (!cdata.from_proc()) 
1154     {
1155       int idx = stats->getSendHash(cdata);
1156       CmiAssert(idx != -1);
1157       if (!stats->objData[idx].migratable) continue;
1158     }
1159     switch (cdata.receiver.get_type()) {
1160     case LD_PROC_MSG:
1161       break;
1162     case LD_OBJ_MSG:  {
1163       int idx = stats->getRecvHash(cdata);
1164       if (stats->complete_flag)
1165         CmiAssert(idx != -1);
1166       else if (idx == -1) continue;          // receiver not in this group
1167       if (!stats->objData[idx].migratable) continue;
1168       break;
1169       }
1170     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1171       break;
1172     }
1173     newCommData[n_comm] = cdata;
1174     n_comm ++;
1175   }
1176
1177   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1178
1179   // swap to new data
1180   stats->objData = nonmig;
1181   stats->from_proc = new_from_proc;
1182   stats->to_proc = new_to_proc;
1183   stats->n_objs = n_objs;
1184
1185   stats->commData = newCommData;
1186   stats->n_comm = n_comm;
1187
1188   stats->deleteCommHash();
1189   stats->makeCommHash();
1190
1191 }
1192
1193
1194 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1195 extern int restarted;
1196 #endif
1197
1198 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1199 {
1200   storedMigrateMsg = m;
1201 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1202         ProcessReceiveMigration((CkReductionMsg*)NULL);
1203 #else
1204   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1205                   thisProxy);
1206   contribute(0, NULL, CkReduction::max_int, cb);
1207
1208 #endif
1209 }
1210
1211 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1212 {
1213 #if CMK_LBDB_ON
1214         int i;
1215         LBMigrateMsg *m = storedMigrateMsg;
1216         CmiAssert(m!=NULL);
1217         delete msg;
1218
1219 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1220         int *dummyCounts;
1221
1222         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1223         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1224         if(step() > m->step){
1225                 char str[100];
1226                 envelope *env = UsrToEnv(m);
1227                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1228                 return;
1229         }
1230         lbDecisionCount = m->lbDecisionCount;
1231 #endif
1232
1233   if (_lb_args.debug() > 1) 
1234     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1235
1236   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1237   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1238 /*FAULT_EVAC*/
1239   if(!CmiNodeAlive(CkMyPe())){
1240         delete m;
1241         return;
1242   }
1243   migrates_expected = 0;
1244   future_migrates_expected = 0;
1245 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1246         int sending=0;
1247     int dummy=0;
1248         LBDB *_myLBDB = theLbdb->getLBDB();
1249         if(_restartFlag){
1250         dummyCounts = new int[CmiNumPes()];
1251         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1252     }
1253 #endif
1254   for(i=0; i < m->n_moves; i++) {
1255     MigrateInfo& move = m->moves[i];
1256     const int me = CkMyPe();
1257     if (move.from_pe == me && move.to_pe != me) {
1258       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1259       // migrate object, in case it is already gone, inform toPe
1260 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1261       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1262          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1263 #else
1264             if(_restartFlag == 0){
1265                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1266                 theLbdb->Migrate(move.obj,move.to_pe);
1267                 sending++;
1268             }else{
1269                 if(_myLBDB->validObjHandle(move.obj)){
1270                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1271                     theLbdb->Migrate(move.obj,move.to_pe);
1272                     sending++;
1273                 }else{
1274                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1275                     dummyCounts[move.to_pe]++;
1276                     dummy++;
1277                 }
1278             }
1279 #endif
1280     } else if (move.from_pe != me && move.to_pe == me) {
1281        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1282       if (!move.async_arrival) migrates_expected++;
1283       else future_migrates_expected++;
1284     }
1285   }
1286   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1287   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1288
1289 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1290         if(_restartFlag){
1291                 sendDummyMigrationCounts(dummyCounts);
1292                 _restartFlag  =0;
1293         delete []dummyCounts;
1294         }
1295 #endif
1296
1297
1298 #if 0
1299   if (m->n_moves ==0) {
1300     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1301   }
1302 #endif
1303   cur_ld_balancer = m->next_lb;
1304   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1305       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1306   }
1307
1308   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1309     MigrationDone(1);
1310   delete m;
1311
1312 //      CkEvacuatedElement();
1313 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1314 //  migrates_expected = 0;
1315 //  //  ResumeClients(1);
1316 #endif
1317 #endif
1318 }
1319
1320 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1321 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1322     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1323     //TODO: this is gonna be important when a crash happens during checkpoint
1324     //the globalDecisionCount would have to be saved and compared against
1325     //a future recvMigration
1326                 
1327         thisProxy[CkMyPe()].ResumeClients(1);
1328 }
1329 #endif
1330
1331 void CentralLB::MigrationDone(int balancing)
1332 {
1333 #if CMK_LBDB_ON
1334   migrates_completed = 0;
1335   migrates_expected = -1;
1336   // clear load stats
1337   if (balancing) theLbdb->ClearLoads();
1338   // Increment to next step
1339   theLbdb->incStep();
1340         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1341   // if sync resume, invoke a barrier
1342
1343 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1344     savedBalancing = balancing;
1345     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1346 #endif
1347
1348   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1349
1350   LoadbalanceDone(balancing);        // callback
1351 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1352   // if sync resume invoke a barrier
1353   if (balancing && _lb_args.syncResume()) {
1354     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1355                   thisProxy);
1356     contribute(0, NULL, CkReduction::sum_int, cb);
1357   }
1358   else{ 
1359     if(CmiNodeAlive(CkMyPe())){
1360         thisProxy [CkMyPe()].ResumeClients(balancing);
1361     }   
1362   }     
1363 #if CMK_GRID_QUEUE_AVAILABLE
1364   CmiGridQueueDeregisterAll ();
1365   CpvAccess(CkGridObject) = NULL;
1366 #endif
1367 #endif 
1368 #endif
1369 }
1370
1371 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1372 void CentralLB::endMigrationDone(int balancing){
1373     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1374
1375
1376   if (balancing && _lb_args.syncResume()) {
1377     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1378                   thisProxy);
1379     contribute(0, NULL, CkReduction::sum_int, cb);
1380   }
1381   else{
1382     if(CmiNodeAlive(CkMyPe())){
1383     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1384     thisProxy [CkMyPe()].ResumeClients(balancing);
1385     }
1386   }
1387
1388 }
1389 #endif
1390
1391 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1392 void resumeCentralLbAfterChkpt(void *_lb){
1393     CentralLB *lb= (CentralLB *)_lb;
1394     CpvAccess(_currentObj)=lb;
1395     lb->endMigrationDone(lb->savedBalancing);
1396 }
1397 #endif
1398
1399
1400 void CentralLB::ResumeClients(CkReductionMsg *msg)
1401 {
1402   ResumeClients(1);
1403   delete msg;
1404 }
1405
1406 void CentralLB::ResumeClients(int balancing)
1407 {
1408 #if CMK_LBDB_ON
1409 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1410     resumeCount++;
1411     globalResumeCount = resumeCount;
1412 #endif
1413   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1414   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1415     double end_lb_time = CkWallTimer();
1416   }
1417
1418 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1419   if (balancing) ComlibNotifyMigrationDone();  
1420 #endif
1421
1422   theLbdb->ResumeClients();
1423   if (balancing)  {
1424
1425     CheckMigrationComplete();
1426     if (future_migrates_expected == 0 || 
1427             future_migrates_expected == future_migrates_completed) {
1428       CheckMigrationComplete();
1429     }
1430   }
1431 #endif
1432 }
1433
1434 /*
1435   migration of objects contains two different kinds:
1436   (1) objects want to make a barrier for migration completion
1437       (waitForBarrier is true)
1438       migrationDone() to finish and resumeClients
1439   (2) objects don't need a barrier
1440   However, next load balancing can only happen when both migrations complete
1441 */ 
1442 void CentralLB::CheckMigrationComplete()
1443 {
1444 #if CMK_LBDB_ON
1445   lbdone ++;
1446   if (lbdone == 2) {
1447     if (_lb_args.debug() && CkMyPe()==0) {
1448       double end_lb_time = CkWallTimer();
1449       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1450                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1451                 end_lb_time-start_lb_time);
1452     }
1453
1454     //FIX ME!!! adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
1455 //NOTE    theLbdb->SetMigrationCost(CkWallTimer() - start_lb_time);
1456
1457     lbdone = 0;
1458     future_migrates_expected = -1;
1459     future_migrates_completed = 0;
1460
1461
1462     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1463     // release local barrier  so that the next load balancer can go
1464     LDOMHandle h;
1465     h.id.id.idx = 0;
1466     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1467     // switch to the next load balancer in the list
1468     // subtle: called from Migrated() may result in Migrated() called in next LB
1469     theLbdb->nextLoadbalancer(seqno);
1470   }
1471 #endif
1472 }
1473
1474 void CentralLB::preprocess(LDStats* stats)
1475 {
1476   if (_lb_args.ignoreBgLoad())
1477     stats->clearBgLoad();
1478
1479   // Call the predictor for the future
1480   if (_lb_predict) FuturePredictor(statsData);
1481 }
1482
1483 // default load balancing strategy
1484 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1485 {
1486 #if CMK_LBDB_ON
1487   double strat_start_time = CkWallTimer();
1488   if (_lb_args.debug())
1489     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1490
1491   work(stats);
1492
1493
1494   if (_lb_args.debug()>2)  {
1495     CkPrintf("CharmLB> Obj Map:\n");
1496     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1497     CkPrintf("\n");
1498   }
1499
1500   LBMigrateMsg *msg = createMigrateMsg(stats);
1501
1502   int clients = CkNumPes();
1503   LBInfo info(clients);
1504   getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1505   LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1506   info.getSummary(mLoad, mCpuLoad, totalLoad);
1507   //CkPrintf("CharmLB> Max load w/o comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
1508   //info.print();
1509 //NOTE!!  theLbdb->UpdateAfterLBData(mLoad, mCpuLoad, totalLoad/clients);
1510
1511   //getPredictedLoadWithMsg(stats, clients, msg, info,1);
1512   //info.getSummary(mLoad, mCpuLoad, totalLoadWComm);
1513   //info.print();
1514   //CkPrintf("CharmLB> Max load with comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
1515   //int nmsgs, nbytes;
1516   //stats->computeNonlocalComm(nmsgs, nbytes);
1517   //CkPrintf("CharmLB> Non local communication %d msg and %d bytes\n", nmsgs, nbytes);
1518
1519
1520   //long msg_n;
1521   //long long bytes_n;
1522   //stats->computeComm(msg_n, bytes_n);
1523   //CkPrintf("CharmLB> Total communication %ld msg and %lld bytes\n", nmsgs, nbytes);
1524
1525   //double alpha_beta_cost = (msg_n * alpha) + (bytes_n * beta);
1526   //theLbdb->UpdateAfterLBComm(alpha_beta_cost/totalLoad);
1527
1528   if (_lb_args.debug()) {
1529     double strat_end_time = CkWallTimer();
1530     envelope *env = UsrToEnv(msg);
1531
1532     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1533     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1534               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1535     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1536     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1537               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1538     //CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1539 //NOTE    theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1540   }
1541   return msg;
1542 #else
1543   return NULL;
1544 #endif
1545 }
1546
1547 void CentralLB::work(LDStats* stats)
1548 {
1549   // does nothing but print the database
1550   stats->print();
1551 }
1552
1553 // generate migrate message from stats->from_proc and to_proc
1554 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1555 {
1556   int i;
1557   CkVec<MigrateInfo*> migrateInfo;
1558   for (i=0; i<stats->n_objs; i++) {
1559     LDObjData &objData = stats->objData[i];
1560     int frompe = stats->from_proc[i];
1561     int tope = stats->to_proc[i];
1562     if (frompe != tope) {
1563       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1564       //         CkMyPe(),obj,pe,dest);
1565       MigrateInfo *migrateMe = new MigrateInfo;
1566       migrateMe->obj = objData.handle;
1567       migrateMe->from_pe = frompe;
1568       migrateMe->to_pe = tope;
1569       migrateMe->async_arrival = objData.asyncArrival;
1570       migrateInfo.insertAtEnd(migrateMe);
1571     }
1572   }
1573
1574   int migrate_count=migrateInfo.length();
1575   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1576   msg->n_moves = migrate_count;
1577   for(i=0; i < migrate_count; i++) {
1578     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1579     msg->moves[i] = *item;
1580     delete item;
1581     migrateInfo[i] = 0;
1582   }
1583   return msg;
1584 }
1585
1586 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1587 {
1588   int nmoves = 0;
1589   int nunavail = 0;
1590   int i;
1591   for (i=0; i<m->n_moves; i++) {
1592     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1593     if (item->from_pe == p || item->to_pe == p) nmoves++;
1594   }
1595   for (i=0; i<CkNumPes();i++) {
1596     if (!m->avail_vector[i]) nunavail++;
1597   }
1598   LBMigrateMsg* msg;
1599   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1600   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1601   msg->n_moves = nmoves;
1602   msg->level = m->level;
1603   msg->next_lb = m->next_lb;
1604   for (i=0,nmoves=0; i<m->n_moves; i++) {
1605     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1606     if (item->from_pe == p || item->to_pe == p) {
1607       msg->moves[nmoves] = *item;
1608       nmoves++;
1609     }
1610   }
1611   // copy processor data
1612   if (nunavail)
1613   for (i=0; i<CkNumPes();i++) {
1614     msg->avail_vector[i] = m->avail_vector[i];
1615     msg->expectedLoad[i] = m->expectedLoad[i];
1616   }
1617   return msg;
1618 }
1619
1620 void CentralLB::simulationWrite() {
1621   if(step() == LBSimulation::dumpStep)
1622   {
1623     // here we are supposed to dump the database
1624     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1625     char *dumpFileName = (char *)malloc(dumpFileSize);
1626     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1627       free(dumpFileName);
1628       dumpFileSize+=3;
1629       dumpFileName = (char *)malloc(dumpFileSize);
1630     }
1631     writeStatsMsgs(dumpFileName);
1632     free(dumpFileName);
1633     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1634     ++LBSimulation::dumpStep;
1635     --LBSimulation::dumpStepSize;
1636     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1637       CmiPrintf("Charm++> Exiting...\n");
1638       CkExit();
1639     }
1640     return;
1641   }
1642 }
1643
1644 void CentralLB::simulationRead() {
1645   LBSimulation *simResults = NULL, *realResults;
1646   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1647   voidMessage->n_moves=0;
1648   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1649     // here we are supposed to read the data from the dump database
1650     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1651     char *simFileName = (char *)malloc(simFileSize);
1652     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1653       free(simFileName);
1654       simFileSize+=3;
1655       simFileName = (char *)malloc(simFileSize);
1656     }
1657     readStatsMsgs(simFileName);
1658
1659     // allocate simResults (only the first step)
1660     if (simResults == NULL) {
1661       simResults = new LBSimulation(LBSimulation::simProcs);
1662       realResults = new LBSimulation(LBSimulation::simProcs);
1663     }
1664     else {
1665       // should be the same number of procs of the original simulation!
1666       if (!LBSimulation::procsChanged) {
1667         // it means we have a previous step, so in simResults there is data.
1668         // we can now print the real effects of the load balancer during the simulation
1669         // or print the difference between the predicted data and the real one.
1670         realResults->reset();
1671         // reset to_proc of statsData to be equal to from_proc
1672         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1673         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1674         simResults->PrintDifferences(realResults,statsData);
1675       }
1676       simResults->reset();
1677     }
1678
1679     // now pass it to the strategy routine
1680     double startT = CkWallTimer();
1681     preprocess(statsData);
1682     CmiPrintf("%s> Strategy starts ... \n", lbname);
1683     LBMigrateMsg* migrateMsg = Strategy(statsData);
1684     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1685                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1686
1687     // now calculate the results of the load balancing simulation
1688     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1689
1690     // now we have the simulation data, so print it and loop
1691     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1692     // **CWL** Officially recording my disdain here for using ints for bool
1693     if (LBSimulation::showDecisionsOnly) {
1694       simResults->PrintDecisions(migrateMsg, simFileName, 
1695                                  LBSimulation::simProcs);
1696     } else {
1697       simResults->PrintSimulationResults();
1698     }
1699
1700     free(simFileName);
1701     delete migrateMsg;
1702     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1703   }
1704   // deallocate simResults
1705   delete simResults;
1706   CmiPrintf("Charm++> Exiting...\n");
1707   CkExit();
1708 }
1709
1710 void CentralLB::readStatsMsgs(const char* filename) 
1711 {
1712 #if CMK_LBDB_ON
1713   int i;
1714   FILE *f = fopen(filename, "r");
1715   if (f==NULL) {
1716     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1717     CmiAbort("");
1718   }
1719
1720   // at this stage, we need to rebuild the statsMsgList and
1721   // statsDataList structures. For that first deallocate the
1722   // old structures
1723   if (statsMsgsList) {
1724     for(i = 0; i < stats_msg_count; i++)
1725       delete statsMsgsList[i];
1726     delete[] statsMsgsList;
1727     statsMsgsList=0;
1728   }
1729
1730   PUP::fromDisk pd(f);
1731   PUP::machineInfo machInfo;
1732
1733   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1734   PUP::xlater p(machInfo, pd);
1735
1736   if (_lb_args.lbversion() > 1) {
1737     p|_lb_args.lbversion();             // write version number
1738     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1739     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1740   }
1741   p|stats_msg_count;
1742
1743   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1744   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1745   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1746
1747   // LBSimulation::simProcs must be set
1748   statsData->pup(p);
1749
1750   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1751   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1752
1753   // file f is closed in the destructor of PUP::fromDisk
1754   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1755 #endif
1756 }
1757
1758 void CentralLB::writeStatsMsgs(const char* filename) 
1759 {
1760 #if CMK_LBDB_ON
1761   FILE *f = fopen(filename, "w");
1762   if (f==NULL) {
1763     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1764     CmiAbort("");
1765   }
1766
1767   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1768   PUP::toDisk p(f);
1769   p((char *)&machInfo, sizeof(machInfo));       // machine info
1770
1771   p|_lb_args.lbversion();               // write version number
1772   p|stats_msg_count;
1773   statsData->pup(p);
1774
1775   fclose(f);
1776
1777   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1778 #endif
1779 }
1780
1781 // calculate the predicted wallclock/cpu load for every processors
1782 // considering communication overhead if considerComm is true
1783 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1784                       LBMigrateMsg *msg, LBInfo &info, 
1785                       int considerComm)
1786 {
1787 #if CMK_LBDB_ON
1788         stats->makeCommHash();
1789
1790         // update to_proc according to migration msgs
1791         for(int i = 0; i < msg->n_moves; i++) {
1792           MigrateInfo &mInfo = msg->moves[i];
1793           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1794           CmiAssert(idx != -1);
1795           stats->to_proc[idx] = mInfo.to_pe;
1796         }
1797
1798         info.getInfo(stats, count, considerComm);
1799 #endif
1800 }
1801
1802
1803 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1804 {
1805     CkAssert(simResults != NULL && count == simResults->numPes);
1806     // estimate the new loads of the processors. As a first approximation, this is the
1807     // sum of the cpu times of the objects on that processor
1808     double startT = CkWallTimer();
1809     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1810     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1811 }
1812
1813 void CentralLB::pup(PUP::er &p) { 
1814   BaseLB::pup(p); 
1815   if (p.isUnpacking())  {
1816     initLB(CkLBOptions(seqno)); 
1817   }
1818   p|reduction_started;
1819   int has_statsMsg=0;
1820   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1821   p|has_statsMsg;
1822   if (has_statsMsg) {
1823     if (p.isUnpacking())
1824       statsMsg = new CLBStatsMsg;
1825     statsMsg->pup(p);
1826   }
1827 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1828   p | lbDecisionCount;
1829   p | resumeCount;
1830 #endif
1831         
1832 }
1833
1834 int CentralLB::useMem() { 
1835   return sizeof(CentralLB) + statsData->useMem() + 
1836          CkNumPes() * sizeof(CLBStatsMsg *);
1837 }
1838
1839
1840 /**
1841   CLBStatsMsg is not a real message now.
1842   CLBStatsMsg is used for all processors to fill in their local load and comm
1843   statistics and send to processor 0
1844 */
1845
1846 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1847   n_objs = osz;
1848   n_comm = csz;
1849   objData = new LDObjData[osz];
1850   commData = new LDCommData[csz];
1851   avail_vector = NULL;
1852 }
1853
1854 CLBStatsMsg::~CLBStatsMsg() {
1855   delete [] objData;
1856   delete [] commData;
1857   delete [] avail_vector;
1858 }
1859
1860 void CLBStatsMsg::pup(PUP::er &p) {
1861   int i;
1862   p|from_pe;
1863   p|pe_speed;
1864   p|total_walltime;
1865   p|idletime;
1866   p|bg_walltime;
1867 #if CMK_LB_CPUTIMER
1868   p|total_cputime;
1869   p|bg_cputime;
1870 #endif
1871 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1872   p | step;
1873 #endif
1874   p|n_objs;
1875   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1876   for (i=0; i<n_objs; i++) p|objData[i];
1877   p|n_comm;
1878   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1879   for (i=0; i<n_comm; i++) p|commData[i];
1880
1881   int has_avail_vector;
1882   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1883   p|has_avail_vector;
1884   if (p.isUnpacking()) {
1885     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1886     else avail_vector = NULL;
1887   }
1888   if (has_avail_vector) p(avail_vector, CkNumPes());
1889
1890   p(next_lb);
1891 }
1892
1893 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1894 // the entry function, it is just used to use to pup.
1895 // I don't use CLBStatsMsg directly as marshalled parameter because
1896 // I want the data pointer stored and not to be freed by the Charm++.
1897 void CkMarshalledCLBStatsMessage::free() { 
1898   int count = msgs.size();
1899   for  (int i=0; i<count; i++) {
1900     delete msgs[i];
1901     msgs[i] = NULL;
1902   }
1903   msgs.free();
1904 }
1905
1906 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1907 {
1908   int count = m.getCount();
1909   for (int i=0; i<count; i++) add(m.getMessage(i));
1910 }
1911
1912 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1913 {
1914   int count = msgs.size();
1915   p|count;
1916   for (int i=0; i<count; i++) {
1917     CLBStatsMsg *msg;
1918     if (p.isUnpacking()) msg = new CLBStatsMsg;
1919     else { 
1920       msg = msgs[i]; CmiAssert(msg!=NULL);
1921     }
1922     msg->pup(p);
1923     if (p.isUnpacking()) add(msg);
1924   }
1925 }
1926
1927 SpanningTree::SpanningTree()
1928 {
1929         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1930         arity = (int)ceil(sq/2);
1931         calcParent(CkMyPe());
1932         calcNumChildren(CkMyPe());
1933 }
1934
1935 void SpanningTree::calcParent(int n)
1936 {
1937         parent=-1;
1938         if(n != 0  && arity > 0)
1939                 parent = (n-1)/arity;
1940 }
1941
1942 void SpanningTree::calcNumChildren(int n)
1943 {
1944         numChildren = 0;
1945         if (arity == 0) return;
1946         int fullNode=(CkNumPes()-1-arity)/arity;
1947         if(n <= fullNode)
1948                 numChildren = arity;
1949         if(n == fullNode+1)
1950                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1951         if(n > fullNode+1)
1952                 numChildren = 0;
1953 }
1954
1955 #include "CentralLB.def.h"
1956  
1957 /*@}*/