891b40793ddf53de59f665b5ff5e8347cdfaaa71
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)        //CmiPrintf x;
20 #define  DEBUG(x)      // x;
21
22 #if CMK_MEM_CHECKPOINT
23    /* can not handle reduction in inmem FT */
24 #define USE_REDUCTION         0
25 #define USE_LDB_SPANNING_TREE 0
26 #else
27 #define USE_REDUCTION         1
28 #define USE_LDB_SPANNING_TREE 1
29 #endif
30
31 #ifdef _FAULT_MLOG_
32 /* can not handle reduction in inmem FT */
33 #undef USE_REDUCTION
34 #undef USE_LDB_SPANNING_TREE
35 #define USE_REDUCTION         0
36 #define USE_LDB_SPANNING_TREE 0
37 #else
38 #define USE_REDUCTION         1
39 #define USE_LDB_SPANNING_TREE 1
40 #endif
41
42 #ifdef _FAULT_MLOG_
43 extern int _restartFlag;
44 extern void getGlobalStep(CkGroupID );
45 extern void initMlogLBStep(CkGroupID );
46 extern int globalResumeCount;
47 extern void sendDummyMigrationCounts(int *);
48 #endif
49
50 #if CMK_GRID_QUEUE_AVAILABLE
51 CpvExtern(void *, CkGridObject);
52 #endif
53
54 CkGroupID loadbalancer;
55 int * lb_ptr;
56 int load_balancer_created;
57
58 CreateLBFunc_Def(CentralLB, "CentralLB base class");
59
60 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
61                              LBMigrateMsg *, LBInfo &info, int considerComm);
62
63 /*
64 void CreateCentralLB()
65 {
66   CProxy_CentralLB::ckNew(0);
67 }
68 */
69
70 void CentralLB::staticStartLB(void* data)
71 {
72   CentralLB *me = (CentralLB*)(data);
73   me->StartLB();
74 }
75
76 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
77 {
78   CentralLB *me = (CentralLB*)(data);
79   me->Migrated(h, waitBarrier);
80 }
81
82 void CentralLB::staticAtSync(void* data)
83 {
84   CentralLB *me = (CentralLB*)(data);
85   me->AtSync();
86 }
87
88 void CentralLB::initLB(const CkLBOptions &opt)
89 {
90 #if CMK_LBDB_ON
91   lbname = "CentralLB";
92   thisProxy = CProxy_CentralLB(thisgroup);
93   //  CkPrintf("Construct in %d\n",CkMyPe());
94
95   // create and turn on by default
96   receiver = theLbdb->
97     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
98   notifier = theLbdb->getLBDB()->
99     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
100   startLbFnHdl = theLbdb->getLBDB()->
101     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
102
103   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
104   if (opt.getSeqNo() > 0) turnOff();
105
106   stats_msg_count = 0;
107   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
108   for(int i=0; i < CkNumPes(); i++)
109     statsMsgsList[i] = 0;
110
111   statsData = new LDStats;
112
113   // for future predictor
114   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
115   else predicted_model=0;
116   // register user interface callbacks
117   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
118
119   myspeed = theLbdb->ProcessorSpeed();
120
121   migrates_completed = 0;
122   future_migrates_completed = 0;
123   migrates_expected = -1;
124   future_migrates_expected = -1;
125   cur_ld_balancer = _lb_args.central_pe();      // 0 default
126   lbdone = 0;
127   count_msgs=0;
128   statsMsg = NULL;
129
130   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
131
132   load_balancer_created = 1;
133 #endif
134 }
135
136 CentralLB::~CentralLB()
137 {
138 #if CMK_LBDB_ON
139   delete [] statsMsgsList;
140   delete statsData;
141   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
142   if (theLbdb) {
143     theLbdb->getLBDB()->
144       RemoveNotifyMigrated(notifier);
145     theLbdb->
146       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
147   }
148 #endif
149 }
150
151 void CentralLB::turnOn() 
152 {
153 #if CMK_LBDB_ON
154   theLbdb->getLBDB()->
155     TurnOnBarrierReceiver(receiver);
156   theLbdb->getLBDB()->
157     TurnOnNotifyMigrated(notifier);
158   theLbdb->getLBDB()->
159     TurnOnStartLBFn(startLbFnHdl);
160 #endif
161 }
162
163 void CentralLB::turnOff() 
164 {
165 #if CMK_LBDB_ON
166   theLbdb->getLBDB()->
167     TurnOffBarrierReceiver(receiver);
168   theLbdb->getLBDB()->
169     TurnOffNotifyMigrated(notifier);
170   theLbdb->getLBDB()->
171     TurnOffStartLBFn(startLbFnHdl);
172 #endif
173 }
174
175 void CentralLB::AtSync()
176 {
177 #if CMK_LBDB_ON
178   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
179
180 #ifdef _FAULT_MLOG_
181 #endif
182
183   // if num of processor is only 1, nothing should happen
184   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
185     MigrationDone(0);
186     return;
187   }
188   if(CmiNodeAlive(CkMyPe())){
189     thisProxy [CkMyPe()].ProcessAtSync();
190   }
191 #endif
192 }
193
194 #include "ComlibStrategy.h"
195
196 void CentralLB::ProcessAtSync()
197 {
198 #if CMK_LBDB_ON
199   CmiAssert(CmiNodeAlive(CkMyPe()));
200   if (CkMyPe() == cur_ld_balancer) {
201     start_lb_time = CkWallTimer();
202   }
203
204   // build message
205   BuildStatsMsg();
206
207 #if USE_REDUCTION
208     // reduction to get total number of objects and comm
209     // so that processor 0 can pre-allocate load balancing database
210   int counts[2];
211   counts[0] = theLbdb->GetObjDataSz();
212   counts[1] = theLbdb->GetCommDataSz();
213
214   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
215                   thisProxy[0]);
216   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
217 #else
218   SendStats();
219 #endif
220 #endif
221 }
222
223 // called only on 0
224 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
225 {
226   CmiAssert(CkMyPe() == 0);
227
228   int *counts = (int *)msg->getData();
229   int n_objs = counts[0];
230   int n_comm = counts[1];
231
232     // resize database
233   statsData->objData.resize(n_objs);
234   statsData->from_proc.resize(n_objs);
235   statsData->to_proc.resize(n_objs);
236   statsData->commData.resize(n_comm);
237
238   DEBUGF(("[%d] ReceiveCounts\n",CkMyPe()));
239         
240     // broadcast call to let everybody start to send stats
241   thisProxy.SendStats();
242 }
243
244 void CentralLB::BuildStatsMsg()
245 {
246 #if CMK_LBDB_ON
247   // build and send stats
248   const int osz = theLbdb->GetObjDataSz();
249   const int csz = theLbdb->GetCommDataSz();
250
251   int npes = CkNumPes();
252   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
253   msg->from_pe = CkMyPe();
254   //msg->serial = CrnRand();
255
256 /*
257   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
258   theLbdb->IdleTime(&msg->idletime);
259   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
260 */
261   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
262                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
263   msg->pe_speed = myspeed;
264   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
265
266   msg->n_objs = osz;
267   theLbdb->GetObjData(msg->objData);
268   msg->n_comm = csz;
269   theLbdb->GetCommData(msg->commData);
270 //  theLbdb->ClearLoads();
271   DEBUGF(("PE %d sending stats to ReceiveStats %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
272
273   if(CkMyPe() == cur_ld_balancer) {
274     msg->avail_vector = new char[CkNumPes()];
275     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
276     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
277   }
278
279   CmiAssert(statsMsg == NULL);
280   statsMsg = msg;
281 #endif
282 }
283
284 // called on every processor
285 void CentralLB::SendStats()
286 {
287 #if CMK_LBDB_ON
288   CmiAssert(statsMsg != NULL);
289
290 #if USE_LDB_SPANNING_TREE
291   if(CkNumPes()>1024)
292   {
293     if (CkMyPe() == cur_ld_balancer)
294       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
295     else
296       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
297   }
298   else
299 #endif
300   thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
301
302   statsMsg = NULL;
303
304 #ifdef __BLUEGENE__
305   BgEndStreaming();
306 #endif
307
308   {
309   // enfore the barrier to wait until centralLB says no
310   LDOMHandle h;
311   h.id.id.idx = 0;
312   theLbdb->getLBDB()->RegisteringObjects(h);
313   }
314 #endif
315 }
316
317 #ifdef _FAULT_MLOG_
318 extern int donotCountMigration;
319 #endif
320
321 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
322 {
323 #ifdef _FAULT_MLOG_
324     if(donotCountMigration){
325         return ;
326     }
327 #endif
328
329 #if CMK_LBDB_ON
330   if (waitBarrier) {
331             migrates_completed++;
332       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
333     if (migrates_completed == migrates_expected) {
334       MigrationDone(1);
335     }
336   }
337   else {
338     future_migrates_completed ++;
339     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
340     if (future_migrates_completed == future_migrates_expected)  {
341         CheckMigrationComplete();
342     }
343   }
344 #endif
345 }
346
347 void CentralLB::MissMigrate(int waitForBarrier)
348 {
349   LDObjHandle h;
350   Migrated(h, waitForBarrier);
351 }
352
353 // build a complete data from bufferred messages
354 // not used when USE_REDUCTION = 1
355 void CentralLB::buildStats()
356 {
357     statsData->count = stats_msg_count;
358     // allocate space
359     statsData->objData.resize(statsData->n_objs);
360     statsData->from_proc.resize(statsData->n_objs);
361     statsData->to_proc.resize(statsData->n_objs);
362     statsData->commData.resize(statsData->n_comm);
363
364     int nobj = 0;
365     int ncom = 0;
366     int nmigobj = 0;
367     // copy all data in individule message to this big structure
368     for (int pe=0; pe<CkNumPes(); pe++) {
369        int i;
370        CLBStatsMsg *msg = statsMsgsList[pe];
371        if(msg == NULL) continue;
372        for (i=0; i<msg->n_objs; i++) {
373          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
374          statsData->objData[nobj] = msg->objData[i];
375          if (msg->objData[i].migratable) nmigobj++;
376          nobj++;
377        }
378        for (i=0; i<msg->n_comm; i++) {
379          statsData->commData[ncom] = msg->commData[i];
380          ncom++;
381        }
382        // free the memory
383        delete msg;
384        statsMsgsList[pe]=0;
385     }
386     statsData->n_migrateobjs = nmigobj;
387 }
388
389 // deposit one processor data at a time, note database is pre-allocated
390 // to have enough space
391 // used when USE_REDUCTION = 1
392 void CentralLB::depositData(CLBStatsMsg *m)
393 {
394   int i;
395   if (m == NULL) return;
396
397   const int pe = m->from_pe;
398   struct ProcStats &procStat = statsData->procs[pe];
399   procStat.pe = pe;
400   procStat.total_walltime = m->total_walltime;
401   procStat.total_cputime = m->total_cputime;
402   procStat.idletime = m->idletime;
403   procStat.bg_walltime = m->bg_walltime;
404   procStat.bg_cputime = m->bg_cputime;
405   procStat.pe_speed = m->pe_speed;
406   //procStat.utilization = 1.0;
407   procStat.available = CmiTrue;
408   procStat.n_objs = m->n_objs;
409
410   int &nobj = statsData->n_objs;
411   int &nmigobj = statsData->n_migrateobjs;
412   for (i=0; i<m->n_objs; i++) {
413       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
414       statsData->objData[nobj] = m->objData[i];
415       if (m->objData[i].migratable) nmigobj++;
416       nobj++;
417       CmiAssert(nobj <= statsData->objData.capacity());
418   }
419   int &n_comm = statsData->n_comm;
420   for (i=0; i<m->n_comm; i++) {
421       statsData->commData[n_comm] = m->commData[i];
422       n_comm++;
423       CmiAssert(n_comm <= statsData->commData.capacity());
424   }
425   delete m;
426 }
427
428 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
429 {
430 #if CMK_LBDB_ON
431     //  loop through all CLBStatsMsg in the incoming msg
432   for (int num = 0; num < msg.getCount(); num++) 
433   {
434     CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage(num);
435     const int pe = m->from_pe;
436     DEBUGF(("Stats msg received, %d %d %d %p\n", pe,stats_msg_count,m->n_objs,m));
437 #ifdef _FAULT_MLOG_     
438 /*      
439  *              if(m->step < step()){
440  *                          //TODO: if a processor is redoing an old load balance step..
441  *                                      //tell it that the step is done and that it should not perform any migrations
442  *                                                  thisProxy[pe].ReceiveDummyMigration();
443  *                                                          }*/
444 #endif
445         
446     if(!CmiNodeAlive(pe)){
447         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
448         continue;
449     }
450         
451     if (m->avail_vector) {
452       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
453     }
454
455     if (statsMsgsList[pe] != 0) {
456       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
457              pe);
458     } else {
459       statsMsgsList[pe] = m;
460 #if USE_REDUCTION
461       depositData(m);
462 #else
463       // store per processor data right away
464       struct ProcStats &procStat = statsData->procs[pe];
465       procStat.pe = pe;
466       procStat.total_walltime = m->total_walltime;
467       procStat.total_cputime = m->total_cputime;
468       procStat.idletime = m->idletime;
469       procStat.bg_walltime = m->bg_walltime;
470       procStat.bg_cputime = m->bg_cputime;
471       procStat.pe_speed = m->pe_speed;
472       //procStat.utilization = 1.0;
473       procStat.available = CmiTrue;
474       procStat.n_objs = m->n_objs;
475
476       statsData->n_objs += m->n_objs;
477       statsData->n_comm += m->n_comm;
478 #endif
479       stats_msg_count++;
480     }
481   }    // end of for
482
483   const int clients = CkNumValidPes();
484  
485   if (stats_msg_count == clients) {
486     statsData->count = stats_msg_count;
487     thisProxy[CkMyPe()].LoadBalance();
488   }
489 #endif
490 }
491
492 /** added by Abhinav for receiving msgs via spanning tree */
493 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
494 {
495 #if CMK_LBDB_ON
496         CmiAssert(CkMyPe() != 0);
497         bufMsg.add(msg);         // buffer messages
498         count_msgs++;
499         //CkPrintf("here %d\n", CkMyPe());
500         if (count_msgs == st.numChildren+1) {
501                 if(st.parent == 0)
502                 {
503                         thisProxy[0].ReceiveStats(bufMsg);
504                         //CkPrintf("from %d\n", CkMyPe());
505                 }
506                 else
507                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
508                 count_msgs = 0;
509                 bufMsg.free();
510         } 
511 #endif
512 }
513
514 void CentralLB::LoadBalance()
515 {
516 #if CMK_LBDB_ON
517   int proc;
518   const int clients = CkNumPes();
519
520 #if ! USE_REDUCTION
521   // build data
522   buildStats();
523 #else
524   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
525 #endif
526
527   if (_lb_args.debug()) 
528       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d Memory:%dMB\n",
529                  lbName(), step(),start_lb_time, cur_ld_balancer, CmiMemoryUsage()/(1024.0*1024.0));
530  //     CmiPrintf("[%d] n_obj:%d migratable:%d n_comm:%d\n", CkMyPe(), statsData->n_objs, statsData->n_migrateobjs, statsData->n_comm);
531 //    double strat_start_time = CkWallTimer();
532
533   // if we are in simulation mode read data
534   if (LBSimulation::doSimulation) simulationRead();
535
536   char *availVector = LBDatabaseObj()->availVector();
537   for(proc = 0; proc < clients; proc++)
538       statsData->procs[proc].available = (CmiBool)availVector[proc];
539
540   preprocess(statsData, clients);
541
542 //    CkPrintf("Before Calling Strategy\n");
543
544   if (_lb_args.printSummary()) {
545       LBInfo info(clients);
546         // not take comm data
547       info.getInfo(statsData, clients, 0);
548       double mLoad, mCpuLoad, totalLoad;
549       info.getSummary(mLoad, mCpuLoad, totalLoad);
550       int nmsgs, nbytes;
551       statsData->computeNonlocalComm(nmsgs, nbytes);
552       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
553 //      if (_lb_args.debug() > 1) {
554 //        for (int i=0; i<statsData->n_objs; i++)
555 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
556 //      }
557   }
558
559   double strat_start_time = CkWallTimer();
560   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
561   if (_lb_args.debug()) {
562     CkPrintf("Strategy took %f seconds.\n", CkWallTimer()-strat_start_time);
563     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
564     CkPrintf("[%s] memUsage: LBManager:%dKB CentralLB:%dKB\n", 
565               lbName(), (int)lbdbMemsize, (int)(useMem()/1000));
566   }
567
568 //    CkPrintf("returned successfully\n");
569
570   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
571   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
572
573   // if this is the step at which we need to dump the database
574   simulationWrite();
575
576 //  calculate predicted load
577 //  very time consuming though, so only happen when debugging is on
578   if (_lb_args.printSummary()) {
579       LBInfo info(clients);
580         // not take comm data
581       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
582       double mLoad, mCpuLoad, totalLoad;
583       info.getSummary(mLoad, mCpuLoad, totalLoad);
584       int nmsgs, nbytes;
585       statsData->computeNonlocalComm(nmsgs, nbytes);
586       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
587       for (int i=0; i<clients; i++)
588         migrateMsg->expectedLoad[i] = info.peLoads[i];
589   }
590
591   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
592 #ifdef _FAULT_MLOG_ 
593     lbDecisionCount++;
594     migrateMsg->lbDecisionCount = lbDecisionCount;
595 #endif
596   thisProxy.ReceiveMigration(migrateMsg);
597
598   // Zero out data structures for next cycle
599   // CkPrintf("zeroing out data\n");
600   statsData->clear();
601   stats_msg_count=0;
602 #endif
603 }
604
605 //    double strat_end_time = CkWallTimer();
606 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
607 // test if sender and receiver in a commData is nonmigratable.
608 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
609 {
610 #if CMK_LBDB_ON
611   for (int pe=0 ; pe<count; pe++)
612   {
613     for (int i=0; i<len[pe]; i++)
614       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
615           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
616       return 0;
617   }
618 #endif
619   return 1;
620 }
621
622 // rebuild LDStats and remove all non-migratble objects and related things
623 void CentralLB::removeNonMigratable(LDStats* stats, int count)
624 {
625   int i;
626
627   // check if we have non-migratable objects
628   int have = 0;
629   for (i=0; i<stats->n_objs; i++) 
630   {
631     LDObjData &odata = stats->objData[i];
632     if (!odata.migratable) {
633       have = 1; break;
634     }
635   }
636   if (have == 0) return;
637
638   CkVec<LDObjData> nonmig;
639   CkVec<int> new_from_proc, new_to_proc;
640   nonmig.resize(stats->n_migrateobjs);
641   new_from_proc.resize(stats->n_migrateobjs);
642   new_to_proc.resize(stats->n_migrateobjs);
643   int n_objs = 0;
644   for (i=0; i<stats->n_objs; i++) 
645   {
646     LDObjData &odata = stats->objData[i];
647     if (odata.migratable) {
648       nonmig[n_objs] = odata;
649       new_from_proc[n_objs] = stats->from_proc[i];
650       new_to_proc[n_objs] = stats->to_proc[i];
651       n_objs ++;
652     }
653     else {
654       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
655       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
656     }
657   }
658   CmiAssert(stats->n_migrateobjs == n_objs);
659
660   stats->makeCommHash();
661   
662   CkVec<LDCommData> newCommData;
663   newCommData.resize(stats->n_comm);
664   int n_comm = 0;
665   for (i=0; i<stats->n_comm; i++) 
666   {
667     LDCommData& cdata = stats->commData[i];
668     if (!cdata.from_proc()) 
669     {
670       int idx = stats->getSendHash(cdata);
671       CmiAssert(idx != -1);
672       if (!stats->objData[idx].migratable) continue;
673     }
674     switch (cdata.receiver.get_type()) {
675     case LD_PROC_MSG:
676       break;
677     case LD_OBJ_MSG:  {
678       int idx = stats->getRecvHash(cdata);
679       if (stats->complete_flag)
680         CmiAssert(idx != -1);
681       else if (idx == -1) continue;          // receiver not in this group
682       if (!stats->objData[idx].migratable) continue;
683       break;
684       }
685     case LD_OBJLIST_MSG:    // object message FIXME add multicast
686       break;
687     }
688     newCommData[n_comm] = cdata;
689     n_comm ++;
690   }
691
692   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
693
694   // swap to new data
695   stats->objData = nonmig;
696   stats->from_proc = new_from_proc;
697   stats->to_proc = new_to_proc;
698   stats->n_objs = n_objs;
699
700   stats->commData = newCommData;
701   stats->n_comm = n_comm;
702
703   stats->deleteCommHash();
704   stats->makeCommHash();
705
706 }
707
708
709 #ifdef _FAULT_MLOG_
710 extern int restarted;
711 #endif
712
713 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
714 {
715 #if CMK_LBDB_ON
716   int i;
717   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
718   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
719 /*FAULT_EVAC*/
720   if(!CmiNodeAlive(CkMyPe())){
721         delete m;
722         return;
723   }
724   migrates_expected = 0;
725   future_migrates_expected = 0;
726 #ifdef _FAULT_MLOG_
727         int sending=0;
728     int dummy=0;
729         int *dummyCounts;
730         LBDB *_myLBDB = theLbdb->getLBDB();
731 #endif
732   for(i=0; i < m->n_moves; i++) {
733     MigrateInfo& move = m->moves[i];
734     const int me = CkMyPe();
735     if (move.from_pe == me && move.to_pe != me) {
736       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
737       // migrate object, in case it is already gone, inform toPe
738 #ifndef _FAULT_MLOG_
739       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
740          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
741 #else
742             if(_restartFlag == 0){
743                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
744                 theLbdb->Migrate(move.obj,move.to_pe);
745                 sending++;
746             }else{
747                 if(_myLBDB->validObjHandle(move.obj)){
748                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
749                     theLbdb->Migrate(move.obj,move.to_pe);
750                     sending++;
751                 }else{
752                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
753                     dummyCounts[move.to_pe]++;
754                     dummy++;
755                 }
756             }
757 #endif
758     } else if (move.from_pe != me && move.to_pe == me) {
759        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
760       if (!move.async_arrival) migrates_expected++;
761       else future_migrates_expected++;
762     }
763   }
764   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
765   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
766 #if 0
767   if (m->n_moves ==0) {
768     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
769   }
770 #endif
771   cur_ld_balancer = m->next_lb;
772   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
773       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
774   }
775
776   if (migrates_expected == 0 || migrates_completed == migrates_expected)
777     MigrationDone(1);
778   delete m;
779
780 //      CkEvacuatedElement();
781 #ifdef _FAULT_MLOG_
782 //  migrates_expected = 0;
783 //  //  ResumeClients(1);
784 #endif
785 #endif
786 }
787
788
789 void CentralLB::MigrationDone(int balancing)
790 {
791 #if CMK_LBDB_ON
792   migrates_completed = 0;
793   migrates_expected = -1;
794   // clear load stats
795   if (balancing) theLbdb->ClearLoads();
796   // Increment to next step
797   theLbdb->incStep();
798         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
799   // if sync resume, invoke a barrier
800
801 #ifdef _FAULT_MLOG_
802     savedBalancing = balancing;
803     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
804 #endif
805
806   LoadbalanceDone(balancing);        // callback
807 #ifndef _FAULT_MLOG_
808   // if sync resume invoke a barrier
809   if (balancing && _lb_args.syncResume()) {
810     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
811                   thisProxy);
812     contribute(0, NULL, CkReduction::sum_int, cb);
813   }
814   else{ 
815     if(CmiNodeAlive(CkMyPe())){
816         thisProxy [CkMyPe()].ResumeClients(balancing);
817     }   
818   }     
819 #if CMK_GRID_QUEUE_AVAILABLE
820   CmiGridQueueDeregisterAll ();
821   CpvAccess(CkGridObject) = NULL;
822 #endif
823 #endif 
824 #endif
825 }
826
827 #ifdef _FAULT_MLOG_
828 void CentralLB::endMigrationDone(int balancing){
829     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
830
831
832   if (balancing && _lb_args.syncResume()) {
833     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
834                   thisProxy);
835     contribute(0, NULL, CkReduction::sum_int, cb);
836   }
837   else{
838     if(CmiNodeAlive(CkMyPe())){
839     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
840     thisProxy [CkMyPe()].ResumeClients(balancing);
841     }
842   }
843
844 }
845 #endif
846
847 #ifdef _FAULT_MLOG_
848 void resumeCentralLbAfterChkpt(void *_lb){
849     CentralLB *lb= (CentralLB *)_lb;
850     CpvAccess(_currentObj)=lb;
851     lb->endMigrationDone(lb->savedBalancing);
852 }
853 #endif
854
855
856 void CentralLB::ResumeClients(CkReductionMsg *msg)
857 {
858   ResumeClients(1);
859   delete msg;
860 }
861
862 void CentralLB::ResumeClients(int balancing)
863 {
864 #if CMK_LBDB_ON
865 #ifdef _FAULT_MLOG_
866     resumeCount++;
867     globalResumeCount = resumeCount;
868 #endif
869   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
870   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
871     double end_lb_time = CkWallTimer();
872     CkPrintf("[%s] Load balancing step %d finished at %f (duration %fs) Mem:%fMB\n",
873               lbName(), step()-1,end_lb_time, end_lb_time - start_lb_time, CmiMemoryUsage()/1024.0/1024.0);
874   }
875
876 #ifndef _FAULT_MLOG_
877   if (balancing) ComlibNotifyMigrationDone();  
878 #endif
879
880   theLbdb->ResumeClients();
881   if (balancing)  {
882     CheckMigrationComplete();
883     if (future_migrates_expected == 0 || 
884             future_migrates_expected == future_migrates_completed) {
885       CheckMigrationComplete();
886     }
887   }
888 #endif
889 }
890
891 /*
892   migration of objects contains two different kinds:
893   (1) objects want to make a barrier for migration completion
894       (waitForBarrier is true)
895       migrationDone() to finish and resumeClients
896   (2) objects don't need a barrier
897   However, next load balancing can only happen when both migrations complete
898 */ 
899 void CentralLB::CheckMigrationComplete()
900 {
901 #if CMK_LBDB_ON
902   lbdone ++;
903   if (lbdone == 2) {
904     lbdone = 0;
905     future_migrates_expected = -1;
906     future_migrates_completed = 0;
907     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
908     // release local barrier  so that the next load balancer can go
909     LDOMHandle h;
910     h.id.id.idx = 0;
911     theLbdb->getLBDB()->DoneRegisteringObjects(h);
912     // switch to the next load balancer in the list
913     // subtle: called from Migrated() may result in Migrated() called in next LB
914     theLbdb->nextLoadbalancer(seqno);
915   }
916 #endif
917 }
918
919 void CentralLB::preprocess(LDStats* stats,int count)
920 {
921   if (_lb_args.ignoreBgLoad())
922     stats->clearBgLoad();
923
924   // Call the predictor for the future
925   if (_lb_predict) FuturePredictor(statsData);
926 }
927
928 // default load balancing strategy
929 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
930 {
931 #if CMK_LBDB_ON
932   work(stats, count);
933
934   if (_lb_args.debug()>1)  {
935     CkPrintf("Obj Map:\n");
936     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
937     CkPrintf("\n");
938   }
939
940   return createMigrateMsg(stats, count);
941 #else
942   return NULL;
943 #endif
944 }
945
946 void CentralLB::work(LDStats* stats,int count)
947 {
948   // does nothing but print the database
949   stats->print();
950 }
951
952 // generate migrate message from stats->from_proc and to_proc
953 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
954 {
955   int i;
956   CkVec<MigrateInfo*> migrateInfo;
957   for (i=0; i<stats->n_objs; i++) {
958     LDObjData &objData = stats->objData[i];
959     int frompe = stats->from_proc[i];
960     int tope = stats->to_proc[i];
961     if (frompe != tope) {
962       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
963       //         CkMyPe(),obj,pe,dest);
964       MigrateInfo *migrateMe = new MigrateInfo;
965       migrateMe->obj = objData.handle;
966       migrateMe->from_pe = frompe;
967       migrateMe->to_pe = tope;
968       migrateMe->async_arrival = objData.asyncArrival;
969       migrateInfo.insertAtEnd(migrateMe);
970     }
971   }
972
973   int migrate_count=migrateInfo.length();
974   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
975   msg->n_moves = migrate_count;
976   for(i=0; i < migrate_count; i++) {
977     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
978     msg->moves[i] = *item;
979     delete item;
980     migrateInfo[i] = 0;
981   }
982   if (_lb_args.debug())
983     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
984   return msg;
985 }
986
987 void CentralLB::simulationWrite() {
988   if(step() == LBSimulation::dumpStep)
989   {
990     // here we are supposed to dump the database
991     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
992     char *dumpFileName = (char *)malloc(dumpFileSize);
993     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
994       free(dumpFileName);
995       dumpFileSize+=3;
996       dumpFileName = (char *)malloc(dumpFileSize);
997     }
998     writeStatsMsgs(dumpFileName);
999     free(dumpFileName);
1000     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1001     ++LBSimulation::dumpStep;
1002     --LBSimulation::dumpStepSize;
1003     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1004       CmiPrintf("Charm++> Exiting...\n");
1005       CkExit();
1006     }
1007     return;
1008   }
1009 }
1010
1011 void CentralLB::simulationRead() {
1012   LBSimulation *simResults = NULL, *realResults;
1013   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1014   voidMessage->n_moves=0;
1015   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1016     // here we are supposed to read the data from the dump database
1017     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1018     char *simFileName = (char *)malloc(simFileSize);
1019     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1020       free(simFileName);
1021       simFileSize+=3;
1022       simFileName = (char *)malloc(simFileSize);
1023     }
1024     readStatsMsgs(simFileName);
1025
1026     // allocate simResults (only the first step)
1027     if (simResults == NULL) {
1028       simResults = new LBSimulation(LBSimulation::simProcs);
1029       realResults = new LBSimulation(LBSimulation::simProcs);
1030     }
1031     else {
1032       // should be the same number of procs of the original simulation!
1033       if (!LBSimulation::procsChanged) {
1034         // it means we have a previous step, so in simResults there is data.
1035         // we can now print the real effects of the load balancer during the simulation
1036         // or print the difference between the predicted data and the real one.
1037         realResults->reset();
1038         // reset to_proc of statsData to be equal to from_proc
1039         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1040         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1041         simResults->PrintDifferences(realResults,statsData);
1042       }
1043       simResults->reset();
1044     }
1045
1046     // now pass it to the strategy routine
1047     double startT = CkWallTimer();
1048     preprocess(statsData, LBSimulation::simProcs);
1049     CmiPrintf("%s> Strategy starts ... \n", lbname);
1050     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
1051     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
1052                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1053
1054     // now calculate the results of the load balancing simulation
1055     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1056
1057     // now we have the simulation data, so print it and loop
1058     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1059     // **CWL** Officially recording my disdain here for using ints for bool
1060     if (LBSimulation::showDecisionsOnly) {
1061       simResults->PrintDecisions(migrateMsg, simFileName, 
1062                                  LBSimulation::simProcs);
1063     } else {
1064       simResults->PrintSimulationResults();
1065     }
1066
1067     free(simFileName);
1068     delete migrateMsg;
1069     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1070   }
1071   // deallocate simResults
1072   delete simResults;
1073   CmiPrintf("Charm++> Exiting...\n");
1074   CkExit();
1075 }
1076
1077 void CentralLB::readStatsMsgs(const char* filename) 
1078 {
1079 #if CMK_LBDB_ON
1080   int i;
1081   FILE *f = fopen(filename, "r");
1082   if (f==NULL) {
1083     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1084     CmiAbort("");
1085   }
1086
1087   // at this stage, we need to rebuild the statsMsgList and
1088   // statsDataList structures. For that first deallocate the
1089   // old structures
1090   if (statsMsgsList) {
1091     for(i = 0; i < stats_msg_count; i++)
1092       delete statsMsgsList[i];
1093     delete[] statsMsgsList;
1094     statsMsgsList=0;
1095   }
1096
1097   PUP::fromDisk pd(f);
1098   PUP::machineInfo machInfo;
1099
1100   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1101   PUP::xlater p(machInfo, pd);
1102
1103   if (_lb_args.lbversion() > 1) {
1104     p|_lb_args.lbversion();             // write version number
1105     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1106     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1107   }
1108   p|stats_msg_count;
1109
1110   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1111   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1112   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1113
1114   // LBSimulation::simProcs must be set
1115   statsData->pup(p);
1116
1117   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1118   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1119
1120   // file f is closed in the destructor of PUP::fromDisk
1121   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1122 #endif
1123 }
1124
1125 void CentralLB::writeStatsMsgs(const char* filename) 
1126 {
1127 #if CMK_LBDB_ON
1128   FILE *f = fopen(filename, "w");
1129   if (f==NULL) {
1130     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1131     CmiAbort("");
1132   }
1133
1134   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1135   PUP::toDisk p(f);
1136   p((char *)&machInfo, sizeof(machInfo));       // machine info
1137
1138   p|_lb_args.lbversion();               // write version number
1139   p|stats_msg_count;
1140   statsData->pup(p);
1141
1142   fclose(f);
1143
1144   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1145 #endif
1146 }
1147
1148 // calculate the predicted wallclock/cpu load for every processors
1149 // considering communication overhead if considerComm is true
1150 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1151                       LBMigrateMsg *msg, LBInfo &info, 
1152                       int considerComm)
1153 {
1154 #if CMK_LBDB_ON
1155         stats->makeCommHash();
1156
1157         // update to_proc according to migration msgs
1158         for(int i = 0; i < msg->n_moves; i++) {
1159           MigrateInfo &mInfo = msg->moves[i];
1160           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1161           CmiAssert(idx != -1);
1162           stats->to_proc[idx] = mInfo.to_pe;
1163         }
1164
1165         info.getInfo(stats, count, considerComm);
1166 #endif
1167 }
1168
1169
1170 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1171 {
1172     CkAssert(simResults != NULL && count == simResults->numPes);
1173     // estimate the new loads of the processors. As a first approximation, this is the
1174     // sum of the cpu times of the objects on that processor
1175     double startT = CkWallTimer();
1176     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1177     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1178 }
1179
1180 void CentralLB::pup(PUP::er &p) { 
1181   BaseLB::pup(p); 
1182   if (p.isUnpacking())  {
1183     initLB(CkLBOptions(seqno)); 
1184   }
1185 }
1186
1187 int CentralLB::useMem() { 
1188   return sizeof(CentralLB) + statsData->useMem() + 
1189          CkNumPes() * sizeof(CLBStatsMsg *);
1190 }
1191
1192
1193 /**
1194   CLBStatsMsg is not a real message now.
1195   CLBStatsMsg is used for all processors to fill in their local load and comm
1196   statistics and send to processor 0
1197 */
1198
1199 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1200   n_objs = osz;
1201   n_comm = csz;
1202   objData = new LDObjData[osz];
1203   commData = new LDCommData[csz];
1204   avail_vector = NULL;
1205 }
1206
1207 CLBStatsMsg::~CLBStatsMsg() {
1208   delete [] objData;
1209   delete [] commData;
1210   if (avail_vector) delete [] avail_vector;
1211 }
1212
1213 void CLBStatsMsg::pup(PUP::er &p) {
1214   int i;
1215   p|from_pe;
1216   p|pe_speed;
1217   p|total_walltime; p|total_cputime;
1218   p|idletime;
1219   p|bg_walltime;   p|bg_cputime;
1220   p|n_objs;
1221   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1222   for (i=0; i<n_objs; i++) p|objData[i];
1223   p|n_comm;
1224   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1225   for (i=0; i<n_comm; i++) p|commData[i];
1226
1227   int has_avail_vector;
1228   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1229   p|has_avail_vector;
1230   if (p.isUnpacking()) {
1231     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1232     else avail_vector = NULL;
1233   }
1234   if (has_avail_vector) p(avail_vector, CkNumPes());
1235
1236   p(next_lb);
1237 }
1238
1239 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1240 // the entry function, it is just used to use to pup.
1241 // I don't use CLBStatsMsg directly as marshalled parameter because
1242 // I want the data pointer stored and not to be freed by the Charm++.
1243 void CkMarshalledCLBStatsMessage::free() { 
1244   int count = msgs.size();
1245   for  (int i=0; i<count; i++) 
1246     if (msgs[i]) delete msgs[i];
1247   msgs.free();
1248 }
1249
1250 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1251 {
1252   int count = m.getCount();
1253   for (int i=0; i<count; i++) add(m.getMessage(i));
1254 }
1255
1256 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1257 {
1258   int count = msgs.size();
1259   p|count;
1260   for (int i=0; i<count; i++) {
1261     CLBStatsMsg *msg;
1262     if (p.isUnpacking()) msg = new CLBStatsMsg;
1263     else { 
1264       msg = msgs[i]; CmiAssert(msg);
1265     }
1266     msg->pup(p);
1267     if (p.isUnpacking()) add(msg);
1268   }
1269 }
1270
1271 SpanningTree::SpanningTree()
1272 {
1273         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1274         arity = (int)ceil(sq/2);
1275         calcParent(CkMyPe());
1276         calcNumChildren(CkMyPe());
1277 }
1278
1279 void SpanningTree::calcParent(int n)
1280 {
1281         parent=-1;
1282         if(n != 0  && arity > 0)
1283                 parent = (n-1)/arity;
1284 }
1285
1286 void SpanningTree::calcNumChildren(int n)
1287 {
1288         numChildren = 0;
1289         if (arity == 0) return;
1290         int fullNode=(CkNumPes()-1-arity)/arity;
1291         if(n <= fullNode)
1292                 numChildren = arity;
1293         if(n == fullNode+1)
1294                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1295         if(n > fullNode+1)
1296                 numChildren = 0;
1297 }
1298
1299 #include "CentralLB.def.h"
1300  
1301 /*@}*/