a modification in LDStats database, changed pointer to array to CkVec for easier...
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "LBDBManager.h"
17 #include "LBSimulation.h"
18
19 #define  DEBUGF(x)      // CmiPrintf x;
20
21 CkGroupID loadbalancer;
22 int * lb_ptr;
23 int load_balancer_created;
24
25 CreateLBFunc_Def(CentralLB, "CentralLB base class");
26
27 void getLoadInfo(BaseLB::LDStats* stats, int count, 
28                              LBInfo &info, int considerComm);
29
30 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
31                              LBMigrateMsg *, LBInfo &info, int considerComm);
32
33 /*
34 void CreateCentralLB()
35 {
36   CProxy_CentralLB::ckNew(0);
37 }
38 */
39
40 void CentralLB::staticStartLB(void* data)
41 {
42   CentralLB *me = (CentralLB*)(data);
43   me->StartLB();
44 }
45
46 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
47 {
48   CentralLB *me = (CentralLB*)(data);
49   me->Migrated(h, waitBarrier);
50 }
51
52 void CentralLB::staticAtSync(void* data)
53 {
54   CentralLB *me = (CentralLB*)(data);
55   me->AtSync();
56 }
57
58 void CentralLB::initLB(const CkLBOptions &opt)
59 {
60 #if CMK_LBDB_ON
61   lbname = "CentralLB";
62   thisProxy = CProxy_CentralLB(thisgroup);
63   //  CkPrintf("Construct in %d\n",CkMyPe());
64
65   // create and turn on by default
66   receiver = theLbdb->
67     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
68   notifier = theLbdb->getLBDB()->
69     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
70   startLbFnHdl = theLbdb->getLBDB()->
71     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
72
73   // CkPrintf("[%d] CentralLB seq %d\n",CkMyPe(), seq);
74   if (opt.getSeqNo() > 0) turnOff();
75
76   stats_msg_count = 0;
77   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
78   for(int i=0; i < CkNumPes(); i++)
79     statsMsgsList[i] = 0;
80
81   statsData = new LDStats;
82
83   // for future predictor
84   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
85   else predicted_model=0;
86   // register user interface callbacks
87   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
88
89   myspeed = theLbdb->ProcessorSpeed();
90
91   migrates_completed = 0;
92   future_migrates_completed = 0;
93   migrates_expected = -1;
94   future_migrates_expected = -1;
95   cur_ld_balancer = 0;
96   lbdone = 0;
97   int num_proc = CkNumPes();
98
99   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
100
101   load_balancer_created = 1;
102 #endif
103 }
104
105 CentralLB::~CentralLB()
106 {
107 #if CMK_LBDB_ON
108   delete [] statsMsgsList;
109   delete statsData;
110   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
111   if (theLbdb) {
112     theLbdb->getLBDB()->
113       RemoveNotifyMigrated(notifier);
114     theLbdb->
115       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
116   }
117 #endif
118 }
119
120 void CentralLB::turnOn() 
121 {
122 #if CMK_LBDB_ON
123   theLbdb->getLBDB()->
124     TurnOnBarrierReceiver(receiver);
125   theLbdb->getLBDB()->
126     TurnOnNotifyMigrated(notifier);
127   theLbdb->getLBDB()->
128     TurnOnStartLBFn(startLbFnHdl);
129 #endif
130 }
131
132 void CentralLB::turnOff() 
133 {
134 #if CMK_LBDB_ON
135   theLbdb->getLBDB()->
136     TurnOffBarrierReceiver(receiver);
137   theLbdb->getLBDB()->
138     TurnOffNotifyMigrated(notifier);
139   theLbdb->getLBDB()->
140     TurnOffStartLBFn(startLbFnHdl);
141 #endif
142 }
143
144 void CentralLB::AtSync()
145 {
146 #if CMK_LBDB_ON
147   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
148
149   // if num of processor is only 1, nothing should happen
150   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
151     MigrationDone(0);
152     return;
153   }
154   thisProxy [CkMyPe()].ProcessAtSync();
155 #endif
156 }
157
158 #include "ComlibStrategy.h"
159
160 void CentralLB::ProcessAtSync()
161 {
162 #if CMK_LBDB_ON
163
164   if (CkMyPe() == cur_ld_balancer) {
165     start_lb_time = CkWallTimer();
166   }
167   // build and send stats
168   const int osz = theLbdb->GetObjDataSz();
169   const int csz = theLbdb->GetCommDataSz();
170
171   int npes = CkNumPes();
172   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
173   msg->from_pe = CkMyPe();
174   // msg->serial = rand();
175   msg->serial = CrnRand();
176
177 /*
178   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
179   theLbdb->IdleTime(&msg->idletime);
180   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
181 */
182   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
183                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
184   msg->pe_speed = myspeed;
185 //  CkPrintf(
186 //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
187 //    CkMyPe(),msg->total_walltime,msg->total_cputime,
188 //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
189
190   msg->n_objs = osz;
191   theLbdb->GetObjData(msg->objData);
192   msg->n_comm = csz;
193   theLbdb->GetCommData(msg->commData);
194 //  theLbdb->ClearLoads();
195   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
196            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
197
198 // Scheduler PART.
199
200   if(CkMyPe() == cur_ld_balancer) {
201     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
202     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
203   }
204
205   thisProxy[cur_ld_balancer].ReceiveStats(msg);
206
207   {
208   // enfore the barrier to wait until centralLB says no
209   LDOMHandle h;
210   h.id.id.idx = 0;
211   theLbdb->getLBDB()->RegisteringObjects(h);
212   }
213 #endif
214 }
215
216 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
217 {
218 #if CMK_LBDB_ON
219   if (waitBarrier) {
220     migrates_completed++;
221     //  CkPrintf("[%d] An object migrated! %d %d\n",
222     //             CkMyPe(),migrates_completed,migrates_expected);
223     if (migrates_completed == migrates_expected) {
224       MigrationDone(1);
225     }
226   }
227   else {
228     future_migrates_completed ++;
229     DEBUGF(("[%d] An object migrated with no barrier! %d %d\n",
230            CkMyPe(),future_migrates_completed,future_migrates_expected));
231     if (future_migrates_completed == future_migrates_expected)  {
232         CheckMigrationComplete();
233     }
234   }
235 #endif
236 }
237
238 void CentralLB::MissMigrate(int waitForBarrier)
239 {
240   LDObjHandle h;
241   Migrated(h, waitForBarrier);
242 }
243
244 // build data from buffered msg
245 void CentralLB::buildStats()
246 {
247     statsData->count = stats_msg_count;
248     // allocate space
249     statsData->objData.resize(statsData->n_objs);
250     statsData->from_proc.resize(statsData->n_objs);
251     statsData->to_proc.resize(statsData->n_objs);
252     statsData->commData.resize(statsData->n_comm);
253
254     int nobj = 0;
255     int ncom = 0;
256     int nmigobj = 0;
257     // copy all data from individule message to this big structure
258     for (int pe=0; pe<stats_msg_count; pe++) {
259        int i;
260        CLBStatsMsg *msg = statsMsgsList[pe];
261        for (i=0; i<msg->n_objs; i++) {
262          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
263          statsData->objData[nobj] = msg->objData[i];
264          if (msg->objData[i].migratable) nmigobj++;
265          nobj++;
266        }
267        for (i=0; i<msg->n_comm; i++) {
268          statsData->commData[ncom] = msg->commData[i];
269          ncom++;
270        }
271        // free the memory
272        delete msg;
273        statsMsgsList[pe]=0;
274     }
275     statsData->n_migrateobjs = nmigobj;
276     if (_lb_args.debug()) {
277       CmiPrintf("n_obj:%d migratable:%d ncom:%d\n", nobj, nmigobj, ncom);
278     }
279 }
280
281 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
282 {
283 #if CMK_LBDB_ON
284   CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage();
285   const int pe = m->from_pe;
286 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
287 //         pe,stats_msg_count,m->n_objs,m->serial,m);
288
289   if (pe == cur_ld_balancer) {
290       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
291   }
292
293   if (statsMsgsList[pe] != 0) {
294     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
295              pe);
296   } else {
297     statsMsgsList[pe] = m;
298     // store per processor data right away
299     struct ProcStats &procStat = statsData->procs[pe];
300     procStat.total_walltime = m->total_walltime;
301     procStat.total_cputime = m->total_cputime;
302     procStat.idletime = m->idletime;
303     procStat.bg_walltime = m->bg_walltime;
304     procStat.bg_cputime = m->bg_cputime;
305     procStat.pe_speed = m->pe_speed;
306     procStat.utilization = 1.0;
307     procStat.available = CmiTrue;
308     procStat.n_objs = m->n_objs;
309
310     statsData->n_objs += m->n_objs;
311     statsData->n_comm += m->n_comm;
312     stats_msg_count++;
313   }
314
315   DEBUGF(("[0] ReceiveStats from %d step: %d count: %d\n", pe, step(), stats_msg_count));
316   const int clients = CkNumPes();
317
318   if (stats_msg_count == clients) {
319     thisProxy[CkMyPe()].LoadBalance();
320   }
321 #endif
322 }
323
324 void CentralLB::LoadBalance()
325 {
326 #if CMK_LBDB_ON
327   int proc;
328   if (_lb_args.debug()) 
329       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d\n",
330                  lbName(), step(),start_lb_time, cur_ld_balancer);
331 //    double strat_start_time = CkWallTimer();
332
333   // build data
334   buildStats();
335
336   // if we are in simulation mode read data
337   if (LBSimulation::doSimulation) simulationRead();
338
339   char *availVector = LBDatabaseObj()->availVector();
340   const int clients = CkNumPes();
341   for(proc = 0; proc < clients; proc++)
342       statsData->procs[proc].available = (CmiBool)availVector[proc];
343
344   preprocess(statsData, clients);
345
346 //    CkPrintf("Before Calling Strategy\n");
347
348   LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
349
350 //    CkPrintf("returned successfully\n");
351   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
352   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
353
354   // if this is the step at which we need to dump the database
355   simulationWrite();
356
357 //  calculate predicted load
358 //  very time consuming though, so only happen when debugging is on
359   if (_lb_args.debug()) {
360       LBInfo info(migrateMsg->expectedLoad, clients);
361       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 1);
362   }
363
364   //  CkPrintf("calling recv migration\n");
365   thisProxy.ReceiveMigration(migrateMsg);
366
367   // Zero out data structures for next cycle
368   // CkPrintf("zeroing out data\n");
369   statsData->clear();
370   stats_msg_count=0;
371 #endif
372 }
373
374 //    double strat_end_time = CkWallTimer();
375 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
376 // test if sender and receiver in a commData is nonmigratable.
377 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
378 {
379 #if CMK_LBDB_ON
380   for (int pe=0 ; pe<count; pe++)
381   {
382     for (int i=0; i<len[pe]; i++)
383       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
384           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
385       return 0;
386   }
387 #endif
388   return 1;
389 }
390
391 // rebuild LDStats and remove all non-migratble objects and related things
392 void CentralLB::removeNonMigratable(LDStats* stats, int count)
393 {
394   int i;
395
396   CkVec<LDObjData> nonmig;
397   CkVec<int> new_from_proc, new_to_proc;
398   nonmig.resize(stats->n_migrateobjs);
399   new_from_proc.resize(stats->n_migrateobjs);
400   new_to_proc.resize(stats->n_migrateobjs);
401   int n_objs = 0;
402   for (i=0; i<stats->n_objs; i++) 
403   {
404     LDObjData &odata = stats->objData[i];
405     if (odata.migratable) {
406       nonmig[n_objs] = odata;
407       new_from_proc[n_objs] = stats->from_proc[i];
408       new_to_proc[n_objs] = stats->to_proc[i];
409       n_objs ++;
410     }
411     else {
412       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
413       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
414     }
415   }
416   CmiAssert(stats->n_migrateobjs == n_objs);
417
418   stats->makeCommHash();
419   
420   CkVec<LDCommData> newCommData;
421   newCommData.resize(stats->n_comm);
422   int n_comm = 0;
423   for (i=0; i<stats->n_comm; i++) 
424   {
425     LDCommData& cdata = stats->commData[i];
426     if (!cdata.from_proc()) 
427     {
428       int idx = stats->getSendHash(cdata);
429       CmiAssert(idx != -1);
430       if (!stats->objData[idx].migratable) continue;
431     }
432     switch (cdata.receiver.get_type()) {
433     case LD_PROC_MSG:
434       break;
435     case LD_OBJ_MSG:  {
436       int idx = stats->getRecvHash(cdata);
437       CmiAssert(idx != -1);
438       if (!stats->objData[idx].migratable) continue;
439       break;
440       }
441     case LD_OBJLIST_MSG:    // object message FIXME add multicast
442       break;
443     }
444     newCommData[n_comm] = cdata;
445     n_comm ++;
446   }
447
448   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
449
450   // swap to new data
451   stats->objData = nonmig;
452   stats->from_proc = new_from_proc;
453   stats->to_proc = new_to_proc;
454   stats->n_objs = n_objs;
455
456   stats->commData = newCommData;
457   stats->n_comm = n_comm;
458
459   stats->deleteCommHash();
460   stats->makeCommHash();
461
462 }
463
464
465 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
466 {
467 #if CMK_LBDB_ON
468   int i;
469   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
470   
471   migrates_expected = 0;
472   future_migrates_expected = 0;
473   for(i=0; i < m->n_moves; i++) {
474     MigrateInfo& move = m->moves[i];
475     const int me = CkMyPe();
476     if (move.from_pe == me && move.to_pe != me) {
477       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
478       // migrate object, in case it is already gone, inform toPe
479       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
480          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
481     } else if (move.from_pe != me && move.to_pe == me) {
482       // CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
483       if (!move.async_arrival) migrates_expected++;
484       else future_migrates_expected++;
485     }
486   }
487   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
488   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
489 #if 0
490   if (m->n_moves ==0) {
491     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
492   }
493 #endif
494
495   cur_ld_balancer = m->next_lb;
496   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
497       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
498   }
499
500   if (migrates_expected == 0 || migrates_completed == migrates_expected)
501     MigrationDone(1);
502   delete m;
503 #endif
504 }
505
506
507 void CentralLB::MigrationDone(int balancing)
508 {
509 #if CMK_LBDB_ON
510   migrates_completed = 0;
511   migrates_expected = -1;
512   // clear load stats
513   if (balancing) theLbdb->ClearLoads();
514   // Increment to next step
515   theLbdb->incStep();
516
517   LoadbalanceDone(balancing);        // callback
518
519   // if sync resume invoke a barrier
520   if (balancing && _lb_args.syncResume()) {
521     CkCallback cb(CkIndex_CentralLB::ResumeClients((CkReductionMsg*)NULL), 
522                   thisProxy);
523     contribute(0, NULL, CkReduction::sum_int, cb);
524   }
525   else 
526     thisProxy [CkMyPe()].ResumeClients(balancing);
527 #endif
528 }
529
530 void CentralLB::ResumeClients(CkReductionMsg *msg)
531 {
532   ResumeClients(1);
533   delete msg;
534 }
535
536 void CentralLB::ResumeClients(int balancing)
537 {
538
539 #if CMK_LBDB_ON
540   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
541   if (balancing && _lb_args.debug() && CkMyPe() == cur_ld_balancer) {
542     double end_lb_time = CkWallTimer();
543     CkPrintf("[%s] Load balancing step %d finished at %f\n",
544               lbName(), step()-1,end_lb_time);
545     double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
546     CkPrintf("[%s] duration %fs memUsage: LBManager:%dKB CentralLB:%dKB\n", 
547               lbName(), end_lb_time - start_lb_time,
548               (int)lbdbMemsize, (int)(useMem()/1000));
549   }
550
551   ComlibNotifyMigrationDone();  
552
553   theLbdb->ResumeClients();
554   if (balancing)  {
555     CheckMigrationComplete();
556     if (future_migrates_expected == 0 || 
557             future_migrates_expected == future_migrates_completed) {
558       CheckMigrationComplete();
559     }
560   }
561 #endif
562 }
563
564 /*
565   migration of objects contains two different kinds:
566   (1) objects want to make a barrier for migration completion
567       (waitForBarrier is true)
568       migrationDone() to finish and resumeClients
569   (2) objects don't need a barrier
570   However, next load balancing can only happen when both migrations complete
571 */ 
572 void CentralLB::CheckMigrationComplete()
573 {
574 #if CMK_LBDB_ON
575   lbdone ++;
576   if (lbdone == 2) {
577     lbdone = 0;
578     future_migrates_expected = -1;
579     future_migrates_completed = 0;
580     DEBUGF(("[%d] MigrationComplete\n", CkMyPe()));
581     // release local barrier  so that the next load balancer can go
582     LDOMHandle h;
583     h.id.id.idx = 0;
584     theLbdb->getLBDB()->DoneRegisteringObjects(h);
585     // switch to the next load balancer in the list
586     theLbdb->nextLoadbalancer(seqno);
587   }
588 #endif
589 }
590
591 void CentralLB::preprocess(LDStats* stats,int count)
592 {
593   for (int pe=0; pe<count; pe++)
594   {
595     struct ProcStats &procStat = statsData->procs[pe];
596     if (_lb_args.ignoreBgLoad()) {
597       procStat.idletime = 0.0;
598       procStat.bg_walltime = 0.0;
599       procStat.bg_cputime = 0.0;
600     }
601   }
602
603   // Call the predictor for the future
604   if (_lb_predict) FuturePredictor(statsData);
605 }
606
607 // default load balancing strategy
608 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
609 {
610 #if CMK_LBDB_ON
611   work(stats, count);
612   return createMigrateMsg(stats, count);
613 #else
614   return NULL;
615 #endif
616 }
617
618 void CentralLB::work(LDStats* stats,int count)
619 {
620   // does nothing but print the database
621   stats->print();
622 }
623
624 // generate migrate message from stats->from_proc and to_proc
625 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
626 {
627   int i;
628   CkVec<MigrateInfo*> migrateInfo;
629   for (i=0; i<stats->n_objs; i++) {
630     LDObjData &objData = stats->objData[i];
631     int frompe = stats->from_proc[i];
632     int tope = stats->to_proc[i];
633     if (frompe != tope) {
634       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
635       //         CkMyPe(),obj,pe,dest);
636       MigrateInfo *migrateMe = new MigrateInfo;
637       migrateMe->obj = objData.handle;
638       migrateMe->from_pe = frompe;
639       migrateMe->to_pe = tope;
640       migrateMe->async_arrival = objData.asyncArrival;
641       migrateInfo.insertAtEnd(migrateMe);
642     }
643   }
644
645   int migrate_count=migrateInfo.length();
646   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
647   msg->n_moves = migrate_count;
648   for(i=0; i < migrate_count; i++) {
649     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
650     msg->moves[i] = *item;
651     delete item;
652     migrateInfo[i] = 0;
653   }
654   if (_lb_args.debug())
655     CkPrintf("%s: %d objects migrating.\n", lbname, migrate_count);
656   return msg;
657 }
658
659 void CentralLB::simulationWrite() {
660   if(step() == LBSimulation::dumpStep)
661   {
662     // here we are supposed to dump the database
663     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
664     char *dumpFileName = (char *)malloc(dumpFileSize);
665     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
666       free(dumpFileName);
667       dumpFileSize+=3;
668       dumpFileName = (char *)malloc(dumpFileSize);
669     }
670     writeStatsMsgs(dumpFileName);
671     free(dumpFileName);
672     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
673     ++LBSimulation::dumpStep;
674     --LBSimulation::dumpStepSize;
675     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
676       CmiPrintf("Charm++> Exiting...\n");
677       CkExit();
678     }
679     return;
680   }
681 }
682
683 void CentralLB::simulationRead() {
684   LBSimulation *simResults = NULL, *realResults;
685   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
686   voidMessage->n_moves=0;
687   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
688     // here we are supposed to read the data from the dump database
689     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
690     char *simFileName = (char *)malloc(simFileSize);
691     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
692       free(simFileName);
693       simFileSize+=3;
694       simFileName = (char *)malloc(simFileSize);
695     }
696     readStatsMsgs(simFileName);
697     free(simFileName);
698
699     // allocate simResults (only the first step)
700     if (simResults == NULL) {
701       simResults = new LBSimulation(LBSimulation::simProcs);
702       realResults = new LBSimulation(LBSimulation::simProcs);
703     }
704     else {
705       // should be the same number of procs of the original simulation!
706       if (!LBSimulation::procsChanged) {
707         // it means we have a previous step, so in simResults there is data.
708         // we can now print the real effects of the load balancer during the simulation
709         // or print the difference between the predicted data and the real one.
710         realResults->reset();
711         // reset to_proc of statsData to be equal to from_proc
712         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
713         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
714         simResults->PrintDifferences(realResults,statsData);
715       }
716       simResults->reset();
717     }
718
719     // now pass it to the strategy routine
720     double startT = CkWallTimer();
721     preprocess(statsData, LBSimulation::simProcs);
722     CmiPrintf("%s> Strategy starts ... \n", lbname);
723     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
724     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB:%dKB. \n", 
725                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
726
727     // now calculate the results of the load balancing simulation
728     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
729
730     // now we have the simulation data, so print it and loop
731     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
732     simResults->PrintSimulationResults();
733
734     delete migrateMsg;
735     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
736   }
737   // deallocate simResults
738   delete simResults;
739   CmiPrintf("Charm++> Exiting...\n");
740   CkExit();
741 }
742
743 void CentralLB::readStatsMsgs(const char* filename) 
744 {
745 #if CMK_LBDB_ON
746   int i;
747   FILE *f = fopen(filename, "r");
748   if (f==NULL) {
749     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
750     CmiAbort("");
751   }
752
753   // at this stage, we need to rebuild the statsMsgList and
754   // statsDataList structures. For that first deallocate the
755   // old structures
756   if (statsMsgsList) {
757     for(i = 0; i < stats_msg_count; i++)
758       delete statsMsgsList[i];
759     delete[] statsMsgsList;
760     statsMsgsList=0;
761   }
762
763   PUP::fromDisk pd(f);
764   PUP::machineInfo machInfo;
765
766   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
767   PUP::xlater p(machInfo, pd);
768
769   p|stats_msg_count;
770
771   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
772   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
773   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
774
775   // LBSimulation::simProcs must be set
776   statsData->pup(p);
777
778   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
779   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
780
781   // file f is closed in the destructor of PUP::fromDisk
782   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
783 #endif
784 }
785
786 void CentralLB::writeStatsMsgs(const char* filename) 
787 {
788 #if CMK_LBDB_ON
789   FILE *f = fopen(filename, "w");
790   if (f==NULL) {
791     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
792     CmiAbort("");
793   }
794
795   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
796   PUP::toDisk p(f);
797   p((char *)&machInfo, sizeof(machInfo));       // machine info
798
799   p|stats_msg_count;
800   statsData->pup(p);
801
802   fclose(f);
803
804   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
805 #endif
806 }
807
808 // calculate the predicted wallclock/cpu load for every processors
809 // considering communication overhead if considerComm is true
810 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
811                       LBMigrateMsg *msg, LBInfo &info, 
812                       int considerComm)
813 {
814 #if CMK_LBDB_ON
815         stats->makeCommHash();
816
817         // update to_proc according to migration msgs
818         for(int i = 0; i < msg->n_moves; i++) {
819           MigrateInfo &mInfo = msg->moves[i];
820           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
821           CmiAssert(idx != -1);
822           stats->to_proc[idx] = mInfo.to_pe;
823         }
824
825         getLoadInfo(stats, count, info, considerComm);
826 }
827
828
829 void getLoadInfo(BaseLB::LDStats* stats, int count, 
830                              LBInfo &info, int considerComm)
831 {
832         int i, pe;
833         double *peLoads = info.peLoads;
834         double *objLoads = info.objLoads;
835         double *comLoads = info.comLoads;
836         double *bgLoads = info.bgLoads;
837         double minObjLoad = 1.0e20;  // I suppose no object load is beyond this
838         double maxObjLoad = 0.0;
839         CmiAssert(peLoads);
840
841         double alpha = _lb_args.alpha();
842         double beeta = _lb_args.beeta();
843
844         stats->makeCommHash();
845
846         info.clear();
847
848         // get background load
849         if (bgLoads)
850           for(pe = 0; pe < count; pe++)
851            bgLoads[pe] = stats->procs[pe].bg_walltime;
852
853         for(pe = 0; pe < count; pe++)
854           peLoads[pe] = stats->procs[pe].bg_walltime;
855
856         for(int obj = 0; obj < stats->n_objs; obj++)
857         {
858                 int pe = stats->to_proc[obj];
859                 double &oload = stats->objData[obj].wallTime;
860                 if (oload < minObjLoad) minObjLoad = oload;
861                 if (oload > maxObjLoad) maxObjLoad = oload;
862                 peLoads[pe] += oload;
863                 if (objLoads) objLoads[pe] += oload;
864         }
865
866         // handling of the communication overheads. 
867         if (considerComm) {
868           int* msgSentCount = new int[count]; // # of messages sent by each PE
869           int* msgRecvCount = new int[count]; // # of messages received by each PE
870           int* byteSentCount = new int[count];// # of bytes sent by each PE
871           int* byteRecvCount = new int[count];// # of bytes reeived by each PE
872           for(i = 0; i < count; i++)
873             msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
874
875           int mcast_count = 0;
876           for (int cidx=0; cidx < stats->n_comm; cidx++) {
877             LDCommData& cdata = stats->commData[cidx];
878             int senderPE, receiverPE;
879             if (cdata.from_proc())
880               senderPE = cdata.src_proc;
881             else {
882               int idx = stats->getHash(cdata.sender);
883               if (idx == -1) continue;    // sender has just migrated?
884               senderPE = stats->to_proc[idx];
885               CmiAssert(senderPE != -1);
886             }
887             CmiAssert(senderPE < count && senderPE >= 0);
888
889             // find receiver: point-to-point and multicast two cases
890             int receiver_type = cdata.receiver.get_type();
891             if (receiver_type == LD_PROC_MSG || receiver_type == LD_OBJ_MSG) {
892               if (receiver_type == LD_PROC_MSG)
893                 receiverPE = cdata.receiver.proc();
894               else  {  // LD_OBJ_MSG
895                 int idx = stats->getHash(cdata.receiver.get_destObj());
896                 if (idx == -1) continue;    // receiver has just been removed?
897                 receiverPE = stats->to_proc[idx];
898                 CmiAssert(receiverPE != -1);
899               }
900               CmiAssert(receiverPE < count && receiverPE >= 0);
901               if(senderPE != receiverPE)
902               {
903                 msgSentCount[senderPE] += cdata.messages;
904                 byteSentCount[senderPE] += cdata.bytes;
905                 msgRecvCount[receiverPE] += cdata.messages;
906                 byteRecvCount[receiverPE] += cdata.bytes;
907               }
908             }
909             else if (receiver_type == LD_OBJLIST_MSG) {
910               int nobjs;
911               LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
912               mcast_count ++;
913               for (i=0; i<nobjs; i++) {
914                 int idx = stats->getHash(objs[i]);
915                 CmiAssert(idx != -1);
916                 if (idx == -1) continue;    // receiver has just been removed?
917                 receiverPE = stats->to_proc[idx];
918                 CmiAssert(receiverPE < count && receiverPE >= 0);
919                 if(senderPE != receiverPE)
920                 {
921                 msgSentCount[senderPE] += cdata.messages;
922                 byteSentCount[senderPE] += cdata.bytes;
923                 msgRecvCount[receiverPE] += cdata.messages;
924                 byteRecvCount[receiverPE] += cdata.bytes;
925                 }
926               }
927             }
928           }   // end of for
929           if (_lb_args.debug())
930              CkPrintf("Number of MULTICAST: %d\n", mcast_count);
931
932           // now for each processor, add to its load the send and receive overheads
933           for(i = 0; i < count; i++)
934           {
935                 double comload = msgRecvCount[i]  * PER_MESSAGE_RECV_OVERHEAD +
936                               msgSentCount[i]  * alpha +
937                               byteRecvCount[i] * PER_BYTE_RECV_OVERHEAD +
938                               byteSentCount[i] * beeta;
939                 peLoads[i] += comload;
940                 if (comLoads) comLoads[i] += comload;
941           }
942           delete [] msgRecvCount;
943           delete [] msgSentCount;
944           delete [] byteRecvCount;
945           delete [] byteSentCount;
946         }
947
948         info.minObjLoad = minObjLoad;
949         info.maxObjLoad = maxObjLoad;
950 #endif
951 }
952
953 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
954 {
955     CkAssert(simResults != NULL && count == simResults->numPes);
956     // estimate the new loads of the processors. As a first approximation, this is the
957     // sum of the cpu times of the objects on that processor
958     double startT = CkWallTimer();
959     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
960     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
961 }
962
963 void CentralLB::pup(PUP::er &p) { 
964   BaseLB::pup(p); 
965   if (p.isUnpacking())  {
966     initLB(CkLBOptions(seqno)); 
967   }
968 }
969
970 int CentralLB::useMem() { 
971   return sizeof(CentralLB) + statsData->useMem() + 
972          CkNumPes() * sizeof(CLBStatsMsg *);
973 }
974
975
976 /**
977   CLBStatsMsg is not a real message now.
978   CLBStatsMsg is used for all processors to fill in their local load and comm
979   statistics and send to processor 0
980 */
981
982 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
983   objData = new LDObjData[osz];
984   commData = new LDCommData[csz];
985   avail_vector = new char[CkNumPes()];
986 }
987
988 CLBStatsMsg::~CLBStatsMsg() {
989   delete [] objData;
990   delete [] commData;
991   delete [] avail_vector;
992 }
993
994 void CLBStatsMsg::pup(PUP::er &p) {
995   int i;
996   p|from_pe;
997   p|serial;
998   p|pe_speed;
999   p|total_walltime; p|total_cputime;
1000   p|idletime;
1001   p|bg_walltime;   p|bg_cputime;
1002   p|n_objs;
1003   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1004   for (i=0; i<n_objs; i++) p|objData[i];
1005   p|n_comm;
1006   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1007   for (i=0; i<n_comm; i++) p|commData[i];
1008   if (p.isUnpacking()) avail_vector = new char[CkNumPes()];
1009   p(avail_vector, CkNumPes());
1010   p(next_lb);
1011 }
1012
1013 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1014 // the entry function, it is just used to use to pup.
1015 // I don't use CLBStatsMsg directly as marshalled parameter because
1016 // I want the data pointer stored and not to be freed by the Charm++.
1017 CkMarshalledCLBStatsMessage::~CkMarshalledCLBStatsMessage() {
1018   if (msg) delete msg;
1019 }
1020
1021 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1022 {
1023   if (p.isUnpacking()) msg = new CLBStatsMsg;
1024   else CmiAssert(msg);
1025   msg->pup(p);
1026 }
1027
1028 #include "CentralLB.def.h"
1029
1030 /*@}*/