f0cd019a1c7717f48c1234cc55f5d9149f067c73
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #include <vector>
14
15 #define  DEBUGF(x)       // CmiPrintf x;
16 #define  DEBUG(x)        // x;
17
18 #if CMK_MEM_CHECKPOINT
19    /* can not handle reduction in inmem FT */
20 #define USE_REDUCTION         0
21 #define USE_LDB_SPANNING_TREE 0
22 #elif defined(_FAULT_MLOG_)
23 /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
32 extern int _restartFlag;
33 extern void getGlobalStep(CkGroupID );
34 extern void initMlogLBStep(CkGroupID );
35 extern int globalResumeCount;
36 extern void sendDummyMigrationCounts(int *);
37 #endif
38
39 #if CMK_GRID_QUEUE_AVAILABLE
40 CpvExtern(void *, CkGridObject);
41 #endif
42
43 CkGroupID loadbalancer;
44 int * lb_ptr;
45 int load_balancer_created;
46
47 double lb_migration_cost = 0.0;
48 double lb_strategy_cost = 0.0;
49
50 int lb_ideal_period = 0;
51 int lb_no_iterations = -1;
52
53 double cur_max_pe_load = 0.0;
54 double cur_avg_pe_load = 0.0;
55 double prev_load = 0.0;
56
57 struct AdaptiveData {
58   int iteration;
59   double max_load;
60   double avg_load;
61 };
62
63 struct AdaptiveLBDatabase {
64   std::vector<AdaptiveData> history_data;
65 } adaptive_lbdb;
66
67 CreateLBFunc_Def(CentralLB, "CentralLB base class")
68
69 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
70                              LBMigrateMsg *, LBInfo &info, int considerComm);
71
72 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
73   double lb_data[3];
74   lb_data[0] = 0;
75   lb_data[1] = 0;
76   lb_data[2] = 0;
77   for (int i = 0; i < nMsg; i++) {
78     CkAssert(msgs[i]->getSize() == 3*sizeof(double));
79     double* m = (double *)msgs[i]->getData();
80     lb_data[0] += m[0];
81     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
82     lb_data[2] += m[2];
83   }
84   return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
85 }
86
87 /*global*/ CkReduction::reducerType lbDataCollectionType;
88 /*initcall*/ void registerLBDataCollection(void) {
89   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
90 }
91
92 /*
93 void CreateCentralLB()
94 {
95   CProxy_CentralLB::ckNew(0);
96 }
97 */
98
99 void CentralLB::staticStartLB(void* data)
100 {
101   CentralLB *me = (CentralLB*)(data);
102   me->StartLB();
103 }
104
105 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
106 {
107   CentralLB *me = (CentralLB*)(data);
108   me->Migrated(h, waitBarrier);
109 }
110
111 void CentralLB::staticAtSync(void* data)
112 {
113   CentralLB *me = (CentralLB*)(data);
114   me->AtSync();
115 }
116
117 void CentralLB::initLB(const CkLBOptions &opt)
118 {
119 #if CMK_LBDB_ON
120   lbname = "CentralLB";
121   thisProxy = CProxy_CentralLB(thisgroup);
122   //  CkPrintf("Construct in %d\n",CkMyPe());
123
124   // create and turn on by default
125   receiver = theLbdb->
126     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
127   notifier = theLbdb->getLBDB()->
128     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
129   startLbFnHdl = theLbdb->getLBDB()->
130     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
131
132   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
133   if (opt.getSeqNo() > 0) turnOff();
134
135   stats_msg_count = 0;
136   statsMsgsList = NULL;
137   statsData = NULL;
138
139   storedMigrateMsg = NULL;
140   reduction_started = 0;
141
142   // for future predictor
143   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
144   else predicted_model=0;
145   // register user interface callbacks
146   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
147
148   myspeed = theLbdb->ProcessorSpeed();
149
150   migrates_completed = 0;
151   future_migrates_completed = 0;
152   migrates_expected = -1;
153   future_migrates_expected = -1;
154   cur_ld_balancer = _lb_args.central_pe();      // 0 default
155   lbdone = 0;
156   count_msgs=0;
157   statsMsg = NULL;
158
159   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
160
161   load_balancer_created = 1;
162 #endif
163 }
164
165 CentralLB::~CentralLB()
166 {
167 #if CMK_LBDB_ON
168   delete [] statsMsgsList;
169   delete statsData;
170   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
171   if (theLbdb) {
172     theLbdb->getLBDB()->
173       RemoveNotifyMigrated(notifier);
174     theLbdb->
175       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
176   }
177 #endif
178 }
179
180 void CentralLB::turnOn() 
181 {
182 #if CMK_LBDB_ON
183   theLbdb->getLBDB()->
184     TurnOnBarrierReceiver(receiver);
185   theLbdb->getLBDB()->
186     TurnOnNotifyMigrated(notifier);
187   theLbdb->getLBDB()->
188     TurnOnStartLBFn(startLbFnHdl);
189 #endif
190 }
191
192 void CentralLB::turnOff() 
193 {
194 #if CMK_LBDB_ON
195   theLbdb->getLBDB()->
196     TurnOffBarrierReceiver(receiver);
197   theLbdb->getLBDB()->
198     TurnOffNotifyMigrated(notifier);
199   theLbdb->getLBDB()->
200     TurnOffStartLBFn(startLbFnHdl);
201 #endif
202 }
203
204 void CentralLB::AtSync()
205 {
206   CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
207 #if CMK_LBDB_ON
208   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
209
210 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
211         CpvAccess(_currentObj)=this;
212 #endif
213
214   // if num of processor is only 1, nothing should happen
215   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
216     MigrationDone(0);
217     return;
218   }
219   if(CmiNodeAlive(CkMyPe())){
220     thisProxy [CkMyPe()].ProcessAtSyncMin();
221   }
222 #endif
223 }
224
225 #include "ComlibStrategy.h"
226
227 void CentralLB::ProcessAtSync()
228 {
229
230   // Reset all adaptive lb related fields since load balancing is being done.
231   lb_no_iterations = -1;
232   adaptive_lbdb.history_data.clear();
233   prev_load = 0.0;
234
235 #if CMK_LBDB_ON
236   if (reduction_started) return;              // reducton in progress
237
238   CmiAssert(CmiNodeAlive(CkMyPe()));
239   if (CkMyPe() == cur_ld_balancer) {
240     start_lb_time = CkWallTimer();
241   }
242
243
244 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
245         initMlogLBStep(thisgroup);
246 #endif
247
248   // build message
249   BuildStatsMsg();
250
251 #if USE_REDUCTION
252     // reduction to get total number of objects and comm
253     // so that processor 0 can pre-allocate load balancing database
254   int counts[2];
255   counts[0] = theLbdb->GetObjDataSz();
256   counts[1] = theLbdb->GetCommDataSz();
257
258   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
259                   thisProxy[0]);
260   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
261   reduction_started = 1;
262 #else
263   SendStats();
264 #endif
265 #endif
266 }
267
268 void CentralLB::ProcessAtSyncMin()
269 {
270 #if CMK_LBDB_ON
271   if (reduction_started) return;              // reducton in progress
272
273   CmiAssert(CmiNodeAlive(CkMyPe()));
274   if (CkMyPe() == cur_ld_balancer) {
275     start_lb_time = CkWallTimer();
276   }
277
278
279 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
280         initMlogLBStep(thisgroup);
281 #endif
282   
283   lb_no_iterations++;
284   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
285       lb_no_iterations, lb_ideal_period);
286
287   if (lb_no_iterations >= lb_ideal_period) {
288     CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
289     SendMinStats();
290   } else {
291
292  double total_walltime;
293  double idle_time;
294  double bg_walltime;
295   theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
296     CkPrintf("No need to send MinStats of [%d] load: %lf\n\n", CkMyPe(),
297     total_walltime);
298     ResumeClients(0);
299   }
300 #endif
301 }
302
303 void CentralLB::SendMinStats() {
304
305  double total_walltime;
306  double idle_time;
307  double bg_walltime;
308   theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
309   CkPrintf("Total walltime [%d] %lf\n", CkMyPe(), total_walltime);
310
311   // Since the total_walltime is cumulative since the last load balancing stage,
312   // Hence it is subtracted from the previous load.
313   double tmp = total_walltime;
314   total_walltime -= prev_load;
315   prev_load = tmp; 
316
317   double lb_data[3];
318   lb_data[0] = total_walltime;
319   lb_data[1] = total_walltime;
320   lb_data[2] = 1;
321
322   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
323                   thisProxy[0]);
324   contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
325 }
326
327 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
328   CmiAssert(CkMyPe() == 0);
329   double* load = (LBRealType *) msg->getData();
330   double max = load[1];
331   double avg = load[0]/load[2];
332   CkPrintf("Iteration[%d] Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
333
334   // Store the data for this iteration
335   AdaptiveData data;
336   data.iteration = lb_no_iterations;
337   data.max_load = max;
338   data.avg_load = avg;
339   adaptive_lbdb.history_data.push_back(data);
340
341   if (max/avg >= 1.01) {
342     CkPrintf("Carry out load balancing step\n");
343     thisProxy.ProcessAtSync();
344     return;
345   }
346
347   // Generate the plan for the adaptive strategy
348   if (generatePlan()) {
349     thisProxy.ResumeClients(lb_ideal_period, 0);
350   } else {
351     thisProxy.ResumeClients(0);
352   }
353 }
354
355 bool CentralLB::generatePlan() {
356   if (adaptive_lbdb.history_data.size() <= 3) {
357     return false;
358   }
359
360   // Some heuristics for lbperiod
361   // If constant load or almost constant,
362   // then max * new_lb_period > avg * new_lb_period + lb_cost
363   double max = 0.0;
364   double avg = 0.0;
365   AdaptiveData data;
366   for (int i = 1; i < adaptive_lbdb.history_data.size(); i++) {
367     data = adaptive_lbdb.history_data[i];
368     max += data.max_load;
369     avg += data.avg_load;
370   }
371   max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
372   avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
373
374   lb_ideal_period = (lb_strategy_cost + lb_migration_cost) / (max - avg);
375   CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
376       max, avg, lb_strategy_cost, lb_migration_cost, lb_ideal_period);
377
378   return true;
379 }
380
381 // called only on 0
382 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
383 {
384   CmiAssert(CkMyPe() == 0);
385   if (statsData == NULL) statsData = new LDStats;
386
387   int *counts = (int *)msg->getData();
388   int n_objs = counts[0];
389   int n_comm = counts[1];
390
391     // resize database
392   statsData->objData.resize(n_objs);
393   statsData->from_proc.resize(n_objs);
394   statsData->to_proc.resize(n_objs);
395   statsData->commData.resize(n_comm);
396
397   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
398         
399     // broadcast call to let everybody start to send stats
400   thisProxy.SendStats();
401 }
402
403 void CentralLB::BuildStatsMsg()
404 {
405 #if CMK_LBDB_ON
406   // build and send stats
407   const int osz = theLbdb->GetObjDataSz();
408   const int csz = theLbdb->GetCommDataSz();
409
410   int npes = CkNumPes();
411   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
412   _MEMCHECK(msg);
413   msg->from_pe = CkMyPe();
414 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
415         msg->step = step();
416 #endif
417   //msg->serial = CrnRand();
418
419 /*
420   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
421   theLbdb->IdleTime(&msg->idletime);
422   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
423 */
424 #if CMK_LB_CPUTIMER
425   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
426                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
427 #else
428   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
429                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
430 #endif
431
432   msg->pe_speed = myspeed;
433   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
434
435   msg->n_objs = osz;
436   theLbdb->GetObjData(msg->objData);
437   msg->n_comm = csz;
438   theLbdb->GetCommData(msg->commData);
439 //  theLbdb->ClearLoads();
440   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
441
442   if(CkMyPe() == cur_ld_balancer) {
443     msg->avail_vector = new char[CkNumPes()];
444     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
445     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
446   }
447
448   CmiAssert(statsMsg == NULL);
449   statsMsg = msg;
450 #endif
451 }
452
453
454 // called on every processor
455 void CentralLB::SendStats()
456 {
457 #if CMK_LBDB_ON
458   CmiAssert(statsMsg != NULL);
459   reduction_started = 0;
460
461 #if USE_LDB_SPANNING_TREE
462   if(CkNumPes()>1024)
463   {
464     if (CkMyPe() == cur_ld_balancer)
465       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
466     else
467       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
468   }
469   else
470 #endif
471   {
472     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
473     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
474   }
475
476   statsMsg = NULL;
477
478 #ifdef __BIGSIM__
479   BgEndStreaming();
480 #endif
481
482   {
483   // enfore the barrier to wait until centralLB says no
484   LDOMHandle h;
485   h.id.id.idx = 0;
486   theLbdb->getLBDB()->RegisteringObjects(h);
487   }
488 #endif
489 }
490
491 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
492 extern int donotCountMigration;
493 #endif
494
495 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
496 {
497 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
498     if(donotCountMigration){
499         return ;
500     }
501 #endif
502
503 #if CMK_LBDB_ON
504   if (waitBarrier) {
505             migrates_completed++;
506       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
507     if (migrates_completed == migrates_expected) {
508       MigrationDone(1);
509     }
510   }
511   else {
512     future_migrates_completed ++;
513     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
514     if (future_migrates_completed == future_migrates_expected)  {
515         CheckMigrationComplete();
516     }
517   }
518 #endif
519 }
520
521 void CentralLB::MissMigrate(int waitForBarrier)
522 {
523   LDObjHandle h;
524   Migrated(h, waitForBarrier);
525 }
526
527 // build a complete data from bufferred messages
528 // not used when USE_REDUCTION = 1
529 void CentralLB::buildStats()
530 {
531     statsData->nprocs() = stats_msg_count;
532     // allocate space
533     statsData->objData.resize(statsData->n_objs);
534     statsData->from_proc.resize(statsData->n_objs);
535     statsData->to_proc.resize(statsData->n_objs);
536     statsData->commData.resize(statsData->n_comm);
537
538     int nobj = 0;
539     int ncom = 0;
540     int nmigobj = 0;
541     // copy all data in individule message to this big structure
542     for (int pe=0; pe<CkNumPes(); pe++) {
543        int i;
544        CLBStatsMsg *msg = statsMsgsList[pe];
545        if(msg == NULL) continue;
546        for (i=0; i<msg->n_objs; i++) {
547          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
548          statsData->objData[nobj] = msg->objData[i];
549          if (msg->objData[i].migratable) nmigobj++;
550          nobj++;
551        }
552        for (i=0; i<msg->n_comm; i++) {
553          statsData->commData[ncom] = msg->commData[i];
554          ncom++;
555        }
556        // free the memory
557        delete msg;
558        statsMsgsList[pe]=0;
559     }
560     statsData->n_migrateobjs = nmigobj;
561 }
562
563 // deposit one processor data at a time, note database is pre-allocated
564 // to have enough space
565 // used when USE_REDUCTION = 1
566 void CentralLB::depositData(CLBStatsMsg *m)
567 {
568   int i;
569   if (m == NULL) return;
570
571   const int pe = m->from_pe;
572   struct ProcStats &procStat = statsData->procs[pe];
573   procStat.pe = pe;
574   procStat.total_walltime = m->total_walltime;
575   procStat.idletime = m->idletime;
576   procStat.bg_walltime = m->bg_walltime;
577 #if CMK_LB_CPUTIMER
578   procStat.total_cputime = m->total_cputime;
579   procStat.bg_cputime = m->bg_cputime;
580 #endif
581   procStat.pe_speed = m->pe_speed;
582   //procStat.utilization = 1.0;
583   procStat.available = CmiTrue;
584   procStat.n_objs = m->n_objs;
585
586   int &nobj = statsData->n_objs;
587   int &nmigobj = statsData->n_migrateobjs;
588   for (i=0; i<m->n_objs; i++) {
589       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
590       statsData->objData[nobj] = m->objData[i];
591       if (m->objData[i].migratable) nmigobj++;
592       nobj++;
593       CmiAssert(nobj <= statsData->objData.capacity());
594   }
595   int &n_comm = statsData->n_comm;
596   for (i=0; i<m->n_comm; i++) {
597       statsData->commData[n_comm] = m->commData[i];
598       n_comm++;
599       CmiAssert(n_comm <= statsData->commData.capacity());
600   }
601   delete m;
602 }
603
604 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
605 {
606 #if CMK_LBDB_ON
607   if (statsMsgsList == NULL) {
608     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
609     CmiAssert(statsMsgsList != NULL);
610     for(int i=0; i < CkNumPes(); i++)
611       statsMsgsList[i] = 0;
612   }
613   if (statsData == NULL) statsData = new LDStats;
614
615     //  loop through all CLBStatsMsg in the incoming msg
616   int count = msg.getCount();
617   for (int num = 0; num < count; num++) 
618   {
619     CLBStatsMsg *m = msg.getMessage(num);
620     CmiAssert(m!=NULL);
621     const int pe = m->from_pe;
622     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
623 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
624 /*      
625  *  if(m->step < step()){
626  *    //TODO: if a processor is redoing an old load balance step..
627  *    //tell it that the step is done and that it should not perform any migrations
628  *      thisProxy[pe].ReceiveDummyMigration();
629  *  }*/
630 #endif
631         
632     if(!CmiNodeAlive(pe)){
633         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
634         continue;
635     }
636         
637     if (m->avail_vector!=NULL) {
638       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
639     }
640
641     if (statsMsgsList[pe] != 0) {
642       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
643              pe);
644     } else {
645       statsMsgsList[pe] = m;
646 #if USE_REDUCTION
647       depositData(m);
648 #else
649       // store per processor data right away
650       struct ProcStats &procStat = statsData->procs[pe];
651       procStat.pe = pe;
652       procStat.total_walltime = m->total_walltime;
653       procStat.idletime = m->idletime;
654       procStat.bg_walltime = m->bg_walltime;
655 #if CMK_LB_CPUTIMER
656       procStat.total_cputime = m->total_cputime;
657       procStat.bg_cputime = m->bg_cputime;
658 #endif
659       procStat.pe_speed = m->pe_speed;
660       //procStat.utilization = 1.0;
661       procStat.available = CmiTrue;
662       procStat.n_objs = m->n_objs;
663
664       statsData->n_objs += m->n_objs;
665       statsData->n_comm += m->n_comm;
666 #endif
667       stats_msg_count++;
668     }
669   }    // end of for
670
671   const int clients = CkNumValidPes();
672   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
673  
674   if (stats_msg_count == clients) {
675         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
676     statsData->nprocs() = stats_msg_count;
677     thisProxy[CkMyPe()].LoadBalance();
678   }
679 #endif
680 }
681
682 /** added by Abhinav for receiving msgs via spanning tree */
683 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
684 {
685 #if CMK_LBDB_ON
686         CmiAssert(CkMyPe() != 0);
687         bufMsg.add(msg);         // buffer messages
688         count_msgs++;
689         //CkPrintf("here %d\n", CkMyPe());
690         if (count_msgs == st.numChildren+1) {
691                 if(st.parent == 0)
692                 {
693                         thisProxy[0].ReceiveStats(bufMsg);
694                         //CkPrintf("from %d\n", CkMyPe());
695                 }
696                 else
697                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
698                 count_msgs = 0;
699                 bufMsg.free();
700         } 
701 #endif
702 }
703
704 void CentralLB::LoadBalance()
705 {
706 #if CMK_LBDB_ON
707   int proc;
708   const int clients = CkNumPes();
709
710 #if ! USE_REDUCTION
711   // build data
712   buildStats();
713 #else
714   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
715 #endif
716
717   if (_lb_args.debug()) 
718       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
719                   lbname, cur_ld_balancer, step(), start_lb_time,
720                   CmiMemoryUsage()/(1024.0*1024.0));
721
722   // if we are in simulation mode read data
723   if (LBSimulation::doSimulation) simulationRead();
724
725   char *availVector = LBDatabaseObj()->availVector();
726   for(proc = 0; proc < clients; proc++)
727       statsData->procs[proc].available = (CmiBool)availVector[proc];
728
729   preprocess(statsData);
730
731 //    CkPrintf("Before Calling Strategy\n");
732
733   if (_lb_args.printSummary()) {
734       LBInfo info(clients);
735         // not take comm data
736       info.getInfo(statsData, clients, 0);
737       LBRealType mLoad, mCpuLoad, totalLoad;
738       info.getSummary(mLoad, mCpuLoad, totalLoad);
739       int nmsgs, nbytes;
740       statsData->computeNonlocalComm(nmsgs, nbytes);
741       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
742 //      if (_lb_args.debug() > 1) {
743 //        for (int i=0; i<statsData->n_objs; i++)
744 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
745 //      }
746   }
747
748 #if CMK_REPLAYSYSTEM
749   LDHandle *loadBalancer_pointers;
750   if (_replaySystem) {
751     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
752     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
753   }
754 #endif
755   
756   LBMigrateMsg* migrateMsg = Strategy(statsData);
757 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
758         migrateMsg->step = step();
759 #endif
760
761 #if CMK_REPLAYSYSTEM
762   CpdHandleLBMessage(&migrateMsg);
763   if (_replaySystem) {
764     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
765     free(loadBalancer_pointers);
766   }
767 #endif
768   
769   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
770   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
771
772   // if this is the step at which we need to dump the database
773   simulationWrite();
774
775 //  calculate predicted load
776 //  very time consuming though, so only happen when debugging is on
777   if (_lb_args.printSummary()) {
778       LBInfo info(clients);
779         // not take comm data
780       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
781       LBRealType mLoad, mCpuLoad, totalLoad;
782       info.getSummary(mLoad, mCpuLoad, totalLoad);
783       int nmsgs, nbytes;
784       statsData->computeNonlocalComm(nmsgs, nbytes);
785       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
786       for (int i=0; i<clients; i++)
787         migrateMsg->expectedLoad[i] = info.peLoads[i];
788   }
789
790   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
791 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
792     lbDecisionCount++;
793     migrateMsg->lbDecisionCount = lbDecisionCount;
794 #endif
795
796   envelope *env = UsrToEnv(migrateMsg);
797   if (1) {
798       // broadcast
799     thisProxy.ReceiveMigration(migrateMsg);
800   }
801   else {
802     // split the migration for each processor
803     for (int p=0; p<CkNumPes(); p++) {
804       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
805       thisProxy[p].ReceiveMigration(m);
806     }
807     delete migrateMsg;
808   }
809
810   // Zero out data structures for next cycle
811   // CkPrintf("zeroing out data\n");
812   statsData->clear();
813   stats_msg_count=0;
814 #endif
815 }
816
817 // test if sender and receiver in a commData is nonmigratable.
818 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
819 {
820 #if CMK_LBDB_ON
821   for (int pe=0 ; pe<count; pe++)
822   {
823     for (int i=0; i<len[pe]; i++)
824       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
825           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
826       return 0;
827   }
828 #endif
829   return 1;
830 }
831
832 // rebuild LDStats and remove all non-migratble objects and related things
833 void CentralLB::removeNonMigratable(LDStats* stats, int count)
834 {
835   int i;
836
837   // check if we have non-migratable objects
838   int have = 0;
839   for (i=0; i<stats->n_objs; i++) 
840   {
841     LDObjData &odata = stats->objData[i];
842     if (!odata.migratable) {
843       have = 1; break;
844     }
845   }
846   if (have == 0) return;
847
848   CkVec<LDObjData> nonmig;
849   CkVec<int> new_from_proc, new_to_proc;
850   nonmig.resize(stats->n_migrateobjs);
851   new_from_proc.resize(stats->n_migrateobjs);
852   new_to_proc.resize(stats->n_migrateobjs);
853   int n_objs = 0;
854   for (i=0; i<stats->n_objs; i++) 
855   {
856     LDObjData &odata = stats->objData[i];
857     if (odata.migratable) {
858       nonmig[n_objs] = odata;
859       new_from_proc[n_objs] = stats->from_proc[i];
860       new_to_proc[n_objs] = stats->to_proc[i];
861       n_objs ++;
862     }
863     else {
864       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
865 #if CMK_LB_CPUTIMER
866       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
867 #endif
868     }
869   }
870   CmiAssert(stats->n_migrateobjs == n_objs);
871
872   stats->makeCommHash();
873   
874   CkVec<LDCommData> newCommData;
875   newCommData.resize(stats->n_comm);
876   int n_comm = 0;
877   for (i=0; i<stats->n_comm; i++) 
878   {
879     LDCommData& cdata = stats->commData[i];
880     if (!cdata.from_proc()) 
881     {
882       int idx = stats->getSendHash(cdata);
883       CmiAssert(idx != -1);
884       if (!stats->objData[idx].migratable) continue;
885     }
886     switch (cdata.receiver.get_type()) {
887     case LD_PROC_MSG:
888       break;
889     case LD_OBJ_MSG:  {
890       int idx = stats->getRecvHash(cdata);
891       if (stats->complete_flag)
892         CmiAssert(idx != -1);
893       else if (idx == -1) continue;          // receiver not in this group
894       if (!stats->objData[idx].migratable) continue;
895       break;
896       }
897     case LD_OBJLIST_MSG:    // object message FIXME add multicast
898       break;
899     }
900     newCommData[n_comm] = cdata;
901     n_comm ++;
902   }
903
904   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
905
906   // swap to new data
907   stats->objData = nonmig;
908   stats->from_proc = new_from_proc;
909   stats->to_proc = new_to_proc;
910   stats->n_objs = n_objs;
911
912   stats->commData = newCommData;
913   stats->n_comm = n_comm;
914
915   stats->deleteCommHash();
916   stats->makeCommHash();
917
918 }
919
920
921 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
922 extern int restarted;
923 #endif
924
925 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
926 {
927   storedMigrateMsg = m;
928   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
929                   thisProxy);
930   contribute(0, NULL, CkReduction::max_int, cb);
931 }
932
933 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
934 {
935 #if CMK_LBDB_ON
936         int i;
937         LBMigrateMsg *m = storedMigrateMsg;
938         CmiAssert(m!=NULL);
939         delete msg;
940
941 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
942         int *dummyCounts;
943
944         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
945         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
946         if(step() > m->step){
947                 char str[100];
948                 envelope *env = UsrToEnv(m);
949                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
950                 return;
951         }
952         lbDecisionCount = m->lbDecisionCount;
953 #endif
954
955   if (_lb_args.debug() > 1) 
956     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
957
958   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
959   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
960 /*FAULT_EVAC*/
961   if(!CmiNodeAlive(CkMyPe())){
962         delete m;
963         return;
964   }
965   migrates_expected = 0;
966   future_migrates_expected = 0;
967 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
968         int sending=0;
969     int dummy=0;
970         LBDB *_myLBDB = theLbdb->getLBDB();
971         if(_restartFlag){
972         dummyCounts = new int[CmiNumPes()];
973         bzero(dummyCounts,sizeof(int)*CmiNumPes());
974     }
975 #endif
976   for(i=0; i < m->n_moves; i++) {
977     MigrateInfo& move = m->moves[i];
978     const int me = CkMyPe();
979     if (move.from_pe == me && move.to_pe != me) {
980       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
981       // migrate object, in case it is already gone, inform toPe
982 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
983       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
984          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
985 #else
986             if(_restartFlag == 0){
987                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
988                 theLbdb->Migrate(move.obj,move.to_pe);
989                 sending++;
990             }else{
991                 if(_myLBDB->validObjHandle(move.obj)){
992                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
993                     theLbdb->Migrate(move.obj,move.to_pe);
994                     sending++;
995                 }else{
996                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
997                     dummyCounts[move.to_pe]++;
998                     dummy++;
999                 }
1000             }
1001 #endif
1002     } else if (move.from_pe != me && move.to_pe == me) {
1003        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1004       if (!move.async_arrival) migrates_expected++;
1005       else future_migrates_expected++;
1006     }
1007   }
1008   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1009   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1010
1011 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1012         if(_restartFlag){
1013                 sendDummyMigrationCounts(dummyCounts);
1014                 _restartFlag  =0;
1015         delete []dummyCounts;
1016         }
1017 #endif
1018
1019
1020 #if 0
1021   if (m->n_moves ==0) {
1022     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1023   }
1024 #endif
1025   cur_ld_balancer = m->next_lb;
1026   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1027       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1028   }
1029
1030   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1031     MigrationDone(1);
1032   delete m;
1033
1034 //      CkEvacuatedElement();
1035 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1036 //  migrates_expected = 0;
1037 //  //  ResumeClients(1);
1038 #endif
1039 #endif
1040 }
1041
1042 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1043 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1044     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1045     //TODO: this is gonna be important when a crash happens during checkpoint
1046     //the globalDecisionCount would have to be saved and compared against
1047     //a future recvMigration
1048                 
1049         thisProxy[CkMyPe()].ResumeClients(1);
1050 }
1051 #endif
1052
1053 void CentralLB::MigrationDone(int balancing)
1054 {
1055 #if CMK_LBDB_ON
1056   migrates_completed = 0;
1057   migrates_expected = -1;
1058   // clear load stats
1059   if (balancing) theLbdb->ClearLoads();
1060   // Increment to next step
1061   theLbdb->incStep();
1062         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1063   // if sync resume, invoke a barrier
1064
1065 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1066     savedBalancing = balancing;
1067     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1068 #endif
1069
1070   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1071
1072   LoadbalanceDone(balancing);        // callback
1073 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1074   // if sync resume invoke a barrier
1075   if (balancing && _lb_args.syncResume()) {
1076     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1077                   thisProxy);
1078     contribute(0, NULL, CkReduction::sum_int, cb);
1079   }
1080   else{ 
1081     if(CmiNodeAlive(CkMyPe())){
1082         thisProxy [CkMyPe()].ResumeClients(balancing);
1083     }   
1084   }     
1085 #if CMK_GRID_QUEUE_AVAILABLE
1086   CmiGridQueueDeregisterAll ();
1087   CpvAccess(CkGridObject) = NULL;
1088 #endif
1089 #endif 
1090 #endif
1091 }
1092
1093 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1094 void CentralLB::endMigrationDone(int balancing){
1095     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1096
1097
1098   if (balancing && _lb_args.syncResume()) {
1099     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1100                   thisProxy);
1101     contribute(0, NULL, CkReduction::sum_int, cb);
1102   }
1103   else{
1104     if(CmiNodeAlive(CkMyPe())){
1105     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1106     thisProxy [CkMyPe()].ResumeClients(balancing);
1107     }
1108   }
1109
1110 }
1111 #endif
1112
1113 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1114 void resumeCentralLbAfterChkpt(void *_lb){
1115     CentralLB *lb= (CentralLB *)_lb;
1116     CpvAccess(_currentObj)=lb;
1117     lb->endMigrationDone(lb->savedBalancing);
1118 }
1119 #endif
1120
1121
1122 void CentralLB::ResumeClients(CkReductionMsg *msg)
1123 {
1124   ResumeClients(1);
1125   delete msg;
1126 }
1127
1128 void CentralLB::ResumeClients(int ideal_period, int balancing) {
1129   lb_ideal_period = ideal_period;
1130   ResumeClients(balancing);
1131 }
1132
1133 void CentralLB::ResumeClients(int balancing)
1134 {
1135 #if CMK_LBDB_ON
1136 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1137     resumeCount++;
1138     globalResumeCount = resumeCount;
1139 #endif
1140   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1141   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1142     double end_lb_time = CkWallTimer();
1143   }
1144
1145 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1146   if (balancing) ComlibNotifyMigrationDone();  
1147 #endif
1148
1149   theLbdb->ResumeClients();
1150   if (balancing)  {
1151     CheckMigrationComplete();
1152     if (future_migrates_expected == 0 || 
1153             future_migrates_expected == future_migrates_completed) {
1154       CheckMigrationComplete();
1155     }
1156   }
1157 #endif
1158 }
1159
1160 /*
1161   migration of objects contains two different kinds:
1162   (1) objects want to make a barrier for migration completion
1163       (waitForBarrier is true)
1164       migrationDone() to finish and resumeClients
1165   (2) objects don't need a barrier
1166   However, next load balancing can only happen when both migrations complete
1167 */ 
1168 void CentralLB::CheckMigrationComplete()
1169 {
1170 #if CMK_LBDB_ON
1171   lbdone ++;
1172   if (lbdone == 2) {
1173     if (_lb_args.debug() && CkMyPe()==0) {
1174       double end_lb_time = CkWallTimer();
1175       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1176                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1177                 end_lb_time-start_lb_time);
1178     }
1179
1180     lb_migration_cost = (CkWallTimer() - start_lb_time);
1181
1182     lbdone = 0;
1183     future_migrates_expected = -1;
1184     future_migrates_completed = 0;
1185     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1186     // release local barrier  so that the next load balancer can go
1187     LDOMHandle h;
1188     h.id.id.idx = 0;
1189     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1190     // switch to the next load balancer in the list
1191     // subtle: called from Migrated() may result in Migrated() called in next LB
1192     theLbdb->nextLoadbalancer(seqno);
1193   }
1194 #endif
1195 }
1196
1197 void CentralLB::preprocess(LDStats* stats)
1198 {
1199   if (_lb_args.ignoreBgLoad())
1200     stats->clearBgLoad();
1201
1202   // Call the predictor for the future
1203   if (_lb_predict) FuturePredictor(statsData);
1204 }
1205
1206 // default load balancing strategy
1207 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1208 {
1209 #if CMK_LBDB_ON
1210   double strat_start_time = CkWallTimer();
1211   if (_lb_args.debug())
1212     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1213
1214   work(stats);
1215
1216   if (_lb_args.debug()>1)  {
1217     CkPrintf("CharmLB> Obj Map:\n");
1218     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1219     CkPrintf("\n");
1220   }
1221
1222   LBMigrateMsg *msg = createMigrateMsg(stats);
1223
1224   if (_lb_args.debug()) {
1225     double strat_end_time = CkWallTimer();
1226     envelope *env = UsrToEnv(msg);
1227
1228     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1229     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1230               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1231     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1232     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1233               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1234     lb_strategy_cost = (strat_end_time - strat_start_time);
1235     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
1236   }
1237
1238   return msg;
1239 #else
1240   return NULL;
1241 #endif
1242 }
1243
1244 void CentralLB::work(LDStats* stats)
1245 {
1246   // does nothing but print the database
1247   stats->print();
1248 }
1249
1250 // generate migrate message from stats->from_proc and to_proc
1251 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1252 {
1253   int i;
1254   CkVec<MigrateInfo*> migrateInfo;
1255   for (i=0; i<stats->n_objs; i++) {
1256     LDObjData &objData = stats->objData[i];
1257     int frompe = stats->from_proc[i];
1258     int tope = stats->to_proc[i];
1259     if (frompe != tope) {
1260       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1261       //         CkMyPe(),obj,pe,dest);
1262       MigrateInfo *migrateMe = new MigrateInfo;
1263       migrateMe->obj = objData.handle;
1264       migrateMe->from_pe = frompe;
1265       migrateMe->to_pe = tope;
1266       migrateMe->async_arrival = objData.asyncArrival;
1267       migrateInfo.insertAtEnd(migrateMe);
1268     }
1269   }
1270
1271   int migrate_count=migrateInfo.length();
1272   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1273   msg->n_moves = migrate_count;
1274   for(i=0; i < migrate_count; i++) {
1275     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1276     msg->moves[i] = *item;
1277     delete item;
1278     migrateInfo[i] = 0;
1279   }
1280   return msg;
1281 }
1282
1283 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1284 {
1285   int nmoves = 0;
1286   int nunavail = 0;
1287   int i;
1288   for (i=0; i<m->n_moves; i++) {
1289     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1290     if (item->from_pe == p || item->to_pe == p) nmoves++;
1291   }
1292   for (i=0; i<CkNumPes();i++) {
1293     if (!m->avail_vector[i]) nunavail++;
1294   }
1295   LBMigrateMsg* msg;
1296   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1297   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1298   msg->n_moves = nmoves;
1299   msg->level = m->level;
1300   msg->next_lb = m->next_lb;
1301   for (i=0,nmoves=0; i<m->n_moves; i++) {
1302     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1303     if (item->from_pe == p || item->to_pe == p) {
1304       msg->moves[nmoves] = *item;
1305       nmoves++;
1306     }
1307   }
1308   // copy processor data
1309   if (nunavail)
1310   for (i=0; i<CkNumPes();i++) {
1311     msg->avail_vector[i] = m->avail_vector[i];
1312     msg->expectedLoad[i] = m->expectedLoad[i];
1313   }
1314   return msg;
1315 }
1316
1317 void CentralLB::simulationWrite() {
1318   if(step() == LBSimulation::dumpStep)
1319   {
1320     // here we are supposed to dump the database
1321     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1322     char *dumpFileName = (char *)malloc(dumpFileSize);
1323     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1324       free(dumpFileName);
1325       dumpFileSize+=3;
1326       dumpFileName = (char *)malloc(dumpFileSize);
1327     }
1328     writeStatsMsgs(dumpFileName);
1329     free(dumpFileName);
1330     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1331     ++LBSimulation::dumpStep;
1332     --LBSimulation::dumpStepSize;
1333     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1334       CmiPrintf("Charm++> Exiting...\n");
1335       CkExit();
1336     }
1337     return;
1338   }
1339 }
1340
1341 void CentralLB::simulationRead() {
1342   LBSimulation *simResults = NULL, *realResults;
1343   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1344   voidMessage->n_moves=0;
1345   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1346     // here we are supposed to read the data from the dump database
1347     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1348     char *simFileName = (char *)malloc(simFileSize);
1349     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1350       free(simFileName);
1351       simFileSize+=3;
1352       simFileName = (char *)malloc(simFileSize);
1353     }
1354     readStatsMsgs(simFileName);
1355
1356     // allocate simResults (only the first step)
1357     if (simResults == NULL) {
1358       simResults = new LBSimulation(LBSimulation::simProcs);
1359       realResults = new LBSimulation(LBSimulation::simProcs);
1360     }
1361     else {
1362       // should be the same number of procs of the original simulation!
1363       if (!LBSimulation::procsChanged) {
1364         // it means we have a previous step, so in simResults there is data.
1365         // we can now print the real effects of the load balancer during the simulation
1366         // or print the difference between the predicted data and the real one.
1367         realResults->reset();
1368         // reset to_proc of statsData to be equal to from_proc
1369         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1370         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1371         simResults->PrintDifferences(realResults,statsData);
1372       }
1373       simResults->reset();
1374     }
1375
1376     // now pass it to the strategy routine
1377     double startT = CkWallTimer();
1378     preprocess(statsData);
1379     CmiPrintf("%s> Strategy starts ... \n", lbname);
1380     LBMigrateMsg* migrateMsg = Strategy(statsData);
1381     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1382                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1383
1384     // now calculate the results of the load balancing simulation
1385     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1386
1387     // now we have the simulation data, so print it and loop
1388     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1389     // **CWL** Officially recording my disdain here for using ints for bool
1390     if (LBSimulation::showDecisionsOnly) {
1391       simResults->PrintDecisions(migrateMsg, simFileName, 
1392                                  LBSimulation::simProcs);
1393     } else {
1394       simResults->PrintSimulationResults();
1395     }
1396
1397     free(simFileName);
1398     delete migrateMsg;
1399     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1400   }
1401   // deallocate simResults
1402   delete simResults;
1403   CmiPrintf("Charm++> Exiting...\n");
1404   CkExit();
1405 }
1406
1407 void CentralLB::readStatsMsgs(const char* filename) 
1408 {
1409 #if CMK_LBDB_ON
1410   int i;
1411   FILE *f = fopen(filename, "r");
1412   if (f==NULL) {
1413     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1414     CmiAbort("");
1415   }
1416
1417   // at this stage, we need to rebuild the statsMsgList and
1418   // statsDataList structures. For that first deallocate the
1419   // old structures
1420   if (statsMsgsList) {
1421     for(i = 0; i < stats_msg_count; i++)
1422       delete statsMsgsList[i];
1423     delete[] statsMsgsList;
1424     statsMsgsList=0;
1425   }
1426
1427   PUP::fromDisk pd(f);
1428   PUP::machineInfo machInfo;
1429
1430   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1431   PUP::xlater p(machInfo, pd);
1432
1433   if (_lb_args.lbversion() > 1) {
1434     p|_lb_args.lbversion();             // write version number
1435     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1436     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1437   }
1438   p|stats_msg_count;
1439
1440   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1441   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1442   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1443
1444   // LBSimulation::simProcs must be set
1445   statsData->pup(p);
1446
1447   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1448   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1449
1450   // file f is closed in the destructor of PUP::fromDisk
1451   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1452 #endif
1453 }
1454
1455 void CentralLB::writeStatsMsgs(const char* filename) 
1456 {
1457 #if CMK_LBDB_ON
1458   FILE *f = fopen(filename, "w");
1459   if (f==NULL) {
1460     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1461     CmiAbort("");
1462   }
1463
1464   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1465   PUP::toDisk p(f);
1466   p((char *)&machInfo, sizeof(machInfo));       // machine info
1467
1468   p|_lb_args.lbversion();               // write version number
1469   p|stats_msg_count;
1470   statsData->pup(p);
1471
1472   fclose(f);
1473
1474   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1475 #endif
1476 }
1477
1478 // calculate the predicted wallclock/cpu load for every processors
1479 // considering communication overhead if considerComm is true
1480 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1481                       LBMigrateMsg *msg, LBInfo &info, 
1482                       int considerComm)
1483 {
1484 #if CMK_LBDB_ON
1485         stats->makeCommHash();
1486
1487         // update to_proc according to migration msgs
1488         for(int i = 0; i < msg->n_moves; i++) {
1489           MigrateInfo &mInfo = msg->moves[i];
1490           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1491           CmiAssert(idx != -1);
1492           stats->to_proc[idx] = mInfo.to_pe;
1493         }
1494
1495         info.getInfo(stats, count, considerComm);
1496 #endif
1497 }
1498
1499
1500 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1501 {
1502     CkAssert(simResults != NULL && count == simResults->numPes);
1503     // estimate the new loads of the processors. As a first approximation, this is the
1504     // sum of the cpu times of the objects on that processor
1505     double startT = CkWallTimer();
1506     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1507     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1508 }
1509
1510 void CentralLB::pup(PUP::er &p) { 
1511   BaseLB::pup(p); 
1512   if (p.isUnpacking())  {
1513     initLB(CkLBOptions(seqno)); 
1514   }
1515   p|reduction_started;
1516   int has_statsMsg=0;
1517   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1518   p|has_statsMsg;
1519   if (has_statsMsg) {
1520     if (p.isUnpacking())
1521       statsMsg = new CLBStatsMsg;
1522     statsMsg->pup(p);
1523   }
1524 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1525   p | lbDecisionCount;
1526   p | resumeCount;
1527 #endif
1528         
1529 }
1530
1531 int CentralLB::useMem() { 
1532   return sizeof(CentralLB) + statsData->useMem() + 
1533          CkNumPes() * sizeof(CLBStatsMsg *);
1534 }
1535
1536
1537 /**
1538   CLBStatsMsg is not a real message now.
1539   CLBStatsMsg is used for all processors to fill in their local load and comm
1540   statistics and send to processor 0
1541 */
1542
1543 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1544   n_objs = osz;
1545   n_comm = csz;
1546   objData = new LDObjData[osz];
1547   commData = new LDCommData[csz];
1548   avail_vector = NULL;
1549 }
1550
1551 CLBStatsMsg::~CLBStatsMsg() {
1552   delete [] objData;
1553   delete [] commData;
1554   delete [] avail_vector;
1555 }
1556
1557 void CLBStatsMsg::pup(PUP::er &p) {
1558   int i;
1559   p|from_pe;
1560   p|pe_speed;
1561   p|total_walltime;
1562   p|idletime;
1563   p|bg_walltime;
1564 #if CMK_LB_CPUTIMER
1565   p|total_cputime;
1566   p|bg_cputime;
1567 #endif
1568 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1569   p | step;
1570 #endif
1571   p|n_objs;
1572   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1573   for (i=0; i<n_objs; i++) p|objData[i];
1574   p|n_comm;
1575   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1576   for (i=0; i<n_comm; i++) p|commData[i];
1577
1578   int has_avail_vector;
1579   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1580   p|has_avail_vector;
1581   if (p.isUnpacking()) {
1582     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1583     else avail_vector = NULL;
1584   }
1585   if (has_avail_vector) p(avail_vector, CkNumPes());
1586
1587   p(next_lb);
1588 }
1589
1590 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1591 // the entry function, it is just used to use to pup.
1592 // I don't use CLBStatsMsg directly as marshalled parameter because
1593 // I want the data pointer stored and not to be freed by the Charm++.
1594 void CkMarshalledCLBStatsMessage::free() { 
1595   int count = msgs.size();
1596   for  (int i=0; i<count; i++) {
1597     delete msgs[i];
1598     msgs[i] = NULL;
1599   }
1600   msgs.free();
1601 }
1602
1603 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1604 {
1605   int count = m.getCount();
1606   for (int i=0; i<count; i++) add(m.getMessage(i));
1607 }
1608
1609 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1610 {
1611   int count = msgs.size();
1612   p|count;
1613   for (int i=0; i<count; i++) {
1614     CLBStatsMsg *msg;
1615     if (p.isUnpacking()) msg = new CLBStatsMsg;
1616     else { 
1617       msg = msgs[i]; CmiAssert(msg!=NULL);
1618     }
1619     msg->pup(p);
1620     if (p.isUnpacking()) add(msg);
1621   }
1622 }
1623
1624 SpanningTree::SpanningTree()
1625 {
1626         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1627         arity = (int)ceil(sq/2);
1628         calcParent(CkMyPe());
1629         calcNumChildren(CkMyPe());
1630 }
1631
1632 void SpanningTree::calcParent(int n)
1633 {
1634         parent=-1;
1635         if(n != 0  && arity > 0)
1636                 parent = (n-1)/arity;
1637 }
1638
1639 void SpanningTree::calcNumChildren(int n)
1640 {
1641         numChildren = 0;
1642         if (arity == 0) return;
1643         int fullNode=(CkNumPes()-1-arity)/arity;
1644         if(n <= fullNode)
1645                 numChildren = arity;
1646         if(n == fullNode+1)
1647                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1648         if(n > fullNode+1)
1649                 numChildren = 0;
1650 }
1651
1652 #include "CentralLB.def.h"
1653  
1654 /*@}*/