doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 #define  DEBUGF(x)       // CmiPrintf x;
15 #define  DEBUG(x)        // x;
16
17 #if CMK_MEM_CHECKPOINT
18    /* can not handle reduction in inmem FT */
19 #define USE_REDUCTION         0
20 #define USE_LDB_SPANNING_TREE 0
21 #else
22 #define USE_REDUCTION         1
23 #define USE_LDB_SPANNING_TREE 1
24 #endif
25
26 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
27 extern int _restartFlag;
28 extern void getGlobalStep(CkGroupID );
29 extern void initMlogLBStep(CkGroupID );
30 extern int globalResumeCount;
31 extern void sendDummyMigrationCounts(int *);
32 #endif
33
34 #if CMK_GRID_QUEUE_AVAILABLE
35 CpvExtern(void *, CkGridObject);
36 #endif
37
38 #if CMK_GLOBAL_LOCATION_UPDATE      
39 extern void UpdateLocation(MigrateInfo& migData); 
40 #endif
41
42 CkGroupID loadbalancer;
43 int * lb_ptr;
44 int load_balancer_created;
45
46 CreateLBFunc_Def(CentralLB, "CentralLB base class")
47
48 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
49                              LBMigrateMsg *, LBInfo &info, int considerComm);
50
51 /*
52 void CreateCentralLB()
53 {
54   CProxy_CentralLB::ckNew(0);
55 }
56 */
57
58 void CentralLB::staticStartLB(void* data)
59 {
60   CentralLB *me = (CentralLB*)(data);
61   me->StartLB();
62 }
63
64 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
65 {
66   CentralLB *me = (CentralLB*)(data);
67   me->Migrated(h, waitBarrier);
68 }
69
70 void CentralLB::staticAtSync(void* data)
71 {
72   CentralLB *me = (CentralLB*)(data);
73   me->AtSync();
74 }
75
76 void CentralLB::initLB(const CkLBOptions &opt)
77 {
78 #if CMK_LBDB_ON
79   lbname = "CentralLB";
80   thisProxy = CProxy_CentralLB(thisgroup);
81   //  CkPrintf("Construct in %d\n",CkMyPe());
82
83   // create and turn on by default
84   receiver = theLbdb->
85     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
86   notifier = theLbdb->getLBDB()->
87     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
88   startLbFnHdl = theLbdb->getLBDB()->
89     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
90
91   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
92   if (opt.getSeqNo() > 0) turnOff();
93
94   stats_msg_count = 0;
95   statsMsgsList = NULL;
96   statsData = NULL;
97
98   storedMigrateMsg = NULL;
99   reduction_started = 0;
100
101   // for future predictor
102   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
103   else predicted_model=0;
104   // register user interface callbacks
105   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
106
107   myspeed = theLbdb->ProcessorSpeed();
108
109   migrates_completed = 0;
110   future_migrates_completed = 0;
111   migrates_expected = -1;
112   future_migrates_expected = -1;
113   cur_ld_balancer = _lb_args.central_pe();      // 0 default
114   lbdone = 0;
115   count_msgs=0;
116   statsMsg = NULL;
117
118   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
119
120   load_balancer_created = 1;
121 #endif
122 }
123
124 CentralLB::~CentralLB()
125 {
126 #if CMK_LBDB_ON
127   delete [] statsMsgsList;
128   delete statsData;
129   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
130   if (theLbdb) {
131     theLbdb->getLBDB()->
132       RemoveNotifyMigrated(notifier);
133     theLbdb->
134       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
135   }
136 #endif
137 }
138
139 void CentralLB::turnOn() 
140 {
141 #if CMK_LBDB_ON
142   theLbdb->getLBDB()->
143     TurnOnBarrierReceiver(receiver);
144   theLbdb->getLBDB()->
145     TurnOnNotifyMigrated(notifier);
146   theLbdb->getLBDB()->
147     TurnOnStartLBFn(startLbFnHdl);
148 #endif
149 }
150
151 void CentralLB::turnOff() 
152 {
153 #if CMK_LBDB_ON
154   theLbdb->getLBDB()->
155     TurnOffBarrierReceiver(receiver);
156   theLbdb->getLBDB()->
157     TurnOffNotifyMigrated(notifier);
158   theLbdb->getLBDB()->
159     TurnOffStartLBFn(startLbFnHdl);
160 #endif
161 }
162
163 void CentralLB::AtSync()
164 {
165 #if CMK_LBDB_ON
166   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
167 #if CMK_MEM_CHECKPOINT  
168   CkSetInLdb();
169 #endif
170 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
171         CpvAccess(_currentObj)=this;
172 #endif
173
174   // if num of processor is only 1, nothing should happen
175   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
176     MigrationDone(0);
177     return;
178   }
179   if(CmiNodeAlive(CkMyPe())){
180     thisProxy [CkMyPe()].ProcessAtSync();
181   }
182 #endif
183 }
184
185 #include "ComlibStrategy.h"
186
187 void CentralLB::ProcessAtSync()
188 {
189 #if CMK_LBDB_ON
190   if (reduction_started) return;              // reducton in progress
191
192   CmiAssert(CmiNodeAlive(CkMyPe()));
193   if (CkMyPe() == cur_ld_balancer) {
194     start_lb_time = CkWallTimer();
195   }
196
197 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
198         initMlogLBStep(thisgroup);
199 #endif
200
201   // build message
202   BuildStatsMsg();
203
204 #if USE_REDUCTION
205     // reduction to get total number of objects and comm
206     // so that processor 0 can pre-allocate load balancing database
207   int counts[2];
208   counts[0] = theLbdb->GetObjDataSz();
209   counts[1] = theLbdb->GetCommDataSz();
210
211   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
212                   thisProxy[0]);
213   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
214   reduction_started = 1;
215 #else
216   SendStats();
217 #endif
218 #endif
219 }
220
221 // called only on 0
222 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
223 {
224   CmiAssert(CkMyPe() == 0);
225   if (statsData == NULL) statsData = new LDStats;
226
227   int *counts = (int *)msg->getData();
228   int n_objs = counts[0];
229   int n_comm = counts[1];
230
231     // resize database
232   statsData->objData.resize(n_objs);
233   statsData->from_proc.resize(n_objs);
234   statsData->to_proc.resize(n_objs);
235   statsData->commData.resize(n_comm);
236
237   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
238         
239     // broadcast call to let everybody start to send stats
240   thisProxy.SendStats();
241 }
242
243 void CentralLB::BuildStatsMsg()
244 {
245 #if CMK_LBDB_ON
246   // build and send stats
247   const int osz = theLbdb->GetObjDataSz();
248   const int csz = theLbdb->GetCommDataSz();
249
250   int npes = CkNumPes();
251   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
252   _MEMCHECK(msg);
253   msg->from_pe = CkMyPe();
254 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
255         msg->step = step();
256 #endif
257   //msg->serial = CrnRand();
258
259 /*
260   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
261   theLbdb->IdleTime(&msg->idletime);
262   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
263 */
264 #if CMK_LB_CPUTIMER
265   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
266                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
267 #else
268   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
269                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
270 #endif
271
272   msg->pe_speed = myspeed;
273   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
274
275   msg->n_objs = osz;
276   theLbdb->GetObjData(msg->objData);
277   msg->n_comm = csz;
278   theLbdb->GetCommData(msg->commData);
279 //  theLbdb->ClearLoads();
280   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
281
282   if(CkMyPe() == cur_ld_balancer) {
283     msg->avail_vector = new char[CkNumPes()];
284     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
285     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
286   }
287
288   CmiAssert(statsMsg == NULL);
289   statsMsg = msg;
290 #endif
291 }
292
293
294 // called on every processor
295 void CentralLB::SendStats()
296 {
297 #if CMK_LBDB_ON
298   CmiAssert(statsMsg != NULL);
299   reduction_started = 0;
300
301 #if USE_LDB_SPANNING_TREE
302   if(CkNumPes()>1024)
303   {
304     if (CkMyPe() == cur_ld_balancer)
305       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
306     else
307       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
308   }
309   else
310 #endif
311   {
312     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
313     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
314   }
315
316   statsMsg = NULL;
317
318 #ifdef __BIGSIM__
319   BgEndStreaming();
320 #endif
321
322   {
323   // enfore the barrier to wait until centralLB says no
324   LDOMHandle h;
325   h.id.id.idx = 0;
326   theLbdb->getLBDB()->RegisteringObjects(h);
327   }
328 #endif
329 }
330
331 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
332 extern int donotCountMigration;
333 #endif
334
335 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
336 {
337 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
338     if(donotCountMigration){
339         return ;
340     }
341 #endif
342
343 #if CMK_LBDB_ON
344   if (waitBarrier) {
345             migrates_completed++;
346       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
347     if (migrates_completed == migrates_expected) {
348       MigrationDone(1);
349     }
350   }
351   else {
352     future_migrates_completed ++;
353     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
354     if (future_migrates_completed == future_migrates_expected)  {
355         CheckMigrationComplete();
356     }
357   }
358 #endif
359 }
360
361 void CentralLB::MissMigrate(int waitForBarrier)
362 {
363   LDObjHandle h;
364   Migrated(h, waitForBarrier);
365 }
366
367 // build a complete data from bufferred messages
368 // not used when USE_REDUCTION = 1
369 void CentralLB::buildStats()
370 {
371     statsData->nprocs() = stats_msg_count;
372     // allocate space
373     statsData->objData.resize(statsData->n_objs);
374     statsData->from_proc.resize(statsData->n_objs);
375     statsData->to_proc.resize(statsData->n_objs);
376     statsData->commData.resize(statsData->n_comm);
377
378     int nobj = 0;
379     int ncom = 0;
380     int nmigobj = 0;
381     // copy all data in individule message to this big structure
382     for (int pe=0; pe<CkNumPes(); pe++) {
383        int i;
384        CLBStatsMsg *msg = statsMsgsList[pe];
385        if(msg == NULL) continue;
386        for (i=0; i<msg->n_objs; i++) {
387          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
388          statsData->objData[nobj] = msg->objData[i];
389          if (msg->objData[i].migratable) nmigobj++;
390          nobj++;
391        }
392        for (i=0; i<msg->n_comm; i++) {
393          statsData->commData[ncom] = msg->commData[i];
394          ncom++;
395        }
396        // free the memory
397        delete msg;
398        statsMsgsList[pe]=0;
399     }
400     statsData->n_migrateobjs = nmigobj;
401 }
402
403 // deposit one processor data at a time, note database is pre-allocated
404 // to have enough space
405 // used when USE_REDUCTION = 1
406 void CentralLB::depositData(CLBStatsMsg *m)
407 {
408   int i;
409   if (m == NULL) return;
410
411   const int pe = m->from_pe;
412   struct ProcStats &procStat = statsData->procs[pe];
413   procStat.pe = pe;
414   procStat.total_walltime = m->total_walltime;
415   procStat.idletime = m->idletime;
416   procStat.bg_walltime = m->bg_walltime;
417 #if CMK_LB_CPUTIMER
418   procStat.total_cputime = m->total_cputime;
419   procStat.bg_cputime = m->bg_cputime;
420 #endif
421   procStat.pe_speed = m->pe_speed;
422   //procStat.utilization = 1.0;
423   procStat.available = CmiTrue;
424   procStat.n_objs = m->n_objs;
425
426   int &nobj = statsData->n_objs;
427   int &nmigobj = statsData->n_migrateobjs;
428   for (i=0; i<m->n_objs; i++) {
429       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
430       statsData->objData[nobj] = m->objData[i];
431       if (m->objData[i].migratable) nmigobj++;
432       nobj++;
433       CmiAssert(nobj <= statsData->objData.capacity());
434   }
435   int &n_comm = statsData->n_comm;
436   for (i=0; i<m->n_comm; i++) {
437       statsData->commData[n_comm] = m->commData[i];
438       n_comm++;
439       CmiAssert(n_comm <= statsData->commData.capacity());
440   }
441   delete m;
442 }
443
444 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
445 {
446 #if CMK_LBDB_ON
447   if (statsMsgsList == NULL) {
448     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
449     CmiAssert(statsMsgsList != NULL);
450     for(int i=0; i < CkNumPes(); i++)
451       statsMsgsList[i] = 0;
452   }
453   if (statsData == NULL) statsData = new LDStats;
454
455     //  loop through all CLBStatsMsg in the incoming msg
456   int count = msg.getCount();
457   for (int num = 0; num < count; num++) 
458   {
459     CLBStatsMsg *m = msg.getMessage(num);
460     CmiAssert(m!=NULL);
461     const int pe = m->from_pe;
462     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
463 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
464 /*      
465  *  if(m->step < step()){
466  *    //TODO: if a processor is redoing an old load balance step..
467  *    //tell it that the step is done and that it should not perform any migrations
468  *      thisProxy[pe].ReceiveDummyMigration();
469  *  }*/
470 #endif
471         
472     if(!CmiNodeAlive(pe)){
473         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
474         continue;
475     }
476         
477     if (m->avail_vector!=NULL) {
478       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
479     }
480
481     if (statsMsgsList[pe] != 0) {
482       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
483              pe);
484     } else {
485       statsMsgsList[pe] = m;
486 #if USE_REDUCTION
487       depositData(m);
488 #else
489       // store per processor data right away
490       struct ProcStats &procStat = statsData->procs[pe];
491       procStat.pe = pe;
492       procStat.total_walltime = m->total_walltime;
493       procStat.idletime = m->idletime;
494       procStat.bg_walltime = m->bg_walltime;
495 #if CMK_LB_CPUTIMER
496       procStat.total_cputime = m->total_cputime;
497       procStat.bg_cputime = m->bg_cputime;
498 #endif
499       procStat.pe_speed = m->pe_speed;
500       //procStat.utilization = 1.0;
501       procStat.available = CmiTrue;
502       procStat.n_objs = m->n_objs;
503
504       statsData->n_objs += m->n_objs;
505       statsData->n_comm += m->n_comm;
506 #endif
507       stats_msg_count++;
508     }
509   }    // end of for
510
511   const int clients = CkNumValidPes();
512   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
513  
514   if (stats_msg_count == clients) {
515         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
516     statsData->nprocs() = stats_msg_count;
517     thisProxy[CkMyPe()].LoadBalance();
518   }
519 #endif
520 }
521
522 /** added by Abhinav for receiving msgs via spanning tree */
523 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
524 {
525 #if CMK_LBDB_ON
526         CmiAssert(CkMyPe() != 0);
527         bufMsg.add(msg);         // buffer messages
528         count_msgs++;
529         //CkPrintf("here %d\n", CkMyPe());
530         if (count_msgs == st.numChildren+1) {
531                 if(st.parent == 0)
532                 {
533                         thisProxy[0].ReceiveStats(bufMsg);
534                         //CkPrintf("from %d\n", CkMyPe());
535                 }
536                 else
537                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
538                 count_msgs = 0;
539                 bufMsg.free();
540         } 
541 #endif
542 }
543
544 void CentralLB::LoadBalance()
545 {
546 #if CMK_LBDB_ON
547   int proc;
548   const int clients = CkNumPes();
549
550 #if ! USE_REDUCTION
551   // build data
552   buildStats();
553 #else
554   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
555 #endif
556
557   theLbdb->ResetAdaptive();
558   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
559
560   if (_lb_args.debug()) 
561       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
562                   lbname, cur_ld_balancer, step(), start_lb_time,
563                   CmiMemoryUsage()/(1024.0*1024.0));
564
565   // if we are in simulation mode read data
566   if (LBSimulation::doSimulation) simulationRead();
567
568   char *availVector = LBDatabaseObj()->availVector();
569   for(proc = 0; proc < clients; proc++)
570       statsData->procs[proc].available = (CmiBool)availVector[proc];
571
572   preprocess(statsData);
573
574 //    CkPrintf("Before Calling Strategy\n");
575
576   if (_lb_args.printSummary()) {
577       LBInfo info(clients);
578         // not take comm data
579       info.getInfo(statsData, clients, 0);
580       LBRealType mLoad, mCpuLoad, totalLoad;
581       info.getSummary(mLoad, mCpuLoad, totalLoad);
582       int nmsgs, nbytes;
583       statsData->computeNonlocalComm(nmsgs, nbytes);
584       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
585 //      if (_lb_args.debug() > 1) {
586 //        for (int i=0; i<statsData->n_objs; i++)
587 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
588 //      }
589   }
590
591 #if CMK_REPLAYSYSTEM
592   LDHandle *loadBalancer_pointers;
593   if (_replaySystem) {
594     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
595     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
596   }
597 #endif
598   
599   LBMigrateMsg* migrateMsg = Strategy(statsData);
600 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
601         migrateMsg->step = step();
602 #endif
603
604 #if CMK_REPLAYSYSTEM
605   CpdHandleLBMessage(&migrateMsg);
606   if (_replaySystem) {
607     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
608     free(loadBalancer_pointers);
609   }
610 #endif
611   
612   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
613   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
614
615   // if this is the step at which we need to dump the database
616   simulationWrite();
617
618 //  calculate predicted load
619 //  very time consuming though, so only happen when debugging is on
620   if (_lb_args.printSummary()) {
621       LBInfo info(clients);
622         // not take comm data
623       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
624       LBRealType mLoad, mCpuLoad, totalLoad;
625       info.getSummary(mLoad, mCpuLoad, totalLoad);
626       int nmsgs, nbytes;
627       statsData->computeNonlocalComm(nmsgs, nbytes);
628       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
629       for (int i=0; i<clients; i++)
630         migrateMsg->expectedLoad[i] = info.peLoads[i];
631   }
632
633   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
634 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
635     lbDecisionCount++;
636     migrateMsg->lbDecisionCount = lbDecisionCount;
637 #endif
638
639   envelope *env = UsrToEnv(migrateMsg);
640   if (1) {
641       // broadcast
642     thisProxy.ReceiveMigration(migrateMsg);
643   }
644   else {
645     // split the migration for each processor
646     for (int p=0; p<CkNumPes(); p++) {
647       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
648       thisProxy[p].ReceiveMigration(m);
649     }
650     delete migrateMsg;
651   }
652
653   // Zero out data structures for next cycle
654   // CkPrintf("zeroing out data\n");
655   statsData->clear();
656   stats_msg_count=0;
657 #endif
658 }
659
660 // test if sender and receiver in a commData is nonmigratable.
661 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
662 {
663 #if CMK_LBDB_ON
664   for (int pe=0 ; pe<count; pe++)
665   {
666     for (int i=0; i<len[pe]; i++)
667       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
668           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
669       return 0;
670   }
671 #endif
672   return 1;
673 }
674
675 // rebuild LDStats and remove all non-migratble objects and related things
676 void CentralLB::removeNonMigratable(LDStats* stats, int count)
677 {
678   int i;
679
680   // check if we have non-migratable objects
681   int have = 0;
682   for (i=0; i<stats->n_objs; i++) 
683   {
684     LDObjData &odata = stats->objData[i];
685     if (!odata.migratable) {
686       have = 1; break;
687     }
688   }
689   if (have == 0) return;
690
691   CkVec<LDObjData> nonmig;
692   CkVec<int> new_from_proc, new_to_proc;
693   nonmig.resize(stats->n_migrateobjs);
694   new_from_proc.resize(stats->n_migrateobjs);
695   new_to_proc.resize(stats->n_migrateobjs);
696   int n_objs = 0;
697   for (i=0; i<stats->n_objs; i++) 
698   {
699     LDObjData &odata = stats->objData[i];
700     if (odata.migratable) {
701       nonmig[n_objs] = odata;
702       new_from_proc[n_objs] = stats->from_proc[i];
703       new_to_proc[n_objs] = stats->to_proc[i];
704       n_objs ++;
705     }
706     else {
707       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
708 #if CMK_LB_CPUTIMER
709       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
710 #endif
711     }
712   }
713   CmiAssert(stats->n_migrateobjs == n_objs);
714
715   stats->makeCommHash();
716   
717   CkVec<LDCommData> newCommData;
718   newCommData.resize(stats->n_comm);
719   int n_comm = 0;
720   for (i=0; i<stats->n_comm; i++) 
721   {
722     LDCommData& cdata = stats->commData[i];
723     if (!cdata.from_proc()) 
724     {
725       int idx = stats->getSendHash(cdata);
726       CmiAssert(idx != -1);
727       if (!stats->objData[idx].migratable) continue;
728     }
729     switch (cdata.receiver.get_type()) {
730     case LD_PROC_MSG:
731       break;
732     case LD_OBJ_MSG:  {
733       int idx = stats->getRecvHash(cdata);
734       if (stats->complete_flag)
735         CmiAssert(idx != -1);
736       else if (idx == -1) continue;          // receiver not in this group
737       if (!stats->objData[idx].migratable) continue;
738       break;
739       }
740     case LD_OBJLIST_MSG:    // object message FIXME add multicast
741       break;
742     }
743     newCommData[n_comm] = cdata;
744     n_comm ++;
745   }
746
747   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
748
749   // swap to new data
750   stats->objData = nonmig;
751   stats->from_proc = new_from_proc;
752   stats->to_proc = new_to_proc;
753   stats->n_objs = n_objs;
754
755   stats->commData = newCommData;
756   stats->n_comm = n_comm;
757
758   stats->deleteCommHash();
759   stats->makeCommHash();
760
761 }
762
763
764 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
765 extern int restarted;
766 #endif
767
768 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
769 {
770   storedMigrateMsg = m;
771 #if CMK_MEM_CHECKPOINT
772   CkResetInLdb();
773 #endif
774 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
775         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
776 #else
777   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
778                   thisProxy);
779   contribute(0, NULL, CkReduction::max_int, cb);
780 #endif
781 }
782
783 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
784 {
785 #if CMK_LBDB_ON
786         int i;
787         LBMigrateMsg *m = storedMigrateMsg;
788         CmiAssert(m!=NULL);
789         delete msg;
790
791 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
792         int *dummyCounts;
793
794         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
795         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
796         if(step() > m->step){
797                 char str[100];
798                 envelope *env = UsrToEnv(m);
799                 return;
800         }
801         lbDecisionCount = m->lbDecisionCount;
802 #endif
803
804   if (_lb_args.debug() > 1) 
805     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
806
807   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
808   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
809 /*FAULT_EVAC*/
810   if(!CmiNodeAlive(CkMyPe())){
811         delete m;
812         return;
813   }
814   migrates_expected = 0;
815   future_migrates_expected = 0;
816 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
817         int sending=0;
818     int dummy=0;
819         LBDB *_myLBDB = theLbdb->getLBDB();
820         if(_restartFlag){
821         dummyCounts = new int[CmiNumPes()];
822         bzero(dummyCounts,sizeof(int)*CmiNumPes());
823     }
824 #endif
825   for(i=0; i < m->n_moves; i++) {
826     MigrateInfo& move = m->moves[i];
827     const int me = CkMyPe();
828     if (move.from_pe == me && move.to_pe != me) {
829       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
830       // migrate object, in case it is already gone, inform toPe
831 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
832       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
833          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
834 #else
835             if(_restartFlag == 0){
836                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
837                 theLbdb->Migrate(move.obj,move.to_pe);
838                 sending++;
839             }else{
840                 if(_myLBDB->validObjHandle(move.obj)){
841                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
842                     theLbdb->Migrate(move.obj,move.to_pe);
843                     sending++;
844                 }else{
845                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
846                     dummyCounts[move.to_pe]++;
847                     dummy++;
848                 }
849             }
850 #endif
851     } else if (move.from_pe != me && move.to_pe == me) {
852        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
853       if (!move.async_arrival) migrates_expected++;
854       else future_migrates_expected++;
855     }
856     else {
857 #if CMK_GLOBAL_LOCATION_UPDATE      
858       UpdateLocation(move); 
859 #endif
860     }
861
862   }
863   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
864   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
865
866 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
867         if(_restartFlag){
868                 sendDummyMigrationCounts(dummyCounts);
869                 _restartFlag  =0;
870         delete []dummyCounts;
871         }
872 #endif
873
874
875 #if 0
876   if (m->n_moves ==0) {
877     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
878   }
879 #endif
880   cur_ld_balancer = m->next_lb;
881   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
882       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
883   }
884
885   if (migrates_expected == 0 || migrates_completed == migrates_expected)
886     MigrationDone(1);
887   delete m;
888
889 //      CkEvacuatedElement();
890 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
891 //  migrates_expected = 0;
892 //  //  ResumeClients(1);
893 #endif
894 #endif
895 }
896
897 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
898 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
899     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
900     //TODO: this is gonna be important when a crash happens during checkpoint
901     //the globalDecisionCount would have to be saved and compared against
902     //a future recvMigration
903                 
904         thisProxy[CkMyPe()].ResumeClients(1);
905 }
906 #endif
907
908 void CentralLB::MigrationDone(int balancing)
909 {
910 #if CMK_LBDB_ON
911   migrates_completed = 0;
912   migrates_expected = -1;
913   // clear load stats
914   if (balancing) theLbdb->ClearLoads();
915   // Increment to next step
916   theLbdb->incStep();
917         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
918   // if sync resume, invoke a barrier
919
920 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
921     savedBalancing = balancing;
922     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
923 #endif
924
925   LBDatabase::Object()->MigrationDone();    // call registered callbacks
926
927   LoadbalanceDone(balancing);        // callback
928 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
929   // if sync resume invoke a barrier
930   if (balancing && _lb_args.syncResume()) {
931     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
932                   thisProxy);
933     contribute(0, NULL, CkReduction::sum_int, cb);
934   }
935   else{ 
936     if(CmiNodeAlive(CkMyPe())){
937         thisProxy [CkMyPe()].ResumeClients(balancing);
938     }   
939   }     
940 #if CMK_GRID_QUEUE_AVAILABLE
941   CmiGridQueueDeregisterAll ();
942   CpvAccess(CkGridObject) = NULL;
943 #endif
944 #endif 
945 #endif
946 }
947
948 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
949 void CentralLB::endMigrationDone(int balancing){
950     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
951
952
953   if (balancing && _lb_args.syncResume()) {
954     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
955                   thisProxy);
956     contribute(0, NULL, CkReduction::sum_int, cb);
957   }
958   else{
959     if(CmiNodeAlive(CkMyPe())){
960     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
961     thisProxy [CkMyPe()].ResumeClients(balancing);
962     }
963   }
964
965 }
966 #endif
967
968 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
969 void resumeCentralLbAfterChkpt(void *_lb){
970     CentralLB *lb= (CentralLB *)_lb;
971     CpvAccess(_currentObj)=lb;
972     lb->endMigrationDone(lb->savedBalancing);
973 }
974 void resumeAfterRestoreParallelRecovery(void *_lb){
975     CentralLB *lb= (CentralLB *)_lb;
976         lb->ProcessReceiveMigration((CkReductionMsg*)NULL);
977 }
978 #endif
979
980
981 void CentralLB::ResumeClients(CkReductionMsg *msg)
982 {
983   ResumeClients(1);
984   delete msg;
985 }
986
987 void CentralLB::ResumeClients(int balancing)
988 {
989 #if CMK_LBDB_ON
990 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
991     resumeCount++;
992     globalResumeCount = resumeCount;
993 #endif
994   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
995   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
996     double end_lb_time = CkWallTimer();
997   }
998
999 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1000   if (balancing) ComlibNotifyMigrationDone();  
1001 #endif
1002
1003   theLbdb->ResumeClients();
1004   if (balancing)  {
1005
1006     CheckMigrationComplete();
1007     if (future_migrates_expected == 0 || 
1008             future_migrates_expected == future_migrates_completed) {
1009       CheckMigrationComplete();
1010     }
1011   }
1012 #endif
1013 }
1014
1015 /*
1016   migration of objects contains two different kinds:
1017   (1) objects want to make a barrier for migration completion
1018       (waitForBarrier is true)
1019       migrationDone() to finish and resumeClients
1020   (2) objects don't need a barrier
1021   However, next load balancing can only happen when both migrations complete
1022 */ 
1023 void CentralLB::CheckMigrationComplete()
1024 {
1025 #if CMK_LBDB_ON
1026   lbdone ++;
1027   if (lbdone == 2) {
1028     double end_lb_time = CkWallTimer();
1029     if (_lb_args.debug() && CkMyPe()==0) {
1030       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1031                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1032                 end_lb_time-start_lb_time);
1033     }
1034
1035     theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1036
1037     lbdone = 0;
1038     future_migrates_expected = -1;
1039     future_migrates_completed = 0;
1040
1041
1042     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1043     // release local barrier  so that the next load balancer can go
1044     LDOMHandle h;
1045     h.id.id.idx = 0;
1046     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1047     // switch to the next load balancer in the list
1048     // subtle: called from Migrated() may result in Migrated() called in next LB
1049     theLbdb->nextLoadbalancer(seqno);
1050   }
1051 #endif
1052 }
1053
1054 void CentralLB::preprocess(LDStats* stats)
1055 {
1056   if (_lb_args.ignoreBgLoad())
1057     stats->clearBgLoad();
1058
1059   // Call the predictor for the future
1060   if (_lb_predict) FuturePredictor(statsData);
1061 }
1062
1063 // default load balancing strategy
1064 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1065 {
1066 #if CMK_LBDB_ON
1067   double strat_start_time = CkWallTimer();
1068   if (_lb_args.debug())
1069     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1070
1071   work(stats);
1072
1073
1074   if (_lb_args.debug()>2)  {
1075     CkPrintf("CharmLB> Obj Map:\n");
1076     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1077     CkPrintf("\n");
1078   }
1079
1080   LBMigrateMsg *msg = createMigrateMsg(stats);
1081
1082         /* Extra feature for MetaBalancer
1083   if (_lb_args.metaLbOn()) {
1084     int clients = CkNumPes();
1085     LBInfo info(clients);
1086     getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1087     LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1088     info.getSummary(mLoad, mCpuLoad, totalLoad);
1089     theLbdb->UpdateDataAfterLB(mLoad, mCpuLoad, totalLoad/clients);
1090   }
1091         */
1092
1093   if (_lb_args.debug()) {
1094     double strat_end_time = CkWallTimer();
1095     envelope *env = UsrToEnv(msg);
1096
1097     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1098     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1099               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1100     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1101     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1102               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1103     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1104   }
1105   return msg;
1106 #else
1107   return NULL;
1108 #endif
1109 }
1110
1111 void CentralLB::work(LDStats* stats)
1112 {
1113   // does nothing but print the database
1114   stats->print();
1115 }
1116
1117 // generate migrate message from stats->from_proc and to_proc
1118 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1119 {
1120   int i;
1121   CkVec<MigrateInfo*> migrateInfo;
1122   for (i=0; i<stats->n_objs; i++) {
1123     LDObjData &objData = stats->objData[i];
1124     int frompe = stats->from_proc[i];
1125     int tope = stats->to_proc[i];
1126     if (frompe != tope) {
1127       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1128       //         CkMyPe(),obj,pe,dest);
1129       MigrateInfo *migrateMe = new MigrateInfo;
1130       migrateMe->obj = objData.handle;
1131       migrateMe->from_pe = frompe;
1132       migrateMe->to_pe = tope;
1133       migrateMe->async_arrival = objData.asyncArrival;
1134       migrateInfo.insertAtEnd(migrateMe);
1135     }
1136   }
1137
1138   int migrate_count=migrateInfo.length();
1139   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1140   msg->n_moves = migrate_count;
1141   for(i=0; i < migrate_count; i++) {
1142     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1143     msg->moves[i] = *item;
1144     delete item;
1145     migrateInfo[i] = 0;
1146   }
1147   return msg;
1148 }
1149
1150 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1151 {
1152   int nmoves = 0;
1153   int nunavail = 0;
1154   int i;
1155   for (i=0; i<m->n_moves; i++) {
1156     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1157     if (item->from_pe == p || item->to_pe == p) nmoves++;
1158   }
1159   for (i=0; i<CkNumPes();i++) {
1160     if (!m->avail_vector[i]) nunavail++;
1161   }
1162   LBMigrateMsg* msg;
1163   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1164   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1165   msg->n_moves = nmoves;
1166   msg->level = m->level;
1167   msg->next_lb = m->next_lb;
1168   for (i=0,nmoves=0; i<m->n_moves; i++) {
1169     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1170     if (item->from_pe == p || item->to_pe == p) {
1171       msg->moves[nmoves] = *item;
1172       nmoves++;
1173     }
1174   }
1175   // copy processor data
1176   if (nunavail)
1177   for (i=0; i<CkNumPes();i++) {
1178     msg->avail_vector[i] = m->avail_vector[i];
1179     msg->expectedLoad[i] = m->expectedLoad[i];
1180   }
1181   return msg;
1182 }
1183
1184 void CentralLB::simulationWrite() {
1185   if(step() == LBSimulation::dumpStep)
1186   {
1187     // here we are supposed to dump the database
1188     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1189     char *dumpFileName = (char *)malloc(dumpFileSize);
1190     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1191       free(dumpFileName);
1192       dumpFileSize+=3;
1193       dumpFileName = (char *)malloc(dumpFileSize);
1194     }
1195     writeStatsMsgs(dumpFileName);
1196     free(dumpFileName);
1197     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1198     ++LBSimulation::dumpStep;
1199     --LBSimulation::dumpStepSize;
1200     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1201       CmiPrintf("Charm++> Exiting...\n");
1202       CkExit();
1203     }
1204     return;
1205   }
1206 }
1207
1208 void CentralLB::simulationRead() {
1209   LBSimulation *simResults = NULL, *realResults;
1210   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1211   voidMessage->n_moves=0;
1212   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1213     // here we are supposed to read the data from the dump database
1214     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1215     char *simFileName = (char *)malloc(simFileSize);
1216     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1217       free(simFileName);
1218       simFileSize+=3;
1219       simFileName = (char *)malloc(simFileSize);
1220     }
1221     readStatsMsgs(simFileName);
1222
1223     // allocate simResults (only the first step)
1224     if (simResults == NULL) {
1225       simResults = new LBSimulation(LBSimulation::simProcs);
1226       realResults = new LBSimulation(LBSimulation::simProcs);
1227     }
1228     else {
1229       // should be the same number of procs of the original simulation!
1230       if (!LBSimulation::procsChanged) {
1231         // it means we have a previous step, so in simResults there is data.
1232         // we can now print the real effects of the load balancer during the simulation
1233         // or print the difference between the predicted data and the real one.
1234         realResults->reset();
1235         // reset to_proc of statsData to be equal to from_proc
1236         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1237         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1238         simResults->PrintDifferences(realResults,statsData);
1239       }
1240       simResults->reset();
1241     }
1242
1243     // now pass it to the strategy routine
1244     double startT = CkWallTimer();
1245     preprocess(statsData);
1246     CmiPrintf("%s> Strategy starts ... \n", lbname);
1247     LBMigrateMsg* migrateMsg = Strategy(statsData);
1248     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1249                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1250
1251     // now calculate the results of the load balancing simulation
1252     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1253
1254     // now we have the simulation data, so print it and loop
1255     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1256     // **CWL** Officially recording my disdain here for using ints for bool
1257     if (LBSimulation::showDecisionsOnly) {
1258       simResults->PrintDecisions(migrateMsg, simFileName, 
1259                                  LBSimulation::simProcs);
1260     } else {
1261       simResults->PrintSimulationResults();
1262     }
1263
1264     free(simFileName);
1265     delete migrateMsg;
1266     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1267   }
1268   // deallocate simResults
1269   delete simResults;
1270   CmiPrintf("Charm++> Exiting...\n");
1271   CkExit();
1272 }
1273
1274 void CentralLB::readStatsMsgs(const char* filename) 
1275 {
1276 #if CMK_LBDB_ON
1277   int i;
1278   FILE *f = fopen(filename, "r");
1279   if (f==NULL) {
1280     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1281     CmiAbort("");
1282   }
1283
1284   // at this stage, we need to rebuild the statsMsgList and
1285   // statsDataList structures. For that first deallocate the
1286   // old structures
1287   if (statsMsgsList) {
1288     for(i = 0; i < stats_msg_count; i++)
1289       delete statsMsgsList[i];
1290     delete[] statsMsgsList;
1291     statsMsgsList=0;
1292   }
1293
1294   PUP::fromDisk pd(f);
1295   PUP::machineInfo machInfo;
1296
1297   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1298   PUP::xlater p(machInfo, pd);
1299
1300   if (_lb_args.lbversion() > 1) {
1301     p|_lb_args.lbversion();             // write version number
1302     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1303     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1304   }
1305   p|stats_msg_count;
1306
1307   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1308   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1309   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1310
1311   // LBSimulation::simProcs must be set
1312   statsData->pup(p);
1313
1314   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1315   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1316
1317   // file f is closed in the destructor of PUP::fromDisk
1318   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1319 #endif
1320 }
1321
1322 void CentralLB::writeStatsMsgs(const char* filename) 
1323 {
1324 #if CMK_LBDB_ON
1325   FILE *f = fopen(filename, "w");
1326   if (f==NULL) {
1327     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1328     CmiAbort("");
1329   }
1330
1331   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1332   PUP::toDisk p(f);
1333   p((char *)&machInfo, sizeof(machInfo));       // machine info
1334
1335   p|_lb_args.lbversion();               // write version number
1336   p|stats_msg_count;
1337   statsData->pup(p);
1338
1339   fclose(f);
1340
1341   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1342 #endif
1343 }
1344
1345 // calculate the predicted wallclock/cpu load for every processors
1346 // considering communication overhead if considerComm is true
1347 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1348                       LBMigrateMsg *msg, LBInfo &info, 
1349                       int considerComm)
1350 {
1351 #if CMK_LBDB_ON
1352         stats->makeCommHash();
1353
1354         // update to_proc according to migration msgs
1355         for(int i = 0; i < msg->n_moves; i++) {
1356           MigrateInfo &mInfo = msg->moves[i];
1357           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1358           CmiAssert(idx != -1);
1359           stats->to_proc[idx] = mInfo.to_pe;
1360         }
1361
1362         info.getInfo(stats, count, considerComm);
1363 #endif
1364 }
1365
1366
1367 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1368 {
1369     CkAssert(simResults != NULL && count == simResults->numPes);
1370     // estimate the new loads of the processors. As a first approximation, this is the
1371     // sum of the cpu times of the objects on that processor
1372     double startT = CkWallTimer();
1373     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1374     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1375 }
1376
1377 void CentralLB::pup(PUP::er &p) { 
1378   BaseLB::pup(p); 
1379   if (p.isUnpacking())  {
1380     initLB(CkLBOptions(seqno)); 
1381   }
1382   p|reduction_started;
1383   int has_statsMsg=0;
1384   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1385   p|has_statsMsg;
1386   if (has_statsMsg) {
1387     if (p.isUnpacking())
1388       statsMsg = new CLBStatsMsg;
1389     statsMsg->pup(p);
1390   }
1391 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1392   p | lbDecisionCount;
1393   p | resumeCount;
1394 #endif
1395         
1396 }
1397
1398 int CentralLB::useMem() { 
1399   return sizeof(CentralLB) + statsData->useMem() + 
1400          CkNumPes() * sizeof(CLBStatsMsg *);
1401 }
1402
1403
1404 /**
1405   CLBStatsMsg is not a real message now.
1406   CLBStatsMsg is used for all processors to fill in their local load and comm
1407   statistics and send to processor 0
1408 */
1409
1410 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1411   n_objs = osz;
1412   n_comm = csz;
1413   objData = new LDObjData[osz];
1414   commData = new LDCommData[csz];
1415   avail_vector = NULL;
1416 }
1417
1418 CLBStatsMsg::~CLBStatsMsg() {
1419   delete [] objData;
1420   delete [] commData;
1421   delete [] avail_vector;
1422 }
1423
1424 void CLBStatsMsg::pup(PUP::er &p) {
1425   int i;
1426   p|from_pe;
1427   p|pe_speed;
1428   p|total_walltime;
1429   p|idletime;
1430   p|bg_walltime;
1431 #if CMK_LB_CPUTIMER
1432   p|total_cputime;
1433   p|bg_cputime;
1434 #endif
1435 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1436   p | step;
1437 #endif
1438   p|n_objs;
1439   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1440   for (i=0; i<n_objs; i++) p|objData[i];
1441   p|n_comm;
1442   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1443   for (i=0; i<n_comm; i++) p|commData[i];
1444
1445   int has_avail_vector;
1446   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1447   p|has_avail_vector;
1448   if (p.isUnpacking()) {
1449     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1450     else avail_vector = NULL;
1451   }
1452   if (has_avail_vector) p(avail_vector, CkNumPes());
1453
1454   p(next_lb);
1455 }
1456
1457 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1458 // the entry function, it is just used to use to pup.
1459 // I don't use CLBStatsMsg directly as marshalled parameter because
1460 // I want the data pointer stored and not to be freed by the Charm++.
1461 void CkMarshalledCLBStatsMessage::free() { 
1462   int count = msgs.size();
1463   for  (int i=0; i<count; i++) {
1464     delete msgs[i];
1465     msgs[i] = NULL;
1466   }
1467   msgs.free();
1468 }
1469
1470 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1471 {
1472   int count = m.getCount();
1473   for (int i=0; i<count; i++) add(m.getMessage(i));
1474 }
1475
1476 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1477 {
1478   int count = msgs.size();
1479   p|count;
1480   for (int i=0; i<count; i++) {
1481     CLBStatsMsg *msg;
1482     if (p.isUnpacking()) msg = new CLBStatsMsg;
1483     else { 
1484       msg = msgs[i]; CmiAssert(msg!=NULL);
1485     }
1486     msg->pup(p);
1487     if (p.isUnpacking()) add(msg);
1488   }
1489 }
1490
1491 SpanningTree::SpanningTree()
1492 {
1493         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1494         arity = (int)ceil(sq/2);
1495         calcParent(CkMyPe());
1496         calcNumChildren(CkMyPe());
1497 }
1498
1499 void SpanningTree::calcParent(int n)
1500 {
1501         parent=-1;
1502         if(n != 0  && arity > 0)
1503                 parent = (n-1)/arity;
1504 }
1505
1506 void SpanningTree::calcNumChildren(int n)
1507 {
1508         numChildren = 0;
1509         if (arity == 0) return;
1510         int fullNode=(CkNumPes()-1-arity)/arity;
1511         if(n <= fullNode)
1512                 numChildren = arity;
1513         if(n == fullNode+1)
1514                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1515         if(n > fullNode+1)
1516                 numChildren = 0;
1517 }
1518
1519 #include "CentralLB.def.h"
1520  
1521 /*@}*/