Changes to AdaptiveLB
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #include <vector>
14
15 #define  DEBUGF(x)       // CmiPrintf x;
16 #define  DEBUG(x)        // x;
17
18 #if CMK_MEM_CHECKPOINT
19    /* can not handle reduction in inmem FT */
20 #define USE_REDUCTION         0
21 #define USE_LDB_SPANNING_TREE 0
22 #elif defined(_FAULT_MLOG_)
23 /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
32 extern int _restartFlag;
33 extern void getGlobalStep(CkGroupID );
34 extern void initMlogLBStep(CkGroupID );
35 extern int globalResumeCount;
36 extern void sendDummyMigrationCounts(int *);
37 #endif
38
39 #if CMK_GRID_QUEUE_AVAILABLE
40 CpvExtern(void *, CkGridObject);
41 #endif
42
43 CkGroupID loadbalancer;
44 int * lb_ptr;
45 int load_balancer_created;
46
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49
50 int lb_no_iterations = -1;
51
52 double cur_max_pe_load = 0.0;
53 double cur_avg_pe_load = 0.0;
54 double prev_load = 0.0;
55
56 bool islb_done = false;
57
58 struct AdaptiveData {
59   int iteration;
60   double max_load;
61   double avg_load;
62 };
63
64 struct AdaptiveLBDatabase {
65   std::vector<AdaptiveData> history_data;
66 } adaptive_lbdb;
67
68 CreateLBFunc_Def(CentralLB, "CentralLB base class")
69
70 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
71                              LBMigrateMsg *, LBInfo &info, int considerComm);
72
73 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
74   double lb_data[3];
75   lb_data[0] = 0;
76   lb_data[1] = 0;
77   lb_data[2] = 0;
78   for (int i = 0; i < nMsg; i++) {
79     CkAssert(msgs[i]->getSize() == 3*sizeof(double));
80     double* m = (double *)msgs[i]->getData();
81     lb_data[0] += m[0];
82     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
83     lb_data[2] += m[2];
84   }
85   return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
86 }
87
88 /*global*/ CkReduction::reducerType lbDataCollectionType;
89 /*initcall*/ void registerLBDataCollection(void) {
90   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
91 }
92
93 /*
94 void CreateCentralLB()
95 {
96   CProxy_CentralLB::ckNew(0);
97 }
98 */
99
100 void CentralLB::staticStartLB(void* data)
101 {
102   CentralLB *me = (CentralLB*)(data);
103   me->StartLB();
104 }
105
106 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
107 {
108   CentralLB *me = (CentralLB*)(data);
109   me->Migrated(h, waitBarrier);
110 }
111
112 void CentralLB::staticAtSync(void* data)
113 {
114   CentralLB *me = (CentralLB*)(data);
115   me->AtSync();
116 }
117
118 void CentralLB::initLB(const CkLBOptions &opt)
119 {
120 #if CMK_LBDB_ON
121   lbname = "CentralLB";
122   thisProxy = CProxy_CentralLB(thisgroup);
123   //  CkPrintf("Construct in %d\n",CkMyPe());
124
125   // create and turn on by default
126   receiver = theLbdb->
127     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
128   notifier = theLbdb->getLBDB()->
129     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
130   startLbFnHdl = theLbdb->getLBDB()->
131     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
132
133   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
134   if (opt.getSeqNo() > 0) turnOff();
135
136   stats_msg_count = 0;
137   statsMsgsList = NULL;
138   statsData = NULL;
139
140   storedMigrateMsg = NULL;
141   reduction_started = 0;
142
143   // for future predictor
144   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
145   else predicted_model=0;
146   // register user interface callbacks
147   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
148
149   myspeed = theLbdb->ProcessorSpeed();
150
151   migrates_completed = 0;
152   future_migrates_completed = 0;
153   migrates_expected = -1;
154   future_migrates_expected = -1;
155   cur_ld_balancer = _lb_args.central_pe();      // 0 default
156   lbdone = 0;
157   count_msgs=0;
158   statsMsg = NULL;
159
160   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
161
162   load_balancer_created = 1;
163 #endif
164 }
165
166 CentralLB::~CentralLB()
167 {
168 #if CMK_LBDB_ON
169   delete [] statsMsgsList;
170   delete statsData;
171   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
172   if (theLbdb) {
173     theLbdb->getLBDB()->
174       RemoveNotifyMigrated(notifier);
175     theLbdb->
176       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
177   }
178 #endif
179 }
180
181 void CentralLB::turnOn() 
182 {
183 #if CMK_LBDB_ON
184   theLbdb->getLBDB()->
185     TurnOnBarrierReceiver(receiver);
186   theLbdb->getLBDB()->
187     TurnOnNotifyMigrated(notifier);
188   theLbdb->getLBDB()->
189     TurnOnStartLBFn(startLbFnHdl);
190 #endif
191 }
192
193 void CentralLB::turnOff() 
194 {
195 #if CMK_LBDB_ON
196   theLbdb->getLBDB()->
197     TurnOffBarrierReceiver(receiver);
198   theLbdb->getLBDB()->
199     TurnOffNotifyMigrated(notifier);
200   theLbdb->getLBDB()->
201     TurnOffStartLBFn(startLbFnHdl);
202 #endif
203 }
204
205 void CentralLB::AtSync()
206 {
207 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
208 #if CMK_LBDB_ON
209 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
210
211 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
212         CpvAccess(_currentObj)=this;
213 #endif
214
215   // if num of processor is only 1, nothing should happen
216   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
217     MigrationDone(0);
218     return;
219   }
220   if(CmiNodeAlive(CkMyPe())){
221     thisProxy [CkMyPe()].ProcessAtSyncMin();
222   }
223 #endif
224 }
225
226 #include "ComlibStrategy.h"
227
228 void CentralLB::ProcessAtSync(int ideal_lb_period) {
229   lb_ideal_period = ideal_lb_period;
230   ProcessAtSync();
231 }
232
233 void CentralLB::ProcessAtSync()
234 {
235
236   // Reset all adaptive lb related fields since load balancing is being done.
237   lb_no_iterations = -1;
238   adaptive_lbdb.history_data.clear();
239   prev_load = 0.0;
240
241 #if CMK_LBDB_ON
242   if (reduction_started) return;              // reducton in progress
243
244   CmiAssert(CmiNodeAlive(CkMyPe()));
245   if (CkMyPe() == cur_ld_balancer) {
246     start_lb_time = CkWallTimer();
247   }
248
249
250 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
251         initMlogLBStep(thisgroup);
252 #endif
253
254   // build message
255   BuildStatsMsg();
256
257 #if USE_REDUCTION
258     // reduction to get total number of objects and comm
259     // so that processor 0 can pre-allocate load balancing database
260   int counts[2];
261   counts[0] = theLbdb->GetObjDataSz();
262   counts[1] = theLbdb->GetCommDataSz();
263
264   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
265                   thisProxy[0]);
266   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
267   reduction_started = 1;
268 #else
269   SendStats();
270 #endif
271 #endif
272 }
273
274 void CentralLB::ProcessAtSyncMin()
275 {
276 #if CMK_LBDB_ON
277   if (reduction_started) return;              // reducton in progress
278
279   CmiAssert(CmiNodeAlive(CkMyPe()));
280   if (CkMyPe() == cur_ld_balancer) {
281     start_lb_time = CkWallTimer();
282   }
283
284
285 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
286         initMlogLBStep(thisgroup);
287 #endif
288   
289   lb_no_iterations++;
290   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
291       lb_no_iterations, lb_ideal_period);
292
293   if (lb_no_iterations >= lb_ideal_period) {
294 //    CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
295     SendMinStats();
296   } else {
297
298     SendMinStats();
299 // double total_walltime;
300 // double idle_time;
301 // double bg_walltime;
302 //  theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
303 //    CkPrintf("No need to send MinStats of [%d] load: %lf\n\n", CkMyPe(),
304 //    total_walltime);
305 //    ResumeClients(0);
306   }
307 #endif
308 }
309
310 void CentralLB::SendMinStats() {
311
312  double total_load;
313  double idle_time;
314  double bg_walltime;
315   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
316   CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
317
318   // Since the total_load is cumulative since the last load balancing stage,
319   // Hence it is subtracted from the previous load.
320   total_load -= idle_time;
321   double tmp = total_load;
322   total_load -= prev_load;
323   prev_load = tmp; 
324
325   double lb_data[3];
326   lb_data[0] = total_load;
327   lb_data[1] = total_load;
328   lb_data[2] = 1;
329
330   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
331                   thisProxy[0]);
332   contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
333 }
334
335 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
336   CmiAssert(CkMyPe() == 0);
337   double* load = (LBRealType *) msg->getData();
338   double max = load[1];
339   double avg = load[0]/load[2];
340   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
341
342 //  if (lb_no_iterations == 0 || lb_no_iterations < lb_ideal_period) {
343   if (lb_no_iterations == 0) {
344     thisProxy.ResumeClients(0);
345     return;
346   }
347
348   // Store the data for this iteration
349   AdaptiveData data;
350   data.iteration = lb_no_iterations;
351   data.max_load = max;
352   data.avg_load = avg;
353   adaptive_lbdb.history_data.push_back(data);
354
355   // If the max/avg ratio is greater than the threshold and also this is not the
356   // step immediately after load balancing, carry out load balancing
357   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 5) {
358     CkPrintf("Carry out load balancing step\n");
359     islb_done = true;
360     thisProxy.ProcessAtSync();
361     return;
362   }
363
364   // Generate the plan for the adaptive strategy
365   if (generatePlan()) {
366     thisProxy.ResumeClients(lb_ideal_period, 0);
367   } else {
368     thisProxy.ResumeClients(0);
369   }
370 }
371
372 bool CentralLB::generatePlan() {
373   if (adaptive_lbdb.history_data.size() <= 4) {
374     return false;
375   }
376
377   // Some heuristics for lbperiod
378   // If constant load or almost constant,
379   // then max * new_lb_period > avg * new_lb_period + lb_cost
380   double max = 0.0;
381   double avg = 0.0;
382   AdaptiveData data;
383   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
384     data = adaptive_lbdb.history_data[i];
385     max += data.max_load;
386     avg += data.avg_load;
387     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
388   }
389 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
390 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
391 //
392 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
393 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
394 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
395 //
396   // If linearly varying load, then find lb_period
397   // area between the max and avg curve 
398   double mslope, aslope, mc, ac;
399   getLineEq(aslope, ac, mslope, mc);
400   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
401   double a = (mslope - aslope)/2;
402   double b = (mc - ac);
403   double c = -(lb_strategy_cost + lb_migration_cost);
404   //c = -2.5;
405   lb_ideal_period = getPeriodForLinear(a, b, c);
406   CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
407
408   return true;
409 }
410
411 int CentralLB::getPeriodForLinear(double a, double b, double c) {
412   if (a == 0.0) {
413     return (-c / b);
414   }
415   int x;
416   double t = (b * b) - (4*a*c);
417   t = (-b + sqrt(t)) / (2*a);
418   x = t;
419   return x;
420 }
421
422 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
423   int total = adaptive_lbdb.history_data.size();
424   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
425       adaptive_lbdb.history_data[0].iteration;
426   double a1 = 0;
427   double m1 = 0;
428   double a2 = 0;
429   double m2 = 0;
430   AdaptiveData data;
431   int i = 0;
432   for (i = 0; i <= total/2; i++) {
433     data = adaptive_lbdb.history_data[i];
434     m1 += data.max_load;
435     a1 += data.avg_load;
436   }
437   m1 /= i;
438   a1 /= i;
439
440   for (i = total/2; i < total; i++) {
441     data = adaptive_lbdb.history_data[i];
442     m2 += data.max_load;
443     a2 += data.avg_load;
444   }
445   m2 /= (i - total/2);
446   a2 /= (i - total/2);
447
448   aslope = 2 * (a2 - a1) / iterations;
449   mslope = 2 * (m2 - m1) / iterations;
450   ac = adaptive_lbdb.history_data[0].avg_load;
451   mc = adaptive_lbdb.history_data[0].max_load;
452   return true;
453 }
454
455 // called only on 0
456 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
457 {
458   CmiAssert(CkMyPe() == 0);
459   if (statsData == NULL) statsData = new LDStats;
460
461   int *counts = (int *)msg->getData();
462   int n_objs = counts[0];
463   int n_comm = counts[1];
464
465     // resize database
466   statsData->objData.resize(n_objs);
467   statsData->from_proc.resize(n_objs);
468   statsData->to_proc.resize(n_objs);
469   statsData->commData.resize(n_comm);
470
471   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
472         
473     // broadcast call to let everybody start to send stats
474   thisProxy.SendStats();
475 }
476
477 void CentralLB::BuildStatsMsg()
478 {
479 #if CMK_LBDB_ON
480   // build and send stats
481   const int osz = theLbdb->GetObjDataSz();
482   const int csz = theLbdb->GetCommDataSz();
483
484   int npes = CkNumPes();
485   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
486   _MEMCHECK(msg);
487   msg->from_pe = CkMyPe();
488 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
489         msg->step = step();
490 #endif
491   //msg->serial = CrnRand();
492
493 /*
494   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
495   theLbdb->IdleTime(&msg->idletime);
496   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
497 */
498 #if CMK_LB_CPUTIMER
499   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
500                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
501 #else
502   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
503                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
504 #endif
505
506   msg->pe_speed = myspeed;
507   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
508
509   msg->n_objs = osz;
510   theLbdb->GetObjData(msg->objData);
511   msg->n_comm = csz;
512   theLbdb->GetCommData(msg->commData);
513 //  theLbdb->ClearLoads();
514   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
515
516   if(CkMyPe() == cur_ld_balancer) {
517     msg->avail_vector = new char[CkNumPes()];
518     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
519     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
520   }
521
522   CmiAssert(statsMsg == NULL);
523   statsMsg = msg;
524 #endif
525 }
526
527
528 // called on every processor
529 void CentralLB::SendStats()
530 {
531 #if CMK_LBDB_ON
532   CmiAssert(statsMsg != NULL);
533   reduction_started = 0;
534
535 #if USE_LDB_SPANNING_TREE
536   if(CkNumPes()>1024)
537   {
538     if (CkMyPe() == cur_ld_balancer)
539       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
540     else
541       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
542   }
543   else
544 #endif
545   {
546     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
547     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
548   }
549
550   statsMsg = NULL;
551
552 #ifdef __BIGSIM__
553   BgEndStreaming();
554 #endif
555
556   {
557   // enfore the barrier to wait until centralLB says no
558   LDOMHandle h;
559   h.id.id.idx = 0;
560   theLbdb->getLBDB()->RegisteringObjects(h);
561   }
562 #endif
563 }
564
565 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
566 extern int donotCountMigration;
567 #endif
568
569 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
570 {
571 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
572     if(donotCountMigration){
573         return ;
574     }
575 #endif
576
577 #if CMK_LBDB_ON
578   if (waitBarrier) {
579             migrates_completed++;
580       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
581     if (migrates_completed == migrates_expected) {
582       MigrationDone(1);
583     }
584   }
585   else {
586     future_migrates_completed ++;
587     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
588     if (future_migrates_completed == future_migrates_expected)  {
589         CheckMigrationComplete();
590     }
591   }
592 #endif
593 }
594
595 void CentralLB::MissMigrate(int waitForBarrier)
596 {
597   LDObjHandle h;
598   Migrated(h, waitForBarrier);
599 }
600
601 // build a complete data from bufferred messages
602 // not used when USE_REDUCTION = 1
603 void CentralLB::buildStats()
604 {
605     statsData->nprocs() = stats_msg_count;
606     // allocate space
607     statsData->objData.resize(statsData->n_objs);
608     statsData->from_proc.resize(statsData->n_objs);
609     statsData->to_proc.resize(statsData->n_objs);
610     statsData->commData.resize(statsData->n_comm);
611
612     int nobj = 0;
613     int ncom = 0;
614     int nmigobj = 0;
615     // copy all data in individule message to this big structure
616     for (int pe=0; pe<CkNumPes(); pe++) {
617        int i;
618        CLBStatsMsg *msg = statsMsgsList[pe];
619        if(msg == NULL) continue;
620        for (i=0; i<msg->n_objs; i++) {
621          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
622          statsData->objData[nobj] = msg->objData[i];
623          if (msg->objData[i].migratable) nmigobj++;
624          nobj++;
625        }
626        for (i=0; i<msg->n_comm; i++) {
627          statsData->commData[ncom] = msg->commData[i];
628          ncom++;
629        }
630        // free the memory
631        delete msg;
632        statsMsgsList[pe]=0;
633     }
634     statsData->n_migrateobjs = nmigobj;
635 }
636
637 // deposit one processor data at a time, note database is pre-allocated
638 // to have enough space
639 // used when USE_REDUCTION = 1
640 void CentralLB::depositData(CLBStatsMsg *m)
641 {
642   int i;
643   if (m == NULL) return;
644
645   const int pe = m->from_pe;
646   struct ProcStats &procStat = statsData->procs[pe];
647   procStat.pe = pe;
648   procStat.total_walltime = m->total_walltime;
649   procStat.idletime = m->idletime;
650   procStat.bg_walltime = m->bg_walltime;
651 #if CMK_LB_CPUTIMER
652   procStat.total_cputime = m->total_cputime;
653   procStat.bg_cputime = m->bg_cputime;
654 #endif
655   procStat.pe_speed = m->pe_speed;
656   //procStat.utilization = 1.0;
657   procStat.available = CmiTrue;
658   procStat.n_objs = m->n_objs;
659
660   int &nobj = statsData->n_objs;
661   int &nmigobj = statsData->n_migrateobjs;
662   for (i=0; i<m->n_objs; i++) {
663       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
664       statsData->objData[nobj] = m->objData[i];
665       if (m->objData[i].migratable) nmigobj++;
666       nobj++;
667       CmiAssert(nobj <= statsData->objData.capacity());
668   }
669   int &n_comm = statsData->n_comm;
670   for (i=0; i<m->n_comm; i++) {
671       statsData->commData[n_comm] = m->commData[i];
672       n_comm++;
673       CmiAssert(n_comm <= statsData->commData.capacity());
674   }
675   delete m;
676 }
677
678 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
679 {
680 #if CMK_LBDB_ON
681   if (statsMsgsList == NULL) {
682     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
683     CmiAssert(statsMsgsList != NULL);
684     for(int i=0; i < CkNumPes(); i++)
685       statsMsgsList[i] = 0;
686   }
687   if (statsData == NULL) statsData = new LDStats;
688
689     //  loop through all CLBStatsMsg in the incoming msg
690   int count = msg.getCount();
691   for (int num = 0; num < count; num++) 
692   {
693     CLBStatsMsg *m = msg.getMessage(num);
694     CmiAssert(m!=NULL);
695     const int pe = m->from_pe;
696     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
697 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
698 /*      
699  *  if(m->step < step()){
700  *    //TODO: if a processor is redoing an old load balance step..
701  *    //tell it that the step is done and that it should not perform any migrations
702  *      thisProxy[pe].ReceiveDummyMigration();
703  *  }*/
704 #endif
705         
706     if(!CmiNodeAlive(pe)){
707         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
708         continue;
709     }
710         
711     if (m->avail_vector!=NULL) {
712       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
713     }
714
715     if (statsMsgsList[pe] != 0) {
716       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
717              pe);
718     } else {
719       statsMsgsList[pe] = m;
720 #if USE_REDUCTION
721       depositData(m);
722 #else
723       // store per processor data right away
724       struct ProcStats &procStat = statsData->procs[pe];
725       procStat.pe = pe;
726       procStat.total_walltime = m->total_walltime;
727       procStat.idletime = m->idletime;
728       procStat.bg_walltime = m->bg_walltime;
729 #if CMK_LB_CPUTIMER
730       procStat.total_cputime = m->total_cputime;
731       procStat.bg_cputime = m->bg_cputime;
732 #endif
733       procStat.pe_speed = m->pe_speed;
734       //procStat.utilization = 1.0;
735       procStat.available = CmiTrue;
736       procStat.n_objs = m->n_objs;
737
738       statsData->n_objs += m->n_objs;
739       statsData->n_comm += m->n_comm;
740 #endif
741       stats_msg_count++;
742     }
743   }    // end of for
744
745   const int clients = CkNumValidPes();
746   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
747  
748   if (stats_msg_count == clients) {
749         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
750     statsData->nprocs() = stats_msg_count;
751     thisProxy[CkMyPe()].LoadBalance();
752   }
753 #endif
754 }
755
756 /** added by Abhinav for receiving msgs via spanning tree */
757 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
758 {
759 #if CMK_LBDB_ON
760         CmiAssert(CkMyPe() != 0);
761         bufMsg.add(msg);         // buffer messages
762         count_msgs++;
763         //CkPrintf("here %d\n", CkMyPe());
764         if (count_msgs == st.numChildren+1) {
765                 if(st.parent == 0)
766                 {
767                         thisProxy[0].ReceiveStats(bufMsg);
768                         //CkPrintf("from %d\n", CkMyPe());
769                 }
770                 else
771                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
772                 count_msgs = 0;
773                 bufMsg.free();
774         } 
775 #endif
776 }
777
778 void CentralLB::LoadBalance()
779 {
780 #if CMK_LBDB_ON
781   int proc;
782   const int clients = CkNumPes();
783
784 #if ! USE_REDUCTION
785   // build data
786   buildStats();
787 #else
788   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
789 #endif
790
791   if (_lb_args.debug()) 
792       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
793                   lbname, cur_ld_balancer, step(), start_lb_time,
794                   CmiMemoryUsage()/(1024.0*1024.0));
795
796   // if we are in simulation mode read data
797   if (LBSimulation::doSimulation) simulationRead();
798
799   char *availVector = LBDatabaseObj()->availVector();
800   for(proc = 0; proc < clients; proc++)
801       statsData->procs[proc].available = (CmiBool)availVector[proc];
802
803   preprocess(statsData);
804
805 //    CkPrintf("Before Calling Strategy\n");
806
807   if (_lb_args.printSummary()) {
808       LBInfo info(clients);
809         // not take comm data
810       info.getInfo(statsData, clients, 0);
811       LBRealType mLoad, mCpuLoad, totalLoad;
812       info.getSummary(mLoad, mCpuLoad, totalLoad);
813       int nmsgs, nbytes;
814       statsData->computeNonlocalComm(nmsgs, nbytes);
815       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
816 //      if (_lb_args.debug() > 1) {
817 //        for (int i=0; i<statsData->n_objs; i++)
818 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
819 //      }
820   }
821
822 #if CMK_REPLAYSYSTEM
823   LDHandle *loadBalancer_pointers;
824   if (_replaySystem) {
825     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
826     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
827   }
828 #endif
829   
830   LBMigrateMsg* migrateMsg = Strategy(statsData);
831 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
832         migrateMsg->step = step();
833 #endif
834
835 #if CMK_REPLAYSYSTEM
836   CpdHandleLBMessage(&migrateMsg);
837   if (_replaySystem) {
838     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
839     free(loadBalancer_pointers);
840   }
841 #endif
842   
843   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
844   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
845
846   // if this is the step at which we need to dump the database
847   simulationWrite();
848
849 //  calculate predicted load
850 //  very time consuming though, so only happen when debugging is on
851   if (_lb_args.printSummary()) {
852       LBInfo info(clients);
853         // not take comm data
854       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
855       LBRealType mLoad, mCpuLoad, totalLoad;
856       info.getSummary(mLoad, mCpuLoad, totalLoad);
857       int nmsgs, nbytes;
858       statsData->computeNonlocalComm(nmsgs, nbytes);
859       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
860       for (int i=0; i<clients; i++)
861         migrateMsg->expectedLoad[i] = info.peLoads[i];
862   }
863
864   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
865 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
866     lbDecisionCount++;
867     migrateMsg->lbDecisionCount = lbDecisionCount;
868 #endif
869
870   envelope *env = UsrToEnv(migrateMsg);
871   if (1) {
872       // broadcast
873     thisProxy.ReceiveMigration(migrateMsg);
874   }
875   else {
876     // split the migration for each processor
877     for (int p=0; p<CkNumPes(); p++) {
878       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
879       thisProxy[p].ReceiveMigration(m);
880     }
881     delete migrateMsg;
882   }
883
884   // Zero out data structures for next cycle
885   // CkPrintf("zeroing out data\n");
886   statsData->clear();
887   stats_msg_count=0;
888 #endif
889 }
890
891 // test if sender and receiver in a commData is nonmigratable.
892 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
893 {
894 #if CMK_LBDB_ON
895   for (int pe=0 ; pe<count; pe++)
896   {
897     for (int i=0; i<len[pe]; i++)
898       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
899           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
900       return 0;
901   }
902 #endif
903   return 1;
904 }
905
906 // rebuild LDStats and remove all non-migratble objects and related things
907 void CentralLB::removeNonMigratable(LDStats* stats, int count)
908 {
909   int i;
910
911   // check if we have non-migratable objects
912   int have = 0;
913   for (i=0; i<stats->n_objs; i++) 
914   {
915     LDObjData &odata = stats->objData[i];
916     if (!odata.migratable) {
917       have = 1; break;
918     }
919   }
920   if (have == 0) return;
921
922   CkVec<LDObjData> nonmig;
923   CkVec<int> new_from_proc, new_to_proc;
924   nonmig.resize(stats->n_migrateobjs);
925   new_from_proc.resize(stats->n_migrateobjs);
926   new_to_proc.resize(stats->n_migrateobjs);
927   int n_objs = 0;
928   for (i=0; i<stats->n_objs; i++) 
929   {
930     LDObjData &odata = stats->objData[i];
931     if (odata.migratable) {
932       nonmig[n_objs] = odata;
933       new_from_proc[n_objs] = stats->from_proc[i];
934       new_to_proc[n_objs] = stats->to_proc[i];
935       n_objs ++;
936     }
937     else {
938       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
939 #if CMK_LB_CPUTIMER
940       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
941 #endif
942     }
943   }
944   CmiAssert(stats->n_migrateobjs == n_objs);
945
946   stats->makeCommHash();
947   
948   CkVec<LDCommData> newCommData;
949   newCommData.resize(stats->n_comm);
950   int n_comm = 0;
951   for (i=0; i<stats->n_comm; i++) 
952   {
953     LDCommData& cdata = stats->commData[i];
954     if (!cdata.from_proc()) 
955     {
956       int idx = stats->getSendHash(cdata);
957       CmiAssert(idx != -1);
958       if (!stats->objData[idx].migratable) continue;
959     }
960     switch (cdata.receiver.get_type()) {
961     case LD_PROC_MSG:
962       break;
963     case LD_OBJ_MSG:  {
964       int idx = stats->getRecvHash(cdata);
965       if (stats->complete_flag)
966         CmiAssert(idx != -1);
967       else if (idx == -1) continue;          // receiver not in this group
968       if (!stats->objData[idx].migratable) continue;
969       break;
970       }
971     case LD_OBJLIST_MSG:    // object message FIXME add multicast
972       break;
973     }
974     newCommData[n_comm] = cdata;
975     n_comm ++;
976   }
977
978   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
979
980   // swap to new data
981   stats->objData = nonmig;
982   stats->from_proc = new_from_proc;
983   stats->to_proc = new_to_proc;
984   stats->n_objs = n_objs;
985
986   stats->commData = newCommData;
987   stats->n_comm = n_comm;
988
989   stats->deleteCommHash();
990   stats->makeCommHash();
991
992 }
993
994
995 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
996 extern int restarted;
997 #endif
998
999 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1000 {
1001   storedMigrateMsg = m;
1002   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1003                   thisProxy);
1004   contribute(0, NULL, CkReduction::max_int, cb);
1005 }
1006
1007 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1008 {
1009 #if CMK_LBDB_ON
1010         int i;
1011         LBMigrateMsg *m = storedMigrateMsg;
1012         CmiAssert(m!=NULL);
1013         delete msg;
1014
1015 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1016         int *dummyCounts;
1017
1018         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1019         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1020         if(step() > m->step){
1021                 char str[100];
1022                 envelope *env = UsrToEnv(m);
1023                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1024                 return;
1025         }
1026         lbDecisionCount = m->lbDecisionCount;
1027 #endif
1028
1029   if (_lb_args.debug() > 1) 
1030     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1031
1032   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1033   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1034 /*FAULT_EVAC*/
1035   if(!CmiNodeAlive(CkMyPe())){
1036         delete m;
1037         return;
1038   }
1039   migrates_expected = 0;
1040   future_migrates_expected = 0;
1041 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1042         int sending=0;
1043     int dummy=0;
1044         LBDB *_myLBDB = theLbdb->getLBDB();
1045         if(_restartFlag){
1046         dummyCounts = new int[CmiNumPes()];
1047         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1048     }
1049 #endif
1050   for(i=0; i < m->n_moves; i++) {
1051     MigrateInfo& move = m->moves[i];
1052     const int me = CkMyPe();
1053     if (move.from_pe == me && move.to_pe != me) {
1054       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1055       // migrate object, in case it is already gone, inform toPe
1056 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1057       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1058          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1059 #else
1060             if(_restartFlag == 0){
1061                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1062                 theLbdb->Migrate(move.obj,move.to_pe);
1063                 sending++;
1064             }else{
1065                 if(_myLBDB->validObjHandle(move.obj)){
1066                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1067                     theLbdb->Migrate(move.obj,move.to_pe);
1068                     sending++;
1069                 }else{
1070                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1071                     dummyCounts[move.to_pe]++;
1072                     dummy++;
1073                 }
1074             }
1075 #endif
1076     } else if (move.from_pe != me && move.to_pe == me) {
1077        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1078       if (!move.async_arrival) migrates_expected++;
1079       else future_migrates_expected++;
1080     }
1081   }
1082   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1083   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1084
1085 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1086         if(_restartFlag){
1087                 sendDummyMigrationCounts(dummyCounts);
1088                 _restartFlag  =0;
1089         delete []dummyCounts;
1090         }
1091 #endif
1092
1093
1094 #if 0
1095   if (m->n_moves ==0) {
1096     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1097   }
1098 #endif
1099   cur_ld_balancer = m->next_lb;
1100   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1101       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1102   }
1103
1104   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1105     MigrationDone(1);
1106   delete m;
1107
1108 //      CkEvacuatedElement();
1109 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1110 //  migrates_expected = 0;
1111 //  //  ResumeClients(1);
1112 #endif
1113 #endif
1114 }
1115
1116 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1117 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1118     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1119     //TODO: this is gonna be important when a crash happens during checkpoint
1120     //the globalDecisionCount would have to be saved and compared against
1121     //a future recvMigration
1122                 
1123         thisProxy[CkMyPe()].ResumeClients(1);
1124 }
1125 #endif
1126
1127 void CentralLB::MigrationDone(int balancing)
1128 {
1129 #if CMK_LBDB_ON
1130   migrates_completed = 0;
1131   migrates_expected = -1;
1132   // clear load stats
1133   if (balancing) theLbdb->ClearLoads();
1134   // Increment to next step
1135   theLbdb->incStep();
1136         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1137   // if sync resume, invoke a barrier
1138
1139 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1140     savedBalancing = balancing;
1141     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1142 #endif
1143
1144   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1145
1146   LoadbalanceDone(balancing);        // callback
1147 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1148   // if sync resume invoke a barrier
1149   if (balancing && _lb_args.syncResume()) {
1150     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1151                   thisProxy);
1152     contribute(0, NULL, CkReduction::sum_int, cb);
1153   }
1154   else{ 
1155     if(CmiNodeAlive(CkMyPe())){
1156         thisProxy [CkMyPe()].ResumeClients(balancing);
1157     }   
1158   }     
1159 #if CMK_GRID_QUEUE_AVAILABLE
1160   CmiGridQueueDeregisterAll ();
1161   CpvAccess(CkGridObject) = NULL;
1162 #endif
1163 #endif 
1164 #endif
1165 }
1166
1167 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1168 void CentralLB::endMigrationDone(int balancing){
1169     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1170
1171
1172   if (balancing && _lb_args.syncResume()) {
1173     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1174                   thisProxy);
1175     contribute(0, NULL, CkReduction::sum_int, cb);
1176   }
1177   else{
1178     if(CmiNodeAlive(CkMyPe())){
1179     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1180     thisProxy [CkMyPe()].ResumeClients(balancing);
1181     }
1182   }
1183
1184 }
1185 #endif
1186
1187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1188 void resumeCentralLbAfterChkpt(void *_lb){
1189     CentralLB *lb= (CentralLB *)_lb;
1190     CpvAccess(_currentObj)=lb;
1191     lb->endMigrationDone(lb->savedBalancing);
1192 }
1193 #endif
1194
1195
1196 void CentralLB::ResumeClients(CkReductionMsg *msg)
1197 {
1198   ResumeClients(1);
1199   delete msg;
1200 }
1201
1202 void CentralLB::ResumeClients(int ideal_period, int balancing) {
1203   lb_ideal_period = ideal_period;
1204   ResumeClients(balancing);
1205 }
1206
1207 void CentralLB::ResumeClients(int balancing)
1208 {
1209 #if CMK_LBDB_ON
1210 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1211     resumeCount++;
1212     globalResumeCount = resumeCount;
1213 #endif
1214   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1215   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1216     double end_lb_time = CkWallTimer();
1217   }
1218
1219 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1220   if (balancing) ComlibNotifyMigrationDone();  
1221 #endif
1222
1223   theLbdb->ResumeClients();
1224   if (balancing)  {
1225     CheckMigrationComplete();
1226     if (future_migrates_expected == 0 || 
1227             future_migrates_expected == future_migrates_completed) {
1228       CheckMigrationComplete();
1229     }
1230   }
1231 #endif
1232 }
1233
1234 /*
1235   migration of objects contains two different kinds:
1236   (1) objects want to make a barrier for migration completion
1237       (waitForBarrier is true)
1238       migrationDone() to finish and resumeClients
1239   (2) objects don't need a barrier
1240   However, next load balancing can only happen when both migrations complete
1241 */ 
1242 void CentralLB::CheckMigrationComplete()
1243 {
1244 #if CMK_LBDB_ON
1245   lbdone ++;
1246   if (lbdone == 2) {
1247     if (_lb_args.debug() && CkMyPe()==0) {
1248       double end_lb_time = CkWallTimer();
1249       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1250                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1251                 end_lb_time-start_lb_time);
1252     }
1253
1254     lb_migration_cost = (CkWallTimer() - start_lb_time);
1255
1256     lbdone = 0;
1257     future_migrates_expected = -1;
1258     future_migrates_completed = 0;
1259     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1260     // release local barrier  so that the next load balancer can go
1261     LDOMHandle h;
1262     h.id.id.idx = 0;
1263     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1264     // switch to the next load balancer in the list
1265     // subtle: called from Migrated() may result in Migrated() called in next LB
1266     theLbdb->nextLoadbalancer(seqno);
1267   }
1268 #endif
1269 }
1270
1271 void CentralLB::preprocess(LDStats* stats)
1272 {
1273   if (_lb_args.ignoreBgLoad())
1274     stats->clearBgLoad();
1275
1276   // Call the predictor for the future
1277   if (_lb_predict) FuturePredictor(statsData);
1278 }
1279
1280 // default load balancing strategy
1281 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1282 {
1283 #if CMK_LBDB_ON
1284   double strat_start_time = CkWallTimer();
1285   if (_lb_args.debug())
1286     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1287
1288   work(stats);
1289
1290   if (_lb_args.debug()>1)  {
1291     CkPrintf("CharmLB> Obj Map:\n");
1292     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1293     CkPrintf("\n");
1294   }
1295
1296   LBMigrateMsg *msg = createMigrateMsg(stats);
1297
1298   if (_lb_args.debug()) {
1299     double strat_end_time = CkWallTimer();
1300     envelope *env = UsrToEnv(msg);
1301
1302     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1303     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1304               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1305     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1306     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1307               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1308     lb_strategy_cost = (strat_end_time - strat_start_time);
1309     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1310   }
1311
1312   return msg;
1313 #else
1314   return NULL;
1315 #endif
1316 }
1317
1318 void CentralLB::work(LDStats* stats)
1319 {
1320   // does nothing but print the database
1321   stats->print();
1322 }
1323
1324 // generate migrate message from stats->from_proc and to_proc
1325 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1326 {
1327   int i;
1328   CkVec<MigrateInfo*> migrateInfo;
1329   for (i=0; i<stats->n_objs; i++) {
1330     LDObjData &objData = stats->objData[i];
1331     int frompe = stats->from_proc[i];
1332     int tope = stats->to_proc[i];
1333     if (frompe != tope) {
1334       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1335       //         CkMyPe(),obj,pe,dest);
1336       MigrateInfo *migrateMe = new MigrateInfo;
1337       migrateMe->obj = objData.handle;
1338       migrateMe->from_pe = frompe;
1339       migrateMe->to_pe = tope;
1340       migrateMe->async_arrival = objData.asyncArrival;
1341       migrateInfo.insertAtEnd(migrateMe);
1342     }
1343   }
1344
1345   int migrate_count=migrateInfo.length();
1346   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1347   msg->n_moves = migrate_count;
1348   for(i=0; i < migrate_count; i++) {
1349     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1350     msg->moves[i] = *item;
1351     delete item;
1352     migrateInfo[i] = 0;
1353   }
1354   return msg;
1355 }
1356
1357 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1358 {
1359   int nmoves = 0;
1360   int nunavail = 0;
1361   int i;
1362   for (i=0; i<m->n_moves; i++) {
1363     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1364     if (item->from_pe == p || item->to_pe == p) nmoves++;
1365   }
1366   for (i=0; i<CkNumPes();i++) {
1367     if (!m->avail_vector[i]) nunavail++;
1368   }
1369   LBMigrateMsg* msg;
1370   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1371   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1372   msg->n_moves = nmoves;
1373   msg->level = m->level;
1374   msg->next_lb = m->next_lb;
1375   for (i=0,nmoves=0; i<m->n_moves; i++) {
1376     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1377     if (item->from_pe == p || item->to_pe == p) {
1378       msg->moves[nmoves] = *item;
1379       nmoves++;
1380     }
1381   }
1382   // copy processor data
1383   if (nunavail)
1384   for (i=0; i<CkNumPes();i++) {
1385     msg->avail_vector[i] = m->avail_vector[i];
1386     msg->expectedLoad[i] = m->expectedLoad[i];
1387   }
1388   return msg;
1389 }
1390
1391 void CentralLB::simulationWrite() {
1392   if(step() == LBSimulation::dumpStep)
1393   {
1394     // here we are supposed to dump the database
1395     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1396     char *dumpFileName = (char *)malloc(dumpFileSize);
1397     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1398       free(dumpFileName);
1399       dumpFileSize+=3;
1400       dumpFileName = (char *)malloc(dumpFileSize);
1401     }
1402     writeStatsMsgs(dumpFileName);
1403     free(dumpFileName);
1404     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1405     ++LBSimulation::dumpStep;
1406     --LBSimulation::dumpStepSize;
1407     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1408       CmiPrintf("Charm++> Exiting...\n");
1409       CkExit();
1410     }
1411     return;
1412   }
1413 }
1414
1415 void CentralLB::simulationRead() {
1416   LBSimulation *simResults = NULL, *realResults;
1417   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1418   voidMessage->n_moves=0;
1419   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1420     // here we are supposed to read the data from the dump database
1421     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1422     char *simFileName = (char *)malloc(simFileSize);
1423     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1424       free(simFileName);
1425       simFileSize+=3;
1426       simFileName = (char *)malloc(simFileSize);
1427     }
1428     readStatsMsgs(simFileName);
1429
1430     // allocate simResults (only the first step)
1431     if (simResults == NULL) {
1432       simResults = new LBSimulation(LBSimulation::simProcs);
1433       realResults = new LBSimulation(LBSimulation::simProcs);
1434     }
1435     else {
1436       // should be the same number of procs of the original simulation!
1437       if (!LBSimulation::procsChanged) {
1438         // it means we have a previous step, so in simResults there is data.
1439         // we can now print the real effects of the load balancer during the simulation
1440         // or print the difference between the predicted data and the real one.
1441         realResults->reset();
1442         // reset to_proc of statsData to be equal to from_proc
1443         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1444         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1445         simResults->PrintDifferences(realResults,statsData);
1446       }
1447       simResults->reset();
1448     }
1449
1450     // now pass it to the strategy routine
1451     double startT = CkWallTimer();
1452     preprocess(statsData);
1453     CmiPrintf("%s> Strategy starts ... \n", lbname);
1454     LBMigrateMsg* migrateMsg = Strategy(statsData);
1455     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1456                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1457
1458     // now calculate the results of the load balancing simulation
1459     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1460
1461     // now we have the simulation data, so print it and loop
1462     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1463     // **CWL** Officially recording my disdain here for using ints for bool
1464     if (LBSimulation::showDecisionsOnly) {
1465       simResults->PrintDecisions(migrateMsg, simFileName, 
1466                                  LBSimulation::simProcs);
1467     } else {
1468       simResults->PrintSimulationResults();
1469     }
1470
1471     free(simFileName);
1472     delete migrateMsg;
1473     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1474   }
1475   // deallocate simResults
1476   delete simResults;
1477   CmiPrintf("Charm++> Exiting...\n");
1478   CkExit();
1479 }
1480
1481 void CentralLB::readStatsMsgs(const char* filename) 
1482 {
1483 #if CMK_LBDB_ON
1484   int i;
1485   FILE *f = fopen(filename, "r");
1486   if (f==NULL) {
1487     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1488     CmiAbort("");
1489   }
1490
1491   // at this stage, we need to rebuild the statsMsgList and
1492   // statsDataList structures. For that first deallocate the
1493   // old structures
1494   if (statsMsgsList) {
1495     for(i = 0; i < stats_msg_count; i++)
1496       delete statsMsgsList[i];
1497     delete[] statsMsgsList;
1498     statsMsgsList=0;
1499   }
1500
1501   PUP::fromDisk pd(f);
1502   PUP::machineInfo machInfo;
1503
1504   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1505   PUP::xlater p(machInfo, pd);
1506
1507   if (_lb_args.lbversion() > 1) {
1508     p|_lb_args.lbversion();             // write version number
1509     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1510     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1511   }
1512   p|stats_msg_count;
1513
1514   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1515   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1516   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1517
1518   // LBSimulation::simProcs must be set
1519   statsData->pup(p);
1520
1521   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1522   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1523
1524   // file f is closed in the destructor of PUP::fromDisk
1525   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1526 #endif
1527 }
1528
1529 void CentralLB::writeStatsMsgs(const char* filename) 
1530 {
1531 #if CMK_LBDB_ON
1532   FILE *f = fopen(filename, "w");
1533   if (f==NULL) {
1534     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1535     CmiAbort("");
1536   }
1537
1538   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1539   PUP::toDisk p(f);
1540   p((char *)&machInfo, sizeof(machInfo));       // machine info
1541
1542   p|_lb_args.lbversion();               // write version number
1543   p|stats_msg_count;
1544   statsData->pup(p);
1545
1546   fclose(f);
1547
1548   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1549 #endif
1550 }
1551
1552 // calculate the predicted wallclock/cpu load for every processors
1553 // considering communication overhead if considerComm is true
1554 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1555                       LBMigrateMsg *msg, LBInfo &info, 
1556                       int considerComm)
1557 {
1558 #if CMK_LBDB_ON
1559         stats->makeCommHash();
1560
1561         // update to_proc according to migration msgs
1562         for(int i = 0; i < msg->n_moves; i++) {
1563           MigrateInfo &mInfo = msg->moves[i];
1564           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1565           CmiAssert(idx != -1);
1566           stats->to_proc[idx] = mInfo.to_pe;
1567         }
1568
1569         info.getInfo(stats, count, considerComm);
1570 #endif
1571 }
1572
1573
1574 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1575 {
1576     CkAssert(simResults != NULL && count == simResults->numPes);
1577     // estimate the new loads of the processors. As a first approximation, this is the
1578     // sum of the cpu times of the objects on that processor
1579     double startT = CkWallTimer();
1580     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1581     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1582 }
1583
1584 void CentralLB::pup(PUP::er &p) { 
1585   BaseLB::pup(p); 
1586   if (p.isUnpacking())  {
1587     initLB(CkLBOptions(seqno)); 
1588   }
1589   p|reduction_started;
1590   int has_statsMsg=0;
1591   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1592   p|has_statsMsg;
1593   if (has_statsMsg) {
1594     if (p.isUnpacking())
1595       statsMsg = new CLBStatsMsg;
1596     statsMsg->pup(p);
1597   }
1598   p|lb_ideal_period;
1599 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1600   p | lbDecisionCount;
1601   p | resumeCount;
1602 #endif
1603         
1604 }
1605
1606 int CentralLB::useMem() { 
1607   return sizeof(CentralLB) + statsData->useMem() + 
1608          CkNumPes() * sizeof(CLBStatsMsg *);
1609 }
1610
1611
1612 /**
1613   CLBStatsMsg is not a real message now.
1614   CLBStatsMsg is used for all processors to fill in their local load and comm
1615   statistics and send to processor 0
1616 */
1617
1618 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1619   n_objs = osz;
1620   n_comm = csz;
1621   objData = new LDObjData[osz];
1622   commData = new LDCommData[csz];
1623   avail_vector = NULL;
1624 }
1625
1626 CLBStatsMsg::~CLBStatsMsg() {
1627   delete [] objData;
1628   delete [] commData;
1629   delete [] avail_vector;
1630 }
1631
1632 void CLBStatsMsg::pup(PUP::er &p) {
1633   int i;
1634   p|from_pe;
1635   p|pe_speed;
1636   p|total_walltime;
1637   p|idletime;
1638   p|bg_walltime;
1639 #if CMK_LB_CPUTIMER
1640   p|total_cputime;
1641   p|bg_cputime;
1642 #endif
1643 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1644   p | step;
1645 #endif
1646   p|n_objs;
1647   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1648   for (i=0; i<n_objs; i++) p|objData[i];
1649   p|n_comm;
1650   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1651   for (i=0; i<n_comm; i++) p|commData[i];
1652
1653   int has_avail_vector;
1654   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1655   p|has_avail_vector;
1656   if (p.isUnpacking()) {
1657     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1658     else avail_vector = NULL;
1659   }
1660   if (has_avail_vector) p(avail_vector, CkNumPes());
1661
1662   p(next_lb);
1663 }
1664
1665 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1666 // the entry function, it is just used to use to pup.
1667 // I don't use CLBStatsMsg directly as marshalled parameter because
1668 // I want the data pointer stored and not to be freed by the Charm++.
1669 void CkMarshalledCLBStatsMessage::free() { 
1670   int count = msgs.size();
1671   for  (int i=0; i<count; i++) {
1672     delete msgs[i];
1673     msgs[i] = NULL;
1674   }
1675   msgs.free();
1676 }
1677
1678 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1679 {
1680   int count = m.getCount();
1681   for (int i=0; i<count; i++) add(m.getMessage(i));
1682 }
1683
1684 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1685 {
1686   int count = msgs.size();
1687   p|count;
1688   for (int i=0; i<count; i++) {
1689     CLBStatsMsg *msg;
1690     if (p.isUnpacking()) msg = new CLBStatsMsg;
1691     else { 
1692       msg = msgs[i]; CmiAssert(msg!=NULL);
1693     }
1694     msg->pup(p);
1695     if (p.isUnpacking()) add(msg);
1696   }
1697 }
1698
1699 SpanningTree::SpanningTree()
1700 {
1701         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1702         arity = (int)ceil(sq/2);
1703         calcParent(CkMyPe());
1704         calcNumChildren(CkMyPe());
1705 }
1706
1707 void SpanningTree::calcParent(int n)
1708 {
1709         parent=-1;
1710         if(n != 0  && arity > 0)
1711                 parent = (n-1)/arity;
1712 }
1713
1714 void SpanningTree::calcNumChildren(int n)
1715 {
1716         numChildren = 0;
1717         if (arity == 0) return;
1718         int fullNode=(CkNumPes()-1-arity)/arity;
1719         if(n <= fullNode)
1720                 numChildren = arity;
1721         if(n == fullNode+1)
1722                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1723         if(n > fullNode+1)
1724                 numChildren = 0;
1725 }
1726
1727 #include "CentralLB.def.h"
1728  
1729 /*@}*/