cleanup
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #define  DEBUGF(x)       // CmiPrintf x;
15 #define  DEBUG(x)        // x;
16
17 #if CMK_MEM_CHECKPOINT
18    /* can not handle reduction in inmem FT */
19 #define USE_REDUCTION         0
20 #define USE_LDB_SPANNING_TREE 0
21 #else
22 #define USE_REDUCTION         1
23 #define USE_LDB_SPANNING_TREE 1
24 #endif
25
26 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
27 extern int _restartFlag;
28 extern void getGlobalStep(CkGroupID );
29 extern void initMlogLBStep(CkGroupID );
30 extern int globalResumeCount;
31 extern void sendDummyMigrationCounts(int *);
32 #endif
33
34 #if CMK_GRID_QUEUE_AVAILABLE
35 CpvExtern(void *, CkGridObject);
36 #endif
37
38 #if CMK_GLOBAL_LOCATION_UPDATE      
39 extern void UpdateLocation(MigrateInfo& migData); 
40 #endif
41
42 CkGroupID loadbalancer;
43 int * lb_ptr;
44 int load_balancer_created;
45
46 CreateLBFunc_Def(CentralLB, "CentralLB base class")
47
48 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
49                              LBMigrateMsg *, LBInfo &info, int considerComm);
50
51 /*
52 void CreateCentralLB()
53 {
54   CProxy_CentralLB::ckNew(0);
55 }
56 */
57
58 void CentralLB::staticStartLB(void* data)
59 {
60   CentralLB *me = (CentralLB*)(data);
61   me->StartLB();
62 }
63
64 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
65 {
66   CentralLB *me = (CentralLB*)(data);
67   me->Migrated(h, waitBarrier);
68 }
69
70 void CentralLB::staticAtSync(void* data)
71 {
72   CentralLB *me = (CentralLB*)(data);
73   me->AtSync();
74 }
75
76 void CentralLB::initLB(const CkLBOptions &opt)
77 {
78 #if CMK_LBDB_ON
79   lbname = "CentralLB";
80   thisProxy = CProxy_CentralLB(thisgroup);
81   //  CkPrintf("Construct in %d\n",CkMyPe());
82
83   // create and turn on by default
84   receiver = theLbdb->
85     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
86   notifier = theLbdb->getLBDB()->
87     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
88   startLbFnHdl = theLbdb->getLBDB()->
89     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
90
91   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
92   if (opt.getSeqNo() > 0) turnOff();
93
94   stats_msg_count = 0;
95   statsMsgsList = NULL;
96   statsData = NULL;
97
98   storedMigrateMsg = NULL;
99   reduction_started = 0;
100
101   // for future predictor
102   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
103   else predicted_model=0;
104   // register user interface callbacks
105   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
106
107   myspeed = theLbdb->ProcessorSpeed();
108
109   migrates_completed = 0;
110   future_migrates_completed = 0;
111   migrates_expected = -1;
112   future_migrates_expected = -1;
113   cur_ld_balancer = _lb_args.central_pe();      // 0 default
114   lbdone = 0;
115   count_msgs=0;
116   statsMsg = NULL;
117
118   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
119
120   load_balancer_created = 1;
121 #endif
122 }
123
124 CentralLB::~CentralLB()
125 {
126 #if CMK_LBDB_ON
127   delete [] statsMsgsList;
128   delete statsData;
129   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
130   if (theLbdb) {
131     theLbdb->getLBDB()->
132       RemoveNotifyMigrated(notifier);
133     theLbdb->
134       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
135   }
136 #endif
137 }
138
139 void CentralLB::turnOn() 
140 {
141 #if CMK_LBDB_ON
142   theLbdb->getLBDB()->
143     TurnOnBarrierReceiver(receiver);
144   theLbdb->getLBDB()->
145     TurnOnNotifyMigrated(notifier);
146   theLbdb->getLBDB()->
147     TurnOnStartLBFn(startLbFnHdl);
148 #endif
149 }
150
151 void CentralLB::turnOff() 
152 {
153 #if CMK_LBDB_ON
154   theLbdb->getLBDB()->
155     TurnOffBarrierReceiver(receiver);
156   theLbdb->getLBDB()->
157     TurnOffNotifyMigrated(notifier);
158   theLbdb->getLBDB()->
159     TurnOffStartLBFn(startLbFnHdl);
160 #endif
161 }
162
163 void CentralLB::AtSync()
164 {
165 #if CMK_LBDB_ON
166   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
167
168 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
169         CpvAccess(_currentObj)=this;
170 #endif
171
172   // if num of processor is only 1, nothing should happen
173   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
174     MigrationDone(0);
175     return;
176   }
177   if(CmiNodeAlive(CkMyPe())){
178     thisProxy [CkMyPe()].ProcessAtSync();
179   }
180 #endif
181 }
182
183 #include "ComlibStrategy.h"
184
185 void CentralLB::ProcessAtSync()
186 {
187 #if CMK_LBDB_ON
188   if (reduction_started) return;              // reducton in progress
189
190   CmiAssert(CmiNodeAlive(CkMyPe()));
191   if (CkMyPe() == cur_ld_balancer) {
192     start_lb_time = CkWallTimer();
193   }
194
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196         initMlogLBStep(thisgroup);
197 #endif
198
199   // build message
200   BuildStatsMsg();
201
202 #if USE_REDUCTION
203     // reduction to get total number of objects and comm
204     // so that processor 0 can pre-allocate load balancing database
205   int counts[2];
206   counts[0] = theLbdb->GetObjDataSz();
207   counts[1] = theLbdb->GetCommDataSz();
208
209   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
210                   thisProxy[0]);
211   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
212   reduction_started = 1;
213 #else
214   SendStats();
215 #endif
216 #endif
217 }
218
219 // called only on 0
220 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
221 {
222   CmiAssert(CkMyPe() == 0);
223   if (statsData == NULL) statsData = new LDStats;
224
225   int *counts = (int *)msg->getData();
226   int n_objs = counts[0];
227   int n_comm = counts[1];
228
229     // resize database
230   statsData->objData.resize(n_objs);
231   statsData->from_proc.resize(n_objs);
232   statsData->to_proc.resize(n_objs);
233   statsData->commData.resize(n_comm);
234
235   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
236         
237     // broadcast call to let everybody start to send stats
238   thisProxy.SendStats();
239 }
240
241 void CentralLB::BuildStatsMsg()
242 {
243 #if CMK_LBDB_ON
244   // build and send stats
245   const int osz = theLbdb->GetObjDataSz();
246   const int csz = theLbdb->GetCommDataSz();
247
248   int npes = CkNumPes();
249   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
250   _MEMCHECK(msg);
251   msg->from_pe = CkMyPe();
252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
253         msg->step = step();
254 #endif
255   //msg->serial = CrnRand();
256
257 /*
258   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
259   theLbdb->IdleTime(&msg->idletime);
260   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
261 */
262 #if CMK_LB_CPUTIMER
263   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
264                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
265 #else
266   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
267                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
268 #endif
269
270   msg->pe_speed = myspeed;
271   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
272
273   msg->n_objs = osz;
274   theLbdb->GetObjData(msg->objData);
275   msg->n_comm = csz;
276   theLbdb->GetCommData(msg->commData);
277 //  theLbdb->ClearLoads();
278   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
279
280   if(CkMyPe() == cur_ld_balancer) {
281     msg->avail_vector = new char[CkNumPes()];
282     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
283     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
284   }
285
286   CmiAssert(statsMsg == NULL);
287   statsMsg = msg;
288 #endif
289 }
290
291
292 // called on every processor
293 void CentralLB::SendStats()
294 {
295 #if CMK_LBDB_ON
296   CmiAssert(statsMsg != NULL);
297   reduction_started = 0;
298
299 #if USE_LDB_SPANNING_TREE
300   if(CkNumPes()>1024)
301   {
302     if (CkMyPe() == cur_ld_balancer)
303       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
304     else
305       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
306   }
307   else
308 #endif
309   {
310     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
311     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
312   }
313
314   statsMsg = NULL;
315
316 #ifdef __BIGSIM__
317   BgEndStreaming();
318 #endif
319
320   {
321   // enfore the barrier to wait until centralLB says no
322   LDOMHandle h;
323   h.id.id.idx = 0;
324   theLbdb->getLBDB()->RegisteringObjects(h);
325   }
326 #endif
327 }
328
329 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
330 extern int donotCountMigration;
331 #endif
332
333 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
334 {
335 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
336     if(donotCountMigration){
337         return ;
338     }
339 #endif
340
341 #if CMK_LBDB_ON
342   if (waitBarrier) {
343             migrates_completed++;
344       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
345     if (migrates_completed == migrates_expected) {
346       MigrationDone(1);
347     }
348   }
349   else {
350     future_migrates_completed ++;
351     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
352     if (future_migrates_completed == future_migrates_expected)  {
353         CheckMigrationComplete();
354     }
355   }
356 #endif
357 }
358
359 void CentralLB::MissMigrate(int waitForBarrier)
360 {
361   LDObjHandle h;
362   Migrated(h, waitForBarrier);
363 }
364
365 // build a complete data from bufferred messages
366 // not used when USE_REDUCTION = 1
367 void CentralLB::buildStats()
368 {
369     statsData->nprocs() = stats_msg_count;
370     // allocate space
371     statsData->objData.resize(statsData->n_objs);
372     statsData->from_proc.resize(statsData->n_objs);
373     statsData->to_proc.resize(statsData->n_objs);
374     statsData->commData.resize(statsData->n_comm);
375
376     int nobj = 0;
377     int ncom = 0;
378     int nmigobj = 0;
379     // copy all data in individule message to this big structure
380     for (int pe=0; pe<CkNumPes(); pe++) {
381        int i;
382        CLBStatsMsg *msg = statsMsgsList[pe];
383        if(msg == NULL) continue;
384        for (i=0; i<msg->n_objs; i++) {
385          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
386          statsData->objData[nobj] = msg->objData[i];
387          if (msg->objData[i].migratable) nmigobj++;
388          nobj++;
389        }
390        for (i=0; i<msg->n_comm; i++) {
391          statsData->commData[ncom] = msg->commData[i];
392          ncom++;
393        }
394        // free the memory
395        delete msg;
396        statsMsgsList[pe]=0;
397     }
398     statsData->n_migrateobjs = nmigobj;
399 }
400
401 // deposit one processor data at a time, note database is pre-allocated
402 // to have enough space
403 // used when USE_REDUCTION = 1
404 void CentralLB::depositData(CLBStatsMsg *m)
405 {
406   int i;
407   if (m == NULL) return;
408
409   const int pe = m->from_pe;
410   struct ProcStats &procStat = statsData->procs[pe];
411   procStat.pe = pe;
412   procStat.total_walltime = m->total_walltime;
413   procStat.idletime = m->idletime;
414   procStat.bg_walltime = m->bg_walltime;
415 #if CMK_LB_CPUTIMER
416   procStat.total_cputime = m->total_cputime;
417   procStat.bg_cputime = m->bg_cputime;
418 #endif
419   procStat.pe_speed = m->pe_speed;
420   //procStat.utilization = 1.0;
421   procStat.available = CmiTrue;
422   procStat.n_objs = m->n_objs;
423
424   int &nobj = statsData->n_objs;
425   int &nmigobj = statsData->n_migrateobjs;
426   for (i=0; i<m->n_objs; i++) {
427       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
428       statsData->objData[nobj] = m->objData[i];
429       if (m->objData[i].migratable) nmigobj++;
430       nobj++;
431       CmiAssert(nobj <= statsData->objData.capacity());
432   }
433   int &n_comm = statsData->n_comm;
434   for (i=0; i<m->n_comm; i++) {
435       statsData->commData[n_comm] = m->commData[i];
436       n_comm++;
437       CmiAssert(n_comm <= statsData->commData.capacity());
438   }
439   delete m;
440 }
441
442 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
443 {
444 #if CMK_LBDB_ON
445   if (statsMsgsList == NULL) {
446     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
447     CmiAssert(statsMsgsList != NULL);
448     for(int i=0; i < CkNumPes(); i++)
449       statsMsgsList[i] = 0;
450   }
451   if (statsData == NULL) statsData = new LDStats;
452
453     //  loop through all CLBStatsMsg in the incoming msg
454   int count = msg.getCount();
455   for (int num = 0; num < count; num++) 
456   {
457     CLBStatsMsg *m = msg.getMessage(num);
458     CmiAssert(m!=NULL);
459     const int pe = m->from_pe;
460     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
461 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
462 /*      
463  *  if(m->step < step()){
464  *    //TODO: if a processor is redoing an old load balance step..
465  *    //tell it that the step is done and that it should not perform any migrations
466  *      thisProxy[pe].ReceiveDummyMigration();
467  *  }*/
468 #endif
469         
470     if(!CmiNodeAlive(pe)){
471         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
472         continue;
473     }
474         
475     if (m->avail_vector!=NULL) {
476       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
477     }
478
479     if (statsMsgsList[pe] != 0) {
480       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
481              pe);
482     } else {
483       statsMsgsList[pe] = m;
484 #if USE_REDUCTION
485       depositData(m);
486 #else
487       // store per processor data right away
488       struct ProcStats &procStat = statsData->procs[pe];
489       procStat.pe = pe;
490       procStat.total_walltime = m->total_walltime;
491       procStat.idletime = m->idletime;
492       procStat.bg_walltime = m->bg_walltime;
493 #if CMK_LB_CPUTIMER
494       procStat.total_cputime = m->total_cputime;
495       procStat.bg_cputime = m->bg_cputime;
496 #endif
497       procStat.pe_speed = m->pe_speed;
498       //procStat.utilization = 1.0;
499       procStat.available = CmiTrue;
500       procStat.n_objs = m->n_objs;
501
502       statsData->n_objs += m->n_objs;
503       statsData->n_comm += m->n_comm;
504 #endif
505       stats_msg_count++;
506     }
507   }    // end of for
508
509   const int clients = CkNumValidPes();
510   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
511  
512   if (stats_msg_count == clients) {
513         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
514     statsData->nprocs() = stats_msg_count;
515     thisProxy[CkMyPe()].LoadBalance();
516   }
517 #endif
518 }
519
520 /** added by Abhinav for receiving msgs via spanning tree */
521 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
522 {
523 #if CMK_LBDB_ON
524         CmiAssert(CkMyPe() != 0);
525         bufMsg.add(msg);         // buffer messages
526         count_msgs++;
527         //CkPrintf("here %d\n", CkMyPe());
528         if (count_msgs == st.numChildren+1) {
529                 if(st.parent == 0)
530                 {
531                         thisProxy[0].ReceiveStats(bufMsg);
532                         //CkPrintf("from %d\n", CkMyPe());
533                 }
534                 else
535                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
536                 count_msgs = 0;
537                 bufMsg.free();
538         } 
539 #endif
540 }
541
542 void CentralLB::LoadBalance()
543 {
544 #if CMK_LBDB_ON
545   int proc;
546   const int clients = CkNumPes();
547
548 #if ! USE_REDUCTION
549   // build data
550   buildStats();
551 #else
552   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
553 #endif
554
555   theLbdb->ResetAdaptive();
556   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
557
558   if (_lb_args.debug()) 
559       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
560                   lbname, cur_ld_balancer, step(), start_lb_time,
561                   CmiMemoryUsage()/(1024.0*1024.0));
562
563   // if we are in simulation mode read data
564   if (LBSimulation::doSimulation) simulationRead();
565
566   char *availVector = LBDatabaseObj()->availVector();
567   for(proc = 0; proc < clients; proc++)
568       statsData->procs[proc].available = (CmiBool)availVector[proc];
569
570   preprocess(statsData);
571
572 //    CkPrintf("Before Calling Strategy\n");
573
574   if (_lb_args.printSummary()) {
575       LBInfo info(clients);
576         // not take comm data
577       info.getInfo(statsData, clients, 0);
578       LBRealType mLoad, mCpuLoad, totalLoad;
579       info.getSummary(mLoad, mCpuLoad, totalLoad);
580       int nmsgs, nbytes;
581       statsData->computeNonlocalComm(nmsgs, nbytes);
582       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
583 //      if (_lb_args.debug() > 1) {
584 //        for (int i=0; i<statsData->n_objs; i++)
585 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
586 //      }
587   }
588
589 #if CMK_REPLAYSYSTEM
590   LDHandle *loadBalancer_pointers;
591   if (_replaySystem) {
592     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
593     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
594   }
595 #endif
596   
597   LBMigrateMsg* migrateMsg = Strategy(statsData);
598 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
599         migrateMsg->step = step();
600 #endif
601
602 #if CMK_REPLAYSYSTEM
603   CpdHandleLBMessage(&migrateMsg);
604   if (_replaySystem) {
605     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
606     free(loadBalancer_pointers);
607   }
608 #endif
609   
610   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
611   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
612
613   // if this is the step at which we need to dump the database
614   simulationWrite();
615
616 //  calculate predicted load
617 //  very time consuming though, so only happen when debugging is on
618   if (_lb_args.printSummary()) {
619       LBInfo info(clients);
620         // not take comm data
621       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
622       LBRealType mLoad, mCpuLoad, totalLoad;
623       info.getSummary(mLoad, mCpuLoad, totalLoad);
624       int nmsgs, nbytes;
625       statsData->computeNonlocalComm(nmsgs, nbytes);
626       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
627       for (int i=0; i<clients; i++)
628         migrateMsg->expectedLoad[i] = info.peLoads[i];
629   }
630
631   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
632 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
633     lbDecisionCount++;
634     migrateMsg->lbDecisionCount = lbDecisionCount;
635 #endif
636
637   envelope *env = UsrToEnv(migrateMsg);
638   if (1) {
639       // broadcast
640     thisProxy.ReceiveMigration(migrateMsg);
641   }
642   else {
643     // split the migration for each processor
644     for (int p=0; p<CkNumPes(); p++) {
645       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
646       thisProxy[p].ReceiveMigration(m);
647     }
648     delete migrateMsg;
649   }
650
651   // Zero out data structures for next cycle
652   // CkPrintf("zeroing out data\n");
653   statsData->clear();
654   stats_msg_count=0;
655 #endif
656 }
657
658 // test if sender and receiver in a commData is nonmigratable.
659 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
660 {
661 #if CMK_LBDB_ON
662   for (int pe=0 ; pe<count; pe++)
663   {
664     for (int i=0; i<len[pe]; i++)
665       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
666           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
667       return 0;
668   }
669 #endif
670   return 1;
671 }
672
673 // rebuild LDStats and remove all non-migratble objects and related things
674 void CentralLB::removeNonMigratable(LDStats* stats, int count)
675 {
676   int i;
677
678   // check if we have non-migratable objects
679   int have = 0;
680   for (i=0; i<stats->n_objs; i++) 
681   {
682     LDObjData &odata = stats->objData[i];
683     if (!odata.migratable) {
684       have = 1; break;
685     }
686   }
687   if (have == 0) return;
688
689   CkVec<LDObjData> nonmig;
690   CkVec<int> new_from_proc, new_to_proc;
691   nonmig.resize(stats->n_migrateobjs);
692   new_from_proc.resize(stats->n_migrateobjs);
693   new_to_proc.resize(stats->n_migrateobjs);
694   int n_objs = 0;
695   for (i=0; i<stats->n_objs; i++) 
696   {
697     LDObjData &odata = stats->objData[i];
698     if (odata.migratable) {
699       nonmig[n_objs] = odata;
700       new_from_proc[n_objs] = stats->from_proc[i];
701       new_to_proc[n_objs] = stats->to_proc[i];
702       n_objs ++;
703     }
704     else {
705       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
706 #if CMK_LB_CPUTIMER
707       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
708 #endif
709     }
710   }
711   CmiAssert(stats->n_migrateobjs == n_objs);
712
713   stats->makeCommHash();
714   
715   CkVec<LDCommData> newCommData;
716   newCommData.resize(stats->n_comm);
717   int n_comm = 0;
718   for (i=0; i<stats->n_comm; i++) 
719   {
720     LDCommData& cdata = stats->commData[i];
721     if (!cdata.from_proc()) 
722     {
723       int idx = stats->getSendHash(cdata);
724       CmiAssert(idx != -1);
725       if (!stats->objData[idx].migratable) continue;
726     }
727     switch (cdata.receiver.get_type()) {
728     case LD_PROC_MSG:
729       break;
730     case LD_OBJ_MSG:  {
731       int idx = stats->getRecvHash(cdata);
732       if (stats->complete_flag)
733         CmiAssert(idx != -1);
734       else if (idx == -1) continue;          // receiver not in this group
735       if (!stats->objData[idx].migratable) continue;
736       break;
737       }
738     case LD_OBJLIST_MSG:    // object message FIXME add multicast
739       break;
740     }
741     newCommData[n_comm] = cdata;
742     n_comm ++;
743   }
744
745   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
746
747   // swap to new data
748   stats->objData = nonmig;
749   stats->from_proc = new_from_proc;
750   stats->to_proc = new_to_proc;
751   stats->n_objs = n_objs;
752
753   stats->commData = newCommData;
754   stats->n_comm = n_comm;
755
756   stats->deleteCommHash();
757   stats->makeCommHash();
758
759 }
760
761
762 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
763 extern int restarted;
764 #endif
765
766 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
767 {
768   storedMigrateMsg = m;
769 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
770         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
771 #else
772   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
773                   thisProxy);
774   contribute(0, NULL, CkReduction::max_int, cb);
775 #endif
776 }
777
778 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
779 {
780 #if CMK_LBDB_ON
781         int i;
782         LBMigrateMsg *m = storedMigrateMsg;
783         CmiAssert(m!=NULL);
784         delete msg;
785
786 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
787         int *dummyCounts;
788
789         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
790         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
791         if(step() > m->step){
792                 char str[100];
793                 envelope *env = UsrToEnv(m);
794                 return;
795         }
796         lbDecisionCount = m->lbDecisionCount;
797 #endif
798
799   if (_lb_args.debug() > 1) 
800     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
801
802   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
803   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
804 /*FAULT_EVAC*/
805   if(!CmiNodeAlive(CkMyPe())){
806         delete m;
807         return;
808   }
809   migrates_expected = 0;
810   future_migrates_expected = 0;
811 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
812         int sending=0;
813     int dummy=0;
814         LBDB *_myLBDB = theLbdb->getLBDB();
815         if(_restartFlag){
816         dummyCounts = new int[CmiNumPes()];
817         bzero(dummyCounts,sizeof(int)*CmiNumPes());
818     }
819 #endif
820   for(i=0; i < m->n_moves; i++) {
821     MigrateInfo& move = m->moves[i];
822     const int me = CkMyPe();
823     if (move.from_pe == me && move.to_pe != me) {
824       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
825       // migrate object, in case it is already gone, inform toPe
826 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
827       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
828          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
829 #else
830             if(_restartFlag == 0){
831                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
832                 theLbdb->Migrate(move.obj,move.to_pe);
833                 sending++;
834             }else{
835                 if(_myLBDB->validObjHandle(move.obj)){
836                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
837                     theLbdb->Migrate(move.obj,move.to_pe);
838                     sending++;
839                 }else{
840                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
841                     dummyCounts[move.to_pe]++;
842                     dummy++;
843                 }
844             }
845 #endif
846     } else if (move.from_pe != me && move.to_pe == me) {
847        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
848       if (!move.async_arrival) migrates_expected++;
849       else future_migrates_expected++;
850     }
851     else {
852 #if CMK_GLOBAL_LOCATION_UPDATE      
853       UpdateLocation(move); 
854 #endif
855     }
856
857   }
858   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
859   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
860
861 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
862         if(_restartFlag){
863                 sendDummyMigrationCounts(dummyCounts);
864                 _restartFlag  =0;
865         delete []dummyCounts;
866         }
867 #endif
868
869
870 #if 0
871   if (m->n_moves ==0) {
872     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
873   }
874 #endif
875   cur_ld_balancer = m->next_lb;
876   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
877       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
878   }
879
880   if (migrates_expected == 0 || migrates_completed == migrates_expected)
881     MigrationDone(1);
882   delete m;
883
884 //      CkEvacuatedElement();
885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
886 //  migrates_expected = 0;
887 //  //  ResumeClients(1);
888 #endif
889 #endif
890 }
891
892 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
893 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
894     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
895     //TODO: this is gonna be important when a crash happens during checkpoint
896     //the globalDecisionCount would have to be saved and compared against
897     //a future recvMigration
898                 
899         thisProxy[CkMyPe()].ResumeClients(1);
900 }
901 #endif
902
903 void CentralLB::MigrationDone(int balancing)
904 {
905 #if CMK_LBDB_ON
906   migrates_completed = 0;
907   migrates_expected = -1;
908   // clear load stats
909   if (balancing) theLbdb->ClearLoads();
910   // Increment to next step
911   theLbdb->incStep();
912         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
913   // if sync resume, invoke a barrier
914
915 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
916     savedBalancing = balancing;
917     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
918 #endif
919
920   LBDatabase::Object()->MigrationDone();    // call registered callbacks
921
922   LoadbalanceDone(balancing);        // callback
923 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
924   // if sync resume invoke a barrier
925   if (balancing && _lb_args.syncResume()) {
926     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
927                   thisProxy);
928     contribute(0, NULL, CkReduction::sum_int, cb);
929   }
930   else{ 
931     if(CmiNodeAlive(CkMyPe())){
932         thisProxy [CkMyPe()].ResumeClients(balancing);
933     }   
934   }     
935 #if CMK_GRID_QUEUE_AVAILABLE
936   CmiGridQueueDeregisterAll ();
937   CpvAccess(CkGridObject) = NULL;
938 #endif
939 #endif 
940 #endif
941 }
942
943 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
944 void CentralLB::endMigrationDone(int balancing){
945     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
946
947
948   if (balancing && _lb_args.syncResume()) {
949     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
950                   thisProxy);
951     contribute(0, NULL, CkReduction::sum_int, cb);
952   }
953   else{
954     if(CmiNodeAlive(CkMyPe())){
955     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
956     thisProxy [CkMyPe()].ResumeClients(balancing);
957     }
958   }
959
960 }
961 #endif
962
963 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
964 void resumeCentralLbAfterChkpt(void *_lb){
965     CentralLB *lb= (CentralLB *)_lb;
966     CpvAccess(_currentObj)=lb;
967     lb->endMigrationDone(lb->savedBalancing);
968 }
969 void resumeAfterRestoreParallelRecovery(void *_lb){
970     CentralLB *lb= (CentralLB *)_lb;
971         lb->ProcessReceiveMigration((CkReductionMsg*)NULL);
972 }
973 #endif
974
975
976 void CentralLB::ResumeClients(CkReductionMsg *msg)
977 {
978   ResumeClients(1);
979   delete msg;
980 }
981
982 void CentralLB::ResumeClients(int balancing)
983 {
984 #if CMK_LBDB_ON
985 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
986     resumeCount++;
987     globalResumeCount = resumeCount;
988 #endif
989   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
990   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
991     double end_lb_time = CkWallTimer();
992   }
993
994 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
995   if (balancing) ComlibNotifyMigrationDone();  
996 #endif
997
998   theLbdb->ResumeClients();
999   if (balancing)  {
1000
1001     CheckMigrationComplete();
1002     if (future_migrates_expected == 0 || 
1003             future_migrates_expected == future_migrates_completed) {
1004       CheckMigrationComplete();
1005     }
1006   }
1007 #endif
1008 }
1009
1010 /*
1011   migration of objects contains two different kinds:
1012   (1) objects want to make a barrier for migration completion
1013       (waitForBarrier is true)
1014       migrationDone() to finish and resumeClients
1015   (2) objects don't need a barrier
1016   However, next load balancing can only happen when both migrations complete
1017 */ 
1018 void CentralLB::CheckMigrationComplete()
1019 {
1020 #if CMK_LBDB_ON
1021   lbdone ++;
1022   if (lbdone == 2) {
1023     double end_lb_time = CkWallTimer();
1024     if (_lb_args.debug() && CkMyPe()==0) {
1025       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1026                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1027                 end_lb_time-start_lb_time);
1028     }
1029
1030     theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1031
1032     lbdone = 0;
1033     future_migrates_expected = -1;
1034     future_migrates_completed = 0;
1035
1036
1037     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1038     // release local barrier  so that the next load balancer can go
1039     LDOMHandle h;
1040     h.id.id.idx = 0;
1041     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1042     // switch to the next load balancer in the list
1043     // subtle: called from Migrated() may result in Migrated() called in next LB
1044     theLbdb->nextLoadbalancer(seqno);
1045   }
1046 #endif
1047 }
1048
1049 void CentralLB::preprocess(LDStats* stats)
1050 {
1051   if (_lb_args.ignoreBgLoad())
1052     stats->clearBgLoad();
1053
1054   // Call the predictor for the future
1055   if (_lb_predict) FuturePredictor(statsData);
1056 }
1057
1058 // default load balancing strategy
1059 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1060 {
1061 #if CMK_LBDB_ON
1062   double strat_start_time = CkWallTimer();
1063   if (_lb_args.debug())
1064     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1065
1066   work(stats);
1067
1068
1069   if (_lb_args.debug()>2)  {
1070     CkPrintf("CharmLB> Obj Map:\n");
1071     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1072     CkPrintf("\n");
1073   }
1074
1075   LBMigrateMsg *msg = createMigrateMsg(stats);
1076
1077   if (_lb_args.metaLbOn()) {
1078     int clients = CkNumPes();
1079     LBInfo info(clients);
1080     getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1081     LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1082     info.getSummary(mLoad, mCpuLoad, totalLoad);
1083     theLbdb->UpdateDataAfterLB(mLoad, mCpuLoad, totalLoad/clients);
1084   }
1085
1086   if (_lb_args.debug()) {
1087     double strat_end_time = CkWallTimer();
1088     envelope *env = UsrToEnv(msg);
1089
1090     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1091     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1092               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1093     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1094     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1095               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1096     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1097   }
1098   return msg;
1099 #else
1100   return NULL;
1101 #endif
1102 }
1103
1104 void CentralLB::work(LDStats* stats)
1105 {
1106   // does nothing but print the database
1107   stats->print();
1108 }
1109
1110 // generate migrate message from stats->from_proc and to_proc
1111 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1112 {
1113   int i;
1114   CkVec<MigrateInfo*> migrateInfo;
1115   for (i=0; i<stats->n_objs; i++) {
1116     LDObjData &objData = stats->objData[i];
1117     int frompe = stats->from_proc[i];
1118     int tope = stats->to_proc[i];
1119     if (frompe != tope) {
1120       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1121       //         CkMyPe(),obj,pe,dest);
1122       MigrateInfo *migrateMe = new MigrateInfo;
1123       migrateMe->obj = objData.handle;
1124       migrateMe->from_pe = frompe;
1125       migrateMe->to_pe = tope;
1126       migrateMe->async_arrival = objData.asyncArrival;
1127       migrateInfo.insertAtEnd(migrateMe);
1128     }
1129   }
1130
1131   int migrate_count=migrateInfo.length();
1132   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1133   msg->n_moves = migrate_count;
1134   for(i=0; i < migrate_count; i++) {
1135     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1136     msg->moves[i] = *item;
1137     delete item;
1138     migrateInfo[i] = 0;
1139   }
1140   return msg;
1141 }
1142
1143 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1144 {
1145   int nmoves = 0;
1146   int nunavail = 0;
1147   int i;
1148   for (i=0; i<m->n_moves; i++) {
1149     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1150     if (item->from_pe == p || item->to_pe == p) nmoves++;
1151   }
1152   for (i=0; i<CkNumPes();i++) {
1153     if (!m->avail_vector[i]) nunavail++;
1154   }
1155   LBMigrateMsg* msg;
1156   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1157   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1158   msg->n_moves = nmoves;
1159   msg->level = m->level;
1160   msg->next_lb = m->next_lb;
1161   for (i=0,nmoves=0; i<m->n_moves; i++) {
1162     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1163     if (item->from_pe == p || item->to_pe == p) {
1164       msg->moves[nmoves] = *item;
1165       nmoves++;
1166     }
1167   }
1168   // copy processor data
1169   if (nunavail)
1170   for (i=0; i<CkNumPes();i++) {
1171     msg->avail_vector[i] = m->avail_vector[i];
1172     msg->expectedLoad[i] = m->expectedLoad[i];
1173   }
1174   return msg;
1175 }
1176
1177 void CentralLB::simulationWrite() {
1178   if(step() == LBSimulation::dumpStep)
1179   {
1180     // here we are supposed to dump the database
1181     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1182     char *dumpFileName = (char *)malloc(dumpFileSize);
1183     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1184       free(dumpFileName);
1185       dumpFileSize+=3;
1186       dumpFileName = (char *)malloc(dumpFileSize);
1187     }
1188     writeStatsMsgs(dumpFileName);
1189     free(dumpFileName);
1190     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1191     ++LBSimulation::dumpStep;
1192     --LBSimulation::dumpStepSize;
1193     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1194       CmiPrintf("Charm++> Exiting...\n");
1195       CkExit();
1196     }
1197     return;
1198   }
1199 }
1200
1201 void CentralLB::simulationRead() {
1202   LBSimulation *simResults = NULL, *realResults;
1203   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1204   voidMessage->n_moves=0;
1205   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1206     // here we are supposed to read the data from the dump database
1207     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1208     char *simFileName = (char *)malloc(simFileSize);
1209     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1210       free(simFileName);
1211       simFileSize+=3;
1212       simFileName = (char *)malloc(simFileSize);
1213     }
1214     readStatsMsgs(simFileName);
1215
1216     // allocate simResults (only the first step)
1217     if (simResults == NULL) {
1218       simResults = new LBSimulation(LBSimulation::simProcs);
1219       realResults = new LBSimulation(LBSimulation::simProcs);
1220     }
1221     else {
1222       // should be the same number of procs of the original simulation!
1223       if (!LBSimulation::procsChanged) {
1224         // it means we have a previous step, so in simResults there is data.
1225         // we can now print the real effects of the load balancer during the simulation
1226         // or print the difference between the predicted data and the real one.
1227         realResults->reset();
1228         // reset to_proc of statsData to be equal to from_proc
1229         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1230         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1231         simResults->PrintDifferences(realResults,statsData);
1232       }
1233       simResults->reset();
1234     }
1235
1236     // now pass it to the strategy routine
1237     double startT = CkWallTimer();
1238     preprocess(statsData);
1239     CmiPrintf("%s> Strategy starts ... \n", lbname);
1240     LBMigrateMsg* migrateMsg = Strategy(statsData);
1241     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1242                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1243
1244     // now calculate the results of the load balancing simulation
1245     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1246
1247     // now we have the simulation data, so print it and loop
1248     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1249     // **CWL** Officially recording my disdain here for using ints for bool
1250     if (LBSimulation::showDecisionsOnly) {
1251       simResults->PrintDecisions(migrateMsg, simFileName, 
1252                                  LBSimulation::simProcs);
1253     } else {
1254       simResults->PrintSimulationResults();
1255     }
1256
1257     free(simFileName);
1258     delete migrateMsg;
1259     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1260   }
1261   // deallocate simResults
1262   delete simResults;
1263   CmiPrintf("Charm++> Exiting...\n");
1264   CkExit();
1265 }
1266
1267 void CentralLB::readStatsMsgs(const char* filename) 
1268 {
1269 #if CMK_LBDB_ON
1270   int i;
1271   FILE *f = fopen(filename, "r");
1272   if (f==NULL) {
1273     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1274     CmiAbort("");
1275   }
1276
1277   // at this stage, we need to rebuild the statsMsgList and
1278   // statsDataList structures. For that first deallocate the
1279   // old structures
1280   if (statsMsgsList) {
1281     for(i = 0; i < stats_msg_count; i++)
1282       delete statsMsgsList[i];
1283     delete[] statsMsgsList;
1284     statsMsgsList=0;
1285   }
1286
1287   PUP::fromDisk pd(f);
1288   PUP::machineInfo machInfo;
1289
1290   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1291   PUP::xlater p(machInfo, pd);
1292
1293   if (_lb_args.lbversion() > 1) {
1294     p|_lb_args.lbversion();             // write version number
1295     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1296     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1297   }
1298   p|stats_msg_count;
1299
1300   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1301   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1302   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1303
1304   // LBSimulation::simProcs must be set
1305   statsData->pup(p);
1306
1307   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1308   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1309
1310   // file f is closed in the destructor of PUP::fromDisk
1311   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1312 #endif
1313 }
1314
1315 void CentralLB::writeStatsMsgs(const char* filename) 
1316 {
1317 #if CMK_LBDB_ON
1318   FILE *f = fopen(filename, "w");
1319   if (f==NULL) {
1320     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1321     CmiAbort("");
1322   }
1323
1324   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1325   PUP::toDisk p(f);
1326   p((char *)&machInfo, sizeof(machInfo));       // machine info
1327
1328   p|_lb_args.lbversion();               // write version number
1329   p|stats_msg_count;
1330   statsData->pup(p);
1331
1332   fclose(f);
1333
1334   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1335 #endif
1336 }
1337
1338 // calculate the predicted wallclock/cpu load for every processors
1339 // considering communication overhead if considerComm is true
1340 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1341                       LBMigrateMsg *msg, LBInfo &info, 
1342                       int considerComm)
1343 {
1344 #if CMK_LBDB_ON
1345         stats->makeCommHash();
1346
1347         // update to_proc according to migration msgs
1348         for(int i = 0; i < msg->n_moves; i++) {
1349           MigrateInfo &mInfo = msg->moves[i];
1350           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1351           CmiAssert(idx != -1);
1352           stats->to_proc[idx] = mInfo.to_pe;
1353         }
1354
1355         info.getInfo(stats, count, considerComm);
1356 #endif
1357 }
1358
1359
1360 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1361 {
1362     CkAssert(simResults != NULL && count == simResults->numPes);
1363     // estimate the new loads of the processors. As a first approximation, this is the
1364     // sum of the cpu times of the objects on that processor
1365     double startT = CkWallTimer();
1366     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1367     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1368 }
1369
1370 void CentralLB::pup(PUP::er &p) { 
1371   BaseLB::pup(p); 
1372   if (p.isUnpacking())  {
1373     initLB(CkLBOptions(seqno)); 
1374   }
1375   p|reduction_started;
1376   int has_statsMsg=0;
1377   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1378   p|has_statsMsg;
1379   if (has_statsMsg) {
1380     if (p.isUnpacking())
1381       statsMsg = new CLBStatsMsg;
1382     statsMsg->pup(p);
1383   }
1384 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1385   p | lbDecisionCount;
1386   p | resumeCount;
1387 #endif
1388         
1389 }
1390
1391 int CentralLB::useMem() { 
1392   return sizeof(CentralLB) + statsData->useMem() + 
1393          CkNumPes() * sizeof(CLBStatsMsg *);
1394 }
1395
1396
1397 /**
1398   CLBStatsMsg is not a real message now.
1399   CLBStatsMsg is used for all processors to fill in their local load and comm
1400   statistics and send to processor 0
1401 */
1402
1403 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1404   n_objs = osz;
1405   n_comm = csz;
1406   objData = new LDObjData[osz];
1407   commData = new LDCommData[csz];
1408   avail_vector = NULL;
1409 }
1410
1411 CLBStatsMsg::~CLBStatsMsg() {
1412   delete [] objData;
1413   delete [] commData;
1414   delete [] avail_vector;
1415 }
1416
1417 void CLBStatsMsg::pup(PUP::er &p) {
1418   int i;
1419   p|from_pe;
1420   p|pe_speed;
1421   p|total_walltime;
1422   p|idletime;
1423   p|bg_walltime;
1424 #if CMK_LB_CPUTIMER
1425   p|total_cputime;
1426   p|bg_cputime;
1427 #endif
1428 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1429   p | step;
1430 #endif
1431   p|n_objs;
1432   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1433   for (i=0; i<n_objs; i++) p|objData[i];
1434   p|n_comm;
1435   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1436   for (i=0; i<n_comm; i++) p|commData[i];
1437
1438   int has_avail_vector;
1439   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1440   p|has_avail_vector;
1441   if (p.isUnpacking()) {
1442     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1443     else avail_vector = NULL;
1444   }
1445   if (has_avail_vector) p(avail_vector, CkNumPes());
1446
1447   p(next_lb);
1448 }
1449
1450 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1451 // the entry function, it is just used to use to pup.
1452 // I don't use CLBStatsMsg directly as marshalled parameter because
1453 // I want the data pointer stored and not to be freed by the Charm++.
1454 void CkMarshalledCLBStatsMessage::free() { 
1455   int count = msgs.size();
1456   for  (int i=0; i<count; i++) {
1457     delete msgs[i];
1458     msgs[i] = NULL;
1459   }
1460   msgs.free();
1461 }
1462
1463 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1464 {
1465   int count = m.getCount();
1466   for (int i=0; i<count; i++) add(m.getMessage(i));
1467 }
1468
1469 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1470 {
1471   int count = msgs.size();
1472   p|count;
1473   for (int i=0; i<count; i++) {
1474     CLBStatsMsg *msg;
1475     if (p.isUnpacking()) msg = new CLBStatsMsg;
1476     else { 
1477       msg = msgs[i]; CmiAssert(msg!=NULL);
1478     }
1479     msg->pup(p);
1480     if (p.isUnpacking()) add(msg);
1481   }
1482 }
1483
1484 SpanningTree::SpanningTree()
1485 {
1486         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1487         arity = (int)ceil(sq/2);
1488         calcParent(CkMyPe());
1489         calcNumChildren(CkMyPe());
1490 }
1491
1492 void SpanningTree::calcParent(int n)
1493 {
1494         parent=-1;
1495         if(n != 0  && arity > 0)
1496                 parent = (n-1)/arity;
1497 }
1498
1499 void SpanningTree::calcNumChildren(int n)
1500 {
1501         numChildren = 0;
1502         if (arity == 0) return;
1503         int fullNode=(CkNumPes()-1-arity)/arity;
1504         if(n <= fullNode)
1505                 numChildren = arity;
1506         if(n == fullNode+1)
1507                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1508         if(n > fullNode+1)
1509                 numChildren = 0;
1510 }
1511
1512 #include "CentralLB.def.h"
1513  
1514 /*@}*/