f4b3209e75c2ec4cf95587040f3fe8a90ada593c
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)       CmiPrintf x;
20 #define  DEBUG(x)        x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #elif defined(_FAULT_MLOG_)
27 /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #else
31 #define USE_REDUCTION         1
32 #define USE_LDB_SPANNING_TREE 1
33 #endif
34
35 #ifdef _FAULT_MLOG_
36 extern int _restartFlag;
37 extern void getGlobalStep(CkGroupID );
38 extern void initMlogLBStep(CkGroupID );
39 extern int globalResumeCount;
40 extern void sendDummyMigrationCounts(int *);
41 #endif
42
43 #if CMK_GRID_QUEUE_AVAILABLE
44 CpvExtern(void *, CkGridObject);
45 #endif
46
47 CkGroupID loadbalancer;
48 int * lb_ptr;
49 int load_balancer_created;
50
51 CreateLBFunc_Def(CentralLB, "CentralLB base class");
52
53 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
54                              LBMigrateMsg *, LBInfo &info, int considerComm);
55
56 /*
57 void CreateCentralLB()
58 {
59   CProxy_CentralLB::ckNew(0);
60 }
61 */
62
63 void CentralLB::staticStartLB(void* data)
64 {
65   CentralLB *me = (CentralLB*)(data);
66   me->StartLB();
67 }
68
69 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
70 {
71   CentralLB *me = (CentralLB*)(data);
72   me->Migrated(h, waitBarrier);
73 }
74
75 void CentralLB::staticAtSync(void* data)
76 {
77   CentralLB *me = (CentralLB*)(data);
78   me->AtSync();
79 }
80
81 void CentralLB::initLB(const CkLBOptions &opt)
82 {
83 #if CMK_LBDB_ON
84   lbname = "CentralLB";
85   thisProxy = CProxy_CentralLB(thisgroup);
86   //  CkPrintf("Construct in %d\n",CkMyPe());
87
88   // create and turn on by default
89   receiver = theLbdb->
90     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
91   notifier = theLbdb->getLBDB()->
92     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
93   startLbFnHdl = theLbdb->getLBDB()->
94     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
95
96   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
97   if (opt.getSeqNo() > 0) turnOff();
98
99   stats_msg_count = 0;
100   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
101   for(int i=0; i < CkNumPes(); i++)
102     statsMsgsList[i] = 0;
103
104   statsData = new LDStats;
105
106   // for future predictor
107   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
108   else predicted_model=0;
109   // register user interface callbacks
110   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
111
112   myspeed = theLbdb->ProcessorSpeed();
113
114   migrates_completed = 0;
115   future_migrates_completed = 0;
116   migrates_expected = -1;
117   future_migrates_expected = -1;
118   cur_ld_balancer = _lb_args.central_pe();      // 0 default
119   lbdone = 0;
120   count_msgs=0;
121   statsMsg = NULL;
122
123   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
124
125   load_balancer_created = 1;
126 #endif
127 }
128
129 CentralLB::~CentralLB()
130 {
131 #if CMK_LBDB_ON
132   delete [] statsMsgsList;
133   delete statsData;
134   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
135   if (theLbdb) {
136     theLbdb->getLBDB()->
137       RemoveNotifyMigrated(notifier);
138     theLbdb->
139       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
140   }
141 #endif
142 }
143
144 void CentralLB::turnOn() 
145 {
146 #if CMK_LBDB_ON
147   theLbdb->getLBDB()->
148     TurnOnBarrierReceiver(receiver);
149   theLbdb->getLBDB()->
150     TurnOnNotifyMigrated(notifier);
151   theLbdb->getLBDB()->
152     TurnOnStartLBFn(startLbFnHdl);
153 #endif
154 }
155
156 void CentralLB::turnOff() 
157 {
158 #if CMK_LBDB_ON
159   theLbdb->getLBDB()->
160     TurnOffBarrierReceiver(receiver);
161   theLbdb->getLBDB()->
162     TurnOffNotifyMigrated(notifier);
163   theLbdb->getLBDB()->
164     TurnOffStartLBFn(startLbFnHdl);
165 #endif
166 }
167
168 void CentralLB::AtSync()
169 {
170 #if CMK_LBDB_ON
171   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
172
173 #ifdef _FAULT_MLOG_
174         CpvAccess(_currentObj)=this;
175 #endif
176
177   // if num of processor is only 1, nothing should happen
178   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
179     MigrationDone(0);
180     return;
181   }
182   if(CmiNodeAlive(CkMyPe())){
183     thisProxy [CkMyPe()].ProcessAtSync();
184   }
185 #endif
186 }
187
188 #include "ComlibStrategy.h"
189
190 void CentralLB::ProcessAtSync()
191 {
192         CkPrintf("[%d] .................... TRACE\n",CkMyPe());
193
194 #if CMK_LBDB_ON
195   CmiAssert(CmiNodeAlive(CkMyPe()));
196   if (CkMyPe() == cur_ld_balancer) {
197     start_lb_time = CkWallTimer();
198   }
199
200
201 #ifdef _FAULT_MLOG_
202         initMlogLBStep(thisgroup);
203 #endif
204
205   // build message
206   BuildStatsMsg();
207
208 #if USE_REDUCTION
209     // reduction to get total number of objects and comm
210     // so that processor 0 can pre-allocate load balancing database
211   int counts[2];
212   counts[0] = theLbdb->GetObjDataSz();
213   counts[1] = theLbdb->GetCommDataSz();
214
215   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
216                   thisProxy[0]);
217   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
218 #else
219   SendStats();
220 #endif
221 #endif
222 }
223
224 // called only on 0
225 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
226 {
227   CmiAssert(CkMyPe() == 0);
228
229   int *counts = (int *)msg->getData();
230   int n_objs = counts[0];
231   int n_comm = counts[1];
232
233     // resize database
234   statsData->objData.resize(n_objs);
235   statsData->from_proc.resize(n_objs);
236   statsData->to_proc.resize(n_objs);
237   statsData->commData.resize(n_comm);
238
239   DEBUGF(("[%d] ReceiveCounts\n",CkMyPe()));
240         
241     // broadcast call to let everybody start to send stats
242   thisProxy.SendStats();
243 }
244
245 void CentralLB::BuildStatsMsg()
246 {
247 #if CMK_LBDB_ON
248   // build and send stats
249   const int osz = theLbdb->GetObjDataSz();
250   const int csz = theLbdb->GetCommDataSz();
251
252   int npes = CkNumPes();
253   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
254   msg->from_pe = CkMyPe();
255         CkPrintf("[%d] the step is %d\n",CkMyPe(),step());
256 #ifdef _FAULT_MLOG_
257         msg->step = step();
258 #endif
259   //msg->serial = CrnRand();
260
261 /*
262   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
263   theLbdb->IdleTime(&msg->idletime);
264   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
265 */
266   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
267                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
268   msg->pe_speed = myspeed;
269   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
270
271   msg->n_objs = osz;
272   theLbdb->GetObjData(msg->objData);
273   msg->n_comm = csz;
274   theLbdb->GetCommData(msg->commData);
275 //  theLbdb->ClearLoads();
276   DEBUGF(("PE %d sending stats to ReceiveStats %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
277
278   if(CkMyPe() == cur_ld_balancer) {
279     msg->avail_vector = new char[CkNumPes()];
280     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
281     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
282   }
283
284   CmiAssert(statsMsg == NULL);
285   statsMsg = msg;
286 #endif
287 }
288
289 // called on every processor
290 void CentralLB::SendStats()
291 {
292 #if CMK_LBDB_ON
293   CmiAssert(statsMsg != NULL);
294
295 #if USE_LDB_SPANNING_TREE
296   if(CkNumPes()>1024)
297   {
298     if (CkMyPe() == cur_ld_balancer)
299       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
300     else
301       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
302   }
303   else
304 #endif
305         DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
306   thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
307
308   statsMsg = NULL;
309
310 #ifdef __BLUEGENE__
311   BgEndStreaming();
312 #endif
313
314   {
315   // enfore the barrier to wait until centralLB says no
316   LDOMHandle h;
317   h.id.id.idx = 0;
318   theLbdb->getLBDB()->RegisteringObjects(h);
319   }
320 #endif
321 }
322
323 #ifdef _FAULT_MLOG_
324 extern int donotCountMigration;
325 #endif
326
327 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
328 {
329 #ifdef _FAULT_MLOG_
330     if(donotCountMigration){
331         return ;
332     }
333 #endif
334
335 #if CMK_LBDB_ON
336   if (waitBarrier) {
337             migrates_completed++;
338       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
339     if (migrates_completed == migrates_expected) {
340       MigrationDone(1);
341     }
342   }
343   else {
344     future_migrates_completed ++;
345     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
346     if (future_migrates_completed == future_migrates_expected)  {
347         CheckMigrationComplete();
348     }
349   }
350 #endif
351 }
352
353 void CentralLB::MissMigrate(int waitForBarrier)
354 {
355   LDObjHandle h;
356   Migrated(h, waitForBarrier);
357 }
358
359 // build a complete data from bufferred messages
360 // not used when USE_REDUCTION = 1
361 void CentralLB::buildStats()
362 {
363     statsData->count = stats_msg_count;
364     // allocate space
365     statsData->objData.resize(statsData->n_objs);
366     statsData->from_proc.resize(statsData->n_objs);
367     statsData->to_proc.resize(statsData->n_objs);
368     statsData->commData.resize(statsData->n_comm);
369
370     int nobj = 0;
371     int ncom = 0;
372     int nmigobj = 0;
373     // copy all data in individule message to this big structure
374     for (int pe=0; pe<CkNumPes(); pe++) {
375        int i;
376        CLBStatsMsg *msg = statsMsgsList[pe];
377        if(msg == NULL) continue;
378        for (i=0; i<msg->n_objs; i++) {
379          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
380          statsData->objData[nobj] = msg->objData[i];
381          if (msg->objData[i].migratable) nmigobj++;
382          nobj++;
383        }
384        for (i=0; i<msg->n_comm; i++) {
385          statsData->commData[ncom] = msg->commData[i];
386          ncom++;
387        }
388        // free the memory
389        delete msg;
390        statsMsgsList[pe]=0;
391     }
392     statsData->n_migrateobjs = nmigobj;
393 }
394
395 // deposit one processor data at a time, note database is pre-allocated
396 // to have enough space
397 // used when USE_REDUCTION = 1
398 void CentralLB::depositData(CLBStatsMsg *m)
399 {
400   int i;
401   if (m == NULL) return;
402
403   const int pe = m->from_pe;
404   struct ProcStats &procStat = statsData->procs[pe];
405   procStat.pe = pe;
406   procStat.total_walltime = m->total_walltime;
407   procStat.total_cputime = m->total_cputime;
408   procStat.idletime = m->idletime;
409   procStat.bg_walltime = m->bg_walltime;
410   procStat.bg_cputime = m->bg_cputime;
411   procStat.pe_speed = m->pe_speed;
412   //procStat.utilization = 1.0;
413   procStat.available = CmiTrue;
414   procStat.n_objs = m->n_objs;
415
416   int &nobj = statsData->n_objs;
417   int &nmigobj = statsData->n_migrateobjs;
418   for (i=0; i<m->n_objs; i++) {
419       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
420       statsData->objData[nobj] = m->objData[i];
421       if (m->objData[i].migratable) nmigobj++;
422       nobj++;
423       CmiAssert(nobj <= statsData->objData.capacity());
424   }
425   int &n_comm = statsData->n_comm;
426   for (i=0; i<m->n_comm; i++) {
427       statsData->commData[n_comm] = m->commData[i];
428       n_comm++;
429       CmiAssert(n_comm <= statsData->commData.capacity());
430   }
431   delete m;
432 }
433
434 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
435 {
436 #if CMK_LBDB_ON
437     //  loop through all CLBStatsMsg in the incoming msg
438   for (int num = 0; num < msg.getCount(); num++) 
439   {
440     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
441     const int pe = m->from_pe;
442         DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
443 #ifdef _FAULT_MLOG_     
444 /*      
445  *              if(m->step < step()){
446  *                          //TODO: if a processor is redoing an old load balance step..
447  *                                      //tell it that the step is done and that it should not perform any migrations
448  *                                                  thisProxy[pe].ReceiveDummyMigration();
449  *                                                          }*/
450 #endif
451         
452     if(!CmiNodeAlive(pe)){
453         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
454         continue;
455     }
456         
457     if (m->avail_vector) {
458       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
459     }
460
461     if (statsMsgsList[pe] != 0) {
462       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
463              pe);
464     } else {
465       statsMsgsList[pe] = m;
466 #if USE_REDUCTION
467       depositData(m);
468 #else
469       // store per processor data right away
470       struct ProcStats &procStat = statsData->procs[pe];
471       procStat.pe = pe;
472       procStat.total_walltime = m->total_walltime;
473       procStat.total_cputime = m->total_cputime;
474       procStat.idletime = m->idletime;
475       procStat.bg_walltime = m->bg_walltime;
476       procStat.bg_cputime = m->bg_cputime;
477       procStat.pe_speed = m->pe_speed;
478       //procStat.utilization = 1.0;
479       procStat.available = CmiTrue;
480       procStat.n_objs = m->n_objs;
481
482       statsData->n_objs += m->n_objs;
483       statsData->n_comm += m->n_comm;
484 #endif
485       stats_msg_count++;
486     }
487   }    // end of for
488
489   const int clients = CkNumValidPes();
490   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
491  
492   if (stats_msg_count == clients) {
493         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
494     statsData->count = stats_msg_count;
495     thisProxy[CkMyPe()].LoadBalance();
496   }
497 #endif
498 }
499
500 /** added by Abhinav for receiving msgs via spanning tree */
501 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
502 {
503 #if CMK_LBDB_ON
504         CmiAssert(CkMyPe() != 0);
505         bufMsg.add(msg);         // buffer messages
506         count_msgs++;
507         //CkPrintf("here %d\n", CkMyPe());
508         if (count_msgs == st.numChildren+1) {
509                 if(st.parent == 0)
510                 {
511                         thisProxy[0].ReceiveStats(bufMsg);
512                         //CkPrintf("from %d\n", CkMyPe());
513                 }
514                 else
515                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
516                 count_msgs = 0;
517                 bufMsg.free();
518         } 
519 #endif
520 }
521
522 void CentralLB::LoadBalance()
523 {
524 #if CMK_LBDB_ON
525   int proc;
526   const int clients = CkNumPes();
527
528 #if ! USE_REDUCTION
529   // build data
530   buildStats();
531 #else
532   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
533 #endif
534
535   if (_lb_args.debug()) 
536       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%dMB\n",
537                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
538  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
539 //    double strat_start_time = CkWallTimer();
540
541   // if we are in simulation mode read data
542   if (LBSimulation::doSimulation) simulationRead();
543
544   char *availVector = LBDatabaseObj()->availVector();
545   for(proc = 0; proc < clients; proc++)
546       statsData->procs[proc].available = (CmiBool)availVector[proc];
547
548   preprocess(statsData, clients);
549
550 //    CkPrintf("Before Calling Strategy\n");
551
552   if (_lb_args.printSummary()) {
553       LBInfo info(clients);
554         // not take comm data
555       info.getInfo(statsData, clients, 0);
556       double mLoad, mCpuLoad, totalLoad;
557       info.getSummary(mLoad, mCpuLoad, totalLoad);
558       int nmsgs, nbytes;
559       statsData->computeNonlocalComm(nmsgs, nbytes);
560       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
561 //      if (_lb_args.debug() > 1) {
562 //        for (int i=0; i<statsData->n_objs; i++)
563 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
564 //      }
565   }
566
567   double strat_start_time = CkWallTimer();
568   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
569 #ifdef _FAULT_MLOG_
570         migrateMsg->step = step();
571 #endif
572   if (_lb_args.debug()) {
573     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
574     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
575     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
576               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
577   }
578
579 //    CkPrintf("returned successfully\n");
580
581   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
582   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
583
584   // if this is the step at which we need to dump the database
585   simulationWrite();
586
587 //  calculate predicted load
588 //  very time consuming though, so only happen when debugging is on
589   if (_lb_args.printSummary()) {
590       LBInfo info(clients);
591         // not take comm data
592       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
593       double mLoad, mCpuLoad, totalLoad;
594       info.getSummary(mLoad, mCpuLoad, totalLoad);
595       int nmsgs, nbytes;
596       statsData->computeNonlocalComm(nmsgs, nbytes);
597       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
598       for (int i=0; i<clients; i++)
599         migrateMsg->expectedLoad[i] = info.peLoads[i];
600   }
601
602   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
603 #ifdef _FAULT_MLOG_ 
604     lbDecisionCount++;
605     migrateMsg->lbDecisionCount = lbDecisionCount;
606 #endif
607  CkPrintf("ReceiveMigration called at %.6lf \n",CmiWallTimer());
608   thisProxy.ReceiveMigration(migrateMsg);
609
610   // Zero out data structures for next cycle
611   // CkPrintf("zeroing out data\n");
612   statsData->clear();
613   stats_msg_count=0;
614 #endif
615 }
616
617 //    double strat_end_time = CkWallTimer();
618 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
619 // test if sender and receiver in a commData is nonmigratable.
620 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
621 {
622 #if CMK_LBDB_ON
623   for (int pe=0 ; pe<count; pe++)
624   {
625     for (int i=0; i<len[pe]; i++)
626       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
627           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
628       return 0;
629   }
630 #endif
631   return 1;
632 }
633
634 // rebuild LDStats and remove all non-migratble objects and related things
635 void CentralLB::removeNonMigratable(LDStats* stats, int count)
636 {
637   int i;
638
639   // check if we have non-migratable objects
640   int have = 0;
641   for (i=0; i<stats->n_objs; i++) 
642   {
643     LDObjData &odata = stats->objData[i];
644     if (!odata.migratable) {
645       have = 1; break;
646     }
647   }
648   if (have == 0) return;
649
650   CkVec<LDObjData> nonmig;
651   CkVec<int> new_from_proc, new_to_proc;
652   nonmig.resize(stats->n_migrateobjs);
653   new_from_proc.resize(stats->n_migrateobjs);
654   new_to_proc.resize(stats->n_migrateobjs);
655   int n_objs = 0;
656   for (i=0; i<stats->n_objs; i++) 
657   {
658     LDObjData &odata = stats->objData[i];
659     if (odata.migratable) {
660       nonmig[n_objs] = odata;
661       new_from_proc[n_objs] = stats->from_proc[i];
662       new_to_proc[n_objs] = stats->to_proc[i];
663       n_objs ++;
664     }
665     else {
666       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
667       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
668     }
669   }
670   CmiAssert(stats->n_migrateobjs == n_objs);
671
672   stats->makeCommHash();
673   
674   CkVec<LDCommData> newCommData;
675   newCommData.resize(stats->n_comm);
676   int n_comm = 0;
677   for (i=0; i<stats->n_comm; i++) 
678   {
679     LDCommData& cdata = stats->commData[i];
680     if (!cdata.from_proc()) 
681     {
682       int idx = stats->getSendHash(cdata);
683       CmiAssert(idx != -1);
684       if (!stats->objData[idx].migratable) continue;
685     }
686     switch (cdata.receiver.get_type()) {
687     case LD_PROC_MSG:
688       break;
689     case LD_OBJ_MSG:  {
690       int idx = stats->getRecvHash(cdata);
691       if (stats->complete_flag)
692         CmiAssert(idx != -1);
693       else if (idx == -1) continue;          // receiver not in this group
694       if (!stats->objData[idx].migratable) continue;
695       break;
696       }
697     case LD_OBJLIST_MSG:    // object message FIXME add multicast
698       break;
699     }
700     newCommData[n_comm] = cdata;
701     n_comm ++;
702   }
703
704   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
705
706   // swap to new data
707   stats->objData = nonmig;
708   stats->from_proc = new_from_proc;
709   stats->to_proc = new_to_proc;
710   stats->n_objs = n_objs;
711
712   stats->commData = newCommData;
713   stats->n_comm = n_comm;
714
715   stats->deleteCommHash();
716   stats->makeCommHash();
717
718 }
719
720
721 #ifdef _FAULT_MLOG_
722 extern int restarted;
723 #endif
724
725 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
726 {
727 #if CMK_LBDB_ON
728         int i;
729
730 #ifdef _FAULT_MLOG_
731         int *dummyCounts;
732
733         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
734         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
735         if(step() > m->step){
736                 char str[100];
737                 envelope *env = UsrToEnv(m);
738                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
739                 return;
740         }
741         lbDecisionCount = m->lbDecisionCount;
742 #endif
743
744   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
745   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
746 /*FAULT_EVAC*/
747   if(!CmiNodeAlive(CkMyPe())){
748         delete m;
749         return;
750   }
751   migrates_expected = 0;
752   future_migrates_expected = 0;
753 #ifdef _FAULT_MLOG_
754         int sending=0;
755     int dummy=0;
756         LBDB *_myLBDB = theLbdb->getLBDB();
757         if(_restartFlag){
758         dummyCounts = new int[CmiNumPes()];
759         bzero(dummyCounts,sizeof(int)*CmiNumPes());
760     }
761 #endif
762   for(i=0; i < m->n_moves; i++) {
763     MigrateInfo& move = m->moves[i];
764     const int me = CkMyPe();
765     if (move.from_pe == me && move.to_pe != me) {
766       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
767       // migrate object, in case it is already gone, inform toPe
768 #ifndef _FAULT_MLOG_
769       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
770          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
771 #else
772             if(_restartFlag == 0){
773                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
774                 theLbdb->Migrate(move.obj,move.to_pe);
775                 sending++;
776             }else{
777                 if(_myLBDB->validObjHandle(move.obj)){
778                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
779                     theLbdb->Migrate(move.obj,move.to_pe);
780                     sending++;
781                 }else{
782                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
783                     dummyCounts[move.to_pe]++;
784                     dummy++;
785                 }
786             }
787 #endif
788     } else if (move.from_pe != me && move.to_pe == me) {
789        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
790       if (!move.async_arrival) migrates_expected++;
791       else future_migrates_expected++;
792     }
793   }
794   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
795   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
796
797 #ifdef _FAULT_MLOG_
798         if(_restartFlag){
799                 sendDummyMigrationCounts(dummyCounts);
800                 _restartFlag  =0;
801         delete []dummyCounts;
802         }
803 #endif
804
805
806 #if 0
807   if (m->n_moves ==0) {
808     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
809   }
810 #endif
811   cur_ld_balancer = m->next_lb;
812   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
813       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
814   }
815
816   if (migrates_expected == 0 || migrates_completed == migrates_expected)
817     MigrationDone(1);
818   delete m;
819
820 //      CkEvacuatedElement();
821 #ifdef _FAULT_MLOG_
822 //  migrates_expected = 0;
823 //  //  ResumeClients(1);
824 #endif
825 #endif
826 }
827
828 #ifdef _FAULT_MLOG_
829 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
830     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
831     //TODO: this is gonna be important when a crash happens during checkpoint
832     //the globalDecisionCount would have to be saved and compared against
833     //a future recvMigration
834                 
835         thisProxy[CkMyPe()].ResumeClients(1);
836 }
837 #endif
838
839 void CentralLB::MigrationDone(int balancing)
840 {
841 #if CMK_LBDB_ON
842   migrates_completed = 0;
843   migrates_expected = -1;
844   // clear load stats
845   if (balancing) theLbdb->ClearLoads();
846   // Increment to next step
847   theLbdb->incStep();
848         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
849   // if sync resume, invoke a barrier
850
851 #ifdef _FAULT_MLOG_
852     savedBalancing = balancing;
853     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
854 #endif
855
856   LoadbalanceDone(balancing);        // callback
857 #ifndef _FAULT_MLOG_
858   // if sync resume invoke a barrier
859   if (balancing && _lb_args.syncResume()) {
860     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
861                   thisProxy);
862     contribute(0, NULL, CkReduction::sum_int, cb);
863   }
864   else{ 
865     if(CmiNodeAlive(CkMyPe())){
866         thisProxy [CkMyPe()].ResumeClients(balancing);
867     }   
868   }     
869 #if CMK_GRID_QUEUE_AVAILABLE
870   CmiGridQueueDeregisterAll ();
871   CpvAccess(CkGridObject) = NULL;
872 #endif
873 #endif 
874 #endif
875 }
876
877 #ifdef _FAULT_MLOG_
878 void CentralLB::endMigrationDone(int balancing){
879     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
880
881
882   if (balancing && _lb_args.syncResume()) {
883     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
884                   thisProxy);
885     contribute(0, NULL, CkReduction::sum_int, cb);
886   }
887   else{
888     if(CmiNodeAlive(CkMyPe())){
889     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
890     thisProxy [CkMyPe()].ResumeClients(balancing);
891     }
892   }
893
894 }
895 #endif
896
897 #ifdef _FAULT_MLOG_
898 void resumeCentralLbAfterChkpt(void *_lb){
899         DEBUGF(("[%d] HERE\n"));
900     CentralLB *lb= (CentralLB *)_lb;
901     CpvAccess(_currentObj)=lb;
902     lb->endMigrationDone(lb->savedBalancing);
903 }
904 #endif
905
906
907 void CentralLB::ResumeClients(CkReductionMsg *msg)
908 {
909   ResumeClients(1);
910   delete msg;
911 }
912
913 void CentralLB::ResumeClients(int balancing)
914 {
915 #if CMK_LBDB_ON
916 #ifdef _FAULT_MLOG_
917     resumeCount++;
918         CkPrintf("[%d] \n\n\n INCREMENTING THIS to %d  \n\n\n",CkMyPe(),resumeCount);
919     globalResumeCount = resumeCount;
920 #endif
921   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
922   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
923     double end_lb_time = CkWallTimer();
924     CkPrintf("[%s] Load balancing step %d finished at %f (duration %fs) Mem:%fMB\n",
925               lbName(), step()-1,end_lb_time, end_lb_time - start_lb_time, CmiMemoryUsage()/1024.0/1024.0);
926   }
927
928 #ifndef _FAULT_MLOG_
929   if (balancing) ComlibNotifyMigrationDone();  
930 #endif
931
932   theLbdb->ResumeClients();
933 CkPrintf("[%d] RIGHT HERE\n",CkMyPe());
934   if (balancing)  {
935     CheckMigrationComplete();
936     if (future_migrates_expected == 0 || 
937             future_migrates_expected == future_migrates_completed) {
938       CheckMigrationComplete();
939     }
940   }
941 #endif
942 }
943
944 /*
945   migration of objects contains two different kinds:
946   (1) objects want to make a barrier for migration completion
947       (waitForBarrier is true)
948       migrationDone() to finish and resumeClients
949   (2) objects don't need a barrier
950   However, next load balancing can only happen when both migrations complete
951 */ 
952 void CentralLB::CheckMigrationComplete()
953 {
954 #if CMK_LBDB_ON
955   lbdone ++;
956   if (lbdone == 2) {
957     lbdone = 0;
958     future_migrates_expected = -1;
959     future_migrates_completed = 0;
960     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
961     // release local barrier  so that the next load balancer can go
962     LDOMHandle h;
963     h.id.id.idx = 0;
964     theLbdb->getLBDB()->DoneRegisteringObjects(h);
965     // switch to the next load balancer in the list
966     // subtle: called from Migrated() may result in Migrated() called in next LB
967     theLbdb->nextLoadbalancer(seqno);
968   }
969 #endif
970 }
971
972 void CentralLB::preprocess(LDStats* stats,int count)
973 {
974   if (_lb_args.ignoreBgLoad())
975     stats->clearBgLoad();
976
977   // Call the predictor for the future
978   if (_lb_predict) FuturePredictor(statsData);
979 }
980
981 // default load balancing strategy
982 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
983 {
984 #if CMK_LBDB_ON
985   work(stats, count);
986
987   if (_lb_args.debug()>1)  {
988     CkPrintf("Obj Map:\n");
989     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
990     CkPrintf("\n");
991   }
992
993   return createMigrateMsg(stats, count);
994 #else
995   return NULL;
996 #endif
997 }
998
999 void CentralLB::work(LDStats* stats,int count)
1000 {
1001   // does nothing but print the database
1002   stats->print();
1003 }
1004
1005 // generate migrate message from stats->from_proc and to_proc
1006 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
1007 {
1008   int i;
1009   CkVec<MigrateInfo*> migrateInfo;
1010   for (i=0; i<stats->n_objs; i++) {
1011     LDObjData &objData = stats->objData[i];
1012     int frompe = stats->from_proc[i];
1013     int tope = stats->to_proc[i];
1014     if (frompe != tope) {
1015       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1016       //         CkMyPe(),obj,pe,dest);
1017       MigrateInfo *migrateMe = new MigrateInfo;
1018       migrateMe->obj = objData.handle;
1019       migrateMe->from_pe = frompe;
1020       migrateMe->to_pe = tope;
1021       migrateMe->async_arrival = objData.asyncArrival;
1022       migrateInfo.insertAtEnd(migrateMe);
1023     }
1024   }
1025
1026   int migrate_count=migrateInfo.length();
1027   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1028   msg->n_moves = migrate_count;
1029   for(i=0; i < migrate_count; i++) {
1030     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1031     msg->moves[i] = *item;
1032     delete item;
1033     migrateInfo[i] = 0;
1034   }
1035   if (_lb_args.debug())
1036     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
1037   return msg;
1038 }
1039
1040 void CentralLB::simulationWrite() {
1041   if(step() == LBSimulation::dumpStep)
1042   {
1043     // here we are supposed to dump the database
1044     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1045     char *dumpFileName = (char *)malloc(dumpFileSize);
1046     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1047       free(dumpFileName);
1048       dumpFileSize+=3;
1049       dumpFileName = (char *)malloc(dumpFileSize);
1050     }
1051     writeStatsMsgs(dumpFileName);
1052     free(dumpFileName);
1053     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1054     ++LBSimulation::dumpStep;
1055     --LBSimulation::dumpStepSize;
1056     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1057       CmiPrintf("Charm++> Exiting...\n");
1058       CkExit();
1059     }
1060     return;
1061   }
1062 }
1063
1064 void CentralLB::simulationRead() {
1065   LBSimulation *simResults = NULL, *realResults;
1066   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1067   voidMessage->n_moves=0;
1068   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1069     // here we are supposed to read the data from the dump database
1070     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1071     char *simFileName = (char *)malloc(simFileSize);
1072     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1073       free(simFileName);
1074       simFileSize+=3;
1075       simFileName = (char *)malloc(simFileSize);
1076     }
1077     readStatsMsgs(simFileName);
1078
1079     // allocate simResults (only the first step)
1080     if (simResults == NULL) {
1081       simResults = new LBSimulation(LBSimulation::simProcs);
1082       realResults = new LBSimulation(LBSimulation::simProcs);
1083     }
1084     else {
1085       // should be the same number of procs of the original simulation!
1086       if (!LBSimulation::procsChanged) {
1087         // it means we have a previous step, so in simResults there is data.
1088         // we can now print the real effects of the load balancer during the simulation
1089         // or print the difference between the predicted data and the real one.
1090         realResults->reset();
1091         // reset to_proc of statsData to be equal to from_proc
1092         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1093         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1094         simResults->PrintDifferences(realResults,statsData);
1095       }
1096       simResults->reset();
1097     }
1098
1099     // now pass it to the strategy routine
1100     double startT = CkWallTimer();
1101     preprocess(statsData, LBSimulation::simProcs);
1102     CmiPrintf("%s> Strategy starts ... \n", lbname);
1103     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1104     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1105                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1106
1107     // now calculate the results of the load balancing simulation
1108     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1109
1110     // now we have the simulation data, so print it and loop
1111     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1112     // **CWL** Officially recording my disdain here for using ints for bool
1113     if (LBSimulation::showDecisionsOnly) {
1114       simResults->PrintDecisions(migrateMsg, simFileName, 
1115                                  LBSimulation::simProcs);
1116     } else {
1117       simResults->PrintSimulationResults();
1118     }
1119
1120     free(simFileName);
1121     delete migrateMsg;
1122     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1123   }
1124   // deallocate simResults
1125   delete simResults;
1126   CmiPrintf("Charm++> Exiting...\n");
1127   CkExit();
1128 }
1129
1130 void CentralLB::readStatsMsgs(const char* filename) 
1131 {
1132 #if CMK_LBDB_ON
1133   int i;
1134   FILE *f = fopen(filename, "r");
1135   if (f==NULL) {
1136     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1137     CmiAbort("");
1138   }
1139
1140   // at this stage, we need to rebuild the statsMsgList and
1141   // statsDataList structures. For that first deallocate the
1142   // old structures
1143   if (statsMsgsList) {
1144     for(i = 0; i < stats_msg_count; i++)
1145       delete statsMsgsList[i];
1146     delete[] statsMsgsList;
1147     statsMsgsList=0;
1148   }
1149
1150   PUP::fromDisk pd(f);
1151   PUP::machineInfo machInfo;
1152
1153   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1154   PUP::xlater p(machInfo, pd);
1155
1156   if (_lb_args.lbversion() > 1) {
1157     p|_lb_args.lbversion();             // write version number
1158     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1159     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1160   }
1161   p|stats_msg_count;
1162
1163   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1164   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1165   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1166
1167   // LBSimulation::simProcs must be set
1168   statsData->pup(p);
1169
1170   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1171   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1172
1173   // file f is closed in the destructor of PUP::fromDisk
1174   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1175 #endif
1176 }
1177
1178 void CentralLB::writeStatsMsgs(const char* filename) 
1179 {
1180 #if CMK_LBDB_ON
1181   FILE *f = fopen(filename, "w");
1182   if (f==NULL) {
1183     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1184     CmiAbort("");
1185   }
1186
1187   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1188   PUP::toDisk p(f);
1189   p((char *)&machInfo, sizeof(machInfo));       // machine info
1190
1191   p|_lb_args.lbversion();               // write version number
1192   p|stats_msg_count;
1193   statsData->pup(p);
1194
1195   fclose(f);
1196
1197   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1198 #endif
1199 }
1200
1201 // calculate the predicted wallclock/cpu load for every processors
1202 // considering communication overhead if considerComm is true
1203 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1204                       LBMigrateMsg *msg, LBInfo &info, 
1205                       int considerComm)
1206 {
1207 #if CMK_LBDB_ON
1208         stats->makeCommHash();
1209
1210         // update to_proc according to migration msgs
1211         for(int i = 0; i < msg->n_moves; i++) {
1212           MigrateInfo &mInfo = msg->moves[i];
1213           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1214           CmiAssert(idx != -1);
1215           stats->to_proc[idx] = mInfo.to_pe;
1216         }
1217
1218         info.getInfo(stats, count, considerComm);
1219 #endif
1220 }
1221
1222
1223 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1224 {
1225     CkAssert(simResults != NULL && count == simResults->numPes);
1226     // estimate the new loads of the processors. As a first approximation, this is the
1227     // sum of the cpu times of the objects on that processor
1228     double startT = CkWallTimer();
1229     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1230     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1231 }
1232
1233 void CentralLB::pup(PUP::er &p) { 
1234   BaseLB::pup(p); 
1235   if (p.isUnpacking())  {
1236     initLB(CkLBOptions(seqno)); 
1237   }
1238 #ifdef _FAULT_MLOG_
1239         p | lbDecisionCount;
1240     p | resumeCount;
1241 #endif
1242         
1243 }
1244
1245 int CentralLB::useMem() { 
1246   return sizeof(CentralLB) + statsData->useMem() + 
1247          CkNumPes() * sizeof(CLBStatsMsg *);
1248 }
1249
1250
1251 /**
1252   CLBStatsMsg is not a real message now.
1253   CLBStatsMsg is used for all processors to fill in their local load and comm
1254   statistics and send to processor 0
1255 */
1256
1257 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1258   n_objs = osz;
1259   n_comm = csz;
1260   objData = new LDObjData[osz];
1261   commData = new LDCommData[csz];
1262   avail_vector = NULL;
1263 }
1264
1265 CLBStatsMsg::~CLBStatsMsg() {
1266   delete [] objData;
1267   delete [] commData;
1268   if (avail_vector) delete [] avail_vector;
1269 }
1270
1271 void CLBStatsMsg::pup(PUP::er &p) {
1272   int i;
1273   p|from_pe;
1274   p|pe_speed;
1275   p|total_walltime; p|total_cputime;
1276   p|idletime;
1277   p|bg_walltime;   p|bg_cputime;
1278   p|n_objs;
1279   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1280   for (i=0; i<n_objs; i++) p|objData[i];
1281   p|n_comm;
1282   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1283   for (i=0; i<n_comm; i++) p|commData[i];
1284
1285   int has_avail_vector;
1286   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1287   p|has_avail_vector;
1288   if (p.isUnpacking()) {
1289     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1290     else avail_vector = NULL;
1291   }
1292   if (has_avail_vector) p(avail_vector, CkNumPes());
1293
1294   p(next_lb);
1295 }
1296
1297 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1298 // the entry function, it is just used to use to pup.
1299 // I don't use CLBStatsMsg directly as marshalled parameter because
1300 // I want the data pointer stored and not to be freed by the Charm++.
1301 void CkMarshalledCLBStatsMessage::free() { 
1302   int count = msgs.size();
1303   for  (int i=0; i<count; i++) 
1304     if (msgs[i]) delete msgs[i];
1305   msgs.free();
1306 }
1307
1308 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1309 {
1310   int count = m.getCount();
1311   for (int i=0; i<count; i++) add(m.getMessage(i));
1312 }
1313
1314 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1315 {
1316   int count = msgs.size();
1317   p|count;
1318   for (int i=0; i<count; i++) {
1319     CLBStatsMsg *msg;
1320     if (p.isUnpacking()) msg = new CLBStatsMsg;
1321     else { 
1322       msg = msgs[i]; CmiAssert(msg);
1323     }
1324     msg->pup(p);
1325     if (p.isUnpacking()) add(msg);
1326   }
1327 }
1328
1329 SpanningTree::SpanningTree()
1330 {
1331         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1332         arity = (int)ceil(sq/2);
1333         calcParent(CkMyPe());
1334         calcNumChildren(CkMyPe());
1335 }
1336
1337 void SpanningTree::calcParent(int n)
1338 {
1339         parent=-1;
1340         if(n != 0  && arity > 0)
1341                 parent = (n-1)/arity;
1342 }
1343
1344 void SpanningTree::calcNumChildren(int n)
1345 {
1346         numChildren = 0;
1347         if (arity == 0) return;
1348         int fullNode=(CkNumPes()-1-arity)/arity;
1349         if(n <= fullNode)
1350                 numChildren = arity;
1351         if(n == fullNode+1)
1352                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1353         if(n > fullNode+1)
1354                 numChildren = 0;
1355 }
1356
1357 #include "CentralLB.def.h"
1358  
1359 /*@}*/