cleanup
[charm.git] / src / ck-ldb / CentralLB.C
1
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
6
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
13
14 //#include "limits.h"
15 #include <vector>
16
17 #define alpha 4.0e-6
18 #define beta 2.67e-9
19 #define percent_overhead 10
20
21
22 #define  DEBUGF(x)       // CmiPrintf x;
23 #define  DEBUG(x)        // x;
24 #define  DEBAD(x)        // CmiPrintf x
25
26 #if CMK_MEM_CHECKPOINT
27    /* can not handle reduction in inmem FT */
28 #define USE_REDUCTION         0
29 #define USE_LDB_SPANNING_TREE 0
30 #elif defined(_FAULT_MLOG_)
31 /* can not handle reduction in inmem FT */
32 #define USE_REDUCTION         0
33 #define USE_LDB_SPANNING_TREE 0
34 #else
35 #define USE_REDUCTION         1
36 #define USE_LDB_SPANNING_TREE 1
37 #endif
38
39 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
40 extern int _restartFlag;
41 extern void getGlobalStep(CkGroupID );
42 extern void initMlogLBStep(CkGroupID );
43 extern int globalResumeCount;
44 extern void sendDummyMigrationCounts(int *);
45 #endif
46
47 #if CMK_GRID_QUEUE_AVAILABLE
48 CpvExtern(void *, CkGridObject);
49 #endif
50
51 CkGroupID loadbalancer;
52 int * lb_ptr;
53 int load_balancer_created;
54
55 //struct AdaptiveData {
56 //  int iteration;
57 //  double max_load;
58 //  double avg_load;
59 //};
60 //
61 //struct AdaptiveLBDatabase {
62 //  std::vector<AdaptiveData> history_data;
63 //} adaptive_lbdb;
64 //
65 //enum state {
66 //  OFF,
67 //  ON,
68 //  PAUSE,
69 //  DECIDED,
70 //  LOAD_BALANCE
71 //} local_state;
72 //
73 //struct AdaptiveLBStructure {
74 //  int lb_ideal_period;
75 //  int lb_calculated_period;
76 //  int lb_no_iterations;
77 //  int global_max_iter_no;
78 //  int global_recv_iter_counter;
79 //  bool in_progress;
80 //  double prev_load;
81 //  double lb_strategy_cost;
82 //  double lb_migration_cost;
83 //  bool lb_period_informed;
84 //  int lb_msg_send_no;
85 //  int lb_msg_recv_no;
86 //} adaptive_struct;
87
88 CreateLBFunc_Def(CentralLB, "CentralLB base class")
89
90 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
91                              LBMigrateMsg *, LBInfo &info, int considerComm);
92
93 //CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
94 //  double lb_data[4];
95 //  lb_data[0] = 0;
96 //  lb_data[1] = 0;
97 //  lb_data[2] = 0;
98 //  for (int i = 0; i < nMsg; i++) {
99 //    CkAssert(msgs[i]->getSize() == 4*sizeof(double));
100 //    double* m = (double *)msgs[i]->getData();
101 //    lb_data[0] += m[0];
102 //    lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
103 //    lb_data[2] += m[2];
104 //    if (i == 0) {
105 //      lb_data[3] = m[3];
106 //    }
107 //    if (m[3] != lb_data[3]) {
108 //      CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
109 //      %lf\n", lb_data[3], m[3]);
110 //    }
111 //  }
112 //  return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
113 //}
114 //
115 ///*global*/ CkReduction::reducerType lbDataCollectionType;
116 ///*initcall*/ void registerLBDataCollection(void) {
117 //  lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
118 //}
119
120 /*
121 void CreateCentralLB()
122 {
123   CProxy_CentralLB::ckNew(0);
124 }
125 */
126
127 void CentralLB::staticStartLB(void* data)
128 {
129   CentralLB *me = (CentralLB*)(data);
130   me->StartLB();
131 }
132
133 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
134 {
135   CentralLB *me = (CentralLB*)(data);
136   me->Migrated(h, waitBarrier);
137 }
138
139 void CentralLB::staticAtSync(void* data)
140 {
141   CentralLB *me = (CentralLB*)(data);
142   me->AtSync();
143 }
144
145 void CentralLB::initLB(const CkLBOptions &opt)
146 {
147 #if CMK_LBDB_ON
148   lbname = "CentralLB";
149   thisProxy = CProxy_CentralLB(thisgroup);
150   //  CkPrintf("Construct in %d\n",CkMyPe());
151
152   // create and turn on by default
153   receiver = theLbdb->
154     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
155   notifier = theLbdb->getLBDB()->
156     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
157   startLbFnHdl = theLbdb->getLBDB()->
158     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
159
160   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
161   if (opt.getSeqNo() > 0) turnOff();
162
163   stats_msg_count = 0;
164   statsMsgsList = NULL;
165   statsData = NULL;
166
167   storedMigrateMsg = NULL;
168   reduction_started = 0;
169
170   // for future predictor
171   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
172   else predicted_model=0;
173   // register user interface callbacks
174   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
175
176   myspeed = theLbdb->ProcessorSpeed();
177
178   migrates_completed = 0;
179   future_migrates_completed = 0;
180   migrates_expected = -1;
181   future_migrates_expected = -1;
182   cur_ld_balancer = _lb_args.central_pe();      // 0 default
183   lbdone = 0;
184   count_msgs=0;
185   statsMsg = NULL;
186
187   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
188
189   load_balancer_created = 1;
190
191   // If metabalancer enabled, initialize the variables
192  // adaptive_struct.lb_ideal_period =  INT_MAX;
193  // adaptive_struct.lb_calculated_period = INT_MAX;
194  // adaptive_struct.lb_no_iterations = -1;
195  // adaptive_struct.global_max_iter_no = 0;
196  // adaptive_struct.global_recv_iter_counter = 0;
197  // adaptive_struct.in_progress = false;
198  // adaptive_struct.prev_load = 0.0;
199  // adaptive_struct.lb_strategy_cost = 0.0;
200  // adaptive_struct.lb_migration_cost = 0.0;
201  // adaptive_struct.lb_msg_send_no = 0;
202  // adaptive_struct.lb_msg_recv_no = 0;
203  // local_state = OFF;
204 #endif
205 }
206
207 CentralLB::~CentralLB()
208 {
209 #if CMK_LBDB_ON
210   delete [] statsMsgsList;
211   delete statsData;
212   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
213   if (theLbdb) {
214     theLbdb->getLBDB()->
215       RemoveNotifyMigrated(notifier);
216     theLbdb->
217       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
218   }
219 #endif
220 }
221
222 void CentralLB::turnOn() 
223 {
224 #if CMK_LBDB_ON
225   theLbdb->getLBDB()->
226     TurnOnBarrierReceiver(receiver);
227   theLbdb->getLBDB()->
228     TurnOnNotifyMigrated(notifier);
229   theLbdb->getLBDB()->
230     TurnOnStartLBFn(startLbFnHdl);
231 #endif
232 }
233
234 void CentralLB::turnOff() 
235 {
236 #if CMK_LBDB_ON
237   theLbdb->getLBDB()->
238     TurnOffBarrierReceiver(receiver);
239   theLbdb->getLBDB()->
240     TurnOffNotifyMigrated(notifier);
241   theLbdb->getLBDB()->
242     TurnOffStartLBFn(startLbFnHdl);
243 #endif
244 }
245
246 void CentralLB::AtSync()
247 {
248 //  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
249 #if CMK_LBDB_ON
250 //  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
251
252 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
253         CpvAccess(_currentObj)=this;
254 #endif
255
256   // if num of processor is only 1, nothing should happen
257   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
258     MigrationDone(0);
259     return;
260   }
261   if(CmiNodeAlive(CkMyPe())){
262     thisProxy [CkMyPe()].ProcessAtSync();
263   }
264 #endif
265 }
266
267 #include "ComlibStrategy.h"
268
269 void CentralLB::ProcessAtSync()
270 {
271
272
273
274 #if CMK_LBDB_ON
275   if (reduction_started) return;              // reducton in progress
276
277   CmiAssert(CmiNodeAlive(CkMyPe()));
278   if (CkMyPe() == cur_ld_balancer) {
279     start_lb_time = CkWallTimer();
280   }
281  double total_load;
282  double idle_time;
283  double bg_walltime;
284  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
285  theLbdb->IdleTime(&idle_time);
286  DEBAD(("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(),
287     total_load, idle_time, bg_walltime, (total_load - idle_time - bg_walltime)));
288
289 // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(),
290 //    total_load, idle_time, bg_walltime, (total_load - idle_time - bg_walltime));
291
292
293 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
294         initMlogLBStep(thisgroup);
295 #endif
296
297   // build message
298   BuildStatsMsg();
299
300 #if USE_REDUCTION
301     // reduction to get total number of objects and comm
302     // so that processor 0 can pre-allocate load balancing database
303   int counts[2];
304   counts[0] = theLbdb->GetObjDataSz();
305   counts[1] = theLbdb->GetCommDataSz();
306
307   CkCallback cb(CkIndex_CentralLB::ReceiveCounts((CkReductionMsg*)NULL), 
308                   thisProxy[0]);
309   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
310   reduction_started = 1;
311 #else
312   SendStats();
313 #endif
314 #endif
315 }
316
317 // called only on 0
318 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
319 {
320   CmiAssert(CkMyPe() == 0);
321   if (statsData == NULL) statsData = new LDStats;
322
323   int *counts = (int *)msg->getData();
324   int n_objs = counts[0];
325   int n_comm = counts[1];
326
327     // resize database
328   statsData->objData.resize(n_objs);
329   statsData->from_proc.resize(n_objs);
330   statsData->to_proc.resize(n_objs);
331   statsData->commData.resize(n_comm);
332
333   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
334         
335     // broadcast call to let everybody start to send stats
336   thisProxy.SendStats();
337 }
338
339 void CentralLB::BuildStatsMsg()
340 {
341 #if CMK_LBDB_ON
342   // build and send stats
343   const int osz = theLbdb->GetObjDataSz();
344   const int csz = theLbdb->GetCommDataSz();
345
346   int npes = CkNumPes();
347   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
348   _MEMCHECK(msg);
349   msg->from_pe = CkMyPe();
350 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
351         msg->step = step();
352 #endif
353   //msg->serial = CrnRand();
354
355 /*
356   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
357   theLbdb->IdleTime(&msg->idletime);
358   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
359 */
360 #if CMK_LB_CPUTIMER
361   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
362                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
363 #else
364   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
365                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
366 #endif
367
368   msg->pe_speed = myspeed;
369   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
370
371   msg->n_objs = osz;
372   theLbdb->GetObjData(msg->objData);
373   msg->n_comm = csz;
374   theLbdb->GetCommData(msg->commData);
375 //  theLbdb->ClearLoads();
376   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
377
378   if(CkMyPe() == cur_ld_balancer) {
379     msg->avail_vector = new char[CkNumPes()];
380     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
381     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
382   }
383
384   CmiAssert(statsMsg == NULL);
385   statsMsg = msg;
386 #endif
387 }
388
389
390 // called on every processor
391 void CentralLB::SendStats()
392 {
393 #if CMK_LBDB_ON
394   CmiAssert(statsMsg != NULL);
395   reduction_started = 0;
396
397 #if USE_LDB_SPANNING_TREE
398   if(CkNumPes()>1024)
399   {
400     if (CkMyPe() == cur_ld_balancer)
401       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
402     else
403       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
404   }
405   else
406 #endif
407   {
408     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
409     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
410   }
411
412   statsMsg = NULL;
413
414 #ifdef __BIGSIM__
415   BgEndStreaming();
416 #endif
417
418   {
419   // enfore the barrier to wait until centralLB says no
420   LDOMHandle h;
421   h.id.id.idx = 0;
422   theLbdb->getLBDB()->RegisteringObjects(h);
423   }
424 #endif
425 }
426
427 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
428 extern int donotCountMigration;
429 #endif
430
431 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
432 {
433 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
434     if(donotCountMigration){
435         return ;
436     }
437 #endif
438
439 #if CMK_LBDB_ON
440   if (waitBarrier) {
441             migrates_completed++;
442       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
443     if (migrates_completed == migrates_expected) {
444       MigrationDone(1);
445     }
446   }
447   else {
448     future_migrates_completed ++;
449     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
450     if (future_migrates_completed == future_migrates_expected)  {
451         CheckMigrationComplete();
452     }
453   }
454 #endif
455 }
456
457 void CentralLB::MissMigrate(int waitForBarrier)
458 {
459   LDObjHandle h;
460   Migrated(h, waitForBarrier);
461 }
462
463 // build a complete data from bufferred messages
464 // not used when USE_REDUCTION = 1
465 void CentralLB::buildStats()
466 {
467     statsData->nprocs() = stats_msg_count;
468     // allocate space
469     statsData->objData.resize(statsData->n_objs);
470     statsData->from_proc.resize(statsData->n_objs);
471     statsData->to_proc.resize(statsData->n_objs);
472     statsData->commData.resize(statsData->n_comm);
473
474     int nobj = 0;
475     int ncom = 0;
476     int nmigobj = 0;
477     // copy all data in individule message to this big structure
478     for (int pe=0; pe<CkNumPes(); pe++) {
479        int i;
480        CLBStatsMsg *msg = statsMsgsList[pe];
481        if(msg == NULL) continue;
482        for (i=0; i<msg->n_objs; i++) {
483          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
484          statsData->objData[nobj] = msg->objData[i];
485          if (msg->objData[i].migratable) nmigobj++;
486          nobj++;
487        }
488        for (i=0; i<msg->n_comm; i++) {
489          statsData->commData[ncom] = msg->commData[i];
490          ncom++;
491        }
492        // free the memory
493        delete msg;
494        statsMsgsList[pe]=0;
495     }
496     statsData->n_migrateobjs = nmigobj;
497 }
498
499 // deposit one processor data at a time, note database is pre-allocated
500 // to have enough space
501 // used when USE_REDUCTION = 1
502 void CentralLB::depositData(CLBStatsMsg *m)
503 {
504   int i;
505   if (m == NULL) return;
506
507   const int pe = m->from_pe;
508   struct ProcStats &procStat = statsData->procs[pe];
509   procStat.pe = pe;
510   procStat.total_walltime = m->total_walltime;
511   procStat.idletime = m->idletime;
512   procStat.bg_walltime = m->bg_walltime;
513 #if CMK_LB_CPUTIMER
514   procStat.total_cputime = m->total_cputime;
515   procStat.bg_cputime = m->bg_cputime;
516 #endif
517   procStat.pe_speed = m->pe_speed;
518   //procStat.utilization = 1.0;
519   procStat.available = CmiTrue;
520   procStat.n_objs = m->n_objs;
521
522   int &nobj = statsData->n_objs;
523   int &nmigobj = statsData->n_migrateobjs;
524   for (i=0; i<m->n_objs; i++) {
525       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
526       statsData->objData[nobj] = m->objData[i];
527       if (m->objData[i].migratable) nmigobj++;
528       nobj++;
529       CmiAssert(nobj <= statsData->objData.capacity());
530   }
531   int &n_comm = statsData->n_comm;
532   for (i=0; i<m->n_comm; i++) {
533       statsData->commData[n_comm] = m->commData[i];
534       n_comm++;
535       CmiAssert(n_comm <= statsData->commData.capacity());
536   }
537   delete m;
538 }
539
540 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
541 {
542 #if CMK_LBDB_ON
543   if (statsMsgsList == NULL) {
544     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
545     CmiAssert(statsMsgsList != NULL);
546     for(int i=0; i < CkNumPes(); i++)
547       statsMsgsList[i] = 0;
548   }
549   if (statsData == NULL) statsData = new LDStats;
550
551     //  loop through all CLBStatsMsg in the incoming msg
552   int count = msg.getCount();
553   for (int num = 0; num < count; num++) 
554   {
555     CLBStatsMsg *m = msg.getMessage(num);
556     CmiAssert(m!=NULL);
557     const int pe = m->from_pe;
558     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
559 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
560 /*      
561  *  if(m->step < step()){
562  *    //TODO: if a processor is redoing an old load balance step..
563  *    //tell it that the step is done and that it should not perform any migrations
564  *      thisProxy[pe].ReceiveDummyMigration();
565  *  }*/
566 #endif
567         
568     if(!CmiNodeAlive(pe)){
569         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
570         continue;
571     }
572         
573     if (m->avail_vector!=NULL) {
574       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
575     }
576
577     if (statsMsgsList[pe] != 0) {
578       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
579              pe);
580     } else {
581       statsMsgsList[pe] = m;
582 #if USE_REDUCTION
583       depositData(m);
584 #else
585       // store per processor data right away
586       struct ProcStats &procStat = statsData->procs[pe];
587       procStat.pe = pe;
588       procStat.total_walltime = m->total_walltime;
589       procStat.idletime = m->idletime;
590       procStat.bg_walltime = m->bg_walltime;
591 #if CMK_LB_CPUTIMER
592       procStat.total_cputime = m->total_cputime;
593       procStat.bg_cputime = m->bg_cputime;
594 #endif
595       procStat.pe_speed = m->pe_speed;
596       //procStat.utilization = 1.0;
597       procStat.available = CmiTrue;
598       procStat.n_objs = m->n_objs;
599
600       statsData->n_objs += m->n_objs;
601       statsData->n_comm += m->n_comm;
602 #endif
603       stats_msg_count++;
604     }
605   }    // end of for
606
607   const int clients = CkNumValidPes();
608   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
609  
610   if (stats_msg_count == clients) {
611         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
612     statsData->nprocs() = stats_msg_count;
613     thisProxy[CkMyPe()].LoadBalance();
614   }
615 #endif
616 }
617
618 /** added by Abhinav for receiving msgs via spanning tree */
619 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
620 {
621 #if CMK_LBDB_ON
622         CmiAssert(CkMyPe() != 0);
623         bufMsg.add(msg);         // buffer messages
624         count_msgs++;
625         //CkPrintf("here %d\n", CkMyPe());
626         if (count_msgs == st.numChildren+1) {
627                 if(st.parent == 0)
628                 {
629                         thisProxy[0].ReceiveStats(bufMsg);
630                         //CkPrintf("from %d\n", CkMyPe());
631                 }
632                 else
633                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
634                 count_msgs = 0;
635                 bufMsg.free();
636         } 
637 #endif
638 }
639
640 void CentralLB::LoadBalance()
641 {
642 #if CMK_LBDB_ON
643   int proc;
644   const int clients = CkNumPes();
645
646 #if ! USE_REDUCTION
647   // build data
648   buildStats();
649 #else
650   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
651 #endif
652
653 //NOTE  theLbdb->ResetAdaptive();
654   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
655
656   if (_lb_args.debug()) 
657       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
658                   lbname, cur_ld_balancer, step(), start_lb_time,
659                   CmiMemoryUsage()/(1024.0*1024.0));
660
661   // if we are in simulation mode read data
662   if (LBSimulation::doSimulation) simulationRead();
663
664   char *availVector = LBDatabaseObj()->availVector();
665   for(proc = 0; proc < clients; proc++)
666       statsData->procs[proc].available = (CmiBool)availVector[proc];
667
668   preprocess(statsData);
669
670 //    CkPrintf("Before Calling Strategy\n");
671
672   if (_lb_args.printSummary()) {
673       LBInfo info(clients);
674         // not take comm data
675       info.getInfo(statsData, clients, 0);
676       LBRealType mLoad, mCpuLoad, totalLoad;
677       info.getSummary(mLoad, mCpuLoad, totalLoad);
678       int nmsgs, nbytes;
679       statsData->computeNonlocalComm(nmsgs, nbytes);
680       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
681 //      if (_lb_args.debug() > 1) {
682 //        for (int i=0; i<statsData->n_objs; i++)
683 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
684 //      }
685   }
686
687 #if CMK_REPLAYSYSTEM
688   LDHandle *loadBalancer_pointers;
689   if (_replaySystem) {
690     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
691     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
692   }
693 #endif
694   
695   LBMigrateMsg* migrateMsg = Strategy(statsData);
696 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
697         migrateMsg->step = step();
698 #endif
699
700 #if CMK_REPLAYSYSTEM
701   CpdHandleLBMessage(&migrateMsg);
702   if (_replaySystem) {
703     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
704     free(loadBalancer_pointers);
705   }
706 #endif
707   
708   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
709   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
710
711   // if this is the step at which we need to dump the database
712   simulationWrite();
713
714 //  calculate predicted load
715 //  very time consuming though, so only happen when debugging is on
716   if (_lb_args.printSummary()) {
717       LBInfo info(clients);
718         // not take comm data
719       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
720       LBRealType mLoad, mCpuLoad, totalLoad;
721       info.getSummary(mLoad, mCpuLoad, totalLoad);
722       int nmsgs, nbytes;
723       statsData->computeNonlocalComm(nmsgs, nbytes);
724       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
725       for (int i=0; i<clients; i++)
726         migrateMsg->expectedLoad[i] = info.peLoads[i];
727   }
728
729   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
730 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
731     lbDecisionCount++;
732     migrateMsg->lbDecisionCount = lbDecisionCount;
733 #endif
734
735   envelope *env = UsrToEnv(migrateMsg);
736   if (1) {
737       // broadcast
738     thisProxy.ReceiveMigration(migrateMsg);
739   }
740   else {
741     // split the migration for each processor
742     for (int p=0; p<CkNumPes(); p++) {
743       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
744       thisProxy[p].ReceiveMigration(m);
745     }
746     delete migrateMsg;
747   }
748
749   // Zero out data structures for next cycle
750   // CkPrintf("zeroing out data\n");
751   statsData->clear();
752   stats_msg_count=0;
753 #endif
754 }
755
756 // test if sender and receiver in a commData is nonmigratable.
757 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
758 {
759 #if CMK_LBDB_ON
760   for (int pe=0 ; pe<count; pe++)
761   {
762     for (int i=0; i<len[pe]; i++)
763       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
764           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
765       return 0;
766   }
767 #endif
768   return 1;
769 }
770
771 // rebuild LDStats and remove all non-migratble objects and related things
772 void CentralLB::removeNonMigratable(LDStats* stats, int count)
773 {
774   int i;
775
776   // check if we have non-migratable objects
777   int have = 0;
778   for (i=0; i<stats->n_objs; i++) 
779   {
780     LDObjData &odata = stats->objData[i];
781     if (!odata.migratable) {
782       have = 1; break;
783     }
784   }
785   if (have == 0) return;
786
787   CkVec<LDObjData> nonmig;
788   CkVec<int> new_from_proc, new_to_proc;
789   nonmig.resize(stats->n_migrateobjs);
790   new_from_proc.resize(stats->n_migrateobjs);
791   new_to_proc.resize(stats->n_migrateobjs);
792   int n_objs = 0;
793   for (i=0; i<stats->n_objs; i++) 
794   {
795     LDObjData &odata = stats->objData[i];
796     if (odata.migratable) {
797       nonmig[n_objs] = odata;
798       new_from_proc[n_objs] = stats->from_proc[i];
799       new_to_proc[n_objs] = stats->to_proc[i];
800       n_objs ++;
801     }
802     else {
803       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
804 #if CMK_LB_CPUTIMER
805       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
806 #endif
807     }
808   }
809   CmiAssert(stats->n_migrateobjs == n_objs);
810
811   stats->makeCommHash();
812   
813   CkVec<LDCommData> newCommData;
814   newCommData.resize(stats->n_comm);
815   int n_comm = 0;
816   for (i=0; i<stats->n_comm; i++) 
817   {
818     LDCommData& cdata = stats->commData[i];
819     if (!cdata.from_proc()) 
820     {
821       int idx = stats->getSendHash(cdata);
822       CmiAssert(idx != -1);
823       if (!stats->objData[idx].migratable) continue;
824     }
825     switch (cdata.receiver.get_type()) {
826     case LD_PROC_MSG:
827       break;
828     case LD_OBJ_MSG:  {
829       int idx = stats->getRecvHash(cdata);
830       if (stats->complete_flag)
831         CmiAssert(idx != -1);
832       else if (idx == -1) continue;          // receiver not in this group
833       if (!stats->objData[idx].migratable) continue;
834       break;
835       }
836     case LD_OBJLIST_MSG:    // object message FIXME add multicast
837       break;
838     }
839     newCommData[n_comm] = cdata;
840     n_comm ++;
841   }
842
843   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
844
845   // swap to new data
846   stats->objData = nonmig;
847   stats->from_proc = new_from_proc;
848   stats->to_proc = new_to_proc;
849   stats->n_objs = n_objs;
850
851   stats->commData = newCommData;
852   stats->n_comm = n_comm;
853
854   stats->deleteCommHash();
855   stats->makeCommHash();
856
857 }
858
859
860 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
861 extern int restarted;
862 #endif
863
864 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
865 {
866   storedMigrateMsg = m;
867 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
868         ProcessReceiveMigration((CkReductionMsg*)NULL);
869 #else
870   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
871                   thisProxy);
872   contribute(0, NULL, CkReduction::max_int, cb);
873
874 #endif
875 }
876
877 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
878 {
879 #if CMK_LBDB_ON
880         int i;
881         LBMigrateMsg *m = storedMigrateMsg;
882         CmiAssert(m!=NULL);
883         delete msg;
884
885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
886         int *dummyCounts;
887
888         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
889         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
890         if(step() > m->step){
891                 char str[100];
892                 envelope *env = UsrToEnv(m);
893                 CmiPrintf("[%d] Object %s tProcessed %d m->TN %d\n",CmiMyPe(),mlogData->objID.toString(str),mlogData->tProcessed,env->TN);
894                 return;
895         }
896         lbDecisionCount = m->lbDecisionCount;
897 #endif
898
899   if (_lb_args.debug() > 1) 
900     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
901
902   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
903   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
904 /*FAULT_EVAC*/
905   if(!CmiNodeAlive(CkMyPe())){
906         delete m;
907         return;
908   }
909   migrates_expected = 0;
910   future_migrates_expected = 0;
911 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
912         int sending=0;
913     int dummy=0;
914         LBDB *_myLBDB = theLbdb->getLBDB();
915         if(_restartFlag){
916         dummyCounts = new int[CmiNumPes()];
917         bzero(dummyCounts,sizeof(int)*CmiNumPes());
918     }
919 #endif
920   for(i=0; i < m->n_moves; i++) {
921     MigrateInfo& move = m->moves[i];
922     const int me = CkMyPe();
923     if (move.from_pe == me && move.to_pe != me) {
924       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
925       // migrate object, in case it is already gone, inform toPe
926 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
927       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
928          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
929 #else
930             if(_restartFlag == 0){
931                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
932                 theLbdb->Migrate(move.obj,move.to_pe);
933                 sending++;
934             }else{
935                 if(_myLBDB->validObjHandle(move.obj)){
936                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
937                     theLbdb->Migrate(move.obj,move.to_pe);
938                     sending++;
939                 }else{
940                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
941                     dummyCounts[move.to_pe]++;
942                     dummy++;
943                 }
944             }
945 #endif
946     } else if (move.from_pe != me && move.to_pe == me) {
947        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
948       if (!move.async_arrival) migrates_expected++;
949       else future_migrates_expected++;
950     }
951   }
952   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
953   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
954
955 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
956         if(_restartFlag){
957                 sendDummyMigrationCounts(dummyCounts);
958                 _restartFlag  =0;
959         delete []dummyCounts;
960         }
961 #endif
962
963
964 #if 0
965   if (m->n_moves ==0) {
966     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
967   }
968 #endif
969   cur_ld_balancer = m->next_lb;
970   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
971       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
972   }
973
974   if (migrates_expected == 0 || migrates_completed == migrates_expected)
975     MigrationDone(1);
976   delete m;
977
978 //      CkEvacuatedElement();
979 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
980 //  migrates_expected = 0;
981 //  //  ResumeClients(1);
982 #endif
983 #endif
984 }
985
986 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
987 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
988     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
989     //TODO: this is gonna be important when a crash happens during checkpoint
990     //the globalDecisionCount would have to be saved and compared against
991     //a future recvMigration
992                 
993         thisProxy[CkMyPe()].ResumeClients(1);
994 }
995 #endif
996
997 void CentralLB::MigrationDone(int balancing)
998 {
999 #if CMK_LBDB_ON
1000   migrates_completed = 0;
1001   migrates_expected = -1;
1002   // clear load stats
1003   if (balancing) theLbdb->ClearLoads();
1004   // Increment to next step
1005   theLbdb->incStep();
1006         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1007   // if sync resume, invoke a barrier
1008
1009 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1010     savedBalancing = balancing;
1011     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1012 #endif
1013
1014   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1015
1016   LoadbalanceDone(balancing);        // callback
1017 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1018   // if sync resume invoke a barrier
1019   if (balancing && _lb_args.syncResume()) {
1020     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
1021                   thisProxy);
1022     contribute(0, NULL, CkReduction::sum_int, cb);
1023   }
1024   else{ 
1025     if(CmiNodeAlive(CkMyPe())){
1026         thisProxy [CkMyPe()].ResumeClients(balancing);
1027     }   
1028   }     
1029 #if CMK_GRID_QUEUE_AVAILABLE
1030   CmiGridQueueDeregisterAll ();
1031   CpvAccess(CkGridObject) = NULL;
1032 #endif
1033 #endif 
1034 #endif
1035 }
1036
1037 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1038 void CentralLB::endMigrationDone(int balancing){
1039     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1040
1041
1042   if (balancing && _lb_args.syncResume()) {
1043     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL),
1044                   thisProxy);
1045     contribute(0, NULL, CkReduction::sum_int, cb);
1046   }
1047   else{
1048     if(CmiNodeAlive(CkMyPe())){
1049     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1050     thisProxy [CkMyPe()].ResumeClients(balancing);
1051     }
1052   }
1053
1054 }
1055 #endif
1056
1057 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1058 void resumeCentralLbAfterChkpt(void *_lb){
1059     CentralLB *lb= (CentralLB *)_lb;
1060     CpvAccess(_currentObj)=lb;
1061     lb->endMigrationDone(lb->savedBalancing);
1062 }
1063 #endif
1064
1065
1066 void CentralLB::ResumeClients(CkReductionMsg *msg)
1067 {
1068   ResumeClients(1);
1069   delete msg;
1070 }
1071
1072 void CentralLB::ResumeClients(int balancing)
1073 {
1074 #if CMK_LBDB_ON
1075 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1076     resumeCount++;
1077     globalResumeCount = resumeCount;
1078 #endif
1079   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1080   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
1081     double end_lb_time = CkWallTimer();
1082   }
1083
1084 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1085   if (balancing) ComlibNotifyMigrationDone();  
1086 #endif
1087
1088   theLbdb->ResumeClients();
1089   if (balancing)  {
1090
1091     CheckMigrationComplete();
1092     if (future_migrates_expected == 0 || 
1093             future_migrates_expected == future_migrates_completed) {
1094       CheckMigrationComplete();
1095     }
1096   }
1097 #endif
1098 }
1099
1100 /*
1101   migration of objects contains two different kinds:
1102   (1) objects want to make a barrier for migration completion
1103       (waitForBarrier is true)
1104       migrationDone() to finish and resumeClients
1105   (2) objects don't need a barrier
1106   However, next load balancing can only happen when both migrations complete
1107 */ 
1108 void CentralLB::CheckMigrationComplete()
1109 {
1110 #if CMK_LBDB_ON
1111   lbdone ++;
1112   if (lbdone == 2) {
1113     double end_lb_time = CkWallTimer();
1114     if (_lb_args.debug() && CkMyPe()==0) {
1115       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1116                 lbname, cur_ld_balancer, step()-1, end_lb_time,
1117                 end_lb_time-start_lb_time);
1118     }
1119
1120     theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1121
1122     lbdone = 0;
1123     future_migrates_expected = -1;
1124     future_migrates_completed = 0;
1125
1126
1127     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1128     // release local barrier  so that the next load balancer can go
1129     LDOMHandle h;
1130     h.id.id.idx = 0;
1131     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1132     // switch to the next load balancer in the list
1133     // subtle: called from Migrated() may result in Migrated() called in next LB
1134     theLbdb->nextLoadbalancer(seqno);
1135   }
1136 #endif
1137 }
1138
1139 void CentralLB::preprocess(LDStats* stats)
1140 {
1141   if (_lb_args.ignoreBgLoad())
1142     stats->clearBgLoad();
1143
1144   // Call the predictor for the future
1145   if (_lb_predict) FuturePredictor(statsData);
1146 }
1147
1148 // default load balancing strategy
1149 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1150 {
1151 #if CMK_LBDB_ON
1152   double strat_start_time = CkWallTimer();
1153   if (_lb_args.debug())
1154     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1155
1156   work(stats);
1157
1158
1159   if (_lb_args.debug()>2)  {
1160     CkPrintf("CharmLB> Obj Map:\n");
1161     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1162     CkPrintf("\n");
1163   }
1164
1165   LBMigrateMsg *msg = createMigrateMsg(stats);
1166
1167   int clients = CkNumPes();
1168   LBInfo info(clients);
1169   getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1170   LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1171   info.getSummary(mLoad, mCpuLoad, totalLoad);
1172   //CkPrintf("CharmLB> Max load w/o comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
1173   //info.print();
1174 //NOTE!!  theLbdb->UpdateAfterLBData(mLoad, mCpuLoad, totalLoad/clients);
1175
1176   //getPredictedLoadWithMsg(stats, clients, msg, info,1);
1177   //info.getSummary(mLoad, mCpuLoad, totalLoadWComm);
1178   //info.print();
1179   //CkPrintf("CharmLB> Max load with comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
1180   //int nmsgs, nbytes;
1181   //stats->computeNonlocalComm(nmsgs, nbytes);
1182   //CkPrintf("CharmLB> Non local communication %d msg and %d bytes\n", nmsgs, nbytes);
1183
1184
1185   //long msg_n;
1186   //long long bytes_n;
1187   //stats->computeComm(msg_n, bytes_n);
1188   //CkPrintf("CharmLB> Total communication %ld msg and %lld bytes\n", nmsgs, nbytes);
1189
1190   //double alpha_beta_cost = (msg_n * alpha) + (bytes_n * beta);
1191   //theLbdb->UpdateAfterLBComm(alpha_beta_cost/totalLoad);
1192
1193   if (_lb_args.debug()) {
1194     double strat_end_time = CkWallTimer();
1195     envelope *env = UsrToEnv(msg);
1196
1197     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1198     CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1199               lbname, cur_ld_balancer, (int)lbdbMemsize, (int)(useMem()/1000));
1200     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1201     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1202               lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
1203     //CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
1204     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1205   }
1206   return msg;
1207 #else
1208   return NULL;
1209 #endif
1210 }
1211
1212 void CentralLB::work(LDStats* stats)
1213 {
1214   // does nothing but print the database
1215   stats->print();
1216 }
1217
1218 // generate migrate message from stats->from_proc and to_proc
1219 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1220 {
1221   int i;
1222   CkVec<MigrateInfo*> migrateInfo;
1223   for (i=0; i<stats->n_objs; i++) {
1224     LDObjData &objData = stats->objData[i];
1225     int frompe = stats->from_proc[i];
1226     int tope = stats->to_proc[i];
1227     if (frompe != tope) {
1228       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1229       //         CkMyPe(),obj,pe,dest);
1230       MigrateInfo *migrateMe = new MigrateInfo;
1231       migrateMe->obj = objData.handle;
1232       migrateMe->from_pe = frompe;
1233       migrateMe->to_pe = tope;
1234       migrateMe->async_arrival = objData.asyncArrival;
1235       migrateInfo.insertAtEnd(migrateMe);
1236     }
1237   }
1238
1239   int migrate_count=migrateInfo.length();
1240   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1241   msg->n_moves = migrate_count;
1242   for(i=0; i < migrate_count; i++) {
1243     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1244     msg->moves[i] = *item;
1245     delete item;
1246     migrateInfo[i] = 0;
1247   }
1248   return msg;
1249 }
1250
1251 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1252 {
1253   int nmoves = 0;
1254   int nunavail = 0;
1255   int i;
1256   for (i=0; i<m->n_moves; i++) {
1257     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1258     if (item->from_pe == p || item->to_pe == p) nmoves++;
1259   }
1260   for (i=0; i<CkNumPes();i++) {
1261     if (!m->avail_vector[i]) nunavail++;
1262   }
1263   LBMigrateMsg* msg;
1264   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1265   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1266   msg->n_moves = nmoves;
1267   msg->level = m->level;
1268   msg->next_lb = m->next_lb;
1269   for (i=0,nmoves=0; i<m->n_moves; i++) {
1270     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1271     if (item->from_pe == p || item->to_pe == p) {
1272       msg->moves[nmoves] = *item;
1273       nmoves++;
1274     }
1275   }
1276   // copy processor data
1277   if (nunavail)
1278   for (i=0; i<CkNumPes();i++) {
1279     msg->avail_vector[i] = m->avail_vector[i];
1280     msg->expectedLoad[i] = m->expectedLoad[i];
1281   }
1282   return msg;
1283 }
1284
1285 void CentralLB::simulationWrite() {
1286   if(step() == LBSimulation::dumpStep)
1287   {
1288     // here we are supposed to dump the database
1289     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1290     char *dumpFileName = (char *)malloc(dumpFileSize);
1291     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1292       free(dumpFileName);
1293       dumpFileSize+=3;
1294       dumpFileName = (char *)malloc(dumpFileSize);
1295     }
1296     writeStatsMsgs(dumpFileName);
1297     free(dumpFileName);
1298     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1299     ++LBSimulation::dumpStep;
1300     --LBSimulation::dumpStepSize;
1301     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1302       CmiPrintf("Charm++> Exiting...\n");
1303       CkExit();
1304     }
1305     return;
1306   }
1307 }
1308
1309 void CentralLB::simulationRead() {
1310   LBSimulation *simResults = NULL, *realResults;
1311   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1312   voidMessage->n_moves=0;
1313   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1314     // here we are supposed to read the data from the dump database
1315     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1316     char *simFileName = (char *)malloc(simFileSize);
1317     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1318       free(simFileName);
1319       simFileSize+=3;
1320       simFileName = (char *)malloc(simFileSize);
1321     }
1322     readStatsMsgs(simFileName);
1323
1324     // allocate simResults (only the first step)
1325     if (simResults == NULL) {
1326       simResults = new LBSimulation(LBSimulation::simProcs);
1327       realResults = new LBSimulation(LBSimulation::simProcs);
1328     }
1329     else {
1330       // should be the same number of procs of the original simulation!
1331       if (!LBSimulation::procsChanged) {
1332         // it means we have a previous step, so in simResults there is data.
1333         // we can now print the real effects of the load balancer during the simulation
1334         // or print the difference between the predicted data and the real one.
1335         realResults->reset();
1336         // reset to_proc of statsData to be equal to from_proc
1337         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1338         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1339         simResults->PrintDifferences(realResults,statsData);
1340       }
1341       simResults->reset();
1342     }
1343
1344     // now pass it to the strategy routine
1345     double startT = CkWallTimer();
1346     preprocess(statsData);
1347     CmiPrintf("%s> Strategy starts ... \n", lbname);
1348     LBMigrateMsg* migrateMsg = Strategy(statsData);
1349     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1350                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1351
1352     // now calculate the results of the load balancing simulation
1353     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1354
1355     // now we have the simulation data, so print it and loop
1356     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1357     // **CWL** Officially recording my disdain here for using ints for bool
1358     if (LBSimulation::showDecisionsOnly) {
1359       simResults->PrintDecisions(migrateMsg, simFileName, 
1360                                  LBSimulation::simProcs);
1361     } else {
1362       simResults->PrintSimulationResults();
1363     }
1364
1365     free(simFileName);
1366     delete migrateMsg;
1367     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1368   }
1369   // deallocate simResults
1370   delete simResults;
1371   CmiPrintf("Charm++> Exiting...\n");
1372   CkExit();
1373 }
1374
1375 void CentralLB::readStatsMsgs(const char* filename) 
1376 {
1377 #if CMK_LBDB_ON
1378   int i;
1379   FILE *f = fopen(filename, "r");
1380   if (f==NULL) {
1381     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1382     CmiAbort("");
1383   }
1384
1385   // at this stage, we need to rebuild the statsMsgList and
1386   // statsDataList structures. For that first deallocate the
1387   // old structures
1388   if (statsMsgsList) {
1389     for(i = 0; i < stats_msg_count; i++)
1390       delete statsMsgsList[i];
1391     delete[] statsMsgsList;
1392     statsMsgsList=0;
1393   }
1394
1395   PUP::fromDisk pd(f);
1396   PUP::machineInfo machInfo;
1397
1398   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1399   PUP::xlater p(machInfo, pd);
1400
1401   if (_lb_args.lbversion() > 1) {
1402     p|_lb_args.lbversion();             // write version number
1403     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1404     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1405   }
1406   p|stats_msg_count;
1407
1408   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1409   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1410   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1411
1412   // LBSimulation::simProcs must be set
1413   statsData->pup(p);
1414
1415   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1416   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1417
1418   // file f is closed in the destructor of PUP::fromDisk
1419   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1420 #endif
1421 }
1422
1423 void CentralLB::writeStatsMsgs(const char* filename) 
1424 {
1425 #if CMK_LBDB_ON
1426   FILE *f = fopen(filename, "w");
1427   if (f==NULL) {
1428     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1429     CmiAbort("");
1430   }
1431
1432   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1433   PUP::toDisk p(f);
1434   p((char *)&machInfo, sizeof(machInfo));       // machine info
1435
1436   p|_lb_args.lbversion();               // write version number
1437   p|stats_msg_count;
1438   statsData->pup(p);
1439
1440   fclose(f);
1441
1442   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1443 #endif
1444 }
1445
1446 // calculate the predicted wallclock/cpu load for every processors
1447 // considering communication overhead if considerComm is true
1448 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1449                       LBMigrateMsg *msg, LBInfo &info, 
1450                       int considerComm)
1451 {
1452 #if CMK_LBDB_ON
1453         stats->makeCommHash();
1454
1455         // update to_proc according to migration msgs
1456         for(int i = 0; i < msg->n_moves; i++) {
1457           MigrateInfo &mInfo = msg->moves[i];
1458           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1459           CmiAssert(idx != -1);
1460           stats->to_proc[idx] = mInfo.to_pe;
1461         }
1462
1463         info.getInfo(stats, count, considerComm);
1464 #endif
1465 }
1466
1467
1468 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1469 {
1470     CkAssert(simResults != NULL && count == simResults->numPes);
1471     // estimate the new loads of the processors. As a first approximation, this is the
1472     // sum of the cpu times of the objects on that processor
1473     double startT = CkWallTimer();
1474     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1475     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1476 }
1477
1478 void CentralLB::pup(PUP::er &p) { 
1479   BaseLB::pup(p); 
1480   if (p.isUnpacking())  {
1481     initLB(CkLBOptions(seqno)); 
1482   }
1483   p|reduction_started;
1484   int has_statsMsg=0;
1485   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1486   p|has_statsMsg;
1487   if (has_statsMsg) {
1488     if (p.isUnpacking())
1489       statsMsg = new CLBStatsMsg;
1490     statsMsg->pup(p);
1491   }
1492 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1493   p | lbDecisionCount;
1494   p | resumeCount;
1495 #endif
1496         
1497 }
1498
1499 int CentralLB::useMem() { 
1500   return sizeof(CentralLB) + statsData->useMem() + 
1501          CkNumPes() * sizeof(CLBStatsMsg *);
1502 }
1503
1504
1505 /**
1506   CLBStatsMsg is not a real message now.
1507   CLBStatsMsg is used for all processors to fill in their local load and comm
1508   statistics and send to processor 0
1509 */
1510
1511 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1512   n_objs = osz;
1513   n_comm = csz;
1514   objData = new LDObjData[osz];
1515   commData = new LDCommData[csz];
1516   avail_vector = NULL;
1517 }
1518
1519 CLBStatsMsg::~CLBStatsMsg() {
1520   delete [] objData;
1521   delete [] commData;
1522   delete [] avail_vector;
1523 }
1524
1525 void CLBStatsMsg::pup(PUP::er &p) {
1526   int i;
1527   p|from_pe;
1528   p|pe_speed;
1529   p|total_walltime;
1530   p|idletime;
1531   p|bg_walltime;
1532 #if CMK_LB_CPUTIMER
1533   p|total_cputime;
1534   p|bg_cputime;
1535 #endif
1536 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1537   p | step;
1538 #endif
1539   p|n_objs;
1540   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1541   for (i=0; i<n_objs; i++) p|objData[i];
1542   p|n_comm;
1543   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1544   for (i=0; i<n_comm; i++) p|commData[i];
1545
1546   int has_avail_vector;
1547   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1548   p|has_avail_vector;
1549   if (p.isUnpacking()) {
1550     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1551     else avail_vector = NULL;
1552   }
1553   if (has_avail_vector) p(avail_vector, CkNumPes());
1554
1555   p(next_lb);
1556 }
1557
1558 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1559 // the entry function, it is just used to use to pup.
1560 // I don't use CLBStatsMsg directly as marshalled parameter because
1561 // I want the data pointer stored and not to be freed by the Charm++.
1562 void CkMarshalledCLBStatsMessage::free() { 
1563   int count = msgs.size();
1564   for  (int i=0; i<count; i++) {
1565     delete msgs[i];
1566     msgs[i] = NULL;
1567   }
1568   msgs.free();
1569 }
1570
1571 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1572 {
1573   int count = m.getCount();
1574   for (int i=0; i<count; i++) add(m.getMessage(i));
1575 }
1576
1577 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1578 {
1579   int count = msgs.size();
1580   p|count;
1581   for (int i=0; i<count; i++) {
1582     CLBStatsMsg *msg;
1583     if (p.isUnpacking()) msg = new CLBStatsMsg;
1584     else { 
1585       msg = msgs[i]; CmiAssert(msg!=NULL);
1586     }
1587     msg->pup(p);
1588     if (p.isUnpacking()) add(msg);
1589   }
1590 }
1591
1592 SpanningTree::SpanningTree()
1593 {
1594         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1595         arity = (int)ceil(sq/2);
1596         calcParent(CkMyPe());
1597         calcNumChildren(CkMyPe());
1598 }
1599
1600 void SpanningTree::calcParent(int n)
1601 {
1602         parent=-1;
1603         if(n != 0  && arity > 0)
1604                 parent = (n-1)/arity;
1605 }
1606
1607 void SpanningTree::calcNumChildren(int n)
1608 {
1609         numChildren = 0;
1610         if (arity == 0) return;
1611         int fullNode=(CkNumPes()-1-arity)/arity;
1612         if(n <= fullNode)
1613                 numChildren = arity;
1614         if(n == fullNode+1)
1615                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1616         if(n > fullNode+1)
1617                 numChildren = 0;
1618 }
1619
1620 #include "CentralLB.def.h"
1621  
1622 /*@}*/