A few bug fixes related to the mlogft machine.
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)      //  CmiPrintf x;
20 #define  DEBUG(x)       // x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #ifdef _FAULT_MLOG_
32 /* can not handle reduction in inmem FT */
33 #undef USE_REDUCTION
34 #undef USE_LDB_SPANNING_TREE
35 #define USE_REDUCTION         0
36 #define USE_LDB_SPANNING_TREE 0
37 #else
38 #define USE_REDUCTION         1
39 #define USE_LDB_SPANNING_TREE 1
40 #endif
41
42 #ifdef _FAULT_MLOG_
43 extern int _restartFlag;
44 extern void getGlobalStep(CkGroupID );
45 extern void initMlogLBStep(CkGroupID );
46 extern int globalResumeCount;
47 extern void sendDummyMigrationCounts(int *);
48 #endif
49
50 #if CMK_GRID_QUEUE_AVAILABLE
51 CpvExtern(void *, CkGridObject);
52 #endif
53
54 CkGroupID loadbalancer;
55 int * lb_ptr;
56 int load_balancer_created;
57
58 CreateLBFunc_Def(CentralLB, "CentralLB base class");
59
60 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
61                              LBMigrateMsg *, LBInfo &info, int considerComm);
62
63 /*
64 void CreateCentralLB()
65 {
66   CProxy_CentralLB::ckNew(0);
67 }
68 */
69
70 void CentralLB::staticStartLB(void* data)
71 {
72   CentralLB *me = (CentralLB*)(data);
73   me->StartLB();
74 }
75
76 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
77 {
78   CentralLB *me = (CentralLB*)(data);
79   me->Migrated(h, waitBarrier);
80 }
81
82 void CentralLB::staticAtSync(void* data)
83 {
84   CentralLB *me = (CentralLB*)(data);
85   me->AtSync();
86 }
87
88 void CentralLB::initLB(const CkLBOptions &opt)
89 {
90 #if CMK_LBDB_ON
91   lbname = "CentralLB";
92   thisProxy = CProxy_CentralLB(thisgroup);
93   //  CkPrintf("Construct in %d\n",CkMyPe());
94
95   // create and turn on by default
96   receiver = theLbdb->
97     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
98   notifier = theLbdb->getLBDB()->
99     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
100   startLbFnHdl = theLbdb->getLBDB()->
101     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
102
103   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
104   if (opt.getSeqNo() > 0) turnOff();
105
106   stats_msg_count = 0;
107   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
108   for(int i=0; i < CkNumPes(); i++)
109     statsMsgsList[i] = 0;
110
111   statsData = new LDStats;
112
113   // for future predictor
114   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
115   else predicted_model=0;
116   // register user interface callbacks
117   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
118
119   myspeed = theLbdb->ProcessorSpeed();
120
121   migrates_completed = 0;
122   future_migrates_completed = 0;
123   migrates_expected = -1;
124   future_migrates_expected = -1;
125   cur_ld_balancer = _lb_args.central_pe();      // 0 default
126   lbdone = 0;
127   count_msgs=0;
128   statsMsg = NULL;
129
130   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
131
132   load_balancer_created = 1;
133 #endif
134 }
135
136 CentralLB::~CentralLB()
137 {
138 #if CMK_LBDB_ON
139   delete [] statsMsgsList;
140   delete statsData;
141   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
142   if (theLbdb) {
143     theLbdb->getLBDB()->
144       RemoveNotifyMigrated(notifier);
145     theLbdb->
146       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
147   }
148 #endif
149 }
150
151 void CentralLB::turnOn() 
152 {
153 #if CMK_LBDB_ON
154   theLbdb->getLBDB()->
155     TurnOnBarrierReceiver(receiver);
156   theLbdb->getLBDB()->
157     TurnOnNotifyMigrated(notifier);
158   theLbdb->getLBDB()->
159     TurnOnStartLBFn(startLbFnHdl);
160 #endif
161 }
162
163 void CentralLB::turnOff() 
164 {
165 #if CMK_LBDB_ON
166   theLbdb->getLBDB()->
167     TurnOffBarrierReceiver(receiver);
168   theLbdb->getLBDB()->
169     TurnOffNotifyMigrated(notifier);
170   theLbdb->getLBDB()->
171     TurnOffStartLBFn(startLbFnHdl);
172 #endif
173 }
174
175 void CentralLB::AtSync()
176 {
177 #if CMK_LBDB_ON
178   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
179
180 #ifdef _FAULT_MLOG_
181 #endif
182
183   // if num of processor is only 1, nothing should happen
184   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
185     MigrationDone(0);
186     return;
187   }
188   if(CmiNodeAlive(CkMyPe())){
189     thisProxy [CkMyPe()].ProcessAtSync();
190   }
191 #endif
192 }
193
194 #include "ComlibStrategy.h"
195
196 void CentralLB::ProcessAtSync()
197 {
198 #if CMK_LBDB_ON
199   CmiAssert(CmiNodeAlive(CkMyPe()));
200   if (CkMyPe() == cur_ld_balancer) {
201     start_lb_time = CkWallTimer();
202   }
203
204 #ifdef _FAULT_MLOG_
205         initMlogLBStep(thisgroup);
206 #endif
207
208   // build message
209   BuildStatsMsg();
210
211 #if USE_REDUCTION
212     // reduction to get total number of objects and comm
213     // so that processor 0 can pre-allocate load balancing database
214   int counts[2];
215   counts[0] = theLbdb->GetObjDataSz();
216   counts[1] = theLbdb->GetCommDataSz();
217
218   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
219                   thisProxy[0]);
220   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
221 #else
222   SendStats();
223 #endif
224 #endif
225 }
226
227 // called only on 0
228 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
229 {
230   CmiAssert(CkMyPe() == 0);
231
232   int *counts = (int *)msg->getData();
233   int n_objs = counts[0];
234   int n_comm = counts[1];
235
236     // resize database
237   statsData->objData.resize(n_objs);
238   statsData->from_proc.resize(n_objs);
239   statsData->to_proc.resize(n_objs);
240   statsData->commData.resize(n_comm);
241
242   DEBUGF(("[%d] ReceiveCounts\n",CkMyPe()));
243         
244     // broadcast call to let everybody start to send stats
245   thisProxy.SendStats();
246 }
247
248 void CentralLB::BuildStatsMsg()
249 {
250 #if CMK_LBDB_ON
251   // build and send stats
252   const int osz = theLbdb->GetObjDataSz();
253   const int csz = theLbdb->GetCommDataSz();
254
255   int npes = CkNumPes();
256   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
257   msg->from_pe = CkMyPe();
258 #ifdef _FAULT_MLOG_
259         msg->step = step();
260 #endif
261   //msg->serial = CrnRand();
262
263 /*
264   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
265   theLbdb->IdleTime(&msg->idletime);
266   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
267 */
268   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
269                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
270   msg->pe_speed = myspeed;
271   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
272
273   msg->n_objs = osz;
274   theLbdb->GetObjData(msg->objData);
275   msg->n_comm = csz;
276   theLbdb->GetCommData(msg->commData);
277 //  theLbdb->ClearLoads();
278   DEBUGF(("PE %d sending stats to ReceiveStats %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
279
280   if(CkMyPe() == cur_ld_balancer) {
281     msg->avail_vector = new char[CkNumPes()];
282     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
283     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
284   }
285
286   CmiAssert(statsMsg == NULL);
287   statsMsg = msg;
288 #endif
289 }
290
291 // called on every processor
292 void CentralLB::SendStats()
293 {
294 #if CMK_LBDB_ON
295   CmiAssert(statsMsg != NULL);
296
297 #if USE_LDB_SPANNING_TREE
298   if(CkNumPes()>1024)
299   {
300     if (CkMyPe() == cur_ld_balancer)
301       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
302     else
303       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
304   }
305   else
306 #endif
307         DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
308   thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
309
310   statsMsg = NULL;
311
312 #ifdef __BLUEGENE__
313   BgEndStreaming();
314 #endif
315
316   {
317   // enfore the barrier to wait until centralLB says no
318   LDOMHandle h;
319   h.id.id.idx = 0;
320   theLbdb->getLBDB()->RegisteringObjects(h);
321   }
322 #endif
323 }
324
325 #ifdef _FAULT_MLOG_
326 extern int donotCountMigration;
327 #endif
328
329 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
330 {
331 #ifdef _FAULT_MLOG_
332     if(donotCountMigration){
333         return ;
334     }
335 #endif
336
337 #if CMK_LBDB_ON
338   if (waitBarrier) {
339             migrates_completed++;
340       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
341     if (migrates_completed == migrates_expected) {
342       MigrationDone(1);
343     }
344   }
345   else {
346     future_migrates_completed ++;
347     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
348     if (future_migrates_completed == future_migrates_expected)  {
349         CheckMigrationComplete();
350     }
351   }
352 #endif
353 }
354
355 void CentralLB::MissMigrate(int waitForBarrier)
356 {
357   LDObjHandle h;
358   Migrated(h, waitForBarrier);
359 }
360
361 // build a complete data from bufferred messages
362 // not used when USE_REDUCTION = 1
363 void CentralLB::buildStats()
364 {
365     statsData->count = stats_msg_count;
366     // allocate space
367     statsData->objData.resize(statsData->n_objs);
368     statsData->from_proc.resize(statsData->n_objs);
369     statsData->to_proc.resize(statsData->n_objs);
370     statsData->commData.resize(statsData->n_comm);
371
372     int nobj = 0;
373     int ncom = 0;
374     int nmigobj = 0;
375     // copy all data in individule message to this big structure
376     for (int pe=0; pe<CkNumPes(); pe++) {
377        int i;
378        CLBStatsMsg *msg = statsMsgsList[pe];
379        if(msg == NULL) continue;
380        for (i=0; i<msg->n_objs; i++) {
381          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
382          statsData->objData[nobj] = msg->objData[i];
383          if (msg->objData[i].migratable) nmigobj++;
384          nobj++;
385        }
386        for (i=0; i<msg->n_comm; i++) {
387          statsData->commData[ncom] = msg->commData[i];
388          ncom++;
389        }
390        // free the memory
391        delete msg;
392        statsMsgsList[pe]=0;
393     }
394     statsData->n_migrateobjs = nmigobj;
395 }
396
397 // deposit one processor data at a time, note database is pre-allocated
398 // to have enough space
399 // used when USE_REDUCTION = 1
400 void CentralLB::depositData(CLBStatsMsg *m)
401 {
402   int i;
403   if (m == NULL) return;
404
405   const int pe = m->from_pe;
406   struct ProcStats &procStat = statsData->procs[pe];
407   procStat.pe = pe;
408   procStat.total_walltime = m->total_walltime;
409   procStat.total_cputime = m->total_cputime;
410   procStat.idletime = m->idletime;
411   procStat.bg_walltime = m->bg_walltime;
412   procStat.bg_cputime = m->bg_cputime;
413   procStat.pe_speed = m->pe_speed;
414   //procStat.utilization = 1.0;
415   procStat.available = CmiTrue;
416   procStat.n_objs = m->n_objs;
417
418   int &nobj = statsData->n_objs;
419   int &nmigobj = statsData->n_migrateobjs;
420   for (i=0; i<m->n_objs; i++) {
421       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
422       statsData->objData[nobj] = m->objData[i];
423       if (m->objData[i].migratable) nmigobj++;
424       nobj++;
425       CmiAssert(nobj <= statsData->objData.capacity());
426   }
427   int &n_comm = statsData->n_comm;
428   for (i=0; i<m->n_comm; i++) {
429       statsData->commData[n_comm] = m->commData[i];
430       n_comm++;
431       CmiAssert(n_comm <= statsData->commData.capacity());
432   }
433   delete m;
434 }
435
436 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
437 {
438 #if CMK_LBDB_ON
439     //  loop through all CLBStatsMsg in the incoming msg
440   for (int num = 0; num < msg.getCount(); num++) 
441   {
442     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
443     const int pe = m->from_pe;
444         DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
445 #ifdef _FAULT_MLOG_     
446 /*      
447  *              if(m->step < step()){
448  *                          //TODO: if a processor is redoing an old load balance step..
449  *                                      //tell it that the step is done and that it should not perform any migrations
450  *                                                  thisProxy[pe].ReceiveDummyMigration();
451  *                                                          }*/
452 #endif
453         
454     if(!CmiNodeAlive(pe)){
455         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
456         continue;
457     }
458         
459     if (m->avail_vector) {
460       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
461     }
462
463     if (statsMsgsList[pe] != 0) {
464       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
465              pe);
466     } else {
467       statsMsgsList[pe] = m;
468 #if USE_REDUCTION
469       depositData(m);
470 #else
471       // store per processor data right away
472       struct ProcStats &procStat = statsData->procs[pe];
473       procStat.pe = pe;
474       procStat.total_walltime = m->total_walltime;
475       procStat.total_cputime = m->total_cputime;
476       procStat.idletime = m->idletime;
477       procStat.bg_walltime = m->bg_walltime;
478       procStat.bg_cputime = m->bg_cputime;
479       procStat.pe_speed = m->pe_speed;
480       //procStat.utilization = 1.0;
481       procStat.available = CmiTrue;
482       procStat.n_objs = m->n_objs;
483
484       statsData->n_objs += m->n_objs;
485       statsData->n_comm += m->n_comm;
486 #endif
487       stats_msg_count++;
488     }
489   }    // end of for
490
491   const int clients = CkNumValidPes();
492  
493   if (stats_msg_count == clients) {
494         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
495     statsData->count = stats_msg_count;
496     thisProxy[CkMyPe()].LoadBalance();
497   }
498 #endif
499 }
500
501 /** added by Abhinav for receiving msgs via spanning tree */
502 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
503 {
504 #if CMK_LBDB_ON
505         CmiAssert(CkMyPe() != 0);
506         bufMsg.add(msg);         // buffer messages
507         count_msgs++;
508         //CkPrintf("here %d\n", CkMyPe());
509         if (count_msgs == st.numChildren+1) {
510                 if(st.parent == 0)
511                 {
512                         thisProxy[0].ReceiveStats(bufMsg);
513                         //CkPrintf("from %d\n", CkMyPe());
514                 }
515                 else
516                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
517                 count_msgs = 0;
518                 bufMsg.free();
519         } 
520 #endif
521 }
522
523 void CentralLB::LoadBalance()
524 {
525 #if CMK_LBDB_ON
526   int proc;
527   const int clients = CkNumPes();
528
529 #if ! USE_REDUCTION
530   // build data
531   buildStats();
532 #else
533   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
534 #endif
535
536   if (_lb_args.debug()) 
537       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%dMB\n",
538                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
539  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
540 //    double strat_start_time = CkWallTimer();
541
542   // if we are in simulation mode read data
543   if (LBSimulation::doSimulation) simulationRead();
544
545   char *availVector = LBDatabaseObj()->availVector();
546   for(proc = 0; proc < clients; proc++)
547       statsData->procs[proc].available = (CmiBool)availVector[proc];
548
549   preprocess(statsData, clients);
550
551 //    CkPrintf("Before Calling Strategy\n");
552
553   if (_lb_args.printSummary()) {
554       LBInfo info(clients);
555         // not take comm data
556       info.getInfo(statsData, clients, 0);
557       double mLoad, mCpuLoad, totalLoad;
558       info.getSummary(mLoad, mCpuLoad, totalLoad);
559       int nmsgs, nbytes;
560       statsData->computeNonlocalComm(nmsgs, nbytes);
561       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
562 //      if (_lb_args.debug() > 1) {
563 //        for (int i=0; i<statsData->n_objs; i++)
564 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
565 //      }
566   }
567
568   double strat_start_time = CkWallTimer();
569   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
570 #ifdef _FAULT_MLOG_
571         migrateMsg->step = step();
572 #endif
573   if (_lb_args.debug()) {
574     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
575     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
576     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
577               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
578   }
579
580 //    CkPrintf("returned successfully\n");
581
582   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
583   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
584
585   // if this is the step at which we need to dump the database
586   simulationWrite();
587
588 //  calculate predicted load
589 //  very time consuming though, so only happen when debugging is on
590   if (_lb_args.printSummary()) {
591       LBInfo info(clients);
592         // not take comm data
593       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
594       double mLoad, mCpuLoad, totalLoad;
595       info.getSummary(mLoad, mCpuLoad, totalLoad);
596       int nmsgs, nbytes;
597       statsData->computeNonlocalComm(nmsgs, nbytes);
598       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
599       for (int i=0; i<clients; i++)
600         migrateMsg->expectedLoad[i] = info.peLoads[i];
601   }
602
603   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
604 #ifdef _FAULT_MLOG_ 
605     lbDecisionCount++;
606     migrateMsg->lbDecisionCount = lbDecisionCount;
607 #endif
608  CkPrintf("ReceiveMigration called at %.6lf \n",CmiWallTimer());
609   thisProxy.ReceiveMigration(migrateMsg);
610
611   // Zero out data structures for next cycle
612   // CkPrintf("zeroing out data\n");
613   statsData->clear();
614   stats_msg_count=0;
615 #endif
616 }
617
618 //    double strat_end_time = CkWallTimer();
619 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
620 // test if sender and receiver in a commData is nonmigratable.
621 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
622 {
623 #if CMK_LBDB_ON
624   for (int pe=0 ; pe<count; pe++)
625   {
626     for (int i=0; i<len[pe]; i++)
627       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
628           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
629       return 0;
630   }
631 #endif
632   return 1;
633 }
634
635 // rebuild LDStats and remove all non-migratble objects and related things
636 void CentralLB::removeNonMigratable(LDStats* stats, int count)
637 {
638   int i;
639
640   // check if we have non-migratable objects
641   int have = 0;
642   for (i=0; i<stats->n_objs; i++) 
643   {
644     LDObjData &odata = stats->objData[i];
645     if (!odata.migratable) {
646       have = 1; break;
647     }
648   }
649   if (have == 0) return;
650
651   CkVec<LDObjData> nonmig;
652   CkVec<int> new_from_proc, new_to_proc;
653   nonmig.resize(stats->n_migrateobjs);
654   new_from_proc.resize(stats->n_migrateobjs);
655   new_to_proc.resize(stats->n_migrateobjs);
656   int n_objs = 0;
657   for (i=0; i<stats->n_objs; i++) 
658   {
659     LDObjData &odata = stats->objData[i];
660     if (odata.migratable) {
661       nonmig[n_objs] = odata;
662       new_from_proc[n_objs] = stats->from_proc[i];
663       new_to_proc[n_objs] = stats->to_proc[i];
664       n_objs ++;
665     }
666     else {
667       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
668       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
669     }
670   }
671   CmiAssert(stats->n_migrateobjs == n_objs);
672
673   stats->makeCommHash();
674   
675   CkVec<LDCommData> newCommData;
676   newCommData.resize(stats->n_comm);
677   int n_comm = 0;
678   for (i=0; i<stats->n_comm; i++) 
679   {
680     LDCommData& cdata = stats->commData[i];
681     if (!cdata.from_proc()) 
682     {
683       int idx = stats->getSendHash(cdata);
684       CmiAssert(idx != -1);
685       if (!stats->objData[idx].migratable) continue;
686     }
687     switch (cdata.receiver.get_type()) {
688     case LD_PROC_MSG:
689       break;
690     case LD_OBJ_MSG:  {
691       int idx = stats->getRecvHash(cdata);
692       if (stats->complete_flag)
693         CmiAssert(idx != -1);
694       else if (idx == -1) continue;          // receiver not in this group
695       if (!stats->objData[idx].migratable) continue;
696       break;
697       }
698     case LD_OBJLIST_MSG:    // object message FIXME add multicast
699       break;
700     }
701     newCommData[n_comm] = cdata;
702     n_comm ++;
703   }
704
705   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
706
707   // swap to new data
708   stats->objData = nonmig;
709   stats->from_proc = new_from_proc;
710   stats->to_proc = new_to_proc;
711   stats->n_objs = n_objs;
712
713   stats->commData = newCommData;
714   stats->n_comm = n_comm;
715
716   stats->deleteCommHash();
717   stats->makeCommHash();
718
719 }
720
721
722 #ifdef _FAULT_MLOG_
723 extern int restarted;
724 #endif
725
726 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
727 {
728 #if CMK_LBDB_ON
729         int i;
730
731 #ifdef _FAULT_MLOG_
732         int *dummyCounts;
733
734         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
735         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
736         if(step() > m->step){
737                 char str[100];
738                 envelope *env = UsrToEnv(m);
739                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
740                 return;
741         }
742         lbDecisionCount = m->lbDecisionCount;
743 #endif
744
745   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
746   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
747 /*FAULT_EVAC*/
748   if(!CmiNodeAlive(CkMyPe())){
749         delete m;
750         return;
751   }
752   migrates_expected = 0;
753   future_migrates_expected = 0;
754 #ifdef _FAULT_MLOG_
755         int sending=0;
756     int dummy=0;
757         LBDB *_myLBDB = theLbdb->getLBDB();
758         if(_restartFlag){
759         dummyCounts = new int[CmiNumPes()];
760         bzero(dummyCounts,sizeof(int)*CmiNumPes());
761     }
762 #endif
763   for(i=0; i < m->n_moves; i++) {
764     MigrateInfo& move = m->moves[i];
765     const int me = CkMyPe();
766     if (move.from_pe == me && move.to_pe != me) {
767       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
768       // migrate object, in case it is already gone, inform toPe
769 #ifndef _FAULT_MLOG_
770       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
771          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
772 #else
773             if(_restartFlag == 0){
774                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
775                 theLbdb->Migrate(move.obj,move.to_pe);
776                 sending++;
777             }else{
778                 if(_myLBDB->validObjHandle(move.obj)){
779                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
780                     theLbdb->Migrate(move.obj,move.to_pe);
781                     sending++;
782                 }else{
783                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
784                     dummyCounts[move.to_pe]++;
785                     dummy++;
786                 }
787             }
788 #endif
789     } else if (move.from_pe != me && move.to_pe == me) {
790        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
791       if (!move.async_arrival) migrates_expected++;
792       else future_migrates_expected++;
793     }
794   }
795   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
796   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
797
798 #ifdef _FAULT_MLOG_
799         if(_restartFlag){
800                 sendDummyMigrationCounts(dummyCounts);
801                 _restartFlag  =0;
802         delete []dummyCounts;
803         }
804 #endif
805
806
807 #if 0
808   if (m->n_moves ==0) {
809     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
810   }
811 #endif
812   cur_ld_balancer = m->next_lb;
813   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
814       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
815   }
816
817   if (migrates_expected == 0 || migrates_completed == migrates_expected)
818     MigrationDone(1);
819   delete m;
820
821 //      CkEvacuatedElement();
822 #ifdef _FAULT_MLOG_
823 //  migrates_expected = 0;
824 //  //  ResumeClients(1);
825 #endif
826 #endif
827 }
828
829 #ifdef _FAULT_MLOG_
830 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
831     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
832     //TODO: this is gonna be important when a crash happens during checkpoint
833     //the globalDecisionCount would have to be saved and compared against
834     //a future recvMigration
835                 
836         thisProxy[CkMyPe()].ResumeClients(1);
837 }
838 #endif
839
840 void CentralLB::MigrationDone(int balancing)
841 {
842 #if CMK_LBDB_ON
843   migrates_completed = 0;
844   migrates_expected = -1;
845   // clear load stats
846   if (balancing) theLbdb->ClearLoads();
847   // Increment to next step
848   theLbdb->incStep();
849         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
850   // if sync resume, invoke a barrier
851
852 #ifdef _FAULT_MLOG_
853     savedBalancing = balancing;
854     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
855 #endif
856
857   LoadbalanceDone(balancing);        // callback
858 #ifndef _FAULT_MLOG_
859   // if sync resume invoke a barrier
860   if (balancing && _lb_args.syncResume()) {
861     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
862                   thisProxy);
863     contribute(0, NULL, CkReduction::sum_int, cb);
864   }
865   else{ 
866     if(CmiNodeAlive(CkMyPe())){
867         thisProxy [CkMyPe()].ResumeClients(balancing);
868     }   
869   }     
870 #if CMK_GRID_QUEUE_AVAILABLE
871   CmiGridQueueDeregisterAll ();
872   CpvAccess(CkGridObject) = NULL;
873 #endif
874 #endif 
875 #endif
876 }
877
878 #ifdef _FAULT_MLOG_
879 void CentralLB::endMigrationDone(int balancing){
880     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
881
882
883   if (balancing && _lb_args.syncResume()) {
884     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
885                   thisProxy);
886     contribute(0, NULL, CkReduction::sum_int, cb);
887   }
888   else{
889     if(CmiNodeAlive(CkMyPe())){
890     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
891     thisProxy [CkMyPe()].ResumeClients(balancing);
892     }
893   }
894
895 }
896 #endif
897
898 #ifdef _FAULT_MLOG_
899 void resumeCentralLbAfterChkpt(void *_lb){
900         DEBUGF(("[%d] HERE\n"));
901     CentralLB *lb= (CentralLB *)_lb;
902     CpvAccess(_currentObj)=lb;
903     lb->endMigrationDone(lb->savedBalancing);
904 }
905 #endif
906
907
908 void CentralLB::ResumeClients(CkReductionMsg *msg)
909 {
910   ResumeClients(1);
911   delete msg;
912 }
913
914 void CentralLB::ResumeClients(int balancing)
915 {
916 #if CMK_LBDB_ON
917 #ifdef _FAULT_MLOG_
918     resumeCount++;
919     globalResumeCount = resumeCount;
920 #endif
921   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
922   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
923     double end_lb_time = CkWallTimer();
924     CkPrintf("[%s] Load balancing step %d finished at %f (duration %fs) Mem:%fMB\n",
925               lbName(), step()-1,end_lb_time, end_lb_time - start_lb_time, CmiMemoryUsage()/1024.0/1024.0);
926   }
927
928 #ifndef _FAULT_MLOG_
929   if (balancing) ComlibNotifyMigrationDone();  
930 #endif
931
932   theLbdb->ResumeClients();
933   if (balancing)  {
934     CheckMigrationComplete();
935     if (future_migrates_expected == 0 || 
936             future_migrates_expected == future_migrates_completed) {
937       CheckMigrationComplete();
938     }
939   }
940 #endif
941 }
942
943 /*
944   migration of objects contains two different kinds:
945   (1) objects want to make a barrier for migration completion
946       (waitForBarrier is true)
947       migrationDone() to finish and resumeClients
948   (2) objects don't need a barrier
949   However, next load balancing can only happen when both migrations complete
950 */ 
951 void CentralLB::CheckMigrationComplete()
952 {
953 #if CMK_LBDB_ON
954   lbdone ++;
955   if (lbdone == 2) {
956     lbdone = 0;
957     future_migrates_expected = -1;
958     future_migrates_completed = 0;
959     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
960     // release local barrier  so that the next load balancer can go
961     LDOMHandle h;
962     h.id.id.idx = 0;
963     theLbdb->getLBDB()->DoneRegisteringObjects(h);
964     // switch to the next load balancer in the list
965     // subtle: called from Migrated() may result in Migrated() called in next LB
966     theLbdb->nextLoadbalancer(seqno);
967   }
968 #endif
969 }
970
971 void CentralLB::preprocess(LDStats* stats,int count)
972 {
973   if (_lb_args.ignoreBgLoad())
974     stats->clearBgLoad();
975
976   // Call the predictor for the future
977   if (_lb_predict) FuturePredictor(statsData);
978 }
979
980 // default load balancing strategy
981 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
982 {
983 #if CMK_LBDB_ON
984   work(stats, count);
985
986   if (_lb_args.debug()>1)  {
987     CkPrintf("Obj Map:\n");
988     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
989     CkPrintf("\n");
990   }
991
992   return createMigrateMsg(stats, count);
993 #else
994   return NULL;
995 #endif
996 }
997
998 void CentralLB::work(LDStats* stats,int count)
999 {
1000   // does nothing but print the database
1001   stats->print();
1002 }
1003
1004 // generate migrate message from stats->from_proc and to_proc
1005 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
1006 {
1007   int i;
1008   CkVec<MigrateInfo*> migrateInfo;
1009   for (i=0; i<stats->n_objs; i++) {
1010     LDObjData &objData = stats->objData[i];
1011     int frompe = stats->from_proc[i];
1012     int tope = stats->to_proc[i];
1013     if (frompe != tope) {
1014       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1015       //         CkMyPe(),obj,pe,dest);
1016       MigrateInfo *migrateMe = new MigrateInfo;
1017       migrateMe->obj = objData.handle;
1018       migrateMe->from_pe = frompe;
1019       migrateMe->to_pe = tope;
1020       migrateMe->async_arrival = objData.asyncArrival;
1021       migrateInfo.insertAtEnd(migrateMe);
1022     }
1023   }
1024
1025   int migrate_count=migrateInfo.length();
1026   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1027   msg->n_moves = migrate_count;
1028   for(i=0; i < migrate_count; i++) {
1029     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1030     msg->moves[i] = *item;
1031     delete item;
1032     migrateInfo[i] = 0;
1033   }
1034   if (_lb_args.debug())
1035     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
1036   return msg;
1037 }
1038
1039 void CentralLB::simulationWrite() {
1040   if(step() == LBSimulation::dumpStep)
1041   {
1042     // here we are supposed to dump the database
1043     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1044     char *dumpFileName = (char *)malloc(dumpFileSize);
1045     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1046       free(dumpFileName);
1047       dumpFileSize+=3;
1048       dumpFileName = (char *)malloc(dumpFileSize);
1049     }
1050     writeStatsMsgs(dumpFileName);
1051     free(dumpFileName);
1052     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1053     ++LBSimulation::dumpStep;
1054     --LBSimulation::dumpStepSize;
1055     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1056       CmiPrintf("Charm++> Exiting...\n");
1057       CkExit();
1058     }
1059     return;
1060   }
1061 }
1062
1063 void CentralLB::simulationRead() {
1064   LBSimulation *simResults = NULL, *realResults;
1065   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1066   voidMessage->n_moves=0;
1067   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1068     // here we are supposed to read the data from the dump database
1069     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1070     char *simFileName = (char *)malloc(simFileSize);
1071     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1072       free(simFileName);
1073       simFileSize+=3;
1074       simFileName = (char *)malloc(simFileSize);
1075     }
1076     readStatsMsgs(simFileName);
1077
1078     // allocate simResults (only the first step)
1079     if (simResults == NULL) {
1080       simResults = new LBSimulation(LBSimulation::simProcs);
1081       realResults = new LBSimulation(LBSimulation::simProcs);
1082     }
1083     else {
1084       // should be the same number of procs of the original simulation!
1085       if (!LBSimulation::procsChanged) {
1086         // it means we have a previous step, so in simResults there is data.
1087         // we can now print the real effects of the load balancer during the simulation
1088         // or print the difference between the predicted data and the real one.
1089         realResults->reset();
1090         // reset to_proc of statsData to be equal to from_proc
1091         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1092         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1093         simResults->PrintDifferences(realResults,statsData);
1094       }
1095       simResults->reset();
1096     }
1097
1098     // now pass it to the strategy routine
1099     double startT = CkWallTimer();
1100     preprocess(statsData, LBSimulation::simProcs);
1101     CmiPrintf("%s> Strategy starts ... \n", lbname);
1102     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1103     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1104                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1105
1106     // now calculate the results of the load balancing simulation
1107     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1108
1109     // now we have the simulation data, so print it and loop
1110     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1111     // **CWL** Officially recording my disdain here for using ints for bool
1112     if (LBSimulation::showDecisionsOnly) {
1113       simResults->PrintDecisions(migrateMsg, simFileName, 
1114                                  LBSimulation::simProcs);
1115     } else {
1116       simResults->PrintSimulationResults();
1117     }
1118
1119     free(simFileName);
1120     delete migrateMsg;
1121     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1122   }
1123   // deallocate simResults
1124   delete simResults;
1125   CmiPrintf("Charm++> Exiting...\n");
1126   CkExit();
1127 }
1128
1129 void CentralLB::readStatsMsgs(const char* filename) 
1130 {
1131 #if CMK_LBDB_ON
1132   int i;
1133   FILE *f = fopen(filename, "r");
1134   if (f==NULL) {
1135     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1136     CmiAbort("");
1137   }
1138
1139   // at this stage, we need to rebuild the statsMsgList and
1140   // statsDataList structures. For that first deallocate the
1141   // old structures
1142   if (statsMsgsList) {
1143     for(i = 0; i < stats_msg_count; i++)
1144       delete statsMsgsList[i];
1145     delete[] statsMsgsList;
1146     statsMsgsList=0;
1147   }
1148
1149   PUP::fromDisk pd(f);
1150   PUP::machineInfo machInfo;
1151
1152   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1153   PUP::xlater p(machInfo, pd);
1154
1155   if (_lb_args.lbversion() > 1) {
1156     p|_lb_args.lbversion();             // write version number
1157     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1158     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1159   }
1160   p|stats_msg_count;
1161
1162   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1163   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1164   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1165
1166   // LBSimulation::simProcs must be set
1167   statsData->pup(p);
1168
1169   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1170   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1171
1172   // file f is closed in the destructor of PUP::fromDisk
1173   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1174 #endif
1175 }
1176
1177 void CentralLB::writeStatsMsgs(const char* filename) 
1178 {
1179 #if CMK_LBDB_ON
1180   FILE *f = fopen(filename, "w");
1181   if (f==NULL) {
1182     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1183     CmiAbort("");
1184   }
1185
1186   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1187   PUP::toDisk p(f);
1188   p((char *)&machInfo, sizeof(machInfo));       // machine info
1189
1190   p|_lb_args.lbversion();               // write version number
1191   p|stats_msg_count;
1192   statsData->pup(p);
1193
1194   fclose(f);
1195
1196   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1197 #endif
1198 }
1199
1200 // calculate the predicted wallclock/cpu load for every processors
1201 // considering communication overhead if considerComm is true
1202 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1203                       LBMigrateMsg *msg, LBInfo &info, 
1204                       int considerComm)
1205 {
1206 #if CMK_LBDB_ON
1207         stats->makeCommHash();
1208
1209         // update to_proc according to migration msgs
1210         for(int i = 0; i < msg->n_moves; i++) {
1211           MigrateInfo &mInfo = msg->moves[i];
1212           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1213           CmiAssert(idx != -1);
1214           stats->to_proc[idx] = mInfo.to_pe;
1215         }
1216
1217         info.getInfo(stats, count, considerComm);
1218 #endif
1219 }
1220
1221
1222 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1223 {
1224     CkAssert(simResults != NULL && count == simResults->numPes);
1225     // estimate the new loads of the processors. As a first approximation, this is the
1226     // sum of the cpu times of the objects on that processor
1227     double startT = CkWallTimer();
1228     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1229     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1230 }
1231
1232 void CentralLB::pup(PUP::er &p) { 
1233   BaseLB::pup(p); 
1234   if (p.isUnpacking())  {
1235     initLB(CkLBOptions(seqno)); 
1236   }
1237 }
1238
1239 int CentralLB::useMem() { 
1240   return sizeof(CentralLB) + statsData->useMem() + 
1241          CkNumPes() * sizeof(CLBStatsMsg *);
1242 }
1243
1244
1245 /**
1246   CLBStatsMsg is not a real message now.
1247   CLBStatsMsg is used for all processors to fill in their local load and comm
1248   statistics and send to processor 0
1249 */
1250
1251 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1252   n_objs = osz;
1253   n_comm = csz;
1254   objData = new LDObjData[osz];
1255   commData = new LDCommData[csz];
1256   avail_vector = NULL;
1257 }
1258
1259 CLBStatsMsg::~CLBStatsMsg() {
1260   delete [] objData;
1261   delete [] commData;
1262   if (avail_vector) delete [] avail_vector;
1263 }
1264
1265 void CLBStatsMsg::pup(PUP::er &p) {
1266   int i;
1267   p|from_pe;
1268   p|pe_speed;
1269   p|total_walltime; p|total_cputime;
1270   p|idletime;
1271   p|bg_walltime;   p|bg_cputime;
1272   p|n_objs;
1273   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1274   for (i=0; i<n_objs; i++) p|objData[i];
1275   p|n_comm;
1276   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1277   for (i=0; i<n_comm; i++) p|commData[i];
1278
1279   int has_avail_vector;
1280   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1281   p|has_avail_vector;
1282   if (p.isUnpacking()) {
1283     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1284     else avail_vector = NULL;
1285   }
1286   if (has_avail_vector) p(avail_vector, CkNumPes());
1287
1288   p(next_lb);
1289 }
1290
1291 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1292 // the entry function, it is just used to use to pup.
1293 // I don't use CLBStatsMsg directly as marshalled parameter because
1294 // I want the data pointer stored and not to be freed by the Charm++.
1295 void CkMarshalledCLBStatsMessage::free() { 
1296   int count = msgs.size();
1297   for  (int i=0; i<count; i++) 
1298     if (msgs[i]) delete msgs[i];
1299   msgs.free();
1300 }
1301
1302 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1303 {
1304   int count = m.getCount();
1305   for (int i=0; i<count; i++) add(m.getMessage(i));
1306 }
1307
1308 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1309 {
1310   int count = msgs.size();
1311   p|count;
1312   for (int i=0; i<count; i++) {
1313     CLBStatsMsg *msg;
1314     if (p.isUnpacking()) msg = new CLBStatsMsg;
1315     else { 
1316       msg = msgs[i]; CmiAssert(msg);
1317     }
1318     msg->pup(p);
1319     if (p.isUnpacking()) add(msg);
1320   }
1321 }
1322
1323 SpanningTree::SpanningTree()
1324 {
1325         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1326         arity = (int)ceil(sq/2);
1327         calcParent(CkMyPe());
1328         calcNumChildren(CkMyPe());
1329 }
1330
1331 void SpanningTree::calcParent(int n)
1332 {
1333         parent=-1;
1334         if(n != 0  && arity > 0)
1335                 parent = (n-1)/arity;
1336 }
1337
1338 void SpanningTree::calcNumChildren(int n)
1339 {
1340         numChildren = 0;
1341         if (arity == 0) return;
1342         int fullNode=(CkNumPes()-1-arity)/arity;
1343         if(n <= fullNode)
1344                 numChildren = arity;
1345         if(n == fullNode+1)
1346                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1347         if(n > fullNode+1)
1348                 numChildren = 0;
1349 }
1350
1351 #include "CentralLB.def.h"
1352  
1353 /*@}*/