compare avail_vector to NULL (xlc may not like it compares with 0)
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)       // CmiPrintf x;
20 #define  DEBUG(x)        // x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #elif defined(_FAULT_MLOG_)
27 /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #else
31 #define USE_REDUCTION         1
32 #define USE_LDB_SPANNING_TREE 1
33 #endif
34
35 #ifdef _FAULT_MLOG_
36 extern int _restartFlag;
37 extern void getGlobalStep(CkGroupID );
38 extern void initMlogLBStep(CkGroupID );
39 extern int globalResumeCount;
40 extern void sendDummyMigrationCounts(int *);
41 #endif
42
43 #if CMK_GRID_QUEUE_AVAILABLE
44 CpvExtern(void *, CkGridObject);
45 #endif
46
47 CkGroupID loadbalancer;
48 int * lb_ptr;
49 int load_balancer_created;
50
51 CreateLBFunc_Def(CentralLB, "CentralLB base class");
52
53 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
54                              LBMigrateMsg *, LBInfo &info, int considerComm);
55
56 /*
57 void CreateCentralLB()
58 {
59   CProxy_CentralLB::ckNew(0);
60 }
61 */
62
63 void CentralLB::staticStartLB(void* data)
64 {
65   CentralLB *me = (CentralLB*)(data);
66   me->StartLB();
67 }
68
69 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
70 {
71   CentralLB *me = (CentralLB*)(data);
72   me->Migrated(h, waitBarrier);
73 }
74
75 void CentralLB::staticAtSync(void* data)
76 {
77   CentralLB *me = (CentralLB*)(data);
78   me->AtSync();
79 }
80
81 void CentralLB::initLB(const CkLBOptions &opt)
82 {
83 #if CMK_LBDB_ON
84   lbname = "CentralLB";
85   thisProxy = CProxy_CentralLB(thisgroup);
86   //  CkPrintf("Construct in %d\n",CkMyPe());
87
88   // create and turn on by default
89   receiver = theLbdb->
90     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
91   notifier = theLbdb->getLBDB()->
92     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
93   startLbFnHdl = theLbdb->getLBDB()->
94     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
95
96   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
97   if (opt.getSeqNo() > 0) turnOff();
98
99   stats_msg_count = 0;
100   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
101   for(int i=0; i < CkNumPes(); i++)
102     statsMsgsList[i] = 0;
103
104   statsData = new LDStats;
105
106   // for future predictor
107   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
108   else predicted_model=0;
109   // register user interface callbacks
110   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
111
112   myspeed = theLbdb->ProcessorSpeed();
113
114   migrates_completed = 0;
115   future_migrates_completed = 0;
116   migrates_expected = -1;
117   future_migrates_expected = -1;
118   cur_ld_balancer = _lb_args.central_pe();      // 0 default
119   lbdone = 0;
120   count_msgs=0;
121   statsMsg = NULL;
122
123   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
124
125   load_balancer_created = 1;
126 #endif
127 }
128
129 CentralLB::~CentralLB()
130 {
131 #if CMK_LBDB_ON
132   delete [] statsMsgsList;
133   delete statsData;
134   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
135   if (theLbdb) {
136     theLbdb->getLBDB()->
137       RemoveNotifyMigrated(notifier);
138     theLbdb->
139       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
140   }
141 #endif
142 }
143
144 void CentralLB::turnOn() 
145 {
146 #if CMK_LBDB_ON
147   theLbdb->getLBDB()->
148     TurnOnBarrierReceiver(receiver);
149   theLbdb->getLBDB()->
150     TurnOnNotifyMigrated(notifier);
151   theLbdb->getLBDB()->
152     TurnOnStartLBFn(startLbFnHdl);
153 #endif
154 }
155
156 void CentralLB::turnOff() 
157 {
158 #if CMK_LBDB_ON
159   theLbdb->getLBDB()->
160     TurnOffBarrierReceiver(receiver);
161   theLbdb->getLBDB()->
162     TurnOffNotifyMigrated(notifier);
163   theLbdb->getLBDB()->
164     TurnOffStartLBFn(startLbFnHdl);
165 #endif
166 }
167
168 void CentralLB::AtSync()
169 {
170 #if CMK_LBDB_ON
171   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
172
173 #ifdef _FAULT_MLOG_
174         CpvAccess(_currentObj)=this;
175 #endif
176
177   // if num of processor is only 1, nothing should happen
178   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
179     MigrationDone(0);
180     return;
181   }
182   if(CmiNodeAlive(CkMyPe())){
183     thisProxy [CkMyPe()].ProcessAtSync();
184   }
185 #endif
186 }
187
188 #include "ComlibStrategy.h"
189
190 void CentralLB::ProcessAtSync()
191 {
192
193 #if CMK_LBDB_ON
194   CmiAssert(CmiNodeAlive(CkMyPe()));
195   if (CkMyPe() == cur_ld_balancer) {
196     start_lb_time = CkWallTimer();
197   }
198
199
200 #ifdef _FAULT_MLOG_
201         initMlogLBStep(thisgroup);
202 #endif
203
204   // build message
205   BuildStatsMsg();
206
207 #if USE_REDUCTION
208     // reduction to get total number of objects and comm
209     // so that processor 0 can pre-allocate load balancing database
210   int counts[2];
211   counts[0] = theLbdb->GetObjDataSz();
212   counts[1] = theLbdb->GetCommDataSz();
213
214   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
215                   thisProxy[0]);
216   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
217 #else
218   SendStats();
219 #endif
220 #endif
221 }
222
223 // called only on 0
224 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
225 {
226   CmiAssert(CkMyPe() == 0);
227
228   int *counts = (int *)msg->getData();
229   int n_objs = counts[0];
230   int n_comm = counts[1];
231
232     // resize database
233   statsData->objData.resize(n_objs);
234   statsData->from_proc.resize(n_objs);
235   statsData->to_proc.resize(n_objs);
236   statsData->commData.resize(n_comm);
237
238   DEBUGF(("[%d] ReceiveCounts\n",CkMyPe()));
239         
240     // broadcast call to let everybody start to send stats
241   thisProxy.SendStats();
242 }
243
244 void CentralLB::BuildStatsMsg()
245 {
246 #if CMK_LBDB_ON
247   // build and send stats
248   const int osz = theLbdb->GetObjDataSz();
249   const int csz = theLbdb->GetCommDataSz();
250
251   int npes = CkNumPes();
252   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
253   msg->from_pe = CkMyPe();
254 #ifdef _FAULT_MLOG_
255         msg->step = step();
256 #endif
257   //msg->serial = CrnRand();
258
259 /*
260   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
261   theLbdb->IdleTime(&msg->idletime);
262   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
263 */
264   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
265                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
266   msg->pe_speed = myspeed;
267   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
268
269   msg->n_objs = osz;
270   theLbdb->GetObjData(msg->objData);
271   msg->n_comm = csz;
272   theLbdb->GetCommData(msg->commData);
273 //  theLbdb->ClearLoads();
274   DEBUGF(("PE %d sending stats to ReceiveStats %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
275
276   if(CkMyPe() == cur_ld_balancer) {
277     msg->avail_vector = new char[CkNumPes()];
278     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
279     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
280   }
281
282   CmiAssert(statsMsg == NULL);
283   statsMsg = msg;
284 #endif
285 }
286
287 // called on every processor
288 void CentralLB::SendStats()
289 {
290 #if CMK_LBDB_ON
291   CmiAssert(statsMsg != NULL);
292
293 #if USE_LDB_SPANNING_TREE
294   if(CkNumPes()>1024)
295   {
296     if (CkMyPe() == cur_ld_balancer)
297       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
298     else
299       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
300   }
301   else
302 #endif
303         DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
304   thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
305
306   statsMsg = NULL;
307
308 #ifdef __BLUEGENE__
309   BgEndStreaming();
310 #endif
311
312   {
313   // enfore the barrier to wait until centralLB says no
314   LDOMHandle h;
315   h.id.id.idx = 0;
316   theLbdb->getLBDB()->RegisteringObjects(h);
317   }
318 #endif
319 }
320
321 #ifdef _FAULT_MLOG_
322 extern int donotCountMigration;
323 #endif
324
325 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
326 {
327 #ifdef _FAULT_MLOG_
328     if(donotCountMigration){
329         return ;
330     }
331 #endif
332
333 #if CMK_LBDB_ON
334   if (waitBarrier) {
335             migrates_completed++;
336       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
337     if (migrates_completed == migrates_expected) {
338       MigrationDone(1);
339     }
340   }
341   else {
342     future_migrates_completed ++;
343     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
344     if (future_migrates_completed == future_migrates_expected)  {
345         CheckMigrationComplete();
346     }
347   }
348 #endif
349 }
350
351 void CentralLB::MissMigrate(int waitForBarrier)
352 {
353   LDObjHandle h;
354   Migrated(h, waitForBarrier);
355 }
356
357 // build a complete data from bufferred messages
358 // not used when USE_REDUCTION = 1
359 void CentralLB::buildStats()
360 {
361     statsData->count = stats_msg_count;
362     // allocate space
363     statsData->objData.resize(statsData->n_objs);
364     statsData->from_proc.resize(statsData->n_objs);
365     statsData->to_proc.resize(statsData->n_objs);
366     statsData->commData.resize(statsData->n_comm);
367
368     int nobj = 0;
369     int ncom = 0;
370     int nmigobj = 0;
371     // copy all data in individule message to this big structure
372     for (int pe=0; pe<CkNumPes(); pe++) {
373        int i;
374        CLBStatsMsg *msg = statsMsgsList[pe];
375        if(msg == NULL) continue;
376        for (i=0; i<msg->n_objs; i++) {
377          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
378          statsData->objData[nobj] = msg->objData[i];
379          if (msg->objData[i].migratable) nmigobj++;
380          nobj++;
381        }
382        for (i=0; i<msg->n_comm; i++) {
383          statsData->commData[ncom] = msg->commData[i];
384          ncom++;
385        }
386        // free the memory
387        delete msg;
388        statsMsgsList[pe]=0;
389     }
390     statsData->n_migrateobjs = nmigobj;
391 }
392
393 // deposit one processor data at a time, note database is pre-allocated
394 // to have enough space
395 // used when USE_REDUCTION = 1
396 void CentralLB::depositData(CLBStatsMsg *m)
397 {
398   int i;
399   if (m == NULL) return;
400
401   const int pe = m->from_pe;
402   struct ProcStats &procStat = statsData->procs[pe];
403   procStat.pe = pe;
404   procStat.total_walltime = m->total_walltime;
405   procStat.total_cputime = m->total_cputime;
406   procStat.idletime = m->idletime;
407   procStat.bg_walltime = m->bg_walltime;
408   procStat.bg_cputime = m->bg_cputime;
409   procStat.pe_speed = m->pe_speed;
410   //procStat.utilization = 1.0;
411   procStat.available = CmiTrue;
412   procStat.n_objs = m->n_objs;
413
414   int &nobj = statsData->n_objs;
415   int &nmigobj = statsData->n_migrateobjs;
416   for (i=0; i<m->n_objs; i++) {
417       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
418       statsData->objData[nobj] = m->objData[i];
419       if (m->objData[i].migratable) nmigobj++;
420       nobj++;
421       CmiAssert(nobj <= statsData->objData.capacity());
422   }
423   int &n_comm = statsData->n_comm;
424   for (i=0; i<m->n_comm; i++) {
425       statsData->commData[n_comm] = m->commData[i];
426       n_comm++;
427       CmiAssert(n_comm <= statsData->commData.capacity());
428   }
429   delete m;
430 }
431
432 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
433 {
434 #if CMK_LBDB_ON
435     //  loop through all CLBStatsMsg in the incoming msg
436   for (int num = 0; num < msg.getCount(); num++) 
437   {
438     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
439     const int pe = m->from_pe;
440         DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
441 #ifdef _FAULT_MLOG_     
442 /*      
443  *              if(m->step < step()){
444  *                          //TODO: if a processor is redoing an old load balance step..
445  *                                      //tell it that the step is done and that it should not perform any migrations
446  *                                                  thisProxy[pe].ReceiveDummyMigration();
447  *                                                          }*/
448 #endif
449         
450     if(!CmiNodeAlive(pe)){
451         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
452         continue;
453     }
454         
455     if (m->avail_vector) {
456       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
457     }
458
459     if (statsMsgsList[pe] != 0) {
460       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
461              pe);
462     } else {
463       statsMsgsList[pe] = m;
464 #if USE_REDUCTION
465       depositData(m);
466 #else
467       // store per processor data right away
468       struct ProcStats &procStat = statsData->procs[pe];
469       procStat.pe = pe;
470       procStat.total_walltime = m->total_walltime;
471       procStat.total_cputime = m->total_cputime;
472       procStat.idletime = m->idletime;
473       procStat.bg_walltime = m->bg_walltime;
474       procStat.bg_cputime = m->bg_cputime;
475       procStat.pe_speed = m->pe_speed;
476       //procStat.utilization = 1.0;
477       procStat.available = CmiTrue;
478       procStat.n_objs = m->n_objs;
479
480       statsData->n_objs += m->n_objs;
481       statsData->n_comm += m->n_comm;
482 #endif
483       stats_msg_count++;
484     }
485   }    // end of for
486
487   const int clients = CkNumValidPes();
488   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
489  
490   if (stats_msg_count == clients) {
491         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
492     statsData->count = stats_msg_count;
493     thisProxy[CkMyPe()].LoadBalance();
494   }
495 #endif
496 }
497
498 /** added by Abhinav for receiving msgs via spanning tree */
499 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
500 {
501 #if CMK_LBDB_ON
502         CmiAssert(CkMyPe() != 0);
503         bufMsg.add(msg);         // buffer messages
504         count_msgs++;
505         //CkPrintf("here %d\n", CkMyPe());
506         if (count_msgs == st.numChildren+1) {
507                 if(st.parent == 0)
508                 {
509                         thisProxy[0].ReceiveStats(bufMsg);
510                         //CkPrintf("from %d\n", CkMyPe());
511                 }
512                 else
513                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
514                 count_msgs = 0;
515                 bufMsg.free();
516         } 
517 #endif
518 }
519
520 void CentralLB::LoadBalance()
521 {
522 #if CMK_LBDB_ON
523   int proc;
524   const int clients = CkNumPes();
525
526 #if ! USE_REDUCTION
527   // build data
528   buildStats();
529 #else
530   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
531 #endif
532
533   if (_lb_args.debug()) 
534       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%dMB\n",
535                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
536  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
537 //    double strat_start_time = CkWallTimer();
538
539   // if we are in simulation mode read data
540   if (LBSimulation::doSimulation) simulationRead();
541
542   char *availVector = LBDatabaseObj()->availVector();
543   for(proc = 0; proc < clients; proc++)
544       statsData->procs[proc].available = (CmiBool)availVector[proc];
545
546   preprocess(statsData, clients);
547
548 //    CkPrintf("Before Calling Strategy\n");
549
550   if (_lb_args.printSummary()) {
551       LBInfo info(clients);
552         // not take comm data
553       info.getInfo(statsData, clients, 0);
554       double mLoad, mCpuLoad, totalLoad;
555       info.getSummary(mLoad, mCpuLoad, totalLoad);
556       int nmsgs, nbytes;
557       statsData->computeNonlocalComm(nmsgs, nbytes);
558       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
559 //      if (_lb_args.debug() > 1) {
560 //        for (int i=0; i<statsData->n_objs; i++)
561 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
562 //      }
563   }
564
565   double strat_start_time = CkWallTimer();
566   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
567 #ifdef _FAULT_MLOG_
568         migrateMsg->step = step();
569 #endif
570   if (_lb_args.debug()) {
571     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
572     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
573     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
574               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
575   }
576
577 //    CkPrintf("returned successfully\n");
578
579   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
580   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
581
582   // if this is the step at which we need to dump the database
583   simulationWrite();
584
585 //  calculate predicted load
586 //  very time consuming though, so only happen when debugging is on
587   if (_lb_args.printSummary()) {
588       LBInfo info(clients);
589         // not take comm data
590       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
591       double mLoad, mCpuLoad, totalLoad;
592       info.getSummary(mLoad, mCpuLoad, totalLoad);
593       int nmsgs, nbytes;
594       statsData->computeNonlocalComm(nmsgs, nbytes);
595       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
596       for (int i=0; i<clients; i++)
597         migrateMsg->expectedLoad[i] = info.peLoads[i];
598   }
599
600   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
601 #ifdef _FAULT_MLOG_ 
602     lbDecisionCount++;
603     migrateMsg->lbDecisionCount = lbDecisionCount;
604 #endif
605   thisProxy.ReceiveMigration(migrateMsg);
606
607   // Zero out data structures for next cycle
608   // CkPrintf("zeroing out data\n");
609   statsData->clear();
610   stats_msg_count=0;
611 #endif
612 }
613
614 //    double strat_end_time = CkWallTimer();
615 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
616 // test if sender and receiver in a commData is nonmigratable.
617 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
618 {
619 #if CMK_LBDB_ON
620   for (int pe=0 ; pe<count; pe++)
621   {
622     for (int i=0; i<len[pe]; i++)
623       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
624           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
625       return 0;
626   }
627 #endif
628   return 1;
629 }
630
631 // rebuild LDStats and remove all non-migratble objects and related things
632 void CentralLB::removeNonMigratable(LDStats* stats, int count)
633 {
634   int i;
635
636   // check if we have non-migratable objects
637   int have = 0;
638   for (i=0; i<stats->n_objs; i++) 
639   {
640     LDObjData &odata = stats->objData[i];
641     if (!odata.migratable) {
642       have = 1; break;
643     }
644   }
645   if (have == 0) return;
646
647   CkVec<LDObjData> nonmig;
648   CkVec<int> new_from_proc, new_to_proc;
649   nonmig.resize(stats->n_migrateobjs);
650   new_from_proc.resize(stats->n_migrateobjs);
651   new_to_proc.resize(stats->n_migrateobjs);
652   int n_objs = 0;
653   for (i=0; i<stats->n_objs; i++) 
654   {
655     LDObjData &odata = stats->objData[i];
656     if (odata.migratable) {
657       nonmig[n_objs] = odata;
658       new_from_proc[n_objs] = stats->from_proc[i];
659       new_to_proc[n_objs] = stats->to_proc[i];
660       n_objs ++;
661     }
662     else {
663       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
664       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
665     }
666   }
667   CmiAssert(stats->n_migrateobjs == n_objs);
668
669   stats->makeCommHash();
670   
671   CkVec<LDCommData> newCommData;
672   newCommData.resize(stats->n_comm);
673   int n_comm = 0;
674   for (i=0; i<stats->n_comm; i++) 
675   {
676     LDCommData& cdata = stats->commData[i];
677     if (!cdata.from_proc()) 
678     {
679       int idx = stats->getSendHash(cdata);
680       CmiAssert(idx != -1);
681       if (!stats->objData[idx].migratable) continue;
682     }
683     switch (cdata.receiver.get_type()) {
684     case LD_PROC_MSG:
685       break;
686     case LD_OBJ_MSG:  {
687       int idx = stats->getRecvHash(cdata);
688       if (stats->complete_flag)
689         CmiAssert(idx != -1);
690       else if (idx == -1) continue;          // receiver not in this group
691       if (!stats->objData[idx].migratable) continue;
692       break;
693       }
694     case LD_OBJLIST_MSG:    // object message FIXME add multicast
695       break;
696     }
697     newCommData[n_comm] = cdata;
698     n_comm ++;
699   }
700
701   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
702
703   // swap to new data
704   stats->objData = nonmig;
705   stats->from_proc = new_from_proc;
706   stats->to_proc = new_to_proc;
707   stats->n_objs = n_objs;
708
709   stats->commData = newCommData;
710   stats->n_comm = n_comm;
711
712   stats->deleteCommHash();
713   stats->makeCommHash();
714
715 }
716
717
718 #ifdef _FAULT_MLOG_
719 extern int restarted;
720 #endif
721
722 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
723 {
724 #if CMK_LBDB_ON
725         int i;
726
727 #ifdef _FAULT_MLOG_
728         int *dummyCounts;
729
730         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
731         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
732         if(step() > m->step){
733                 char str[100];
734                 envelope *env = UsrToEnv(m);
735                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
736                 return;
737         }
738         lbDecisionCount = m->lbDecisionCount;
739 #endif
740
741   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
742   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
743 /*FAULT_EVAC*/
744   if(!CmiNodeAlive(CkMyPe())){
745         delete m;
746         return;
747   }
748   migrates_expected = 0;
749   future_migrates_expected = 0;
750 #ifdef _FAULT_MLOG_
751         int sending=0;
752     int dummy=0;
753         LBDB *_myLBDB = theLbdb->getLBDB();
754         if(_restartFlag){
755         dummyCounts = new int[CmiNumPes()];
756         bzero(dummyCounts,sizeof(int)*CmiNumPes());
757     }
758 #endif
759   for(i=0; i < m->n_moves; i++) {
760     MigrateInfo& move = m->moves[i];
761     const int me = CkMyPe();
762     if (move.from_pe == me && move.to_pe != me) {
763       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
764       // migrate object, in case it is already gone, inform toPe
765 #ifndef _FAULT_MLOG_
766       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
767          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
768 #else
769             if(_restartFlag == 0){
770                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
771                 theLbdb->Migrate(move.obj,move.to_pe);
772                 sending++;
773             }else{
774                 if(_myLBDB->validObjHandle(move.obj)){
775                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
776                     theLbdb->Migrate(move.obj,move.to_pe);
777                     sending++;
778                 }else{
779                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
780                     dummyCounts[move.to_pe]++;
781                     dummy++;
782                 }
783             }
784 #endif
785     } else if (move.from_pe != me && move.to_pe == me) {
786        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
787       if (!move.async_arrival) migrates_expected++;
788       else future_migrates_expected++;
789     }
790   }
791   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
792   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
793
794 #ifdef _FAULT_MLOG_
795         if(_restartFlag){
796                 sendDummyMigrationCounts(dummyCounts);
797                 _restartFlag  =0;
798         delete []dummyCounts;
799         }
800 #endif
801
802
803 #if 0
804   if (m->n_moves ==0) {
805     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
806   }
807 #endif
808   cur_ld_balancer = m->next_lb;
809   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
810       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
811   }
812
813   if (migrates_expected == 0 || migrates_completed == migrates_expected)
814     MigrationDone(1);
815   delete m;
816
817 //      CkEvacuatedElement();
818 #ifdef _FAULT_MLOG_
819 //  migrates_expected = 0;
820 //  //  ResumeClients(1);
821 #endif
822 #endif
823 }
824
825 #ifdef _FAULT_MLOG_
826 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
827     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
828     //TODO: this is gonna be important when a crash happens during checkpoint
829     //the globalDecisionCount would have to be saved and compared against
830     //a future recvMigration
831                 
832         thisProxy[CkMyPe()].ResumeClients(1);
833 }
834 #endif
835
836 void CentralLB::MigrationDone(int balancing)
837 {
838 #if CMK_LBDB_ON
839   migrates_completed = 0;
840   migrates_expected = -1;
841   // clear load stats
842   if (balancing) theLbdb->ClearLoads();
843   // Increment to next step
844   theLbdb->incStep();
845         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
846   // if sync resume, invoke a barrier
847
848 #ifdef _FAULT_MLOG_
849     savedBalancing = balancing;
850     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
851 #endif
852
853   LoadbalanceDone(balancing);        // callback
854 #ifndef _FAULT_MLOG_
855   // if sync resume invoke a barrier
856   if (balancing && _lb_args.syncResume()) {
857     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
858                   thisProxy);
859     contribute(0, NULL, CkReduction::sum_int, cb);
860   }
861   else{ 
862     if(CmiNodeAlive(CkMyPe())){
863         thisProxy [CkMyPe()].ResumeClients(balancing);
864     }   
865   }     
866 #if CMK_GRID_QUEUE_AVAILABLE
867   CmiGridQueueDeregisterAll ();
868   CpvAccess(CkGridObject) = NULL;
869 #endif
870 #endif 
871 #endif
872 }
873
874 #ifdef _FAULT_MLOG_
875 void CentralLB::endMigrationDone(int balancing){
876     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
877
878
879   if (balancing && _lb_args.syncResume()) {
880     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
881                   thisProxy);
882     contribute(0, NULL, CkReduction::sum_int, cb);
883   }
884   else{
885     if(CmiNodeAlive(CkMyPe())){
886     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
887     thisProxy [CkMyPe()].ResumeClients(balancing);
888     }
889   }
890
891 }
892 #endif
893
894 #ifdef _FAULT_MLOG_
895 void resumeCentralLbAfterChkpt(void *_lb){
896     CentralLB *lb= (CentralLB *)_lb;
897     CpvAccess(_currentObj)=lb;
898     lb->endMigrationDone(lb->savedBalancing);
899 }
900 #endif
901
902
903 void CentralLB::ResumeClients(CkReductionMsg *msg)
904 {
905   ResumeClients(1);
906   delete msg;
907 }
908
909 void CentralLB::ResumeClients(int balancing)
910 {
911 #if CMK_LBDB_ON
912 #ifdef _FAULT_MLOG_
913     resumeCount++;
914     globalResumeCount = resumeCount;
915 #endif
916   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
917   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
918     double end_lb_time = CkWallTimer();
919   }
920
921 #ifndef _FAULT_MLOG_
922   if (balancing) ComlibNotifyMigrationDone();  
923 #endif
924
925   theLbdb->ResumeClients();
926   if (balancing)  {
927     CheckMigrationComplete();
928     if (future_migrates_expected == 0 || 
929             future_migrates_expected == future_migrates_completed) {
930       CheckMigrationComplete();
931     }
932   }
933 #endif
934 }
935
936 /*
937   migration of objects contains two different kinds:
938   (1) objects want to make a barrier for migration completion
939       (waitForBarrier is true)
940       migrationDone() to finish and resumeClients
941   (2) objects don't need a barrier
942   However, next load balancing can only happen when both migrations complete
943 */ 
944 void CentralLB::CheckMigrationComplete()
945 {
946 #if CMK_LBDB_ON
947   lbdone ++;
948   if (lbdone == 2) {
949     lbdone = 0;
950     future_migrates_expected = -1;
951     future_migrates_completed = 0;
952     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
953     // release local barrier  so that the next load balancer can go
954     LDOMHandle h;
955     h.id.id.idx = 0;
956     theLbdb->getLBDB()->DoneRegisteringObjects(h);
957     // switch to the next load balancer in the list
958     // subtle: called from Migrated() may result in Migrated() called in next LB
959     theLbdb->nextLoadbalancer(seqno);
960   }
961 #endif
962 }
963
964 void CentralLB::preprocess(LDStats* stats,int count)
965 {
966   if (_lb_args.ignoreBgLoad())
967     stats->clearBgLoad();
968
969   // Call the predictor for the future
970   if (_lb_predict) FuturePredictor(statsData);
971 }
972
973 // default load balancing strategy
974 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
975 {
976 #if CMK_LBDB_ON
977   work(stats, count);
978
979   if (_lb_args.debug()>1)  {
980     CkPrintf("Obj Map:\n");
981     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
982     CkPrintf("\n");
983   }
984
985   return createMigrateMsg(stats, count);
986 #else
987   return NULL;
988 #endif
989 }
990
991 void CentralLB::work(LDStats* stats,int count)
992 {
993   // does nothing but print the database
994   stats->print();
995 }
996
997 // generate migrate message from stats->from_proc and to_proc
998 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
999 {
1000   int i;
1001   CkVec<MigrateInfo*> migrateInfo;
1002   for (i=0; i<stats->n_objs; i++) {
1003     LDObjData &objData = stats->objData[i];
1004     int frompe = stats->from_proc[i];
1005     int tope = stats->to_proc[i];
1006     if (frompe != tope) {
1007       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1008       //         CkMyPe(),obj,pe,dest);
1009       MigrateInfo *migrateMe = new MigrateInfo;
1010       migrateMe->obj = objData.handle;
1011       migrateMe->from_pe = frompe;
1012       migrateMe->to_pe = tope;
1013       migrateMe->async_arrival = objData.asyncArrival;
1014       migrateInfo.insertAtEnd(migrateMe);
1015     }
1016   }
1017
1018   int migrate_count=migrateInfo.length();
1019   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1020   msg->n_moves = migrate_count;
1021   for(i=0; i < migrate_count; i++) {
1022     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1023     msg->moves[i] = *item;
1024     delete item;
1025     migrateInfo[i] = 0;
1026   }
1027   if (_lb_args.debug())
1028     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
1029   return msg;
1030 }
1031
1032 void CentralLB::simulationWrite() {
1033   if(step() == LBSimulation::dumpStep)
1034   {
1035     // here we are supposed to dump the database
1036     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1037     char *dumpFileName = (char *)malloc(dumpFileSize);
1038     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1039       free(dumpFileName);
1040       dumpFileSize+=3;
1041       dumpFileName = (char *)malloc(dumpFileSize);
1042     }
1043     writeStatsMsgs(dumpFileName);
1044     free(dumpFileName);
1045     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1046     ++LBSimulation::dumpStep;
1047     --LBSimulation::dumpStepSize;
1048     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1049       CmiPrintf("Charm++> Exiting...\n");
1050       CkExit();
1051     }
1052     return;
1053   }
1054 }
1055
1056 void CentralLB::simulationRead() {
1057   LBSimulation *simResults = NULL, *realResults;
1058   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1059   voidMessage->n_moves=0;
1060   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1061     // here we are supposed to read the data from the dump database
1062     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1063     char *simFileName = (char *)malloc(simFileSize);
1064     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1065       free(simFileName);
1066       simFileSize+=3;
1067       simFileName = (char *)malloc(simFileSize);
1068     }
1069     readStatsMsgs(simFileName);
1070
1071     // allocate simResults (only the first step)
1072     if (simResults == NULL) {
1073       simResults = new LBSimulation(LBSimulation::simProcs);
1074       realResults = new LBSimulation(LBSimulation::simProcs);
1075     }
1076     else {
1077       // should be the same number of procs of the original simulation!
1078       if (!LBSimulation::procsChanged) {
1079         // it means we have a previous step, so in simResults there is data.
1080         // we can now print the real effects of the load balancer during the simulation
1081         // or print the difference between the predicted data and the real one.
1082         realResults->reset();
1083         // reset to_proc of statsData to be equal to from_proc
1084         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1085         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1086         simResults->PrintDifferences(realResults,statsData);
1087       }
1088       simResults->reset();
1089     }
1090
1091     // now pass it to the strategy routine
1092     double startT = CkWallTimer();
1093     preprocess(statsData, LBSimulation::simProcs);
1094     CmiPrintf("%s> Strategy starts ... \n", lbname);
1095     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1096     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1097                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1098
1099     // now calculate the results of the load balancing simulation
1100     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1101
1102     // now we have the simulation data, so print it and loop
1103     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1104     // **CWL** Officially recording my disdain here for using ints for bool
1105     if (LBSimulation::showDecisionsOnly) {
1106       simResults->PrintDecisions(migrateMsg, simFileName, 
1107                                  LBSimulation::simProcs);
1108     } else {
1109       simResults->PrintSimulationResults();
1110     }
1111
1112     free(simFileName);
1113     delete migrateMsg;
1114     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1115   }
1116   // deallocate simResults
1117   delete simResults;
1118   CmiPrintf("Charm++> Exiting...\n");
1119   CkExit();
1120 }
1121
1122 void CentralLB::readStatsMsgs(const char* filename) 
1123 {
1124 #if CMK_LBDB_ON
1125   int i;
1126   FILE *f = fopen(filename, "r");
1127   if (f==NULL) {
1128     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1129     CmiAbort("");
1130   }
1131
1132   // at this stage, we need to rebuild the statsMsgList and
1133   // statsDataList structures. For that first deallocate the
1134   // old structures
1135   if (statsMsgsList) {
1136     for(i = 0; i < stats_msg_count; i++)
1137       delete statsMsgsList[i];
1138     delete[] statsMsgsList;
1139     statsMsgsList=0;
1140   }
1141
1142   PUP::fromDisk pd(f);
1143   PUP::machineInfo machInfo;
1144
1145   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1146   PUP::xlater p(machInfo, pd);
1147
1148   if (_lb_args.lbversion() > 1) {
1149     p|_lb_args.lbversion();             // write version number
1150     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1151     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1152   }
1153   p|stats_msg_count;
1154
1155   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1156   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1157   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1158
1159   // LBSimulation::simProcs must be set
1160   statsData->pup(p);
1161
1162   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1163   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1164
1165   // file f is closed in the destructor of PUP::fromDisk
1166   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1167 #endif
1168 }
1169
1170 void CentralLB::writeStatsMsgs(const char* filename) 
1171 {
1172 #if CMK_LBDB_ON
1173   FILE *f = fopen(filename, "w");
1174   if (f==NULL) {
1175     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1176     CmiAbort("");
1177   }
1178
1179   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1180   PUP::toDisk p(f);
1181   p((char *)&machInfo, sizeof(machInfo));       // machine info
1182
1183   p|_lb_args.lbversion();               // write version number
1184   p|stats_msg_count;
1185   statsData->pup(p);
1186
1187   fclose(f);
1188
1189   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1190 #endif
1191 }
1192
1193 // calculate the predicted wallclock/cpu load for every processors
1194 // considering communication overhead if considerComm is true
1195 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1196                       LBMigrateMsg *msg, LBInfo &info, 
1197                       int considerComm)
1198 {
1199 #if CMK_LBDB_ON
1200         stats->makeCommHash();
1201
1202         // update to_proc according to migration msgs
1203         for(int i = 0; i < msg->n_moves; i++) {
1204           MigrateInfo &mInfo = msg->moves[i];
1205           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1206           CmiAssert(idx != -1);
1207           stats->to_proc[idx] = mInfo.to_pe;
1208         }
1209
1210         info.getInfo(stats, count, considerComm);
1211 #endif
1212 }
1213
1214
1215 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1216 {
1217     CkAssert(simResults != NULL && count == simResults->numPes);
1218     // estimate the new loads of the processors. As a first approximation, this is the
1219     // sum of the cpu times of the objects on that processor
1220     double startT = CkWallTimer();
1221     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1222     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1223 }
1224
1225 void CentralLB::pup(PUP::er &p) { 
1226   BaseLB::pup(p); 
1227   if (p.isUnpacking())  {
1228     initLB(CkLBOptions(seqno)); 
1229   }
1230 #ifdef _FAULT_MLOG_
1231         p | lbDecisionCount;
1232     p | resumeCount;
1233 #endif
1234         
1235 }
1236
1237 int CentralLB::useMem() { 
1238   return sizeof(CentralLB) + statsData->useMem() + 
1239          CkNumPes() * sizeof(CLBStatsMsg *);
1240 }
1241
1242
1243 /**
1244   CLBStatsMsg is not a real message now.
1245   CLBStatsMsg is used for all processors to fill in their local load and comm
1246   statistics and send to processor 0
1247 */
1248
1249 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1250   n_objs = osz;
1251   n_comm = csz;
1252   objData = new LDObjData[osz];
1253   commData = new LDCommData[csz];
1254   avail_vector = NULL;
1255 }
1256
1257 CLBStatsMsg::~CLBStatsMsg() {
1258   delete [] objData;
1259   delete [] commData;
1260   if (avail_vector!=NULL) delete [] avail_vector;
1261 }
1262
1263 void CLBStatsMsg::pup(PUP::er &p) {
1264   int i;
1265   p|from_pe;
1266   p|pe_speed;
1267   p|total_walltime; p|total_cputime;
1268   p|idletime;
1269   p|bg_walltime;   p|bg_cputime;
1270   p|n_objs;
1271 #ifdef _FAULT_MLOG_
1272   p | step;
1273 #endif
1274   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1275   for (i=0; i<n_objs; i++) p|objData[i];
1276   p|n_comm;
1277   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1278   for (i=0; i<n_comm; i++) p|commData[i];
1279
1280   int has_avail_vector;
1281   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1282   p|has_avail_vector;
1283   if (p.isUnpacking()) {
1284     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1285     else avail_vector = NULL;
1286   }
1287   if (has_avail_vector) p(avail_vector, CkNumPes());
1288
1289   p(next_lb);
1290 }
1291
1292 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1293 // the entry function, it is just used to use to pup.
1294 // I don't use CLBStatsMsg directly as marshalled parameter because
1295 // I want the data pointer stored and not to be freed by the Charm++.
1296 void CkMarshalledCLBStatsMessage::free() { 
1297   int count = msgs.size();
1298   for  (int i=0; i<count; i++) 
1299     if (msgs[i]) delete msgs[i];
1300   msgs.free();
1301 }
1302
1303 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1304 {
1305   int count = m.getCount();
1306   for (int i=0; i<count; i++) add(m.getMessage(i));
1307 }
1308
1309 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1310 {
1311   int count = msgs.size();
1312   p|count;
1313   for (int i=0; i<count; i++) {
1314     CLBStatsMsg *msg;
1315     if (p.isUnpacking()) msg = new CLBStatsMsg;
1316     else { 
1317       msg = msgs[i]; CmiAssert(msg);
1318     }
1319     msg->pup(p);
1320     if (p.isUnpacking()) add(msg);
1321   }
1322 }
1323
1324 SpanningTree::SpanningTree()
1325 {
1326         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1327         arity = (int)ceil(sq/2);
1328         calcParent(CkMyPe());
1329         calcNumChildren(CkMyPe());
1330 }
1331
1332 void SpanningTree::calcParent(int n)
1333 {
1334         parent=-1;
1335         if(n != 0  && arity > 0)
1336                 parent = (n-1)/arity;
1337 }
1338
1339 void SpanningTree::calcNumChildren(int n)
1340 {
1341         numChildren = 0;
1342         if (arity == 0) return;
1343         int fullNode=(CkNumPes()-1-arity)/arity;
1344         if(n <= fullNode)
1345                 numChildren = arity;
1346         if(n == fullNode+1)
1347                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1348         if(n > fullNode+1)
1349                 numChildren = 0;
1350 }
1351
1352 #include "CentralLB.def.h"
1353  
1354 /*@}*/