8afca6518998973caf2464f9f7bdfc3a63495ed6
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #define  DEBUGF(x)       // CmiPrintf x;
14 #define  DEBUG(x)        // x;
15
16 #if CMK_MEM_CHECKPOINT
17    /* can not handle reduction in inmem FT */
18 #define USE_REDUCTION         0
19 #define USE_LDB_SPANNING_TREE 0
20 #elif defined(_FAULT_MLOG_)
21 /* can not handle reduction in inmem FT */
22 #define USE_REDUCTION         0
23 #define USE_LDB_SPANNING_TREE 0
24 #else
25 #define USE_REDUCTION         1
26 #define USE_LDB_SPANNING_TREE 1
27 #endif
28
29 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
30 extern int _restartFlag;
31 extern void getGlobalStep(CkGroupID );
32 extern void initMlogLBStep(CkGroupID );
33 extern int globalResumeCount;
34 extern void sendDummyMigrationCounts(int *);
35 #endif
36
37 #if CMK_GRID_QUEUE_AVAILABLE
38 CpvExtern(void *, CkGridObject);
39 #endif
40
41 CkGroupID loadbalancer;
42 int * lb_ptr;
43 int load_balancer_created;
44
45 CreateLBFunc_Def(CentralLB, "CentralLB base class")
46
47 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
48                              LBMigrateMsg *, LBInfo &info, int considerComm);
49
50 /*
51 void CreateCentralLB()
52 {
53   CProxy_CentralLB::ckNew(0);
54 }
55 */
56
57 void CentralLB::staticStartLB(void* data)
58 {
59   CentralLB *me = (CentralLB*)(data);
60   me->StartLB();
61 }
62
63 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
64 {
65   CentralLB *me = (CentralLB*)(data);
66   me->Migrated(h, waitBarrier);
67 }
68
69 void CentralLB::staticAtSync(void* data)
70 {
71   CentralLB *me = (CentralLB*)(data);
72   me->AtSync();
73 }
74
75 void CentralLB::initLB(const CkLBOptions &opt)
76 {
77 #if CMK_LBDB_ON
78   lbname = "CentralLB";
79   thisProxy = CProxy_CentralLB(thisgroup);
80   //  CkPrintf("Construct in %d\n",CkMyPe());
81
82   // create and turn on by default
83   receiver = theLbdb->
84     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
85   notifier = theLbdb->getLBDB()->
86     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
87   startLbFnHdl = theLbdb->getLBDB()->
88     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
89
90   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
91   if (opt.getSeqNo() > 0) turnOff();
92
93   stats_msg_count = 0;
94   statsMsgsList = NULL;
95   statsData = NULL;
96
97   storedMigrateMsg = NULL;
98   reduction_started = 0;
99
100   // for future predictor
101   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
102   else predicted_model=0;
103   // register user interface callbacks
104   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
105
106   myspeed = theLbdb->ProcessorSpeed();
107
108   migrates_completed = 0;
109   future_migrates_completed = 0;
110   migrates_expected = -1;
111   future_migrates_expected = -1;
112   cur_ld_balancer = _lb_args.central_pe();      // 0 default
113   lbdone = 0;
114   count_msgs=0;
115   statsMsg = NULL;
116
117   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
118
119   load_balancer_created = 1;
120 #endif
121 }
122
123 CentralLB::~CentralLB()
124 {
125 #if CMK_LBDB_ON
126   delete [] statsMsgsList;
127   delete statsData;
128   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
129   if (theLbdb) {
130     theLbdb->getLBDB()->
131       RemoveNotifyMigrated(notifier);
132     theLbdb->
133       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
134   }
135 #endif
136 }
137
138 void CentralLB::turnOn() 
139 {
140 #if CMK_LBDB_ON
141   theLbdb->getLBDB()->
142     TurnOnBarrierReceiver(receiver);
143   theLbdb->getLBDB()->
144     TurnOnNotifyMigrated(notifier);
145   theLbdb->getLBDB()->
146     TurnOnStartLBFn(startLbFnHdl);
147 #endif
148 }
149
150 void CentralLB::turnOff() 
151 {
152 #if CMK_LBDB_ON
153   theLbdb->getLBDB()->
154     TurnOffBarrierReceiver(receiver);
155   theLbdb->getLBDB()->
156     TurnOffNotifyMigrated(notifier);
157   theLbdb->getLBDB()->
158     TurnOffStartLBFn(startLbFnHdl);
159 #endif
160 }
161
162 void CentralLB::AtSync()
163 {
164 #if CMK_LBDB_ON
165   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
166
167 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
168         CpvAccess(_currentObj)=this;
169 #endif
170
171   // if num of processor is only 1, nothing should happen
172   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
173     MigrationDone(0);
174     return;
175   }
176   if(CmiNodeAlive(CkMyPe())){
177     thisProxy [CkMyPe()].ProcessAtSync();
178   }
179 #endif
180 }
181
182 #include "ComlibStrategy.h"
183
184 void CentralLB::ProcessAtSync()
185 {
186 #if CMK_LBDB_ON
187   if (reduction_started) return;              // reducton in progress
188
189   CmiAssert(CmiNodeAlive(CkMyPe()));
190   if (CkMyPe() == cur_ld_balancer) {
191     start_lb_time = CkWallTimer();
192   }
193
194
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196         initMlogLBStep(thisgroup);
197 #endif
198
199   // build message
200   BuildStatsMsg();
201
202 #if USE_REDUCTION
203     // reduction to get total number of objects and comm
204     // so that processor 0 can pre-allocate load balancing database
205   int counts[2];
206   counts[0] = theLbdb->GetObjDataSz();
207   counts[1] = theLbdb->GetCommDataSz();
208
209   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
210                   thisProxy[0]);
211   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
212   reduction_started = 1;
213 #else
214   SendStats();
215 #endif
216 #endif
217 }
218
219 // called only on 0
220 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
221 {
222   CmiAssert(CkMyPe() == 0);
223   if (statsData == NULL) statsData = new LDStats;
224
225   int *counts = (int *)msg->getData();
226   int n_objs = counts[0];
227   int n_comm = counts[1];
228
229     // resize database
230   statsData->objData.resize(n_objs);
231   statsData->from_proc.resize(n_objs);
232   statsData->to_proc.resize(n_objs);
233   statsData->commData.resize(n_comm);
234
235   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
236         
237     // broadcast call to let everybody start to send stats
238   thisProxy.SendStats();
239 }
240
241 void CentralLB::BuildStatsMsg()
242 {
243 #if CMK_LBDB_ON
244   // build and send stats
245   const int osz = theLbdb->GetObjDataSz();
246   const int csz = theLbdb->GetCommDataSz();
247
248   int npes = CkNumPes();
249   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
250   _MEMCHECK(msg);
251   msg->from_pe = CkMyPe();
252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
253         msg->step = step();
254 #endif
255   //msg->serial = CrnRand();
256
257 /*
258   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
259   theLbdb->IdleTime(&msg->idletime);
260   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
261 */
262 #if CMK_LB_CPUTIMER
263   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
264                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
265 #else
266   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
267                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
268 #endif
269
270   msg->pe_speed = myspeed;
271   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
272
273   msg->n_objs = osz;
274   theLbdb->GetObjData(msg->objData);
275   msg->n_comm = csz;
276   theLbdb->GetCommData(msg->commData);
277 //  theLbdb->ClearLoads();
278   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
279
280   if(CkMyPe() == cur_ld_balancer) {
281     msg->avail_vector = new char[CkNumPes()];
282     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
283     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
284   }
285
286   CmiAssert(statsMsg == NULL);
287   statsMsg = msg;
288 #endif
289 }
290
291 // called on every processor
292 void CentralLB::SendStats()
293 {
294 #if CMK_LBDB_ON
295   CmiAssert(statsMsg != NULL);
296   reduction_started = 0;
297
298 #if USE_LDB_SPANNING_TREE
299   if(CkNumPes()>1024)
300   {
301     if (CkMyPe() == cur_ld_balancer)
302       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
303     else
304       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
305   }
306   else
307 #endif
308   {
309     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
310     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
311   }
312
313   statsMsg = NULL;
314
315 #ifdef __BIGSIM__
316   BgEndStreaming();
317 #endif
318
319   {
320   // enfore the barrier to wait until centralLB says no
321   LDOMHandle h;
322   h.id.id.idx = 0;
323   theLbdb->getLBDB()->RegisteringObjects(h);
324   }
325 #endif
326 }
327
328 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
329 extern int donotCountMigration;
330 #endif
331
332 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
333 {
334 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
335     if(donotCountMigration){
336         return ;
337     }
338 #endif
339
340 #if CMK_LBDB_ON
341   if (waitBarrier) {
342             migrates_completed++;
343       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
344     if (migrates_completed == migrates_expected) {
345       MigrationDone(1);
346     }
347   }
348   else {
349     future_migrates_completed ++;
350     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
351     if (future_migrates_completed == future_migrates_expected)  {
352         CheckMigrationComplete();
353     }
354   }
355 #endif
356 }
357
358 void CentralLB::MissMigrate(int waitForBarrier)
359 {
360   LDObjHandle h;
361   Migrated(h, waitForBarrier);
362 }
363
364 // build a complete data from bufferred messages
365 // not used when USE_REDUCTION = 1
366 void CentralLB::buildStats()
367 {
368     statsData->nprocs() = stats_msg_count;
369     // allocate space
370     statsData->objData.resize(statsData->n_objs);
371     statsData->from_proc.resize(statsData->n_objs);
372     statsData->to_proc.resize(statsData->n_objs);
373     statsData->commData.resize(statsData->n_comm);
374
375     int nobj = 0;
376     int ncom = 0;
377     int nmigobj = 0;
378     // copy all data in individule message to this big structure
379     for (int pe=0; pe<CkNumPes(); pe++) {
380        int i;
381        CLBStatsMsg *msg = statsMsgsList[pe];
382        if(msg == NULL) continue;
383        for (i=0; i<msg->n_objs; i++) {
384          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
385          statsData->objData[nobj] = msg->objData[i];
386          if (msg->objData[i].migratable) nmigobj++;
387          nobj++;
388        }
389        for (i=0; i<msg->n_comm; i++) {
390          statsData->commData[ncom] = msg->commData[i];
391          ncom++;
392        }
393        // free the memory
394        delete msg;
395        statsMsgsList[pe]=0;
396     }
397     statsData->n_migrateobjs = nmigobj;
398 }
399
400 // deposit one processor data at a time, note database is pre-allocated
401 // to have enough space
402 // used when USE_REDUCTION = 1
403 void CentralLB::depositData(CLBStatsMsg *m)
404 {
405   int i;
406   if (m == NULL) return;
407
408   const int pe = m->from_pe;
409   struct ProcStats &procStat = statsData->procs[pe];
410   procStat.pe = pe;
411   procStat.total_walltime = m->total_walltime;
412   procStat.idletime = m->idletime;
413   procStat.bg_walltime = m->bg_walltime;
414 #if CMK_LB_CPUTIMER
415   procStat.total_cputime = m->total_cputime;
416   procStat.bg_cputime = m->bg_cputime;
417 #endif
418   procStat.pe_speed = m->pe_speed;
419   //procStat.utilization = 1.0;
420   procStat.available = CmiTrue;
421   procStat.n_objs = m->n_objs;
422
423   int &nobj = statsData->n_objs;
424   int &nmigobj = statsData->n_migrateobjs;
425   for (i=0; i<m->n_objs; i++) {
426       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
427       statsData->objData[nobj] = m->objData[i];
428       if (m->objData[i].migratable) nmigobj++;
429       nobj++;
430       CmiAssert(nobj <= statsData->objData.capacity());
431   }
432   int &n_comm = statsData->n_comm;
433   for (i=0; i<m->n_comm; i++) {
434       statsData->commData[n_comm] = m->commData[i];
435       n_comm++;
436       CmiAssert(n_comm <= statsData->commData.capacity());
437   }
438   delete m;
439 }
440
441 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
442 {
443 #if CMK_LBDB_ON
444   if (statsMsgsList == NULL) {
445     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
446     CmiAssert(statsMsgsList != NULL);
447     for(int i=0; i < CkNumPes(); i++)
448       statsMsgsList[i] = 0;
449   }
450   if (statsData == NULL) statsData = new LDStats;
451
452     //  loop through all CLBStatsMsg in the incoming msg
453   int count = msg.getCount();
454   for (int num = 0; num < count; num++) 
455   {
456     CLBStatsMsg *m = msg.getMessage(num);
457     CmiAssert(m!=NULL);
458     const int pe = m->from_pe;
459     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
460 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
461 /*      
462  *  if(m->step < step()){
463  *    //TODO: if a processor is redoing an old load balance step..
464  *    //tell it that the step is done and that it should not perform any migrations
465  *      thisProxy[pe].ReceiveDummyMigration();
466  *  }*/
467 #endif
468         
469     if(!CmiNodeAlive(pe)){
470         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
471         continue;
472     }
473         
474     if (m->avail_vector!=NULL) {
475       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
476     }
477
478     if (statsMsgsList[pe] != 0) {
479       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
480              pe);
481     } else {
482       statsMsgsList[pe] = m;
483 #if USE_REDUCTION
484       depositData(m);
485 #else
486       // store per processor data right away
487       struct ProcStats &procStat = statsData->procs[pe];
488       procStat.pe = pe;
489       procStat.total_walltime = m->total_walltime;
490       procStat.idletime = m->idletime;
491       procStat.bg_walltime = m->bg_walltime;
492 #if CMK_LB_CPUTIMER
493       procStat.total_cputime = m->total_cputime;
494       procStat.bg_cputime = m->bg_cputime;
495 #endif
496       procStat.pe_speed = m->pe_speed;
497       //procStat.utilization = 1.0;
498       procStat.available = CmiTrue;
499       procStat.n_objs = m->n_objs;
500
501       statsData->n_objs += m->n_objs;
502       statsData->n_comm += m->n_comm;
503 #endif
504       stats_msg_count++;
505     }
506   }    // end of for
507
508   const int clients = CkNumValidPes();
509   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
510  
511   if (stats_msg_count == clients) {
512         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
513     statsData->nprocs() = stats_msg_count;
514     thisProxy[CkMyPe()].LoadBalance();
515   }
516 #endif
517 }
518
519 /** added by Abhinav for receiving msgs via spanning tree */
520 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
521 {
522 #if CMK_LBDB_ON
523         CmiAssert(CkMyPe() != 0);
524         bufMsg.add(msg);         // buffer messages
525         count_msgs++;
526         //CkPrintf("here %d\n", CkMyPe());
527         if (count_msgs == st.numChildren+1) {
528                 if(st.parent == 0)
529                 {
530                         thisProxy[0].ReceiveStats(bufMsg);
531                         //CkPrintf("from %d\n", CkMyPe());
532                 }
533                 else
534                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
535                 count_msgs = 0;
536                 bufMsg.free();
537         } 
538 #endif
539 }
540
541 void CentralLB::LoadBalance()
542 {
543 #if CMK_LBDB_ON
544   int proc;
545   const int clients = CkNumPes();
546
547 #if ! USE_REDUCTION
548   // build data
549   buildStats();
550 #else
551   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
552 #endif
553
554   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
555
556   if (_lb_args.debug()) 
557       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
558                   lbname, cur_ld_balancer, step(), start_lb_time,
559                   CmiMemoryUsage()/(1024.0*1024.0));
560
561   // if we are in simulation mode read data
562   if (LBSimulation::doSimulation) simulationRead();
563
564   char *availVector = LBDatabaseObj()->availVector();
565   for(proc = 0; proc < clients; proc++)
566       statsData->procs[proc].available = (CmiBool)availVector[proc];
567
568   preprocess(statsData);
569
570 //    CkPrintf("Before Calling Strategy\n");
571
572   if (_lb_args.printSummary()) {
573       LBInfo info(clients);
574         // not take comm data
575       info.getInfo(statsData, clients, 0);
576       LBRealType mLoad, mCpuLoad, totalLoad;
577       info.getSummary(mLoad, mCpuLoad, totalLoad);
578       int nmsgs, nbytes;
579       statsData->computeNonlocalComm(nmsgs, nbytes);
580       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
581 //      if (_lb_args.debug() > 1) {
582 //        for (int i=0; i<statsData->n_objs; i++)
583 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
584 //      }
585   }
586
587 #if CMK_REPLAYSYSTEM
588   LDHandle *loadBalancer_pointers;
589   if (_replaySystem) {
590     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
591     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
592   }
593 #endif
594   
595   LBMigrateMsg* migrateMsg = Strategy(statsData);
596 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
597         migrateMsg->step = step();
598 #endif
599
600 #if CMK_REPLAYSYSTEM
601   CpdHandleLBMessage(&migrateMsg);
602   if (_replaySystem) {
603     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
604     free(loadBalancer_pointers);
605   }
606 #endif
607   
608   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
609   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
610
611   // if this is the step at which we need to dump the database
612   simulationWrite();
613
614 //  calculate predicted load
615 //  very time consuming though, so only happen when debugging is on
616   if (_lb_args.printSummary()) {
617       LBInfo info(clients);
618         // not take comm data
619       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
620       LBRealType mLoad, mCpuLoad, totalLoad;
621       info.getSummary(mLoad, mCpuLoad, totalLoad);
622       int nmsgs, nbytes;
623       statsData->computeNonlocalComm(nmsgs, nbytes);
624       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
625       for (int i=0; i<clients; i++)
626         migrateMsg->expectedLoad[i] = info.peLoads[i];
627   }
628
629   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
630 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
631     lbDecisionCount++;
632     migrateMsg->lbDecisionCount = lbDecisionCount;
633 #endif
634
635   envelope *env = UsrToEnv(migrateMsg);
636   if (1) {
637       // broadcast
638     thisProxy.ReceiveMigration(migrateMsg);
639   }
640   else {
641     // split the migration for each processor
642     for (int p=0; p<CkNumPes(); p++) {
643       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
644       thisProxy[p].ReceiveMigration(m);
645     }
646     delete migrateMsg;
647   }
648
649   // Zero out data structures for next cycle
650   // CkPrintf("zeroing out data\n");
651   statsData->clear();
652   stats_msg_count=0;
653 #endif
654 }
655
656 // test if sender and receiver in a commData is nonmigratable.
657 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
658 {
659 #if CMK_LBDB_ON
660   for (int pe=0 ; pe<count; pe++)
661   {
662     for (int i=0; i<len[pe]; i++)
663       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
664           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
665       return 0;
666   }
667 #endif
668   return 1;
669 }
670
671 // rebuild LDStats and remove all non-migratble objects and related things
672 void CentralLB::removeNonMigratable(LDStats* stats, int count)
673 {
674   int i;
675
676   // check if we have non-migratable objects
677   int have = 0;
678   for (i=0; i<stats->n_objs; i++) 
679   {
680     LDObjData &odata = stats->objData[i];
681     if (!odata.migratable) {
682       have = 1; break;
683     }
684   }
685   if (have == 0) return;
686
687   CkVec<LDObjData> nonmig;
688   CkVec<int> new_from_proc, new_to_proc;
689   nonmig.resize(stats->n_migrateobjs);
690   new_from_proc.resize(stats->n_migrateobjs);
691   new_to_proc.resize(stats->n_migrateobjs);
692   int n_objs = 0;
693   for (i=0; i<stats->n_objs; i++) 
694   {
695     LDObjData &odata = stats->objData[i];
696     if (odata.migratable) {
697       nonmig[n_objs] = odata;
698       new_from_proc[n_objs] = stats->from_proc[i];
699       new_to_proc[n_objs] = stats->to_proc[i];
700       n_objs ++;
701     }
702     else {
703       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
704 #if CMK_LB_CPUTIMER
705       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
706 #endif
707     }
708   }
709   CmiAssert(stats->n_migrateobjs == n_objs);
710
711   stats->makeCommHash();
712   
713   CkVec<LDCommData> newCommData;
714   newCommData.resize(stats->n_comm);
715   int n_comm = 0;
716   for (i=0; i<stats->n_comm; i++) 
717   {
718     LDCommData& cdata = stats->commData[i];
719     if (!cdata.from_proc()) 
720     {
721       int idx = stats->getSendHash(cdata);
722       CmiAssert(idx != -1);
723       if (!stats->objData[idx].migratable) continue;
724     }
725     switch (cdata.receiver.get_type()) {
726     case LD_PROC_MSG:
727       break;
728     case LD_OBJ_MSG:  {
729       int idx = stats->getRecvHash(cdata);
730       if (stats->complete_flag)
731         CmiAssert(idx != -1);
732       else if (idx == -1) continue;          // receiver not in this group
733       if (!stats->objData[idx].migratable) continue;
734       break;
735       }
736     case LD_OBJLIST_MSG:    // object message FIXME add multicast
737       break;
738     }
739     newCommData[n_comm] = cdata;
740     n_comm ++;
741   }
742
743   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
744
745   // swap to new data
746   stats->objData = nonmig;
747   stats->from_proc = new_from_proc;
748   stats->to_proc = new_to_proc;
749   stats->n_objs = n_objs;
750
751   stats->commData = newCommData;
752   stats->n_comm = n_comm;
753
754   stats->deleteCommHash();
755   stats->makeCommHash();
756
757 }
758
759
760 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
761 extern int restarted;
762 #endif
763
764 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
765 {
766   storedMigrateMsg = m;
767 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
768         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
769 #else
770   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
771                   thisProxy);
772   contribute(0, NULL, CkReduction::max_int, cb);
773 #endif
774 }
775
776 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
777 {
778 #if CMK_LBDB_ON
779         int i;
780         LBMigrateMsg *m = storedMigrateMsg;
781         CmiAssert(m!=NULL);
782         delete msg;
783
784 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
785         int *dummyCounts;
786
787         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
788         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
789         if(step() > m->step){
790                 char str[100];
791                 envelope *env = UsrToEnv(m);
792                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
793                 return;
794         }
795         lbDecisionCount = m->lbDecisionCount;
796 #endif
797
798   if (_lb_args.debug() > 1) 
799     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
800
801   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
802   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
803 /*FAULT_EVAC*/
804   if(!CmiNodeAlive(CkMyPe())){
805         delete m;
806         return;
807   }
808   migrates_expected = 0;
809   future_migrates_expected = 0;
810 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
811         int sending=0;
812     int dummy=0;
813         LBDB *_myLBDB = theLbdb->getLBDB();
814         if(_restartFlag){
815         dummyCounts = new int[CmiNumPes()];
816         bzero(dummyCounts,sizeof(int)*CmiNumPes());
817     }
818 #endif
819   for(i=0; i < m->n_moves; i++) {
820     MigrateInfo& move = m->moves[i];
821     const int me = CkMyPe();
822     if (move.from_pe == me && move.to_pe != me) {
823       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
824       // migrate object, in case it is already gone, inform toPe
825 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
826       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
827          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
828 #else
829             if(_restartFlag == 0){
830                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
831                 theLbdb->Migrate(move.obj,move.to_pe);
832                 sending++;
833             }else{
834                 if(_myLBDB->validObjHandle(move.obj)){
835                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
836                     theLbdb->Migrate(move.obj,move.to_pe);
837                     sending++;
838                 }else{
839                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
840                     dummyCounts[move.to_pe]++;
841                     dummy++;
842                 }
843             }
844 #endif
845     } else if (move.from_pe != me && move.to_pe == me) {
846        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
847       if (!move.async_arrival) migrates_expected++;
848       else future_migrates_expected++;
849     }
850   }
851   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
852   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
853
854 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
855         if(_restartFlag){
856                 sendDummyMigrationCounts(dummyCounts);
857                 _restartFlag  =0;
858         delete []dummyCounts;
859         }
860 #endif
861
862
863 #if 0
864   if (m->n_moves ==0) {
865     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
866   }
867 #endif
868   cur_ld_balancer = m->next_lb;
869   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
870       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
871   }
872
873   if (migrates_expected == 0 || migrates_completed == migrates_expected)
874     MigrationDone(1);
875   delete m;
876
877 //      CkEvacuatedElement();
878 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
879 //  migrates_expected = 0;
880 //  //  ResumeClients(1);
881 #endif
882 #endif
883 }
884
885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
886 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
887     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
888     //TODO: this is gonna be important when a crash happens during checkpoint
889     //the globalDecisionCount would have to be saved and compared against
890     //a future recvMigration
891                 
892         thisProxy[CkMyPe()].ResumeClients(1);
893 }
894 #endif
895
896 void CentralLB::MigrationDone(int balancing)
897 {
898 #if CMK_LBDB_ON
899   migrates_completed = 0;
900   migrates_expected = -1;
901   // clear load stats
902   if (balancing) theLbdb->ClearLoads();
903   // Increment to next step
904   theLbdb->incStep();
905         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
906   // if sync resume, invoke a barrier
907
908 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
909     savedBalancing = balancing;
910     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
911 #endif
912
913   LBDatabase::Object()->MigrationDone();    // call registered callbacks
914
915   LoadbalanceDone(balancing);        // callback
916 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
917   // if sync resume invoke a barrier
918   if (balancing && _lb_args.syncResume()) {
919     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
920                   thisProxy);
921     contribute(0, NULL, CkReduction::sum_int, cb);
922   }
923   else{ 
924     if(CmiNodeAlive(CkMyPe())){
925         thisProxy [CkMyPe()].ResumeClients(balancing);
926     }   
927   }     
928 #if CMK_GRID_QUEUE_AVAILABLE
929   CmiGridQueueDeregisterAll ();
930   CpvAccess(CkGridObject) = NULL;
931 #endif
932 #endif 
933 #endif
934 }
935
936 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
937 void CentralLB::endMigrationDone(int balancing){
938     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
939
940
941   if (balancing && _lb_args.syncResume()) {
942     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
943                   thisProxy);
944     contribute(0, NULL, CkReduction::sum_int, cb);
945   }
946   else{
947     if(CmiNodeAlive(CkMyPe())){
948     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
949     thisProxy [CkMyPe()].ResumeClients(balancing);
950     }
951   }
952
953 }
954 #endif
955
956 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
957 void resumeCentralLbAfterChkpt(void *_lb){
958     CentralLB *lb= (CentralLB *)_lb;
959     CpvAccess(_currentObj)=lb;
960     lb->endMigrationDone(lb->savedBalancing);
961 }
962 void resumeAfterRestoreParallelRecovery(void *_lb){
963     CentralLB *lb= (CentralLB *)_lb;
964         lb->ProcessReceiveMigration((CkReductionMsg*)NULL);
965 }
966 #endif
967
968
969 void CentralLB::ResumeClients(CkReductionMsg *msg)
970 {
971   ResumeClients(1);
972   delete msg;
973 }
974
975 void CentralLB::ResumeClients(int balancing)
976 {
977 #if CMK_LBDB_ON
978 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
979     resumeCount++;
980     globalResumeCount = resumeCount;
981 #endif
982   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
983   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
984     double end_lb_time = CkWallTimer();
985   }
986
987 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
988   if (balancing) ComlibNotifyMigrationDone();  
989 #endif
990
991   theLbdb->ResumeClients();
992   if (balancing)  {
993     CheckMigrationComplete();
994     if (future_migrates_expected == 0 || 
995             future_migrates_expected == future_migrates_completed) {
996       CheckMigrationComplete();
997     }
998   }
999 #endif
1000 }
1001
1002 /*
1003   migration of objects contains two different kinds:
1004   (1) objects want to make a barrier for migration completion
1005       (waitForBarrier is true)
1006       migrationDone() to finish and resumeClients
1007   (2) objects don't need a barrier
1008   However, next load balancing can only happen when both migrations complete
1009 */ 
1010 void CentralLB::CheckMigrationComplete()
1011 {
1012 #if CMK_LBDB_ON
1013   lbdone ++;
1014   if (lbdone == 2) {
1015     if (_lb_args.debug() && CkMyPe()==0) {
1016       double end_lb_time = CkWallTimer();
1017       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1018                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1019                 end_lb_time-start_lb_time);
1020     }
1021     lbdone = 0;
1022     future_migrates_expected = -1;
1023     future_migrates_completed = 0;
1024     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1025     // release local barrier  so that the next load balancer can go
1026     LDOMHandle h;
1027     h.id.id.idx = 0;
1028     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1029     // switch to the next load balancer in the list
1030     // subtle: called from Migrated() may result in Migrated() called in next LB
1031     theLbdb->nextLoadbalancer(seqno);
1032   }
1033 #endif
1034 }
1035
1036 void CentralLB::preprocess(LDStats* stats)
1037 {
1038   if (_lb_args.ignoreBgLoad())
1039     stats->clearBgLoad();
1040
1041   // Call the predictor for the future
1042   if (_lb_predict) FuturePredictor(statsData);
1043 }
1044
1045 // default load balancing strategy
1046 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1047 {
1048 #if CMK_LBDB_ON
1049   double strat_start_time = CkWallTimer();
1050   if (_lb_args.debug())
1051     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1052
1053   work(stats);
1054
1055   if (_lb_args.debug()>2)  {
1056     CkPrintf("CharmLB> Obj Map:\n");
1057     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1058     CkPrintf("\n");
1059   }
1060
1061   LBMigrateMsg *msg = createMigrateMsg(stats);
1062
1063   if (_lb_args.debug()) {
1064     double strat_end_time = CkWallTimer();
1065     envelope *env = UsrToEnv(msg);
1066
1067     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1068     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1069               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1070     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1071     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1072               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1073   }
1074
1075   return msg;
1076 #else
1077   return NULL;
1078 #endif
1079 }
1080
1081 void CentralLB::work(LDStats* stats)
1082 {
1083   // does nothing but print the database
1084   stats->print();
1085 }
1086
1087 // generate migrate message from stats->from_proc and to_proc
1088 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1089 {
1090   int i;
1091   CkVec<MigrateInfo*> migrateInfo;
1092   for (i=0; i<stats->n_objs; i++) {
1093     LDObjData &objData = stats->objData[i];
1094     int frompe = stats->from_proc[i];
1095     int tope = stats->to_proc[i];
1096     if (frompe != tope) {
1097       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1098       //         CkMyPe(),obj,pe,dest);
1099       MigrateInfo *migrateMe = new MigrateInfo;
1100       migrateMe->obj = objData.handle;
1101       migrateMe->from_pe = frompe;
1102       migrateMe->to_pe = tope;
1103       migrateMe->async_arrival = objData.asyncArrival;
1104       migrateInfo.insertAtEnd(migrateMe);
1105     }
1106   }
1107
1108   int migrate_count=migrateInfo.length();
1109   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1110   msg->n_moves = migrate_count;
1111   for(i=0; i < migrate_count; i++) {
1112     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1113     msg->moves[i] = *item;
1114     delete item;
1115     migrateInfo[i] = 0;
1116   }
1117   return msg;
1118 }
1119
1120 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1121 {
1122   int nmoves = 0;
1123   int nunavail = 0;
1124   int i;
1125   for (i=0; i<m->n_moves; i++) {
1126     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1127     if (item->from_pe == p || item->to_pe == p) nmoves++;
1128   }
1129   for (i=0; i<CkNumPes();i++) {
1130     if (!m->avail_vector[i]) nunavail++;
1131   }
1132   LBMigrateMsg* msg;
1133   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1134   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1135   msg->n_moves = nmoves;
1136   msg->level = m->level;
1137   msg->next_lb = m->next_lb;
1138   for (i=0,nmoves=0; i<m->n_moves; i++) {
1139     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1140     if (item->from_pe == p || item->to_pe == p) {
1141       msg->moves[nmoves] = *item;
1142       nmoves++;
1143     }
1144   }
1145   // copy processor data
1146   if (nunavail)
1147   for (i=0; i<CkNumPes();i++) {
1148     msg->avail_vector[i] = m->avail_vector[i];
1149     msg->expectedLoad[i] = m->expectedLoad[i];
1150   }
1151   return msg;
1152 }
1153
1154 void CentralLB::simulationWrite() {
1155   if(step() == LBSimulation::dumpStep)
1156   {
1157     // here we are supposed to dump the database
1158     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1159     char *dumpFileName = (char *)malloc(dumpFileSize);
1160     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1161       free(dumpFileName);
1162       dumpFileSize+=3;
1163       dumpFileName = (char *)malloc(dumpFileSize);
1164     }
1165     writeStatsMsgs(dumpFileName);
1166     free(dumpFileName);
1167     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1168     ++LBSimulation::dumpStep;
1169     --LBSimulation::dumpStepSize;
1170     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1171       CmiPrintf("Charm++> Exiting...\n");
1172       CkExit();
1173     }
1174     return;
1175   }
1176 }
1177
1178 void CentralLB::simulationRead() {
1179   LBSimulation *simResults = NULL, *realResults;
1180   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1181   voidMessage->n_moves=0;
1182   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1183     // here we are supposed to read the data from the dump database
1184     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1185     char *simFileName = (char *)malloc(simFileSize);
1186     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1187       free(simFileName);
1188       simFileSize+=3;
1189       simFileName = (char *)malloc(simFileSize);
1190     }
1191     readStatsMsgs(simFileName);
1192
1193     // allocate simResults (only the first step)
1194     if (simResults == NULL) {
1195       simResults = new LBSimulation(LBSimulation::simProcs);
1196       realResults = new LBSimulation(LBSimulation::simProcs);
1197     }
1198     else {
1199       // should be the same number of procs of the original simulation!
1200       if (!LBSimulation::procsChanged) {
1201         // it means we have a previous step, so in simResults there is data.
1202         // we can now print the real effects of the load balancer during the simulation
1203         // or print the difference between the predicted data and the real one.
1204         realResults->reset();
1205         // reset to_proc of statsData to be equal to from_proc
1206         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1207         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1208         simResults->PrintDifferences(realResults,statsData);
1209       }
1210       simResults->reset();
1211     }
1212
1213     // now pass it to the strategy routine
1214     double startT = CkWallTimer();
1215     preprocess(statsData);
1216     CmiPrintf("%s> Strategy starts ... \n", lbname);
1217     LBMigrateMsg* migrateMsg = Strategy(statsData);
1218     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1219                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1220
1221     // now calculate the results of the load balancing simulation
1222     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1223
1224     // now we have the simulation data, so print it and loop
1225     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1226     // **CWL** Officially recording my disdain here for using ints for bool
1227     if (LBSimulation::showDecisionsOnly) {
1228       simResults->PrintDecisions(migrateMsg, simFileName, 
1229                                  LBSimulation::simProcs);
1230     } else {
1231       simResults->PrintSimulationResults();
1232     }
1233
1234     free(simFileName);
1235     delete migrateMsg;
1236     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1237   }
1238   // deallocate simResults
1239   delete simResults;
1240   CmiPrintf("Charm++> Exiting...\n");
1241   CkExit();
1242 }
1243
1244 void CentralLB::readStatsMsgs(const char* filename) 
1245 {
1246 #if CMK_LBDB_ON
1247   int i;
1248   FILE *f = fopen(filename, "r");
1249   if (f==NULL) {
1250     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1251     CmiAbort("");
1252   }
1253
1254   // at this stage, we need to rebuild the statsMsgList and
1255   // statsDataList structures. For that first deallocate the
1256   // old structures
1257   if (statsMsgsList) {
1258     for(i = 0; i < stats_msg_count; i++)
1259       delete statsMsgsList[i];
1260     delete[] statsMsgsList;
1261     statsMsgsList=0;
1262   }
1263
1264   PUP::fromDisk pd(f);
1265   PUP::machineInfo machInfo;
1266
1267   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1268   PUP::xlater p(machInfo, pd);
1269
1270   if (_lb_args.lbversion() > 1) {
1271     p|_lb_args.lbversion();             // write version number
1272     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1273     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1274   }
1275   p|stats_msg_count;
1276
1277   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1278   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1279   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1280
1281   // LBSimulation::simProcs must be set
1282   statsData->pup(p);
1283
1284   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1285   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1286
1287   // file f is closed in the destructor of PUP::fromDisk
1288   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1289 #endif
1290 }
1291
1292 void CentralLB::writeStatsMsgs(const char* filename) 
1293 {
1294 #if CMK_LBDB_ON
1295   FILE *f = fopen(filename, "w");
1296   if (f==NULL) {
1297     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1298     CmiAbort("");
1299   }
1300
1301   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1302   PUP::toDisk p(f);
1303   p((char *)&machInfo, sizeof(machInfo));       // machine info
1304
1305   p|_lb_args.lbversion();               // write version number
1306   p|stats_msg_count;
1307   statsData->pup(p);
1308
1309   fclose(f);
1310
1311   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1312 #endif
1313 }
1314
1315 // calculate the predicted wallclock/cpu load for every processors
1316 // considering communication overhead if considerComm is true
1317 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1318                       LBMigrateMsg *msg, LBInfo &info, 
1319                       int considerComm)
1320 {
1321 #if CMK_LBDB_ON
1322         stats->makeCommHash();
1323
1324         // update to_proc according to migration msgs
1325         for(int i = 0; i < msg->n_moves; i++) {
1326           MigrateInfo &mInfo = msg->moves[i];
1327           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1328           CmiAssert(idx != -1);
1329           stats->to_proc[idx] = mInfo.to_pe;
1330         }
1331
1332         info.getInfo(stats, count, considerComm);
1333 #endif
1334 }
1335
1336
1337 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1338 {
1339     CkAssert(simResults != NULL && count == simResults->numPes);
1340     // estimate the new loads of the processors. As a first approximation, this is the
1341     // sum of the cpu times of the objects on that processor
1342     double startT = CkWallTimer();
1343     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1344     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1345 }
1346
1347 void CentralLB::pup(PUP::er &p) { 
1348   BaseLB::pup(p); 
1349   if (p.isUnpacking())  {
1350     initLB(CkLBOptions(seqno)); 
1351   }
1352   p|reduction_started;
1353   int has_statsMsg=0;
1354   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1355   p|has_statsMsg;
1356   if (has_statsMsg) {
1357     if (p.isUnpacking())
1358       statsMsg = new CLBStatsMsg;
1359     statsMsg->pup(p);
1360   }
1361 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1362   p | lbDecisionCount;
1363   p | resumeCount;
1364 #endif
1365         
1366 }
1367
1368 int CentralLB::useMem() { 
1369   return sizeof(CentralLB) + statsData->useMem() + 
1370          CkNumPes() * sizeof(CLBStatsMsg *);
1371 }
1372
1373
1374 /**
1375   CLBStatsMsg is not a real message now.
1376   CLBStatsMsg is used for all processors to fill in their local load and comm
1377   statistics and send to processor 0
1378 */
1379
1380 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1381   n_objs = osz;
1382   n_comm = csz;
1383   objData = new LDObjData[osz];
1384   commData = new LDCommData[csz];
1385   avail_vector = NULL;
1386 }
1387
1388 CLBStatsMsg::~CLBStatsMsg() {
1389   delete [] objData;
1390   delete [] commData;
1391   delete [] avail_vector;
1392 }
1393
1394 void CLBStatsMsg::pup(PUP::er &p) {
1395   int i;
1396   p|from_pe;
1397   p|pe_speed;
1398   p|total_walltime;
1399   p|idletime;
1400   p|bg_walltime;
1401 #if CMK_LB_CPUTIMER
1402   p|total_cputime;
1403   p|bg_cputime;
1404 #endif
1405 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1406   p | step;
1407 #endif
1408   p|n_objs;
1409   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1410   for (i=0; i<n_objs; i++) p|objData[i];
1411   p|n_comm;
1412   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1413   for (i=0; i<n_comm; i++) p|commData[i];
1414
1415   int has_avail_vector;
1416   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1417   p|has_avail_vector;
1418   if (p.isUnpacking()) {
1419     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1420     else avail_vector = NULL;
1421   }
1422   if (has_avail_vector) p(avail_vector, CkNumPes());
1423
1424   p(next_lb);
1425 }
1426
1427 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1428 // the entry function, it is just used to use to pup.
1429 // I don't use CLBStatsMsg directly as marshalled parameter because
1430 // I want the data pointer stored and not to be freed by the Charm++.
1431 void CkMarshalledCLBStatsMessage::free() { 
1432   int count = msgs.size();
1433   for  (int i=0; i<count; i++) {
1434     delete msgs[i];
1435     msgs[i] = NULL;
1436   }
1437   msgs.free();
1438 }
1439
1440 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1441 {
1442   int count = m.getCount();
1443   for (int i=0; i<count; i++) add(m.getMessage(i));
1444 }
1445
1446 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1447 {
1448   int count = msgs.size();
1449   p|count;
1450   for (int i=0; i<count; i++) {
1451     CLBStatsMsg *msg;
1452     if (p.isUnpacking()) msg = new CLBStatsMsg;
1453     else { 
1454       msg = msgs[i]; CmiAssert(msg!=NULL);
1455     }
1456     msg->pup(p);
1457     if (p.isUnpacking()) add(msg);
1458   }
1459 }
1460
1461 SpanningTree::SpanningTree()
1462 {
1463         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1464         arity = (int)ceil(sq/2);
1465         calcParent(CkMyPe());
1466         calcNumChildren(CkMyPe());
1467 }
1468
1469 void SpanningTree::calcParent(int n)
1470 {
1471         parent=-1;
1472         if(n != 0  && arity > 0)
1473                 parent = (n-1)/arity;
1474 }
1475
1476 void SpanningTree::calcNumChildren(int n)
1477 {
1478         numChildren = 0;
1479         if (arity == 0) return;
1480         int fullNode=(CkNumPes()-1-arity)/arity;
1481         if(n <= fullNode)
1482                 numChildren = arity;
1483         if(n == fullNode+1)
1484                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1485         if(n > fullNode+1)
1486                 numChildren = 0;
1487 }
1488
1489 #include "CentralLB.def.h"
1490  
1491 /*@}*/