aad97b07d08a4645d3638f84090d3b5602bf1624
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)       // CmiPrintf x;
20 #define  DEBUG(x)        // x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #elif defined(_FAULT_MLOG_)
27 /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #else
31 #define USE_REDUCTION         1
32 #define USE_LDB_SPANNING_TREE 1
33 #endif
34
35 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
36 extern int _restartFlag;
37 extern void getGlobalStep(CkGroupID );
38 extern void initMlogLBStep(CkGroupID );
39 extern int globalResumeCount;
40 extern void sendDummyMigrationCounts(int *);
41 #endif
42
43 #if CMK_GRID_QUEUE_AVAILABLE
44 CpvExtern(void *, CkGridObject);
45 #endif
46
47 CkGroupID loadbalancer;
48 int * lb_ptr;
49 int load_balancer_created;
50
51 CreateLBFunc_Def(CentralLB, "CentralLB base class")
52
53 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
54                              LBMigrateMsg *, LBInfo &info, int considerComm);
55
56 /*
57 void CreateCentralLB()
58 {
59   CProxy_CentralLB::ckNew(0);
60 }
61 */
62
63 void CentralLB::staticStartLB(void* data)
64 {
65   CentralLB *me = (CentralLB*)(data);
66   me->StartLB();
67 }
68
69 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
70 {
71   CentralLB *me = (CentralLB*)(data);
72   me->Migrated(h, waitBarrier);
73 }
74
75 void CentralLB::staticAtSync(void* data)
76 {
77   CentralLB *me = (CentralLB*)(data);
78   me->AtSync();
79 }
80
81 void CentralLB::initLB(const CkLBOptions &opt)
82 {
83 #if CMK_LBDB_ON
84   lbname = "CentralLB";
85   thisProxy = CProxy_CentralLB(thisgroup);
86   //  CkPrintf("Construct in %d\n",CkMyPe());
87
88   // create and turn on by default
89   receiver = theLbdb->
90     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
91   notifier = theLbdb->getLBDB()->
92     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
93   startLbFnHdl = theLbdb->getLBDB()->
94     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
95
96   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
97   if (opt.getSeqNo() > 0) turnOff();
98
99   stats_msg_count = 0;
100   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
101   for(int i=0; i < CkNumPes(); i++)
102     statsMsgsList[i] = 0;
103
104   statsData = new LDStats;
105
106   // for future predictor
107   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
108   else predicted_model=0;
109   // register user interface callbacks
110   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
111
112   myspeed = theLbdb->ProcessorSpeed();
113
114   migrates_completed = 0;
115   future_migrates_completed = 0;
116   migrates_expected = -1;
117   future_migrates_expected = -1;
118   cur_ld_balancer = _lb_args.central_pe();      // 0 default
119   lbdone = 0;
120   count_msgs=0;
121   statsMsg = NULL;
122
123   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
124
125   load_balancer_created = 1;
126 #endif
127 }
128
129 CentralLB::~CentralLB()
130 {
131 #if CMK_LBDB_ON
132   delete [] statsMsgsList;
133   delete statsData;
134   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
135   if (theLbdb) {
136     theLbdb->getLBDB()->
137       RemoveNotifyMigrated(notifier);
138     theLbdb->
139       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
140   }
141 #endif
142 }
143
144 void CentralLB::turnOn() 
145 {
146 #if CMK_LBDB_ON
147   theLbdb->getLBDB()->
148     TurnOnBarrierReceiver(receiver);
149   theLbdb->getLBDB()->
150     TurnOnNotifyMigrated(notifier);
151   theLbdb->getLBDB()->
152     TurnOnStartLBFn(startLbFnHdl);
153 #endif
154 }
155
156 void CentralLB::turnOff() 
157 {
158 #if CMK_LBDB_ON
159   theLbdb->getLBDB()->
160     TurnOffBarrierReceiver(receiver);
161   theLbdb->getLBDB()->
162     TurnOffNotifyMigrated(notifier);
163   theLbdb->getLBDB()->
164     TurnOffStartLBFn(startLbFnHdl);
165 #endif
166 }
167
168 void CentralLB::AtSync()
169 {
170 #if CMK_LBDB_ON
171   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
172
173 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
174         CpvAccess(_currentObj)=this;
175 #endif
176
177   // if num of processor is only 1, nothing should happen
178   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
179     MigrationDone(0);
180     return;
181   }
182   if(CmiNodeAlive(CkMyPe())){
183     thisProxy [CkMyPe()].ProcessAtSync();
184   }
185 #endif
186 }
187
188 #include "ComlibStrategy.h"
189
190 void CentralLB::ProcessAtSync()
191 {
192
193 #if CMK_LBDB_ON
194   CmiAssert(CmiNodeAlive(CkMyPe()));
195   if (CkMyPe() == cur_ld_balancer) {
196     start_lb_time = CkWallTimer();
197   }
198
199
200 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
201         initMlogLBStep(thisgroup);
202 #endif
203
204   // build message
205   BuildStatsMsg();
206
207 #if USE_REDUCTION
208     // reduction to get total number of objects and comm
209     // so that processor 0 can pre-allocate load balancing database
210   int counts[2];
211   counts[0] = theLbdb->GetObjDataSz();
212   counts[1] = theLbdb->GetCommDataSz();
213
214   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
215                   thisProxy[0]);
216   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
217 #else
218   SendStats();
219 #endif
220 #endif
221 }
222
223 // called only on 0
224 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
225 {
226   CmiAssert(CkMyPe() == 0);
227
228   int *counts = (int *)msg->getData();
229   int n_objs = counts[0];
230   int n_comm = counts[1];
231
232     // resize database
233   statsData->objData.resize(n_objs);
234   statsData->from_proc.resize(n_objs);
235   statsData->to_proc.resize(n_objs);
236   statsData->commData.resize(n_comm);
237
238   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
239         
240     // broadcast call to let everybody start to send stats
241   thisProxy.SendStats();
242 }
243
244 void CentralLB::BuildStatsMsg()
245 {
246 #if CMK_LBDB_ON
247   // build and send stats
248   const int osz = theLbdb->GetObjDataSz();
249   const int csz = theLbdb->GetCommDataSz();
250
251   int npes = CkNumPes();
252   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
253   _MEMCHECK(msg);
254   msg->from_pe = CkMyPe();
255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
256         msg->step = step();
257 #endif
258   //msg->serial = CrnRand();
259
260 /*
261   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
262   theLbdb->IdleTime(&msg->idletime);
263   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
264 */
265   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
266                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
267   msg->pe_speed = myspeed;
268   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
269
270   msg->n_objs = osz;
271   theLbdb->GetObjData(msg->objData);
272   msg->n_comm = csz;
273   theLbdb->GetCommData(msg->commData);
274 //  theLbdb->ClearLoads();
275   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
276
277   if(CkMyPe() == cur_ld_balancer) {
278     msg->avail_vector = new char[CkNumPes()];
279     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
280     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
281   }
282
283   CmiAssert(statsMsg == NULL);
284   statsMsg = msg;
285 #endif
286 }
287
288 // called on every processor
289 void CentralLB::SendStats()
290 {
291 #if CMK_LBDB_ON
292   CmiAssert(statsMsg != NULL);
293
294 #if USE_LDB_SPANNING_TREE
295   if(CkNumPes()>1024)
296   {
297     if (CkMyPe() == cur_ld_balancer)
298       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
299     else
300       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
301   }
302   else
303 #endif
304   {
305     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
306     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
307   }
308
309   statsMsg = NULL;
310
311 #ifdef __BLUEGENE__
312   BgEndStreaming();
313 #endif
314
315   {
316   // enfore the barrier to wait until centralLB says no
317   LDOMHandle h;
318   h.id.id.idx = 0;
319   theLbdb->getLBDB()->RegisteringObjects(h);
320   }
321 #endif
322 }
323
324 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
325 extern int donotCountMigration;
326 #endif
327
328 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
329 {
330 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
331     if(donotCountMigration){
332         return ;
333     }
334 #endif
335
336 #if CMK_LBDB_ON
337   if (waitBarrier) {
338             migrates_completed++;
339       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
340     if (migrates_completed == migrates_expected) {
341       MigrationDone(1);
342     }
343   }
344   else {
345     future_migrates_completed ++;
346     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
347     if (future_migrates_completed == future_migrates_expected)  {
348         CheckMigrationComplete();
349     }
350   }
351 #endif
352 }
353
354 void CentralLB::MissMigrate(int waitForBarrier)
355 {
356   LDObjHandle h;
357   Migrated(h, waitForBarrier);
358 }
359
360 // build a complete data from bufferred messages
361 // not used when USE_REDUCTION = 1
362 void CentralLB::buildStats()
363 {
364     statsData->count = stats_msg_count;
365     // allocate space
366     statsData->objData.resize(statsData->n_objs);
367     statsData->from_proc.resize(statsData->n_objs);
368     statsData->to_proc.resize(statsData->n_objs);
369     statsData->commData.resize(statsData->n_comm);
370
371     int nobj = 0;
372     int ncom = 0;
373     int nmigobj = 0;
374     // copy all data in individule message to this big structure
375     for (int pe=0; pe<CkNumPes(); pe++) {
376        int i;
377        CLBStatsMsg *msg = statsMsgsList[pe];
378        if(msg == NULL) continue;
379        for (i=0; i<msg->n_objs; i++) {
380          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
381          statsData->objData[nobj] = msg->objData[i];
382          if (msg->objData[i].migratable) nmigobj++;
383          nobj++;
384        }
385        for (i=0; i<msg->n_comm; i++) {
386          statsData->commData[ncom] = msg->commData[i];
387          ncom++;
388        }
389        // free the memory
390        delete msg;
391        statsMsgsList[pe]=0;
392     }
393     statsData->n_migrateobjs = nmigobj;
394 }
395
396 // deposit one processor data at a time, note database is pre-allocated
397 // to have enough space
398 // used when USE_REDUCTION = 1
399 void CentralLB::depositData(CLBStatsMsg *m)
400 {
401   int i;
402   if (m == NULL) return;
403
404   const int pe = m->from_pe;
405   struct ProcStats &procStat = statsData->procs[pe];
406   procStat.pe = pe;
407   procStat.total_walltime = m->total_walltime;
408   procStat.total_cputime = m->total_cputime;
409   procStat.idletime = m->idletime;
410   procStat.bg_walltime = m->bg_walltime;
411   procStat.bg_cputime = m->bg_cputime;
412   procStat.pe_speed = m->pe_speed;
413   //procStat.utilization = 1.0;
414   procStat.available = CmiTrue;
415   procStat.n_objs = m->n_objs;
416
417   int &nobj = statsData->n_objs;
418   int &nmigobj = statsData->n_migrateobjs;
419   for (i=0; i<m->n_objs; i++) {
420       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
421       statsData->objData[nobj] = m->objData[i];
422       if (m->objData[i].migratable) nmigobj++;
423       nobj++;
424       CmiAssert(nobj <= statsData->objData.capacity());
425   }
426   int &n_comm = statsData->n_comm;
427   for (i=0; i<m->n_comm; i++) {
428       statsData->commData[n_comm] = m->commData[i];
429       n_comm++;
430       CmiAssert(n_comm <= statsData->commData.capacity());
431   }
432   delete m;
433 }
434
435 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
436 {
437 #if CMK_LBDB_ON
438     //  loop through all CLBStatsMsg in the incoming msg
439   int count = msg.getCount();
440   for (int num = 0; num < count; num++) 
441   {
442     CLBStatsMsg *m = msg.getMessage(num);
443     CmiAssert(m!=NULL);
444     const int pe = m->from_pe;
445     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
446 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
447 /*      
448  *  if(m->step < step()){
449  *    //TODO: if a processor is redoing an old load balance step..
450  *    //tell it that the step is done and that it should not perform any migrations
451  *      thisProxy[pe].ReceiveDummyMigration();
452  *  }*/
453 #endif
454         
455     if(!CmiNodeAlive(pe)){
456         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
457         continue;
458     }
459         
460     if (m->avail_vector!=NULL) {
461       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
462     }
463
464     if (statsMsgsList[pe] != 0) {
465       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
466              pe);
467     } else {
468       statsMsgsList[pe] = m;
469 #if USE_REDUCTION
470       depositData(m);
471 #else
472       // store per processor data right away
473       struct ProcStats &procStat = statsData->procs[pe];
474       procStat.pe = pe;
475       procStat.total_walltime = m->total_walltime;
476       procStat.total_cputime = m->total_cputime;
477       procStat.idletime = m->idletime;
478       procStat.bg_walltime = m->bg_walltime;
479       procStat.bg_cputime = m->bg_cputime;
480       procStat.pe_speed = m->pe_speed;
481       //procStat.utilization = 1.0;
482       procStat.available = CmiTrue;
483       procStat.n_objs = m->n_objs;
484
485       statsData->n_objs += m->n_objs;
486       statsData->n_comm += m->n_comm;
487 #endif
488       stats_msg_count++;
489     }
490   }    // end of for
491
492   const int clients = CkNumValidPes();
493   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
494  
495   if (stats_msg_count == clients) {
496         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
497     statsData->count = stats_msg_count;
498     thisProxy[CkMyPe()].LoadBalance();
499   }
500 #endif
501 }
502
503 /** added by Abhinav for receiving msgs via spanning tree */
504 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
505 {
506 #if CMK_LBDB_ON
507         CmiAssert(CkMyPe() != 0);
508         bufMsg.add(msg);         // buffer messages
509         count_msgs++;
510         //CkPrintf("here %d\n", CkMyPe());
511         if (count_msgs == st.numChildren+1) {
512                 if(st.parent == 0)
513                 {
514                         thisProxy[0].ReceiveStats(bufMsg);
515                         //CkPrintf("from %d\n", CkMyPe());
516                 }
517                 else
518                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
519                 count_msgs = 0;
520                 bufMsg.free();
521         } 
522 #endif
523 }
524
525 void CentralLB::LoadBalance()
526 {
527 #if CMK_LBDB_ON
528   int proc;
529   const int clients = CkNumPes();
530
531 #if ! USE_REDUCTION
532   // build data
533   buildStats();
534 #else
535   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
536 #endif
537
538   if (_lb_args.debug()) 
539       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%fMB\n",
540                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
541  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
542 //    double strat_start_time = CkWallTimer();
543
544   // if we are in simulation mode read data
545   if (LBSimulation::doSimulation) simulationRead();
546
547   char *availVector = LBDatabaseObj()->availVector();
548   for(proc = 0; proc < clients; proc++)
549       statsData->procs[proc].available = (CmiBool)availVector[proc];
550
551   preprocess(statsData, clients);
552
553 //    CkPrintf("Before Calling Strategy\n");
554
555   if (_lb_args.printSummary()) {
556       LBInfo info(clients);
557         // not take comm data
558       info.getInfo(statsData, clients, 0);
559       double mLoad, mCpuLoad, totalLoad;
560       info.getSummary(mLoad, mCpuLoad, totalLoad);
561       int nmsgs, nbytes;
562       statsData->computeNonlocalComm(nmsgs, nbytes);
563       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
564 //      if (_lb_args.debug() > 1) {
565 //        for (int i=0; i<statsData->n_objs; i++)
566 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
567 //      }
568   }
569
570   double strat_start_time = CkWallTimer();
571   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
572 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
573         migrateMsg->step = step();
574 #endif
575   if (_lb_args.debug()) {
576     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
577     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
578     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
579               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
580   }
581
582 //    CkPrintf("returned successfully\n");
583
584   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
585   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
586
587   // if this is the step at which we need to dump the database
588   simulationWrite();
589
590 //  calculate predicted load
591 //  very time consuming though, so only happen when debugging is on
592   if (_lb_args.printSummary()) {
593       LBInfo info(clients);
594         // not take comm data
595       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
596       double mLoad, mCpuLoad, totalLoad;
597       info.getSummary(mLoad, mCpuLoad, totalLoad);
598       int nmsgs, nbytes;
599       statsData->computeNonlocalComm(nmsgs, nbytes);
600       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
601       for (int i=0; i<clients; i++)
602         migrateMsg->expectedLoad[i] = info.peLoads[i];
603   }
604
605   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
606 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
607     lbDecisionCount++;
608     migrateMsg->lbDecisionCount = lbDecisionCount;
609 #endif
610   thisProxy.ReceiveMigration(migrateMsg);
611
612   // Zero out data structures for next cycle
613   // CkPrintf("zeroing out data\n");
614   statsData->clear();
615   stats_msg_count=0;
616 #endif
617 }
618
619 //    double strat_end_time = CkWallTimer();
620 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
621 // test if sender and receiver in a commData is nonmigratable.
622 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
623 {
624 #if CMK_LBDB_ON
625   for (int pe=0 ; pe<count; pe++)
626   {
627     for (int i=0; i<len[pe]; i++)
628       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
629           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
630       return 0;
631   }
632 #endif
633   return 1;
634 }
635
636 // rebuild LDStats and remove all non-migratble objects and related things
637 void CentralLB::removeNonMigratable(LDStats* stats, int count)
638 {
639   int i;
640
641   // check if we have non-migratable objects
642   int have = 0;
643   for (i=0; i<stats->n_objs; i++) 
644   {
645     LDObjData &odata = stats->objData[i];
646     if (!odata.migratable) {
647       have = 1; break;
648     }
649   }
650   if (have == 0) return;
651
652   CkVec<LDObjData> nonmig;
653   CkVec<int> new_from_proc, new_to_proc;
654   nonmig.resize(stats->n_migrateobjs);
655   new_from_proc.resize(stats->n_migrateobjs);
656   new_to_proc.resize(stats->n_migrateobjs);
657   int n_objs = 0;
658   for (i=0; i<stats->n_objs; i++) 
659   {
660     LDObjData &odata = stats->objData[i];
661     if (odata.migratable) {
662       nonmig[n_objs] = odata;
663       new_from_proc[n_objs] = stats->from_proc[i];
664       new_to_proc[n_objs] = stats->to_proc[i];
665       n_objs ++;
666     }
667     else {
668       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
669       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
670     }
671   }
672   CmiAssert(stats->n_migrateobjs == n_objs);
673
674   stats->makeCommHash();
675   
676   CkVec<LDCommData> newCommData;
677   newCommData.resize(stats->n_comm);
678   int n_comm = 0;
679   for (i=0; i<stats->n_comm; i++) 
680   {
681     LDCommData& cdata = stats->commData[i];
682     if (!cdata.from_proc()) 
683     {
684       int idx = stats->getSendHash(cdata);
685       CmiAssert(idx != -1);
686       if (!stats->objData[idx].migratable) continue;
687     }
688     switch (cdata.receiver.get_type()) {
689     case LD_PROC_MSG:
690       break;
691     case LD_OBJ_MSG:  {
692       int idx = stats->getRecvHash(cdata);
693       if (stats->complete_flag)
694         CmiAssert(idx != -1);
695       else if (idx == -1) continue;          // receiver not in this group
696       if (!stats->objData[idx].migratable) continue;
697       break;
698       }
699     case LD_OBJLIST_MSG:    // object message FIXME add multicast
700       break;
701     }
702     newCommData[n_comm] = cdata;
703     n_comm ++;
704   }
705
706   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
707
708   // swap to new data
709   stats->objData = nonmig;
710   stats->from_proc = new_from_proc;
711   stats->to_proc = new_to_proc;
712   stats->n_objs = n_objs;
713
714   stats->commData = newCommData;
715   stats->n_comm = n_comm;
716
717   stats->deleteCommHash();
718   stats->makeCommHash();
719
720 }
721
722
723 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
724 extern int restarted;
725 #endif
726
727 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
728 {
729 #if CMK_LBDB_ON
730         int i;
731
732 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
733         int *dummyCounts;
734
735         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
736         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
737         if(step() > m->step){
738                 char str[100];
739                 envelope *env = UsrToEnv(m);
740                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
741                 return;
742         }
743         lbDecisionCount = m->lbDecisionCount;
744 #endif
745
746   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
747   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
748 /*FAULT_EVAC*/
749   if(!CmiNodeAlive(CkMyPe())){
750         delete m;
751         return;
752   }
753   migrates_expected = 0;
754   future_migrates_expected = 0;
755 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
756         int sending=0;
757     int dummy=0;
758         LBDB *_myLBDB = theLbdb->getLBDB();
759         if(_restartFlag){
760         dummyCounts = new int[CmiNumPes()];
761         bzero(dummyCounts,sizeof(int)*CmiNumPes());
762     }
763 #endif
764   for(i=0; i < m->n_moves; i++) {
765     MigrateInfo& move = m->moves[i];
766     const int me = CkMyPe();
767     if (move.from_pe == me && move.to_pe != me) {
768       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
769       // migrate object, in case it is already gone, inform toPe
770 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
771       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
772          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
773 #else
774             if(_restartFlag == 0){
775                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
776                 theLbdb->Migrate(move.obj,move.to_pe);
777                 sending++;
778             }else{
779                 if(_myLBDB->validObjHandle(move.obj)){
780                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
781                     theLbdb->Migrate(move.obj,move.to_pe);
782                     sending++;
783                 }else{
784                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
785                     dummyCounts[move.to_pe]++;
786                     dummy++;
787                 }
788             }
789 #endif
790     } else if (move.from_pe != me && move.to_pe == me) {
791        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
792       if (!move.async_arrival) migrates_expected++;
793       else future_migrates_expected++;
794     }
795   }
796   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
797   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
798
799 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
800         if(_restartFlag){
801                 sendDummyMigrationCounts(dummyCounts);
802                 _restartFlag  =0;
803         delete []dummyCounts;
804         }
805 #endif
806
807
808 #if 0
809   if (m->n_moves ==0) {
810     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
811   }
812 #endif
813   cur_ld_balancer = m->next_lb;
814   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
815       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
816   }
817
818   if (migrates_expected == 0 || migrates_completed == migrates_expected)
819     MigrationDone(1);
820   delete m;
821
822 //      CkEvacuatedElement();
823 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
824 //  migrates_expected = 0;
825 //  //  ResumeClients(1);
826 #endif
827 #endif
828 }
829
830 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
831 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
832     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
833     //TODO: this is gonna be important when a crash happens during checkpoint
834     //the globalDecisionCount would have to be saved and compared against
835     //a future recvMigration
836                 
837         thisProxy[CkMyPe()].ResumeClients(1);
838 }
839 #endif
840
841 void CentralLB::MigrationDone(int balancing)
842 {
843 #if CMK_LBDB_ON
844   migrates_completed = 0;
845   migrates_expected = -1;
846   // clear load stats
847   if (balancing) theLbdb->ClearLoads();
848   // Increment to next step
849   theLbdb->incStep();
850         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
851   // if sync resume, invoke a barrier
852
853 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
854     savedBalancing = balancing;
855     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
856 #endif
857
858   LoadbalanceDone(balancing);        // callback
859 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
860   // if sync resume invoke a barrier
861   if (balancing && _lb_args.syncResume()) {
862     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
863                   thisProxy);
864     contribute(0, NULL, CkReduction::sum_int, cb);
865   }
866   else{ 
867     if(CmiNodeAlive(CkMyPe())){
868         thisProxy [CkMyPe()].ResumeClients(balancing);
869     }   
870   }     
871 #if CMK_GRID_QUEUE_AVAILABLE
872   CmiGridQueueDeregisterAll ();
873   CpvAccess(CkGridObject) = NULL;
874 #endif
875 #endif 
876 #endif
877 }
878
879 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
880 void CentralLB::endMigrationDone(int balancing){
881     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
882
883
884   if (balancing && _lb_args.syncResume()) {
885     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
886                   thisProxy);
887     contribute(0, NULL, CkReduction::sum_int, cb);
888   }
889   else{
890     if(CmiNodeAlive(CkMyPe())){
891     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
892     thisProxy [CkMyPe()].ResumeClients(balancing);
893     }
894   }
895
896 }
897 #endif
898
899 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
900 void resumeCentralLbAfterChkpt(void *_lb){
901     CentralLB *lb= (CentralLB *)_lb;
902     CpvAccess(_currentObj)=lb;
903     lb->endMigrationDone(lb->savedBalancing);
904 }
905 #endif
906
907
908 void CentralLB::ResumeClients(CkReductionMsg *msg)
909 {
910   ResumeClients(1);
911   delete msg;
912 }
913
914 void CentralLB::ResumeClients(int balancing)
915 {
916 #if CMK_LBDB_ON
917 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
918     resumeCount++;
919     globalResumeCount = resumeCount;
920 #endif
921   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
922   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
923     double end_lb_time = CkWallTimer();
924   }
925
926 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
927   if (balancing) ComlibNotifyMigrationDone();  
928 #endif
929
930   theLbdb->ResumeClients();
931   if (balancing)  {
932     CheckMigrationComplete();
933     if (future_migrates_expected == 0 || 
934             future_migrates_expected == future_migrates_completed) {
935       CheckMigrationComplete();
936     }
937   }
938 #endif
939 }
940
941 /*
942   migration of objects contains two different kinds:
943   (1) objects want to make a barrier for migration completion
944       (waitForBarrier is true)
945       migrationDone() to finish and resumeClients
946   (2) objects don't need a barrier
947   However, next load balancing can only happen when both migrations complete
948 */ 
949 void CentralLB::CheckMigrationComplete()
950 {
951 #if CMK_LBDB_ON
952   lbdone ++;
953   if (lbdone == 2) {
954     if (_lb_args.debug() && CkMyPe()==0) {
955       double end_lb_time = CkWallTimer();
956       CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
957                 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
958     }
959     lbdone = 0;
960     future_migrates_expected = -1;
961     future_migrates_completed = 0;
962     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
963     // release local barrier  so that the next load balancer can go
964     LDOMHandle h;
965     h.id.id.idx = 0;
966     theLbdb->getLBDB()->DoneRegisteringObjects(h);
967     // switch to the next load balancer in the list
968     // subtle: called from Migrated() may result in Migrated() called in next LB
969     theLbdb->nextLoadbalancer(seqno);
970   }
971 #endif
972 }
973
974 void CentralLB::preprocess(LDStats* stats,int count)
975 {
976   if (_lb_args.ignoreBgLoad())
977     stats->clearBgLoad();
978
979   // Call the predictor for the future
980   if (_lb_predict) FuturePredictor(statsData);
981 }
982
983 // default load balancing strategy
984 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
985 {
986 #if CMK_LBDB_ON
987   work(stats, count);
988
989   if (_lb_args.debug()>1)  {
990     CkPrintf("Obj Map:\n");
991     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
992     CkPrintf("\n");
993   }
994
995   return createMigrateMsg(stats, count);
996 #else
997   return NULL;
998 #endif
999 }
1000
1001 void CentralLB::work(LDStats* stats,int count)
1002 {
1003   // does nothing but print the database
1004   stats->print();
1005 }
1006
1007 // generate migrate message from stats->from_proc and to_proc
1008 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
1009 {
1010   int i;
1011   CkVec<MigrateInfo*> migrateInfo;
1012   for (i=0; i<stats->n_objs; i++) {
1013     LDObjData &objData = stats->objData[i];
1014     int frompe = stats->from_proc[i];
1015     int tope = stats->to_proc[i];
1016     if (frompe != tope) {
1017       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1018       //         CkMyPe(),obj,pe,dest);
1019       MigrateInfo *migrateMe = new MigrateInfo;
1020       migrateMe->obj = objData.handle;
1021       migrateMe->from_pe = frompe;
1022       migrateMe->to_pe = tope;
1023       migrateMe->async_arrival = objData.asyncArrival;
1024       migrateInfo.insertAtEnd(migrateMe);
1025     }
1026   }
1027
1028   int migrate_count=migrateInfo.length();
1029   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1030   msg->n_moves = migrate_count;
1031   for(i=0; i < migrate_count; i++) {
1032     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1033     msg->moves[i] = *item;
1034     delete item;
1035     migrateInfo[i] = 0;
1036   }
1037   if (_lb_args.debug())
1038     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
1039   return msg;
1040 }
1041
1042 void CentralLB::simulationWrite() {
1043   if(step() == LBSimulation::dumpStep)
1044   {
1045     // here we are supposed to dump the database
1046     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1047     char *dumpFileName = (char *)malloc(dumpFileSize);
1048     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1049       free(dumpFileName);
1050       dumpFileSize+=3;
1051       dumpFileName = (char *)malloc(dumpFileSize);
1052     }
1053     writeStatsMsgs(dumpFileName);
1054     free(dumpFileName);
1055     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1056     ++LBSimulation::dumpStep;
1057     --LBSimulation::dumpStepSize;
1058     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1059       CmiPrintf("Charm++> Exiting...\n");
1060       CkExit();
1061     }
1062     return;
1063   }
1064 }
1065
1066 void CentralLB::simulationRead() {
1067   LBSimulation *simResults = NULL, *realResults;
1068   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1069   voidMessage->n_moves=0;
1070   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1071     // here we are supposed to read the data from the dump database
1072     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1073     char *simFileName = (char *)malloc(simFileSize);
1074     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1075       free(simFileName);
1076       simFileSize+=3;
1077       simFileName = (char *)malloc(simFileSize);
1078     }
1079     readStatsMsgs(simFileName);
1080
1081     // allocate simResults (only the first step)
1082     if (simResults == NULL) {
1083       simResults = new LBSimulation(LBSimulation::simProcs);
1084       realResults = new LBSimulation(LBSimulation::simProcs);
1085     }
1086     else {
1087       // should be the same number of procs of the original simulation!
1088       if (!LBSimulation::procsChanged) {
1089         // it means we have a previous step, so in simResults there is data.
1090         // we can now print the real effects of the load balancer during the simulation
1091         // or print the difference between the predicted data and the real one.
1092         realResults->reset();
1093         // reset to_proc of statsData to be equal to from_proc
1094         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1095         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1096         simResults->PrintDifferences(realResults,statsData);
1097       }
1098       simResults->reset();
1099     }
1100
1101     // now pass it to the strategy routine
1102     double startT = CkWallTimer();
1103     preprocess(statsData, LBSimulation::simProcs);
1104     CmiPrintf("%s> Strategy starts ... \n", lbname);
1105     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1106     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1107                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1108
1109     // now calculate the results of the load balancing simulation
1110     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1111
1112     // now we have the simulation data, so print it and loop
1113     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1114     // **CWL** Officially recording my disdain here for using ints for bool
1115     if (LBSimulation::showDecisionsOnly) {
1116       simResults->PrintDecisions(migrateMsg, simFileName, 
1117                                  LBSimulation::simProcs);
1118     } else {
1119       simResults->PrintSimulationResults();
1120     }
1121
1122     free(simFileName);
1123     delete migrateMsg;
1124     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1125   }
1126   // deallocate simResults
1127   delete simResults;
1128   CmiPrintf("Charm++> Exiting...\n");
1129   CkExit();
1130 }
1131
1132 void CentralLB::readStatsMsgs(const char* filename) 
1133 {
1134 #if CMK_LBDB_ON
1135   int i;
1136   FILE *f = fopen(filename, "r");
1137   if (f==NULL) {
1138     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1139     CmiAbort("");
1140   }
1141
1142   // at this stage, we need to rebuild the statsMsgList and
1143   // statsDataList structures. For that first deallocate the
1144   // old structures
1145   if (statsMsgsList) {
1146     for(i = 0; i < stats_msg_count; i++)
1147       delete statsMsgsList[i];
1148     delete[] statsMsgsList;
1149     statsMsgsList=0;
1150   }
1151
1152   PUP::fromDisk pd(f);
1153   PUP::machineInfo machInfo;
1154
1155   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1156   PUP::xlater p(machInfo, pd);
1157
1158   if (_lb_args.lbversion() > 1) {
1159     p|_lb_args.lbversion();             // write version number
1160     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1161     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1162   }
1163   p|stats_msg_count;
1164
1165   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1166   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1167   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1168
1169   // LBSimulation::simProcs must be set
1170   statsData->pup(p);
1171
1172   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1173   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1174
1175   // file f is closed in the destructor of PUP::fromDisk
1176   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1177 #endif
1178 }
1179
1180 void CentralLB::writeStatsMsgs(const char* filename) 
1181 {
1182 #if CMK_LBDB_ON
1183   FILE *f = fopen(filename, "w");
1184   if (f==NULL) {
1185     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1186     CmiAbort("");
1187   }
1188
1189   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1190   PUP::toDisk p(f);
1191   p((char *)&machInfo, sizeof(machInfo));       // machine info
1192
1193   p|_lb_args.lbversion();               // write version number
1194   p|stats_msg_count;
1195   statsData->pup(p);
1196
1197   fclose(f);
1198
1199   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1200 #endif
1201 }
1202
1203 // calculate the predicted wallclock/cpu load for every processors
1204 // considering communication overhead if considerComm is true
1205 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1206                       LBMigrateMsg *msg, LBInfo &info, 
1207                       int considerComm)
1208 {
1209 #if CMK_LBDB_ON
1210         stats->makeCommHash();
1211
1212         // update to_proc according to migration msgs
1213         for(int i = 0; i < msg->n_moves; i++) {
1214           MigrateInfo &mInfo = msg->moves[i];
1215           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1216           CmiAssert(idx != -1);
1217           stats->to_proc[idx] = mInfo.to_pe;
1218         }
1219
1220         info.getInfo(stats, count, considerComm);
1221 #endif
1222 }
1223
1224
1225 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1226 {
1227     CkAssert(simResults != NULL && count == simResults->numPes);
1228     // estimate the new loads of the processors. As a first approximation, this is the
1229     // sum of the cpu times of the objects on that processor
1230     double startT = CkWallTimer();
1231     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1232     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1233 }
1234
1235 void CentralLB::pup(PUP::er &p) { 
1236   BaseLB::pup(p); 
1237   if (p.isUnpacking())  {
1238     initLB(CkLBOptions(seqno)); 
1239   }
1240 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1241         p | lbDecisionCount;
1242     p | resumeCount;
1243 #endif
1244         
1245 }
1246
1247 int CentralLB::useMem() { 
1248   return sizeof(CentralLB) + statsData->useMem() + 
1249          CkNumPes() * sizeof(CLBStatsMsg *);
1250 }
1251
1252
1253 /**
1254   CLBStatsMsg is not a real message now.
1255   CLBStatsMsg is used for all processors to fill in their local load and comm
1256   statistics and send to processor 0
1257 */
1258
1259 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1260   n_objs = osz;
1261   n_comm = csz;
1262   objData = new LDObjData[osz];
1263   commData = new LDCommData[csz];
1264   avail_vector = NULL;
1265 }
1266
1267 CLBStatsMsg::~CLBStatsMsg() {
1268   delete [] objData;
1269   delete [] commData;
1270   delete [] avail_vector;
1271 }
1272
1273 void CLBStatsMsg::pup(PUP::er &p) {
1274   int i;
1275   p|from_pe;
1276   p|pe_speed;
1277   p|total_walltime; p|total_cputime;
1278   p|idletime;
1279   p|bg_walltime;   p|bg_cputime;
1280 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1281   p | step;
1282 #endif
1283   p|n_objs;
1284   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1285   for (i=0; i<n_objs; i++) p|objData[i];
1286   p|n_comm;
1287   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1288   for (i=0; i<n_comm; i++) p|commData[i];
1289
1290   int has_avail_vector;
1291   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1292   p|has_avail_vector;
1293   if (p.isUnpacking()) {
1294     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1295     else avail_vector = NULL;
1296   }
1297   if (has_avail_vector) p(avail_vector, CkNumPes());
1298
1299   p(next_lb);
1300 }
1301
1302 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1303 // the entry function, it is just used to use to pup.
1304 // I don't use CLBStatsMsg directly as marshalled parameter because
1305 // I want the data pointer stored and not to be freed by the Charm++.
1306 void CkMarshalledCLBStatsMessage::free() { 
1307   int count = msgs.size();
1308   for  (int i=0; i<count; i++) {
1309     delete msgs[i];
1310     msgs[i] = NULL;
1311   }
1312   msgs.free();
1313 }
1314
1315 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1316 {
1317   int count = m.getCount();
1318   for (int i=0; i<count; i++) add(m.getMessage(i));
1319 }
1320
1321 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1322 {
1323   int count = msgs.size();
1324   p|count;
1325   for (int i=0; i<count; i++) {
1326     CLBStatsMsg *msg;
1327     if (p.isUnpacking()) msg = new CLBStatsMsg;
1328     else { 
1329       msg = msgs[i]; CmiAssert(msg!=NULL);
1330     }
1331     msg->pup(p);
1332     if (p.isUnpacking()) add(msg);
1333   }
1334 }
1335
1336 SpanningTree::SpanningTree()
1337 {
1338         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1339         arity = (int)ceil(sq/2);
1340         calcParent(CkMyPe());
1341         calcNumChildren(CkMyPe());
1342 }
1343
1344 void SpanningTree::calcParent(int n)
1345 {
1346         parent=-1;
1347         if(n != 0  && arity > 0)
1348                 parent = (n-1)/arity;
1349 }
1350
1351 void SpanningTree::calcNumChildren(int n)
1352 {
1353         numChildren = 0;
1354         if (arity == 0) return;
1355         int fullNode=(CkNumPes()-1-arity)/arity;
1356         if(n <= fullNode)
1357                 numChildren = arity;
1358         if(n == fullNode+1)
1359                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1360         if(n > fullNode+1)
1361                 numChildren = 0;
1362 }
1363
1364 #include "CentralLB.def.h"
1365  
1366 /*@}*/