Making use of both message-logging techniques homogeneous in Charm++ core.
[charm.git] / src / ck-ldb / CentralLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include <charm++.h>
7 #include "ck.h"
8 #include "envelope.h"
9 #include "CentralLB.h"
10 #include "LBDBManager.h"
11 #include "LBSimulation.h"
12
13 #define  DEBUGF(x)       // CmiPrintf x;
14 #define  DEBUG(x)        // x;
15
16 #if CMK_MEM_CHECKPOINT
17    /* can not handle reduction in inmem FT */
18 #define USE_REDUCTION         0
19 #define USE_LDB_SPANNING_TREE 0
20 #else
21 #define USE_REDUCTION         1
22 #define USE_LDB_SPANNING_TREE 1
23 #endif
24
25 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
26 extern int _restartFlag;
27 extern void getGlobalStep(CkGroupID );
28 extern void initMlogLBStep(CkGroupID );
29 extern int globalResumeCount;
30 extern void sendDummyMigrationCounts(int *);
31 #endif
32
33 #if CMK_GRID_QUEUE_AVAILABLE
34 CpvExtern(void *, CkGridObject);
35 #endif
36
37 CkGroupID loadbalancer;
38 int * lb_ptr;
39 int load_balancer_created;
40
41 CreateLBFunc_Def(CentralLB, "CentralLB base class")
42
43 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
44                              LBMigrateMsg *, LBInfo &info, int considerComm);
45
46 /*
47 void CreateCentralLB()
48 {
49   CProxy_CentralLB::ckNew(0);
50 }
51 */
52
53 void CentralLB::staticStartLB(void* data)
54 {
55   CentralLB *me = (CentralLB*)(data);
56   me->StartLB();
57 }
58
59 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
60 {
61   CentralLB *me = (CentralLB*)(data);
62   me->Migrated(h, waitBarrier);
63 }
64
65 void CentralLB::staticAtSync(void* data)
66 {
67   CentralLB *me = (CentralLB*)(data);
68   me->AtSync();
69 }
70
71 void CentralLB::initLB(const CkLBOptions &opt)
72 {
73 #if CMK_LBDB_ON
74   lbname = "CentralLB";
75   thisProxy = CProxy_CentralLB(thisgroup);
76   //  CkPrintf("Construct in %d\n",CkMyPe());
77
78   // create and turn on by default
79   receiver = theLbdb->
80     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
81   notifier = theLbdb->getLBDB()->
82     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
83   startLbFnHdl = theLbdb->getLBDB()->
84     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
85
86   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
87   if (opt.getSeqNo() > 0) turnOff();
88
89   stats_msg_count = 0;
90   statsMsgsList = NULL;
91   statsData = NULL;
92
93   storedMigrateMsg = NULL;
94   reduction_started = 0;
95
96   // for future predictor
97   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
98   else predicted_model=0;
99   // register user interface callbacks
100   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
101
102   myspeed = theLbdb->ProcessorSpeed();
103
104   migrates_completed = 0;
105   future_migrates_completed = 0;
106   migrates_expected = -1;
107   future_migrates_expected = -1;
108   cur_ld_balancer = _lb_args.central_pe();      // 0 default
109   lbdone = 0;
110   count_msgs=0;
111   statsMsg = NULL;
112
113   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
114
115   load_balancer_created = 1;
116 #endif
117 }
118
119 CentralLB::~CentralLB()
120 {
121 #if CMK_LBDB_ON
122   delete [] statsMsgsList;
123   delete statsData;
124   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
125   if (theLbdb) {
126     theLbdb->getLBDB()->
127       RemoveNotifyMigrated(notifier);
128     theLbdb->
129       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
130   }
131 #endif
132 }
133
134 void CentralLB::turnOn() 
135 {
136 #if CMK_LBDB_ON
137   theLbdb->getLBDB()->
138     TurnOnBarrierReceiver(receiver);
139   theLbdb->getLBDB()->
140     TurnOnNotifyMigrated(notifier);
141   theLbdb->getLBDB()->
142     TurnOnStartLBFn(startLbFnHdl);
143 #endif
144 }
145
146 void CentralLB::turnOff() 
147 {
148 #if CMK_LBDB_ON
149   theLbdb->getLBDB()->
150     TurnOffBarrierReceiver(receiver);
151   theLbdb->getLBDB()->
152     TurnOffNotifyMigrated(notifier);
153   theLbdb->getLBDB()->
154     TurnOffStartLBFn(startLbFnHdl);
155 #endif
156 }
157
158 void CentralLB::AtSync()
159 {
160 #if CMK_LBDB_ON
161   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
162
163 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
164         CpvAccess(_currentObj)=this;
165 #endif
166
167   // if num of processor is only 1, nothing should happen
168   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
169     MigrationDone(0);
170     return;
171   }
172   if(CmiNodeAlive(CkMyPe())){
173     thisProxy [CkMyPe()].ProcessAtSync();
174   }
175 #endif
176 }
177
178 #include "ComlibStrategy.h"
179
180 void CentralLB::ProcessAtSync()
181 {
182 #if CMK_LBDB_ON
183   if (reduction_started) return;              // reducton in progress
184
185   CmiAssert(CmiNodeAlive(CkMyPe()));
186   if (CkMyPe() == cur_ld_balancer) {
187     start_lb_time = CkWallTimer();
188   }
189
190
191 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
192         initMlogLBStep(thisgroup);
193 #endif
194
195   // build message
196   BuildStatsMsg();
197
198 #if USE_REDUCTION
199     // reduction to get total number of objects and comm
200     // so that processor 0 can pre-allocate load balancing database
201   int counts[2];
202   counts[0] = theLbdb->GetObjDataSz();
203   counts[1] = theLbdb->GetCommDataSz();
204
205   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
206                   thisProxy[0]);
207   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
208   reduction_started = 1;
209 #else
210   SendStats();
211 #endif
212 #endif
213 }
214
215 // called only on 0
216 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
217 {
218   CmiAssert(CkMyPe() == 0);
219   if (statsData == NULL) statsData = new LDStats;
220
221   int *counts = (int *)msg->getData();
222   int n_objs = counts[0];
223   int n_comm = counts[1];
224
225     // resize database
226   statsData->objData.resize(n_objs);
227   statsData->from_proc.resize(n_objs);
228   statsData->to_proc.resize(n_objs);
229   statsData->commData.resize(n_comm);
230
231   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
232         
233     // broadcast call to let everybody start to send stats
234   thisProxy.SendStats();
235 }
236
237 void CentralLB::BuildStatsMsg()
238 {
239 #if CMK_LBDB_ON
240   // build and send stats
241   const int osz = theLbdb->GetObjDataSz();
242   const int csz = theLbdb->GetCommDataSz();
243
244   int npes = CkNumPes();
245   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
246   _MEMCHECK(msg);
247   msg->from_pe = CkMyPe();
248 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
249         msg->step = step();
250 #endif
251   //msg->serial = CrnRand();
252
253 /*
254   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
255   theLbdb->IdleTime(&msg->idletime);
256   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
257 */
258 #if CMK_LB_CPUTIMER
259   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
260                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
261 #else
262   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
263                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
264 #endif
265
266   msg->pe_speed = myspeed;
267   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
268
269   msg->n_objs = osz;
270   theLbdb->GetObjData(msg->objData);
271   msg->n_comm = csz;
272   theLbdb->GetCommData(msg->commData);
273 //  theLbdb->ClearLoads();
274   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
275
276   if(CkMyPe() == cur_ld_balancer) {
277     msg->avail_vector = new char[CkNumPes()];
278     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
279     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
280   }
281
282   CmiAssert(statsMsg == NULL);
283   statsMsg = msg;
284 #endif
285 }
286
287 // called on every processor
288 void CentralLB::SendStats()
289 {
290 #if CMK_LBDB_ON
291   CmiAssert(statsMsg != NULL);
292   reduction_started = 0;
293
294 #if USE_LDB_SPANNING_TREE
295   if(CkNumPes()>1024)
296   {
297     if (CkMyPe() == cur_ld_balancer)
298       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
299     else
300       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
301   }
302   else
303 #endif
304   {
305     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
306     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
307   }
308
309   statsMsg = NULL;
310
311 #ifdef __BIGSIM__
312   BgEndStreaming();
313 #endif
314
315   {
316   // enfore the barrier to wait until centralLB says no
317   LDOMHandle h;
318   h.id.id.idx = 0;
319   theLbdb->getLBDB()->RegisteringObjects(h);
320   }
321 #endif
322 }
323
324 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
325 extern int donotCountMigration;
326 #endif
327
328 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
329 {
330 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
331     if(donotCountMigration){
332         return ;
333     }
334 #endif
335
336 #if CMK_LBDB_ON
337   if (waitBarrier) {
338             migrates_completed++;
339       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
340     if (migrates_completed == migrates_expected) {
341       MigrationDone(1);
342     }
343   }
344   else {
345     future_migrates_completed ++;
346     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
347     if (future_migrates_completed == future_migrates_expected)  {
348         CheckMigrationComplete();
349     }
350   }
351 #endif
352 }
353
354 void CentralLB::MissMigrate(int waitForBarrier)
355 {
356   LDObjHandle h;
357   Migrated(h, waitForBarrier);
358 }
359
360 // build a complete data from bufferred messages
361 // not used when USE_REDUCTION = 1
362 void CentralLB::buildStats()
363 {
364     statsData->nprocs() = stats_msg_count;
365     // allocate space
366     statsData->objData.resize(statsData->n_objs);
367     statsData->from_proc.resize(statsData->n_objs);
368     statsData->to_proc.resize(statsData->n_objs);
369     statsData->commData.resize(statsData->n_comm);
370
371     int nobj = 0;
372     int ncom = 0;
373     int nmigobj = 0;
374     // copy all data in individule message to this big structure
375     for (int pe=0; pe<CkNumPes(); pe++) {
376        int i;
377        CLBStatsMsg *msg = statsMsgsList[pe];
378        if(msg == NULL) continue;
379        for (i=0; i<msg->n_objs; i++) {
380          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
381          statsData->objData[nobj] = msg->objData[i];
382          if (msg->objData[i].migratable) nmigobj++;
383          nobj++;
384        }
385        for (i=0; i<msg->n_comm; i++) {
386          statsData->commData[ncom] = msg->commData[i];
387          ncom++;
388        }
389        // free the memory
390        delete msg;
391        statsMsgsList[pe]=0;
392     }
393     statsData->n_migrateobjs = nmigobj;
394 }
395
396 // deposit one processor data at a time, note database is pre-allocated
397 // to have enough space
398 // used when USE_REDUCTION = 1
399 void CentralLB::depositData(CLBStatsMsg *m)
400 {
401   int i;
402   if (m == NULL) return;
403
404   const int pe = m->from_pe;
405   struct ProcStats &procStat = statsData->procs[pe];
406   procStat.pe = pe;
407   procStat.total_walltime = m->total_walltime;
408   procStat.idletime = m->idletime;
409   procStat.bg_walltime = m->bg_walltime;
410 #if CMK_LB_CPUTIMER
411   procStat.total_cputime = m->total_cputime;
412   procStat.bg_cputime = m->bg_cputime;
413 #endif
414   procStat.pe_speed = m->pe_speed;
415   //procStat.utilization = 1.0;
416   procStat.available = CmiTrue;
417   procStat.n_objs = m->n_objs;
418
419   int &nobj = statsData->n_objs;
420   int &nmigobj = statsData->n_migrateobjs;
421   for (i=0; i<m->n_objs; i++) {
422       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
423       statsData->objData[nobj] = m->objData[i];
424       if (m->objData[i].migratable) nmigobj++;
425       nobj++;
426       CmiAssert(nobj <= statsData->objData.capacity());
427   }
428   int &n_comm = statsData->n_comm;
429   for (i=0; i<m->n_comm; i++) {
430       statsData->commData[n_comm] = m->commData[i];
431       n_comm++;
432       CmiAssert(n_comm <= statsData->commData.capacity());
433   }
434   delete m;
435 }
436
437 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
438 {
439 #if CMK_LBDB_ON
440   if (statsMsgsList == NULL) {
441     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
442     CmiAssert(statsMsgsList != NULL);
443     for(int i=0; i < CkNumPes(); i++)
444       statsMsgsList[i] = 0;
445   }
446   if (statsData == NULL) statsData = new LDStats;
447
448     //  loop through all CLBStatsMsg in the incoming msg
449   int count = msg.getCount();
450   for (int num = 0; num < count; num++) 
451   {
452     CLBStatsMsg *m = msg.getMessage(num);
453     CmiAssert(m!=NULL);
454     const int pe = m->from_pe;
455     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
456 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
457 /*      
458  *  if(m->step < step()){
459  *    //TODO: if a processor is redoing an old load balance step..
460  *    //tell it that the step is done and that it should not perform any migrations
461  *      thisProxy[pe].ReceiveDummyMigration();
462  *  }*/
463 #endif
464         
465     if(!CmiNodeAlive(pe)){
466         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
467         continue;
468     }
469         
470     if (m->avail_vector!=NULL) {
471       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
472     }
473
474     if (statsMsgsList[pe] != 0) {
475       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
476              pe);
477     } else {
478       statsMsgsList[pe] = m;
479 #if USE_REDUCTION
480       depositData(m);
481 #else
482       // store per processor data right away
483       struct ProcStats &procStat = statsData->procs[pe];
484       procStat.pe = pe;
485       procStat.total_walltime = m->total_walltime;
486       procStat.idletime = m->idletime;
487       procStat.bg_walltime = m->bg_walltime;
488 #if CMK_LB_CPUTIMER
489       procStat.total_cputime = m->total_cputime;
490       procStat.bg_cputime = m->bg_cputime;
491 #endif
492       procStat.pe_speed = m->pe_speed;
493       //procStat.utilization = 1.0;
494       procStat.available = CmiTrue;
495       procStat.n_objs = m->n_objs;
496
497       statsData->n_objs += m->n_objs;
498       statsData->n_comm += m->n_comm;
499 #endif
500       stats_msg_count++;
501     }
502   }    // end of for
503
504   const int clients = CkNumValidPes();
505   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
506  
507   if (stats_msg_count == clients) {
508         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
509     statsData->nprocs() = stats_msg_count;
510     thisProxy[CkMyPe()].LoadBalance();
511   }
512 #endif
513 }
514
515 /** added by Abhinav for receiving msgs via spanning tree */
516 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
517 {
518 #if CMK_LBDB_ON
519         CmiAssert(CkMyPe() != 0);
520         bufMsg.add(msg);         // buffer messages
521         count_msgs++;
522         //CkPrintf("here %d\n", CkMyPe());
523         if (count_msgs == st.numChildren+1) {
524                 if(st.parent == 0)
525                 {
526                         thisProxy[0].ReceiveStats(bufMsg);
527                         //CkPrintf("from %d\n", CkMyPe());
528                 }
529                 else
530                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
531                 count_msgs = 0;
532                 bufMsg.free();
533         } 
534 #endif
535 }
536
537 void CentralLB::LoadBalance()
538 {
539 #if CMK_LBDB_ON
540   int proc;
541   const int clients = CkNumPes();
542
543 #if ! USE_REDUCTION
544   // build data
545   buildStats();
546 #else
547   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
548 #endif
549
550   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
551
552   if (_lb_args.debug()) 
553       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
554                   lbname, cur_ld_balancer, step(), start_lb_time,
555                   CmiMemoryUsage()/(1024.0*1024.0));
556
557   // if we are in simulation mode read data
558   if (LBSimulation::doSimulation) simulationRead();
559
560   char *availVector = LBDatabaseObj()->availVector();
561   for(proc = 0; proc < clients; proc++)
562       statsData->procs[proc].available = (CmiBool)availVector[proc];
563
564   preprocess(statsData);
565
566 //    CkPrintf("Before Calling Strategy\n");
567
568   if (_lb_args.printSummary()) {
569       LBInfo info(clients);
570         // not take comm data
571       info.getInfo(statsData, clients, 0);
572       LBRealType mLoad, mCpuLoad, totalLoad;
573       info.getSummary(mLoad, mCpuLoad, totalLoad);
574       int nmsgs, nbytes;
575       statsData->computeNonlocalComm(nmsgs, nbytes);
576       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
577 //      if (_lb_args.debug() > 1) {
578 //        for (int i=0; i<statsData->n_objs; i++)
579 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
580 //      }
581   }
582
583 #if CMK_REPLAYSYSTEM
584   LDHandle *loadBalancer_pointers;
585   if (_replaySystem) {
586     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
587     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
588   }
589 #endif
590   
591   LBMigrateMsg* migrateMsg = Strategy(statsData);
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593         migrateMsg->step = step();
594 #endif
595
596 #if CMK_REPLAYSYSTEM
597   CpdHandleLBMessage(&migrateMsg);
598   if (_replaySystem) {
599     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
600     free(loadBalancer_pointers);
601   }
602 #endif
603   
604   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
605   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
606
607   // if this is the step at which we need to dump the database
608   simulationWrite();
609
610 //  calculate predicted load
611 //  very time consuming though, so only happen when debugging is on
612   if (_lb_args.printSummary()) {
613       LBInfo info(clients);
614         // not take comm data
615       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
616       LBRealType mLoad, mCpuLoad, totalLoad;
617       info.getSummary(mLoad, mCpuLoad, totalLoad);
618       int nmsgs, nbytes;
619       statsData->computeNonlocalComm(nmsgs, nbytes);
620       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
621       for (int i=0; i<clients; i++)
622         migrateMsg->expectedLoad[i] = info.peLoads[i];
623   }
624
625   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
626 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
627     lbDecisionCount++;
628     migrateMsg->lbDecisionCount = lbDecisionCount;
629 #endif
630
631   envelope *env = UsrToEnv(migrateMsg);
632   if (1) {
633       // broadcast
634     thisProxy.ReceiveMigration(migrateMsg);
635   }
636   else {
637     // split the migration for each processor
638     for (int p=0; p<CkNumPes(); p++) {
639       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
640       thisProxy[p].ReceiveMigration(m);
641     }
642     delete migrateMsg;
643   }
644
645   // Zero out data structures for next cycle
646   // CkPrintf("zeroing out data\n");
647   statsData->clear();
648   stats_msg_count=0;
649 #endif
650 }
651
652 // test if sender and receiver in a commData is nonmigratable.
653 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
654 {
655 #if CMK_LBDB_ON
656   for (int pe=0 ; pe<count; pe++)
657   {
658     for (int i=0; i<len[pe]; i++)
659       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
660           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
661       return 0;
662   }
663 #endif
664   return 1;
665 }
666
667 // rebuild LDStats and remove all non-migratble objects and related things
668 void CentralLB::removeNonMigratable(LDStats* stats, int count)
669 {
670   int i;
671
672   // check if we have non-migratable objects
673   int have = 0;
674   for (i=0; i<stats->n_objs; i++) 
675   {
676     LDObjData &odata = stats->objData[i];
677     if (!odata.migratable) {
678       have = 1; break;
679     }
680   }
681   if (have == 0) return;
682
683   CkVec<LDObjData> nonmig;
684   CkVec<int> new_from_proc, new_to_proc;
685   nonmig.resize(stats->n_migrateobjs);
686   new_from_proc.resize(stats->n_migrateobjs);
687   new_to_proc.resize(stats->n_migrateobjs);
688   int n_objs = 0;
689   for (i=0; i<stats->n_objs; i++) 
690   {
691     LDObjData &odata = stats->objData[i];
692     if (odata.migratable) {
693       nonmig[n_objs] = odata;
694       new_from_proc[n_objs] = stats->from_proc[i];
695       new_to_proc[n_objs] = stats->to_proc[i];
696       n_objs ++;
697     }
698     else {
699       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
700 #if CMK_LB_CPUTIMER
701       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
702 #endif
703     }
704   }
705   CmiAssert(stats->n_migrateobjs == n_objs);
706
707   stats->makeCommHash();
708   
709   CkVec<LDCommData> newCommData;
710   newCommData.resize(stats->n_comm);
711   int n_comm = 0;
712   for (i=0; i<stats->n_comm; i++) 
713   {
714     LDCommData& cdata = stats->commData[i];
715     if (!cdata.from_proc()) 
716     {
717       int idx = stats->getSendHash(cdata);
718       CmiAssert(idx != -1);
719       if (!stats->objData[idx].migratable) continue;
720     }
721     switch (cdata.receiver.get_type()) {
722     case LD_PROC_MSG:
723       break;
724     case LD_OBJ_MSG:  {
725       int idx = stats->getRecvHash(cdata);
726       if (stats->complete_flag)
727         CmiAssert(idx != -1);
728       else if (idx == -1) continue;          // receiver not in this group
729       if (!stats->objData[idx].migratable) continue;
730       break;
731       }
732     case LD_OBJLIST_MSG:    // object message FIXME add multicast
733       break;
734     }
735     newCommData[n_comm] = cdata;
736     n_comm ++;
737   }
738
739   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
740
741   // swap to new data
742   stats->objData = nonmig;
743   stats->from_proc = new_from_proc;
744   stats->to_proc = new_to_proc;
745   stats->n_objs = n_objs;
746
747   stats->commData = newCommData;
748   stats->n_comm = n_comm;
749
750   stats->deleteCommHash();
751   stats->makeCommHash();
752
753 }
754
755
756 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
757 extern int restarted;
758 #endif
759
760 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
761 {
762   storedMigrateMsg = m;
763 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
764         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
765 #else
766   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
767                   thisProxy);
768   contribute(0, NULL, CkReduction::max_int, cb);
769 #endif
770 }
771
772 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
773 {
774 #if CMK_LBDB_ON
775         int i;
776         LBMigrateMsg *m = storedMigrateMsg;
777         CmiAssert(m!=NULL);
778         delete msg;
779
780 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
781         int *dummyCounts;
782
783         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
784         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
785         if(step() > m->step){
786                 char str[100];
787                 envelope *env = UsrToEnv(m);
788                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
789                 return;
790         }
791         lbDecisionCount = m->lbDecisionCount;
792 #endif
793
794   if (_lb_args.debug() > 1) 
795     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
796
797   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
798   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
799 /*FAULT_EVAC*/
800   if(!CmiNodeAlive(CkMyPe())){
801         delete m;
802         return;
803   }
804   migrates_expected = 0;
805   future_migrates_expected = 0;
806 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
807         int sending=0;
808     int dummy=0;
809         LBDB *_myLBDB = theLbdb->getLBDB();
810         if(_restartFlag){
811         dummyCounts = new int[CmiNumPes()];
812         bzero(dummyCounts,sizeof(int)*CmiNumPes());
813     }
814 #endif
815   for(i=0; i < m->n_moves; i++) {
816     MigrateInfo& move = m->moves[i];
817     const int me = CkMyPe();
818     if (move.from_pe == me && move.to_pe != me) {
819       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
820       // migrate object, in case it is already gone, inform toPe
821 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
822       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
823          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
824 #else
825             if(_restartFlag == 0){
826                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
827                 theLbdb->Migrate(move.obj,move.to_pe);
828                 sending++;
829             }else{
830                 if(_myLBDB->validObjHandle(move.obj)){
831                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
832                     theLbdb->Migrate(move.obj,move.to_pe);
833                     sending++;
834                 }else{
835                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
836                     dummyCounts[move.to_pe]++;
837                     dummy++;
838                 }
839             }
840 #endif
841     } else if (move.from_pe != me && move.to_pe == me) {
842        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
843       if (!move.async_arrival) migrates_expected++;
844       else future_migrates_expected++;
845     }
846   }
847   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
848   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
849
850 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
851         if(_restartFlag){
852                 sendDummyMigrationCounts(dummyCounts);
853                 _restartFlag  =0;
854         delete []dummyCounts;
855         }
856 #endif
857
858
859 #if 0
860   if (m->n_moves ==0) {
861     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
862   }
863 #endif
864   cur_ld_balancer = m->next_lb;
865   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
866       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
867   }
868
869   if (migrates_expected == 0 || migrates_completed == migrates_expected)
870     MigrationDone(1);
871   delete m;
872
873 //      CkEvacuatedElement();
874 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
875 //  migrates_expected = 0;
876 //  //  ResumeClients(1);
877 #endif
878 #endif
879 }
880
881 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
882 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
883     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
884     //TODO: this is gonna be important when a crash happens during checkpoint
885     //the globalDecisionCount would have to be saved and compared against
886     //a future recvMigration
887                 
888         thisProxy[CkMyPe()].ResumeClients(1);
889 }
890 #endif
891
892 void CentralLB::MigrationDone(int balancing)
893 {
894 #if CMK_LBDB_ON
895   migrates_completed = 0;
896   migrates_expected = -1;
897   // clear load stats
898   if (balancing) theLbdb->ClearLoads();
899   // Increment to next step
900   theLbdb->incStep();
901         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
902   // if sync resume, invoke a barrier
903
904 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
905     savedBalancing = balancing;
906     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
907 #endif
908
909   LBDatabase::Object()->MigrationDone();    // call registered callbacks
910
911   LoadbalanceDone(balancing);        // callback
912 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
913   // if sync resume invoke a barrier
914   if (balancing && _lb_args.syncResume()) {
915     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
916                   thisProxy);
917     contribute(0, NULL, CkReduction::sum_int, cb);
918   }
919   else{ 
920     if(CmiNodeAlive(CkMyPe())){
921         thisProxy [CkMyPe()].ResumeClients(balancing);
922     }   
923   }     
924 #if CMK_GRID_QUEUE_AVAILABLE
925   CmiGridQueueDeregisterAll ();
926   CpvAccess(CkGridObject) = NULL;
927 #endif
928 #endif 
929 #endif
930 }
931
932 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
933 void CentralLB::endMigrationDone(int balancing){
934     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
935
936
937   if (balancing && _lb_args.syncResume()) {
938     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
939                   thisProxy);
940     contribute(0, NULL, CkReduction::sum_int, cb);
941   }
942   else{
943     if(CmiNodeAlive(CkMyPe())){
944     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
945     thisProxy [CkMyPe()].ResumeClients(balancing);
946     }
947   }
948
949 }
950 #endif
951
952 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
953 void resumeCentralLbAfterChkpt(void *_lb){
954     CentralLB *lb= (CentralLB *)_lb;
955     CpvAccess(_currentObj)=lb;
956     lb->endMigrationDone(lb->savedBalancing);
957 }
958 void resumeAfterRestoreParallelRecovery(void *_lb){
959     CentralLB *lb= (CentralLB *)_lb;
960         lb->ProcessReceiveMigration((CkReductionMsg*)NULL);
961 }
962 #endif
963
964
965 void CentralLB::ResumeClients(CkReductionMsg *msg)
966 {
967   ResumeClients(1);
968   delete msg;
969 }
970
971 void CentralLB::ResumeClients(int balancing)
972 {
973 #if CMK_LBDB_ON
974 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
975     resumeCount++;
976     globalResumeCount = resumeCount;
977 #endif
978   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
979   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
980     double end_lb_time = CkWallTimer();
981   }
982
983 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
984   if (balancing) ComlibNotifyMigrationDone();  
985 #endif
986
987   theLbdb->ResumeClients();
988   if (balancing)  {
989     CheckMigrationComplete();
990     if (future_migrates_expected == 0 || 
991             future_migrates_expected == future_migrates_completed) {
992       CheckMigrationComplete();
993     }
994   }
995 #endif
996 }
997
998 /*
999   migration of objects contains two different kinds:
1000   (1) objects want to make a barrier for migration completion
1001       (waitForBarrier is true)
1002       migrationDone() to finish and resumeClients
1003   (2) objects don't need a barrier
1004   However, next load balancing can only happen when both migrations complete
1005 */ 
1006 void CentralLB::CheckMigrationComplete()
1007 {
1008 #if CMK_LBDB_ON
1009   lbdone ++;
1010   if (lbdone == 2) {
1011     if (_lb_args.debug() && CkMyPe()==0) {
1012       double end_lb_time = CkWallTimer();
1013       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1014                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1015                 end_lb_time-start_lb_time);
1016     }
1017     lbdone = 0;
1018     future_migrates_expected = -1;
1019     future_migrates_completed = 0;
1020     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1021     // release local barrier  so that the next load balancer can go
1022     LDOMHandle h;
1023     h.id.id.idx = 0;
1024     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1025     // switch to the next load balancer in the list
1026     // subtle: called from Migrated() may result in Migrated() called in next LB
1027     theLbdb->nextLoadbalancer(seqno);
1028   }
1029 #endif
1030 }
1031
1032 void CentralLB::preprocess(LDStats* stats)
1033 {
1034   if (_lb_args.ignoreBgLoad())
1035     stats->clearBgLoad();
1036
1037   // Call the predictor for the future
1038   if (_lb_predict) FuturePredictor(statsData);
1039 }
1040
1041 // default load balancing strategy
1042 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1043 {
1044 #if CMK_LBDB_ON
1045   double strat_start_time = CkWallTimer();
1046   if (_lb_args.debug())
1047     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1048
1049   work(stats);
1050
1051   if (_lb_args.debug()>2)  {
1052     CkPrintf("CharmLB> Obj Map:\n");
1053     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1054     CkPrintf("\n");
1055   }
1056
1057   LBMigrateMsg *msg = createMigrateMsg(stats);
1058
1059   if (_lb_args.debug()) {
1060     double strat_end_time = CkWallTimer();
1061     envelope *env = UsrToEnv(msg);
1062
1063     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1064     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1065               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1066     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1067     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1068               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1069   }
1070
1071   return msg;
1072 #else
1073   return NULL;
1074 #endif
1075 }
1076
1077 void CentralLB::work(LDStats* stats)
1078 {
1079   // does nothing but print the database
1080   stats->print();
1081 }
1082
1083 // generate migrate message from stats->from_proc and to_proc
1084 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1085 {
1086   int i;
1087   CkVec<MigrateInfo*> migrateInfo;
1088   for (i=0; i<stats->n_objs; i++) {
1089     LDObjData &objData = stats->objData[i];
1090     int frompe = stats->from_proc[i];
1091     int tope = stats->to_proc[i];
1092     if (frompe != tope) {
1093       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1094       //         CkMyPe(),obj,pe,dest);
1095       MigrateInfo *migrateMe = new MigrateInfo;
1096       migrateMe->obj = objData.handle;
1097       migrateMe->from_pe = frompe;
1098       migrateMe->to_pe = tope;
1099       migrateMe->async_arrival = objData.asyncArrival;
1100       migrateInfo.insertAtEnd(migrateMe);
1101     }
1102   }
1103
1104   int migrate_count=migrateInfo.length();
1105   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1106   msg->n_moves = migrate_count;
1107   for(i=0; i < migrate_count; i++) {
1108     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1109     msg->moves[i] = *item;
1110     delete item;
1111     migrateInfo[i] = 0;
1112   }
1113   return msg;
1114 }
1115
1116 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1117 {
1118   int nmoves = 0;
1119   int nunavail = 0;
1120   int i;
1121   for (i=0; i<m->n_moves; i++) {
1122     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1123     if (item->from_pe == p || item->to_pe == p) nmoves++;
1124   }
1125   for (i=0; i<CkNumPes();i++) {
1126     if (!m->avail_vector[i]) nunavail++;
1127   }
1128   LBMigrateMsg* msg;
1129   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1130   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1131   msg->n_moves = nmoves;
1132   msg->level = m->level;
1133   msg->next_lb = m->next_lb;
1134   for (i=0,nmoves=0; i<m->n_moves; i++) {
1135     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1136     if (item->from_pe == p || item->to_pe == p) {
1137       msg->moves[nmoves] = *item;
1138       nmoves++;
1139     }
1140   }
1141   // copy processor data
1142   if (nunavail)
1143   for (i=0; i<CkNumPes();i++) {
1144     msg->avail_vector[i] = m->avail_vector[i];
1145     msg->expectedLoad[i] = m->expectedLoad[i];
1146   }
1147   return msg;
1148 }
1149
1150 void CentralLB::simulationWrite() {
1151   if(step() == LBSimulation::dumpStep)
1152   {
1153     // here we are supposed to dump the database
1154     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1155     char *dumpFileName = (char *)malloc(dumpFileSize);
1156     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1157       free(dumpFileName);
1158       dumpFileSize+=3;
1159       dumpFileName = (char *)malloc(dumpFileSize);
1160     }
1161     writeStatsMsgs(dumpFileName);
1162     free(dumpFileName);
1163     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1164     ++LBSimulation::dumpStep;
1165     --LBSimulation::dumpStepSize;
1166     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1167       CmiPrintf("Charm++> Exiting...\n");
1168       CkExit();
1169     }
1170     return;
1171   }
1172 }
1173
1174 void CentralLB::simulationRead() {
1175   LBSimulation *simResults = NULL, *realResults;
1176   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1177   voidMessage->n_moves=0;
1178   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1179     // here we are supposed to read the data from the dump database
1180     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1181     char *simFileName = (char *)malloc(simFileSize);
1182     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1183       free(simFileName);
1184       simFileSize+=3;
1185       simFileName = (char *)malloc(simFileSize);
1186     }
1187     readStatsMsgs(simFileName);
1188
1189     // allocate simResults (only the first step)
1190     if (simResults == NULL) {
1191       simResults = new LBSimulation(LBSimulation::simProcs);
1192       realResults = new LBSimulation(LBSimulation::simProcs);
1193     }
1194     else {
1195       // should be the same number of procs of the original simulation!
1196       if (!LBSimulation::procsChanged) {
1197         // it means we have a previous step, so in simResults there is data.
1198         // we can now print the real effects of the load balancer during the simulation
1199         // or print the difference between the predicted data and the real one.
1200         realResults->reset();
1201         // reset to_proc of statsData to be equal to from_proc
1202         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1203         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1204         simResults->PrintDifferences(realResults,statsData);
1205       }
1206       simResults->reset();
1207     }
1208
1209     // now pass it to the strategy routine
1210     double startT = CkWallTimer();
1211     preprocess(statsData);
1212     CmiPrintf("%s> Strategy starts ... \n", lbname);
1213     LBMigrateMsg* migrateMsg = Strategy(statsData);
1214     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1215                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1216
1217     // now calculate the results of the load balancing simulation
1218     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1219
1220     // now we have the simulation data, so print it and loop
1221     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1222     // **CWL** Officially recording my disdain here for using ints for bool
1223     if (LBSimulation::showDecisionsOnly) {
1224       simResults->PrintDecisions(migrateMsg, simFileName, 
1225                                  LBSimulation::simProcs);
1226     } else {
1227       simResults->PrintSimulationResults();
1228     }
1229
1230     free(simFileName);
1231     delete migrateMsg;
1232     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1233   }
1234   // deallocate simResults
1235   delete simResults;
1236   CmiPrintf("Charm++> Exiting...\n");
1237   CkExit();
1238 }
1239
1240 void CentralLB::readStatsMsgs(const char* filename) 
1241 {
1242 #if CMK_LBDB_ON
1243   int i;
1244   FILE *f = fopen(filename, "r");
1245   if (f==NULL) {
1246     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1247     CmiAbort("");
1248   }
1249
1250   // at this stage, we need to rebuild the statsMsgList and
1251   // statsDataList structures. For that first deallocate the
1252   // old structures
1253   if (statsMsgsList) {
1254     for(i = 0; i < stats_msg_count; i++)
1255       delete statsMsgsList[i];
1256     delete[] statsMsgsList;
1257     statsMsgsList=0;
1258   }
1259
1260   PUP::fromDisk pd(f);
1261   PUP::machineInfo machInfo;
1262
1263   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1264   PUP::xlater p(machInfo, pd);
1265
1266   if (_lb_args.lbversion() > 1) {
1267     p|_lb_args.lbversion();             // write version number
1268     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1269     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1270   }
1271   p|stats_msg_count;
1272
1273   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1274   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1275   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1276
1277   // LBSimulation::simProcs must be set
1278   statsData->pup(p);
1279
1280   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1281   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1282
1283   // file f is closed in the destructor of PUP::fromDisk
1284   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1285 #endif
1286 }
1287
1288 void CentralLB::writeStatsMsgs(const char* filename) 
1289 {
1290 #if CMK_LBDB_ON
1291   FILE *f = fopen(filename, "w");
1292   if (f==NULL) {
1293     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1294     CmiAbort("");
1295   }
1296
1297   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1298   PUP::toDisk p(f);
1299   p((char *)&machInfo, sizeof(machInfo));       // machine info
1300
1301   p|_lb_args.lbversion();               // write version number
1302   p|stats_msg_count;
1303   statsData->pup(p);
1304
1305   fclose(f);
1306
1307   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1308 #endif
1309 }
1310
1311 // calculate the predicted wallclock/cpu load for every processors
1312 // considering communication overhead if considerComm is true
1313 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1314                       LBMigrateMsg *msg, LBInfo &info, 
1315                       int considerComm)
1316 {
1317 #if CMK_LBDB_ON
1318         stats->makeCommHash();
1319
1320         // update to_proc according to migration msgs
1321         for(int i = 0; i < msg->n_moves; i++) {
1322           MigrateInfo &mInfo = msg->moves[i];
1323           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1324           CmiAssert(idx != -1);
1325           stats->to_proc[idx] = mInfo.to_pe;
1326         }
1327
1328         info.getInfo(stats, count, considerComm);
1329 #endif
1330 }
1331
1332
1333 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1334 {
1335     CkAssert(simResults != NULL && count == simResults->numPes);
1336     // estimate the new loads of the processors. As a first approximation, this is the
1337     // sum of the cpu times of the objects on that processor
1338     double startT = CkWallTimer();
1339     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1340     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1341 }
1342
1343 void CentralLB::pup(PUP::er &p) { 
1344   BaseLB::pup(p); 
1345   if (p.isUnpacking())  {
1346     initLB(CkLBOptions(seqno)); 
1347   }
1348   p|reduction_started;
1349   int has_statsMsg=0;
1350   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1351   p|has_statsMsg;
1352   if (has_statsMsg) {
1353     if (p.isUnpacking())
1354       statsMsg = new CLBStatsMsg;
1355     statsMsg->pup(p);
1356   }
1357 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1358   p | lbDecisionCount;
1359   p | resumeCount;
1360 #endif
1361         
1362 }
1363
1364 int CentralLB::useMem() { 
1365   return sizeof(CentralLB) + statsData->useMem() + 
1366          CkNumPes() * sizeof(CLBStatsMsg *);
1367 }
1368
1369
1370 /**
1371   CLBStatsMsg is not a real message now.
1372   CLBStatsMsg is used for all processors to fill in their local load and comm
1373   statistics and send to processor 0
1374 */
1375
1376 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1377   n_objs = osz;
1378   n_comm = csz;
1379   objData = new LDObjData[osz];
1380   commData = new LDCommData[csz];
1381   avail_vector = NULL;
1382 }
1383
1384 CLBStatsMsg::~CLBStatsMsg() {
1385   delete [] objData;
1386   delete [] commData;
1387   delete [] avail_vector;
1388 }
1389
1390 void CLBStatsMsg::pup(PUP::er &p) {
1391   int i;
1392   p|from_pe;
1393   p|pe_speed;
1394   p|total_walltime;
1395   p|idletime;
1396   p|bg_walltime;
1397 #if CMK_LB_CPUTIMER
1398   p|total_cputime;
1399   p|bg_cputime;
1400 #endif
1401 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1402   p | step;
1403 #endif
1404   p|n_objs;
1405   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1406   for (i=0; i<n_objs; i++) p|objData[i];
1407   p|n_comm;
1408   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1409   for (i=0; i<n_comm; i++) p|commData[i];
1410
1411   int has_avail_vector;
1412   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1413   p|has_avail_vector;
1414   if (p.isUnpacking()) {
1415     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1416     else avail_vector = NULL;
1417   }
1418   if (has_avail_vector) p(avail_vector, CkNumPes());
1419
1420   p(next_lb);
1421 }
1422
1423 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1424 // the entry function, it is just used to use to pup.
1425 // I don't use CLBStatsMsg directly as marshalled parameter because
1426 // I want the data pointer stored and not to be freed by the Charm++.
1427 void CkMarshalledCLBStatsMessage::free() { 
1428   int count = msgs.size();
1429   for  (int i=0; i<count; i++) {
1430     delete msgs[i];
1431     msgs[i] = NULL;
1432   }
1433   msgs.free();
1434 }
1435
1436 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1437 {
1438   int count = m.getCount();
1439   for (int i=0; i<count; i++) add(m.getMessage(i));
1440 }
1441
1442 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1443 {
1444   int count = msgs.size();
1445   p|count;
1446   for (int i=0; i<count; i++) {
1447     CLBStatsMsg *msg;
1448     if (p.isUnpacking()) msg = new CLBStatsMsg;
1449     else { 
1450       msg = msgs[i]; CmiAssert(msg!=NULL);
1451     }
1452     msg->pup(p);
1453     if (p.isUnpacking()) add(msg);
1454   }
1455 }
1456
1457 SpanningTree::SpanningTree()
1458 {
1459         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1460         arity = (int)ceil(sq/2);
1461         calcParent(CkMyPe());
1462         calcNumChildren(CkMyPe());
1463 }
1464
1465 void SpanningTree::calcParent(int n)
1466 {
1467         parent=-1;
1468         if(n != 0  && arity > 0)
1469                 parent = (n-1)/arity;
1470 }
1471
1472 void SpanningTree::calcNumChildren(int n)
1473 {
1474         numChildren = 0;
1475         if (arity == 0) return;
1476         int fullNode=(CkNumPes()-1-arity)/arity;
1477         if(n <= fullNode)
1478                 numChildren = arity;
1479         if(n == fullNode+1)
1480                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1481         if(n > fullNode+1)
1482                 numChildren = 0;
1483 }
1484
1485 #include "CentralLB.def.h"
1486  
1487 /*@}*/