do the same thing to statsData, only allocated when needed instead of on every pe.
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)       // CmiPrintf x;
20 #define  DEBUG(x)        // x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #elif defined(_FAULT_MLOG_)
27 /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #else
31 #define USE_REDUCTION         1
32 #define USE_LDB_SPANNING_TREE 1
33 #endif
34
35 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
36 extern int _restartFlag;
37 extern void getGlobalStep(CkGroupID );
38 extern void initMlogLBStep(CkGroupID );
39 extern int globalResumeCount;
40 extern void sendDummyMigrationCounts(int *);
41 #endif
42
43 #if CMK_GRID_QUEUE_AVAILABLE
44 CpvExtern(void *, CkGridObject);
45 #endif
46
47 CkGroupID loadbalancer;
48 int * lb_ptr;
49 int load_balancer_created;
50
51 CreateLBFunc_Def(CentralLB, "CentralLB base class")
52
53 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
54                              LBMigrateMsg *, LBInfo &info, int considerComm);
55
56 /*
57 void CreateCentralLB()
58 {
59   CProxy_CentralLB::ckNew(0);
60 }
61 */
62
63 void CentralLB::staticStartLB(void* data)
64 {
65   CentralLB *me = (CentralLB*)(data);
66   me->StartLB();
67 }
68
69 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
70 {
71   CentralLB *me = (CentralLB*)(data);
72   me->Migrated(h, waitBarrier);
73 }
74
75 void CentralLB::staticAtSync(void* data)
76 {
77   CentralLB *me = (CentralLB*)(data);
78   me->AtSync();
79 }
80
81 void CentralLB::initLB(const CkLBOptions &opt)
82 {
83 #if CMK_LBDB_ON
84   lbname = "CentralLB";
85   thisProxy = CProxy_CentralLB(thisgroup);
86   //  CkPrintf("Construct in %d\n",CkMyPe());
87
88   // create and turn on by default
89   receiver = theLbdb->
90     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
91   notifier = theLbdb->getLBDB()->
92     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
93   startLbFnHdl = theLbdb->getLBDB()->
94     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
95
96   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
97   if (opt.getSeqNo() > 0) turnOff();
98
99   stats_msg_count = 0;
100   statsMsgsList = NULL;
101   statsData = NULL;
102
103   // for future predictor
104   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
105   else predicted_model=0;
106   // register user interface callbacks
107   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
108
109   myspeed = theLbdb->ProcessorSpeed();
110
111   migrates_completed = 0;
112   future_migrates_completed = 0;
113   migrates_expected = -1;
114   future_migrates_expected = -1;
115   cur_ld_balancer = _lb_args.central_pe();      // 0 default
116   lbdone = 0;
117   count_msgs=0;
118   statsMsg = NULL;
119
120   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
121
122   load_balancer_created = 1;
123 #endif
124 }
125
126 CentralLB::~CentralLB()
127 {
128 #if CMK_LBDB_ON
129   delete [] statsMsgsList;
130   delete statsData;
131   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
132   if (theLbdb) {
133     theLbdb->getLBDB()->
134       RemoveNotifyMigrated(notifier);
135     theLbdb->
136       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
137   }
138 #endif
139 }
140
141 void CentralLB::turnOn() 
142 {
143 #if CMK_LBDB_ON
144   theLbdb->getLBDB()->
145     TurnOnBarrierReceiver(receiver);
146   theLbdb->getLBDB()->
147     TurnOnNotifyMigrated(notifier);
148   theLbdb->getLBDB()->
149     TurnOnStartLBFn(startLbFnHdl);
150 #endif
151 }
152
153 void CentralLB::turnOff() 
154 {
155 #if CMK_LBDB_ON
156   theLbdb->getLBDB()->
157     TurnOffBarrierReceiver(receiver);
158   theLbdb->getLBDB()->
159     TurnOffNotifyMigrated(notifier);
160   theLbdb->getLBDB()->
161     TurnOffStartLBFn(startLbFnHdl);
162 #endif
163 }
164
165 void CentralLB::AtSync()
166 {
167 #if CMK_LBDB_ON
168   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
169
170 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
171         CpvAccess(_currentObj)=this;
172 #endif
173
174   // if num of processor is only 1, nothing should happen
175   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
176     MigrationDone(0);
177     return;
178   }
179   if(CmiNodeAlive(CkMyPe())){
180     thisProxy [CkMyPe()].ProcessAtSync();
181   }
182 #endif
183 }
184
185 #include "ComlibStrategy.h"
186
187 void CentralLB::ProcessAtSync()
188 {
189
190 #if CMK_LBDB_ON
191   CmiAssert(CmiNodeAlive(CkMyPe()));
192   if (CkMyPe() == cur_ld_balancer) {
193     start_lb_time = CkWallTimer();
194   }
195
196
197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
198         initMlogLBStep(thisgroup);
199 #endif
200
201   // build message
202   BuildStatsMsg();
203
204 #if USE_REDUCTION
205     // reduction to get total number of objects and comm
206     // so that processor 0 can pre-allocate load balancing database
207   int counts[2];
208   counts[0] = theLbdb->GetObjDataSz();
209   counts[1] = theLbdb->GetCommDataSz();
210
211   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
212                   thisProxy[0]);
213   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
214 #else
215   SendStats();
216 #endif
217 #endif
218 }
219
220 // called only on 0
221 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
222 {
223   CmiAssert(CkMyPe() == 0);
224   if (statsData == NULL) statsData = new LDStats;
225
226   int *counts = (int *)msg->getData();
227   int n_objs = counts[0];
228   int n_comm = counts[1];
229
230     // resize database
231   statsData->objData.resize(n_objs);
232   statsData->from_proc.resize(n_objs);
233   statsData->to_proc.resize(n_objs);
234   statsData->commData.resize(n_comm);
235
236   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
237         
238     // broadcast call to let everybody start to send stats
239   thisProxy.SendStats();
240 }
241
242 void CentralLB::BuildStatsMsg()
243 {
244 #if CMK_LBDB_ON
245   // build and send stats
246   const int osz = theLbdb->GetObjDataSz();
247   const int csz = theLbdb->GetCommDataSz();
248
249   int npes = CkNumPes();
250   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
251   _MEMCHECK(msg);
252   msg->from_pe = CkMyPe();
253 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
254         msg->step = step();
255 #endif
256   //msg->serial = CrnRand();
257
258 /*
259   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
260   theLbdb->IdleTime(&msg->idletime);
261   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
262 */
263   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
264                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
265   msg->pe_speed = myspeed;
266   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
267
268   msg->n_objs = osz;
269   theLbdb->GetObjData(msg->objData);
270   msg->n_comm = csz;
271   theLbdb->GetCommData(msg->commData);
272 //  theLbdb->ClearLoads();
273   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
274
275   if(CkMyPe() == cur_ld_balancer) {
276     msg->avail_vector = new char[CkNumPes()];
277     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
278     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
279   }
280
281   CmiAssert(statsMsg == NULL);
282   statsMsg = msg;
283 #endif
284 }
285
286 // called on every processor
287 void CentralLB::SendStats()
288 {
289 #if CMK_LBDB_ON
290   CmiAssert(statsMsg != NULL);
291
292 #if USE_LDB_SPANNING_TREE
293   if(CkNumPes()>1024)
294   {
295     if (CkMyPe() == cur_ld_balancer)
296       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
297     else
298       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
299   }
300   else
301 #endif
302   {
303     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
304     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
305   }
306
307   statsMsg = NULL;
308
309 #ifdef __BLUEGENE__
310   BgEndStreaming();
311 #endif
312
313   {
314   // enfore the barrier to wait until centralLB says no
315   LDOMHandle h;
316   h.id.id.idx = 0;
317   theLbdb->getLBDB()->RegisteringObjects(h);
318   }
319 #endif
320 }
321
322 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
323 extern int donotCountMigration;
324 #endif
325
326 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
327 {
328 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
329     if(donotCountMigration){
330         return ;
331     }
332 #endif
333
334 #if CMK_LBDB_ON
335   if (waitBarrier) {
336             migrates_completed++;
337       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
338     if (migrates_completed == migrates_expected) {
339       MigrationDone(1);
340     }
341   }
342   else {
343     future_migrates_completed ++;
344     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
345     if (future_migrates_completed == future_migrates_expected)  {
346         CheckMigrationComplete();
347     }
348   }
349 #endif
350 }
351
352 void CentralLB::MissMigrate(int waitForBarrier)
353 {
354   LDObjHandle h;
355   Migrated(h, waitForBarrier);
356 }
357
358 // build a complete data from bufferred messages
359 // not used when USE_REDUCTION = 1
360 void CentralLB::buildStats()
361 {
362     statsData->count = stats_msg_count;
363     // allocate space
364     statsData->objData.resize(statsData->n_objs);
365     statsData->from_proc.resize(statsData->n_objs);
366     statsData->to_proc.resize(statsData->n_objs);
367     statsData->commData.resize(statsData->n_comm);
368
369     int nobj = 0;
370     int ncom = 0;
371     int nmigobj = 0;
372     // copy all data in individule message to this big structure
373     for (int pe=0; pe<CkNumPes(); pe++) {
374        int i;
375        CLBStatsMsg *msg = statsMsgsList[pe];
376        if(msg == NULL) continue;
377        for (i=0; i<msg->n_objs; i++) {
378          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
379          statsData->objData[nobj] = msg->objData[i];
380          if (msg->objData[i].migratable) nmigobj++;
381          nobj++;
382        }
383        for (i=0; i<msg->n_comm; i++) {
384          statsData->commData[ncom] = msg->commData[i];
385          ncom++;
386        }
387        // free the memory
388        delete msg;
389        statsMsgsList[pe]=0;
390     }
391     statsData->n_migrateobjs = nmigobj;
392 }
393
394 // deposit one processor data at a time, note database is pre-allocated
395 // to have enough space
396 // used when USE_REDUCTION = 1
397 void CentralLB::depositData(CLBStatsMsg *m)
398 {
399   int i;
400   if (m == NULL) return;
401
402   const int pe = m->from_pe;
403   struct ProcStats &procStat = statsData->procs[pe];
404   procStat.pe = pe;
405   procStat.total_walltime = m->total_walltime;
406   procStat.total_cputime = m->total_cputime;
407   procStat.idletime = m->idletime;
408   procStat.bg_walltime = m->bg_walltime;
409   procStat.bg_cputime = m->bg_cputime;
410   procStat.pe_speed = m->pe_speed;
411   //procStat.utilization = 1.0;
412   procStat.available = CmiTrue;
413   procStat.n_objs = m->n_objs;
414
415   int &nobj = statsData->n_objs;
416   int &nmigobj = statsData->n_migrateobjs;
417   for (i=0; i<m->n_objs; i++) {
418       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
419       statsData->objData[nobj] = m->objData[i];
420       if (m->objData[i].migratable) nmigobj++;
421       nobj++;
422       CmiAssert(nobj <= statsData->objData.capacity());
423   }
424   int &n_comm = statsData->n_comm;
425   for (i=0; i<m->n_comm; i++) {
426       statsData->commData[n_comm] = m->commData[i];
427       n_comm++;
428       CmiAssert(n_comm <= statsData->commData.capacity());
429   }
430   delete m;
431 }
432
433 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
434 {
435 #if CMK_LBDB_ON
436   if (statsMsgsList == NULL) {
437     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
438     CmiAssert(statsMsgsList != NULL);
439     for(int i=0; i < CkNumPes(); i++)
440       statsMsgsList[i] = 0;
441   }
442   if (statsData == NULL) statsData = new LDStats;
443
444     //  loop through all CLBStatsMsg in the incoming msg
445   int count = msg.getCount();
446   for (int num = 0; num < count; num++) 
447   {
448     CLBStatsMsg *m = msg.getMessage(num);
449     CmiAssert(m!=NULL);
450     const int pe = m->from_pe;
451     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
452 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
453 /*      
454  *  if(m->step < step()){
455  *    //TODO: if a processor is redoing an old load balance step..
456  *    //tell it that the step is done and that it should not perform any migrations
457  *      thisProxy[pe].ReceiveDummyMigration();
458  *  }*/
459 #endif
460         
461     if(!CmiNodeAlive(pe)){
462         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
463         continue;
464     }
465         
466     if (m->avail_vector!=NULL) {
467       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
468     }
469
470     if (statsMsgsList[pe] != 0) {
471       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
472              pe);
473     } else {
474       statsMsgsList[pe] = m;
475 #if USE_REDUCTION
476       depositData(m);
477 #else
478       // store per processor data right away
479       struct ProcStats &procStat = statsData->procs[pe];
480       procStat.pe = pe;
481       procStat.total_walltime = m->total_walltime;
482       procStat.total_cputime = m->total_cputime;
483       procStat.idletime = m->idletime;
484       procStat.bg_walltime = m->bg_walltime;
485       procStat.bg_cputime = m->bg_cputime;
486       procStat.pe_speed = m->pe_speed;
487       //procStat.utilization = 1.0;
488       procStat.available = CmiTrue;
489       procStat.n_objs = m->n_objs;
490
491       statsData->n_objs += m->n_objs;
492       statsData->n_comm += m->n_comm;
493 #endif
494       stats_msg_count++;
495     }
496   }    // end of for
497
498   const int clients = CkNumValidPes();
499   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
500  
501   if (stats_msg_count == clients) {
502         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
503     statsData->count = stats_msg_count;
504     thisProxy[CkMyPe()].LoadBalance();
505   }
506 #endif
507 }
508
509 /** added by Abhinav for receiving msgs via spanning tree */
510 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
511 {
512 #if CMK_LBDB_ON
513         CmiAssert(CkMyPe() != 0);
514         bufMsg.add(msg);         // buffer messages
515         count_msgs++;
516         //CkPrintf("here %d\n", CkMyPe());
517         if (count_msgs == st.numChildren+1) {
518                 if(st.parent == 0)
519                 {
520                         thisProxy[0].ReceiveStats(bufMsg);
521                         //CkPrintf("from %d\n", CkMyPe());
522                 }
523                 else
524                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
525                 count_msgs = 0;
526                 bufMsg.free();
527         } 
528 #endif
529 }
530
531 void CentralLB::LoadBalance()
532 {
533 #if CMK_LBDB_ON
534   int proc;
535   const int clients = CkNumPes();
536
537 #if ! USE_REDUCTION
538   // build data
539   buildStats();
540 #else
541   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
542 #endif
543
544   if (_lb_args.debug()) 
545       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%fMB\n",
546                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
547  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
548 //    double strat_start_time = CkWallTimer();
549
550   // if we are in simulation mode read data
551   if (LBSimulation::doSimulation) simulationRead();
552
553   char *availVector = LBDatabaseObj()->availVector();
554   for(proc = 0; proc < clients; proc++)
555       statsData->procs[proc].available = (CmiBool)availVector[proc];
556
557   preprocess(statsData, clients);
558
559 //    CkPrintf("Before Calling Strategy\n");
560
561   if (_lb_args.printSummary()) {
562       LBInfo info(clients);
563         // not take comm data
564       info.getInfo(statsData, clients, 0);
565       double mLoad, mCpuLoad, totalLoad;
566       info.getSummary(mLoad, mCpuLoad, totalLoad);
567       int nmsgs, nbytes;
568       statsData->computeNonlocalComm(nmsgs, nbytes);
569       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
570 //      if (_lb_args.debug() > 1) {
571 //        for (int i=0; i<statsData->n_objs; i++)
572 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
573 //      }
574   }
575
576   double strat_start_time = CkWallTimer();
577   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
578 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
579         migrateMsg->step = step();
580 #endif
581   if (_lb_args.debug()) {
582     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
583     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
584     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
585               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
586   }
587
588 //    CkPrintf("returned successfully\n");
589
590   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
591   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
592
593   // if this is the step at which we need to dump the database
594   simulationWrite();
595
596 //  calculate predicted load
597 //  very time consuming though, so only happen when debugging is on
598   if (_lb_args.printSummary()) {
599       LBInfo info(clients);
600         // not take comm data
601       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
602       double mLoad, mCpuLoad, totalLoad;
603       info.getSummary(mLoad, mCpuLoad, totalLoad);
604       int nmsgs, nbytes;
605       statsData->computeNonlocalComm(nmsgs, nbytes);
606       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
607       for (int i=0; i<clients; i++)
608         migrateMsg->expectedLoad[i] = info.peLoads[i];
609   }
610
611   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
612 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
613     lbDecisionCount++;
614     migrateMsg->lbDecisionCount = lbDecisionCount;
615 #endif
616   thisProxy.ReceiveMigration(migrateMsg);
617
618   // Zero out data structures for next cycle
619   // CkPrintf("zeroing out data\n");
620   statsData->clear();
621   stats_msg_count=0;
622 #endif
623 }
624
625 //    double strat_end_time = CkWallTimer();
626 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
627 // test if sender and receiver in a commData is nonmigratable.
628 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
629 {
630 #if CMK_LBDB_ON
631   for (int pe=0 ; pe<count; pe++)
632   {
633     for (int i=0; i<len[pe]; i++)
634       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
635           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
636       return 0;
637   }
638 #endif
639   return 1;
640 }
641
642 // rebuild LDStats and remove all non-migratble objects and related things
643 void CentralLB::removeNonMigratable(LDStats* stats, int count)
644 {
645   int i;
646
647   // check if we have non-migratable objects
648   int have = 0;
649   for (i=0; i<stats->n_objs; i++) 
650   {
651     LDObjData &odata = stats->objData[i];
652     if (!odata.migratable) {
653       have = 1; break;
654     }
655   }
656   if (have == 0) return;
657
658   CkVec<LDObjData> nonmig;
659   CkVec<int> new_from_proc, new_to_proc;
660   nonmig.resize(stats->n_migrateobjs);
661   new_from_proc.resize(stats->n_migrateobjs);
662   new_to_proc.resize(stats->n_migrateobjs);
663   int n_objs = 0;
664   for (i=0; i<stats->n_objs; i++) 
665   {
666     LDObjData &odata = stats->objData[i];
667     if (odata.migratable) {
668       nonmig[n_objs] = odata;
669       new_from_proc[n_objs] = stats->from_proc[i];
670       new_to_proc[n_objs] = stats->to_proc[i];
671       n_objs ++;
672     }
673     else {
674       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
675       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
676     }
677   }
678   CmiAssert(stats->n_migrateobjs == n_objs);
679
680   stats->makeCommHash();
681   
682   CkVec<LDCommData> newCommData;
683   newCommData.resize(stats->n_comm);
684   int n_comm = 0;
685   for (i=0; i<stats->n_comm; i++) 
686   {
687     LDCommData& cdata = stats->commData[i];
688     if (!cdata.from_proc()) 
689     {
690       int idx = stats->getSendHash(cdata);
691       CmiAssert(idx != -1);
692       if (!stats->objData[idx].migratable) continue;
693     }
694     switch (cdata.receiver.get_type()) {
695     case LD_PROC_MSG:
696       break;
697     case LD_OBJ_MSG:  {
698       int idx = stats->getRecvHash(cdata);
699       if (stats->complete_flag)
700         CmiAssert(idx != -1);
701       else if (idx == -1) continue;          // receiver not in this group
702       if (!stats->objData[idx].migratable) continue;
703       break;
704       }
705     case LD_OBJLIST_MSG:    // object message FIXME add multicast
706       break;
707     }
708     newCommData[n_comm] = cdata;
709     n_comm ++;
710   }
711
712   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
713
714   // swap to new data
715   stats->objData = nonmig;
716   stats->from_proc = new_from_proc;
717   stats->to_proc = new_to_proc;
718   stats->n_objs = n_objs;
719
720   stats->commData = newCommData;
721   stats->n_comm = n_comm;
722
723   stats->deleteCommHash();
724   stats->makeCommHash();
725
726 }
727
728
729 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
730 extern int restarted;
731 #endif
732
733 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
734 {
735 #if CMK_LBDB_ON
736         int i;
737
738 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
739         int *dummyCounts;
740
741         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
742         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
743         if(step() > m->step){
744                 char str[100];
745                 envelope *env = UsrToEnv(m);
746                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
747                 return;
748         }
749         lbDecisionCount = m->lbDecisionCount;
750 #endif
751
752   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
753   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
754 /*FAULT_EVAC*/
755   if(!CmiNodeAlive(CkMyPe())){
756         delete m;
757         return;
758   }
759   migrates_expected = 0;
760   future_migrates_expected = 0;
761 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
762         int sending=0;
763     int dummy=0;
764         LBDB *_myLBDB = theLbdb->getLBDB();
765         if(_restartFlag){
766         dummyCounts = new int[CmiNumPes()];
767         bzero(dummyCounts,sizeof(int)*CmiNumPes());
768     }
769 #endif
770   for(i=0; i < m->n_moves; i++) {
771     MigrateInfo& move = m->moves[i];
772     const int me = CkMyPe();
773     if (move.from_pe == me && move.to_pe != me) {
774       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
775       // migrate object, in case it is already gone, inform toPe
776 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
777       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
778          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
779 #else
780             if(_restartFlag == 0){
781                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
782                 theLbdb->Migrate(move.obj,move.to_pe);
783                 sending++;
784             }else{
785                 if(_myLBDB->validObjHandle(move.obj)){
786                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
787                     theLbdb->Migrate(move.obj,move.to_pe);
788                     sending++;
789                 }else{
790                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
791                     dummyCounts[move.to_pe]++;
792                     dummy++;
793                 }
794             }
795 #endif
796     } else if (move.from_pe != me && move.to_pe == me) {
797        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
798       if (!move.async_arrival) migrates_expected++;
799       else future_migrates_expected++;
800     }
801   }
802   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
803   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
804
805 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
806         if(_restartFlag){
807                 sendDummyMigrationCounts(dummyCounts);
808                 _restartFlag  =0;
809         delete []dummyCounts;
810         }
811 #endif
812
813
814 #if 0
815   if (m->n_moves ==0) {
816     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
817   }
818 #endif
819   cur_ld_balancer = m->next_lb;
820   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
821       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
822   }
823
824   if (migrates_expected == 0 || migrates_completed == migrates_expected)
825     MigrationDone(1);
826   delete m;
827
828 //      CkEvacuatedElement();
829 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
830 //  migrates_expected = 0;
831 //  //  ResumeClients(1);
832 #endif
833 #endif
834 }
835
836 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
837 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
838     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
839     //TODO: this is gonna be important when a crash happens during checkpoint
840     //the globalDecisionCount would have to be saved and compared against
841     //a future recvMigration
842                 
843         thisProxy[CkMyPe()].ResumeClients(1);
844 }
845 #endif
846
847 void CentralLB::MigrationDone(int balancing)
848 {
849 #if CMK_LBDB_ON
850   migrates_completed = 0;
851   migrates_expected = -1;
852   // clear load stats
853   if (balancing) theLbdb->ClearLoads();
854   // Increment to next step
855   theLbdb->incStep();
856         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
857   // if sync resume, invoke a barrier
858
859 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
860     savedBalancing = balancing;
861     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
862 #endif
863
864   LoadbalanceDone(balancing);        // callback
865 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
866   // if sync resume invoke a barrier
867   if (balancing && _lb_args.syncResume()) {
868     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
869                   thisProxy);
870     contribute(0, NULL, CkReduction::sum_int, cb);
871   }
872   else{ 
873     if(CmiNodeAlive(CkMyPe())){
874         thisProxy [CkMyPe()].ResumeClients(balancing);
875     }   
876   }     
877 #if CMK_GRID_QUEUE_AVAILABLE
878   CmiGridQueueDeregisterAll ();
879   CpvAccess(CkGridObject) = NULL;
880 #endif
881 #endif 
882 #endif
883 }
884
885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
886 void CentralLB::endMigrationDone(int balancing){
887     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
888
889
890   if (balancing && _lb_args.syncResume()) {
891     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
892                   thisProxy);
893     contribute(0, NULL, CkReduction::sum_int, cb);
894   }
895   else{
896     if(CmiNodeAlive(CkMyPe())){
897     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
898     thisProxy [CkMyPe()].ResumeClients(balancing);
899     }
900   }
901
902 }
903 #endif
904
905 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
906 void resumeCentralLbAfterChkpt(void *_lb){
907     CentralLB *lb= (CentralLB *)_lb;
908     CpvAccess(_currentObj)=lb;
909     lb->endMigrationDone(lb->savedBalancing);
910 }
911 #endif
912
913
914 void CentralLB::ResumeClients(CkReductionMsg *msg)
915 {
916   ResumeClients(1);
917   delete msg;
918 }
919
920 void CentralLB::ResumeClients(int balancing)
921 {
922 #if CMK_LBDB_ON
923 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
924     resumeCount++;
925     globalResumeCount = resumeCount;
926 #endif
927   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
928   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
929     double end_lb_time = CkWallTimer();
930   }
931
932 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
933   if (balancing) ComlibNotifyMigrationDone();  
934 #endif
935
936   theLbdb->ResumeClients();
937   if (balancing)  {
938     CheckMigrationComplete();
939     if (future_migrates_expected == 0 || 
940             future_migrates_expected == future_migrates_completed) {
941       CheckMigrationComplete();
942     }
943   }
944 #endif
945 }
946
947 /*
948   migration of objects contains two different kinds:
949   (1) objects want to make a barrier for migration completion
950       (waitForBarrier is true)
951       migrationDone() to finish and resumeClients
952   (2) objects don't need a barrier
953   However, next load balancing can only happen when both migrations complete
954 */ 
955 void CentralLB::CheckMigrationComplete()
956 {
957 #if CMK_LBDB_ON
958   lbdone ++;
959   if (lbdone == 2) {
960     if (_lb_args.debug() && CkMyPe()==0) {
961       double end_lb_time = CkWallTimer();
962       CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
963                 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
964     }
965     lbdone = 0;
966     future_migrates_expected = -1;
967     future_migrates_completed = 0;
968     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
969     // release local barrier  so that the next load balancer can go
970     LDOMHandle h;
971     h.id.id.idx = 0;
972     theLbdb->getLBDB()->DoneRegisteringObjects(h);
973     // switch to the next load balancer in the list
974     // subtle: called from Migrated() may result in Migrated() called in next LB
975     theLbdb->nextLoadbalancer(seqno);
976   }
977 #endif
978 }
979
980 void CentralLB::preprocess(LDStats* stats,int count)
981 {
982   if (_lb_args.ignoreBgLoad())
983     stats->clearBgLoad();
984
985   // Call the predictor for the future
986   if (_lb_predict) FuturePredictor(statsData);
987 }
988
989 // default load balancing strategy
990 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
991 {
992 #if CMK_LBDB_ON
993   work(stats, count);
994
995   if (_lb_args.debug()>1)  {
996     CkPrintf("Obj Map:\n");
997     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
998     CkPrintf("\n");
999   }
1000
1001   return createMigrateMsg(stats, count);
1002 #else
1003   return NULL;
1004 #endif
1005 }
1006
1007 void CentralLB::work(LDStats* stats,int count)
1008 {
1009   // does nothing but print the database
1010   stats->print();
1011 }
1012
1013 // generate migrate message from stats->from_proc and to_proc
1014 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
1015 {
1016   int i;
1017   CkVec<MigrateInfo*> migrateInfo;
1018   for (i=0; i<stats->n_objs; i++) {
1019     LDObjData &objData = stats->objData[i];
1020     int frompe = stats->from_proc[i];
1021     int tope = stats->to_proc[i];
1022     if (frompe != tope) {
1023       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1024       //         CkMyPe(),obj,pe,dest);
1025       MigrateInfo *migrateMe = new MigrateInfo;
1026       migrateMe->obj = objData.handle;
1027       migrateMe->from_pe = frompe;
1028       migrateMe->to_pe = tope;
1029       migrateMe->async_arrival = objData.asyncArrival;
1030       migrateInfo.insertAtEnd(migrateMe);
1031     }
1032   }
1033
1034   int migrate_count=migrateInfo.length();
1035   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1036   msg->n_moves = migrate_count;
1037   for(i=0; i < migrate_count; i++) {
1038     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1039     msg->moves[i] = *item;
1040     delete item;
1041     migrateInfo[i] = 0;
1042   }
1043   if (_lb_args.debug())
1044     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
1045   return msg;
1046 }
1047
1048 void CentralLB::simulationWrite() {
1049   if(step() == LBSimulation::dumpStep)
1050   {
1051     // here we are supposed to dump the database
1052     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1053     char *dumpFileName = (char *)malloc(dumpFileSize);
1054     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1055       free(dumpFileName);
1056       dumpFileSize+=3;
1057       dumpFileName = (char *)malloc(dumpFileSize);
1058     }
1059     writeStatsMsgs(dumpFileName);
1060     free(dumpFileName);
1061     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1062     ++LBSimulation::dumpStep;
1063     --LBSimulation::dumpStepSize;
1064     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1065       CmiPrintf("Charm++> Exiting...\n");
1066       CkExit();
1067     }
1068     return;
1069   }
1070 }
1071
1072 void CentralLB::simulationRead() {
1073   LBSimulation *simResults = NULL, *realResults;
1074   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1075   voidMessage->n_moves=0;
1076   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1077     // here we are supposed to read the data from the dump database
1078     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1079     char *simFileName = (char *)malloc(simFileSize);
1080     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1081       free(simFileName);
1082       simFileSize+=3;
1083       simFileName = (char *)malloc(simFileSize);
1084     }
1085     readStatsMsgs(simFileName);
1086
1087     // allocate simResults (only the first step)
1088     if (simResults == NULL) {
1089       simResults = new LBSimulation(LBSimulation::simProcs);
1090       realResults = new LBSimulation(LBSimulation::simProcs);
1091     }
1092     else {
1093       // should be the same number of procs of the original simulation!
1094       if (!LBSimulation::procsChanged) {
1095         // it means we have a previous step, so in simResults there is data.
1096         // we can now print the real effects of the load balancer during the simulation
1097         // or print the difference between the predicted data and the real one.
1098         realResults->reset();
1099         // reset to_proc of statsData to be equal to from_proc
1100         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1101         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1102         simResults->PrintDifferences(realResults,statsData);
1103       }
1104       simResults->reset();
1105     }
1106
1107     // now pass it to the strategy routine
1108     double startT = CkWallTimer();
1109     preprocess(statsData, LBSimulation::simProcs);
1110     CmiPrintf("%s> Strategy starts ... \n", lbname);
1111     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1112     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1113                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1114
1115     // now calculate the results of the load balancing simulation
1116     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1117
1118     // now we have the simulation data, so print it and loop
1119     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1120     // **CWL** Officially recording my disdain here for using ints for bool
1121     if (LBSimulation::showDecisionsOnly) {
1122       simResults->PrintDecisions(migrateMsg, simFileName, 
1123                                  LBSimulation::simProcs);
1124     } else {
1125       simResults->PrintSimulationResults();
1126     }
1127
1128     free(simFileName);
1129     delete migrateMsg;
1130     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1131   }
1132   // deallocate simResults
1133   delete simResults;
1134   CmiPrintf("Charm++> Exiting...\n");
1135   CkExit();
1136 }
1137
1138 void CentralLB::readStatsMsgs(const char* filename) 
1139 {
1140 #if CMK_LBDB_ON
1141   int i;
1142   FILE *f = fopen(filename, "r");
1143   if (f==NULL) {
1144     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1145     CmiAbort("");
1146   }
1147
1148   // at this stage, we need to rebuild the statsMsgList and
1149   // statsDataList structures. For that first deallocate the
1150   // old structures
1151   if (statsMsgsList) {
1152     for(i = 0; i < stats_msg_count; i++)
1153       delete statsMsgsList[i];
1154     delete[] statsMsgsList;
1155     statsMsgsList=0;
1156   }
1157
1158   PUP::fromDisk pd(f);
1159   PUP::machineInfo machInfo;
1160
1161   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1162   PUP::xlater p(machInfo, pd);
1163
1164   if (_lb_args.lbversion() > 1) {
1165     p|_lb_args.lbversion();             // write version number
1166     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1167     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1168   }
1169   p|stats_msg_count;
1170
1171   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1172   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1173   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1174
1175   // LBSimulation::simProcs must be set
1176   statsData->pup(p);
1177
1178   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1179   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1180
1181   // file f is closed in the destructor of PUP::fromDisk
1182   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1183 #endif
1184 }
1185
1186 void CentralLB::writeStatsMsgs(const char* filename) 
1187 {
1188 #if CMK_LBDB_ON
1189   FILE *f = fopen(filename, "w");
1190   if (f==NULL) {
1191     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1192     CmiAbort("");
1193   }
1194
1195   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1196   PUP::toDisk p(f);
1197   p((char *)&machInfo, sizeof(machInfo));       // machine info
1198
1199   p|_lb_args.lbversion();               // write version number
1200   p|stats_msg_count;
1201   statsData->pup(p);
1202
1203   fclose(f);
1204
1205   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1206 #endif
1207 }
1208
1209 // calculate the predicted wallclock/cpu load for every processors
1210 // considering communication overhead if considerComm is true
1211 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1212                       LBMigrateMsg *msg, LBInfo &info, 
1213                       int considerComm)
1214 {
1215 #if CMK_LBDB_ON
1216         stats->makeCommHash();
1217
1218         // update to_proc according to migration msgs
1219         for(int i = 0; i < msg->n_moves; i++) {
1220           MigrateInfo &mInfo = msg->moves[i];
1221           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1222           CmiAssert(idx != -1);
1223           stats->to_proc[idx] = mInfo.to_pe;
1224         }
1225
1226         info.getInfo(stats, count, considerComm);
1227 #endif
1228 }
1229
1230
1231 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1232 {
1233     CkAssert(simResults != NULL && count == simResults->numPes);
1234     // estimate the new loads of the processors. As a first approximation, this is the
1235     // sum of the cpu times of the objects on that processor
1236     double startT = CkWallTimer();
1237     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1238     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1239 }
1240
1241 void CentralLB::pup(PUP::er &p) { 
1242   BaseLB::pup(p); 
1243   if (p.isUnpacking())  {
1244     initLB(CkLBOptions(seqno)); 
1245   }
1246 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1247   p | lbDecisionCount;
1248   p | resumeCount;
1249 #endif
1250         
1251 }
1252
1253 int CentralLB::useMem() { 
1254   return sizeof(CentralLB) + statsData->useMem() + 
1255          CkNumPes() * sizeof(CLBStatsMsg *);
1256 }
1257
1258
1259 /**
1260   CLBStatsMsg is not a real message now.
1261   CLBStatsMsg is used for all processors to fill in their local load and comm
1262   statistics and send to processor 0
1263 */
1264
1265 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1266   n_objs = osz;
1267   n_comm = csz;
1268   objData = new LDObjData[osz];
1269   commData = new LDCommData[csz];
1270   avail_vector = NULL;
1271 }
1272
1273 CLBStatsMsg::~CLBStatsMsg() {
1274   delete [] objData;
1275   delete [] commData;
1276   delete [] avail_vector;
1277 }
1278
1279 void CLBStatsMsg::pup(PUP::er &p) {
1280   int i;
1281   p|from_pe;
1282   p|pe_speed;
1283   p|total_walltime; p|total_cputime;
1284   p|idletime;
1285   p|bg_walltime;   p|bg_cputime;
1286 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1287   p | step;
1288 #endif
1289   p|n_objs;
1290   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1291   for (i=0; i<n_objs; i++) p|objData[i];
1292   p|n_comm;
1293   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1294   for (i=0; i<n_comm; i++) p|commData[i];
1295
1296   int has_avail_vector;
1297   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1298   p|has_avail_vector;
1299   if (p.isUnpacking()) {
1300     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1301     else avail_vector = NULL;
1302   }
1303   if (has_avail_vector) p(avail_vector, CkNumPes());
1304
1305   p(next_lb);
1306 }
1307
1308 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1309 // the entry function, it is just used to use to pup.
1310 // I don't use CLBStatsMsg directly as marshalled parameter because
1311 // I want the data pointer stored and not to be freed by the Charm++.
1312 void CkMarshalledCLBStatsMessage::free() { 
1313   int count = msgs.size();
1314   for  (int i=0; i<count; i++) {
1315     delete msgs[i];
1316     msgs[i] = NULL;
1317   }
1318   msgs.free();
1319 }
1320
1321 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1322 {
1323   int count = m.getCount();
1324   for (int i=0; i<count; i++) add(m.getMessage(i));
1325 }
1326
1327 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1328 {
1329   int count = msgs.size();
1330   p|count;
1331   for (int i=0; i<count; i++) {
1332     CLBStatsMsg *msg;
1333     if (p.isUnpacking()) msg = new CLBStatsMsg;
1334     else { 
1335       msg = msgs[i]; CmiAssert(msg!=NULL);
1336     }
1337     msg->pup(p);
1338     if (p.isUnpacking()) add(msg);
1339   }
1340 }
1341
1342 SpanningTree::SpanningTree()
1343 {
1344         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1345         arity = (int)ceil(sq/2);
1346         calcParent(CkMyPe());
1347         calcNumChildren(CkMyPe());
1348 }
1349
1350 void SpanningTree::calcParent(int n)
1351 {
1352         parent=-1;
1353         if(n != 0  && arity > 0)
1354                 parent = (n-1)/arity;
1355 }
1356
1357 void SpanningTree::calcNumChildren(int n)
1358 {
1359         numChildren = 0;
1360         if (arity == 0) return;
1361         int fullNode=(CkNumPes()-1-arity)/arity;
1362         if(n <= fullNode)
1363                 numChildren = arity;
1364         if(n == fullNode+1)
1365                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1366         if(n > fullNode+1)
1367                 numChildren = 0;
1368 }
1369
1370 #include "CentralLB.def.h"
1371  
1372 /*@}*/