4fd93082ad396789b2648ff36913939e2398c762
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 //#include "limits.h"
15 #include <vector>
16
17 #define  DEBUGF(x)       // CmiPrintf x;
18 #define  DEBUG(x)        // x;
19
20 #if CMK_MEM_CHECKPOINT
21    /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #elif defined(_FAULT_MLOG_)
25 /* can not handle reduction in inmem FT */
26 #define USE_REDUCTION         0
27 #define USE_LDB_SPANNING_TREE 0
28 #else
29 #define USE_REDUCTION         1
30 #define USE_LDB_SPANNING_TREE 1
31 #endif
32
33 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
34 extern int _restartFlag;
35 extern void getGlobalStep(CkGroupID );
36 extern void initMlogLBStep(CkGroupID );
37 extern int globalResumeCount;
38 extern void sendDummyMigrationCounts(int *);
39 #endif
40
41 #if CMK_GRID_QUEUE_AVAILABLE
42 CpvExtern(void *, CkGridObject);
43 #endif
44
45 CkGroupID loadbalancer;
46 int * lb_ptr;
47 int load_balancer_created;
48
49 //struct AdaptiveData {
50 //  int iteration;
51 //  double max_load;
52 //  double avg_load;
53 //};
54 //
55 //struct AdaptiveLBDatabase {
56 //  std::vector<AdaptiveData> history_data;
57 //} adaptive_lbdb;
58 //
59 //enum state {
60 //  OFF,
61 //  ON,
62 //  PAUSE,
63 //  DECIDED,
64 //  LOAD_BALANCE
65 //} local_state;
66 //
67 //struct AdaptiveLBStructure {
68 //  int lb_ideal_period;
69 //  int lb_calculated_period;
70 //  int lb_no_iterations;
71 //  int global_max_iter_no;
72 //  int global_recv_iter_counter;
73 //  bool in_progress;
74 //  double prev_load;
75 //  double lb_strategy_cost;
76 //  double lb_migration_cost;
77 //  bool lb_period_informed;
78 //  int lb_msg_send_no;
79 //  int lb_msg_recv_no;
80 //} adaptive_struct;
81
82 CreateLBFunc_Def(CentralLB, "CentralLB base class")
83
84 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
85                              LBMigrateMsg *, LBInfo &info, int considerComm);
86
87 //CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
88 //  double lb_data[4];
89 //  lb_data[0] = 0;
90 //  lb_data[1] = 0;
91 //  lb_data[2] = 0;
92 //  for (int i = 0; i < nMsg; i++) {
93 //    CkAssert(msgs[i]->getSize() == 4*sizeof(double));
94 //    double* m = (double *)msgs[i]->getData();
95 //    lb_data[0] += m[0];
96 //    lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
97 //    lb_data[2] += m[2];
98 //    if (i == 0) {
99 //      lb_data[3] = m[3];
100 //    }
101 //    if (m[3] != lb_data[3]) {
102 //      CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
103 //      %lf\n", lb_data[3], m[3]);
104 //    }
105 //  }
106 //  return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
107 //}
108 //
109 ///*global*/ CkReduction::reducerType lbDataCollectionType;
110 ///*initcall*/ void registerLBDataCollection(void) {
111 //  lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
112 //}
113
114 /*
115 void CreateCentralLB()
116 {
117   CProxy_CentralLB::ckNew(0);
118 }
119 */
120
121 void CentralLB::staticStartLB(void* data)
122 {
123   CentralLB *me = (CentralLB*)(data);
124   me->StartLB();
125 }
126
127 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
128 {
129   CentralLB *me = (CentralLB*)(data);
130   me->Migrated(h, waitBarrier);
131 }
132
133 void CentralLB::staticAtSync(void* data)
134 {
135   CentralLB *me = (CentralLB*)(data);
136   me->AtSync();
137 }
138
139 void CentralLB::initLB(const CkLBOptions &opt)
140 {
141 #if CMK_LBDB_ON
142   lbname = "CentralLB";
143   thisProxy = CProxy_CentralLB(thisgroup);
144   //  CkPrintf("Construct in %d\n",CkMyPe());
145
146   // create and turn on by default
147   receiver = theLbdb->
148     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
149   notifier = theLbdb->getLBDB()->
150     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
151   startLbFnHdl = theLbdb->getLBDB()->
152     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
153
154   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
155   if (opt.getSeqNo() > 0) turnOff();
156
157   stats_msg_count = 0;
158   statsMsgsList = NULL;
159   statsData = NULL;
160
161   storedMigrateMsg = NULL;
162   reduction_started = 0;
163
164   // for future predictor
165   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
166   else predicted_model=0;
167   // register user interface callbacks
168   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
169
170   myspeed = theLbdb->ProcessorSpeed();
171
172   migrates_completed = 0;
173   future_migrates_completed = 0;
174   migrates_expected = -1;
175   future_migrates_expected = -1;
176   cur_ld_balancer = _lb_args.central_pe();      // 0 default
177   lbdone = 0;
178   count_msgs=0;
179   statsMsg = NULL;
180
181   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
182
183   load_balancer_created = 1;
184
185   // If metabalancer enabled, initialize the variables
186  // adaptive_struct.lb_ideal_period =  INT_MAX;
187  // adaptive_struct.lb_calculated_period = INT_MAX;
188  // adaptive_struct.lb_no_iterations = -1;
189  // adaptive_struct.global_max_iter_no = 0;
190  // adaptive_struct.global_recv_iter_counter = 0;
191  // adaptive_struct.in_progress = false;
192  // adaptive_struct.prev_load = 0.0;
193  // adaptive_struct.lb_strategy_cost = 0.0;
194  // adaptive_struct.lb_migration_cost = 0.0;
195  // adaptive_struct.lb_msg_send_no = 0;
196  // adaptive_struct.lb_msg_recv_no = 0;
197  // local_state = OFF;
198 #endif
199 }
200
201 CentralLB::~CentralLB()
202 {
203 #if CMK_LBDB_ON
204   delete [] statsMsgsList;
205   delete statsData;
206   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
207   if (theLbdb) {
208     theLbdb->getLBDB()->
209       RemoveNotifyMigrated(notifier);
210     theLbdb->
211       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
212   }
213 #endif
214 }
215
216 void CentralLB::turnOn() 
217 {
218 #if CMK_LBDB_ON
219   theLbdb->getLBDB()->
220     TurnOnBarrierReceiver(receiver);
221   theLbdb->getLBDB()->
222     TurnOnNotifyMigrated(notifier);
223   theLbdb->getLBDB()->
224     TurnOnStartLBFn(startLbFnHdl);
225 #endif
226 }
227
228 void CentralLB::turnOff() 
229 {
230 #if CMK_LBDB_ON
231   theLbdb->getLBDB()->
232     TurnOffBarrierReceiver(receiver);
233   theLbdb->getLBDB()->
234     TurnOffNotifyMigrated(notifier);
235   theLbdb->getLBDB()->
236     TurnOffStartLBFn(startLbFnHdl);
237 #endif
238 }
239
240 void CentralLB::AtSync()
241 {
242 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
243 #if CMK_LBDB_ON
244 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
245
246 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
247         CpvAccess(_currentObj)=this;
248 #endif
249
250   // if num of processor is only 1, nothing should happen
251   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
252     MigrationDone(0);
253     return;
254   }
255   if(CmiNodeAlive(CkMyPe())){
256     thisProxy [CkMyPe()].ProcessAtSync();
257   }
258 #endif
259 }
260
261 #include "ComlibStrategy.h"
262
263 void CentralLB::ProcessAtSync()
264 {
265
266
267
268 #if CMK_LBDB_ON
269   if (reduction_started) return;              // reducton in progress
270
271   CmiAssert(CmiNodeAlive(CkMyPe()));
272   if (CkMyPe() == cur_ld_balancer) {
273     start_lb_time = CkWallTimer();
274   }
275  double total_load;
276  double idle_time;
277  double bg_walltime;
278  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
279  theLbdb->IdleTime(&idle_time);
280  CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
281
282
283
284 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
285         initMlogLBStep(thisgroup);
286 #endif
287
288   // build message
289   BuildStatsMsg();
290
291 #if USE_REDUCTION
292     // reduction to get total number of objects and comm
293     // so that processor 0 can pre-allocate load balancing database
294   int counts[2];
295   counts[0] = theLbdb->GetObjDataSz();
296   counts[1] = theLbdb->GetCommDataSz();
297
298   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
299                   thisProxy[0]);
300   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
301   reduction_started = 1;
302 #else
303   SendStats();
304 #endif
305 #endif
306 }
307
308 //void CentralLB::ProcessAtSyncMin()
309 //{
310 //#if CMK_LBDB_ON
311 //  if (reduction_started) return;              // reducton in progress
312 //
313 //  CmiAssert(CmiNodeAlive(CkMyPe()));
314 //  if (CkMyPe() == cur_ld_balancer) {
315 //    start_lb_time = CkWallTimer();
316 //  }
317 //
318 //
319 //#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
320 //      initMlogLBStep(thisgroup);
321 //#endif
322 //  
323 //  adaptive_struct.lb_no_iterations++;
324 // // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
325 // //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
326 //
327 //  // If decision has been made and has reached the lb_period, then do load
328 //  // balancing, else if hasn't reached ideal_period, then resume.
329 //  if (local_state == DECIDED) {
330 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
331 // //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
332 //      SendMinStats();
333 //      ResumeClients(0);
334 //    } else {
335 //      local_state = LOAD_BALANCE;
336 // //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
337 //      ProcessAtSync();
338 //    }
339 //    return;
340 //  }
341 //   
342 //  // If the state is ON and not DECIDED, then if havn't reached lb_period, then
343 //  // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
344 //  // dont resume client.
345 //  if (local_state == ON) {
346 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
347 //      SendMinStats();
348 //      ResumeClients(0);
349 //    } else {
350 //      local_state = PAUSE;
351 //    }
352 //    return;
353 //  }
354 //
355 //  SendMinStats();
356 //  ResumeClients(0);
357 //#endif
358 //}
359 //
360 //void CentralLB::SendMinStats() {
361 //
362 // double total_load;
363 // double idle_time;
364 // double bg_walltime;
365 //  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
366 // // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
367 //
368 //  // Since the total_load is cumulative since the last load balancing stage,
369 //  // Hence it is subtracted from the previous load.
370 //  total_load -= idle_time;
371 //  double tmp = total_load;
372 //  total_load -= adaptive_struct.prev_load;
373 //  adaptive_struct.prev_load = tmp; 
374 //
375 //  double lb_data[4];
376 //  lb_data[0] = total_load;
377 //  lb_data[1] = total_load;
378 //  lb_data[2] = 1;
379 //  lb_data[3] = adaptive_struct.lb_no_iterations;
380 //  //CkPrintf("[%d] sends total load %lf at iter %d\n", CkMyPe(), total_load, adaptive_struct.lb_no_iterations);
381 //
382 //  if (adaptive_struct.lb_no_iterations != 0) {
383 //    CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
384 //        thisProxy[0]);
385 ////    contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
386 //  }
387 //
388 ////    int tmp1 = adaptive_struct.lb_no_iterations;
389 ////    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
390 ////    // Send the current iteration no
391 ////    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
392 ////        thisProxy[0]);
393 ////    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
394 //}
395 //
396 //void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
397 //  CmiAssert(CkMyPe() == 0);
398 //  double* load = (LBRealType *) msg->getData();
399 //  double max = load[1];
400 //  double avg = load[0]/load[2];
401 //  int iteration_n = load[3];
402 //  CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
403 //  CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
404 //  delete msg;
405 //
406 //  // Store the data for this iteration
407 //  AdaptiveData data;
408 //  data.iteration = adaptive_struct.lb_no_iterations;
409 //  data.max_load = max;
410 //  data.avg_load = avg;
411 //  adaptive_lbdb.history_data.push_back(data);
412 //
413 //  // If lb period inform is in progress, dont inform again
414 //  if (adaptive_struct.in_progress) {
415 //    return;
416 //  }
417 //
418 ////  if (adaptive_struct.lb_period_informed) {
419 ////    return;
420 ////  }
421 //
422 //  // If the max/avg ratio is greater than the threshold and also this is not the
423 //  // step immediately after load balancing, carry out load balancing
424 //  //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
425 //  if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
426 //    CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
427 ////    if (!adaptive_struct.lb_period_informed) {
428 ////      // Just for testing
429 ////      adaptive_struct.lb_calculated_period = 40;
430 ////      adaptive_struct.lb_period_informed = true;
431 ////      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
432 ////      return;
433 ////    }
434 //
435 //    // If the new lb period is less than current set lb period
436 //    if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
437 //      adaptive_struct.lb_calculated_period = iteration_n + 1;
438 //      adaptive_struct.lb_period_informed = true;
439 //      adaptive_struct.in_progress = true;
440 //      CkPrintf("Informing everyone the lb period is %d\n",
441 //          adaptive_struct.lb_calculated_period);
442 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
443 //    }
444 //    return;
445 //  }
446 //
447 //  // Generate the plan for the adaptive strategy
448 //  int period;
449 //  if (generatePlan(period)) {
450 //    //CkPrintf("Carry out load balancing step at iter\n");
451 //
452 //    // If the new lb period is less than current set lb period
453 //    if (adaptive_struct.lb_calculated_period > period) {
454 //      adaptive_struct.lb_calculated_period = period;
455 //      adaptive_struct.in_progress = true;
456 //      adaptive_struct.lb_period_informed = true;
457 //      CkPrintf("Informing everyone the lb period is %d\n",
458 //          adaptive_struct.lb_calculated_period);
459 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
460 //    }
461 //  }
462 //}
463 //
464 //bool CentralLB::generatePlan(int& period) {
465 //  if (adaptive_lbdb.history_data.size() <= 8) {
466 //    return false;
467 //  }
468 //
469 //  // Some heuristics for lbperiod
470 //  // If constant load or almost constant,
471 //  // then max * new_lb_period > avg * new_lb_period + lb_cost
472 //  double max = 0.0;
473 //  double avg = 0.0;
474 //  AdaptiveData data;
475 //  for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
476 //    data = adaptive_lbdb.history_data[i];
477 //    max += data.max_load;
478 //    avg += data.avg_load;
479 //    CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
480 //  }
481 ////  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
482 ////  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
483 ////
484 ////  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
485 ////  adaptive_struct.lb_migration_cost) / (max - avg);
486 ////  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
487 ////      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
488 ////
489 //  // If linearly varying load, then find lb_period
490 //  // area between the max and avg curve 
491 //  double mslope, aslope, mc, ac;
492 //  getLineEq(aslope, ac, mslope, mc);
493 //  CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
494 //  double a = (mslope - aslope)/2;
495 //  double b = (mc - ac);
496 //  double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
497 //  //c = -2.5;
498 //  bool got_period = getPeriodForLinear(a, b, c, period);
499 //  if (!got_period) {
500 //    return false;
501 //  }
502 //  
503 //  if (mslope < 0) {
504 //    if (period > (-mc/mslope)) {
505 //      CkPrintf("Max < 0 Period set when max load is -ve\n");
506 //      return false;
507 //    }
508 //  }
509 //
510 //  if (aslope < 0) {
511 //    if (period > (-ac/aslope)) {
512 //      CkPrintf("Avg < 0 Period set when avg load is -ve\n");
513 //      return false;
514 //    }
515 //  }
516 //
517 //  int intersection_t = (mc-ac) / (aslope - mslope);
518 //  if (intersection_t > 0 && period > intersection_t) {
519 //    CkPrintf("Avg | Max Period set when curves intersect\n");
520 //    return false;
521 //  }
522 //  return true;
523 //}
524 //
525 //bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
526 //  CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
527 //  if (a == 0.0) {
528 //    period = (-c / b);
529 //    CkPrintf("Ideal period for linear load %d\n", period);
530 //    return true;
531 //  }
532 //  int x;
533 //  double t = (b * b) - (4*a*c);
534 //  if (t < 0) {
535 //    CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
536 //    return false;
537 //  }
538 //  t = (-b + sqrt(t)) / (2*a);
539 //  x = t;
540 //  if (x < 0) {
541 //    CkPrintf("boo!!! x (%d) < 0\n", x);
542 //    x = 0;
543 //    return false;
544 //  }
545 //  period = x;
546 //  CkPrintf("Ideal period for linear load %d\n", period);
547 //  return true;
548 //}
549 //
550 //bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
551 //  int total = adaptive_lbdb.history_data.size();
552 //  int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
553 //      adaptive_lbdb.history_data[0].iteration;
554 //  double a1 = 0;
555 //  double m1 = 0;
556 //  double a2 = 0;
557 //  double m2 = 0;
558 //  AdaptiveData data;
559 //  int i = 0;
560 //  for (i = 0; i < total/2; i++) {
561 //    data = adaptive_lbdb.history_data[i];
562 //    m1 += data.max_load;
563 //    a1 += data.avg_load;
564 //  }
565 //  m1 /= i;
566 //  a1 /= i;
567 //
568 //  for (i = total/2; i < total; i++) {
569 //    data = adaptive_lbdb.history_data[i];
570 //    m2 += data.max_load;
571 //    a2 += data.avg_load;
572 //  }
573 //  m2 /= (i - total/2);
574 //  a2 /= (i - total/2);
575 //
576 //  aslope = 2 * (a2 - a1) / iterations;
577 //  mslope = 2 * (m2 - m1) / iterations;
578 //  ac = adaptive_lbdb.history_data[0].avg_load;
579 //  mc = adaptive_lbdb.history_data[0].max_load;
580 //  return true;
581 //}
582 //
583 //void CentralLB::LoadBalanceDecision(int req_no, int period) {
584 //  if (req_no < adaptive_struct.lb_msg_recv_no) {
585 //    CkPrintf("Error!!! Received a request which was already sent or old\n");
586 //    return;
587 //  }
588 //  //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
589 //  adaptive_struct.lb_ideal_period = period;
590 //  local_state = ON;
591 //  adaptive_struct.lb_msg_recv_no = req_no;
592 //  thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
593 //}
594 //
595 //void CentralLB::LoadBalanceDecisionFinal(int req_no, int period) {
596 //  if (req_no < adaptive_struct.lb_msg_recv_no) {
597 //    return;
598 //  }
599 //  //CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
600 //  adaptive_struct.lb_ideal_period = period;
601 //
602 //  if (local_state == ON) {
603 //    local_state = DECIDED;
604 //    return;
605 //  }
606 //
607 //  // If the state is PAUSE, then its waiting for the final decision from central
608 //  // processor. If the decision is that the ideal period is in the future,
609 //  // resume. If the ideal period is now, then carry out load balancing.
610 //  if (local_state == PAUSE) {
611 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
612 //      local_state = DECIDED;
613 //      SendMinStats();
614 //      ResumeClients(0);
615 //    } else {
616 //      local_state = LOAD_BALANCE;
617 //      ProcessAtSync();
618 //    }
619 //    return;
620 //  }
621 //  CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
622 //}
623 //
624 //
625 //void CentralLB::ReceiveIterationNo(int req_no, int local_iter_no) {
626 //  CmiAssert(CkMyPe() == 0);
627 //
628 //  adaptive_struct.global_recv_iter_counter++;
629 //  if (local_iter_no > adaptive_struct.global_max_iter_no) {
630 //    adaptive_struct.global_max_iter_no = local_iter_no;
631 //  }
632 //  if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
633 //    adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
634 //    thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
635 //    CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
636 //    adaptive_struct.in_progress = false;
637 //    adaptive_struct.global_max_iter_no = 0;
638 //    adaptive_struct.global_recv_iter_counter = 0;
639 //  }
640 //}
641
642 // called only on 0
643 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
644 {
645   CmiAssert(CkMyPe() == 0);
646   if (statsData == NULL) statsData = new LDStats;
647
648   int *counts = (int *)msg->getData();
649   int n_objs = counts[0];
650   int n_comm = counts[1];
651
652     // resize database
653   statsData->objData.resize(n_objs);
654   statsData->from_proc.resize(n_objs);
655   statsData->to_proc.resize(n_objs);
656   statsData->commData.resize(n_comm);
657
658   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
659         
660     // broadcast call to let everybody start to send stats
661   thisProxy.SendStats();
662 }
663
664 void CentralLB::BuildStatsMsg()
665 {
666 #if CMK_LBDB_ON
667   // build and send stats
668   const int osz = theLbdb->GetObjDataSz();
669   const int csz = theLbdb->GetCommDataSz();
670
671   int npes = CkNumPes();
672   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
673   _MEMCHECK(msg);
674   msg->from_pe = CkMyPe();
675 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
676         msg->step = step();
677 #endif
678   //msg->serial = CrnRand();
679
680 /*
681   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
682   theLbdb->IdleTime(&msg->idletime);
683   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
684 */
685 #if CMK_LB_CPUTIMER
686   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
687                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
688 #else
689   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
690                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
691 #endif
692
693   msg->pe_speed = myspeed;
694   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
695
696   msg->n_objs = osz;
697   theLbdb->GetObjData(msg->objData);
698   msg->n_comm = csz;
699   theLbdb->GetCommData(msg->commData);
700 //  theLbdb->ClearLoads();
701   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
702
703   if(CkMyPe() == cur_ld_balancer) {
704     msg->avail_vector = new char[CkNumPes()];
705     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
706     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
707   }
708
709   CmiAssert(statsMsg == NULL);
710   statsMsg = msg;
711 #endif
712 }
713
714
715 // called on every processor
716 void CentralLB::SendStats()
717 {
718 #if CMK_LBDB_ON
719   CmiAssert(statsMsg != NULL);
720   reduction_started = 0;
721
722 #if USE_LDB_SPANNING_TREE
723   if(CkNumPes()>1024)
724   {
725     if (CkMyPe() == cur_ld_balancer)
726       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
727     else
728       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
729   }
730   else
731 #endif
732   {
733     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
734     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
735   }
736
737   statsMsg = NULL;
738
739 #ifdef __BIGSIM__
740   BgEndStreaming();
741 #endif
742
743   {
744   // enfore the barrier to wait until centralLB says no
745   LDOMHandle h;
746   h.id.id.idx = 0;
747   theLbdb->getLBDB()->RegisteringObjects(h);
748   }
749 #endif
750 }
751
752 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
753 extern int donotCountMigration;
754 #endif
755
756 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
757 {
758 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
759     if(donotCountMigration){
760         return ;
761     }
762 #endif
763
764 #if CMK_LBDB_ON
765   if (waitBarrier) {
766             migrates_completed++;
767       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
768     if (migrates_completed == migrates_expected) {
769       MigrationDone(1);
770     }
771   }
772   else {
773     future_migrates_completed ++;
774     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
775     if (future_migrates_completed == future_migrates_expected)  {
776         CheckMigrationComplete();
777     }
778   }
779 #endif
780 }
781
782 void CentralLB::MissMigrate(int waitForBarrier)
783 {
784   LDObjHandle h;
785   Migrated(h, waitForBarrier);
786 }
787
788 // build a complete data from bufferred messages
789 // not used when USE_REDUCTION = 1
790 void CentralLB::buildStats()
791 {
792     statsData->nprocs() = stats_msg_count;
793     // allocate space
794     statsData->objData.resize(statsData->n_objs);
795     statsData->from_proc.resize(statsData->n_objs);
796     statsData->to_proc.resize(statsData->n_objs);
797     statsData->commData.resize(statsData->n_comm);
798
799     int nobj = 0;
800     int ncom = 0;
801     int nmigobj = 0;
802     // copy all data in individule message to this big structure
803     for (int pe=0; pe<CkNumPes(); pe++) {
804        int i;
805        CLBStatsMsg *msg = statsMsgsList[pe];
806        if(msg == NULL) continue;
807        for (i=0; i<msg->n_objs; i++) {
808          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
809          statsData->objData[nobj] = msg->objData[i];
810          if (msg->objData[i].migratable) nmigobj++;
811          nobj++;
812        }
813        for (i=0; i<msg->n_comm; i++) {
814          statsData->commData[ncom] = msg->commData[i];
815          ncom++;
816        }
817        // free the memory
818        delete msg;
819        statsMsgsList[pe]=0;
820     }
821     statsData->n_migrateobjs = nmigobj;
822 }
823
824 // deposit one processor data at a time, note database is pre-allocated
825 // to have enough space
826 // used when USE_REDUCTION = 1
827 void CentralLB::depositData(CLBStatsMsg *m)
828 {
829   int i;
830   if (m == NULL) return;
831
832   const int pe = m->from_pe;
833   struct ProcStats &procStat = statsData->procs[pe];
834   procStat.pe = pe;
835   procStat.total_walltime = m->total_walltime;
836   procStat.idletime = m->idletime;
837   procStat.bg_walltime = m->bg_walltime;
838 #if CMK_LB_CPUTIMER
839   procStat.total_cputime = m->total_cputime;
840   procStat.bg_cputime = m->bg_cputime;
841 #endif
842   procStat.pe_speed = m->pe_speed;
843   //procStat.utilization = 1.0;
844   procStat.available = CmiTrue;
845   procStat.n_objs = m->n_objs;
846
847   int &nobj = statsData->n_objs;
848   int &nmigobj = statsData->n_migrateobjs;
849   for (i=0; i<m->n_objs; i++) {
850       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
851       statsData->objData[nobj] = m->objData[i];
852       if (m->objData[i].migratable) nmigobj++;
853       nobj++;
854       CmiAssert(nobj <= statsData->objData.capacity());
855   }
856   int &n_comm = statsData->n_comm;
857   for (i=0; i<m->n_comm; i++) {
858       statsData->commData[n_comm] = m->commData[i];
859       n_comm++;
860       CmiAssert(n_comm <= statsData->commData.capacity());
861   }
862   delete m;
863 }
864
865 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
866 {
867 #if CMK_LBDB_ON
868   if (statsMsgsList == NULL) {
869     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
870     CmiAssert(statsMsgsList != NULL);
871     for(int i=0; i < CkNumPes(); i++)
872       statsMsgsList[i] = 0;
873   }
874   if (statsData == NULL) statsData = new LDStats;
875
876     //  loop through all CLBStatsMsg in the incoming msg
877   int count = msg.getCount();
878   for (int num = 0; num < count; num++) 
879   {
880     CLBStatsMsg *m = msg.getMessage(num);
881     CmiAssert(m!=NULL);
882     const int pe = m->from_pe;
883     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
884 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
885 /*      
886  *  if(m->step < step()){
887  *    //TODO: if a processor is redoing an old load balance step..
888  *    //tell it that the step is done and that it should not perform any migrations
889  *      thisProxy[pe].ReceiveDummyMigration();
890  *  }*/
891 #endif
892         
893     if(!CmiNodeAlive(pe)){
894         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
895         continue;
896     }
897         
898     if (m->avail_vector!=NULL) {
899       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
900     }
901
902     if (statsMsgsList[pe] != 0) {
903       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
904              pe);
905     } else {
906       statsMsgsList[pe] = m;
907 #if USE_REDUCTION
908       depositData(m);
909 #else
910       // store per processor data right away
911       struct ProcStats &procStat = statsData->procs[pe];
912       procStat.pe = pe;
913       procStat.total_walltime = m->total_walltime;
914       procStat.idletime = m->idletime;
915       procStat.bg_walltime = m->bg_walltime;
916 #if CMK_LB_CPUTIMER
917       procStat.total_cputime = m->total_cputime;
918       procStat.bg_cputime = m->bg_cputime;
919 #endif
920       procStat.pe_speed = m->pe_speed;
921       //procStat.utilization = 1.0;
922       procStat.available = CmiTrue;
923       procStat.n_objs = m->n_objs;
924
925       statsData->n_objs += m->n_objs;
926       statsData->n_comm += m->n_comm;
927 #endif
928       stats_msg_count++;
929     }
930   }    // end of for
931
932   const int clients = CkNumValidPes();
933   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
934  
935   if (stats_msg_count == clients) {
936         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
937     statsData->nprocs() = stats_msg_count;
938     thisProxy[CkMyPe()].LoadBalance();
939   }
940 #endif
941 }
942
943 /** added by Abhinav for receiving msgs via spanning tree */
944 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
945 {
946 #if CMK_LBDB_ON
947         CmiAssert(CkMyPe() != 0);
948         bufMsg.add(msg);         // buffer messages
949         count_msgs++;
950         //CkPrintf("here %d\n", CkMyPe());
951         if (count_msgs == st.numChildren+1) {
952                 if(st.parent == 0)
953                 {
954                         thisProxy[0].ReceiveStats(bufMsg);
955                         //CkPrintf("from %d\n", CkMyPe());
956                 }
957                 else
958                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
959                 count_msgs = 0;
960                 bufMsg.free();
961         } 
962 #endif
963 }
964
965 void CentralLB::LoadBalance()
966 {
967 #if CMK_LBDB_ON
968   int proc;
969   const int clients = CkNumPes();
970
971 #if ! USE_REDUCTION
972   // build data
973   buildStats();
974 #else
975   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
976 #endif
977
978   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
979
980   if (_lb_args.debug()) 
981       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
982                   lbname, cur_ld_balancer, step(), start_lb_time,
983                   CmiMemoryUsage()/(1024.0*1024.0));
984
985   // if we are in simulation mode read data
986   if (LBSimulation::doSimulation) simulationRead();
987
988   char *availVector = LBDatabaseObj()->availVector();
989   for(proc = 0; proc < clients; proc++)
990       statsData->procs[proc].available = (CmiBool)availVector[proc];
991
992   preprocess(statsData);
993
994 //    CkPrintf("Before Calling Strategy\n");
995
996   if (_lb_args.printSummary()) {
997       LBInfo info(clients);
998         // not take comm data
999       info.getInfo(statsData, clients, 0);
1000       LBRealType mLoad, mCpuLoad, totalLoad;
1001       info.getSummary(mLoad, mCpuLoad, totalLoad);
1002       int nmsgs, nbytes;
1003       statsData->computeNonlocalComm(nmsgs, nbytes);
1004       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
1005 //      if (_lb_args.debug() > 1) {
1006 //        for (int i=0; i<statsData->n_objs; i++)
1007 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
1008 //      }
1009   }
1010
1011 #if CMK_REPLAYSYSTEM
1012   LDHandle *loadBalancer_pointers;
1013   if (_replaySystem) {
1014     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
1015     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
1016   }
1017 #endif
1018   
1019   LBMigrateMsg* migrateMsg = Strategy(statsData);
1020 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1021         migrateMsg->step = step();
1022 #endif
1023
1024 #if CMK_REPLAYSYSTEM
1025   CpdHandleLBMessage(&migrateMsg);
1026   if (_replaySystem) {
1027     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
1028     free(loadBalancer_pointers);
1029   }
1030 #endif
1031   
1032   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
1033   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
1034
1035   // if this is the step at which we need to dump the database
1036   simulationWrite();
1037
1038 //  calculate predicted load
1039 //  very time consuming though, so only happen when debugging is on
1040   if (_lb_args.printSummary()) {
1041       LBInfo info(clients);
1042         // not take comm data
1043       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
1044       LBRealType mLoad, mCpuLoad, totalLoad;
1045       info.getSummary(mLoad, mCpuLoad, totalLoad);
1046       int nmsgs, nbytes;
1047       statsData->computeNonlocalComm(nmsgs, nbytes);
1048       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
1049       for (int i=0; i<clients; i++)
1050         migrateMsg->expectedLoad[i] = info.peLoads[i];
1051   }
1052
1053   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
1054 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
1055     lbDecisionCount++;
1056     migrateMsg->lbDecisionCount = lbDecisionCount;
1057 #endif
1058
1059   envelope *env = UsrToEnv(migrateMsg);
1060   if (1) {
1061       // broadcast
1062     thisProxy.ReceiveMigration(migrateMsg);
1063   }
1064   else {
1065     // split the migration for each processor
1066     for (int p=0; p<CkNumPes(); p++) {
1067       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
1068       thisProxy[p].ReceiveMigration(m);
1069     }
1070     delete migrateMsg;
1071   }
1072
1073   // Zero out data structures for next cycle
1074   // CkPrintf("zeroing out data\n");
1075   statsData->clear();
1076   stats_msg_count=0;
1077 #endif
1078 }
1079
1080 // test if sender and receiver in a commData is nonmigratable.
1081 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
1082 {
1083 #if CMK_LBDB_ON
1084   for (int pe=0 ; pe<count; pe++)
1085   {
1086     for (int i=0; i<len[pe]; i++)
1087       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
1088           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
1089       return 0;
1090   }
1091 #endif
1092   return 1;
1093 }
1094
1095 // rebuild LDStats and remove all non-migratble objects and related things
1096 void CentralLB::removeNonMigratable(LDStats* stats, int count)
1097 {
1098   int i;
1099
1100   // check if we have non-migratable objects
1101   int have = 0;
1102   for (i=0; i<stats->n_objs; i++) 
1103   {
1104     LDObjData &odata = stats->objData[i];
1105     if (!odata.migratable) {
1106       have = 1; break;
1107     }
1108   }
1109   if (have == 0) return;
1110
1111   CkVec<LDObjData> nonmig;
1112   CkVec<int> new_from_proc, new_to_proc;
1113   nonmig.resize(stats->n_migrateobjs);
1114   new_from_proc.resize(stats->n_migrateobjs);
1115   new_to_proc.resize(stats->n_migrateobjs);
1116   int n_objs = 0;
1117   for (i=0; i<stats->n_objs; i++) 
1118   {
1119     LDObjData &odata = stats->objData[i];
1120     if (odata.migratable) {
1121       nonmig[n_objs] = odata;
1122       new_from_proc[n_objs] = stats->from_proc[i];
1123       new_to_proc[n_objs] = stats->to_proc[i];
1124       n_objs ++;
1125     }
1126     else {
1127       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
1128 #if CMK_LB_CPUTIMER
1129       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
1130 #endif
1131     }
1132   }
1133   CmiAssert(stats->n_migrateobjs == n_objs);
1134
1135   stats->makeCommHash();
1136   
1137   CkVec<LDCommData> newCommData;
1138   newCommData.resize(stats->n_comm);
1139   int n_comm = 0;
1140   for (i=0; i<stats->n_comm; i++) 
1141   {
1142     LDCommData& cdata = stats->commData[i];
1143     if (!cdata.from_proc()) 
1144     {
1145       int idx = stats->getSendHash(cdata);
1146       CmiAssert(idx != -1);
1147       if (!stats->objData[idx].migratable) continue;
1148     }
1149     switch (cdata.receiver.get_type()) {
1150     case LD_PROC_MSG:
1151       break;
1152     case LD_OBJ_MSG:  {
1153       int idx = stats->getRecvHash(cdata);
1154       if (stats->complete_flag)
1155         CmiAssert(idx != -1);
1156       else if (idx == -1) continue;          // receiver not in this group
1157       if (!stats->objData[idx].migratable) continue;
1158       break;
1159       }
1160     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1161       break;
1162     }
1163     newCommData[n_comm] = cdata;
1164     n_comm ++;
1165   }
1166
1167   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1168
1169   // swap to new data
1170   stats->objData = nonmig;
1171   stats->from_proc = new_from_proc;
1172   stats->to_proc = new_to_proc;
1173   stats->n_objs = n_objs;
1174
1175   stats->commData = newCommData;
1176   stats->n_comm = n_comm;
1177
1178   stats->deleteCommHash();
1179   stats->makeCommHash();
1180
1181 }
1182
1183
1184 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1185 extern int restarted;
1186 #endif
1187
1188 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1189 {
1190   storedMigrateMsg = m;
1191   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1192                   thisProxy);
1193   contribute(0, NULL, CkReduction::max_int, cb);
1194
1195   // Reset all adaptive lb related fields since load balancing is being done.
1196  // adaptive_struct.lb_no_iterations = -1;
1197  // adaptive_lbdb.history_data.clear();
1198  // adaptive_struct.prev_load = 0.0;
1199  // local_state = OFF;
1200  // adaptive_struct.lb_period_informed = false;
1201  // adaptive_struct.lb_ideal_period = INT_MAX;
1202  // adaptive_struct.lb_calculated_period = INT_MAX;
1203  // adaptive_struct.lb_msg_send_no = 0;
1204  // adaptive_struct.lb_msg_recv_no = 0;
1205 }
1206
1207 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1208 {
1209 #if CMK_LBDB_ON
1210         int i;
1211         LBMigrateMsg *m = storedMigrateMsg;
1212         CmiAssert(m!=NULL);
1213         delete msg;
1214
1215 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1216         int *dummyCounts;
1217
1218         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1219         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1220         if(step() > m->step){
1221                 char str[100];
1222                 envelope *env = UsrToEnv(m);
1223                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1224                 return;
1225         }
1226         lbDecisionCount = m->lbDecisionCount;
1227 #endif
1228
1229   if (_lb_args.debug() > 1) 
1230     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1231
1232   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1233   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1234 /*FAULT_EVAC*/
1235   if(!CmiNodeAlive(CkMyPe())){
1236         delete m;
1237         return;
1238   }
1239   migrates_expected = 0;
1240   future_migrates_expected = 0;
1241 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1242         int sending=0;
1243     int dummy=0;
1244         LBDB *_myLBDB = theLbdb->getLBDB();
1245         if(_restartFlag){
1246         dummyCounts = new int[CmiNumPes()];
1247         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1248     }
1249 #endif
1250   for(i=0; i < m->n_moves; i++) {
1251     MigrateInfo& move = m->moves[i];
1252     const int me = CkMyPe();
1253     if (move.from_pe == me && move.to_pe != me) {
1254       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1255       // migrate object, in case it is already gone, inform toPe
1256 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1257       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1258          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1259 #else
1260             if(_restartFlag == 0){
1261                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1262                 theLbdb->Migrate(move.obj,move.to_pe);
1263                 sending++;
1264             }else{
1265                 if(_myLBDB->validObjHandle(move.obj)){
1266                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1267                     theLbdb->Migrate(move.obj,move.to_pe);
1268                     sending++;
1269                 }else{
1270                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1271                     dummyCounts[move.to_pe]++;
1272                     dummy++;
1273                 }
1274             }
1275 #endif
1276     } else if (move.from_pe != me && move.to_pe == me) {
1277        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1278       if (!move.async_arrival) migrates_expected++;
1279       else future_migrates_expected++;
1280     }
1281   }
1282   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1283   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1284
1285 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1286         if(_restartFlag){
1287                 sendDummyMigrationCounts(dummyCounts);
1288                 _restartFlag  =0;
1289         delete []dummyCounts;
1290         }
1291 #endif
1292
1293
1294 #if 0
1295   if (m->n_moves ==0) {
1296     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1297   }
1298 #endif
1299   cur_ld_balancer = m->next_lb;
1300   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1301       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1302   }
1303
1304   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1305     MigrationDone(1);
1306   delete m;
1307
1308 //      CkEvacuatedElement();
1309 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1310 //  migrates_expected = 0;
1311 //  //  ResumeClients(1);
1312 #endif
1313 #endif
1314 }
1315
1316 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1317 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1318     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1319     //TODO: this is gonna be important when a crash happens during checkpoint
1320     //the globalDecisionCount would have to be saved and compared against
1321     //a future recvMigration
1322                 
1323         thisProxy[CkMyPe()].ResumeClients(1);
1324 }
1325 #endif
1326
1327 void CentralLB::MigrationDone(int balancing)
1328 {
1329 #if CMK_LBDB_ON
1330   migrates_completed = 0;
1331   migrates_expected = -1;
1332   // clear load stats
1333   if (balancing) theLbdb->ClearLoads();
1334   // Increment to next step
1335   theLbdb->incStep();
1336         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1337   // if sync resume, invoke a barrier
1338
1339 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1340     savedBalancing = balancing;
1341     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1342 #endif
1343
1344   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1345
1346   LoadbalanceDone(balancing);        // callback
1347 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1348   // if sync resume invoke a barrier
1349   if (balancing && _lb_args.syncResume()) {
1350     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1351                   thisProxy);
1352     contribute(0, NULL, CkReduction::sum_int, cb);
1353   }
1354   else{ 
1355     if(CmiNodeAlive(CkMyPe())){
1356         thisProxy [CkMyPe()].ResumeClients(balancing);
1357     }   
1358   }     
1359 #if CMK_GRID_QUEUE_AVAILABLE
1360   CmiGridQueueDeregisterAll ();
1361   CpvAccess(CkGridObject) = NULL;
1362 #endif
1363 #endif 
1364 #endif
1365 }
1366
1367 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1368 void CentralLB::endMigrationDone(int balancing){
1369     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1370
1371
1372   if (balancing && _lb_args.syncResume()) {
1373     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1374                   thisProxy);
1375     contribute(0, NULL, CkReduction::sum_int, cb);
1376   }
1377   else{
1378     if(CmiNodeAlive(CkMyPe())){
1379     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1380     thisProxy [CkMyPe()].ResumeClients(balancing);
1381     }
1382   }
1383
1384 }
1385 #endif
1386
1387 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1388 void resumeCentralLbAfterChkpt(void *_lb){
1389     CentralLB *lb= (CentralLB *)_lb;
1390     CpvAccess(_currentObj)=lb;
1391     lb->endMigrationDone(lb->savedBalancing);
1392 }
1393 #endif
1394
1395
1396 void CentralLB::ResumeClients(CkReductionMsg *msg)
1397 {
1398   ResumeClients(1);
1399   delete msg;
1400 }
1401
1402 void CentralLB::ResumeClients(int balancing)
1403 {
1404 #if CMK_LBDB_ON
1405 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1406     resumeCount++;
1407     globalResumeCount = resumeCount;
1408 #endif
1409   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1410   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1411     double end_lb_time = CkWallTimer();
1412   }
1413
1414 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1415   if (balancing) ComlibNotifyMigrationDone();  
1416 #endif
1417
1418   theLbdb->ResumeClients();
1419   if (balancing)  {
1420
1421     CheckMigrationComplete();
1422     if (future_migrates_expected == 0 || 
1423             future_migrates_expected == future_migrates_completed) {
1424       CheckMigrationComplete();
1425     }
1426   }
1427 #endif
1428 }
1429
1430 /*
1431   migration of objects contains two different kinds:
1432   (1) objects want to make a barrier for migration completion
1433       (waitForBarrier is true)
1434       migrationDone() to finish and resumeClients
1435   (2) objects don't need a barrier
1436   However, next load balancing can only happen when both migrations complete
1437 */ 
1438 void CentralLB::CheckMigrationComplete()
1439 {
1440 #if CMK_LBDB_ON
1441   lbdone ++;
1442   if (lbdone == 2) {
1443     if (_lb_args.debug() && CkMyPe()==0) {
1444       double end_lb_time = CkWallTimer();
1445       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1446                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1447                 end_lb_time-start_lb_time);
1448     }
1449
1450     //FIX ME!!! adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
1451     theLbdb->SetMigrationCost(CkWallTimer() - start_lb_time);
1452
1453     lbdone = 0;
1454     future_migrates_expected = -1;
1455     future_migrates_completed = 0;
1456
1457
1458     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1459     // release local barrier  so that the next load balancer can go
1460     LDOMHandle h;
1461     h.id.id.idx = 0;
1462     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1463     // switch to the next load balancer in the list
1464     // subtle: called from Migrated() may result in Migrated() called in next LB
1465     theLbdb->nextLoadbalancer(seqno);
1466   }
1467 #endif
1468 }
1469
1470 void CentralLB::preprocess(LDStats* stats)
1471 {
1472   if (_lb_args.ignoreBgLoad())
1473     stats->clearBgLoad();
1474
1475   // Call the predictor for the future
1476   if (_lb_predict) FuturePredictor(statsData);
1477 }
1478
1479 // default load balancing strategy
1480 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1481 {
1482 #if CMK_LBDB_ON
1483   double strat_start_time = CkWallTimer();
1484   if (_lb_args.debug())
1485     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1486
1487   work(stats);
1488
1489   if (_lb_args.debug()>2)  {
1490     CkPrintf("CharmLB> Obj Map:\n");
1491     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1492     CkPrintf("\n");
1493   }
1494
1495   LBMigrateMsg *msg = createMigrateMsg(stats);
1496
1497   if (_lb_args.debug()) {
1498     double strat_end_time = CkWallTimer();
1499     envelope *env = UsrToEnv(msg);
1500
1501     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1502     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1503               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1504     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1505     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1506               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1507     // FIX ME!!! adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
1508     //CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1509     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1510   }
1511   return msg;
1512 #else
1513   return NULL;
1514 #endif
1515 }
1516
1517 void CentralLB::work(LDStats* stats)
1518 {
1519   // does nothing but print the database
1520   stats->print();
1521 }
1522
1523 // generate migrate message from stats->from_proc and to_proc
1524 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1525 {
1526   int i;
1527   CkVec<MigrateInfo*> migrateInfo;
1528   for (i=0; i<stats->n_objs; i++) {
1529     LDObjData &objData = stats->objData[i];
1530     int frompe = stats->from_proc[i];
1531     int tope = stats->to_proc[i];
1532     if (frompe != tope) {
1533       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1534       //         CkMyPe(),obj,pe,dest);
1535       MigrateInfo *migrateMe = new MigrateInfo;
1536       migrateMe->obj = objData.handle;
1537       migrateMe->from_pe = frompe;
1538       migrateMe->to_pe = tope;
1539       migrateMe->async_arrival = objData.asyncArrival;
1540       migrateInfo.insertAtEnd(migrateMe);
1541     }
1542   }
1543
1544   int migrate_count=migrateInfo.length();
1545   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1546   msg->n_moves = migrate_count;
1547   for(i=0; i < migrate_count; i++) {
1548     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1549     msg->moves[i] = *item;
1550     delete item;
1551     migrateInfo[i] = 0;
1552   }
1553   return msg;
1554 }
1555
1556 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1557 {
1558   int nmoves = 0;
1559   int nunavail = 0;
1560   int i;
1561   for (i=0; i<m->n_moves; i++) {
1562     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1563     if (item->from_pe == p || item->to_pe == p) nmoves++;
1564   }
1565   for (i=0; i<CkNumPes();i++) {
1566     if (!m->avail_vector[i]) nunavail++;
1567   }
1568   LBMigrateMsg* msg;
1569   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1570   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1571   msg->n_moves = nmoves;
1572   msg->level = m->level;
1573   msg->next_lb = m->next_lb;
1574   for (i=0,nmoves=0; i<m->n_moves; i++) {
1575     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1576     if (item->from_pe == p || item->to_pe == p) {
1577       msg->moves[nmoves] = *item;
1578       nmoves++;
1579     }
1580   }
1581   // copy processor data
1582   if (nunavail)
1583   for (i=0; i<CkNumPes();i++) {
1584     msg->avail_vector[i] = m->avail_vector[i];
1585     msg->expectedLoad[i] = m->expectedLoad[i];
1586   }
1587   return msg;
1588 }
1589
1590 void CentralLB::simulationWrite() {
1591   if(step() == LBSimulation::dumpStep)
1592   {
1593     // here we are supposed to dump the database
1594     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1595     char *dumpFileName = (char *)malloc(dumpFileSize);
1596     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1597       free(dumpFileName);
1598       dumpFileSize+=3;
1599       dumpFileName = (char *)malloc(dumpFileSize);
1600     }
1601     writeStatsMsgs(dumpFileName);
1602     free(dumpFileName);
1603     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1604     ++LBSimulation::dumpStep;
1605     --LBSimulation::dumpStepSize;
1606     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1607       CmiPrintf("Charm++> Exiting...\n");
1608       CkExit();
1609     }
1610     return;
1611   }
1612 }
1613
1614 void CentralLB::simulationRead() {
1615   LBSimulation *simResults = NULL, *realResults;
1616   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1617   voidMessage->n_moves=0;
1618   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1619     // here we are supposed to read the data from the dump database
1620     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1621     char *simFileName = (char *)malloc(simFileSize);
1622     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1623       free(simFileName);
1624       simFileSize+=3;
1625       simFileName = (char *)malloc(simFileSize);
1626     }
1627     readStatsMsgs(simFileName);
1628
1629     // allocate simResults (only the first step)
1630     if (simResults == NULL) {
1631       simResults = new LBSimulation(LBSimulation::simProcs);
1632       realResults = new LBSimulation(LBSimulation::simProcs);
1633     }
1634     else {
1635       // should be the same number of procs of the original simulation!
1636       if (!LBSimulation::procsChanged) {
1637         // it means we have a previous step, so in simResults there is data.
1638         // we can now print the real effects of the load balancer during the simulation
1639         // or print the difference between the predicted data and the real one.
1640         realResults->reset();
1641         // reset to_proc of statsData to be equal to from_proc
1642         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1643         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1644         simResults->PrintDifferences(realResults,statsData);
1645       }
1646       simResults->reset();
1647     }
1648
1649     // now pass it to the strategy routine
1650     double startT = CkWallTimer();
1651     preprocess(statsData);
1652     CmiPrintf("%s> Strategy starts ... \n", lbname);
1653     LBMigrateMsg* migrateMsg = Strategy(statsData);
1654     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1655                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1656
1657     // now calculate the results of the load balancing simulation
1658     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1659
1660     // now we have the simulation data, so print it and loop
1661     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1662     // **CWL** Officially recording my disdain here for using ints for bool
1663     if (LBSimulation::showDecisionsOnly) {
1664       simResults->PrintDecisions(migrateMsg, simFileName, 
1665                                  LBSimulation::simProcs);
1666     } else {
1667       simResults->PrintSimulationResults();
1668     }
1669
1670     free(simFileName);
1671     delete migrateMsg;
1672     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1673   }
1674   // deallocate simResults
1675   delete simResults;
1676   CmiPrintf("Charm++> Exiting...\n");
1677   CkExit();
1678 }
1679
1680 void CentralLB::readStatsMsgs(const char* filename) 
1681 {
1682 #if CMK_LBDB_ON
1683   int i;
1684   FILE *f = fopen(filename, "r");
1685   if (f==NULL) {
1686     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1687     CmiAbort("");
1688   }
1689
1690   // at this stage, we need to rebuild the statsMsgList and
1691   // statsDataList structures. For that first deallocate the
1692   // old structures
1693   if (statsMsgsList) {
1694     for(i = 0; i < stats_msg_count; i++)
1695       delete statsMsgsList[i];
1696     delete[] statsMsgsList;
1697     statsMsgsList=0;
1698   }
1699
1700   PUP::fromDisk pd(f);
1701   PUP::machineInfo machInfo;
1702
1703   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1704   PUP::xlater p(machInfo, pd);
1705
1706   if (_lb_args.lbversion() > 1) {
1707     p|_lb_args.lbversion();             // write version number
1708     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1709     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1710   }
1711   p|stats_msg_count;
1712
1713   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1714   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1715   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1716
1717   // LBSimulation::simProcs must be set
1718   statsData->pup(p);
1719
1720   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1721   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1722
1723   // file f is closed in the destructor of PUP::fromDisk
1724   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1725 #endif
1726 }
1727
1728 void CentralLB::writeStatsMsgs(const char* filename) 
1729 {
1730 #if CMK_LBDB_ON
1731   FILE *f = fopen(filename, "w");
1732   if (f==NULL) {
1733     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1734     CmiAbort("");
1735   }
1736
1737   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1738   PUP::toDisk p(f);
1739   p((char *)&machInfo, sizeof(machInfo));       // machine info
1740
1741   p|_lb_args.lbversion();               // write version number
1742   p|stats_msg_count;
1743   statsData->pup(p);
1744
1745   fclose(f);
1746
1747   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1748 #endif
1749 }
1750
1751 // calculate the predicted wallclock/cpu load for every processors
1752 // considering communication overhead if considerComm is true
1753 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1754                       LBMigrateMsg *msg, LBInfo &info, 
1755                       int considerComm)
1756 {
1757 #if CMK_LBDB_ON
1758         stats->makeCommHash();
1759
1760         // update to_proc according to migration msgs
1761         for(int i = 0; i < msg->n_moves; i++) {
1762           MigrateInfo &mInfo = msg->moves[i];
1763           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1764           CmiAssert(idx != -1);
1765           stats->to_proc[idx] = mInfo.to_pe;
1766         }
1767
1768         info.getInfo(stats, count, considerComm);
1769 #endif
1770 }
1771
1772
1773 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1774 {
1775     CkAssert(simResults != NULL && count == simResults->numPes);
1776     // estimate the new loads of the processors. As a first approximation, this is the
1777     // sum of the cpu times of the objects on that processor
1778     double startT = CkWallTimer();
1779     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1780     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1781 }
1782
1783 void CentralLB::pup(PUP::er &p) { 
1784   BaseLB::pup(p); 
1785   if (p.isUnpacking())  {
1786     initLB(CkLBOptions(seqno)); 
1787   }
1788   p|reduction_started;
1789   int has_statsMsg=0;
1790   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1791   p|has_statsMsg;
1792   if (has_statsMsg) {
1793     if (p.isUnpacking())
1794       statsMsg = new CLBStatsMsg;
1795     statsMsg->pup(p);
1796   }
1797 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1798   p | lbDecisionCount;
1799   p | resumeCount;
1800 #endif
1801         
1802 }
1803
1804 int CentralLB::useMem() { 
1805   return sizeof(CentralLB) + statsData->useMem() + 
1806          CkNumPes() * sizeof(CLBStatsMsg *);
1807 }
1808
1809
1810 /**
1811   CLBStatsMsg is not a real message now.
1812   CLBStatsMsg is used for all processors to fill in their local load and comm
1813   statistics and send to processor 0
1814 */
1815
1816 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1817   n_objs = osz;
1818   n_comm = csz;
1819   objData = new LDObjData[osz];
1820   commData = new LDCommData[csz];
1821   avail_vector = NULL;
1822 }
1823
1824 CLBStatsMsg::~CLBStatsMsg() {
1825   delete [] objData;
1826   delete [] commData;
1827   delete [] avail_vector;
1828 }
1829
1830 void CLBStatsMsg::pup(PUP::er &p) {
1831   int i;
1832   p|from_pe;
1833   p|pe_speed;
1834   p|total_walltime;
1835   p|idletime;
1836   p|bg_walltime;
1837 #if CMK_LB_CPUTIMER
1838   p|total_cputime;
1839   p|bg_cputime;
1840 #endif
1841 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1842   p | step;
1843 #endif
1844   p|n_objs;
1845   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1846   for (i=0; i<n_objs; i++) p|objData[i];
1847   p|n_comm;
1848   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1849   for (i=0; i<n_comm; i++) p|commData[i];
1850
1851   int has_avail_vector;
1852   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1853   p|has_avail_vector;
1854   if (p.isUnpacking()) {
1855     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1856     else avail_vector = NULL;
1857   }
1858   if (has_avail_vector) p(avail_vector, CkNumPes());
1859
1860   p(next_lb);
1861 }
1862
1863 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1864 // the entry function, it is just used to use to pup.
1865 // I don't use CLBStatsMsg directly as marshalled parameter because
1866 // I want the data pointer stored and not to be freed by the Charm++.
1867 void CkMarshalledCLBStatsMessage::free() { 
1868   int count = msgs.size();
1869   for  (int i=0; i<count; i++) {
1870     delete msgs[i];
1871     msgs[i] = NULL;
1872   }
1873   msgs.free();
1874 }
1875
1876 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1877 {
1878   int count = m.getCount();
1879   for (int i=0; i<count; i++) add(m.getMessage(i));
1880 }
1881
1882 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1883 {
1884   int count = msgs.size();
1885   p|count;
1886   for (int i=0; i<count; i++) {
1887     CLBStatsMsg *msg;
1888     if (p.isUnpacking()) msg = new CLBStatsMsg;
1889     else { 
1890       msg = msgs[i]; CmiAssert(msg!=NULL);
1891     }
1892     msg->pup(p);
1893     if (p.isUnpacking()) add(msg);
1894   }
1895 }
1896
1897 SpanningTree::SpanningTree()
1898 {
1899         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1900         arity = (int)ceil(sq/2);
1901         calcParent(CkMyPe());
1902         calcNumChildren(CkMyPe());
1903 }
1904
1905 void SpanningTree::calcParent(int n)
1906 {
1907         parent=-1;
1908         if(n != 0  && arity > 0)
1909                 parent = (n-1)/arity;
1910 }
1911
1912 void SpanningTree::calcNumChildren(int n)
1913 {
1914         numChildren = 0;
1915         if (arity == 0) return;
1916         int fullNode=(CkNumPes()-1-arity)/arity;
1917         if(n <= fullNode)
1918                 numChildren = arity;
1919         if(n == fullNode+1)
1920                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1921         if(n > fullNode+1)
1922                 numChildren = 0;
1923 }
1924
1925 #include "CentralLB.def.h"
1926  
1927 /*@}*/