testing
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #include <vector>
14
15 #define  DEBUGF(x)       // CmiPrintf x;
16 #define  DEBUG(x)        // x;
17
18 #if CMK_MEM_CHECKPOINT
19    /* can not handle reduction in inmem FT */
20 #define USE_REDUCTION         0
21 #define USE_LDB_SPANNING_TREE 0
22 #elif defined(_FAULT_MLOG_)
23 /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
32 extern int _restartFlag;
33 extern void getGlobalStep(CkGroupID );
34 extern void initMlogLBStep(CkGroupID );
35 extern int globalResumeCount;
36 extern void sendDummyMigrationCounts(int *);
37 #endif
38
39 #if CMK_GRID_QUEUE_AVAILABLE
40 CpvExtern(void *, CkGridObject);
41 #endif
42
43 CkGroupID loadbalancer;
44 int * lb_ptr;
45 int load_balancer_created;
46
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49 int lb_no_iterations = -1;
50 double cur_max_pe_load = 0.0;
51 double cur_avg_pe_load = 0.0;
52 double prev_load = 0.0;
53
54 struct AdaptiveData {
55   int iteration;
56   double max_load;
57   double avg_load;
58 };
59
60 struct AdaptiveLBDatabase {
61   std::vector<AdaptiveData> history_data;
62 } adaptive_lbdb;
63
64 enum state {
65   OFF,
66   ON,
67   PAUSE,
68   DECIDED,
69   LOAD_BALANCE
70 } local_state;
71
72 CreateLBFunc_Def(CentralLB, "CentralLB base class")
73
74 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
75                              LBMigrateMsg *, LBInfo &info, int considerComm);
76
77 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
78   double lb_data[4];
79   lb_data[0] = 0;
80   lb_data[1] = 0;
81   lb_data[2] = 0;
82   for (int i = 0; i < nMsg; i++) {
83     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
84     double* m = (double *)msgs[i]->getData();
85     lb_data[0] += m[0];
86     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
87     lb_data[2] += m[2];
88     if (i == 0) {
89       lb_data[3] = m[3];
90     }
91   }
92   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
93 }
94
95 /*global*/ CkReduction::reducerType lbDataCollectionType;
96 /*initcall*/ void registerLBDataCollection(void) {
97   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
98 }
99
100 /*
101 void CreateCentralLB()
102 {
103   CProxy_CentralLB::ckNew(0);
104 }
105 */
106
107 void CentralLB::staticStartLB(void* data)
108 {
109   CentralLB *me = (CentralLB*)(data);
110   me->StartLB();
111 }
112
113 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
114 {
115   CentralLB *me = (CentralLB*)(data);
116   me->Migrated(h, waitBarrier);
117 }
118
119 void CentralLB::staticAtSync(void* data)
120 {
121   CentralLB *me = (CentralLB*)(data);
122   me->AtSync();
123 }
124
125 void CentralLB::initLB(const CkLBOptions &opt)
126 {
127 #if CMK_LBDB_ON
128   lbname = "CentralLB";
129   thisProxy = CProxy_CentralLB(thisgroup);
130   //  CkPrintf("Construct in %d\n",CkMyPe());
131
132   // create and turn on by default
133   receiver = theLbdb->
134     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
135   notifier = theLbdb->getLBDB()->
136     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
137   startLbFnHdl = theLbdb->getLBDB()->
138     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
139
140   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
141   if (opt.getSeqNo() > 0) turnOff();
142
143   stats_msg_count = 0;
144   statsMsgsList = NULL;
145   statsData = NULL;
146
147   storedMigrateMsg = NULL;
148   reduction_started = 0;
149
150   // for future predictor
151   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
152   else predicted_model=0;
153   // register user interface callbacks
154   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
155
156   myspeed = theLbdb->ProcessorSpeed();
157
158   migrates_completed = 0;
159   future_migrates_completed = 0;
160   migrates_expected = -1;
161   future_migrates_expected = -1;
162   cur_ld_balancer = _lb_args.central_pe();      // 0 default
163   lbdone = 0;
164   count_msgs=0;
165   statsMsg = NULL;
166
167   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
168
169   load_balancer_created = 1;
170 #endif
171 }
172
173 CentralLB::~CentralLB()
174 {
175 #if CMK_LBDB_ON
176   delete [] statsMsgsList;
177   delete statsData;
178   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
179   if (theLbdb) {
180     theLbdb->getLBDB()->
181       RemoveNotifyMigrated(notifier);
182     theLbdb->
183       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
184   }
185 #endif
186 }
187
188 void CentralLB::turnOn() 
189 {
190 #if CMK_LBDB_ON
191   theLbdb->getLBDB()->
192     TurnOnBarrierReceiver(receiver);
193   theLbdb->getLBDB()->
194     TurnOnNotifyMigrated(notifier);
195   theLbdb->getLBDB()->
196     TurnOnStartLBFn(startLbFnHdl);
197 #endif
198 }
199
200 void CentralLB::turnOff() 
201 {
202 #if CMK_LBDB_ON
203   theLbdb->getLBDB()->
204     TurnOffBarrierReceiver(receiver);
205   theLbdb->getLBDB()->
206     TurnOffNotifyMigrated(notifier);
207   theLbdb->getLBDB()->
208     TurnOffStartLBFn(startLbFnHdl);
209 #endif
210 }
211
212 void CentralLB::AtSync()
213 {
214 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
215 #if CMK_LBDB_ON
216 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
217
218 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
219         CpvAccess(_currentObj)=this;
220 #endif
221
222   // if num of processor is only 1, nothing should happen
223   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
224     MigrationDone(0);
225     return;
226   }
227   if(CmiNodeAlive(CkMyPe())){
228     thisProxy [CkMyPe()].ProcessAtSyncMin();
229   }
230 #endif
231 }
232
233 #include "ComlibStrategy.h"
234
235 void CentralLB::ProcessAtSync()
236 {
237
238   // Reset all adaptive lb related fields since load balancing is being done.
239   lb_no_iterations = -1;
240   adaptive_lbdb.history_data.clear();
241   prev_load = 0.0;
242   lb_ideal_period = INT_MAX;
243   local_state = OFF;
244
245
246 #if CMK_LBDB_ON
247   if (reduction_started) return;              // reducton in progress
248
249   CmiAssert(CmiNodeAlive(CkMyPe()));
250   if (CkMyPe() == cur_ld_balancer) {
251     start_lb_time = CkWallTimer();
252   }
253
254
255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
256         initMlogLBStep(thisgroup);
257 #endif
258
259   // build message
260   BuildStatsMsg();
261
262 #if USE_REDUCTION
263     // reduction to get total number of objects and comm
264     // so that processor 0 can pre-allocate load balancing database
265   int counts[2];
266   counts[0] = theLbdb->GetObjDataSz();
267   counts[1] = theLbdb->GetCommDataSz();
268
269   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
270                   thisProxy[0]);
271   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
272   reduction_started = 1;
273 #else
274   SendStats();
275 #endif
276 #endif
277 }
278
279 void CentralLB::ProcessAtSyncMin()
280 {
281 #if CMK_LBDB_ON
282   if (reduction_started) return;              // reducton in progress
283
284   CmiAssert(CmiNodeAlive(CkMyPe()));
285   if (CkMyPe() == cur_ld_balancer) {
286     start_lb_time = CkWallTimer();
287   }
288
289
290 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
291         initMlogLBStep(thisgroup);
292 #endif
293   
294   lb_no_iterations++;
295   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
296       lb_no_iterations, lb_ideal_period);
297
298   // If decision has been made and has reached the lb_period, then do load
299   // balancing, else if hasn't reached ideal_period, then resume.
300   if (local_state == DECIDED) {
301     if (lb_no_iterations < lb_ideal_period) {
302       ResumeClients(1);
303     } else {
304       local_state = LOAD_BALANCE;
305       ProcessAtSync();
306     }
307     return;
308   }
309    
310   // If the state is ON and not DECIDED, then if havn't reached lb_period, then
311   // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
312   // dont resume client.
313   if (local_state == ON) {
314     if (lb_no_iterations < lb_ideal_period) {
315       ResumeClients(1);
316     } else {
317       local_state = PAUSE;
318     }
319     return;
320   }
321
322   SendMinStats();
323   ResumeClients(1);
324 #endif
325 }
326
327 void CentralLB::SendMinStats() {
328
329  double total_load;
330  double idle_time;
331  double bg_walltime;
332   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
333   CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
334
335   // Since the total_load is cumulative since the last load balancing stage,
336   // Hence it is subtracted from the previous load.
337   total_load -= idle_time;
338   double tmp = total_load;
339   total_load -= prev_load;
340   prev_load = tmp; 
341
342   double lb_data[4];
343   lb_data[0] = total_load;
344   lb_data[1] = total_load;
345   lb_data[2] = 1;
346   lb_data[3] = lb_no_iterations;
347
348   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
349                   thisProxy[0]);
350   contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
351 }
352
353 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
354   CmiAssert(CkMyPe() == 0);
355   double* load = (LBRealType *) msg->getData();
356   double max = load[1];
357   double avg = load[0]/load[2];
358   int iteration_n = load[3];
359   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
360
361   // Store the data for this iteration
362   AdaptiveData data;
363   data.iteration = lb_no_iterations;
364   data.max_load = max;
365   data.avg_load = avg;
366   adaptive_lbdb.history_data.push_back(data);
367
368   // If the max/avg ratio is greater than the threshold and also this is not the
369   // step immediately after load balancing, carry out load balancing
370   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
371     CkPrintf("Carry out load balancing step\n");
372     lb_ideal_period = iteration_n + 1;
373     thisProxy.LoadBalanceDecision(lb_ideal_period);
374     return;
375   }
376
377   // Generate the plan for the adaptive strategy
378   if (generatePlan()) {
379     thisProxy.LoadBalanceDecision(lb_ideal_period);
380   }
381 }
382
383 bool CentralLB::generatePlan() {
384   if (adaptive_lbdb.history_data.size() <= 4) {
385     return false;
386   }
387
388   // Some heuristics for lbperiod
389   // If constant load or almost constant,
390   // then max * new_lb_period > avg * new_lb_period + lb_cost
391   double max = 0.0;
392   double avg = 0.0;
393   AdaptiveData data;
394   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
395     data = adaptive_lbdb.history_data[i];
396     max += data.max_load;
397     avg += data.avg_load;
398     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
399   }
400 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
401 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
402 //
403 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
404 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
405 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
406 //
407   // If linearly varying load, then find lb_period
408   // area between the max and avg curve 
409   double mslope, aslope, mc, ac;
410   getLineEq(aslope, ac, mslope, mc);
411   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
412   double a = (mslope - aslope)/2;
413   double b = (mc - ac);
414   double c = -(lb_strategy_cost + lb_migration_cost);
415   //c = -2.5;
416   lb_ideal_period = getPeriodForLinear(a, b, c);
417   CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
418
419   return true;
420 }
421
422 int CentralLB::getPeriodForLinear(double a, double b, double c) {
423   if (a == 0.0) {
424     return (-c / b);
425   }
426   int x;
427   double t = (b * b) - (4*a*c);
428   t = (-b + sqrt(t)) / (2*a);
429   x = t;
430   return x;
431 }
432
433 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
434   int total = adaptive_lbdb.history_data.size();
435   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
436       adaptive_lbdb.history_data[0].iteration;
437   double a1 = 0;
438   double m1 = 0;
439   double a2 = 0;
440   double m2 = 0;
441   AdaptiveData data;
442   int i = 0;
443   for (i = 0; i <= total/2; i++) {
444     data = adaptive_lbdb.history_data[i];
445     m1 += data.max_load;
446     a1 += data.avg_load;
447   }
448   m1 /= i;
449   a1 /= i;
450
451   for (i = total/2; i < total; i++) {
452     data = adaptive_lbdb.history_data[i];
453     m2 += data.max_load;
454     a2 += data.avg_load;
455   }
456   m2 /= (i - total/2);
457   a2 /= (i - total/2);
458
459   aslope = 2 * (a2 - a1) / iterations;
460   mslope = 2 * (m2 - m1) / iterations;
461   ac = adaptive_lbdb.history_data[0].avg_load;
462   mc = adaptive_lbdb.history_data[0].max_load;
463   return true;
464 }
465
466 void CentralLB::LoadBalanceDecision(int period) {
467   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
468   lb_ideal_period = period;
469   if (local_state == OFF) {
470     local_state = ON;
471
472     // Send the current iteration no
473     CkCallback cb(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
474         thisProxy[0]);
475     int tmp = lb_no_iterations;
476     contribute(sizeof(int), &tmp, CkReduction::max_int, cb);
477     return;
478   }
479
480   if (local_state == ON) {
481     local_state = DECIDED;
482     return;
483   }
484
485   // If the state is PAUSE, then its waiting for the final decision from central
486   // processor. If the decision is that the ideal period is in the future,
487   // resume. If the ideal period is now, then carry out load balancing.
488   if (local_state == PAUSE) {
489     if (lb_no_iterations < lb_ideal_period) {
490       local_state = DECIDED;
491       ResumeClients(1);
492     } else {
493       ProcessAtSync();
494     }
495   }
496 }
497
498 void CentralLB::ReceiveIterationNo(CkReductionMsg *msg) {
499   CmiAssert(CkMyPe() == 0);
500   int *it_no = (int *) msg->getData();
501   CkPrintf("^^^^ Received all iteration no and final decision %d^^^^\n", it_no[0]);
502   //thisProxy.LoadBalanceDecision(it_no[0]);
503   lb_ideal_period = (lb_ideal_period > it_no[0]) ? lb_ideal_period : it_no[0] + 1;
504   thisProxy.LoadBalanceDecision(lb_ideal_period);
505 }
506
507 // called only on 0
508 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
509 {
510   CmiAssert(CkMyPe() == 0);
511   if (statsData == NULL) statsData = new LDStats;
512
513   int *counts = (int *)msg->getData();
514   int n_objs = counts[0];
515   int n_comm = counts[1];
516
517     // resize database
518   statsData->objData.resize(n_objs);
519   statsData->from_proc.resize(n_objs);
520   statsData->to_proc.resize(n_objs);
521   statsData->commData.resize(n_comm);
522
523   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
524         
525     // broadcast call to let everybody start to send stats
526   thisProxy.SendStats();
527 }
528
529 void CentralLB::BuildStatsMsg()
530 {
531 #if CMK_LBDB_ON
532   // build and send stats
533   const int osz = theLbdb->GetObjDataSz();
534   const int csz = theLbdb->GetCommDataSz();
535
536   int npes = CkNumPes();
537   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
538   _MEMCHECK(msg);
539   msg->from_pe = CkMyPe();
540 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
541         msg->step = step();
542 #endif
543   //msg->serial = CrnRand();
544
545 /*
546   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
547   theLbdb->IdleTime(&msg->idletime);
548   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
549 */
550 #if CMK_LB_CPUTIMER
551   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
552                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
553 #else
554   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
555                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
556 #endif
557
558   msg->pe_speed = myspeed;
559   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
560
561   msg->n_objs = osz;
562   theLbdb->GetObjData(msg->objData);
563   msg->n_comm = csz;
564   theLbdb->GetCommData(msg->commData);
565 //  theLbdb->ClearLoads();
566   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
567
568   if(CkMyPe() == cur_ld_balancer) {
569     msg->avail_vector = new char[CkNumPes()];
570     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
571     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
572   }
573
574   CmiAssert(statsMsg == NULL);
575   statsMsg = msg;
576 #endif
577 }
578
579
580 // called on every processor
581 void CentralLB::SendStats()
582 {
583 #if CMK_LBDB_ON
584   CmiAssert(statsMsg != NULL);
585   reduction_started = 0;
586
587 #if USE_LDB_SPANNING_TREE
588   if(CkNumPes()>1024)
589   {
590     if (CkMyPe() == cur_ld_balancer)
591       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
592     else
593       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
594   }
595   else
596 #endif
597   {
598     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
599     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
600   }
601
602   statsMsg = NULL;
603
604 #ifdef __BIGSIM__
605   BgEndStreaming();
606 #endif
607
608   {
609   // enfore the barrier to wait until centralLB says no
610   LDOMHandle h;
611   h.id.id.idx = 0;
612   theLbdb->getLBDB()->RegisteringObjects(h);
613   }
614 #endif
615 }
616
617 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
618 extern int donotCountMigration;
619 #endif
620
621 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
622 {
623 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
624     if(donotCountMigration){
625         return ;
626     }
627 #endif
628
629 #if CMK_LBDB_ON
630   if (waitBarrier) {
631             migrates_completed++;
632       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
633     if (migrates_completed == migrates_expected) {
634       MigrationDone(1);
635     }
636   }
637   else {
638     future_migrates_completed ++;
639     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
640     if (future_migrates_completed == future_migrates_expected)  {
641         CheckMigrationComplete();
642     }
643   }
644 #endif
645 }
646
647 void CentralLB::MissMigrate(int waitForBarrier)
648 {
649   LDObjHandle h;
650   Migrated(h, waitForBarrier);
651 }
652
653 // build a complete data from bufferred messages
654 // not used when USE_REDUCTION = 1
655 void CentralLB::buildStats()
656 {
657     statsData->nprocs() = stats_msg_count;
658     // allocate space
659     statsData->objData.resize(statsData->n_objs);
660     statsData->from_proc.resize(statsData->n_objs);
661     statsData->to_proc.resize(statsData->n_objs);
662     statsData->commData.resize(statsData->n_comm);
663
664     int nobj = 0;
665     int ncom = 0;
666     int nmigobj = 0;
667     // copy all data in individule message to this big structure
668     for (int pe=0; pe<CkNumPes(); pe++) {
669        int i;
670        CLBStatsMsg *msg = statsMsgsList[pe];
671        if(msg == NULL) continue;
672        for (i=0; i<msg->n_objs; i++) {
673          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
674          statsData->objData[nobj] = msg->objData[i];
675          if (msg->objData[i].migratable) nmigobj++;
676          nobj++;
677        }
678        for (i=0; i<msg->n_comm; i++) {
679          statsData->commData[ncom] = msg->commData[i];
680          ncom++;
681        }
682        // free the memory
683        delete msg;
684        statsMsgsList[pe]=0;
685     }
686     statsData->n_migrateobjs = nmigobj;
687 }
688
689 // deposit one processor data at a time, note database is pre-allocated
690 // to have enough space
691 // used when USE_REDUCTION = 1
692 void CentralLB::depositData(CLBStatsMsg *m)
693 {
694   int i;
695   if (m == NULL) return;
696
697   const int pe = m->from_pe;
698   struct ProcStats &procStat = statsData->procs[pe];
699   procStat.pe = pe;
700   procStat.total_walltime = m->total_walltime;
701   procStat.idletime = m->idletime;
702   procStat.bg_walltime = m->bg_walltime;
703 #if CMK_LB_CPUTIMER
704   procStat.total_cputime = m->total_cputime;
705   procStat.bg_cputime = m->bg_cputime;
706 #endif
707   procStat.pe_speed = m->pe_speed;
708   //procStat.utilization = 1.0;
709   procStat.available = CmiTrue;
710   procStat.n_objs = m->n_objs;
711
712   int &nobj = statsData->n_objs;
713   int &nmigobj = statsData->n_migrateobjs;
714   for (i=0; i<m->n_objs; i++) {
715       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
716       statsData->objData[nobj] = m->objData[i];
717       if (m->objData[i].migratable) nmigobj++;
718       nobj++;
719       CmiAssert(nobj <= statsData->objData.capacity());
720   }
721   int &n_comm = statsData->n_comm;
722   for (i=0; i<m->n_comm; i++) {
723       statsData->commData[n_comm] = m->commData[i];
724       n_comm++;
725       CmiAssert(n_comm <= statsData->commData.capacity());
726   }
727   delete m;
728 }
729
730 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
731 {
732 #if CMK_LBDB_ON
733   if (statsMsgsList == NULL) {
734     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
735     CmiAssert(statsMsgsList != NULL);
736     for(int i=0; i < CkNumPes(); i++)
737       statsMsgsList[i] = 0;
738   }
739   if (statsData == NULL) statsData = new LDStats;
740
741     //  loop through all CLBStatsMsg in the incoming msg
742   int count = msg.getCount();
743   for (int num = 0; num < count; num++) 
744   {
745     CLBStatsMsg *m = msg.getMessage(num);
746     CmiAssert(m!=NULL);
747     const int pe = m->from_pe;
748     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
749 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
750 /*      
751  *  if(m->step < step()){
752  *    //TODO: if a processor is redoing an old load balance step..
753  *    //tell it that the step is done and that it should not perform any migrations
754  *      thisProxy[pe].ReceiveDummyMigration();
755  *  }*/
756 #endif
757         
758     if(!CmiNodeAlive(pe)){
759         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
760         continue;
761     }
762         
763     if (m->avail_vector!=NULL) {
764       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
765     }
766
767     if (statsMsgsList[pe] != 0) {
768       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
769              pe);
770     } else {
771       statsMsgsList[pe] = m;
772 #if USE_REDUCTION
773       depositData(m);
774 #else
775       // store per processor data right away
776       struct ProcStats &procStat = statsData->procs[pe];
777       procStat.pe = pe;
778       procStat.total_walltime = m->total_walltime;
779       procStat.idletime = m->idletime;
780       procStat.bg_walltime = m->bg_walltime;
781 #if CMK_LB_CPUTIMER
782       procStat.total_cputime = m->total_cputime;
783       procStat.bg_cputime = m->bg_cputime;
784 #endif
785       procStat.pe_speed = m->pe_speed;
786       //procStat.utilization = 1.0;
787       procStat.available = CmiTrue;
788       procStat.n_objs = m->n_objs;
789
790       statsData->n_objs += m->n_objs;
791       statsData->n_comm += m->n_comm;
792 #endif
793       stats_msg_count++;
794     }
795   }    // end of for
796
797   const int clients = CkNumValidPes();
798   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
799  
800   if (stats_msg_count == clients) {
801         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
802     statsData->nprocs() = stats_msg_count;
803     thisProxy[CkMyPe()].LoadBalance();
804   }
805 #endif
806 }
807
808 /** added by Abhinav for receiving msgs via spanning tree */
809 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
810 {
811 #if CMK_LBDB_ON
812         CmiAssert(CkMyPe() != 0);
813         bufMsg.add(msg);         // buffer messages
814         count_msgs++;
815         //CkPrintf("here %d\n", CkMyPe());
816         if (count_msgs == st.numChildren+1) {
817                 if(st.parent == 0)
818                 {
819                         thisProxy[0].ReceiveStats(bufMsg);
820                         //CkPrintf("from %d\n", CkMyPe());
821                 }
822                 else
823                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
824                 count_msgs = 0;
825                 bufMsg.free();
826         } 
827 #endif
828 }
829
830 void CentralLB::LoadBalance()
831 {
832 #if CMK_LBDB_ON
833   int proc;
834   const int clients = CkNumPes();
835
836 #if ! USE_REDUCTION
837   // build data
838   buildStats();
839 #else
840   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
841 #endif
842
843   if (_lb_args.debug()) 
844       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
845                   lbname, cur_ld_balancer, step(), start_lb_time,
846                   CmiMemoryUsage()/(1024.0*1024.0));
847
848   // if we are in simulation mode read data
849   if (LBSimulation::doSimulation) simulationRead();
850
851   char *availVector = LBDatabaseObj()->availVector();
852   for(proc = 0; proc < clients; proc++)
853       statsData->procs[proc].available = (CmiBool)availVector[proc];
854
855   preprocess(statsData);
856
857 //    CkPrintf("Before Calling Strategy\n");
858
859   if (_lb_args.printSummary()) {
860       LBInfo info(clients);
861         // not take comm data
862       info.getInfo(statsData, clients, 0);
863       LBRealType mLoad, mCpuLoad, totalLoad;
864       info.getSummary(mLoad, mCpuLoad, totalLoad);
865       int nmsgs, nbytes;
866       statsData->computeNonlocalComm(nmsgs, nbytes);
867       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
868 //      if (_lb_args.debug() > 1) {
869 //        for (int i=0; i<statsData->n_objs; i++)
870 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
871 //      }
872   }
873
874 #if CMK_REPLAYSYSTEM
875   LDHandle *loadBalancer_pointers;
876   if (_replaySystem) {
877     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
878     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
879   }
880 #endif
881   
882   LBMigrateMsg* migrateMsg = Strategy(statsData);
883 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
884         migrateMsg->step = step();
885 #endif
886
887 #if CMK_REPLAYSYSTEM
888   CpdHandleLBMessage(&migrateMsg);
889   if (_replaySystem) {
890     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
891     free(loadBalancer_pointers);
892   }
893 #endif
894   
895   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
896   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
897
898   // if this is the step at which we need to dump the database
899   simulationWrite();
900
901 //  calculate predicted load
902 //  very time consuming though, so only happen when debugging is on
903   if (_lb_args.printSummary()) {
904       LBInfo info(clients);
905         // not take comm data
906       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
907       LBRealType mLoad, mCpuLoad, totalLoad;
908       info.getSummary(mLoad, mCpuLoad, totalLoad);
909       int nmsgs, nbytes;
910       statsData->computeNonlocalComm(nmsgs, nbytes);
911       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
912       for (int i=0; i<clients; i++)
913         migrateMsg->expectedLoad[i] = info.peLoads[i];
914   }
915
916   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
917 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
918     lbDecisionCount++;
919     migrateMsg->lbDecisionCount = lbDecisionCount;
920 #endif
921
922   envelope *env = UsrToEnv(migrateMsg);
923   if (1) {
924       // broadcast
925     thisProxy.ReceiveMigration(migrateMsg);
926   }
927   else {
928     // split the migration for each processor
929     for (int p=0; p<CkNumPes(); p++) {
930       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
931       thisProxy[p].ReceiveMigration(m);
932     }
933     delete migrateMsg;
934   }
935
936   // Zero out data structures for next cycle
937   // CkPrintf("zeroing out data\n");
938   statsData->clear();
939   stats_msg_count=0;
940 #endif
941 }
942
943 // test if sender and receiver in a commData is nonmigratable.
944 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
945 {
946 #if CMK_LBDB_ON
947   for (int pe=0 ; pe<count; pe++)
948   {
949     for (int i=0; i<len[pe]; i++)
950       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
951           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
952       return 0;
953   }
954 #endif
955   return 1;
956 }
957
958 // rebuild LDStats and remove all non-migratble objects and related things
959 void CentralLB::removeNonMigratable(LDStats* stats, int count)
960 {
961   int i;
962
963   // check if we have non-migratable objects
964   int have = 0;
965   for (i=0; i<stats->n_objs; i++) 
966   {
967     LDObjData &odata = stats->objData[i];
968     if (!odata.migratable) {
969       have = 1; break;
970     }
971   }
972   if (have == 0) return;
973
974   CkVec<LDObjData> nonmig;
975   CkVec<int> new_from_proc, new_to_proc;
976   nonmig.resize(stats->n_migrateobjs);
977   new_from_proc.resize(stats->n_migrateobjs);
978   new_to_proc.resize(stats->n_migrateobjs);
979   int n_objs = 0;
980   for (i=0; i<stats->n_objs; i++) 
981   {
982     LDObjData &odata = stats->objData[i];
983     if (odata.migratable) {
984       nonmig[n_objs] = odata;
985       new_from_proc[n_objs] = stats->from_proc[i];
986       new_to_proc[n_objs] = stats->to_proc[i];
987       n_objs ++;
988     }
989     else {
990       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
991 #if CMK_LB_CPUTIMER
992       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
993 #endif
994     }
995   }
996   CmiAssert(stats->n_migrateobjs == n_objs);
997
998   stats->makeCommHash();
999   
1000   CkVec<LDCommData> newCommData;
1001   newCommData.resize(stats->n_comm);
1002   int n_comm = 0;
1003   for (i=0; i<stats->n_comm; i++) 
1004   {
1005     LDCommData& cdata = stats->commData[i];
1006     if (!cdata.from_proc()) 
1007     {
1008       int idx = stats->getSendHash(cdata);
1009       CmiAssert(idx != -1);
1010       if (!stats->objData[idx].migratable) continue;
1011     }
1012     switch (cdata.receiver.get_type()) {
1013     case LD_PROC_MSG:
1014       break;
1015     case LD_OBJ_MSG:  {
1016       int idx = stats->getRecvHash(cdata);
1017       if (stats->complete_flag)
1018         CmiAssert(idx != -1);
1019       else if (idx == -1) continue;          // receiver not in this group
1020       if (!stats->objData[idx].migratable) continue;
1021       break;
1022       }
1023     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1024       break;
1025     }
1026     newCommData[n_comm] = cdata;
1027     n_comm ++;
1028   }
1029
1030   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1031
1032   // swap to new data
1033   stats->objData = nonmig;
1034   stats->from_proc = new_from_proc;
1035   stats->to_proc = new_to_proc;
1036   stats->n_objs = n_objs;
1037
1038   stats->commData = newCommData;
1039   stats->n_comm = n_comm;
1040
1041   stats->deleteCommHash();
1042   stats->makeCommHash();
1043
1044 }
1045
1046
1047 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1048 extern int restarted;
1049 #endif
1050
1051 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1052 {
1053   storedMigrateMsg = m;
1054   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
1055                   thisProxy);
1056   contribute(0, NULL, CkReduction::max_int, cb);
1057 }
1058
1059 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
1060 {
1061 #if CMK_LBDB_ON
1062         int i;
1063         LBMigrateMsg *m = storedMigrateMsg;
1064         CmiAssert(m!=NULL);
1065         delete msg;
1066
1067 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1068         int *dummyCounts;
1069
1070         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1071         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1072         if(step() > m->step){
1073                 char str[100];
1074                 envelope *env = UsrToEnv(m);
1075                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1076                 return;
1077         }
1078         lbDecisionCount = m->lbDecisionCount;
1079 #endif
1080
1081   if (_lb_args.debug() > 1) 
1082     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1083
1084   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1085   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1086 /*FAULT_EVAC*/
1087   if(!CmiNodeAlive(CkMyPe())){
1088         delete m;
1089         return;
1090   }
1091   migrates_expected = 0;
1092   future_migrates_expected = 0;
1093 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1094         int sending=0;
1095     int dummy=0;
1096         LBDB *_myLBDB = theLbdb->getLBDB();
1097         if(_restartFlag){
1098         dummyCounts = new int[CmiNumPes()];
1099         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1100     }
1101 #endif
1102   for(i=0; i < m->n_moves; i++) {
1103     MigrateInfo& move = m->moves[i];
1104     const int me = CkMyPe();
1105     if (move.from_pe == me && move.to_pe != me) {
1106       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1107       // migrate object, in case it is already gone, inform toPe
1108 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1109       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1110          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1111 #else
1112             if(_restartFlag == 0){
1113                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1114                 theLbdb->Migrate(move.obj,move.to_pe);
1115                 sending++;
1116             }else{
1117                 if(_myLBDB->validObjHandle(move.obj)){
1118                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1119                     theLbdb->Migrate(move.obj,move.to_pe);
1120                     sending++;
1121                 }else{
1122                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1123                     dummyCounts[move.to_pe]++;
1124                     dummy++;
1125                 }
1126             }
1127 #endif
1128     } else if (move.from_pe != me && move.to_pe == me) {
1129        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1130       if (!move.async_arrival) migrates_expected++;
1131       else future_migrates_expected++;
1132     }
1133   }
1134   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1135   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1136
1137 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1138         if(_restartFlag){
1139                 sendDummyMigrationCounts(dummyCounts);
1140                 _restartFlag  =0;
1141         delete []dummyCounts;
1142         }
1143 #endif
1144
1145
1146 #if 0
1147   if (m->n_moves ==0) {
1148     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1149   }
1150 #endif
1151   cur_ld_balancer = m->next_lb;
1152   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1153       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1154   }
1155
1156   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1157     MigrationDone(1);
1158   delete m;
1159
1160 //      CkEvacuatedElement();
1161 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1162 //  migrates_expected = 0;
1163 //  //  ResumeClients(1);
1164 #endif
1165 #endif
1166 }
1167
1168 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1169 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1170     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1171     //TODO: this is gonna be important when a crash happens during checkpoint
1172     //the globalDecisionCount would have to be saved and compared against
1173     //a future recvMigration
1174                 
1175         thisProxy[CkMyPe()].ResumeClients(1);
1176 }
1177 #endif
1178
1179 void CentralLB::MigrationDone(int balancing)
1180 {
1181 #if CMK_LBDB_ON
1182   migrates_completed = 0;
1183   migrates_expected = -1;
1184   // clear load stats
1185   if (balancing) theLbdb->ClearLoads();
1186   // Increment to next step
1187   theLbdb->incStep();
1188         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1189   // if sync resume, invoke a barrier
1190
1191 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1192     savedBalancing = balancing;
1193     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1194 #endif
1195
1196   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1197
1198   LoadbalanceDone(balancing);        // callback
1199 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1200   // if sync resume invoke a barrier
1201   if (balancing && _lb_args.syncResume()) {
1202     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1203                   thisProxy);
1204     contribute(0, NULL, CkReduction::sum_int, cb);
1205   }
1206   else{ 
1207     if(CmiNodeAlive(CkMyPe())){
1208         thisProxy [CkMyPe()].ResumeClients(balancing);
1209     }   
1210   }     
1211 #if CMK_GRID_QUEUE_AVAILABLE
1212   CmiGridQueueDeregisterAll ();
1213   CpvAccess(CkGridObject) = NULL;
1214 #endif
1215 #endif 
1216 #endif
1217 }
1218
1219 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1220 void CentralLB::endMigrationDone(int balancing){
1221     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1222
1223
1224   if (balancing && _lb_args.syncResume()) {
1225     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1226                   thisProxy);
1227     contribute(0, NULL, CkReduction::sum_int, cb);
1228   }
1229   else{
1230     if(CmiNodeAlive(CkMyPe())){
1231     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1232     thisProxy [CkMyPe()].ResumeClients(balancing);
1233     }
1234   }
1235
1236 }
1237 #endif
1238
1239 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1240 void resumeCentralLbAfterChkpt(void *_lb){
1241     CentralLB *lb= (CentralLB *)_lb;
1242     CpvAccess(_currentObj)=lb;
1243     lb->endMigrationDone(lb->savedBalancing);
1244 }
1245 #endif
1246
1247
1248 void CentralLB::ResumeClients(CkReductionMsg *msg)
1249 {
1250   ResumeClients(1);
1251   delete msg;
1252 }
1253
1254 void CentralLB::ResumeClients(int balancing)
1255 {
1256 #if CMK_LBDB_ON
1257 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1258     resumeCount++;
1259     globalResumeCount = resumeCount;
1260 #endif
1261   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1262   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1263     double end_lb_time = CkWallTimer();
1264   }
1265
1266 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1267   if (balancing) ComlibNotifyMigrationDone();  
1268 #endif
1269
1270   theLbdb->ResumeClients();
1271   if (balancing)  {
1272     CheckMigrationComplete();
1273     if (future_migrates_expected == 0 || 
1274             future_migrates_expected == future_migrates_completed) {
1275       CheckMigrationComplete();
1276     }
1277   }
1278 #endif
1279 }
1280
1281 /*
1282   migration of objects contains two different kinds:
1283   (1) objects want to make a barrier for migration completion
1284       (waitForBarrier is true)
1285       migrationDone() to finish and resumeClients
1286   (2) objects don't need a barrier
1287   However, next load balancing can only happen when both migrations complete
1288 */ 
1289 void CentralLB::CheckMigrationComplete()
1290 {
1291 #if CMK_LBDB_ON
1292   lbdone ++;
1293   if (lbdone == 2) {
1294     if (_lb_args.debug() && CkMyPe()==0) {
1295       double end_lb_time = CkWallTimer();
1296       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1297                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1298                 end_lb_time-start_lb_time);
1299     }
1300
1301     lb_migration_cost = (CkWallTimer() - start_lb_time);
1302
1303     lbdone = 0;
1304     future_migrates_expected = -1;
1305     future_migrates_completed = 0;
1306     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1307     // release local barrier  so that the next load balancer can go
1308     LDOMHandle h;
1309     h.id.id.idx = 0;
1310     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1311     // switch to the next load balancer in the list
1312     // subtle: called from Migrated() may result in Migrated() called in next LB
1313     theLbdb->nextLoadbalancer(seqno);
1314   }
1315 #endif
1316 }
1317
1318 void CentralLB::preprocess(LDStats* stats)
1319 {
1320   if (_lb_args.ignoreBgLoad())
1321     stats->clearBgLoad();
1322
1323   // Call the predictor for the future
1324   if (_lb_predict) FuturePredictor(statsData);
1325 }
1326
1327 // default load balancing strategy
1328 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1329 {
1330 #if CMK_LBDB_ON
1331   double strat_start_time = CkWallTimer();
1332   if (_lb_args.debug())
1333     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1334
1335   work(stats);
1336
1337   if (_lb_args.debug()>1)  {
1338     CkPrintf("CharmLB> Obj Map:\n");
1339     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1340     CkPrintf("\n");
1341   }
1342
1343   LBMigrateMsg *msg = createMigrateMsg(stats);
1344
1345   if (_lb_args.debug()) {
1346     double strat_end_time = CkWallTimer();
1347     envelope *env = UsrToEnv(msg);
1348
1349     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1350     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1351               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1352     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1353     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1354               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1355     lb_strategy_cost = (strat_end_time - strat_start_time);
1356     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1357   }
1358
1359   return msg;
1360 #else
1361   return NULL;
1362 #endif
1363 }
1364
1365 void CentralLB::work(LDStats* stats)
1366 {
1367   // does nothing but print the database
1368   stats->print();
1369 }
1370
1371 // generate migrate message from stats->from_proc and to_proc
1372 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1373 {
1374   int i;
1375   CkVec<MigrateInfo*> migrateInfo;
1376   for (i=0; i<stats->n_objs; i++) {
1377     LDObjData &objData = stats->objData[i];
1378     int frompe = stats->from_proc[i];
1379     int tope = stats->to_proc[i];
1380     if (frompe != tope) {
1381       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1382       //         CkMyPe(),obj,pe,dest);
1383       MigrateInfo *migrateMe = new MigrateInfo;
1384       migrateMe->obj = objData.handle;
1385       migrateMe->from_pe = frompe;
1386       migrateMe->to_pe = tope;
1387       migrateMe->async_arrival = objData.asyncArrival;
1388       migrateInfo.insertAtEnd(migrateMe);
1389     }
1390   }
1391
1392   int migrate_count=migrateInfo.length();
1393   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1394   msg->n_moves = migrate_count;
1395   for(i=0; i < migrate_count; i++) {
1396     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1397     msg->moves[i] = *item;
1398     delete item;
1399     migrateInfo[i] = 0;
1400   }
1401   return msg;
1402 }
1403
1404 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1405 {
1406   int nmoves = 0;
1407   int nunavail = 0;
1408   int i;
1409   for (i=0; i<m->n_moves; i++) {
1410     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1411     if (item->from_pe == p || item->to_pe == p) nmoves++;
1412   }
1413   for (i=0; i<CkNumPes();i++) {
1414     if (!m->avail_vector[i]) nunavail++;
1415   }
1416   LBMigrateMsg* msg;
1417   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1418   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1419   msg->n_moves = nmoves;
1420   msg->level = m->level;
1421   msg->next_lb = m->next_lb;
1422   for (i=0,nmoves=0; i<m->n_moves; i++) {
1423     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1424     if (item->from_pe == p || item->to_pe == p) {
1425       msg->moves[nmoves] = *item;
1426       nmoves++;
1427     }
1428   }
1429   // copy processor data
1430   if (nunavail)
1431   for (i=0; i<CkNumPes();i++) {
1432     msg->avail_vector[i] = m->avail_vector[i];
1433     msg->expectedLoad[i] = m->expectedLoad[i];
1434   }
1435   return msg;
1436 }
1437
1438 void CentralLB::simulationWrite() {
1439   if(step() == LBSimulation::dumpStep)
1440   {
1441     // here we are supposed to dump the database
1442     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1443     char *dumpFileName = (char *)malloc(dumpFileSize);
1444     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1445       free(dumpFileName);
1446       dumpFileSize+=3;
1447       dumpFileName = (char *)malloc(dumpFileSize);
1448     }
1449     writeStatsMsgs(dumpFileName);
1450     free(dumpFileName);
1451     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1452     ++LBSimulation::dumpStep;
1453     --LBSimulation::dumpStepSize;
1454     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1455       CmiPrintf("Charm++> Exiting...\n");
1456       CkExit();
1457     }
1458     return;
1459   }
1460 }
1461
1462 void CentralLB::simulationRead() {
1463   LBSimulation *simResults = NULL, *realResults;
1464   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1465   voidMessage->n_moves=0;
1466   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1467     // here we are supposed to read the data from the dump database
1468     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1469     char *simFileName = (char *)malloc(simFileSize);
1470     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1471       free(simFileName);
1472       simFileSize+=3;
1473       simFileName = (char *)malloc(simFileSize);
1474     }
1475     readStatsMsgs(simFileName);
1476
1477     // allocate simResults (only the first step)
1478     if (simResults == NULL) {
1479       simResults = new LBSimulation(LBSimulation::simProcs);
1480       realResults = new LBSimulation(LBSimulation::simProcs);
1481     }
1482     else {
1483       // should be the same number of procs of the original simulation!
1484       if (!LBSimulation::procsChanged) {
1485         // it means we have a previous step, so in simResults there is data.
1486         // we can now print the real effects of the load balancer during the simulation
1487         // or print the difference between the predicted data and the real one.
1488         realResults->reset();
1489         // reset to_proc of statsData to be equal to from_proc
1490         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1491         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1492         simResults->PrintDifferences(realResults,statsData);
1493       }
1494       simResults->reset();
1495     }
1496
1497     // now pass it to the strategy routine
1498     double startT = CkWallTimer();
1499     preprocess(statsData);
1500     CmiPrintf("%s> Strategy starts ... \n", lbname);
1501     LBMigrateMsg* migrateMsg = Strategy(statsData);
1502     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1503                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1504
1505     // now calculate the results of the load balancing simulation
1506     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1507
1508     // now we have the simulation data, so print it and loop
1509     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1510     // **CWL** Officially recording my disdain here for using ints for bool
1511     if (LBSimulation::showDecisionsOnly) {
1512       simResults->PrintDecisions(migrateMsg, simFileName, 
1513                                  LBSimulation::simProcs);
1514     } else {
1515       simResults->PrintSimulationResults();
1516     }
1517
1518     free(simFileName);
1519     delete migrateMsg;
1520     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1521   }
1522   // deallocate simResults
1523   delete simResults;
1524   CmiPrintf("Charm++> Exiting...\n");
1525   CkExit();
1526 }
1527
1528 void CentralLB::readStatsMsgs(const char* filename) 
1529 {
1530 #if CMK_LBDB_ON
1531   int i;
1532   FILE *f = fopen(filename, "r");
1533   if (f==NULL) {
1534     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1535     CmiAbort("");
1536   }
1537
1538   // at this stage, we need to rebuild the statsMsgList and
1539   // statsDataList structures. For that first deallocate the
1540   // old structures
1541   if (statsMsgsList) {
1542     for(i = 0; i < stats_msg_count; i++)
1543       delete statsMsgsList[i];
1544     delete[] statsMsgsList;
1545     statsMsgsList=0;
1546   }
1547
1548   PUP::fromDisk pd(f);
1549   PUP::machineInfo machInfo;
1550
1551   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1552   PUP::xlater p(machInfo, pd);
1553
1554   if (_lb_args.lbversion() > 1) {
1555     p|_lb_args.lbversion();             // write version number
1556     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1557     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1558   }
1559   p|stats_msg_count;
1560
1561   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1562   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1563   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1564
1565   // LBSimulation::simProcs must be set
1566   statsData->pup(p);
1567
1568   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1569   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1570
1571   // file f is closed in the destructor of PUP::fromDisk
1572   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1573 #endif
1574 }
1575
1576 void CentralLB::writeStatsMsgs(const char* filename) 
1577 {
1578 #if CMK_LBDB_ON
1579   FILE *f = fopen(filename, "w");
1580   if (f==NULL) {
1581     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1582     CmiAbort("");
1583   }
1584
1585   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1586   PUP::toDisk p(f);
1587   p((char *)&machInfo, sizeof(machInfo));       // machine info
1588
1589   p|_lb_args.lbversion();               // write version number
1590   p|stats_msg_count;
1591   statsData->pup(p);
1592
1593   fclose(f);
1594
1595   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1596 #endif
1597 }
1598
1599 // calculate the predicted wallclock/cpu load for every processors
1600 // considering communication overhead if considerComm is true
1601 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1602                       LBMigrateMsg *msg, LBInfo &info, 
1603                       int considerComm)
1604 {
1605 #if CMK_LBDB_ON
1606         stats->makeCommHash();
1607
1608         // update to_proc according to migration msgs
1609         for(int i = 0; i < msg->n_moves; i++) {
1610           MigrateInfo &mInfo = msg->moves[i];
1611           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1612           CmiAssert(idx != -1);
1613           stats->to_proc[idx] = mInfo.to_pe;
1614         }
1615
1616         info.getInfo(stats, count, considerComm);
1617 #endif
1618 }
1619
1620
1621 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1622 {
1623     CkAssert(simResults != NULL && count == simResults->numPes);
1624     // estimate the new loads of the processors. As a first approximation, this is the
1625     // sum of the cpu times of the objects on that processor
1626     double startT = CkWallTimer();
1627     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1628     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1629 }
1630
1631 void CentralLB::pup(PUP::er &p) { 
1632   BaseLB::pup(p); 
1633   if (p.isUnpacking())  {
1634     initLB(CkLBOptions(seqno)); 
1635   }
1636   p|reduction_started;
1637   int has_statsMsg=0;
1638   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1639   p|has_statsMsg;
1640   if (has_statsMsg) {
1641     if (p.isUnpacking())
1642       statsMsg = new CLBStatsMsg;
1643     statsMsg->pup(p);
1644   }
1645   p|lb_ideal_period;
1646 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1647   p | lbDecisionCount;
1648   p | resumeCount;
1649 #endif
1650         
1651 }
1652
1653 int CentralLB::useMem() { 
1654   return sizeof(CentralLB) + statsData->useMem() + 
1655          CkNumPes() * sizeof(CLBStatsMsg *);
1656 }
1657
1658
1659 /**
1660   CLBStatsMsg is not a real message now.
1661   CLBStatsMsg is used for all processors to fill in their local load and comm
1662   statistics and send to processor 0
1663 */
1664
1665 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1666   n_objs = osz;
1667   n_comm = csz;
1668   objData = new LDObjData[osz];
1669   commData = new LDCommData[csz];
1670   avail_vector = NULL;
1671 }
1672
1673 CLBStatsMsg::~CLBStatsMsg() {
1674   delete [] objData;
1675   delete [] commData;
1676   delete [] avail_vector;
1677 }
1678
1679 void CLBStatsMsg::pup(PUP::er &p) {
1680   int i;
1681   p|from_pe;
1682   p|pe_speed;
1683   p|total_walltime;
1684   p|idletime;
1685   p|bg_walltime;
1686 #if CMK_LB_CPUTIMER
1687   p|total_cputime;
1688   p|bg_cputime;
1689 #endif
1690 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1691   p | step;
1692 #endif
1693   p|n_objs;
1694   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1695   for (i=0; i<n_objs; i++) p|objData[i];
1696   p|n_comm;
1697   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1698   for (i=0; i<n_comm; i++) p|commData[i];
1699
1700   int has_avail_vector;
1701   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1702   p|has_avail_vector;
1703   if (p.isUnpacking()) {
1704     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1705     else avail_vector = NULL;
1706   }
1707   if (has_avail_vector) p(avail_vector, CkNumPes());
1708
1709   p(next_lb);
1710 }
1711
1712 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1713 // the entry function, it is just used to use to pup.
1714 // I don't use CLBStatsMsg directly as marshalled parameter because
1715 // I want the data pointer stored and not to be freed by the Charm++.
1716 void CkMarshalledCLBStatsMessage::free() { 
1717   int count = msgs.size();
1718   for  (int i=0; i<count; i++) {
1719     delete msgs[i];
1720     msgs[i] = NULL;
1721   }
1722   msgs.free();
1723 }
1724
1725 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1726 {
1727   int count = m.getCount();
1728   for (int i=0; i<count; i++) add(m.getMessage(i));
1729 }
1730
1731 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1732 {
1733   int count = msgs.size();
1734   p|count;
1735   for (int i=0; i<count; i++) {
1736     CLBStatsMsg *msg;
1737     if (p.isUnpacking()) msg = new CLBStatsMsg;
1738     else { 
1739       msg = msgs[i]; CmiAssert(msg!=NULL);
1740     }
1741     msg->pup(p);
1742     if (p.isUnpacking()) add(msg);
1743   }
1744 }
1745
1746 SpanningTree::SpanningTree()
1747 {
1748         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1749         arity = (int)ceil(sq/2);
1750         calcParent(CkMyPe());
1751         calcNumChildren(CkMyPe());
1752 }
1753
1754 void SpanningTree::calcParent(int n)
1755 {
1756         parent=-1;
1757         if(n != 0  && arity > 0)
1758                 parent = (n-1)/arity;
1759 }
1760
1761 void SpanningTree::calcNumChildren(int n)
1762 {
1763         numChildren = 0;
1764         if (arity == 0) return;
1765         int fullNode=(CkNumPes()-1-arity)/arity;
1766         if(n <= fullNode)
1767                 numChildren = arity;
1768         if(n == fullNode+1)
1769                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1770         if(n > fullNode+1)
1771                 numChildren = 0;
1772 }
1773
1774 #include "CentralLB.def.h"
1775  
1776 /*@}*/