minor changes, removed warnings.
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "CentralLB.def.h"
17
18 #define  DEBUGF(x)    // CmiPrintf x;
19
20 CkGroupID loadbalancer;
21 char ** avail_vector_address;
22 int * lb_ptr;
23 int load_balancer_created;
24
25 #if CMK_LBDB_ON
26
27 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
28                           LBMigrateMsg *, double *peLoads, double &, double &);
29
30 void CreateCentralLB()
31 {
32   loadbalancer = CProxy_CentralLB::ckNew();
33 }
34
35 void set_avail_vector(char * bitmap){
36     int count;
37     int assigned = 0;
38     for(count = 0; count < CkNumPes(); count++){
39         (*avail_vector_address)[count] = bitmap[count];
40         if((bitmap[count] == 1) && !assigned){
41             *lb_ptr = count;
42             assigned = 1;
43         }
44     }
45 }
46
47 void CentralLB::staticStartLB(void* data)
48 {
49   CentralLB *me = (CentralLB*)(data);
50   me->StartLB();
51 }
52
53 void CentralLB::staticMigrated(void* data, LDObjHandle h)
54 {
55   CentralLB *me = (CentralLB*)(data);
56   me->Migrated(h);
57 }
58
59 void CentralLB::staticAtSync(void* data)
60 {
61   CentralLB *me = (CentralLB*)(data);
62   me->AtSync();
63 }
64
65 CentralLB::CentralLB()
66 {
67   lbname = "CentralLB";
68   mystep = 0;
69   //  CkPrintf("Construct in %d\n",CkMyPe());
70   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
71   theLbdb->
72     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
73   theLbdb->
74     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
75   theLbdb->
76     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
77
78   stats_msg_count = 0;
79   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
80   for(int i=0; i < CkNumPes(); i++)
81     statsMsgsList[i] = 0;
82
83   statsData = new LDStats;
84   statsData->procs = new ProcStats[CkNumPes()];
85
86   myspeed = theLbdb->ProcessorSpeed();
87   theLbdb->CollectStatsOn();
88   migrates_completed = 0;
89   migrates_expected = -1;
90
91   cur_ld_balancer = 0;
92   new_ld_balancer = 0;
93   int num_proc = CkNumPes();
94   avail_vector = new char[num_proc];
95   for(int proc = 0; proc < num_proc; proc++)
96       avail_vector[proc] = 1;
97   avail_vector_address = &(avail_vector);
98   lb_ptr = &new_ld_balancer;
99
100   load_balancer_created = 1;
101 }
102
103 CentralLB::~CentralLB()
104 {
105   CkPrintf("Going away\n");
106   theLbdb->
107     RemoveStartLBFn((LDStartLBFn)(staticStartLB));
108 }
109
110 void CentralLB::AtSync()
111 {
112   DEBUGF(("[%d] CentralLB At Sync step %d!!!!\n",CkMyPe(),mystep));
113
114   if (!QueryBalanceNow(step())) {
115     MigrationDone(0);
116     return;
117   }
118   thisProxy [CkMyPe()].ProcessAtSync();
119 }
120
121 void CentralLB::ProcessAtSync()
122 {
123   if (CkMyPe() == cur_ld_balancer) {
124     start_lb_time = CmiWallTimer();
125     if (lb_debug) 
126       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d\n",
127                  lbName(), step(),start_lb_time, cur_ld_balancer);
128   }
129   // build and send stats
130   const int osz = theLbdb->GetObjDataSz();
131   const int csz = theLbdb->GetCommDataSz();
132
133   int npes = CkNumPes();
134   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
135   msg->from_pe = CkMyPe();
136   // msg->serial = rand();
137   msg->serial = CrnRand();
138
139   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
140   theLbdb->IdleTime(&msg->idletime);
141   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
142   msg->pe_speed = myspeed;
143 //  CkPrintf(
144 //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
145 //    CkMyPe(),msg->total_walltime,msg->total_cputime,
146 //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
147
148   msg->n_objs = osz;
149   theLbdb->GetObjData(msg->objData);
150   msg->n_comm = csz;
151   theLbdb->GetCommData(msg->commData);
152 //  theLbdb->ClearLoads();
153   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
154            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
155
156 // Scheduler PART.
157
158   if(CkMyPe() == cur_ld_balancer) {
159     int num_proc = CkNumPes();
160     for(int proc = 0; proc < num_proc; proc++){
161       msg->avail_vector[proc] = avail_vector[proc];
162     }
163     msg->next_lb = new_ld_balancer;
164   }
165
166   thisProxy[cur_ld_balancer].ReceiveStats(msg);
167 }
168
169 void CentralLB::Migrated(LDObjHandle h)
170 {
171   migrates_completed++;
172   //  CkPrintf("[%d] An object migrated! %d %d\n",
173   //       CkMyPe(),migrates_completed,migrates_expected);
174   if (migrates_completed == migrates_expected) {
175     MigrationDone(1);
176   }
177 }
178
179 // build data from buffered msg
180 void CentralLB::buildStats()
181 {
182     statsData->count = stats_msg_count;
183     statsData->objData = new LDObjData[statsData->n_objs];
184     statsData->from_proc = new int[statsData->n_objs];
185     statsData->to_proc = new int[statsData->n_objs];
186     statsData->commData = new LDCommData[statsData->n_comm];
187     int nobj = 0;
188     int ncom = 0;
189     // copy all data in individule message to this big structure
190     for (int pe=0; pe<stats_msg_count; pe++) {
191        int i;
192        CLBStatsMsg *msg = statsMsgsList[pe];
193        for (i=0; i<msg->n_objs; i++) {
194          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
195          statsData->objData[nobj] = msg->objData[i];
196          nobj++;
197        }
198        for (i=0; i<msg->n_comm; i++) {
199          statsData->commData[ncom] = msg->commData[i];
200          ncom++;
201        }
202        delete msg;
203        statsMsgsList[pe]=0;
204     }
205 }
206
207 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
208 {
209   CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage();
210   int proc;
211   const int pe = m->from_pe;
212 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
213 //         pe,stats_msg_count,m->n_objs,m->serial,m);
214
215   if((pe == 0) && (CkMyPe() != 0)){
216       new_ld_balancer = m->next_lb;
217       int num_proc = CkNumPes();
218       for(int proc = 0; proc < num_proc; proc++)
219           avail_vector[proc] = m->avail_vector[proc];
220   }
221
222   DEBUGF(("ReceiveStats from %d step: %d\n", pe, mystep));
223   if (statsMsgsList[pe] != 0) {
224     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
225              pe);
226   } else {
227     statsMsgsList[pe] = m;
228     // store per processor data right away
229     struct ProcStats &procStat = statsData->procs[pe];
230     procStat.total_walltime = m->total_walltime;
231     procStat.total_cputime = m->total_cputime;
232     if (lb_ignoreBgLoad) {
233       procStat.idletime = 0.0;
234       procStat.bg_walltime = 0.0;
235       procStat.bg_cputime = 0.0;
236     }
237     else {
238       procStat.idletime = m->idletime;
239       procStat.bg_walltime = m->bg_walltime;
240       procStat.bg_cputime = m->bg_cputime;
241     }
242     procStat.pe_speed = m->pe_speed;
243     procStat.utilization = 1.0;
244     procStat.available = CmiTrue;
245     procStat.n_objs = m->n_objs;
246
247     statsData->n_objs += m->n_objs;
248     statsData->n_comm += m->n_comm;
249     stats_msg_count++;
250   }
251
252   const int clients = CkNumPes();
253   if (stats_msg_count == clients) {
254 //    double strat_start_time = CmiWallTimer();
255
256     // build data
257     buildStats();
258
259     // if this is the step at which we need to dump the database
260     simulation();
261
262     for(proc = 0; proc < clients; proc++)
263       statsData->procs[proc].available = (CmiBool)avail_vector[proc];
264
265 //    CkPrintf("Before Calling Strategy\n");
266     LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
267
268 //    CkPrintf("returned successfully\n");
269     int num_proc = CkNumPes();
270
271     for(proc = 0; proc < num_proc; proc++)
272         migrateMsg->avail_vector[proc] = avail_vector[proc];
273     migrateMsg->next_lb = new_ld_balancer;
274
275 //  very time consuming, only needed for step load balancing
276 //    getPredictedLoad(statsDataList, clients, migrateMsg, migrateMsg->expectedLoad);
277
278 //  CkPrintf("calling recv migration\n");
279     thisProxy.ReceiveMigration(migrateMsg);
280
281     // Zero out data structures for next cycle
282     // CkPrintf("zeroing out data\n");
283     statsData->clear();
284     stats_msg_count=0;
285
286 //    double strat_end_time = CmiWallTimer();
287 //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
288   }
289
290 }
291
292 // test if sender and receiver in a commData is nonmigratable.
293 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
294 {
295   for (int pe=0 ; pe<count; pe++)
296   {
297     for (int i=0; i<len[pe]; i++)
298       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
299           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
300       return 0;
301   }
302   return 1;
303 }
304
305 #if 0
306 // remove in the LDStats those objects that are non migratable
307 void CentralLB::RemoveNonMigratable(LDStats* stats, int count)
308 {
309   int pe;
310   LDObjData **nonmig = new LDObjData*[count];
311   int   *lens = new int[count];
312   int n_objs = stats->n_objs;
313     LDObjStats *objStat = stats.objData[n]; 
314     int l=-1, h=n_objs;
315     while (l<h) {
316       while (objStat[l+1].data.migratable && l<h) l++;
317       while (h>0 && !objStat[h-1].data.migratable && l<h) h--;
318       if (h-l>2) {
319         LDObjStats tmp = objStat[l+1];
320         objStat[l+1] = objStat[h-1];
321         objStat[h-1] = tmp;
322       }
323     }
324     stats->n_objs = h;
325     if (n_objs != h) CmiPrintf("Removed %d nonmigratable on pe:%d n_objs:%d migratable:%d\n", n_objs-h, pe, n_objs, h);
326     nonmig[pe] = objData+stats[pe].n_objs;
327     lens[pe] = n_objs-stats[pe].n_objs;
328
329     // now turn nonmigratable to bg load
330     for (int j=stats[pe].n_objs; j<n_objs; j++) {
331       stats[pe].bg_walltime += objData[j].wallTime;
332       stats[pe].bg_cputime += objData[j].cpuTime;
333     }
334
335   // modify comm data
336   for (pe=0; pe<count; pe++) {
337     int n_comm = stats[pe].n_comm;
338     if (n_comm == 0) continue;
339     LDCommData *commData = stats[pe].commData;
340     int l=-1, h=n_comm;
341     while (l<h) {
342       while (isMigratable(nonmig, lens, count, commData[l+1]) && l<h) l++;
343       while (!isMigratable(nonmig, lens, count, commData[h-1]) && l<h) h--;
344       if (h-l>2) {
345         LDCommData tmp = commData[l+1];
346         commData[l+1] = commData[h-1];
347         commData[h-1] = tmp;
348       }
349       else 
350         break;
351     }
352     if (l==-1 && h==-1) h=0;
353     stats[pe].n_comm = h;
354     if (n_comm != h) CmiPrintf("Removed %d nonmigratable on pe:%d n_comm:%d migratable:%d\n", n_comm-h, pe, n_comm, h);
355   }
356   delete [] nonmig;
357   delete [] lens;
358 }
359 #endif
360
361 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
362 {
363   int i;
364   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
365   
366   DEBUGF(("[%d] in ReceiveMigration %d moves\n",CkMyPe(),m->n_moves));
367   migrates_expected = 0;
368   for(i=0; i < m->n_moves; i++) {
369     MigrateInfo& move = m->moves[i];
370     const int me = CkMyPe();
371     if (move.from_pe == me && move.to_pe != me) {
372       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
373       theLbdb->Migrate(move.obj,move.to_pe);
374     } else if (move.from_pe != me && move.to_pe == me) {
375         //  CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
376       migrates_expected++;
377     }
378   }
379 #if 0
380   if (m->n_moves ==0) {
381     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
382   }
383 #endif
384
385   cur_ld_balancer = m->next_lb;
386   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
387       int num_proc = CkNumPes();
388       for(int proc = 0; proc < num_proc; proc++)
389           avail_vector[proc] = m->avail_vector[proc];
390   }
391
392   if (migrates_expected == 0 || migrates_completed == migrates_expected)
393     MigrationDone(1);
394   delete m;
395 }
396
397
398 void CentralLB::MigrationDone(int balancing)
399 {
400   if (balancing && lb_debug && CkMyPe() == cur_ld_balancer) {
401     double end_lb_time = CmiWallTimer();
402       CkPrintf("[%s] Load balancing step %d finished at %f\n",
403                 lbName(), step(),end_lb_time);
404       double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
405       CkPrintf("[%s] duration %f memUsage: LBManager:%dKB CentralLB:%dKB\n", 
406                 lbName(), end_lb_time - start_lb_time,
407                 (int)lbdbMemsize, (int)(useMem()/1000));
408   }
409   migrates_completed = 0;
410   migrates_expected = -1;
411   // clear load stats
412   if (balancing) theLbdb->ClearLoads();
413   // Increment to next step
414   mystep++;
415   thisProxy [CkMyPe()].ResumeClients();
416 }
417
418 void CentralLB::ResumeClients()
419 {
420   DEBUGF(("Resuming clients on PE %d\n",CkMyPe()));
421   theLbdb->ResumeClients();
422 }
423
424 // default load balancing strategy
425 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
426 {
427   work(stats, count);
428   return createMigrateMsg(stats, count);
429 }
430
431 void CentralLB::work(LDStats* stats,int count)
432 {
433   int i;
434   for(int pe=0; pe < count; pe++) {
435     struct ProcStats &proc = stats->procs[pe];
436
437     CkPrintf(
438       "Proc %d Sp %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
439       pe,proc.pe_speed,proc.total_walltime,proc.total_cputime,
440       proc.idletime,proc.bg_walltime,proc.bg_cputime);
441   }
442
443   int osz = stats->n_objs;
444     CkPrintf("------------- Object Data: %d objects -------------\n",
445              stats->n_objs);
446     for(i=0; i < osz; i++) {
447       LDObjData &odata = stats->objData[i];
448       CkPrintf("Object %d\n",i);
449       CkPrintf("     id = %d\n",odata.objID().id[0]);
450       CkPrintf("  OM id = %d\n",odata.omID().id);
451       CkPrintf("   Mig. = %d\n",odata.migratable);
452       CkPrintf("    CPU = %f\n",odata.cpuTime);
453       CkPrintf("   Wall = %f\n",odata.wallTime);
454     }
455
456     const int csz = stats->n_comm;
457
458     CkPrintf("------------- Comm Data: %d records -------------\n",
459              csz);
460     LDCommData *cdata = stats->commData;
461     for(i=0; i < csz; i++) {
462       CkPrintf("Link %d\n",i);
463
464       if (cdata[i].from_proc())
465         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
466       else
467         CkPrintf("    sender id = %d:%d\n",
468                  cdata[i].sender.omID().id,cdata[i].sender.objID().id[0]);
469
470       if (cdata[i].recv_type() == LD_PROC_MSG)
471         CkPrintf("  receiver PE = %d\n",cdata[i].receiver.proc());
472       else      
473         CkPrintf("  receiver id = %d:%d\n",
474                  cdata[i].receiver.get_destObj().omID().id,cdata[i].receiver.get_destObj().objID().id[0]);
475       
476       CkPrintf("     messages = %d\n",cdata[i].messages);
477       CkPrintf("        bytes = %d\n",cdata[i].bytes);
478     }
479 }
480
481 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
482 {
483   int sizes=0;
484   LBMigrateMsg* msg = new(&sizes,1) LBMigrateMsg;
485   msg->n_moves = 0;
486
487   return msg;
488 }
489
490 void CentralLB::simulation() {
491   if(step() == LBSimulation::dumpStep)
492   {
493     // here we are supposed to dump the database
494     writeStatsMsgs(LBSimulation::dumpFile);
495     CmiPrintf("LBDump: Dumped the load balancing data.\n");
496     CmiPrintf("Charm++> Exiting...\n");
497     CkExit();
498     return;
499   }
500   else if(LBSimulation::doSimulation)
501   {
502     // here we are supposed to read the data from the dump database
503     readStatsMsgs(LBSimulation::dumpFile);
504
505     LBSimulation simResults(LBSimulation::simProcs);
506
507     // now pass it to the strategy routine
508     double startT = CmiWallTimer();
509     CmiPrintf("%s> Strategy starts ... \n", lbname);
510     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
511     CmiPrintf("%s> Strategy took %fs. \n", lbname, CmiWallTimer()-startT);
512
513     // now calculate the results of the load balancing simulation
514     FindSimResults(statsData, LBSimulation::simProcs, migrateMsg, &simResults);
515
516     // now we have the simulation data, so print it and exit
517     CmiPrintf("Charm++> LBSim: Simulation of one load balancing step done.\n");
518     simResults.PrintSimulationResults();
519
520     delete migrateMsg;
521     CmiPrintf("Charm++> Exiting...\n");
522     CkExit();
523   }
524 }
525
526 void CentralLB::readStatsMsgs(const char* filename) {
527
528   int i;
529   FILE *f = fopen(filename, "r");
530   if (f==NULL) CmiAbort("Fatal Error> Cannot open LB Dump file!\n");
531
532   // at this stage, we need to rebuild the statsMsgList and
533   // statsDataList structures. For that first deallocate the
534   // old structures
535   for(i = 0; i < stats_msg_count; i++)
536         delete statsMsgsList[i];
537   delete[] statsMsgsList;
538
539   PUP::fromDisk p(f);
540   p|stats_msg_count;
541
542   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
543   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
544
545   // LBSimulation::simProcs must be set
546   statsData->pup(p);
547
548   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
549
550   // file f is closed in the destructor of PUP::fromDisk
551   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
552 }
553
554 void CentralLB::writeStatsMsgs(const char* filename) {
555
556   FILE *f = fopen(filename, "w");
557   if (f == NULL) 
558     CmiAbort("writeStatsMsgs failed to open the output file!\n");
559
560   PUP::toDisk p(f);
561
562   p|stats_msg_count;
563   statsData->pup(p);
564
565   fclose(f);
566
567   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
568 }
569
570 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
571                              LBMigrateMsg *msg, double *peLoads, 
572                              double &minObjLoad, double &maxObjLoad)
573 {
574         int* msgSentCount = new int[count]; // # of messages sent by each PE
575         int* msgRecvCount = new int[count]; // # of messages received by each PE
576         int* byteSentCount = new int[count];// # of bytes sent by each PE
577         int* byteRecvCount = new int[count];// # of bytes reeived by each PE
578         int i, pe;
579
580         for(i = 0; i < count; i++)
581           msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
582         minObjLoad = 1.0e20;    // I suppose no object load is beyond this
583         maxObjLoad = 0.0;
584
585         stats->makeCommHash();
586         // update to_proc according to migration msgs
587         for(i = 0; i < msg->n_moves; i++) {
588           MigrateInfo &mInfo = msg->moves[i];
589           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
590           CmiAssert(idx != -1);
591           stats->to_proc[idx] = mInfo.to_pe;
592         }
593
594         for(pe = 0; pe < count; pe++)
595         {
596           peLoads[pe] = stats->procs[pe].bg_walltime;
597         }
598
599         for(int obj = 0; obj < stats->n_objs; obj++)
600         {
601                 int pe = stats->to_proc[obj];
602                 double &oload = stats->objData[obj].wallTime;
603                 if (oload < minObjLoad) minObjLoad = oload;
604                 if (oload > maxObjLoad) maxObjLoad = oload;
605                 peLoads[pe] += oload;
606         }
607
608         // handling of the communication overheads. 
609         for (int cidx=0; cidx < stats->n_comm; cidx++) {
610           LDCommData& cdata = stats->commData[cidx];
611           int senderPE, receiverPE;
612           if(cdata.from_proc())
613             senderPE = cdata.src_proc;
614           else {
615             int idx = stats->getHash(cdata.sender);
616             CmiAssert(idx != -1);
617             senderPE = stats->to_proc[idx];
618             CmiAssert(senderPE != -1);
619           }
620           if(cdata.receiver.get_type() == LD_PROC_MSG)
621             receiverPE = cdata.receiver.proc();
622           else {
623             int idx = stats->getHash(cdata.receiver.get_destObj());
624             CmiAssert(idx != -1);
625             receiverPE = stats->to_proc[idx];
626             CmiAssert(receiverPE != -1);
627           }
628           if(senderPE != receiverPE)
629           {
630                 msgSentCount[senderPE] += cdata.messages;
631                 byteSentCount[senderPE] += cdata.bytes;
632
633                 msgRecvCount[receiverPE] += cdata.messages;
634                 byteRecvCount[receiverPE] += cdata.bytes;
635           }
636         }
637
638         // now for each processor, add to its load the send and receive overheads
639 #if 1
640         for(i = 0; i < count; i++)
641         {
642                 peLoads[i] += msgRecvCount[i]  * PER_MESSAGE_RECV_OVERHEAD +
643                                   msgSentCount[i]  * PER_MESSAGE_SEND_OVERHEAD +
644                                   byteRecvCount[i] * PER_BYTE_RECV_OVERHEAD +
645                                   byteSentCount[i] * PER_BYTE_SEND_OVERHEAD;
646         }
647 #endif
648         delete msgRecvCount;
649         delete msgSentCount;
650         delete byteRecvCount;
651         delete byteSentCount;
652 }
653
654 void CentralLB::FindSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
655 {
656     CkAssert(simResults != NULL && count == simResults->numPes);
657     // estimate the new loads of the processors. As a first approximation, this is the
658     // get background load
659     for(int pe = 0; pe < count; pe++)
660           simResults->bgLoads[pe] = stats->procs[pe].bg_walltime;
661     // sum of the cpu times of the objects on that processor
662     double startT = CmiWallTimer();
663     getPredictedLoad(stats, count, msg, simResults->peLoads, 
664                      simResults->minObjLoad, simResults->maxObjLoad);
665     CmiPrintf("getPredictedLoad finished in %fs\n", CmiWallTimer()-startT);
666 }
667
668 int CentralLB::useMem() { 
669   return CkNumPes() * (sizeof(CentralLB::LDStats)+sizeof(CLBStatsMsg *)) +
670                         sizeof(CentralLB);
671 }
672
673 inline static int ObjKey(const LDObjid &oid, const int hashSize) {
674   return ((oid.id[0]<<16)|(oid.id[1]<<8)|oid.id[2]) % hashSize;
675 }
676
677 void CentralLB::LDStats::makeCommHash() {
678   int i;
679   if (objHash) return;
680    
681   hashSize = n_objs*2;
682   objHash = new int[hashSize];
683   for(i=0;i<hashSize;i++)
684         objHash[i] = -1;
685    
686   for(i=0;i<n_objs;i++){
687         const LDObjid &oid = objData[i].objID();
688         int hash = ObjKey(oid, hashSize);
689         while(objHash[hash] != -1)
690             hash = (hash+1)%hashSize;
691         objHash[hash] = i;
692   }
693 }
694
695 void CentralLB::LDStats::deleteCommHash() {
696   if (objHash) delete [] objHash;
697   objHash = NULL;
698 }
699
700 int CentralLB::LDStats::getHash(const LDObjid &oid, const LDOMid &mid)
701 {
702     int hash = ObjKey(oid, hashSize);
703
704     for(int id=0;id<hashSize;id++){
705         int index = (id+hash)%hashSize;
706         if (LDObjIDEqual(objData[objHash[index]].objID(), oid) &&
707             LDOMidEqual(objData[objHash[index]].omID(), mid))
708             return objHash[index];
709     }
710     //  CkPrintf("not found \n");
711     return -1;
712 }
713
714 int CentralLB::LDStats::getHash(const LDObjKey &objKey)
715 {
716   const LDObjid &oid = objKey.objID();
717   const LDOMid  &mid = objKey.omID();
718   return getHash(oid, mid);
719 }
720
721 void CentralLB::LDStats::pup(PUP::er &p)
722 {
723   int i;
724   p(count);  
725   p(n_objs);
726   p(n_comm);
727   if (p.isUnpacking()) {
728     // user can specify simulated processors other than the real # of procs.
729     int maxpe = count>LBSimulation::simProcs?count:LBSimulation::simProcs;
730     procs = new ProcStats[maxpe];
731     objData = new LDObjData[n_objs];
732     commData = new LDCommData[n_comm];
733     from_proc = new int[n_objs];
734     to_proc = new int[n_objs];
735     objHash = NULL;
736   }
737   // ignore the background load when unpacking
738   if (p.isUnpacking()) {
739     ProcStats dummy;
740     for (i=0; i<count; i++) p|dummy; 
741   }
742   else
743     for (i=0; i<count; i++) p|procs[i];
744   for (i=0; i<n_objs; i++) p|objData[i]; 
745   p(from_proc, n_objs);
746   p(to_proc, n_objs);
747   for (i=0; i<n_comm; i++) p|commData[i];
748   if (p.isUnpacking())
749     count = LBSimulation::simProcs;
750 }
751
752 int CentralLB::LDStats::useMem() { 
753   // calculate the memory usage of this LB (superclass).
754   return sizeof(CentralLB) + sizeof(LDStats) + sizeof(ProcStats)*count + 
755          (sizeof(LDObjData) + 2*sizeof(int)) * n_objs +
756          sizeof(LDCommData) * n_comm;
757 }
758
759 /**
760   CLBStatsMsg is not a real message now.
761   CLBStatsMsg is used for all processors to fill in their local load and comm
762   statistics and send to processor 0
763 */
764
765 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
766   objData = new LDObjData[osz];
767   commData = new LDCommData[csz];
768   avail_vector = new char[CkNumPes()];
769 }
770
771 CLBStatsMsg::~CLBStatsMsg() {
772   delete [] objData;
773   delete [] commData;
774   delete [] avail_vector;
775 }
776
777 void CLBStatsMsg::pup(PUP::er &p) {
778   int i;
779   p|from_pe;
780   p|serial;
781   p|pe_speed;
782   p|total_walltime; p|total_cputime;
783   p|idletime;
784   p|bg_walltime;   p|bg_cputime;
785   p|n_objs;        
786   if (p.isUnpacking()) objData = new LDObjData[n_objs];
787   for (i=0; i<n_objs; i++) p|objData[i];
788   p|n_comm;
789   if (p.isUnpacking()) commData = new LDCommData[n_comm];
790   for (i=0; i<n_comm; i++) p|commData[i];
791   if (p.isUnpacking()) avail_vector = new char[CkNumPes()];
792   p(avail_vector, CkNumPes());
793   p(next_lb);
794 }
795
796 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
797 // the entry function, it is just used to use to pup.
798 // I don't use CLBStatsMsg directly as marshalled parameter because
799 // I want the data pointer stored and not to be freed by the Charm++.
800 CkMarshalledCLBStatsMessage::~CkMarshalledCLBStatsMessage() {
801   if (msg) delete msg;
802 }
803
804 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
805 {
806   if (p.isUnpacking()) msg = new CLBStatsMsg;
807   else CmiAssert(msg);
808   msg->pup(p);
809 }
810
811 #endif
812
813 /*@}*/