49024694e298405623afd6178cce5929dc033ca3
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)      // CmiPrintf x;
20
21 CkGroupID loadbalancer;
22 int * lb_ptr;
23 int load_balancer_created;
24
25 CreateLBFunc_Def(CentralLB);
26
27 static void lbinit(void) {
28   LBRegisterBalancer("CentralLB", CreateCentralLB, AllocateCentralLB, "CentralLB base class");
29 }
30
31 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
32                              LBMigrateMsg *, double *peLoads, 
33                              double &, double &, int considerComm);
34
35 /*
36 void CreateCentralLB()
37 {
38   CProxy_CentralLB::ckNew(0);
39 }
40 */
41
42 void CentralLB::staticStartLB(void* data)
43 {
44   CentralLB *me = (CentralLB*)(data);
45   me->StartLB();
46 }
47
48 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
49 {
50   CentralLB *me = (CentralLB*)(data);
51   me->Migrated(h, waitBarrier);
52 }
53
54 void CentralLB::staticAtSync(void* data)
55 {
56   CentralLB *me = (CentralLB*)(data);
57   me->AtSync();
58 }
59
60 void CentralLB::initLB(const CkLBOptions &opt)
61 {
62 #if CMK_LBDB_ON
63   lbname = "CentralLB";
64   thisProxy = CProxy_CentralLB(thisgroup);
65   //  CkPrintf("Construct in %d\n",CkMyPe());
66
67   // create and turn on by default
68   receiver = theLbdb->
69     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
70   notifier = theLbdb->getLBDB()->
71     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
72   startLbFnHdl = theLbdb->getLBDB()->
73     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
74
75   // CkPrintf("[%d] CentralLB seq %d\n",CkMyPe(), seq);
76   if (opt.getSeqNo() > 0) turnOff();
77
78   stats_msg_count = 0;
79   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
80   for(int i=0; i < CkNumPes(); i++)
81     statsMsgsList[i] = 0;
82
83   statsData = new LDStats;
84
85   // for future predictor
86   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
87   else predicted_model=0;
88   // register user interface callbacks
89   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
90
91   myspeed = theLbdb->ProcessorSpeed();
92
93   migrates_completed = 0;
94   future_migrates_completed = 0;
95   migrates_expected = -1;
96   future_migrates_expected = -1;
97   cur_ld_balancer = 0;
98   lbdone = 0;
99   int num_proc = CkNumPes();
100
101   theLbdb->CollectStatsOn();
102
103   load_balancer_created = 1;
104 #endif
105 }
106
107 CentralLB::~CentralLB()
108 {
109 #if CMK_LBDB_ON
110   delete [] statsMsgsList;
111   delete statsData;
112   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
113   if (theLbdb) {
114     theLbdb->getLBDB()->
115       RemoveNotifyMigrated(notifier);
116     theLbdb->
117       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
118   }
119 #endif
120 }
121
122 void CentralLB::turnOn() 
123 {
124 #if CMK_LBDB_ON
125   theLbdb->getLBDB()->
126     TurnOnBarrierReceiver(receiver);
127   theLbdb->getLBDB()->
128     TurnOnNotifyMigrated(notifier);
129   theLbdb->getLBDB()->
130     TurnOnStartLBFn(startLbFnHdl);
131 #endif
132 }
133
134 void CentralLB::turnOff() 
135 {
136 #if CMK_LBDB_ON
137   theLbdb->getLBDB()->
138     TurnOffBarrierReceiver(receiver);
139   theLbdb->getLBDB()->
140     TurnOffNotifyMigrated(notifier);
141   theLbdb->getLBDB()->
142     TurnOffStartLBFn(startLbFnHdl);
143 #endif
144 }
145
146 void CentralLB::AtSync()
147 {
148 #if CMK_LBDB_ON
149   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
150
151   // if num of processor is only 1, nothing should happen
152   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
153     MigrationDone(0);
154     return;
155   }
156   thisProxy [CkMyPe()].ProcessAtSync();
157 #endif
158 }
159
160 void CentralLB::ProcessAtSync()
161 {
162 #if CMK_LBDB_ON
163   if (CkMyPe() == cur_ld_balancer) {
164     start_lb_time = CmiWallTimer();
165   }
166   // build and send stats
167   const int osz = theLbdb->GetObjDataSz();
168   const int csz = theLbdb->GetCommDataSz();
169
170   int npes = CkNumPes();
171   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
172   msg->from_pe = CkMyPe();
173   // msg->serial = rand();
174   msg->serial = CrnRand();
175
176   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
177   theLbdb->IdleTime(&msg->idletime);
178   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
179   msg->pe_speed = myspeed;
180 //  CkPrintf(
181 //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
182 //    CkMyPe(),msg->total_walltime,msg->total_cputime,
183 //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
184
185   msg->n_objs = osz;
186   theLbdb->GetObjData(msg->objData);
187   msg->n_comm = csz;
188   theLbdb->GetCommData(msg->commData);
189 //  theLbdb->ClearLoads();
190   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
191            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
192
193 // Scheduler PART.
194
195   if(CkMyPe() == cur_ld_balancer) {
196     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
197     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
198   }
199
200   thisProxy[cur_ld_balancer].ReceiveStats(msg);
201
202   {
203   // enfore the barrier to wait until centralLB says no
204   LDOMHandle h;
205   h.id.id.idx = 0;
206   theLbdb->getLBDB()->RegisteringObjects(h);
207   }
208 #endif
209 }
210
211 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
212 {
213 #if CMK_LBDB_ON
214   if (waitBarrier) {
215     migrates_completed++;
216     //  CkPrintf("[%d] An object migrated! %d %d\n",
217     //             CkMyPe(),migrates_completed,migrates_expected);
218     if (migrates_completed == migrates_expected) {
219       MigrationDone(1);
220     }
221   }
222   else {
223     future_migrates_completed ++;
224     DEBUGF(("[%d] An object migrated with no barrier! %d %d\n",
225            CkMyPe(),future_migrates_completed,future_migrates_expected));
226     if (future_migrates_completed == future_migrates_expected)  {
227         CheckMigrationComplete();
228     }
229   }
230 #endif
231 }
232
233 void CentralLB::MissMigrate(int waitForBarrier)
234 {
235   LDObjHandle h;
236   Migrated(h, waitForBarrier);
237 }
238
239 // build data from buffered msg
240 void CentralLB::buildStats()
241 {
242     statsData->count = stats_msg_count;
243     statsData->objData = new LDObjData[statsData->n_objs];
244     statsData->from_proc = new int[statsData->n_objs];
245     statsData->to_proc = new int[statsData->n_objs];
246     statsData->commData = new LDCommData[statsData->n_comm];
247     int nobj = 0;
248     int ncom = 0;
249     int nmigobj = 0;
250     // copy all data in individule message to this big structure
251     for (int pe=0; pe<stats_msg_count; pe++) {
252        int i;
253        CLBStatsMsg *msg = statsMsgsList[pe];
254        for (i=0; i<msg->n_objs; i++) {
255          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
256          statsData->objData[nobj] = msg->objData[i];
257          if (msg->objData[i].migratable) nmigobj++;
258          nobj++;
259        }
260        for (i=0; i<msg->n_comm; i++) {
261          statsData->commData[ncom] = msg->commData[i];
262          ncom++;
263        }
264        // free the memory
265        delete msg;
266        statsMsgsList[pe]=0;
267     }
268     statsData->n_migrateobjs = nmigobj;
269     if (_lb_args.debug()) {
270       CmiPrintf("n_obj:%d migratable:%d ncom:%d\n", nobj, nmigobj, ncom);
271     }
272 }
273
274 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
275 {
276 #if CMK_LBDB_ON
277   CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage();
278   const int pe = m->from_pe;
279 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
280 //         pe,stats_msg_count,m->n_objs,m->serial,m);
281
282   if (pe == cur_ld_balancer) {
283       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
284   }
285
286   if (statsMsgsList[pe] != 0) {
287     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
288              pe);
289   } else {
290     statsMsgsList[pe] = m;
291     // store per processor data right away
292     struct ProcStats &procStat = statsData->procs[pe];
293     procStat.total_walltime = m->total_walltime;
294     procStat.total_cputime = m->total_cputime;
295     if (_lb_args.ignoreBgLoad()) {
296       procStat.idletime = 0.0;
297       procStat.bg_walltime = 0.0;
298       procStat.bg_cputime = 0.0;
299     }
300     else {
301       procStat.idletime = m->idletime;
302       procStat.bg_walltime = m->bg_walltime;
303       procStat.bg_cputime = m->bg_cputime;
304     }
305     procStat.pe_speed = m->pe_speed;
306     procStat.utilization = 1.0;
307     procStat.available = CmiTrue;
308     procStat.n_objs = m->n_objs;
309
310     statsData->n_objs += m->n_objs;
311     statsData->n_comm += m->n_comm;
312     stats_msg_count++;
313   }
314
315   DEBUGF(("[0] ReceiveStats from %d step: %d count: %d\n", pe, step(), stats_msg_count));
316   const int clients = CkNumPes();
317
318   if (stats_msg_count == clients) {
319     thisProxy[CkMyPe()].LoadBalance();
320   }
321 #endif
322 }
323
324 void CentralLB::LoadBalance()
325 {
326 #if CMK_LBDB_ON
327   int proc;
328   if (_lb_args.debug()) 
329       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d\n",
330                  lbName(), step(),start_lb_time, cur_ld_balancer);
331 //    double strat_start_time = CmiWallTimer();
332
333   // build data
334   buildStats();
335
336   // if we are in simulation mode read data
337   if (LBSimulation::doSimulation) simulationRead();
338
339   char *availVector = LBDatabaseObj()->availVector();
340   const int clients = CkNumPes();
341   for(proc = 0; proc < clients; proc++)
342       statsData->procs[proc].available = (CmiBool)availVector[proc];
343
344   // Call the predictor for the future
345   if (_lb_predict) FuturePredictor(statsData);
346
347 //    CkPrintf("Before Calling Strategy\n");
348   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
349
350 //    CkPrintf("returned successfully\n");
351   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
352   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
353
354   // if this is the step at which we need to dump the database
355   simulationWrite();
356
357 //  calculate predicted load
358 //  very time consuming though, so only happen when debugging is on
359   if (_lb_args.debug()) {
360       double minObjLoad, maxObjLoad;
361       getPredictedLoad(statsData, clients, migrateMsg, migrateMsg->expectedLoad, minObjLoad, maxObjLoad, 1);
362   }
363
364 //  CkPrintf("calling recv migration\n");
365   thisProxy.ReceiveMigration(migrateMsg);
366
367   // Zero out data structures for next cycle
368   // CkPrintf("zeroing out data\n");
369   statsData->clear();
370   stats_msg_count=0;
371 #endif
372 }
373
374 //    double strat_end_time = CmiWallTimer();
375 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
376 // test if sender and receiver in a commData is nonmigratable.
377 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
378 {
379 #if CMK_LBDB_ON
380   for (int pe=0 ; pe<count; pe++)
381   {
382     for (int i=0; i<len[pe]; i++)
383       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
384           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
385       return 0;
386   }
387 #endif
388   return 1;
389 }
390
391 // rebuild LDStats and remove all non-migratble objects and related things
392 void CentralLB::removeNonMigratable(LDStats* stats, int count)
393 {
394   int i;
395
396   LDObjData *nonmig = new LDObjData[stats->n_migrateobjs];
397   int *new_from_proc = new int[stats->n_migrateobjs];
398   int *new_to_proc = new int[stats->n_migrateobjs];
399   int n_objs = 0;
400   for (i=0; i<stats->n_objs; i++) 
401   {
402     LDObjData &odata = stats->objData[i];
403     if (odata.migratable) {
404       nonmig[n_objs] = odata;
405       new_from_proc[n_objs] = stats->from_proc[i];
406       new_to_proc[n_objs] = stats->to_proc[i];
407       n_objs ++;
408     }
409     else {
410       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
411       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
412     }
413   }
414   CmiAssert(stats->n_migrateobjs == n_objs);
415
416   stats->makeCommHash();
417   
418   LDCommData *newCommData = new LDCommData[stats->n_comm];
419   int n_comm = 0;
420   for (i=0; i<stats->n_comm; i++) 
421   {
422     LDCommData& cdata = stats->commData[i];
423     if (!cdata.from_proc()) 
424     {
425       int idx = stats->getSendHash(cdata);
426       CmiAssert(idx != -1);
427       if (!stats->objData[idx].migratable) continue;
428     }
429     switch (cdata.receiver.get_type()) {
430     case LD_PROC_MSG:
431       break;
432     case LD_OBJ_MSG:  {
433       int idx = stats->getRecvHash(cdata);
434       CmiAssert(idx != -1);
435       if (!stats->objData[idx].migratable) continue;
436       break;
437       }
438     case LD_OBJLIST_MSG:    // object message FIXME add multicast
439       break;
440     }
441     newCommData[n_comm] = cdata;
442     n_comm ++;
443   }
444
445   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
446
447   // swap to new data
448   delete [] stats->objData;
449   delete [] stats->from_proc;
450   delete [] stats->to_proc;
451
452   stats->objData = nonmig;
453   stats->from_proc = new_from_proc;
454   stats->to_proc = new_to_proc;
455   stats->n_objs = n_objs;
456
457   delete [] stats->commData;
458   stats->commData = newCommData;
459   stats->n_comm = n_comm;
460
461   stats->deleteCommHash();
462   stats->makeCommHash();
463
464 }
465
466
467 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
468 {
469 #if CMK_LBDB_ON
470   int i;
471   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
472   
473   migrates_expected = 0;
474   future_migrates_expected = 0;
475   for(i=0; i < m->n_moves; i++) {
476     MigrateInfo& move = m->moves[i];
477     const int me = CkMyPe();
478     if (move.from_pe == me && move.to_pe != me) {
479       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
480       // migrate object, in case it is already gone, inform toPe
481       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
482          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
483     } else if (move.from_pe != me && move.to_pe == me) {
484       // CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
485       if (!move.async_arrival) migrates_expected++;
486       else future_migrates_expected++;
487     }
488   }
489   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
490   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
491 #if 0
492   if (m->n_moves ==0) {
493     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
494   }
495 #endif
496
497   cur_ld_balancer = m->next_lb;
498   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
499       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
500   }
501
502   if (migrates_expected == 0 || migrates_completed == migrates_expected)
503     MigrationDone(1);
504   delete m;
505 #endif
506 }
507
508
509 void CentralLB::MigrationDone(int balancing)
510 {
511 #if CMK_LBDB_ON
512   migrates_completed = 0;
513   migrates_expected = -1;
514   // clear load stats
515   if (balancing) theLbdb->ClearLoads();
516   // Increment to next step
517   theLbdb->incStep();
518   // if sync resume, invoke a barrier
519   if (balancing && _lb_args.syncResume()) {
520     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
521                   thisProxy);
522     contribute(0, NULL, CkReduction::sum_int, cb);
523   }
524   else 
525     thisProxy [CkMyPe()].ResumeClients(balancing);
526 #endif
527 }
528
529 void CentralLB::ResumeClients(CkReductionMsg *msg)
530 {
531   ResumeClients(1);
532   delete msg;
533 }
534
535 void CentralLB::ResumeClients(int balancing)
536 {
537 #if CMK_LBDB_ON
538   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
539   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
540     double end_lb_time = CmiWallTimer();
541     CkPrintf("[%s] Load balancing step %d finished at %f\n",
542               lbName(), step()-1,end_lb_time);
543     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
544     CkPrintf("[%s] duration %fs memUsage: LBManager:%dKB CentralLB:%dKB\n", 
545               lbName(), end_lb_time - start_lb_time,
546               (int)lbdbMemsize, (int)(useMem()/1000));
547   }
548
549   theLbdb->ResumeClients();
550   if (balancing)  {
551     CheckMigrationComplete();
552     if (future_migrates_expected == 0 || 
553             future_migrates_expected == future_migrates_completed) {
554       CheckMigrationComplete();
555     }
556   }
557 #endif
558 }
559
560 /*
561   migration of objects contains two different kinds:
562   (1) objects want to make a barrier for migration completion
563       (waitForBarrier is true)
564       migrationDone() to finish and resumeClients
565   (2) objects don't need a barrier
566   However, next load balancing can only happen when both migrations complete
567 */ 
568 void CentralLB::CheckMigrationComplete()
569 {
570 #if CMK_LBDB_ON
571   lbdone ++;
572   if (lbdone == 2) {
573     lbdone = 0;
574     future_migrates_expected = -1;
575     future_migrates_completed = 0;
576     DEBUGF(("[%d] MigrationComplete\n", CkMyPe()));
577     // release local barrier  so that the next load balancer can go
578     LDOMHandle h;
579     h.id.id.idx = 0;
580     theLbdb->getLBDB()->DoneRegisteringObjects(h);
581     // switch to the next load balancer in the list
582     theLbdb->nextLoadbalancer(seqno);
583   }
584 #endif
585 }
586
587 // default load balancing strategy
588 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
589 {
590 #if CMK_LBDB_ON
591   work(stats, count);
592   return createMigrateMsg(stats, count);
593 #else
594   return NULL;
595 #endif
596 }
597
598 void CentralLB::work(LDStats* stats,int count)
599 {
600 #if CMK_LBDB_ON
601   int i;
602   for(int pe=0; pe < count; pe++) {
603     struct ProcStats &proc = stats->procs[pe];
604
605     CkPrintf(
606       "Proc %d Sp %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
607       pe,proc.pe_speed,proc.total_walltime,proc.total_cputime,
608       proc.idletime,proc.bg_walltime,proc.bg_cputime);
609   }
610
611   int osz = stats->n_objs;
612     CkPrintf("------------- Object Data: %d objects -------------\n",
613              stats->n_objs);
614     for(i=0; i < osz; i++) {
615       LDObjData &odata = stats->objData[i];
616       CkPrintf("Object %d\n",i);
617       CkPrintf("     id = %d\n",odata.objID().id[0]);
618       CkPrintf("  OM id = %d\n",odata.omID().id);
619       CkPrintf("   Mig. = %d\n",odata.migratable);
620       CkPrintf("    CPU = %f\n",odata.cpuTime);
621       CkPrintf("   Wall = %f\n",odata.wallTime);
622     }
623
624     const int csz = stats->n_comm;
625
626     CkPrintf("------------- Comm Data: %d records -------------\n",
627              csz);
628     LDCommData *cdata = stats->commData;
629     for(i=0; i < csz; i++) {
630       CkPrintf("Link %d\n",i);
631
632       if (cdata[i].from_proc())
633         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
634       else
635         CkPrintf("    sender id = %d:%d\n",
636                  cdata[i].sender.omID().id,cdata[i].sender.objID().id[0]);
637
638       if (cdata[i].recv_type() == LD_PROC_MSG)
639         CkPrintf("  receiver PE = %d\n",cdata[i].receiver.proc());
640       else      
641         CkPrintf("  receiver id = %d:%d\n",
642                  cdata[i].receiver.get_destObj().omID().id,cdata[i].receiver.get_destObj().objID().id[0]);
643       
644       CkPrintf("     messages = %d\n",cdata[i].messages);
645       CkPrintf("        bytes = %d\n",cdata[i].bytes);
646     }
647 #endif
648 }
649
650 // generate migrate message from stats->from_proc and to_proc
651 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
652 {
653   int i;
654   CkVec<MigrateInfo*> migrateInfo;
655   for (i=0; i<stats->n_objs; i++) {
656     LDObjData &objData = stats->objData[i];
657     int frompe = stats->from_proc[i];
658     int tope = stats->to_proc[i];
659     if (frompe != tope) {
660       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
661       //         CkMyPe(),obj,pe,dest);
662       MigrateInfo *migrateMe = new MigrateInfo;
663       migrateMe->obj = objData.handle;
664       migrateMe->from_pe = frompe;
665       migrateMe->to_pe = tope;
666       migrateMe->async_arrival = objData.asyncArrival;
667       migrateInfo.insertAtEnd(migrateMe);
668     }
669   }
670
671   int migrate_count=migrateInfo.length();
672   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
673   msg->n_moves = migrate_count;
674   for(i=0; i < migrate_count; i++) {
675     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
676     msg->moves[i] = *item;
677     delete item;
678     migrateInfo[i] = 0;
679   }
680   if (_lb_args.debug())
681     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
682   return msg;
683 }
684
685 void CentralLB::simulationWrite() {
686   if(step() == LBSimulation::dumpStep)
687   {
688     // here we are supposed to dump the database
689     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
690     char *dumpFileName = (char *)malloc(dumpFileSize);
691     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
692       free(dumpFileName);
693       dumpFileSize+=3;
694       dumpFileName = (char *)malloc(dumpFileSize);
695     }
696     writeStatsMsgs(dumpFileName);
697     free(dumpFileName);
698     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
699     ++LBSimulation::dumpStep;
700     --LBSimulation::dumpStepSize;
701     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
702       CmiPrintf("Charm++> Exiting...\n");
703       CkExit();
704     }
705     return;
706   }
707 }
708
709 void CentralLB::simulationRead() {
710   LBSimulation *simResults = NULL, *realResults;
711   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
712   voidMessage->n_moves=0;
713   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
714     // here we are supposed to read the data from the dump database
715     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
716     char *simFileName = (char *)malloc(simFileSize);
717     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
718       free(simFileName);
719       simFileSize+=3;
720       simFileName = (char *)malloc(simFileSize);
721     }
722     readStatsMsgs(simFileName);
723     free(simFileName);
724
725     // allocate simResults (only the first step
726     if (simResults == NULL) {
727       simResults = new LBSimulation(LBSimulation::simProcs);
728       realResults = new LBSimulation(LBSimulation::simProcs);
729     }
730     else {
731       // should be the same number of procs of the original simulation!
732       if (!LBSimulation::procsChanged) {
733         // it means we have a previous step, so in simResults there is data.
734         // we can now print the real effects of the load balancer during the simulation
735         // or print the difference between the predicted data and the real one.
736         realResults->reset();
737         // reset to_proc of statsData to be equal to from_proc
738         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
739         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
740         simResults->PrintDifferences(realResults,statsData);
741       }
742       simResults->reset();
743     }
744
745     // now pass it to the strategy routine
746     double startT = CmiWallTimer();
747     CmiPrintf("%s> Strategy starts ... \n", lbname);
748     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
749     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
750                lbname, CmiWallTimer()-startT, (int)(useMem()/1000));
751
752     // now calculate the results of the load balancing simulation
753     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
754
755     // now we have the simulation data, so print it and loop
756     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
757     simResults->PrintSimulationResults();
758
759     delete migrateMsg;
760     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
761   }
762   // deallocate simResults
763   delete simResults;
764   CmiPrintf("Charm++> Exiting...\n");
765   CkExit();
766 }
767
768 void CentralLB::readStatsMsgs(const char* filename) 
769 {
770 #if CMK_LBDB_ON
771   int i;
772   FILE *f = fopen(filename, "r");
773   if (f==NULL) {
774     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
775     CmiAbort("");
776   }
777
778   // at this stage, we need to rebuild the statsMsgList and
779   // statsDataList structures. For that first deallocate the
780   // old structures
781   if (statsMsgsList) {
782     for(i = 0; i < stats_msg_count; i++)
783       delete statsMsgsList[i];
784     delete[] statsMsgsList;
785     statsMsgsList=0;
786   }
787
788   PUP::fromDisk pd(f);
789   PUP::machineInfo machInfo;
790
791   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
792   PUP::xlater p(machInfo, pd);
793
794   p|stats_msg_count;
795
796   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
797   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
798   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
799
800   // LBSimulation::simProcs must be set
801   statsData->pup(p);
802
803   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
804   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
805
806   // file f is closed in the destructor of PUP::fromDisk
807   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
808 #endif
809 }
810
811 void CentralLB::writeStatsMsgs(const char* filename) 
812 {
813 #if CMK_LBDB_ON
814   FILE *f = fopen(filename, "w");
815   if (f==NULL) {
816     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
817     CmiAbort("");
818   }
819
820   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
821   PUP::toDisk p(f);
822   p((char *)&machInfo, sizeof(machInfo));       // machine info
823
824   p|stats_msg_count;
825   statsData->pup(p);
826
827   fclose(f);
828
829   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
830 #endif
831 }
832
833 // calculate the predicted wallclock/cpu load for every processors
834 // considering communication overhead if considerComm is true
835 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
836                              LBMigrateMsg *msg, double *peLoads, 
837                              double &minObjLoad, double &maxObjLoad,
838                              int considerComm)
839 {
840 #if CMK_LBDB_ON
841         int i, pe;
842
843         minObjLoad = 1.0e20;    // I suppose no object load is beyond this
844         maxObjLoad = 0.0;
845
846         stats->makeCommHash();
847
848         // update to_proc according to migration msgs
849         for(i = 0; i < msg->n_moves; i++) {
850           MigrateInfo &mInfo = msg->moves[i];
851           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
852           CmiAssert(idx != -1);
853           stats->to_proc[idx] = mInfo.to_pe;
854         }
855
856         for(pe = 0; pe < count; pe++)
857           peLoads[pe] = stats->procs[pe].bg_walltime;
858
859         for(int obj = 0; obj < stats->n_objs; obj++)
860         {
861                 int pe = stats->to_proc[obj];
862                 double &oload = stats->objData[obj].wallTime;
863                 if (oload < minObjLoad) minObjLoad = oload;
864                 if (oload > maxObjLoad) maxObjLoad = oload;
865                 peLoads[pe] += oload;
866         }
867
868         // handling of the communication overheads. 
869         if (considerComm) {
870           int* msgSentCount = new int[count]; // # of messages sent by each PE
871           int* msgRecvCount = new int[count]; // # of messages received by each PE
872           int* byteSentCount = new int[count];// # of bytes sent by each PE
873           int* byteRecvCount = new int[count];// # of bytes reeived by each PE
874           for(i = 0; i < count; i++)
875             msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
876
877           for (int cidx=0; cidx < stats->n_comm; cidx++) {
878             LDCommData& cdata = stats->commData[cidx];
879             int senderPE, receiverPE;
880             if (cdata.from_proc())
881               senderPE = cdata.src_proc;
882             else {
883               int idx = stats->getHash(cdata.sender);
884               if (idx == -1) continue;    // sender has just migrated?
885               senderPE = stats->to_proc[idx];
886               CmiAssert(senderPE != -1);
887             }
888             if (cdata.receiver.get_type() == LD_PROC_MSG)
889               receiverPE = cdata.receiver.proc();
890             else {
891               int idx = stats->getHash(cdata.receiver.get_destObj());
892               if (idx == -1) continue;    // receiver has just been removed?
893               receiverPE = stats->to_proc[idx];
894               CmiAssert(receiverPE != -1);
895             }
896             if(senderPE != receiverPE)
897             {
898                 CmiAssert(senderPE < count && senderPE >= 0);
899                 CmiAssert(receiverPE < count && receiverPE >= 0);
900                 msgSentCount[senderPE] += cdata.messages;
901                 byteSentCount[senderPE] += cdata.bytes;
902
903                 msgRecvCount[receiverPE] += cdata.messages;
904                 byteRecvCount[receiverPE] += cdata.bytes;
905             }
906           }
907
908           // now for each processor, add to its load the send and receive overheads
909           for(i = 0; i < count; i++)
910           {
911                 peLoads[i] += msgRecvCount[i]  * PER_MESSAGE_RECV_OVERHEAD +
912                               msgSentCount[i]  * PER_MESSAGE_SEND_OVERHEAD +
913                               byteRecvCount[i] * PER_BYTE_RECV_OVERHEAD +
914                               byteSentCount[i] * PER_BYTE_SEND_OVERHEAD;
915           }
916           delete [] msgRecvCount;
917           delete [] msgSentCount;
918           delete [] byteRecvCount;
919           delete [] byteSentCount;
920         }
921 #endif
922 }
923
924 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
925 {
926     CkAssert(simResults != NULL && count == simResults->numPes);
927     // estimate the new loads of the processors. As a first approximation, this is the
928     // get background load
929     for(int pe = 0; pe < count; pe++)
930           simResults->bgLoads[pe] = stats->procs[pe].bg_walltime;
931     // sum of the cpu times of the objects on that processor
932     double startT = CmiWallTimer();
933     getPredictedLoad(stats, count, msg, simResults->peLoads, 
934                      simResults->minObjLoad, simResults->maxObjLoad,1);
935     CmiPrintf("getPredictedLoad finished in %fs\n", CmiWallTimer()-startT);
936 }
937
938 void CentralLB::pup(PUP::er &p) { 
939   BaseLB::pup(p); 
940   if (p.isUnpacking())  {
941     initLB(CkLBOptions(seqno)); 
942   }
943 }
944
945 int CentralLB::useMem() { 
946   return sizeof(CentralLB) + statsData->useMem() + 
947          CkNumPes() * sizeof(CLBStatsMsg *);
948 }
949
950 static inline int i_abs(int c) { return c>0?c:-c; }
951
952 // assume integer is 32 bits
953 inline static int ObjKey(const LDObjid &oid, const int hashSize) {
954   // make sure all positive
955   return (((i_abs(oid.id[2]) & 0x7F)<<24)
956          |((i_abs(oid.id[1]) & 0xFF)<<16)
957          |i_abs(oid.id[0])) % hashSize;
958 }
959
960 CentralLB::LDStats::LDStats():  
961         n_objs(0), n_migrateobjs(0), objData(NULL), 
962         n_comm(0), commData(NULL), from_proc(NULL), to_proc(NULL), 
963         objHash(NULL) { 
964   procs = new ProcStats[CkNumPes()]; 
965 }
966
967 const static unsigned int doublingPrimes[] = {
968 3,
969 7,
970 17,
971 37,
972 73,
973 157,
974 307,
975 617,
976 1217,
977 2417,
978 4817,
979 9677,
980 20117,
981 40177,
982 80177,
983 160117,
984 320107,
985 640007,
986 1280107,
987 2560171,
988 5120117,
989 10000079,
990 20000077,
991 40000217,
992 80000111,
993 160000177,
994 320000171,
995 640000171,
996 1280000017,
997 2560000217u,
998 4200000071u
999 /* extra primes larger than an unsigned 32-bit integer:
1000 51200000077,
1001 100000000171,
1002 200000000171,
1003 400000000171,
1004 800000000117,
1005 1600000000021,
1006 3200000000051,
1007 6400000000081,
1008 12800000000003,
1009 25600000000021,
1010 51200000000077,
1011 100000000000067,
1012 200000000000027,
1013 400000000000063,
1014 800000000000017,
1015 1600000000000007,
1016 3200000000000059,
1017 6400000000000007,
1018 12800000000000009,
1019 25600000000000003,
1020 51200000000000023,
1021 100000000000000003,
1022 200000000000000003,
1023 400000000000000013,
1024 800000000000000119,
1025 1600000000000000031,
1026 3200000000000000059 //This is a 62-bit number
1027 */
1028 };
1029
1030 //This routine returns an arbitrary prime larger than x
1031 static unsigned int primeLargerThan(unsigned int x)
1032 {
1033         int i=0;
1034         while (doublingPrimes[i]<=x) i++;
1035         return doublingPrimes[i];
1036 }
1037
1038 void CentralLB::LDStats::makeCommHash() {
1039   // hash table is already build
1040   if (objHash) return;
1041    
1042   int i;
1043   hashSize = n_objs*2;
1044   hashSize = primeLargerThan(hashSize);
1045   objHash = new int[hashSize];
1046   for(i=0;i<hashSize;i++)
1047         objHash[i] = -1;
1048    
1049   for(i=0;i<n_objs;i++){
1050         const LDObjid &oid = objData[i].objID();
1051         int hash = ObjKey(oid, hashSize);
1052         CmiAssert(hash != -1);
1053         while(objHash[hash] != -1)
1054             hash = (hash+1)%hashSize;
1055         objHash[hash] = i;
1056   }
1057 }
1058
1059 void CentralLB::LDStats::deleteCommHash() {
1060   if (objHash) delete [] objHash;
1061   objHash = NULL;
1062 }
1063
1064 int CentralLB::LDStats::getHash(const LDObjid &oid, const LDOMid &mid)
1065 {
1066 #if CMK_LBDB_ON
1067     CmiAssert(hashSize > 0);
1068     int hash = ObjKey(oid, hashSize);
1069
1070     for(int id=0;id<hashSize;id++){
1071         int index = (id+hash)%hashSize;
1072         if (index == -1 || objHash[index] == -1) return -1;
1073         if (LDObjIDEqual(objData[objHash[index]].objID(), oid) &&
1074             LDOMidEqual(objData[objHash[index]].omID(), mid))
1075             return objHash[index];
1076     }
1077     //  CkPrintf("not found \n");
1078 #endif
1079     return -1;
1080 }
1081
1082 int CentralLB::LDStats::getHash(const LDObjKey &objKey)
1083 {
1084   const LDObjid &oid = objKey.objID();
1085   const LDOMid  &mid = objKey.omID();
1086   return getHash(oid, mid);
1087 }
1088
1089 int CentralLB::LDStats::getSendHash(LDCommData &cData)
1090 {
1091   if (cData.sendHash == -1) {
1092     cData.sendHash = getHash(cData.sender);
1093   }
1094   return cData.sendHash;
1095 }
1096
1097 int CentralLB::LDStats::getRecvHash(LDCommData &cData)
1098 {
1099   if (cData.recvHash == -1) {
1100     cData.recvHash =  getHash(cData.receiver.get_destObj());
1101   }
1102   return cData.recvHash;
1103 }
1104
1105 double CentralLB::LDStats::computeAverageLoad()
1106 {
1107   int i, numAvail=0;
1108   double total = 0;
1109   for (i=0; i<n_objs; i++) total += objData[i].wallTime;
1110                                                                                 
1111   for (i=0; i<count; i++)
1112     if (procs[i].available == CmiTrue) {
1113         total += procs[i].bg_walltime;
1114         numAvail++;
1115     }
1116                                                                                 
1117   double averageLoad = total/numAvail;
1118   return averageLoad;
1119 }
1120
1121 void CentralLB::LDStats::pup(PUP::er &p)
1122 {
1123   int i;
1124   p(count);  
1125   p(n_objs);
1126   p(n_migrateobjs);
1127   p(n_comm);
1128   if (p.isUnpacking()) {
1129     // user can specify simulated processors other than the real # of procs.
1130     int maxpe = count>LBSimulation::simProcs?count:LBSimulation::simProcs;
1131     procs = new ProcStats[maxpe];
1132     objData = new LDObjData[n_objs];
1133     commData = new LDCommData[n_comm];
1134     from_proc = new int[n_objs];
1135     to_proc = new int[n_objs];
1136     objHash = NULL;
1137   }
1138   // ignore the background load when unpacking if the user change the # of procs
1139   // otherwise load everything
1140   if (p.isUnpacking() && LBSimulation::procsChanged) {
1141     ProcStats dummy;
1142     for (i=0; i<count; i++) p|dummy; 
1143   }
1144   else
1145     for (i=0; i<count; i++) p|procs[i];
1146   for (i=0; i<n_objs; i++) p|objData[i]; 
1147   p(from_proc, n_objs);
1148   p(to_proc, n_objs);
1149   for (i=0; i<n_comm; i++) p|commData[i];
1150   if (p.isUnpacking())
1151     count = LBSimulation::simProcs;
1152   if (p.isUnpacking()) {
1153     objHash = NULL;
1154   }
1155 }
1156
1157 int CentralLB::LDStats::useMem() { 
1158   // calculate the memory usage of this LB (superclass).
1159   return sizeof(LDStats) + sizeof(ProcStats)*count + 
1160          (sizeof(LDObjData) + 2*sizeof(int)) * n_objs +
1161          sizeof(LDCommData) * n_comm;
1162 }
1163
1164 /**
1165   CLBStatsMsg is not a real message now.
1166   CLBStatsMsg is used for all processors to fill in their local load and comm
1167   statistics and send to processor 0
1168 */
1169
1170 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1171   objData = new LDObjData[osz];
1172   commData = new LDCommData[csz];
1173   avail_vector = new char[CkNumPes()];
1174 }
1175
1176 CLBStatsMsg::~CLBStatsMsg() {
1177   delete [] objData;
1178   delete [] commData;
1179   delete [] avail_vector;
1180 }
1181
1182 void CLBStatsMsg::pup(PUP::er &p) {
1183   int i;
1184   p|from_pe;
1185   p|serial;
1186   p|pe_speed;
1187   p|total_walltime; p|total_cputime;
1188   p|idletime;
1189   p|bg_walltime;   p|bg_cputime;
1190   p|n_objs;
1191   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1192   for (i=0; i<n_objs; i++) p|objData[i];
1193   p|n_comm;
1194   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1195   for (i=0; i<n_comm; i++) p|commData[i];
1196   if (p.isUnpacking()) avail_vector = new char[CkNumPes()];
1197   p(avail_vector, CkNumPes());
1198   p(next_lb);
1199 }
1200
1201 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1202 // the entry function, it is just used to use to pup.
1203 // I don't use CLBStatsMsg directly as marshalled parameter because
1204 // I want the data pointer stored and not to be freed by the Charm++.
1205 CkMarshalledCLBStatsMessage::~CkMarshalledCLBStatsMessage() {
1206   if (msg) delete msg;
1207 }
1208
1209 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1210 {
1211   if (p.isUnpacking()) msg = new CLBStatsMsg;
1212   else CmiAssert(msg);
1213   msg->pup(p);
1214 }
1215
1216 #include "CentralLB.def.h"
1217
1218 /*@}*/