add pup support
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #define  DEBUGF(x)       // CmiPrintf x;
15 #define  DEBUG(x)        // x;
16
17 #if CMK_MEM_CHECKPOINT
18    /* can not handle reduction in inmem FT */
19 #define USE_REDUCTION         0
20 #define USE_LDB_SPANNING_TREE 0
21 #else
22 #define USE_REDUCTION         1
23 #define USE_LDB_SPANNING_TREE 1
24 #endif
25
26 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
27 extern int _restartFlag;
28 extern void getGlobalStep(CkGroupID );
29 extern void initMlogLBStep(CkGroupID );
30 extern int globalResumeCount;
31 extern void sendDummyMigrationCounts(int *);
32 #endif
33
34 #if CMK_GRID_QUEUE_AVAILABLE
35 CpvExtern(void *, CkGridObject);
36 #endif
37
38 #if CMK_GLOBAL_LOCATION_UPDATE      
39 extern void UpdateLocation(MigrateInfo& migData); 
40 #endif
41
42 CkGroupID loadbalancer;
43 int * lb_ptr;
44 int load_balancer_created;
45
46 CreateLBFunc_Def(CentralLB, "CentralLB base class")
47
48 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
49                              LBMigrateMsg *, LBInfo &info, int considerComm);
50
51 /*
52 void CreateCentralLB()
53 {
54   CProxy_CentralLB::ckNew(0);
55 }
56 */
57
58 void CentralLB::staticStartLB(void* data)
59 {
60   CentralLB *me = (CentralLB*)(data);
61   me->StartLB();
62 }
63
64 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
65 {
66   CentralLB *me = (CentralLB*)(data);
67   me->Migrated(h, waitBarrier);
68 }
69
70 void CentralLB::staticAtSync(void* data)
71 {
72   CentralLB *me = (CentralLB*)(data);
73   me->AtSync();
74 }
75
76 void CentralLB::initLB(const CkLBOptions &opt)
77 {
78 #if CMK_LBDB_ON
79   lbname = "CentralLB";
80   thisProxy = CProxy_CentralLB(thisgroup);
81   //  CkPrintf("Construct in %d\n",CkMyPe());
82
83   // create and turn on by default
84   receiver = theLbdb->
85     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
86   notifier = theLbdb->getLBDB()->
87     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
88   startLbFnHdl = theLbdb->getLBDB()->
89     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
90
91   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
92   if (opt.getSeqNo() > 0) turnOff();
93
94   stats_msg_count = 0;
95   statsMsgsList = NULL;
96   statsData = NULL;
97
98   storedMigrateMsg = NULL;
99   reduction_started = 0;
100
101   // for future predictor
102   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
103   else predicted_model=0;
104   // register user interface callbacks
105   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
106
107   myspeed = theLbdb->ProcessorSpeed();
108
109   migrates_completed = 0;
110   future_migrates_completed = 0;
111   migrates_expected = -1;
112   future_migrates_expected = -1;
113   cur_ld_balancer = _lb_args.central_pe();      // 0 default
114   lbdone = 0;
115   count_msgs=0;
116   statsMsg = NULL;
117
118   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
119
120   load_balancer_created = 1;
121 #endif
122 }
123
124 CentralLB::~CentralLB()
125 {
126 #if CMK_LBDB_ON
127   delete [] statsMsgsList;
128   delete statsData;
129   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
130   if (theLbdb) {
131     theLbdb->getLBDB()->
132       RemoveNotifyMigrated(notifier);
133     theLbdb->
134       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
135   }
136 #endif
137 }
138
139 void CentralLB::turnOn() 
140 {
141 #if CMK_LBDB_ON
142   theLbdb->getLBDB()->
143     TurnOnBarrierReceiver(receiver);
144   theLbdb->getLBDB()->
145     TurnOnNotifyMigrated(notifier);
146   theLbdb->getLBDB()->
147     TurnOnStartLBFn(startLbFnHdl);
148 #endif
149 }
150
151 void CentralLB::turnOff() 
152 {
153 #if CMK_LBDB_ON
154   theLbdb->getLBDB()->
155     TurnOffBarrierReceiver(receiver);
156   theLbdb->getLBDB()->
157     TurnOffNotifyMigrated(notifier);
158   theLbdb->getLBDB()->
159     TurnOffStartLBFn(startLbFnHdl);
160 #endif
161 }
162
163 void CentralLB::AtSync()
164 {
165 #if CMK_LBDB_ON
166   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
167   CkPrintf("[%d][%d] CentralLB AtSync step %d!!!!!\n",CmiMyPartition(),CkMyPe(),step());
168 #if CMK_MEM_CHECKPOINT  
169   CkSetInLdb();
170 #endif
171 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
172         CpvAccess(_currentObj)=this;
173 #endif
174
175   // if num of processor is only 1, nothing should happen
176   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
177     MigrationDone(0);
178     return;
179   }
180   if(CmiNodeAlive(CkMyPe())){
181     thisProxy [CkMyPe()].ProcessAtSync();
182   }
183 #endif
184 }
185
186 #include "ComlibStrategy.h"
187
188 void CentralLB::ProcessAtSync()
189 {
190 #if CMK_LBDB_ON
191   if (reduction_started) return;              // reducton in progress
192
193   CmiAssert(CmiNodeAlive(CkMyPe()));
194   if (CkMyPe() == cur_ld_balancer) {
195     start_lb_time = CkWallTimer();
196   }
197
198 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
199         initMlogLBStep(thisgroup);
200 #endif
201
202   // build message
203   BuildStatsMsg();
204
205 #if USE_REDUCTION
206     // reduction to get total number of objects and comm
207     // so that processor 0 can pre-allocate load balancing database
208   int counts[2];
209   counts[0] = theLbdb->GetObjDataSz();
210   counts[1] = theLbdb->GetCommDataSz();
211
212   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
213                   thisProxy[0]);
214   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
215   reduction_started = 1;
216 #else
217   SendStats();
218 #endif
219 #endif
220 }
221
222 // called only on 0
223 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
224 {
225   CmiAssert(CkMyPe() == 0);
226   if (statsData == NULL) statsData = new LDStats;
227
228   int *counts = (int *)msg->getData();
229   int n_objs = counts[0];
230   int n_comm = counts[1];
231
232     // resize database
233   statsData->objData.resize(n_objs);
234   statsData->from_proc.resize(n_objs);
235   statsData->to_proc.resize(n_objs);
236   statsData->commData.resize(n_comm);
237
238   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
239         
240     // broadcast call to let everybody start to send stats
241   thisProxy.SendStats();
242 }
243
244 void CentralLB::BuildStatsMsg()
245 {
246 #if CMK_LBDB_ON
247   // build and send stats
248   const int osz = theLbdb->GetObjDataSz();
249   const int csz = theLbdb->GetCommDataSz();
250
251   int npes = CkNumPes();
252   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
253   _MEMCHECK(msg);
254   msg->from_pe = CkMyPe();
255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
256         msg->step = step();
257 #endif
258   //msg->serial = CrnRand();
259
260 /*
261   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
262   theLbdb->IdleTime(&msg->idletime);
263   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
264 */
265 #if CMK_LB_CPUTIMER
266   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
267                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
268 #else
269   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
270                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
271 #endif
272
273   msg->pe_speed = myspeed;
274   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
275
276   msg->n_objs = osz;
277   theLbdb->GetObjData(msg->objData);
278   msg->n_comm = csz;
279   theLbdb->GetCommData(msg->commData);
280 //  theLbdb->ClearLoads();
281   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
282
283   if(CkMyPe() == cur_ld_balancer) {
284     msg->avail_vector = new char[CkNumPes()];
285     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
286     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
287   }
288
289   CmiAssert(statsMsg == NULL);
290   statsMsg = msg;
291 #endif
292 }
293
294
295 // called on every processor
296 void CentralLB::SendStats()
297 {
298 #if CMK_LBDB_ON
299   CmiAssert(statsMsg != NULL);
300   reduction_started = 0;
301
302 #if USE_LDB_SPANNING_TREE
303   if(CkNumPes()>1024)
304   {
305     if (CkMyPe() == cur_ld_balancer)
306       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
307     else
308       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
309   }
310   else
311 #endif
312   {
313     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
314     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
315   }
316
317   statsMsg = NULL;
318
319 #ifdef __BIGSIM__
320   BgEndStreaming();
321 #endif
322
323   {
324   // enfore the barrier to wait until centralLB says no
325   LDOMHandle h;
326   h.id.id.idx = 0;
327   theLbdb->getLBDB()->RegisteringObjects(h);
328   }
329 #endif
330 }
331
332 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
333 extern int donotCountMigration;
334 #endif
335
336 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
337 {
338 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
339     if(donotCountMigration){
340         return ;
341     }
342 #endif
343
344 #if CMK_LBDB_ON
345   if (waitBarrier) {
346             migrates_completed++;
347       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
348     if (migrates_completed == migrates_expected) {
349       MigrationDone(1);
350     }
351   }
352   else {
353     future_migrates_completed ++;
354     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
355     if (future_migrates_completed == future_migrates_expected)  {
356         CheckMigrationComplete();
357     }
358   }
359 #endif
360 }
361
362 void CentralLB::MissMigrate(int waitForBarrier)
363 {
364   LDObjHandle h;
365   Migrated(h, waitForBarrier);
366 }
367
368 // build a complete data from bufferred messages
369 // not used when USE_REDUCTION = 1
370 void CentralLB::buildStats()
371 {
372     statsData->nprocs() = stats_msg_count;
373     // allocate space
374     statsData->objData.resize(statsData->n_objs);
375     statsData->from_proc.resize(statsData->n_objs);
376     statsData->to_proc.resize(statsData->n_objs);
377     statsData->commData.resize(statsData->n_comm);
378
379     int nobj = 0;
380     int ncom = 0;
381     int nmigobj = 0;
382     // copy all data in individule message to this big structure
383     for (int pe=0; pe<CkNumPes(); pe++) {
384        int i;
385        CLBStatsMsg *msg = statsMsgsList[pe];
386        if(msg == NULL) continue;
387        for (i=0; i<msg->n_objs; i++) {
388          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
389          statsData->objData[nobj] = msg->objData[i];
390          if (msg->objData[i].migratable) nmigobj++;
391          nobj++;
392        }
393        for (i=0; i<msg->n_comm; i++) {
394          statsData->commData[ncom] = msg->commData[i];
395          ncom++;
396        }
397        // free the memory
398        delete msg;
399        statsMsgsList[pe]=0;
400     }
401     statsData->n_migrateobjs = nmigobj;
402 }
403
404 // deposit one processor data at a time, note database is pre-allocated
405 // to have enough space
406 // used when USE_REDUCTION = 1
407 void CentralLB::depositData(CLBStatsMsg *m)
408 {
409   int i;
410   if (m == NULL) return;
411
412   const int pe = m->from_pe;
413   struct ProcStats &procStat = statsData->procs[pe];
414   procStat.pe = pe;
415   procStat.total_walltime = m->total_walltime;
416   procStat.idletime = m->idletime;
417   procStat.bg_walltime = m->bg_walltime;
418 #if CMK_LB_CPUTIMER
419   procStat.total_cputime = m->total_cputime;
420   procStat.bg_cputime = m->bg_cputime;
421 #endif
422   procStat.pe_speed = m->pe_speed;
423   //procStat.utilization = 1.0;
424   procStat.available = CmiTrue;
425   procStat.n_objs = m->n_objs;
426
427   int &nobj = statsData->n_objs;
428   int &nmigobj = statsData->n_migrateobjs;
429   for (i=0; i<m->n_objs; i++) {
430       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
431       statsData->objData[nobj] = m->objData[i];
432       if (m->objData[i].migratable) nmigobj++;
433       nobj++;
434       CmiAssert(nobj <= statsData->objData.capacity());
435   }
436   int &n_comm = statsData->n_comm;
437   for (i=0; i<m->n_comm; i++) {
438       statsData->commData[n_comm] = m->commData[i];
439       n_comm++;
440       CmiAssert(n_comm <= statsData->commData.capacity());
441   }
442   delete m;
443 }
444
445 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
446 {
447 #if CMK_LBDB_ON
448   if (statsMsgsList == NULL) {
449     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
450     CmiAssert(statsMsgsList != NULL);
451     for(int i=0; i < CkNumPes(); i++)
452       statsMsgsList[i] = 0;
453   }
454   if (statsData == NULL) statsData = new LDStats;
455
456     //  loop through all CLBStatsMsg in the incoming msg
457   int count = msg.getCount();
458   for (int num = 0; num < count; num++) 
459   {
460     CLBStatsMsg *m = msg.getMessage(num);
461     CmiAssert(m!=NULL);
462     const int pe = m->from_pe;
463     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
464 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
465 /*      
466  *  if(m->step < step()){
467  *    //TODO: if a processor is redoing an old load balance step..
468  *    //tell it that the step is done and that it should not perform any migrations
469  *      thisProxy[pe].ReceiveDummyMigration();
470  *  }*/
471 #endif
472         
473     if(!CmiNodeAlive(pe)){
474         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
475         continue;
476     }
477         
478     if (m->avail_vector!=NULL) {
479       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
480     }
481
482     if (statsMsgsList[pe] != 0) {
483       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
484              pe);
485     } else {
486       statsMsgsList[pe] = m;
487 #if USE_REDUCTION
488       depositData(m);
489 #else
490       // store per processor data right away
491       struct ProcStats &procStat = statsData->procs[pe];
492       procStat.pe = pe;
493       procStat.total_walltime = m->total_walltime;
494       procStat.idletime = m->idletime;
495       procStat.bg_walltime = m->bg_walltime;
496 #if CMK_LB_CPUTIMER
497       procStat.total_cputime = m->total_cputime;
498       procStat.bg_cputime = m->bg_cputime;
499 #endif
500       procStat.pe_speed = m->pe_speed;
501       //procStat.utilization = 1.0;
502       procStat.available = CmiTrue;
503       procStat.n_objs = m->n_objs;
504
505       statsData->n_objs += m->n_objs;
506       statsData->n_comm += m->n_comm;
507 #endif
508       stats_msg_count++;
509     }
510   }    // end of for
511
512   const int clients = CkNumValidPes();
513   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
514  
515   if (stats_msg_count == clients) {
516         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
517     statsData->nprocs() = stats_msg_count;
518     thisProxy[CkMyPe()].LoadBalance();
519   }
520 #endif
521 }
522
523 /** added by Abhinav for receiving msgs via spanning tree */
524 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
525 {
526 #if CMK_LBDB_ON
527         CmiAssert(CkMyPe() != 0);
528         bufMsg.add(msg);         // buffer messages
529         count_msgs++;
530         //CkPrintf("here %d\n", CkMyPe());
531         if (count_msgs == st.numChildren+1) {
532                 if(st.parent == 0)
533                 {
534                         thisProxy[0].ReceiveStats(bufMsg);
535                         //CkPrintf("from %d\n", CkMyPe());
536                 }
537                 else
538                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
539                 count_msgs = 0;
540                 bufMsg.free();
541         } 
542 #endif
543 }
544
545 void CentralLB::LoadBalance()
546 {
547 #if CMK_LBDB_ON
548   int proc;
549   const int clients = CkNumPes();
550
551 #if ! USE_REDUCTION
552   // build data
553   buildStats();
554 #else
555   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
556 #endif
557
558   theLbdb->ResetAdaptive();
559   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
560
561   if (_lb_args.debug()) 
562       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
563                   lbname, cur_ld_balancer, step(), start_lb_time,
564                   CmiMemoryUsage()/(1024.0*1024.0));
565
566   // if we are in simulation mode read data
567   if (LBSimulation::doSimulation) simulationRead();
568
569   char *availVector = LBDatabaseObj()->availVector();
570   for(proc = 0; proc < clients; proc++)
571       statsData->procs[proc].available = (CmiBool)availVector[proc];
572
573   preprocess(statsData);
574
575 //    CkPrintf("Before Calling Strategy\n");
576
577   if (_lb_args.printSummary()) {
578       LBInfo info(clients);
579         // not take comm data
580       info.getInfo(statsData, clients, 0);
581       LBRealType mLoad, mCpuLoad, totalLoad;
582       info.getSummary(mLoad, mCpuLoad, totalLoad);
583       int nmsgs, nbytes;
584       statsData->computeNonlocalComm(nmsgs, nbytes);
585       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
586 //      if (_lb_args.debug() > 1) {
587 //        for (int i=0; i<statsData->n_objs; i++)
588 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
589 //      }
590   }
591
592 #if CMK_REPLAYSYSTEM
593   LDHandle *loadBalancer_pointers;
594   if (_replaySystem) {
595     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
596     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
597   }
598 #endif
599   
600   LBMigrateMsg* migrateMsg = Strategy(statsData);
601 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
602         migrateMsg->step = step();
603 #endif
604
605 #if CMK_REPLAYSYSTEM
606   CpdHandleLBMessage(&migrateMsg);
607   if (_replaySystem) {
608     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
609     free(loadBalancer_pointers);
610   }
611 #endif
612   
613   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
614   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
615
616   // if this is the step at which we need to dump the database
617   simulationWrite();
618
619 //  calculate predicted load
620 //  very time consuming though, so only happen when debugging is on
621   if (_lb_args.printSummary()) {
622       LBInfo info(clients);
623         // not take comm data
624       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
625       LBRealType mLoad, mCpuLoad, totalLoad;
626       info.getSummary(mLoad, mCpuLoad, totalLoad);
627       int nmsgs, nbytes;
628       statsData->computeNonlocalComm(nmsgs, nbytes);
629       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
630       for (int i=0; i<clients; i++)
631         migrateMsg->expectedLoad[i] = info.peLoads[i];
632   }
633
634   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
635 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
636     lbDecisionCount++;
637     migrateMsg->lbDecisionCount = lbDecisionCount;
638 #endif
639
640   envelope *env = UsrToEnv(migrateMsg);
641   if (1) {
642       // broadcast
643     thisProxy.ReceiveMigration(migrateMsg);
644   }
645   else {
646     // split the migration for each processor
647     for (int p=0; p<CkNumPes(); p++) {
648       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
649       thisProxy[p].ReceiveMigration(m);
650     }
651     delete migrateMsg;
652   }
653
654   // Zero out data structures for next cycle
655   // CkPrintf("zeroing out data\n");
656   statsData->clear();
657   stats_msg_count=0;
658 #endif
659 }
660
661 // test if sender and receiver in a commData is nonmigratable.
662 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
663 {
664 #if CMK_LBDB_ON
665   for (int pe=0 ; pe<count; pe++)
666   {
667     for (int i=0; i<len[pe]; i++)
668       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
669           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
670       return 0;
671   }
672 #endif
673   return 1;
674 }
675
676 // rebuild LDStats and remove all non-migratble objects and related things
677 void CentralLB::removeNonMigratable(LDStats* stats, int count)
678 {
679   int i;
680
681   // check if we have non-migratable objects
682   int have = 0;
683   for (i=0; i<stats->n_objs; i++) 
684   {
685     LDObjData &odata = stats->objData[i];
686     if (!odata.migratable) {
687       have = 1; break;
688     }
689   }
690   if (have == 0) return;
691
692   CkVec<LDObjData> nonmig;
693   CkVec<int> new_from_proc, new_to_proc;
694   nonmig.resize(stats->n_migrateobjs);
695   new_from_proc.resize(stats->n_migrateobjs);
696   new_to_proc.resize(stats->n_migrateobjs);
697   int n_objs = 0;
698   for (i=0; i<stats->n_objs; i++) 
699   {
700     LDObjData &odata = stats->objData[i];
701     if (odata.migratable) {
702       nonmig[n_objs] = odata;
703       new_from_proc[n_objs] = stats->from_proc[i];
704       new_to_proc[n_objs] = stats->to_proc[i];
705       n_objs ++;
706     }
707     else {
708       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
709 #if CMK_LB_CPUTIMER
710       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
711 #endif
712     }
713   }
714   CmiAssert(stats->n_migrateobjs == n_objs);
715
716   stats->makeCommHash();
717   
718   CkVec<LDCommData> newCommData;
719   newCommData.resize(stats->n_comm);
720   int n_comm = 0;
721   for (i=0; i<stats->n_comm; i++) 
722   {
723     LDCommData& cdata = stats->commData[i];
724     if (!cdata.from_proc()) 
725     {
726       int idx = stats->getSendHash(cdata);
727       CmiAssert(idx != -1);
728       if (!stats->objData[idx].migratable) continue;
729     }
730     switch (cdata.receiver.get_type()) {
731     case LD_PROC_MSG:
732       break;
733     case LD_OBJ_MSG:  {
734       int idx = stats->getRecvHash(cdata);
735       if (stats->complete_flag)
736         CmiAssert(idx != -1);
737       else if (idx == -1) continue;          // receiver not in this group
738       if (!stats->objData[idx].migratable) continue;
739       break;
740       }
741     case LD_OBJLIST_MSG:    // object message FIXME add multicast
742       break;
743     }
744     newCommData[n_comm] = cdata;
745     n_comm ++;
746   }
747
748   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
749
750   // swap to new data
751   stats->objData = nonmig;
752   stats->from_proc = new_from_proc;
753   stats->to_proc = new_to_proc;
754   stats->n_objs = n_objs;
755
756   stats->commData = newCommData;
757   stats->n_comm = n_comm;
758
759   stats->deleteCommHash();
760   stats->makeCommHash();
761
762 }
763
764
765 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
766 extern int restarted;
767 #endif
768
769 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
770 {
771   storedMigrateMsg = m;
772 #if CMK_MEM_CHECKPOINT
773   CkResetInLdb();
774 #endif
775 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
776         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
777 #else
778   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
779                   thisProxy);
780   contribute(0, NULL, CkReduction::max_int, cb);
781 #endif
782 }
783
784 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
785 {
786 #if CMK_LBDB_ON
787         int i;
788         LBMigrateMsg *m = storedMigrateMsg;
789         CmiAssert(m!=NULL);
790         delete msg;
791
792 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
793         int *dummyCounts;
794
795         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
796         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
797         if(step() > m->step){
798                 char str[100];
799                 envelope *env = UsrToEnv(m);
800                 return;
801         }
802         lbDecisionCount = m->lbDecisionCount;
803 #endif
804
805   if (_lb_args.debug() > 1) 
806     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
807
808   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
809   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
810 /*FAULT_EVAC*/
811   if(!CmiNodeAlive(CkMyPe())){
812         delete m;
813         return;
814   }
815   migrates_expected = 0;
816   future_migrates_expected = 0;
817 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
818         int sending=0;
819     int dummy=0;
820         LBDB *_myLBDB = theLbdb->getLBDB();
821         if(_restartFlag){
822         dummyCounts = new int[CmiNumPes()];
823         bzero(dummyCounts,sizeof(int)*CmiNumPes());
824     }
825 #endif
826   for(i=0; i < m->n_moves; i++) {
827     MigrateInfo& move = m->moves[i];
828     const int me = CkMyPe();
829     if (move.from_pe == me && move.to_pe != me) {
830       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
831       // migrate object, in case it is already gone, inform toPe
832 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
833       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
834          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
835 #else
836             if(_restartFlag == 0){
837                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
838                 theLbdb->Migrate(move.obj,move.to_pe);
839                 sending++;
840             }else{
841                 if(_myLBDB->validObjHandle(move.obj)){
842                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
843                     theLbdb->Migrate(move.obj,move.to_pe);
844                     sending++;
845                 }else{
846                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
847                     dummyCounts[move.to_pe]++;
848                     dummy++;
849                 }
850             }
851 #endif
852     } else if (move.from_pe != me && move.to_pe == me) {
853        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
854       if (!move.async_arrival) migrates_expected++;
855       else future_migrates_expected++;
856     }
857     else {
858 #if CMK_GLOBAL_LOCATION_UPDATE      
859       UpdateLocation(move); 
860 #endif
861     }
862
863   }
864   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
865   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
866
867 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
868         if(_restartFlag){
869                 sendDummyMigrationCounts(dummyCounts);
870                 _restartFlag  =0;
871         delete []dummyCounts;
872         }
873 #endif
874
875
876 #if 0
877   if (m->n_moves ==0) {
878     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
879   }
880 #endif
881   cur_ld_balancer = m->next_lb;
882   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
883       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
884   }
885
886   if (migrates_expected == 0 || migrates_completed == migrates_expected)
887     MigrationDone(1);
888   delete m;
889
890 //      CkEvacuatedElement();
891 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
892 //  migrates_expected = 0;
893 //  //  ResumeClients(1);
894 #endif
895 #endif
896 }
897
898 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
899 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
900     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
901     //TODO: this is gonna be important when a crash happens during checkpoint
902     //the globalDecisionCount would have to be saved and compared against
903     //a future recvMigration
904                 
905         thisProxy[CkMyPe()].ResumeClients(1);
906 }
907 #endif
908
909 void CentralLB::MigrationDone(int balancing)
910 {
911 #if CMK_LBDB_ON
912   migrates_completed = 0;
913   migrates_expected = -1;
914   // clear load stats
915   if (balancing) theLbdb->ClearLoads();
916   // Increment to next step
917   theLbdb->incStep();
918         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
919   // if sync resume, invoke a barrier
920
921 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
922     savedBalancing = balancing;
923     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
924 #endif
925
926   LBDatabase::Object()->MigrationDone();    // call registered callbacks
927
928   LoadbalanceDone(balancing);        // callback
929 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
930   // if sync resume invoke a barrier
931   if (balancing && _lb_args.syncResume()) {
932     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
933                   thisProxy);
934     contribute(0, NULL, CkReduction::sum_int, cb);
935   }
936   else{ 
937     if(CmiNodeAlive(CkMyPe())){
938         thisProxy [CkMyPe()].ResumeClients(balancing);
939     }   
940   }     
941 #if CMK_GRID_QUEUE_AVAILABLE
942   CmiGridQueueDeregisterAll ();
943   CpvAccess(CkGridObject) = NULL;
944 #endif
945 #endif 
946 #endif
947 }
948
949 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
950 void CentralLB::endMigrationDone(int balancing){
951     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
952
953
954   if (balancing && _lb_args.syncResume()) {
955     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
956                   thisProxy);
957     contribute(0, NULL, CkReduction::sum_int, cb);
958   }
959   else{
960     if(CmiNodeAlive(CkMyPe())){
961     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
962     thisProxy [CkMyPe()].ResumeClients(balancing);
963     }
964   }
965
966 }
967 #endif
968
969 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
970 void resumeCentralLbAfterChkpt(void *_lb){
971     CentralLB *lb= (CentralLB *)_lb;
972     CpvAccess(_currentObj)=lb;
973     lb->endMigrationDone(lb->savedBalancing);
974 }
975 void resumeAfterRestoreParallelRecovery(void *_lb){
976     CentralLB *lb= (CentralLB *)_lb;
977         lb->ProcessReceiveMigration((CkReductionMsg*)NULL);
978 }
979 #endif
980
981
982 void CentralLB::ResumeClients(CkReductionMsg *msg)
983 {
984   ResumeClients(1);
985   delete msg;
986 }
987
988 void CentralLB::ResumeClients(int balancing)
989 {
990 #if CMK_LBDB_ON
991 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
992     resumeCount++;
993     globalResumeCount = resumeCount;
994 #endif
995   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
996   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
997     double end_lb_time = CkWallTimer();
998   }
999
1000 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1001   if (balancing) ComlibNotifyMigrationDone();  
1002 #endif
1003
1004   theLbdb->ResumeClients();
1005   if (balancing)  {
1006
1007     CheckMigrationComplete();
1008     if (future_migrates_expected == 0 || 
1009             future_migrates_expected == future_migrates_completed) {
1010       CheckMigrationComplete();
1011     }
1012   }
1013 #endif
1014 }
1015
1016 /*
1017   migration of objects contains two different kinds:
1018   (1) objects want to make a barrier for migration completion
1019       (waitForBarrier is true)
1020       migrationDone() to finish and resumeClients
1021   (2) objects don't need a barrier
1022   However, next load balancing can only happen when both migrations complete
1023 */ 
1024 void CentralLB::CheckMigrationComplete()
1025 {
1026 #if CMK_LBDB_ON
1027   lbdone ++;
1028   if (lbdone == 2) {
1029     double end_lb_time = CkWallTimer();
1030     if (_lb_args.debug() && CkMyPe()==0) {
1031       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1032                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1033                 end_lb_time-start_lb_time);
1034     }
1035
1036     theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1037
1038     lbdone = 0;
1039     future_migrates_expected = -1;
1040     future_migrates_completed = 0;
1041
1042
1043     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1044     // release local barrier  so that the next load balancer can go
1045     LDOMHandle h;
1046     h.id.id.idx = 0;
1047     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1048     // switch to the next load balancer in the list
1049     // subtle: called from Migrated() may result in Migrated() called in next LB
1050     theLbdb->nextLoadbalancer(seqno);
1051   }
1052 #endif
1053 }
1054
1055 void CentralLB::preprocess(LDStats* stats)
1056 {
1057   if (_lb_args.ignoreBgLoad())
1058     stats->clearBgLoad();
1059
1060   // Call the predictor for the future
1061   if (_lb_predict) FuturePredictor(statsData);
1062 }
1063
1064 // default load balancing strategy
1065 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1066 {
1067 #if CMK_LBDB_ON
1068   double strat_start_time = CkWallTimer();
1069   if (_lb_args.debug())
1070     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1071
1072   work(stats);
1073
1074
1075   if (_lb_args.debug()>2)  {
1076     CkPrintf("CharmLB> Obj Map:\n");
1077     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1078     CkPrintf("\n");
1079   }
1080
1081   LBMigrateMsg *msg = createMigrateMsg(stats);
1082
1083         /* Extra feature for MetaBalancer
1084   if (_lb_args.metaLbOn()) {
1085     int clients = CkNumPes();
1086     LBInfo info(clients);
1087     getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1088     LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1089     info.getSummary(mLoad, mCpuLoad, totalLoad);
1090     theLbdb->UpdateDataAfterLB(mLoad, mCpuLoad, totalLoad/clients);
1091   }
1092         */
1093
1094   if (_lb_args.debug()) {
1095     double strat_end_time = CkWallTimer();
1096     envelope *env = UsrToEnv(msg);
1097
1098     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1099     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1100               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1101     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1102     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1103               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1104     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1105   }
1106   return msg;
1107 #else
1108   return NULL;
1109 #endif
1110 }
1111
1112 void CentralLB::work(LDStats* stats)
1113 {
1114   // does nothing but print the database
1115   stats->print();
1116 }
1117
1118 // generate migrate message from stats->from_proc and to_proc
1119 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1120 {
1121   int i;
1122   CkVec<MigrateInfo*> migrateInfo;
1123   for (i=0; i<stats->n_objs; i++) {
1124     LDObjData &objData = stats->objData[i];
1125     int frompe = stats->from_proc[i];
1126     int tope = stats->to_proc[i];
1127     if (frompe != tope) {
1128       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1129       //         CkMyPe(),obj,pe,dest);
1130       MigrateInfo *migrateMe = new MigrateInfo;
1131       migrateMe->obj = objData.handle;
1132       migrateMe->from_pe = frompe;
1133       migrateMe->to_pe = tope;
1134       migrateMe->async_arrival = objData.asyncArrival;
1135       migrateInfo.insertAtEnd(migrateMe);
1136     }
1137   }
1138
1139   int migrate_count=migrateInfo.length();
1140   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1141   msg->n_moves = migrate_count;
1142   for(i=0; i < migrate_count; i++) {
1143     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1144     msg->moves[i] = *item;
1145     delete item;
1146     migrateInfo[i] = 0;
1147   }
1148   return msg;
1149 }
1150
1151 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1152 {
1153   int nmoves = 0;
1154   int nunavail = 0;
1155   int i;
1156   for (i=0; i<m->n_moves; i++) {
1157     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1158     if (item->from_pe == p || item->to_pe == p) nmoves++;
1159   }
1160   for (i=0; i<CkNumPes();i++) {
1161     if (!m->avail_vector[i]) nunavail++;
1162   }
1163   LBMigrateMsg* msg;
1164   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1165   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1166   msg->n_moves = nmoves;
1167   msg->level = m->level;
1168   msg->next_lb = m->next_lb;
1169   for (i=0,nmoves=0; i<m->n_moves; i++) {
1170     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1171     if (item->from_pe == p || item->to_pe == p) {
1172       msg->moves[nmoves] = *item;
1173       nmoves++;
1174     }
1175   }
1176   // copy processor data
1177   if (nunavail)
1178   for (i=0; i<CkNumPes();i++) {
1179     msg->avail_vector[i] = m->avail_vector[i];
1180     msg->expectedLoad[i] = m->expectedLoad[i];
1181   }
1182   return msg;
1183 }
1184
1185 void CentralLB::simulationWrite() {
1186   if(step() == LBSimulation::dumpStep)
1187   {
1188     // here we are supposed to dump the database
1189     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1190     char *dumpFileName = (char *)malloc(dumpFileSize);
1191     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1192       free(dumpFileName);
1193       dumpFileSize+=3;
1194       dumpFileName = (char *)malloc(dumpFileSize);
1195     }
1196     writeStatsMsgs(dumpFileName);
1197     free(dumpFileName);
1198     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1199     ++LBSimulation::dumpStep;
1200     --LBSimulation::dumpStepSize;
1201     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1202       CmiPrintf("Charm++> Exiting...\n");
1203       CkExit();
1204     }
1205     return;
1206   }
1207 }
1208
1209 void CentralLB::simulationRead() {
1210   LBSimulation *simResults = NULL, *realResults;
1211   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1212   voidMessage->n_moves=0;
1213   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1214     // here we are supposed to read the data from the dump database
1215     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1216     char *simFileName = (char *)malloc(simFileSize);
1217     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1218       free(simFileName);
1219       simFileSize+=3;
1220       simFileName = (char *)malloc(simFileSize);
1221     }
1222     readStatsMsgs(simFileName);
1223
1224     // allocate simResults (only the first step)
1225     if (simResults == NULL) {
1226       simResults = new LBSimulation(LBSimulation::simProcs);
1227       realResults = new LBSimulation(LBSimulation::simProcs);
1228     }
1229     else {
1230       // should be the same number of procs of the original simulation!
1231       if (!LBSimulation::procsChanged) {
1232         // it means we have a previous step, so in simResults there is data.
1233         // we can now print the real effects of the load balancer during the simulation
1234         // or print the difference between the predicted data and the real one.
1235         realResults->reset();
1236         // reset to_proc of statsData to be equal to from_proc
1237         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1238         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1239         simResults->PrintDifferences(realResults,statsData);
1240       }
1241       simResults->reset();
1242     }
1243
1244     // now pass it to the strategy routine
1245     double startT = CkWallTimer();
1246     preprocess(statsData);
1247     CmiPrintf("%s> Strategy starts ... \n", lbname);
1248     LBMigrateMsg* migrateMsg = Strategy(statsData);
1249     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1250                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1251
1252     // now calculate the results of the load balancing simulation
1253     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1254
1255     // now we have the simulation data, so print it and loop
1256     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1257     // **CWL** Officially recording my disdain here for using ints for bool
1258     if (LBSimulation::showDecisionsOnly) {
1259       simResults->PrintDecisions(migrateMsg, simFileName, 
1260                                  LBSimulation::simProcs);
1261     } else {
1262       simResults->PrintSimulationResults();
1263     }
1264
1265     free(simFileName);
1266     delete migrateMsg;
1267     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1268   }
1269   // deallocate simResults
1270   delete simResults;
1271   CmiPrintf("Charm++> Exiting...\n");
1272   CkExit();
1273 }
1274
1275 void CentralLB::readStatsMsgs(const char* filename) 
1276 {
1277 #if CMK_LBDB_ON
1278   int i;
1279   FILE *f = fopen(filename, "r");
1280   if (f==NULL) {
1281     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1282     CmiAbort("");
1283   }
1284
1285   // at this stage, we need to rebuild the statsMsgList and
1286   // statsDataList structures. For that first deallocate the
1287   // old structures
1288   if (statsMsgsList) {
1289     for(i = 0; i < stats_msg_count; i++)
1290       delete statsMsgsList[i];
1291     delete[] statsMsgsList;
1292     statsMsgsList=0;
1293   }
1294
1295   PUP::fromDisk pd(f);
1296   PUP::machineInfo machInfo;
1297
1298   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1299   PUP::xlater p(machInfo, pd);
1300
1301   if (_lb_args.lbversion() > 1) {
1302     p|_lb_args.lbversion();             // write version number
1303     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1304     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1305   }
1306   p|stats_msg_count;
1307
1308   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1309   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1310   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1311
1312   // LBSimulation::simProcs must be set
1313   statsData->pup(p);
1314
1315   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1316   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1317
1318   // file f is closed in the destructor of PUP::fromDisk
1319   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1320 #endif
1321 }
1322
1323 void CentralLB::writeStatsMsgs(const char* filename) 
1324 {
1325 #if CMK_LBDB_ON
1326   FILE *f = fopen(filename, "w");
1327   if (f==NULL) {
1328     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1329     CmiAbort("");
1330   }
1331
1332   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1333   PUP::toDisk p(f);
1334   p((char *)&machInfo, sizeof(machInfo));       // machine info
1335
1336   p|_lb_args.lbversion();               // write version number
1337   p|stats_msg_count;
1338   statsData->pup(p);
1339
1340   fclose(f);
1341
1342   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1343 #endif
1344 }
1345
1346 // calculate the predicted wallclock/cpu load for every processors
1347 // considering communication overhead if considerComm is true
1348 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1349                       LBMigrateMsg *msg, LBInfo &info, 
1350                       int considerComm)
1351 {
1352 #if CMK_LBDB_ON
1353         stats->makeCommHash();
1354
1355         // update to_proc according to migration msgs
1356         for(int i = 0; i < msg->n_moves; i++) {
1357           MigrateInfo &mInfo = msg->moves[i];
1358           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1359           CmiAssert(idx != -1);
1360           stats->to_proc[idx] = mInfo.to_pe;
1361         }
1362
1363         info.getInfo(stats, count, considerComm);
1364 #endif
1365 }
1366
1367
1368 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1369 {
1370     CkAssert(simResults != NULL && count == simResults->numPes);
1371     // estimate the new loads of the processors. As a first approximation, this is the
1372     // sum of the cpu times of the objects on that processor
1373     double startT = CkWallTimer();
1374     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1375     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1376 }
1377
1378 void CentralLB::pup(PUP::er &p) { 
1379   BaseLB::pup(p); 
1380   if (p.isUnpacking())  {
1381     initLB(CkLBOptions(seqno)); 
1382   }
1383   p|reduction_started;
1384   int has_statsMsg=0;
1385   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1386   p|has_statsMsg;
1387   if (has_statsMsg) {
1388     if (p.isUnpacking())
1389       statsMsg = new CLBStatsMsg;
1390     statsMsg->pup(p);
1391   }
1392 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1393   p | lbDecisionCount;
1394   p | resumeCount;
1395 #endif
1396         
1397 }
1398
1399 int CentralLB::useMem() { 
1400   return sizeof(CentralLB) + statsData->useMem() + 
1401          CkNumPes() * sizeof(CLBStatsMsg *);
1402 }
1403
1404
1405 /**
1406   CLBStatsMsg is not a real message now.
1407   CLBStatsMsg is used for all processors to fill in their local load and comm
1408   statistics and send to processor 0
1409 */
1410
1411 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1412   n_objs = osz;
1413   n_comm = csz;
1414   objData = new LDObjData[osz];
1415   commData = new LDCommData[csz];
1416   avail_vector = NULL;
1417 }
1418
1419 CLBStatsMsg::~CLBStatsMsg() {
1420   delete [] objData;
1421   delete [] commData;
1422   delete [] avail_vector;
1423 }
1424
1425 void CLBStatsMsg::pup(PUP::er &p) {
1426   int i;
1427   p|from_pe;
1428   p|pe_speed;
1429   p|total_walltime;
1430   p|idletime;
1431   p|bg_walltime;
1432 #if CMK_LB_CPUTIMER
1433   p|total_cputime;
1434   p|bg_cputime;
1435 #endif
1436 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1437   p | step;
1438 #endif
1439   p|n_objs;
1440   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1441   for (i=0; i<n_objs; i++) p|objData[i];
1442   p|n_comm;
1443   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1444   for (i=0; i<n_comm; i++) p|commData[i];
1445
1446   int has_avail_vector;
1447   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1448   p|has_avail_vector;
1449   if (p.isUnpacking()) {
1450     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1451     else avail_vector = NULL;
1452   }
1453   if (has_avail_vector) p(avail_vector, CkNumPes());
1454
1455   p(next_lb);
1456 }
1457
1458 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1459 // the entry function, it is just used to use to pup.
1460 // I don't use CLBStatsMsg directly as marshalled parameter because
1461 // I want the data pointer stored and not to be freed by the Charm++.
1462 void CkMarshalledCLBStatsMessage::free() { 
1463   int count = msgs.size();
1464   for  (int i=0; i<count; i++) {
1465     delete msgs[i];
1466     msgs[i] = NULL;
1467   }
1468   msgs.free();
1469 }
1470
1471 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1472 {
1473   int count = m.getCount();
1474   for (int i=0; i<count; i++) add(m.getMessage(i));
1475 }
1476
1477 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1478 {
1479   int count = msgs.size();
1480   p|count;
1481   for (int i=0; i<count; i++) {
1482     CLBStatsMsg *msg;
1483     if (p.isUnpacking()) msg = new CLBStatsMsg;
1484     else { 
1485       msg = msgs[i]; CmiAssert(msg!=NULL);
1486     }
1487     msg->pup(p);
1488     if (p.isUnpacking()) add(msg);
1489   }
1490 }
1491
1492 SpanningTree::SpanningTree()
1493 {
1494         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1495         arity = (int)ceil(sq/2);
1496         calcParent(CkMyPe());
1497         calcNumChildren(CkMyPe());
1498 }
1499
1500 void SpanningTree::calcParent(int n)
1501 {
1502         parent=-1;
1503         if(n != 0  && arity > 0)
1504                 parent = (n-1)/arity;
1505 }
1506
1507 void SpanningTree::calcNumChildren(int n)
1508 {
1509         numChildren = 0;
1510         if (arity == 0) return;
1511         int fullNode=(CkNumPes()-1-arity)/arity;
1512         if(n <= fullNode)
1513                 numChildren = arity;
1514         if(n == fullNode+1)
1515                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1516         if(n > fullNode+1)
1517                 numChildren = 0;
1518 }
1519
1520 #include "CentralLB.def.h"
1521  
1522 /*@}*/