46493e4f94f6622f22c33be6f1350d812e816292
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #include <vector>
14
15 #define  DEBUGF(x)       // CmiPrintf x;
16 #define  DEBUG(x)        // x;
17
18 #if CMK_MEM_CHECKPOINT
19    /* can not handle reduction in inmem FT */
20 #define USE_REDUCTION         0
21 #define USE_LDB_SPANNING_TREE 0
22 #elif defined(_FAULT_MLOG_)
23 /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
32 extern int _restartFlag;
33 extern void getGlobalStep(CkGroupID );
34 extern void initMlogLBStep(CkGroupID );
35 extern int globalResumeCount;
36 extern void sendDummyMigrationCounts(int *);
37 #endif
38
39 #if CMK_GRID_QUEUE_AVAILABLE
40 CpvExtern(void *, CkGridObject);
41 #endif
42
43 CkGroupID loadbalancer;
44 int * lb_ptr;
45 int load_balancer_created;
46
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49
50 int lb_ideal_period = 0;
51 int lb_no_iterations = -1;
52
53 double cur_max_pe_load = 0.0;
54 double cur_avg_pe_load = 0.0;
55 double prev_load = 0.0;
56
57 struct AdaptiveData {
58   int iteration;
59   double max_load;
60   double avg_load;
61 };
62
63 struct AdaptiveLBDatabase {
64   std::vector<AdaptiveData> history_data;
65 } adaptive_lbdb;
66
67 CreateLBFunc_Def(CentralLB, "CentralLB base class")
68
69 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
70                              LBMigrateMsg *, LBInfo &info, int considerComm);
71
72 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
73   double lb_data[3];
74   lb_data[0] = 0;
75   lb_data[1] = 0;
76   lb_data[2] = 0;
77   for (int i = 0; i < nMsg; i++) {
78     CkAssert(msgs[i]->getSize() == 3*sizeof(double));
79     double* m = (double *)msgs[i]->getData();
80     lb_data[0] += m[0];
81     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
82     lb_data[2] += m[2];
83   }
84   return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
85 }
86
87 /*global*/ CkReduction::reducerType lbDataCollectionType;
88 /*initcall*/ void registerLBDataCollection(void) {
89   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
90 }
91
92 /*
93 void CreateCentralLB()
94 {
95   CProxy_CentralLB::ckNew(0);
96 }
97 */
98
99 void CentralLB::staticStartLB(void* data)
100 {
101   CentralLB *me = (CentralLB*)(data);
102   me->StartLB();
103 }
104
105 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
106 {
107   CentralLB *me = (CentralLB*)(data);
108   me->Migrated(h, waitBarrier);
109 }
110
111 void CentralLB::staticAtSync(void* data)
112 {
113   CentralLB *me = (CentralLB*)(data);
114   me->AtSync();
115 }
116
117 void CentralLB::initLB(const CkLBOptions &opt)
118 {
119 #if CMK_LBDB_ON
120   lbname = "CentralLB";
121   thisProxy = CProxy_CentralLB(thisgroup);
122   //  CkPrintf("Construct in %d\n",CkMyPe());
123
124   // create and turn on by default
125   receiver = theLbdb->
126     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
127   notifier = theLbdb->getLBDB()->
128     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
129   startLbFnHdl = theLbdb->getLBDB()->
130     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
131
132   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
133   if (opt.getSeqNo() > 0) turnOff();
134
135   stats_msg_count = 0;
136   statsMsgsList = NULL;
137   statsData = NULL;
138
139   storedMigrateMsg = NULL;
140   reduction_started = 0;
141
142   // for future predictor
143   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
144   else predicted_model=0;
145   // register user interface callbacks
146   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
147
148   myspeed = theLbdb->ProcessorSpeed();
149
150   migrates_completed = 0;
151   future_migrates_completed = 0;
152   migrates_expected = -1;
153   future_migrates_expected = -1;
154   cur_ld_balancer = _lb_args.central_pe();      // 0 default
155   lbdone = 0;
156   count_msgs=0;
157   statsMsg = NULL;
158
159   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
160
161   load_balancer_created = 1;
162 #endif
163 }
164
165 CentralLB::~CentralLB()
166 {
167 #if CMK_LBDB_ON
168   delete [] statsMsgsList;
169   delete statsData;
170   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
171   if (theLbdb) {
172     theLbdb->getLBDB()->
173       RemoveNotifyMigrated(notifier);
174     theLbdb->
175       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
176   }
177 #endif
178 }
179
180 void CentralLB::turnOn() 
181 {
182 #if CMK_LBDB_ON
183   theLbdb->getLBDB()->
184     TurnOnBarrierReceiver(receiver);
185   theLbdb->getLBDB()->
186     TurnOnNotifyMigrated(notifier);
187   theLbdb->getLBDB()->
188     TurnOnStartLBFn(startLbFnHdl);
189 #endif
190 }
191
192 void CentralLB::turnOff() 
193 {
194 #if CMK_LBDB_ON
195   theLbdb->getLBDB()->
196     TurnOffBarrierReceiver(receiver);
197   theLbdb->getLBDB()->
198     TurnOffNotifyMigrated(notifier);
199   theLbdb->getLBDB()->
200     TurnOffStartLBFn(startLbFnHdl);
201 #endif
202 }
203
204 void CentralLB::AtSync()
205 {
206   CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
207 #if CMK_LBDB_ON
208   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
209
210 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
211         CpvAccess(_currentObj)=this;
212 #endif
213
214   // if num of processor is only 1, nothing should happen
215   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
216     MigrationDone(0);
217     return;
218   }
219   if(CmiNodeAlive(CkMyPe())){
220     thisProxy [CkMyPe()].ProcessAtSyncMin();
221   }
222 #endif
223 }
224
225 #include "ComlibStrategy.h"
226
227 void CentralLB::ProcessAtSync()
228 {
229
230   // Reset all adaptive lb related fields since load balancing is being done.
231   lb_no_iterations = -1;
232   adaptive_lbdb.history_data.clear();
233   prev_load = 0.0;
234
235 #if CMK_LBDB_ON
236   if (reduction_started) return;              // reducton in progress
237
238   CmiAssert(CmiNodeAlive(CkMyPe()));
239   if (CkMyPe() == cur_ld_balancer) {
240     start_lb_time = CkWallTimer();
241   }
242
243
244 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
245         initMlogLBStep(thisgroup);
246 #endif
247
248   // build message
249   BuildStatsMsg();
250
251 #if USE_REDUCTION
252     // reduction to get total number of objects and comm
253     // so that processor 0 can pre-allocate load balancing database
254   int counts[2];
255   counts[0] = theLbdb->GetObjDataSz();
256   counts[1] = theLbdb->GetCommDataSz();
257
258   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
259                   thisProxy[0]);
260   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
261   reduction_started = 1;
262 #else
263   SendStats();
264 #endif
265 #endif
266 }
267
268 void CentralLB::ProcessAtSyncMin()
269 {
270 #if CMK_LBDB_ON
271   if (reduction_started) return;              // reducton in progress
272
273   CmiAssert(CmiNodeAlive(CkMyPe()));
274   if (CkMyPe() == cur_ld_balancer) {
275     start_lb_time = CkWallTimer();
276   }
277
278
279 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
280         initMlogLBStep(thisgroup);
281 #endif
282   
283   lb_no_iterations++;
284   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
285       lb_no_iterations, lb_ideal_period);
286
287   if (lb_no_iterations >= lb_ideal_period) {
288     CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
289     SendMinStats();
290   } else {
291
292  double total_walltime;
293  double idle_time;
294  double bg_walltime;
295   theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
296     CkPrintf("No need to send MinStats of [%d] load: %lf\n\n", CkMyPe(),
297     total_walltime);
298     ResumeClients(0);
299   }
300 #endif
301 }
302
303 void CentralLB::SendMinStats() {
304
305  double total_walltime;
306  double idle_time;
307  double bg_walltime;
308   theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
309   CkPrintf("Total walltime [%d] %lf\n", CkMyPe(), total_walltime);
310
311   // Since the total_walltime is cumulative since the last load balancing stage,
312   // Hence it is subtracted from the previous load.
313   double tmp = total_walltime;
314   total_walltime -= prev_load;
315   prev_load = tmp; 
316
317   double lb_data[3];
318   lb_data[0] = total_walltime;
319   lb_data[1] = total_walltime;
320   lb_data[2] = 1;
321
322   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
323                   thisProxy[0]);
324   contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
325 }
326
327 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
328   CmiAssert(CkMyPe() == 0);
329   double* load = (LBRealType *) msg->getData();
330   double max = load[1];
331   double avg = load[0]/load[2];
332   CkPrintf("Iteration[%d] Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
333   if (lb_no_iterations == 0) {
334     return;
335   }
336
337   // Store the data for this iteration
338   AdaptiveData data;
339   data.iteration = lb_no_iterations;
340   data.max_load = max;
341   data.avg_load = avg;
342   adaptive_lbdb.history_data.push_back(data);
343
344   // If the max/avg ratio is greater than the threshold and also this is not the
345   // step immediately after load balancing, carry out load balancing
346   if (max/avg >= 1.1) {
347     CkPrintf("Carry out load balancing step\n");
348     thisProxy.ProcessAtSync();
349     return;
350   }
351
352   CkPrintf("No need to do lb now 
353   // Generate the plan for the adaptive strategy
354   if (generatePlan()) {
355     thisProxy.ResumeClients(lb_ideal_period, 0);
356   } else {
357     thisProxy.ResumeClients(0);
358   }
359 }
360
361 bool CentralLB::generatePlan() {
362   if (adaptive_lbdb.history_data.size() <= 3) {
363     return false;
364   }
365
366   // Some heuristics for lbperiod
367   // If constant load or almost constant,
368   // then max * new_lb_period > avg * new_lb_period + lb_cost
369 //  double max = 0.0;
370 //  double avg = 0.0;
371 //  AdaptiveData data;
372 //  for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
373 //    data = adaptive_lbdb.history_data[i];
374 //    max += data.max_load;
375 //    avg += data.avg_load;
376 //  }
377 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
378 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
379 //
380 //  lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
381 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
382 //      max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
383
384   // If linearly varying load, then find lb_period
385   // area between the max and avg curve 
386   double mslope, aslope, mc, ac;
387   getLineEq(aslope, ac, mslope, mc);
388   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
389   double a = (mslope - aslope)/2;
390   double b = (mc - ac);
391   double c = -(lb_strategy_cost + lb_migration_cost);
392   lb_ideal_period = getPeriodForLinear(a, b, c);
393   CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
394
395   return true;
396 }
397
398 int CentralLB::getPeriodForLinear(double a, double b, double c) {
399   if (a == 0) {
400     return -c / b;
401   }
402   int x;
403   int t = (b * b) - (4*a*c);
404   x = (-b + sqrt(t)) / (2*a);
405   return x;
406 }
407
408 bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
409   int total = adaptive_lbdb.history_data.size();
410   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
411       adaptive_lbdb.history_data[0].iteration;
412   double a1 = 0;
413   double m1 = 0;
414   double a2 = 0;
415   double m2 = 0;
416   AdaptiveData data;
417   int i = 0;
418   for (i = 0; i <= total/2; i++) {
419     data = adaptive_lbdb.history_data[i];
420     m1 += data.max_load;
421     a1 += data.avg_load;
422   }
423   m1 /= i;
424   a1 /= i;
425
426   for (i = total/2; i < total; i++) {
427     data = adaptive_lbdb.history_data[i];
428     m2 += data.max_load;
429     a2 += data.avg_load;
430   }
431   m2 /= (i - total/2);
432   a2 /= (i - total/2);
433
434   aslope = 2 * (a2 - a1) / iterations;
435   mslope = 2 * (m2 - m1) / iterations;
436   ac = adaptive_lbdb.history_data[0].avg_load;
437   mc = adaptive_lbdb.history_data[0].max_load;
438   return true;
439 }
440
441 // called only on 0
442 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
443 {
444   CmiAssert(CkMyPe() == 0);
445   if (statsData == NULL) statsData = new LDStats;
446
447   int *counts = (int *)msg->getData();
448   int n_objs = counts[0];
449   int n_comm = counts[1];
450
451     // resize database
452   statsData->objData.resize(n_objs);
453   statsData->from_proc.resize(n_objs);
454   statsData->to_proc.resize(n_objs);
455   statsData->commData.resize(n_comm);
456
457   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
458         
459     // broadcast call to let everybody start to send stats
460   thisProxy.SendStats();
461 }
462
463 void CentralLB::BuildStatsMsg()
464 {
465 #if CMK_LBDB_ON
466   // build and send stats
467   const int osz = theLbdb->GetObjDataSz();
468   const int csz = theLbdb->GetCommDataSz();
469
470   int npes = CkNumPes();
471   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
472   _MEMCHECK(msg);
473   msg->from_pe = CkMyPe();
474 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
475         msg->step = step();
476 #endif
477   //msg->serial = CrnRand();
478
479 /*
480   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
481   theLbdb->IdleTime(&msg->idletime);
482   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
483 */
484 #if CMK_LB_CPUTIMER
485   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
486                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
487 #else
488   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
489                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
490 #endif
491
492   msg->pe_speed = myspeed;
493   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
494
495   msg->n_objs = osz;
496   theLbdb->GetObjData(msg->objData);
497   msg->n_comm = csz;
498   theLbdb->GetCommData(msg->commData);
499 //  theLbdb->ClearLoads();
500   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
501
502   if(CkMyPe() == cur_ld_balancer) {
503     msg->avail_vector = new char[CkNumPes()];
504     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
505     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
506   }
507
508   CmiAssert(statsMsg == NULL);
509   statsMsg = msg;
510 #endif
511 }
512
513
514 // called on every processor
515 void CentralLB::SendStats()
516 {
517 #if CMK_LBDB_ON
518   CmiAssert(statsMsg != NULL);
519   reduction_started = 0;
520
521 #if USE_LDB_SPANNING_TREE
522   if(CkNumPes()>1024)
523   {
524     if (CkMyPe() == cur_ld_balancer)
525       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
526     else
527       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
528   }
529   else
530 #endif
531   {
532     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
533     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
534   }
535
536   statsMsg = NULL;
537
538 #ifdef __BIGSIM__
539   BgEndStreaming();
540 #endif
541
542   {
543   // enfore the barrier to wait until centralLB says no
544   LDOMHandle h;
545   h.id.id.idx = 0;
546   theLbdb->getLBDB()->RegisteringObjects(h);
547   }
548 #endif
549 }
550
551 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
552 extern int donotCountMigration;
553 #endif
554
555 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
556 {
557 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
558     if(donotCountMigration){
559         return ;
560     }
561 #endif
562
563 #if CMK_LBDB_ON
564   if (waitBarrier) {
565             migrates_completed++;
566       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
567     if (migrates_completed == migrates_expected) {
568       MigrationDone(1);
569     }
570   }
571   else {
572     future_migrates_completed ++;
573     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
574     if (future_migrates_completed == future_migrates_expected)  {
575         CheckMigrationComplete();
576     }
577   }
578 #endif
579 }
580
581 void CentralLB::MissMigrate(int waitForBarrier)
582 {
583   LDObjHandle h;
584   Migrated(h, waitForBarrier);
585 }
586
587 // build a complete data from bufferred messages
588 // not used when USE_REDUCTION = 1
589 void CentralLB::buildStats()
590 {
591     statsData->nprocs() = stats_msg_count;
592     // allocate space
593     statsData->objData.resize(statsData->n_objs);
594     statsData->from_proc.resize(statsData->n_objs);
595     statsData->to_proc.resize(statsData->n_objs);
596     statsData->commData.resize(statsData->n_comm);
597
598     int nobj = 0;
599     int ncom = 0;
600     int nmigobj = 0;
601     // copy all data in individule message to this big structure
602     for (int pe=0; pe<CkNumPes(); pe++) {
603        int i;
604        CLBStatsMsg *msg = statsMsgsList[pe];
605        if(msg == NULL) continue;
606        for (i=0; i<msg->n_objs; i++) {
607          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
608          statsData->objData[nobj] = msg->objData[i];
609          if (msg->objData[i].migratable) nmigobj++;
610          nobj++;
611        }
612        for (i=0; i<msg->n_comm; i++) {
613          statsData->commData[ncom] = msg->commData[i];
614          ncom++;
615        }
616        // free the memory
617        delete msg;
618        statsMsgsList[pe]=0;
619     }
620     statsData->n_migrateobjs = nmigobj;
621 }
622
623 // deposit one processor data at a time, note database is pre-allocated
624 // to have enough space
625 // used when USE_REDUCTION = 1
626 void CentralLB::depositData(CLBStatsMsg *m)
627 {
628   int i;
629   if (m == NULL) return;
630
631   const int pe = m->from_pe;
632   struct ProcStats &procStat = statsData->procs[pe];
633   procStat.pe = pe;
634   procStat.total_walltime = m->total_walltime;
635   procStat.idletime = m->idletime;
636   procStat.bg_walltime = m->bg_walltime;
637 #if CMK_LB_CPUTIMER
638   procStat.total_cputime = m->total_cputime;
639   procStat.bg_cputime = m->bg_cputime;
640 #endif
641   procStat.pe_speed = m->pe_speed;
642   //procStat.utilization = 1.0;
643   procStat.available = CmiTrue;
644   procStat.n_objs = m->n_objs;
645
646   int &nobj = statsData->n_objs;
647   int &nmigobj = statsData->n_migrateobjs;
648   for (i=0; i<m->n_objs; i++) {
649       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
650       statsData->objData[nobj] = m->objData[i];
651       if (m->objData[i].migratable) nmigobj++;
652       nobj++;
653       CmiAssert(nobj <= statsData->objData.capacity());
654   }
655   int &n_comm = statsData->n_comm;
656   for (i=0; i<m->n_comm; i++) {
657       statsData->commData[n_comm] = m->commData[i];
658       n_comm++;
659       CmiAssert(n_comm <= statsData->commData.capacity());
660   }
661   delete m;
662 }
663
664 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
665 {
666 #if CMK_LBDB_ON
667   if (statsMsgsList == NULL) {
668     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
669     CmiAssert(statsMsgsList != NULL);
670     for(int i=0; i < CkNumPes(); i++)
671       statsMsgsList[i] = 0;
672   }
673   if (statsData == NULL) statsData = new LDStats;
674
675     //  loop through all CLBStatsMsg in the incoming msg
676   int count = msg.getCount();
677   for (int num = 0; num < count; num++) 
678   {
679     CLBStatsMsg *m = msg.getMessage(num);
680     CmiAssert(m!=NULL);
681     const int pe = m->from_pe;
682     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
683 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
684 /*      
685  *  if(m->step < step()){
686  *    //TODO: if a processor is redoing an old load balance step..
687  *    //tell it that the step is done and that it should not perform any migrations
688  *      thisProxy[pe].ReceiveDummyMigration();
689  *  }*/
690 #endif
691         
692     if(!CmiNodeAlive(pe)){
693         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
694         continue;
695     }
696         
697     if (m->avail_vector!=NULL) {
698       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
699     }
700
701     if (statsMsgsList[pe] != 0) {
702       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
703              pe);
704     } else {
705       statsMsgsList[pe] = m;
706 #if USE_REDUCTION
707       depositData(m);
708 #else
709       // store per processor data right away
710       struct ProcStats &procStat = statsData->procs[pe];
711       procStat.pe = pe;
712       procStat.total_walltime = m->total_walltime;
713       procStat.idletime = m->idletime;
714       procStat.bg_walltime = m->bg_walltime;
715 #if CMK_LB_CPUTIMER
716       procStat.total_cputime = m->total_cputime;
717       procStat.bg_cputime = m->bg_cputime;
718 #endif
719       procStat.pe_speed = m->pe_speed;
720       //procStat.utilization = 1.0;
721       procStat.available = CmiTrue;
722       procStat.n_objs = m->n_objs;
723
724       statsData->n_objs += m->n_objs;
725       statsData->n_comm += m->n_comm;
726 #endif
727       stats_msg_count++;
728     }
729   }    // end of for
730
731   const int clients = CkNumValidPes();
732   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
733  
734   if (stats_msg_count == clients) {
735         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
736     statsData->nprocs() = stats_msg_count;
737     thisProxy[CkMyPe()].LoadBalance();
738   }
739 #endif
740 }
741
742 /** added by Abhinav for receiving msgs via spanning tree */
743 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
744 {
745 #if CMK_LBDB_ON
746         CmiAssert(CkMyPe() != 0);
747         bufMsg.add(msg);         // buffer messages
748         count_msgs++;
749         //CkPrintf("here %d\n", CkMyPe());
750         if (count_msgs == st.numChildren+1) {
751                 if(st.parent == 0)
752                 {
753                         thisProxy[0].ReceiveStats(bufMsg);
754                         //CkPrintf("from %d\n", CkMyPe());
755                 }
756                 else
757                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
758                 count_msgs = 0;
759                 bufMsg.free();
760         } 
761 #endif
762 }
763
764 void CentralLB::LoadBalance()
765 {
766 #if CMK_LBDB_ON
767   int proc;
768   const int clients = CkNumPes();
769
770 #if ! USE_REDUCTION
771   // build data
772   buildStats();
773 #else
774   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
775 #endif
776
777   if (_lb_args.debug()) 
778       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
779                   lbname, cur_ld_balancer, step(), start_lb_time,
780                   CmiMemoryUsage()/(1024.0*1024.0));
781
782   // if we are in simulation mode read data
783   if (LBSimulation::doSimulation) simulationRead();
784
785   char *availVector = LBDatabaseObj()->availVector();
786   for(proc = 0; proc < clients; proc++)
787       statsData->procs[proc].available = (CmiBool)availVector[proc];
788
789   preprocess(statsData);
790
791 //    CkPrintf("Before Calling Strategy\n");
792
793   if (_lb_args.printSummary()) {
794       LBInfo info(clients);
795         // not take comm data
796       info.getInfo(statsData, clients, 0);
797       LBRealType mLoad, mCpuLoad, totalLoad;
798       info.getSummary(mLoad, mCpuLoad, totalLoad);
799       int nmsgs, nbytes;
800       statsData->computeNonlocalComm(nmsgs, nbytes);
801       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
802 //      if (_lb_args.debug() > 1) {
803 //        for (int i=0; i<statsData->n_objs; i++)
804 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
805 //      }
806   }
807
808 #if CMK_REPLAYSYSTEM
809   LDHandle *loadBalancer_pointers;
810   if (_replaySystem) {
811     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
812     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
813   }
814 #endif
815   
816   LBMigrateMsg* migrateMsg = Strategy(statsData);
817 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
818         migrateMsg->step = step();
819 #endif
820
821 #if CMK_REPLAYSYSTEM
822   CpdHandleLBMessage(&migrateMsg);
823   if (_replaySystem) {
824     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
825     free(loadBalancer_pointers);
826   }
827 #endif
828   
829   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
830   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
831
832   // if this is the step at which we need to dump the database
833   simulationWrite();
834
835 //  calculate predicted load
836 //  very time consuming though, so only happen when debugging is on
837   if (_lb_args.printSummary()) {
838       LBInfo info(clients);
839         // not take comm data
840       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
841       LBRealType mLoad, mCpuLoad, totalLoad;
842       info.getSummary(mLoad, mCpuLoad, totalLoad);
843       int nmsgs, nbytes;
844       statsData->computeNonlocalComm(nmsgs, nbytes);
845       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
846       for (int i=0; i<clients; i++)
847         migrateMsg->expectedLoad[i] = info.peLoads[i];
848   }
849
850   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
851 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
852     lbDecisionCount++;
853     migrateMsg->lbDecisionCount = lbDecisionCount;
854 #endif
855
856   envelope *env = UsrToEnv(migrateMsg);
857   if (1) {
858       // broadcast
859     thisProxy.ReceiveMigration(migrateMsg);
860   }
861   else {
862     // split the migration for each processor
863     for (int p=0; p<CkNumPes(); p++) {
864       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
865       thisProxy[p].ReceiveMigration(m);
866     }
867     delete migrateMsg;
868   }
869
870   // Zero out data structures for next cycle
871   // CkPrintf("zeroing out data\n");
872   statsData->clear();
873   stats_msg_count=0;
874 #endif
875 }
876
877 // test if sender and receiver in a commData is nonmigratable.
878 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
879 {
880 #if CMK_LBDB_ON
881   for (int pe=0 ; pe<count; pe++)
882   {
883     for (int i=0; i<len[pe]; i++)
884       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
885           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
886       return 0;
887   }
888 #endif
889   return 1;
890 }
891
892 // rebuild LDStats and remove all non-migratble objects and related things
893 void CentralLB::removeNonMigratable(LDStats* stats, int count)
894 {
895   int i;
896
897   // check if we have non-migratable objects
898   int have = 0;
899   for (i=0; i<stats->n_objs; i++) 
900   {
901     LDObjData &odata = stats->objData[i];
902     if (!odata.migratable) {
903       have = 1; break;
904     }
905   }
906   if (have == 0) return;
907
908   CkVec<LDObjData> nonmig;
909   CkVec<int> new_from_proc, new_to_proc;
910   nonmig.resize(stats->n_migrateobjs);
911   new_from_proc.resize(stats->n_migrateobjs);
912   new_to_proc.resize(stats->n_migrateobjs);
913   int n_objs = 0;
914   for (i=0; i<stats->n_objs; i++) 
915   {
916     LDObjData &odata = stats->objData[i];
917     if (odata.migratable) {
918       nonmig[n_objs] = odata;
919       new_from_proc[n_objs] = stats->from_proc[i];
920       new_to_proc[n_objs] = stats->to_proc[i];
921       n_objs ++;
922     }
923     else {
924       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
925 #if CMK_LB_CPUTIMER
926       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
927 #endif
928     }
929   }
930   CmiAssert(stats->n_migrateobjs == n_objs);
931
932   stats->makeCommHash();
933   
934   CkVec<LDCommData> newCommData;
935   newCommData.resize(stats->n_comm);
936   int n_comm = 0;
937   for (i=0; i<stats->n_comm; i++) 
938   {
939     LDCommData& cdata = stats->commData[i];
940     if (!cdata.from_proc()) 
941     {
942       int idx = stats->getSendHash(cdata);
943       CmiAssert(idx != -1);
944       if (!stats->objData[idx].migratable) continue;
945     }
946     switch (cdata.receiver.get_type()) {
947     case LD_PROC_MSG:
948       break;
949     case LD_OBJ_MSG:  {
950       int idx = stats->getRecvHash(cdata);
951       if (stats->complete_flag)
952         CmiAssert(idx != -1);
953       else if (idx == -1) continue;          // receiver not in this group
954       if (!stats->objData[idx].migratable) continue;
955       break;
956       }
957     case LD_OBJLIST_MSG:    // object message FIXME add multicast
958       break;
959     }
960     newCommData[n_comm] = cdata;
961     n_comm ++;
962   }
963
964   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
965
966   // swap to new data
967   stats->objData = nonmig;
968   stats->from_proc = new_from_proc;
969   stats->to_proc = new_to_proc;
970   stats->n_objs = n_objs;
971
972   stats->commData = newCommData;
973   stats->n_comm = n_comm;
974
975   stats->deleteCommHash();
976   stats->makeCommHash();
977
978 }
979
980
981 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
982 extern int restarted;
983 #endif
984
985 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
986 {
987   storedMigrateMsg = m;
988   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
989                   thisProxy);
990   contribute(0, NULL, CkReduction::max_int, cb);
991 }
992
993 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
994 {
995 #if CMK_LBDB_ON
996         int i;
997         LBMigrateMsg *m = storedMigrateMsg;
998         CmiAssert(m!=NULL);
999         delete msg;
1000
1001 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1002         int *dummyCounts;
1003
1004         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1005         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1006         if(step() > m->step){
1007                 char str[100];
1008                 envelope *env = UsrToEnv(m);
1009                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
1010                 return;
1011         }
1012         lbDecisionCount = m->lbDecisionCount;
1013 #endif
1014
1015   if (_lb_args.debug() > 1) 
1016     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1017
1018   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1019   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1020 /*FAULT_EVAC*/
1021   if(!CmiNodeAlive(CkMyPe())){
1022         delete m;
1023         return;
1024   }
1025   migrates_expected = 0;
1026   future_migrates_expected = 0;
1027 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1028         int sending=0;
1029     int dummy=0;
1030         LBDB *_myLBDB = theLbdb->getLBDB();
1031         if(_restartFlag){
1032         dummyCounts = new int[CmiNumPes()];
1033         bzero(dummyCounts,sizeof(int)*CmiNumPes());
1034     }
1035 #endif
1036   for(i=0; i < m->n_moves; i++) {
1037     MigrateInfo& move = m->moves[i];
1038     const int me = CkMyPe();
1039     if (move.from_pe == me && move.to_pe != me) {
1040       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1041       // migrate object, in case it is already gone, inform toPe
1042 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1043       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1044          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1045 #else
1046             if(_restartFlag == 0){
1047                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1048                 theLbdb->Migrate(move.obj,move.to_pe);
1049                 sending++;
1050             }else{
1051                 if(_myLBDB->validObjHandle(move.obj)){
1052                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1053                     theLbdb->Migrate(move.obj,move.to_pe);
1054                     sending++;
1055                 }else{
1056                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1057                     dummyCounts[move.to_pe]++;
1058                     dummy++;
1059                 }
1060             }
1061 #endif
1062     } else if (move.from_pe != me && move.to_pe == me) {
1063        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1064       if (!move.async_arrival) migrates_expected++;
1065       else future_migrates_expected++;
1066     }
1067   }
1068   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1069   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1070
1071 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1072         if(_restartFlag){
1073                 sendDummyMigrationCounts(dummyCounts);
1074                 _restartFlag  =0;
1075         delete []dummyCounts;
1076         }
1077 #endif
1078
1079
1080 #if 0
1081   if (m->n_moves ==0) {
1082     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1083   }
1084 #endif
1085   cur_ld_balancer = m->next_lb;
1086   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1087       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1088   }
1089
1090   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1091     MigrationDone(1);
1092   delete m;
1093
1094 //      CkEvacuatedElement();
1095 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1096 //  migrates_expected = 0;
1097 //  //  ResumeClients(1);
1098 #endif
1099 #endif
1100 }
1101
1102 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1103 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1104     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1105     //TODO: this is gonna be important when a crash happens during checkpoint
1106     //the globalDecisionCount would have to be saved and compared against
1107     //a future recvMigration
1108                 
1109         thisProxy[CkMyPe()].ResumeClients(1);
1110 }
1111 #endif
1112
1113 void CentralLB::MigrationDone(int balancing)
1114 {
1115 #if CMK_LBDB_ON
1116   migrates_completed = 0;
1117   migrates_expected = -1;
1118   // clear load stats
1119   if (balancing) theLbdb->ClearLoads();
1120   // Increment to next step
1121   theLbdb->incStep();
1122         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1123   // if sync resume, invoke a barrier
1124
1125 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1126     savedBalancing = balancing;
1127     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1128 #endif
1129
1130   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1131
1132   LoadbalanceDone(balancing);        // callback
1133 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1134   // if sync resume invoke a barrier
1135   if (balancing && _lb_args.syncResume()) {
1136     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1137                   thisProxy);
1138     contribute(0, NULL, CkReduction::sum_int, cb);
1139   }
1140   else{ 
1141     if(CmiNodeAlive(CkMyPe())){
1142         thisProxy [CkMyPe()].ResumeClients(balancing);
1143     }   
1144   }     
1145 #if CMK_GRID_QUEUE_AVAILABLE
1146   CmiGridQueueDeregisterAll ();
1147   CpvAccess(CkGridObject) = NULL;
1148 #endif
1149 #endif 
1150 #endif
1151 }
1152
1153 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1154 void CentralLB::endMigrationDone(int balancing){
1155     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1156
1157
1158   if (balancing && _lb_args.syncResume()) {
1159     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1160                   thisProxy);
1161     contribute(0, NULL, CkReduction::sum_int, cb);
1162   }
1163   else{
1164     if(CmiNodeAlive(CkMyPe())){
1165     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1166     thisProxy [CkMyPe()].ResumeClients(balancing);
1167     }
1168   }
1169
1170 }
1171 #endif
1172
1173 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1174 void resumeCentralLbAfterChkpt(void *_lb){
1175     CentralLB *lb= (CentralLB *)_lb;
1176     CpvAccess(_currentObj)=lb;
1177     lb->endMigrationDone(lb->savedBalancing);
1178 }
1179 #endif
1180
1181
1182 void CentralLB::ResumeClients(CkReductionMsg *msg)
1183 {
1184   ResumeClients(1);
1185   delete msg;
1186 }
1187
1188 void CentralLB::ResumeClients(int ideal_period, int balancing) {
1189   lb_ideal_period = ideal_period;
1190   ResumeClients(balancing);
1191 }
1192
1193 void CentralLB::ResumeClients(int balancing)
1194 {
1195 #if CMK_LBDB_ON
1196 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1197     resumeCount++;
1198     globalResumeCount = resumeCount;
1199 #endif
1200   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1201   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1202     double end_lb_time = CkWallTimer();
1203   }
1204
1205 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1206   if (balancing) ComlibNotifyMigrationDone();  
1207 #endif
1208
1209   theLbdb->ResumeClients();
1210   if (balancing)  {
1211     CheckMigrationComplete();
1212     if (future_migrates_expected == 0 || 
1213             future_migrates_expected == future_migrates_completed) {
1214       CheckMigrationComplete();
1215     }
1216   }
1217 #endif
1218 }
1219
1220 /*
1221   migration of objects contains two different kinds:
1222   (1) objects want to make a barrier for migration completion
1223       (waitForBarrier is true)
1224       migrationDone() to finish and resumeClients
1225   (2) objects don't need a barrier
1226   However, next load balancing can only happen when both migrations complete
1227 */ 
1228 void CentralLB::CheckMigrationComplete()
1229 {
1230 #if CMK_LBDB_ON
1231   lbdone ++;
1232   if (lbdone == 2) {
1233     if (_lb_args.debug() && CkMyPe()==0) {
1234       double end_lb_time = CkWallTimer();
1235       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1236                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1237                 end_lb_time-start_lb_time);
1238     }
1239
1240     lb_migration_cost = (CkWallTimer() - start_lb_time);
1241
1242     lbdone = 0;
1243     future_migrates_expected = -1;
1244     future_migrates_completed = 0;
1245     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1246     // release local barrier  so that the next load balancer can go
1247     LDOMHandle h;
1248     h.id.id.idx = 0;
1249     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1250     // switch to the next load balancer in the list
1251     // subtle: called from Migrated() may result in Migrated() called in next LB
1252     theLbdb->nextLoadbalancer(seqno);
1253   }
1254 #endif
1255 }
1256
1257 void CentralLB::preprocess(LDStats* stats)
1258 {
1259   if (_lb_args.ignoreBgLoad())
1260     stats->clearBgLoad();
1261
1262   // Call the predictor for the future
1263   if (_lb_predict) FuturePredictor(statsData);
1264 }
1265
1266 // default load balancing strategy
1267 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1268 {
1269 #if CMK_LBDB_ON
1270   double strat_start_time = CkWallTimer();
1271   if (_lb_args.debug())
1272     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1273
1274   work(stats);
1275
1276   if (_lb_args.debug()>1)  {
1277     CkPrintf("CharmLB> Obj Map:\n");
1278     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1279     CkPrintf("\n");
1280   }
1281
1282   LBMigrateMsg *msg = createMigrateMsg(stats);
1283
1284   if (_lb_args.debug()) {
1285     double strat_end_time = CkWallTimer();
1286     envelope *env = UsrToEnv(msg);
1287
1288     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1289     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1290               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1291     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1292     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1293               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1294     lb_strategy_cost = (strat_end_time - strat_start_time);
1295     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1296   }
1297
1298   return msg;
1299 #else
1300   return NULL;
1301 #endif
1302 }
1303
1304 void CentralLB::work(LDStats* stats)
1305 {
1306   // does nothing but print the database
1307   stats->print();
1308 }
1309
1310 // generate migrate message from stats->from_proc and to_proc
1311 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1312 {
1313   int i;
1314   CkVec<MigrateInfo*> migrateInfo;
1315   for (i=0; i<stats->n_objs; i++) {
1316     LDObjData &objData = stats->objData[i];
1317     int frompe = stats->from_proc[i];
1318     int tope = stats->to_proc[i];
1319     if (frompe != tope) {
1320       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1321       //         CkMyPe(),obj,pe,dest);
1322       MigrateInfo *migrateMe = new MigrateInfo;
1323       migrateMe->obj = objData.handle;
1324       migrateMe->from_pe = frompe;
1325       migrateMe->to_pe = tope;
1326       migrateMe->async_arrival = objData.asyncArrival;
1327       migrateInfo.insertAtEnd(migrateMe);
1328     }
1329   }
1330
1331   int migrate_count=migrateInfo.length();
1332   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1333   msg->n_moves = migrate_count;
1334   for(i=0; i < migrate_count; i++) {
1335     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1336     msg->moves[i] = *item;
1337     delete item;
1338     migrateInfo[i] = 0;
1339   }
1340   return msg;
1341 }
1342
1343 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1344 {
1345   int nmoves = 0;
1346   int nunavail = 0;
1347   int i;
1348   for (i=0; i<m->n_moves; i++) {
1349     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1350     if (item->from_pe == p || item->to_pe == p) nmoves++;
1351   }
1352   for (i=0; i<CkNumPes();i++) {
1353     if (!m->avail_vector[i]) nunavail++;
1354   }
1355   LBMigrateMsg* msg;
1356   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1357   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1358   msg->n_moves = nmoves;
1359   msg->level = m->level;
1360   msg->next_lb = m->next_lb;
1361   for (i=0,nmoves=0; i<m->n_moves; i++) {
1362     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1363     if (item->from_pe == p || item->to_pe == p) {
1364       msg->moves[nmoves] = *item;
1365       nmoves++;
1366     }
1367   }
1368   // copy processor data
1369   if (nunavail)
1370   for (i=0; i<CkNumPes();i++) {
1371     msg->avail_vector[i] = m->avail_vector[i];
1372     msg->expectedLoad[i] = m->expectedLoad[i];
1373   }
1374   return msg;
1375 }
1376
1377 void CentralLB::simulationWrite() {
1378   if(step() == LBSimulation::dumpStep)
1379   {
1380     // here we are supposed to dump the database
1381     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1382     char *dumpFileName = (char *)malloc(dumpFileSize);
1383     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1384       free(dumpFileName);
1385       dumpFileSize+=3;
1386       dumpFileName = (char *)malloc(dumpFileSize);
1387     }
1388     writeStatsMsgs(dumpFileName);
1389     free(dumpFileName);
1390     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1391     ++LBSimulation::dumpStep;
1392     --LBSimulation::dumpStepSize;
1393     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1394       CmiPrintf("Charm++> Exiting...\n");
1395       CkExit();
1396     }
1397     return;
1398   }
1399 }
1400
1401 void CentralLB::simulationRead() {
1402   LBSimulation *simResults = NULL, *realResults;
1403   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1404   voidMessage->n_moves=0;
1405   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1406     // here we are supposed to read the data from the dump database
1407     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1408     char *simFileName = (char *)malloc(simFileSize);
1409     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1410       free(simFileName);
1411       simFileSize+=3;
1412       simFileName = (char *)malloc(simFileSize);
1413     }
1414     readStatsMsgs(simFileName);
1415
1416     // allocate simResults (only the first step)
1417     if (simResults == NULL) {
1418       simResults = new LBSimulation(LBSimulation::simProcs);
1419       realResults = new LBSimulation(LBSimulation::simProcs);
1420     }
1421     else {
1422       // should be the same number of procs of the original simulation!
1423       if (!LBSimulation::procsChanged) {
1424         // it means we have a previous step, so in simResults there is data.
1425         // we can now print the real effects of the load balancer during the simulation
1426         // or print the difference between the predicted data and the real one.
1427         realResults->reset();
1428         // reset to_proc of statsData to be equal to from_proc
1429         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1430         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1431         simResults->PrintDifferences(realResults,statsData);
1432       }
1433       simResults->reset();
1434     }
1435
1436     // now pass it to the strategy routine
1437     double startT = CkWallTimer();
1438     preprocess(statsData);
1439     CmiPrintf("%s> Strategy starts ... \n", lbname);
1440     LBMigrateMsg* migrateMsg = Strategy(statsData);
1441     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1442                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1443
1444     // now calculate the results of the load balancing simulation
1445     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1446
1447     // now we have the simulation data, so print it and loop
1448     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1449     // **CWL** Officially recording my disdain here for using ints for bool
1450     if (LBSimulation::showDecisionsOnly) {
1451       simResults->PrintDecisions(migrateMsg, simFileName, 
1452                                  LBSimulation::simProcs);
1453     } else {
1454       simResults->PrintSimulationResults();
1455     }
1456
1457     free(simFileName);
1458     delete migrateMsg;
1459     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1460   }
1461   // deallocate simResults
1462   delete simResults;
1463   CmiPrintf("Charm++> Exiting...\n");
1464   CkExit();
1465 }
1466
1467 void CentralLB::readStatsMsgs(const char* filename) 
1468 {
1469 #if CMK_LBDB_ON
1470   int i;
1471   FILE *f = fopen(filename, "r");
1472   if (f==NULL) {
1473     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1474     CmiAbort("");
1475   }
1476
1477   // at this stage, we need to rebuild the statsMsgList and
1478   // statsDataList structures. For that first deallocate the
1479   // old structures
1480   if (statsMsgsList) {
1481     for(i = 0; i < stats_msg_count; i++)
1482       delete statsMsgsList[i];
1483     delete[] statsMsgsList;
1484     statsMsgsList=0;
1485   }
1486
1487   PUP::fromDisk pd(f);
1488   PUP::machineInfo machInfo;
1489
1490   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1491   PUP::xlater p(machInfo, pd);
1492
1493   if (_lb_args.lbversion() > 1) {
1494     p|_lb_args.lbversion();             // write version number
1495     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1496     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1497   }
1498   p|stats_msg_count;
1499
1500   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1501   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1502   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1503
1504   // LBSimulation::simProcs must be set
1505   statsData->pup(p);
1506
1507   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1508   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1509
1510   // file f is closed in the destructor of PUP::fromDisk
1511   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1512 #endif
1513 }
1514
1515 void CentralLB::writeStatsMsgs(const char* filename) 
1516 {
1517 #if CMK_LBDB_ON
1518   FILE *f = fopen(filename, "w");
1519   if (f==NULL) {
1520     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1521     CmiAbort("");
1522   }
1523
1524   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1525   PUP::toDisk p(f);
1526   p((char *)&machInfo, sizeof(machInfo));       // machine info
1527
1528   p|_lb_args.lbversion();               // write version number
1529   p|stats_msg_count;
1530   statsData->pup(p);
1531
1532   fclose(f);
1533
1534   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1535 #endif
1536 }
1537
1538 // calculate the predicted wallclock/cpu load for every processors
1539 // considering communication overhead if considerComm is true
1540 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1541                       LBMigrateMsg *msg, LBInfo &info, 
1542                       int considerComm)
1543 {
1544 #if CMK_LBDB_ON
1545         stats->makeCommHash();
1546
1547         // update to_proc according to migration msgs
1548         for(int i = 0; i < msg->n_moves; i++) {
1549           MigrateInfo &mInfo = msg->moves[i];
1550           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1551           CmiAssert(idx != -1);
1552           stats->to_proc[idx] = mInfo.to_pe;
1553         }
1554
1555         info.getInfo(stats, count, considerComm);
1556 #endif
1557 }
1558
1559
1560 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1561 {
1562     CkAssert(simResults != NULL && count == simResults->numPes);
1563     // estimate the new loads of the processors. As a first approximation, this is the
1564     // sum of the cpu times of the objects on that processor
1565     double startT = CkWallTimer();
1566     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1567     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1568 }
1569
1570 void CentralLB::pup(PUP::er &p) { 
1571   BaseLB::pup(p); 
1572   if (p.isUnpacking())  {
1573     initLB(CkLBOptions(seqno)); 
1574   }
1575   p|reduction_started;
1576   int has_statsMsg=0;
1577   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1578   p|has_statsMsg;
1579   if (has_statsMsg) {
1580     if (p.isUnpacking())
1581       statsMsg = new CLBStatsMsg;
1582     statsMsg->pup(p);
1583   }
1584 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1585   p | lbDecisionCount;
1586   p | resumeCount;
1587 #endif
1588         
1589 }
1590
1591 int CentralLB::useMem() { 
1592   return sizeof(CentralLB) + statsData->useMem() + 
1593          CkNumPes() * sizeof(CLBStatsMsg *);
1594 }
1595
1596
1597 /**
1598   CLBStatsMsg is not a real message now.
1599   CLBStatsMsg is used for all processors to fill in their local load and comm
1600   statistics and send to processor 0
1601 */
1602
1603 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1604   n_objs = osz;
1605   n_comm = csz;
1606   objData = new LDObjData[osz];
1607   commData = new LDCommData[csz];
1608   avail_vector = NULL;
1609 }
1610
1611 CLBStatsMsg::~CLBStatsMsg() {
1612   delete [] objData;
1613   delete [] commData;
1614   delete [] avail_vector;
1615 }
1616
1617 void CLBStatsMsg::pup(PUP::er &p) {
1618   int i;
1619   p|from_pe;
1620   p|pe_speed;
1621   p|total_walltime;
1622   p|idletime;
1623   p|bg_walltime;
1624 #if CMK_LB_CPUTIMER
1625   p|total_cputime;
1626   p|bg_cputime;
1627 #endif
1628 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1629   p | step;
1630 #endif
1631   p|n_objs;
1632   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1633   for (i=0; i<n_objs; i++) p|objData[i];
1634   p|n_comm;
1635   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1636   for (i=0; i<n_comm; i++) p|commData[i];
1637
1638   int has_avail_vector;
1639   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1640   p|has_avail_vector;
1641   if (p.isUnpacking()) {
1642     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1643     else avail_vector = NULL;
1644   }
1645   if (has_avail_vector) p(avail_vector, CkNumPes());
1646
1647   p(next_lb);
1648 }
1649
1650 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1651 // the entry function, it is just used to use to pup.
1652 // I don't use CLBStatsMsg directly as marshalled parameter because
1653 // I want the data pointer stored and not to be freed by the Charm++.
1654 void CkMarshalledCLBStatsMessage::free() { 
1655   int count = msgs.size();
1656   for  (int i=0; i<count; i++) {
1657     delete msgs[i];
1658     msgs[i] = NULL;
1659   }
1660   msgs.free();
1661 }
1662
1663 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1664 {
1665   int count = m.getCount();
1666   for (int i=0; i<count; i++) add(m.getMessage(i));
1667 }
1668
1669 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1670 {
1671   int count = msgs.size();
1672   p|count;
1673   for (int i=0; i<count; i++) {
1674     CLBStatsMsg *msg;
1675     if (p.isUnpacking()) msg = new CLBStatsMsg;
1676     else { 
1677       msg = msgs[i]; CmiAssert(msg!=NULL);
1678     }
1679     msg->pup(p);
1680     if (p.isUnpacking()) add(msg);
1681   }
1682 }
1683
1684 SpanningTree::SpanningTree()
1685 {
1686         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1687         arity = (int)ceil(sq/2);
1688         calcParent(CkMyPe());
1689         calcNumChildren(CkMyPe());
1690 }
1691
1692 void SpanningTree::calcParent(int n)
1693 {
1694         parent=-1;
1695         if(n != 0  && arity > 0)
1696                 parent = (n-1)/arity;
1697 }
1698
1699 void SpanningTree::calcNumChildren(int n)
1700 {
1701         numChildren = 0;
1702         if (arity == 0) return;
1703         int fullNode=(CkNumPes()-1-arity)/arity;
1704         if(n <= fullNode)
1705                 numChildren = arity;
1706         if(n == fullNode+1)
1707                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1708         if(n > fullNode+1)
1709                 numChildren = 0;
1710 }
1711
1712 #include "CentralLB.def.h"
1713  
1714 /*@}*/