testing
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #include <vector>
15
16 #define  DEBUGF(x)       // CmiPrintf x;
17 #define  DEBUG(x)        // x;
18
19 #if CMK_MEM_CHECKPOINT
20    /* can not handle reduction in inmem FT */
21 #define USE_REDUCTION         0
22 #define USE_LDB_SPANNING_TREE 0
23 #elif defined(_FAULT_MLOG_)
24 /* can not handle reduction in inmem FT */
25 #define USE_REDUCTION         0
26 #define USE_LDB_SPANNING_TREE 0
27 #else
28 #define USE_REDUCTION         1
29 #define USE_LDB_SPANNING_TREE 1
30 #endif
31
32 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
33 extern int _restartFlag;
34 extern void getGlobalStep(CkGroupID );
35 extern void initMlogLBStep(CkGroupID );
36 extern int globalResumeCount;
37 extern void sendDummyMigrationCounts(int *);
38 #endif
39
40 #if CMK_GRID_QUEUE_AVAILABLE
41 CpvExtern(void *, CkGridObject);
42 #endif
43
44 CkGroupID loadbalancer;
45 int * lb_ptr;
46 int load_balancer_created;
47
48 double lb_migration_cost = 0.0;
49 double lb_strategy_cost = 0.0;
50
51 int lb_no_iterations = -1;
52 double cur_max_pe_load = 0.0;
53 double cur_avg_pe_load = 0.0;
54 double prev_load = 0.0;
55
56 struct AdaptiveData {
57   int iteration;
58   double max_load;
59   double avg_load;
60 };
61
62 struct AdaptiveLBDatabase {
63   std::vector<AdaptiveData> history_data;
64 } adaptive_lbdb;
65
66 enum state {
67   OFF,
68   ON,
69   PAUSE,
70   DECIDED,
71   LOAD_BALANCE
72 } local_state;
73
74 CreateLBFunc_Def(CentralLB, "CentralLB base class")
75
76 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
77                              LBMigrateMsg *, LBInfo &info, int considerComm);
78
79 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
80   double lb_data[4];
81   lb_data[0] = 0;
82   lb_data[1] = 0;
83   lb_data[2] = 0;
84   for (int i = 0; i < nMsg; i++) {
85     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
86     double* m = (double *)msgs[i]->getData();
87     lb_data[0] += m[0];
88     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
89     lb_data[2] += m[2];
90     if (i == 0) {
91       lb_data[3] = m[3];
92     }
93   }
94   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
95 }
96
97 /*global*/ CkReduction::reducerType lbDataCollectionType;
98 /*initcall*/ void registerLBDataCollection(void) {
99   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
100 }
101
102 /*
103 void CreateCentralLB()
104 {
105   CProxy_CentralLB::ckNew(0);
106 }
107 */
108
109 void CentralLB::staticStartLB(void* data)
110 {
111   CentralLB *me = (CentralLB*)(data);
112   me->StartLB();
113 }
114
115 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
116 {
117   CentralLB *me = (CentralLB*)(data);
118   me->Migrated(h, waitBarrier);
119 }
120
121 void CentralLB::staticAtSync(void* data)
122 {
123   CentralLB *me = (CentralLB*)(data);
124   me->AtSync();
125 }
126
127 void CentralLB::initLB(const CkLBOptions &opt)
128 {
129 #if CMK_LBDB_ON
130   lbname = "CentralLB";
131   thisProxy = CProxy_CentralLB(thisgroup);
132   //  CkPrintf("Construct in %d\n",CkMyPe());
133
134   // create and turn on by default
135   receiver = theLbdb->
136     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
137   notifier = theLbdb->getLBDB()->
138     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
139   startLbFnHdl = theLbdb->getLBDB()->
140     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
141
142   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
143   if (opt.getSeqNo() > 0) turnOff();
144
145   stats_msg_count = 0;
146   statsMsgsList = NULL;
147   statsData = NULL;
148
149   storedMigrateMsg = NULL;
150   reduction_started = 0;
151
152   // for future predictor
153   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
154   else predicted_model=0;
155   // register user interface callbacks
156   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
157
158   myspeed = theLbdb->ProcessorSpeed();
159
160   migrates_completed = 0;
161   future_migrates_completed = 0;
162   migrates_expected = -1;
163   future_migrates_expected = -1;
164   cur_ld_balancer = _lb_args.central_pe();      // 0 default
165   lbdone = 0;
166   count_msgs=0;
167   statsMsg = NULL;
168
169   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
170
171   load_balancer_created = 1;
172 #endif
173 }
174
175 CentralLB::~CentralLB()
176 {
177 #if CMK_LBDB_ON
178   delete [] statsMsgsList;
179   delete statsData;
180   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
181   if (theLbdb) {
182     theLbdb->getLBDB()->
183       RemoveNotifyMigrated(notifier);
184     theLbdb->
185       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
186   }
187 #endif
188 }
189
190 void CentralLB::turnOn() 
191 {
192 #if CMK_LBDB_ON
193   theLbdb->getLBDB()->
194     TurnOnBarrierReceiver(receiver);
195   theLbdb->getLBDB()->
196     TurnOnNotifyMigrated(notifier);
197   theLbdb->getLBDB()->
198     TurnOnStartLBFn(startLbFnHdl);
199 #endif
200 }
201
202 void CentralLB::turnOff() 
203 {
204 #if CMK_LBDB_ON
205   theLbdb->getLBDB()->
206     TurnOffBarrierReceiver(receiver);
207   theLbdb->getLBDB()->
208     TurnOffNotifyMigrated(notifier);
209   theLbdb->getLBDB()->
210     TurnOffStartLBFn(startLbFnHdl);
211 #endif
212 }
213
214 void CentralLB::AtSync()
215 {
216 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
217 #if CMK_LBDB_ON
218 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
219
220 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
221         CpvAccess(_currentObj)=this;
222 #endif
223
224   // if num of processor is only 1, nothing should happen
225   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
226     MigrationDone(0);
227     return;
228   }
229   if(CmiNodeAlive(CkMyPe())){
230     thisProxy [CkMyPe()].ProcessAtSyncMin();
231   }
232 #endif
233 }
234
235 #include "ComlibStrategy.h"
236
237 void CentralLB::ProcessAtSync()
238 {
239
240   // Reset all adaptive lb related fields since load balancing is being done.
241   lb_no_iterations = -1;
242   adaptive_lbdb.history_data.clear();
243   prev_load = 0.0;
244   lb_ideal_period = INT_MAX;
245   local_state = OFF;
246
247
248 #if CMK_LBDB_ON
249   if (reduction_started) return;              // reducton in progress
250
251   CmiAssert(CmiNodeAlive(CkMyPe()));
252   if (CkMyPe() == cur_ld_balancer) {
253     start_lb_time = CkWallTimer();
254   }
255
256
257 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
258         initMlogLBStep(thisgroup);
259 #endif
260
261   // build message
262   BuildStatsMsg();
263
264 #if USE_REDUCTION
265     // reduction to get total number of objects and comm
266     // so that processor 0 can pre-allocate load balancing database
267   int counts[2];
268   counts[0] = theLbdb->GetObjDataSz();
269   counts[1] = theLbdb->GetCommDataSz();
270
271   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
272                   thisProxy[0]);
273   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
274   reduction_started = 1;
275 #else
276   SendStats();
277 #endif
278 #endif
279 }
280
281 void CentralLB::ProcessAtSyncMin()
282 {
283 #if CMK_LBDB_ON
284   if (reduction_started) return;              // reducton in progress
285
286   CmiAssert(CmiNodeAlive(CkMyPe()));
287   if (CkMyPe() == cur_ld_balancer) {
288     start_lb_time = CkWallTimer();
289   }
290
291
292 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
293         initMlogLBStep(thisgroup);
294 #endif
295   
296   lb_no_iterations++;
297   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
298       lb_no_iterations, lb_ideal_period);
299
300   // If decision has been made and has reached the lb_period, then do load
301   // balancing, else if hasn't reached ideal_period, then resume.
302   if (local_state == DECIDED) {
303     if (lb_no_iterations < lb_ideal_period) {
304       ResumeClients(1);
305     } else {
306       local_state = LOAD_BALANCE;
307       ProcessAtSync();
308     }
309     return;
310   }
311    
312   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
313   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
314   // dont resume client.
315   if (local_state == ON) {
316     if (lb_no_iterations < lb_ideal_period) {
317       ResumeClients(1);
318     } else {
319       local_state = PAUSE;
320     }
321     return;
322   }
323
324   SendMinStats();
325   ResumeClients(1);
326 #endif
327 }
328
329 void CentralLB::SendMinStats() {
330
331  double total_load;
332  double idle_time;
333  double bg_walltime;
334   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
335   CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
336
337   // Since the total_load is cumulative since the last load balancing stage,
338   // Hence it is subtracted from the previous load.
339   total_load -= idle_time;
340   double tmp = total_load;
341   total_load -= prev_load;
342   prev_load = tmp; 
343
344   double lb_data[4];
345   lb_data[0] = total_load;
346   lb_data[1] = total_load;
347   lb_data[2] = 1;
348   lb_data[3] = lb_no_iterations;
349
350   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
351                   thisProxy[0]);
352   contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
353 }
354
355 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
356   CmiAssert(CkMyPe() == 0);
357   double* load = (LBRealType *) msg->getData();
358   double max = load[1];
359   double avg = load[0]/load[2];
360   int iteration_n = load[3];
361   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
362
363   // Store the data for this iteration
364   AdaptiveData data;
365   data.iteration = lb_no_iterations;
366   data.max_load = max;
367   data.avg_load = avg;
368   adaptive_lbdb.history_data.push_back(data);
369
370   // If the max/avg ratio is greater than the threshold and also this is not the
371   // step immediately after load balancing, carry out load balancing
372   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
373     CkPrintf("Carry out load balancing step\n");
374     lb_ideal_period = iteration_n + 1;
375     thisProxy.LoadBalanceDecision(lb_ideal_period);
376     return;
377   }
378
379   // Generate the plan for the adaptive strategy
380   if (generatePlan()) {
381     thisProxy.LoadBalanceDecision(lb_ideal_period);
382   }
383 }
384
385 bool CentralLB::generatePlan() {
386   if (adaptive_lbdb.history_data.size() <= 4) {
387     return false;
388   }
389
390   // Some heuristics for lbperiod
391   // If constant load or almost constant,
392   // then max * new_lb_period > avg * new_lb_period + lb_cost
393   double max = 0.0;
394   double avg = 0.0;
395   AdaptiveData data;
396   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
397     data = adaptive_lbdb.history_data[i];
398     max += data.max_load;
399     avg += data.avg_load;
400     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
401   }
402 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
403 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
404 //
405 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
406 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
407 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
408 //
409   // If linearly varying load, then find lb_period
410   // area between the max and avg curve 
411   double mslope, aslope, mc, ac;
412   getLineEq(aslope, ac, mslope, mc);
413   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
414   double a = (mslope - aslope)/2;
415   double b = (mc - ac);
416   double c = -(lb_strategy_cost + lb_migration_cost);
417   //c = -2.5;
418   lb_ideal_period = getPeriodForLinear(a, b, c);
419   CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
420
421   return true;
422 }
423
424 int CentralLB::getPeriodForLinear(double a, double b, double c) {
425   if (a == 0.0) {
426     return (-c / b);
427   }
428   int x;
429   double t = (b * b) - (4*a*c);
430   t = (-b + sqrt(t)) / (2*a);
431   x = t;
432   return x;
433 }
434
435 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
436   int total = adaptive_lbdb.history_data.size();
437   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
438       adaptive_lbdb.history_data[0].iteration;
439   double a1 = 0;
440   double m1 = 0;
441   double a2 = 0;
442   double m2 = 0;
443   AdaptiveData data;
444   int i = 0;
445   for (i = 0; i <= total/2; i++) {
446     data = adaptive_lbdb.history_data[i];
447     m1 += data.max_load;
448     a1 += data.avg_load;
449   }
450   m1 /= i;
451   a1 /= i;
452
453   for (i = total/2; i < total; i++) {
454     data = adaptive_lbdb.history_data[i];
455     m2 += data.max_load;
456     a2 += data.avg_load;
457   }
458   m2 /= (i - total/2);
459   a2 /= (i - total/2);
460
461   aslope = 2 * (a2 - a1) / iterations;
462   mslope = 2 * (m2 - m1) / iterations;
463   ac = adaptive_lbdb.history_data[0].avg_load;
464   mc = adaptive_lbdb.history_data[0].max_load;
465   return true;
466 }
467
468 void CentralLB::LoadBalanceDecision(int period) {
469   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
470   lb_ideal_period = period;
471   if (local_state == OFF) {
472     local_state = ON;
473
474     // Send the current iteration no
475     CkCallback cb(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
476         thisProxy[0]);
477     int tmp = lb_no_iterations;
478     contribute(sizeof(int), &tmp, CkReduction::max_int, cb);
479     return;
480   }
481
482   if (local_state == ON) {
483     local_state = DECIDED;
484     return;
485   }
486
487   // If the state is PAUSE, then its waiting for the final decision from central
488   // processor. If the decision is that the ideal period is in the future,
489   // resume. If the ideal period is now, then carry out load balancing.
490   if (local_state == PAUSE) {
491     if (lb_no_iterations < lb_ideal_period) {
492       local_state = DECIDED;
493       ResumeClients(1);
494     } else {
495       ProcessAtSync();
496     }
497   }
498 }
499
500 void CentralLB::ReceiveIterationNo(CkReductionMsg *msg) {
501   CmiAssert(CkMyPe() == 0);
502   int *it_no = (int *) msg->getData();
503   CkPrintf("^^^^ Received all iteration no and final decision %d^^^^\n", it_no[0]);
504   //thisProxy.LoadBalanceDecision(it_no[0]);
505   lb_ideal_period = (lb_ideal_period > it_no[0]) ? lb_ideal_period : it_no[0] + 1;
506   thisProxy.LoadBalanceDecision(lb_ideal_period);
507 }
508
509 // called only on 0
510 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
511 {
512   CmiAssert(CkMyPe() == 0);
513   if (statsData == NULL) statsData = new LDStats;
514
515   int *counts = (int *)msg->getData();
516   int n_objs = counts[0];
517   int n_comm = counts[1];
518
519     // resize database
520   statsData->objData.resize(n_objs);
521   statsData->from_proc.resize(n_objs);
522   statsData->to_proc.resize(n_objs);
523   statsData->commData.resize(n_comm);
524
525   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
526         
527     // broadcast call to let everybody start to send stats
528   thisProxy.SendStats();
529 }
530
531 void CentralLB::BuildStatsMsg()
532 {
533 #if CMK_LBDB_ON
534   // build and send stats
535   const int osz = theLbdb->GetObjDataSz();
536   const int csz = theLbdb->GetCommDataSz();
537
538   int npes = CkNumPes();
539   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
540   _MEMCHECK(msg);
541   msg->from_pe = CkMyPe();
542 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
543         msg->step = step();
544 #endif
545   //msg->serial = CrnRand();
546
547 /*
548   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
549   theLbdb->IdleTime(&msg->idletime);
550   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
551 */
552 #if CMK_LB_CPUTIMER
553   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
554                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
555 #else
556   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
557                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
558 #endif
559
560   msg->pe_speed = myspeed;
561   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
562
563   msg->n_objs = osz;
564   theLbdb->GetObjData(msg->objData);
565   msg->n_comm = csz;
566   theLbdb->GetCommData(msg->commData);
567 //  theLbdb->ClearLoads();
568   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
569
570   if(CkMyPe() == cur_ld_balancer) {
571     msg->avail_vector = new char[CkNumPes()];
572     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
573     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
574   }
575
576   CmiAssert(statsMsg == NULL);
577   statsMsg = msg;
578 #endif
579 }
580
581
582 // called on every processor
583 void CentralLB::SendStats()
584 {
585 #if CMK_LBDB_ON
586   CmiAssert(statsMsg != NULL);
587   reduction_started = 0;
588
589 #if USE_LDB_SPANNING_TREE
590   if(CkNumPes()>1024)
591   {
592     if (CkMyPe() == cur_ld_balancer)
593       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
594     else
595       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
596   }
597   else
598 #endif
599   {
600     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
601     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
602   }
603
604   statsMsg = NULL;
605
606 #ifdef __BIGSIM__
607   BgEndStreaming();
608 #endif
609
610   {
611   // enfore the barrier to wait until centralLB says no
612   LDOMHandle h;
613   h.id.id.idx = 0;
614   theLbdb->getLBDB()->RegisteringObjects(h);
615   }
616 #endif
617 }
618
619 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
620 extern int donotCountMigration;
621 #endif
622
623 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
624 {
625 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
626     if(donotCountMigration){
627         return ;
628     }
629 #endif
630
631 #if CMK_LBDB_ON
632   if (waitBarrier) {
633             migrates_completed++;
634       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
635     if (migrates_completed == migrates_expected) {
636       MigrationDone(1);
637     }
638   }
639   else {
640     future_migrates_completed ++;
641     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
642     if (future_migrates_completed == future_migrates_expected)  {
643         CheckMigrationComplete();
644     }
645   }
646 #endif
647 }
648
649 void CentralLB::MissMigrate(int waitForBarrier)
650 {
651   LDObjHandle h;
652   Migrated(h, waitForBarrier);
653 }
654
655 // build a complete data from bufferred messages
656 // not used when USE_REDUCTION = 1
657 void CentralLB::buildStats()
658 {
659     statsData->nprocs() = stats_msg_count;
660     // allocate space
661     statsData->objData.resize(statsData->n_objs);
662     statsData->from_proc.resize(statsData->n_objs);
663     statsData->to_proc.resize(statsData->n_objs);
664     statsData->commData.resize(statsData->n_comm);
665
666     int nobj = 0;
667     int ncom = 0;
668     int nmigobj = 0;
669     // copy all data in individule message to this big structure
670     for (int pe=0; pe<CkNumPes(); pe++) {
671        int i;
672        CLBStatsMsg *msg = statsMsgsList[pe];
673        if(msg == NULL) continue;
674        for (i=0; i<msg->n_objs; i++) {
675          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
676          statsData->objData[nobj] = msg->objData[i];
677          if (msg->objData[i].migratable) nmigobj++;
678          nobj++;
679        }
680        for (i=0; i<msg->n_comm; i++) {
681          statsData->commData[ncom] = msg->commData[i];
682          ncom++;
683        }
684        // free the memory
685        delete msg;
686        statsMsgsList[pe]=0;
687     }
688     statsData->n_migrateobjs = nmigobj;
689 }
690
691 // deposit one processor data at a time, note database is pre-allocated
692 // to have enough space
693 // used when USE_REDUCTION = 1
694 void CentralLB::depositData(CLBStatsMsg *m)
695 {
696   int i;
697   if (m == NULL) return;
698
699   const int pe = m->from_pe;
700   struct ProcStats &procStat = statsData->procs[pe];
701   procStat.pe = pe;
702   procStat.total_walltime = m->total_walltime;
703   procStat.idletime = m->idletime;
704   procStat.bg_walltime = m->bg_walltime;
705 #if CMK_LB_CPUTIMER
706   procStat.total_cputime = m->total_cputime;
707   procStat.bg_cputime = m->bg_cputime;
708 #endif
709   procStat.pe_speed = m->pe_speed;
710   //procStat.utilization = 1.0;
711   procStat.available = CmiTrue;
712   procStat.n_objs = m->n_objs;
713
714   int &nobj = statsData->n_objs;
715   int &nmigobj = statsData->n_migrateobjs;
716   for (i=0; i<m->n_objs; i++) {
717       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
718       statsData->objData[nobj] = m->objData[i];
719       if (m->objData[i].migratable) nmigobj++;
720       nobj++;
721       CmiAssert(nobj <= statsData->objData.capacity());
722   }
723   int &n_comm = statsData->n_comm;
724   for (i=0; i<m->n_comm; i++) {
725       statsData->commData[n_comm] = m->commData[i];
726       n_comm++;
727       CmiAssert(n_comm <= statsData->commData.capacity());
728   }
729   delete m;
730 }
731
732 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
733 {
734 #if CMK_LBDB_ON
735   if (statsMsgsList == NULL) {
736     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
737     CmiAssert(statsMsgsList != NULL);
738     for(int i=0; i < CkNumPes(); i++)
739       statsMsgsList[i] = 0;
740   }
741   if (statsData == NULL) statsData = new LDStats;
742
743     //  loop through all CLBStatsMsg in the incoming msg
744   int count = msg.getCount();
745   for (int num = 0; num < count; num++) 
746   {
747     CLBStatsMsg *m = msg.getMessage(num);
748     CmiAssert(m!=NULL);
749     const int pe = m->from_pe;
750     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
751 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
752 /*      
753  *  if(m->step < step()){
754  *    //TODO: if a processor is redoing an old load balance step..
755  *    //tell it that the step is done and that it should not perform any migrations
756  *      thisProxy[pe].ReceiveDummyMigration();
757  *  }*/
758 #endif
759         
760     if(!CmiNodeAlive(pe)){
761         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
762         continue;
763     }
764         
765     if (m->avail_vector!=NULL) {
766       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
767     }
768
769     if (statsMsgsList[pe] != 0) {
770       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
771              pe);
772     } else {
773       statsMsgsList[pe] = m;
774 #if USE_REDUCTION
775       depositData(m);
776 #else
777       // store per processor data right away
778       struct ProcStats &procStat = statsData->procs[pe];
779       procStat.pe = pe;
780       procStat.total_walltime = m->total_walltime;
781       procStat.idletime = m->idletime;
782       procStat.bg_walltime = m->bg_walltime;
783 #if CMK_LB_CPUTIMER
784       procStat.total_cputime = m->total_cputime;
785       procStat.bg_cputime = m->bg_cputime;
786 #endif
787       procStat.pe_speed = m->pe_speed;
788       //procStat.utilization = 1.0;
789       procStat.available = CmiTrue;
790       procStat.n_objs = m->n_objs;
791
792       statsData->n_objs += m->n_objs;
793       statsData->n_comm += m->n_comm;
794 #endif
795       stats_msg_count++;
796     }
797   }    // end of for
798
799   const int clients = CkNumValidPes();
800   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
801  
802   if (stats_msg_count == clients) {
803         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
804     statsData->nprocs() = stats_msg_count;
805     thisProxy[CkMyPe()].LoadBalance();
806   }
807 #endif
808 }
809
810 /** added by Abhinav for receiving msgs via spanning tree */
811 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
812 {
813 #if CMK_LBDB_ON
814         CmiAssert(CkMyPe() != 0);
815         bufMsg.add(msg);         // buffer messages
816         count_msgs++;
817         //CkPrintf("here %d\n", CkMyPe());
818         if (count_msgs == st.numChildren+1) {
819                 if(st.parent == 0)
820                 {
821                         thisProxy[0].ReceiveStats(bufMsg);
822                         //CkPrintf("from %d\n", CkMyPe());
823                 }
824                 else
825                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
826                 count_msgs = 0;
827                 bufMsg.free();
828         } 
829 #endif
830 }
831
832 void CentralLB::LoadBalance()
833 {
834 #if CMK_LBDB_ON
835   int proc;
836   const int clients = CkNumPes();
837
838 #if ! USE_REDUCTION
839   // build data
840   buildStats();
841 #else
842   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
843 #endif
844
845   if (_lb_args.debug()) 
846       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
847                   lbname, cur_ld_balancer, step(), start_lb_time,
848                   CmiMemoryUsage()/(1024.0*1024.0));
849
850   // if we are in simulation mode read data
851   if (LBSimulation::doSimulation) simulationRead();
852
853   char *availVector = LBDatabaseObj()->availVector();
854   for(proc = 0; proc < clients; proc++)
855       statsData->procs[proc].available = (CmiBool)availVector[proc];
856
857   preprocess(statsData);
858
859 //    CkPrintf("Before Calling Strategy\n");
860
861   if (_lb_args.printSummary()) {
862       LBInfo info(clients);
863         // not take comm data
864       info.getInfo(statsData, clients, 0);
865       LBRealType mLoad, mCpuLoad, totalLoad;
866       info.getSummary(mLoad, mCpuLoad, totalLoad);
867       int nmsgs, nbytes;
868       statsData->computeNonlocalComm(nmsgs, nbytes);
869       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
870 //      if (_lb_args.debug() > 1) {
871 //        for (int i=0; i<statsData->n_objs; i++)
872 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
873 //      }
874   }
875
876 #if CMK_REPLAYSYSTEM
877   LDHandle *loadBalancer_pointers;
878   if (_replaySystem) {
879     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
880     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
881   }
882 #endif
883   
884   LBMigrateMsg* migrateMsg = Strategy(statsData);
885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
886         migrateMsg->step = step();
887 #endif
888
889 #if CMK_REPLAYSYSTEM
890   CpdHandleLBMessage(&migrateMsg);
891   if (_replaySystem) {
892     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
893     free(loadBalancer_pointers);
894   }
895 #endif
896   
897   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
898   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
899
900   // if this is the step at which we need to dump the database
901   simulationWrite();
902
903 //  calculate predicted load
904 //  very time consuming though, so only happen when debugging is on
905   if (_lb_args.printSummary()) {
906       LBInfo info(clients);
907         // not take comm data
908       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
909       LBRealType mLoad, mCpuLoad, totalLoad;
910       info.getSummary(mLoad, mCpuLoad, totalLoad);
911       int nmsgs, nbytes;
912       statsData->computeNonlocalComm(nmsgs, nbytes);
913       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
914       for (int i=0; i<clients; i++)
915         migrateMsg->expectedLoad[i] = info.peLoads[i];
916   }
917
918   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
919 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
920     lbDecisionCount++;
921     migrateMsg->lbDecisionCount = lbDecisionCount;
922 #endif
923
924   envelope *env = UsrToEnv(migrateMsg);
925   if (1) {
926       // broadcast
927     thisProxy.ReceiveMigration(migrateMsg);
928   }
929   else {
930     // split the migration for each processor
931     for (int p=0; p<CkNumPes(); p++) {
932       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
933       thisProxy[p].ReceiveMigration(m);
934     }
935     delete migrateMsg;
936   }
937
938   // Zero out data structures for next cycle
939   // CkPrintf("zeroing out data\n");
940   statsData->clear();
941   stats_msg_count=0;
942 #endif
943 }
944
945 // test if sender and receiver in a commData is nonmigratable.
946 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
947 {
948 #if CMK_LBDB_ON
949   for (int pe=0 ; pe<count; pe++)
950   {
951     for (int i=0; i<len[pe]; i++)
952       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
953           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
954       return 0;
955   }
956 #endif
957   return 1;
958 }
959
960 // rebuild LDStats and remove all non-migratble objects and related things
961 void CentralLB::removeNonMigratable(LDStats* stats, int count)
962 {
963   int i;
964
965   // check if we have non-migratable objects
966   int have = 0;
967   for (i=0; i<stats->n_objs; i++) 
968   {
969     LDObjData &odata = stats->objData[i];
970     if (!odata.migratable) {
971       have = 1; break;
972     }
973   }
974   if (have == 0) return;
975
976   CkVec<LDObjData> nonmig;
977   CkVec<int> new_from_proc, new_to_proc;
978   nonmig.resize(stats->n_migrateobjs);
979   new_from_proc.resize(stats->n_migrateobjs);
980   new_to_proc.resize(stats->n_migrateobjs);
981   int n_objs = 0;
982   for (i=0; i<stats->n_objs; i++) 
983   {
984     LDObjData &odata = stats->objData[i];
985     if (odata.migratable) {
986       nonmig[n_objs] = odata;
987       new_from_proc[n_objs] = stats->from_proc[i];
988       new_to_proc[n_objs] = stats->to_proc[i];
989       n_objs ++;
990     }
991     else {
992       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
993 #if CMK_LB_CPUTIMER
994       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
995 #endif
996     }
997   }
998   CmiAssert(stats->n_migrateobjs == n_objs);
999
1000   stats->makeCommHash();
1001   
1002   CkVec<LDCommData> newCommData;
1003   newCommData.resize(stats->n_comm);
1004   int n_comm = 0;
1005   for (i=0; i<stats->n_comm; i++) 
1006   {
1007     LDCommData& cdata = stats->commData[i];
1008     if (!cdata.from_proc()) 
1009     {
1010       int idx = stats->getSendHash(cdata);
1011       CmiAssert(idx != -1);
1012       if (!stats->objData[idx].migratable) continue;
1013     }
1014     switch (cdata.receiver.get_type()) {
1015     case LD_PROC_MSG:
1016       break;
1017     case LD_OBJ_MSG:  {
1018       int idx = stats->getRecvHash(cdata);
1019       if (stats->complete_flag)
1020         CmiAssert(idx != -1);
1021       else if (idx == -1) continue;          // receiver not in this group
1022       if (!stats->objData[idx].migratable) continue;
1023       break;
1024       }
1025     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1026       break;
1027     }
1028     newCommData[n_comm] = cdata;
1029     n_comm ++;
1030   }
1031
1032   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1033
1034   // swap to new data
1035   stats->objData = nonmig;
1036   stats->from_proc = new_from_proc;
1037   stats->to_proc = new_to_proc;
1038   stats->n_objs = n_objs;
1039
1040   stats->commData = newCommData;
1041   stats->n_comm = n_comm;
1042
1043   stats->deleteCommHash();
1044   stats->makeCommHash();
1045
1046 }
1047
1048
1049 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1050 extern int restarted;
1051 #endif
1052
1053 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1054 {
1055   storedMigrateMsg = m;
1056   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1057                   thisProxy);
1058   contribute(0, NULL, CkReduction::max_int, cb);
1059 }
1060
1061 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1062 {
1063 #if CMK_LBDB_ON
1064         int i;
1065         LBMigrateMsg *m = storedMigrateMsg;
1066         CmiAssert(m!=NULL);
1067         delete msg;
1068
1069 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1070         int *dummyCounts;
1071
1072         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1073         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1074         if(step() > m->step){
1075                 char str[100];
1076                 envelope *env = UsrToEnv(m);
1077                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1078                 return;
1079         }
1080         lbDecisionCount = m->lbDecisionCount;
1081 #endif
1082
1083   if (_lb_args.debug() > 1) 
1084     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1085
1086   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1087   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1088 /*FAULT_EVAC*/
1089   if(!CmiNodeAlive(CkMyPe())){
1090         delete m;
1091         return;
1092   }
1093   migrates_expected = 0;
1094   future_migrates_expected = 0;
1095 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1096         int sending=0;
1097     int dummy=0;
1098         LBDB *_myLBDB = theLbdb->getLBDB();
1099         if(_restartFlag){
1100         dummyCounts = new int[CmiNumPes()];
1101         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1102     }
1103 #endif
1104   for(i=0; i < m->n_moves; i++) {
1105     MigrateInfo& move = m->moves[i];
1106     const int me = CkMyPe();
1107     if (move.from_pe == me && move.to_pe != me) {
1108       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1109       // migrate object, in case it is already gone, inform toPe
1110 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1111       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1112          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1113 #else
1114             if(_restartFlag == 0){
1115                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1116                 theLbdb->Migrate(move.obj,move.to_pe);
1117                 sending++;
1118             }else{
1119                 if(_myLBDB->validObjHandle(move.obj)){
1120                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1121                     theLbdb->Migrate(move.obj,move.to_pe);
1122                     sending++;
1123                 }else{
1124                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1125                     dummyCounts[move.to_pe]++;
1126                     dummy++;
1127                 }
1128             }
1129 #endif
1130     } else if (move.from_pe != me && move.to_pe == me) {
1131        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1132       if (!move.async_arrival) migrates_expected++;
1133       else future_migrates_expected++;
1134     }
1135   }
1136   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1137   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1138
1139 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1140         if(_restartFlag){
1141                 sendDummyMigrationCounts(dummyCounts);
1142                 _restartFlag  =0;
1143         delete []dummyCounts;
1144         }
1145 #endif
1146
1147
1148 #if 0
1149   if (m->n_moves ==0) {
1150     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1151   }
1152 #endif
1153   cur_ld_balancer = m->next_lb;
1154   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1155       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1156   }
1157
1158   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1159     MigrationDone(1);
1160   delete m;
1161
1162 //      CkEvacuatedElement();
1163 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1164 //  migrates_expected = 0;
1165 //  //  ResumeClients(1);
1166 #endif
1167 #endif
1168 }
1169
1170 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1171 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1172     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1173     //TODO: this is gonna be important when a crash happens during checkpoint
1174     //the globalDecisionCount would have to be saved and compared against
1175     //a future recvMigration
1176                 
1177         thisProxy[CkMyPe()].ResumeClients(1);
1178 }
1179 #endif
1180
1181 void CentralLB::MigrationDone(int balancing)
1182 {
1183 #if CMK_LBDB_ON
1184   migrates_completed = 0;
1185   migrates_expected = -1;
1186   // clear load stats
1187   if (balancing) theLbdb->ClearLoads();
1188   // Increment to next step
1189   theLbdb->incStep();
1190         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1191   // if sync resume, invoke a barrier
1192
1193 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1194     savedBalancing = balancing;
1195     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1196 #endif
1197
1198   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1199
1200   LoadbalanceDone(balancing);        // callback
1201 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1202   // if sync resume invoke a barrier
1203   if (balancing && _lb_args.syncResume()) {
1204     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1205                   thisProxy);
1206     contribute(0, NULL, CkReduction::sum_int, cb);
1207   }
1208   else{ 
1209     if(CmiNodeAlive(CkMyPe())){
1210         thisProxy [CkMyPe()].ResumeClients(balancing);
1211     }   
1212   }     
1213 #if CMK_GRID_QUEUE_AVAILABLE
1214   CmiGridQueueDeregisterAll ();
1215   CpvAccess(CkGridObject) = NULL;
1216 #endif
1217 #endif 
1218 #endif
1219 }
1220
1221 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1222 void CentralLB::endMigrationDone(int balancing){
1223     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1224
1225
1226   if (balancing && _lb_args.syncResume()) {
1227     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1228                   thisProxy);
1229     contribute(0, NULL, CkReduction::sum_int, cb);
1230   }
1231   else{
1232     if(CmiNodeAlive(CkMyPe())){
1233     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1234     thisProxy [CkMyPe()].ResumeClients(balancing);
1235     }
1236   }
1237
1238 }
1239 #endif
1240
1241 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1242 void resumeCentralLbAfterChkpt(void *_lb){
1243     CentralLB *lb= (CentralLB *)_lb;
1244     CpvAccess(_currentObj)=lb;
1245     lb->endMigrationDone(lb->savedBalancing);
1246 }
1247 #endif
1248
1249
1250 void CentralLB::ResumeClients(CkReductionMsg *msg)
1251 {
1252   ResumeClients(1);
1253   delete msg;
1254 }
1255
1256 void CentralLB::ResumeClients(int balancing)
1257 {
1258 #if CMK_LBDB_ON
1259 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1260     resumeCount++;
1261     globalResumeCount = resumeCount;
1262 #endif
1263   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1264   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1265     double end_lb_time = CkWallTimer();
1266   }
1267
1268 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1269   if (balancing) ComlibNotifyMigrationDone();  
1270 #endif
1271
1272   theLbdb->ResumeClients();
1273   if (balancing)  {
1274     CheckMigrationComplete();
1275     if (future_migrates_expected == 0 || 
1276             future_migrates_expected == future_migrates_completed) {
1277       CheckMigrationComplete();
1278     }
1279   }
1280 #endif
1281 }
1282
1283 /*
1284   migration of objects contains two different kinds:
1285   (1) objects want to make a barrier for migration completion
1286       (waitForBarrier is true)
1287       migrationDone() to finish and resumeClients
1288   (2) objects don't need a barrier
1289   However, next load balancing can only happen when both migrations complete
1290 */ 
1291 void CentralLB::CheckMigrationComplete()
1292 {
1293 #if CMK_LBDB_ON
1294   lbdone ++;
1295   if (lbdone == 2) {
1296     if (_lb_args.debug() && CkMyPe()==0) {
1297       double end_lb_time = CkWallTimer();
1298       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1299                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1300                 end_lb_time-start_lb_time);
1301     }
1302
1303     lb_migration_cost = (CkWallTimer() - start_lb_time);
1304
1305     lbdone = 0;
1306     future_migrates_expected = -1;
1307     future_migrates_completed = 0;
1308     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1309     // release local barrier  so that the next load balancer can go
1310     LDOMHandle h;
1311     h.id.id.idx = 0;
1312     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1313     // switch to the next load balancer in the list
1314     // subtle: called from Migrated() may result in Migrated() called in next LB
1315     theLbdb->nextLoadbalancer(seqno);
1316   }
1317 #endif
1318 }
1319
1320 void CentralLB::preprocess(LDStats* stats)
1321 {
1322   if (_lb_args.ignoreBgLoad())
1323     stats->clearBgLoad();
1324
1325   // Call the predictor for the future
1326   if (_lb_predict) FuturePredictor(statsData);
1327 }
1328
1329 // default load balancing strategy
1330 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1331 {
1332 #if CMK_LBDB_ON
1333   double strat_start_time = CkWallTimer();
1334   if (_lb_args.debug())
1335     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1336
1337   work(stats);
1338
1339   if (_lb_args.debug()>1)  {
1340     CkPrintf("CharmLB> Obj Map:\n");
1341     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1342     CkPrintf("\n");
1343   }
1344
1345   LBMigrateMsg *msg = createMigrateMsg(stats);
1346
1347   if (_lb_args.debug()) {
1348     double strat_end_time = CkWallTimer();
1349     envelope *env = UsrToEnv(msg);
1350
1351     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1352     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1353               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1354     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1355     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1356               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1357     lb_strategy_cost = (strat_end_time - strat_start_time);
1358     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1359   }
1360
1361   return msg;
1362 #else
1363   return NULL;
1364 #endif
1365 }
1366
1367 void CentralLB::work(LDStats* stats)
1368 {
1369   // does nothing but print the database
1370   stats->print();
1371 }
1372
1373 // generate migrate message from stats->from_proc and to_proc
1374 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1375 {
1376   int i;
1377   CkVec<MigrateInfo*> migrateInfo;
1378   for (i=0; i<stats->n_objs; i++) {
1379     LDObjData &objData = stats->objData[i];
1380     int frompe = stats->from_proc[i];
1381     int tope = stats->to_proc[i];
1382     if (frompe != tope) {
1383       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1384       //         CkMyPe(),obj,pe,dest);
1385       MigrateInfo *migrateMe = new MigrateInfo;
1386       migrateMe->obj = objData.handle;
1387       migrateMe->from_pe = frompe;
1388       migrateMe->to_pe = tope;
1389       migrateMe->async_arrival = objData.asyncArrival;
1390       migrateInfo.insertAtEnd(migrateMe);
1391     }
1392   }
1393
1394   int migrate_count=migrateInfo.length();
1395   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1396   msg->n_moves = migrate_count;
1397   for(i=0; i < migrate_count; i++) {
1398     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1399     msg->moves[i] = *item;
1400     delete item;
1401     migrateInfo[i] = 0;
1402   }
1403   return msg;
1404 }
1405
1406 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1407 {
1408   int nmoves = 0;
1409   int nunavail = 0;
1410   int i;
1411   for (i=0; i<m->n_moves; i++) {
1412     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1413     if (item->from_pe == p || item->to_pe == p) nmoves++;
1414   }
1415   for (i=0; i<CkNumPes();i++) {
1416     if (!m->avail_vector[i]) nunavail++;
1417   }
1418   LBMigrateMsg* msg;
1419   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1420   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1421   msg->n_moves = nmoves;
1422   msg->level = m->level;
1423   msg->next_lb = m->next_lb;
1424   for (i=0,nmoves=0; i<m->n_moves; i++) {
1425     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1426     if (item->from_pe == p || item->to_pe == p) {
1427       msg->moves[nmoves] = *item;
1428       nmoves++;
1429     }
1430   }
1431   // copy processor data
1432   if (nunavail)
1433   for (i=0; i<CkNumPes();i++) {
1434     msg->avail_vector[i] = m->avail_vector[i];
1435     msg->expectedLoad[i] = m->expectedLoad[i];
1436   }
1437   return msg;
1438 }
1439
1440 void CentralLB::simulationWrite() {
1441   if(step() == LBSimulation::dumpStep)
1442   {
1443     // here we are supposed to dump the database
1444     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1445     char *dumpFileName = (char *)malloc(dumpFileSize);
1446     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1447       free(dumpFileName);
1448       dumpFileSize+=3;
1449       dumpFileName = (char *)malloc(dumpFileSize);
1450     }
1451     writeStatsMsgs(dumpFileName);
1452     free(dumpFileName);
1453     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1454     ++LBSimulation::dumpStep;
1455     --LBSimulation::dumpStepSize;
1456     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1457       CmiPrintf("Charm++> Exiting...\n");
1458       CkExit();
1459     }
1460     return;
1461   }
1462 }
1463
1464 void CentralLB::simulationRead() {
1465   LBSimulation *simResults = NULL, *realResults;
1466   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1467   voidMessage->n_moves=0;
1468   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1469     // here we are supposed to read the data from the dump database
1470     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1471     char *simFileName = (char *)malloc(simFileSize);
1472     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1473       free(simFileName);
1474       simFileSize+=3;
1475       simFileName = (char *)malloc(simFileSize);
1476     }
1477     readStatsMsgs(simFileName);
1478
1479     // allocate simResults (only the first step)
1480     if (simResults == NULL) {
1481       simResults = new LBSimulation(LBSimulation::simProcs);
1482       realResults = new LBSimulation(LBSimulation::simProcs);
1483     }
1484     else {
1485       // should be the same number of procs of the original simulation!
1486       if (!LBSimulation::procsChanged) {
1487         // it means we have a previous step, so in simResults there is data.
1488         // we can now print the real effects of the load balancer during the simulation
1489         // or print the difference between the predicted data and the real one.
1490         realResults->reset();
1491         // reset to_proc of statsData to be equal to from_proc
1492         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1493         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1494         simResults->PrintDifferences(realResults,statsData);
1495       }
1496       simResults->reset();
1497     }
1498
1499     // now pass it to the strategy routine
1500     double startT = CkWallTimer();
1501     preprocess(statsData);
1502     CmiPrintf("%s> Strategy starts ... \n", lbname);
1503     LBMigrateMsg* migrateMsg = Strategy(statsData);
1504     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1505                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1506
1507     // now calculate the results of the load balancing simulation
1508     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1509
1510     // now we have the simulation data, so print it and loop
1511     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1512     // **CWL** Officially recording my disdain here for using ints for bool
1513     if (LBSimulation::showDecisionsOnly) {
1514       simResults->PrintDecisions(migrateMsg, simFileName, 
1515                                  LBSimulation::simProcs);
1516     } else {
1517       simResults->PrintSimulationResults();
1518     }
1519
1520     free(simFileName);
1521     delete migrateMsg;
1522     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1523   }
1524   // deallocate simResults
1525   delete simResults;
1526   CmiPrintf("Charm++> Exiting...\n");
1527   CkExit();
1528 }
1529
1530 void CentralLB::readStatsMsgs(const char* filename) 
1531 {
1532 #if CMK_LBDB_ON
1533   int i;
1534   FILE *f = fopen(filename, "r");
1535   if (f==NULL) {
1536     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1537     CmiAbort("");
1538   }
1539
1540   // at this stage, we need to rebuild the statsMsgList and
1541   // statsDataList structures. For that first deallocate the
1542   // old structures
1543   if (statsMsgsList) {
1544     for(i = 0; i < stats_msg_count; i++)
1545       delete statsMsgsList[i];
1546     delete[] statsMsgsList;
1547     statsMsgsList=0;
1548   }
1549
1550   PUP::fromDisk pd(f);
1551   PUP::machineInfo machInfo;
1552
1553   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1554   PUP::xlater p(machInfo, pd);
1555
1556   if (_lb_args.lbversion() > 1) {
1557     p|_lb_args.lbversion();             // write version number
1558     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1559     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1560   }
1561   p|stats_msg_count;
1562
1563   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1564   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1565   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1566
1567   // LBSimulation::simProcs must be set
1568   statsData->pup(p);
1569
1570   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1571   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1572
1573   // file f is closed in the destructor of PUP::fromDisk
1574   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1575 #endif
1576 }
1577
1578 void CentralLB::writeStatsMsgs(const char* filename) 
1579 {
1580 #if CMK_LBDB_ON
1581   FILE *f = fopen(filename, "w");
1582   if (f==NULL) {
1583     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1584     CmiAbort("");
1585   }
1586
1587   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1588   PUP::toDisk p(f);
1589   p((char *)&machInfo, sizeof(machInfo));       // machine info
1590
1591   p|_lb_args.lbversion();               // write version number
1592   p|stats_msg_count;
1593   statsData->pup(p);
1594
1595   fclose(f);
1596
1597   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1598 #endif
1599 }
1600
1601 // calculate the predicted wallclock/cpu load for every processors
1602 // considering communication overhead if considerComm is true
1603 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1604                       LBMigrateMsg *msg, LBInfo &info, 
1605                       int considerComm)
1606 {
1607 #if CMK_LBDB_ON
1608         stats->makeCommHash();
1609
1610         // update to_proc according to migration msgs
1611         for(int i = 0; i < msg->n_moves; i++) {
1612           MigrateInfo &mInfo = msg->moves[i];
1613           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1614           CmiAssert(idx != -1);
1615           stats->to_proc[idx] = mInfo.to_pe;
1616         }
1617
1618         info.getInfo(stats, count, considerComm);
1619 #endif
1620 }
1621
1622
1623 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1624 {
1625     CkAssert(simResults != NULL && count == simResults->numPes);
1626     // estimate the new loads of the processors. As a first approximation, this is the
1627     // sum of the cpu times of the objects on that processor
1628     double startT = CkWallTimer();
1629     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1630     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1631 }
1632
1633 void CentralLB::pup(PUP::er &p) { 
1634   BaseLB::pup(p); 
1635   if (p.isUnpacking())  {
1636     initLB(CkLBOptions(seqno)); 
1637   }
1638   p|reduction_started;
1639   int has_statsMsg=0;
1640   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1641   p|has_statsMsg;
1642   if (has_statsMsg) {
1643     if (p.isUnpacking())
1644       statsMsg = new CLBStatsMsg;
1645     statsMsg->pup(p);
1646   }
1647   p|lb_ideal_period;
1648 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1649   p | lbDecisionCount;
1650   p | resumeCount;
1651 #endif
1652         
1653 }
1654
1655 int CentralLB::useMem() { 
1656   return sizeof(CentralLB) + statsData->useMem() + 
1657          CkNumPes() * sizeof(CLBStatsMsg *);
1658 }
1659
1660
1661 /**
1662   CLBStatsMsg is not a real message now.
1663   CLBStatsMsg is used for all processors to fill in their local load and comm
1664   statistics and send to processor 0
1665 */
1666
1667 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1668   n_objs = osz;
1669   n_comm = csz;
1670   objData = new LDObjData[osz];
1671   commData = new LDCommData[csz];
1672   avail_vector = NULL;
1673 }
1674
1675 CLBStatsMsg::~CLBStatsMsg() {
1676   delete [] objData;
1677   delete [] commData;
1678   delete [] avail_vector;
1679 }
1680
1681 void CLBStatsMsg::pup(PUP::er &p) {
1682   int i;
1683   p|from_pe;
1684   p|pe_speed;
1685   p|total_walltime;
1686   p|idletime;
1687   p|bg_walltime;
1688 #if CMK_LB_CPUTIMER
1689   p|total_cputime;
1690   p|bg_cputime;
1691 #endif
1692 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1693   p | step;
1694 #endif
1695   p|n_objs;
1696   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1697   for (i=0; i<n_objs; i++) p|objData[i];
1698   p|n_comm;
1699   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1700   for (i=0; i<n_comm; i++) p|commData[i];
1701
1702   int has_avail_vector;
1703   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1704   p|has_avail_vector;
1705   if (p.isUnpacking()) {
1706     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1707     else avail_vector = NULL;
1708   }
1709   if (has_avail_vector) p(avail_vector, CkNumPes());
1710
1711   p(next_lb);
1712 }
1713
1714 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1715 // the entry function, it is just used to use to pup.
1716 // I don't use CLBStatsMsg directly as marshalled parameter because
1717 // I want the data pointer stored and not to be freed by the Charm++.
1718 void CkMarshalledCLBStatsMessage::free() { 
1719   int count = msgs.size();
1720   for  (int i=0; i<count; i++) {
1721     delete msgs[i];
1722     msgs[i] = NULL;
1723   }
1724   msgs.free();
1725 }
1726
1727 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1728 {
1729   int count = m.getCount();
1730   for (int i=0; i<count; i++) add(m.getMessage(i));
1731 }
1732
1733 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1734 {
1735   int count = msgs.size();
1736   p|count;
1737   for (int i=0; i<count; i++) {
1738     CLBStatsMsg *msg;
1739     if (p.isUnpacking()) msg = new CLBStatsMsg;
1740     else { 
1741       msg = msgs[i]; CmiAssert(msg!=NULL);
1742     }
1743     msg->pup(p);
1744     if (p.isUnpacking()) add(msg);
1745   }
1746 }
1747
1748 SpanningTree::SpanningTree()
1749 {
1750         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1751         arity = (int)ceil(sq/2);
1752         calcParent(CkMyPe());
1753         calcNumChildren(CkMyPe());
1754 }
1755
1756 void SpanningTree::calcParent(int n)
1757 {
1758         parent=-1;
1759         if(n != 0  && arity > 0)
1760                 parent = (n-1)/arity;
1761 }
1762
1763 void SpanningTree::calcNumChildren(int n)
1764 {
1765         numChildren = 0;
1766         if (arity == 0) return;
1767         int fullNode=(CkNumPes()-1-arity)/arity;
1768         if(n <= fullNode)
1769                 numChildren = arity;
1770         if(n == fullNode+1)
1771                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1772         if(n > fullNode+1)
1773                 numChildren = 0;
1774 }
1775
1776 #include "CentralLB.def.h"
1777  
1778 /*@}*/