6eff90b84a5887777087f8e3e634e9a7592f3e09
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include "envelope.h"
15 #include "CentralLB.h"
16 #include "CentralLB.def.h"
17
18 #define  DEBUGF(x)    // CmiPrintf x;
19
20 CkGroupID loadbalancer;
21 char ** avail_vector_address;
22 int * lb_ptr;
23 int load_balancer_created;
24
25 #if CMK_LBDB_ON
26
27 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
28                           LBMigrateMsg *, double *peLoads, double &, double &);
29
30 void CreateCentralLB()
31 {
32   loadbalancer = CProxy_CentralLB::ckNew();
33 }
34
35 void set_avail_vector(char * bitmap){
36     int count;
37     int assigned = 0;
38     for(count = 0; count < CkNumPes(); count++){
39         (*avail_vector_address)[count] = bitmap[count];
40         if((bitmap[count] == 1) && !assigned){
41             *lb_ptr = count;
42             assigned = 1;
43         }
44     }
45 }
46
47 void CentralLB::staticStartLB(void* data)
48 {
49   CentralLB *me = (CentralLB*)(data);
50
51   me->StartLB();
52 }
53
54 void CentralLB::staticMigrated(void* data, LDObjHandle h)
55 {
56   CentralLB *me = (CentralLB*)(data);
57
58   me->Migrated(h);
59 }
60
61 void CentralLB::staticAtSync(void* data)
62 {
63   CentralLB *me = (CentralLB*)(data);
64
65   me->AtSync();
66 }
67
68 CentralLB::CentralLB()
69 {
70   lbname = "CentralLB";
71   mystep = 0;
72   //  CkPrintf("Construct in %d\n",CkMyPe());
73   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
74   theLbdb->
75     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
76   theLbdb->
77     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
78   theLbdb->
79     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
80
81   stats_msg_count = 0;
82   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
83   for(int i=0; i < CkNumPes(); i++)
84     statsMsgsList[i] = 0;
85
86   statsData = new LDStats;
87   statsData->procs = new ProcStats[CkNumPes()];
88
89   myspeed = theLbdb->ProcessorSpeed();
90   theLbdb->CollectStatsOn();
91   migrates_completed = 0;
92   migrates_expected = -1;
93
94   cur_ld_balancer = 0;
95   new_ld_balancer = 0;
96   int num_proc = CkNumPes();
97   avail_vector = new char[num_proc];
98   for(int proc = 0; proc < num_proc; proc++)
99       avail_vector[proc] = 1;
100   avail_vector_address = &(avail_vector);
101   lb_ptr = &new_ld_balancer;
102
103   load_balancer_created = 1;
104 }
105
106 CentralLB::~CentralLB()
107 {
108   CkPrintf("Going away\n");
109   theLbdb->
110     RemoveStartLBFn((LDStartLBFn)(staticStartLB));
111 }
112
113 void CentralLB::AtSync()
114 {
115   DEBUGF(("[%d] CentralLB At Sync step %d!!!!\n",CkMyPe(),mystep));
116
117   if (!QueryBalanceNow(step())) {
118     MigrationDone(0);
119     return;
120   }
121   thisProxy [CkMyPe()].ProcessAtSync();
122 }
123
124 void CentralLB::ProcessAtSync()
125 {
126   if (CkMyPe() == cur_ld_balancer) {
127     start_lb_time = CmiWallTimer();
128     if (lb_debug) 
129       CmiPrintf("[%s] Load balancing step %d starting at %f in PE%d\n",
130                  lbName(), step(),start_lb_time, cur_ld_balancer);
131   }
132   // build and send stats
133   const int osz = theLbdb->GetObjDataSz();
134   const int csz = theLbdb->GetCommDataSz();
135
136   int npes = CkNumPes();
137   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
138   msg->from_pe = CkMyPe();
139   // msg->serial = rand();
140   msg->serial = CrnRand();
141
142   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
143   theLbdb->IdleTime(&msg->idletime);
144   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
145   msg->pe_speed = myspeed;
146 //  CkPrintf(
147 //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
148 //    CkMyPe(),msg->total_walltime,msg->total_cputime,
149 //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
150
151   msg->n_objs = osz;
152   theLbdb->GetObjData(msg->objData);
153   msg->n_comm = csz;
154   theLbdb->GetCommData(msg->commData);
155 //  theLbdb->ClearLoads();
156   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
157            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
158
159 // Scheduler PART.
160
161   if(CkMyPe() == cur_ld_balancer) {
162     int num_proc = CkNumPes();
163     for(int proc = 0; proc < num_proc; proc++){
164       msg->avail_vector[proc] = avail_vector[proc];
165     }
166     msg->next_lb = new_ld_balancer;
167   }
168
169   thisProxy[cur_ld_balancer].ReceiveStats(msg);
170 }
171
172 void CentralLB::Migrated(LDObjHandle h)
173 {
174   migrates_completed++;
175   //  CkPrintf("[%d] An object migrated! %d %d\n",
176   //       CkMyPe(),migrates_completed,migrates_expected);
177   if (migrates_completed == migrates_expected) {
178     MigrationDone(1);
179   }
180 }
181
182 // build data from buffered msg
183 void CentralLB::buildStats()
184 {
185     statsData->count = stats_msg_count;
186     statsData->objData = new LDObjData[statsData->n_objs];
187     statsData->from_proc = new int[statsData->n_objs];
188     statsData->to_proc = new int[statsData->n_objs];
189     statsData->commData = new LDCommData[statsData->n_comm];
190     int nobj = 0;
191     int ncom = 0;
192     // copy all data in individule message to this bug structure
193     for (int pe=0; pe<stats_msg_count; pe++) {
194        int i;
195        CLBStatsMsg *msg = statsMsgsList[pe];
196        for (i=0; i<msg->n_objs; i++) {
197          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
198          statsData->objData[nobj] = msg->objData[i];
199          nobj++;
200        }
201        for (i=0; i<msg->n_comm; i++) {
202          statsData->commData[ncom] = msg->commData[i];
203          ncom++;
204        }
205        delete msg;
206        statsMsgsList[pe]=0;
207     }
208 }
209
210 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
211 {
212   CLBStatsMsg *m = (CLBStatsMsg *)msg.getMessage();
213   int proc;
214   const int pe = m->from_pe;
215 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
216 //         pe,stats_msg_count,m->n_objs,m->serial,m);
217
218   if((pe == 0) && (CkMyPe() != 0)){
219       new_ld_balancer = m->next_lb;
220       int num_proc = CkNumPes();
221       for(int proc = 0; proc < num_proc; proc++)
222           avail_vector[proc] = m->avail_vector[proc];
223   }
224
225   DEBUGF(("ReceiveStats from %d step: %d\n", pe, mystep));
226   if (statsMsgsList[pe] != 0) {
227     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
228              pe);
229   } else {
230     statsMsgsList[pe] = m;
231     struct ProcStats &procStat = statsData->procs[pe];
232     procStat.total_walltime = m->total_walltime;
233     procStat.total_cputime = m->total_cputime;
234     if (lb_ignoreBgLoad) {
235     procStat.idletime = 0.0;
236     procStat.bg_walltime = 0.0;
237     procStat.bg_cputime = 0.0;
238     }
239     else {
240     procStat.idletime = m->idletime;
241     procStat.bg_walltime = m->bg_walltime;
242     procStat.bg_cputime = m->bg_cputime;
243     }
244     procStat.pe_speed = m->pe_speed;
245     procStat.utilization = 1.0;
246     procStat.available = CmiTrue;
247     procStat.n_objs = m->n_objs;
248
249     statsData->n_objs += m->n_objs;
250     statsData->n_comm += m->n_comm;
251     stats_msg_count++;
252   }
253
254   const int clients = CkNumPes();
255   if (stats_msg_count == clients) {
256 //    double strat_start_time = CmiWallTimer();
257
258 //    CkPrintf("Before setting bitmap\n");
259     // build data
260     buildStats();
261
262     // if this is the step at which we need to dump the database
263     simulation();
264
265     for(proc = 0; proc < clients; proc++)
266       statsData->procs[proc].available = (CmiBool)avail_vector[proc];
267
268 //    CkPrintf("Before Calling Strategy\n");
269
270     LBMigrateMsg* migrateMsg = Strategy(statsData, clients);
271
272 //    CkPrintf("returned successfully\n");
273     int num_proc = CkNumPes();
274
275     for(proc = 0; proc < num_proc; proc++)
276         migrateMsg->avail_vector[proc] = avail_vector[proc];
277     migrateMsg->next_lb = new_ld_balancer;
278
279 //  very time consuming, only needed for step load balancing
280 #if 0
281     getPredictedLoad(statsDataList, clients, migrateMsg, migrateMsg->expectedLoad);
282 #endif
283
284
285 //  CkPrintf("calling recv migration\n");
286     thisProxy.ReceiveMigration(migrateMsg);
287
288     // Zero out data structures for next cycle
289     // CkPrintf("zeroing out data\n");
290     statsData->clear();
291     stats_msg_count=0;
292
293     double strat_end_time = CmiWallTimer();
294     //     CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
295   }
296
297 }
298
299 // test if sender and receiver in a commData is nonmigratable.
300 static int isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
301 {
302   for (int pe=0 ; pe<count; pe++)
303   {
304     for (int i=0; i<len[pe]; i++)
305       if (LDObjIDEqual(objData[pe][i].id(), commData.sender) ||
306           LDObjIDEqual(objData[pe][i].id(), commData.receiver)) return 0;
307   }
308   return 1;
309 }
310
311 #if 0
312 // remove in the LDStats those objects that are non migratable
313 void CentralLB::RemoveNonMigratable(LDStats* stats, int count)
314 {
315   int pe;
316   LDObjData **nonmig = new LDObjData*[count];
317   int   *lens = new int[count];
318   int n_objs = stats->n_objs;
319     LDObjStats *objStat = stats.objData[n]; 
320     int l=-1, h=n_objs;
321     while (l<h) {
322       while (objStat[l+1].data.migratable && l<h) l++;
323       while (h>0 && !objStat[h-1].data.migratable && l<h) h--;
324       if (h-l>2) {
325         LDObjStats tmp = objStat[l+1];
326         objStat[l+1] = objStat[h-1];
327         objStat[h-1] = tmp;
328       }
329     }
330     stats->n_objs = h;
331     if (n_objs != h) CmiPrintf("Removed %d nonmigratable on pe:%d n_objs:%d migratable:%d\n", n_objs-h, pe, n_objs, h);
332     nonmig[pe] = objData+stats[pe].n_objs;
333     lens[pe] = n_objs-stats[pe].n_objs;
334
335     // now turn nonmigratable to bg load
336     for (int j=stats[pe].n_objs; j<n_objs; j++) {
337       stats[pe].bg_walltime += objData[j].wallTime;
338       stats[pe].bg_cputime += objData[j].cpuTime;
339     }
340
341   // modify comm data
342   for (pe=0; pe<count; pe++) {
343     int n_comm = stats[pe].n_comm;
344     if (n_comm == 0) continue;
345     LDCommData *commData = stats[pe].commData;
346     int l=-1, h=n_comm;
347     while (l<h) {
348       while (isMigratable(nonmig, lens, count, commData[l+1]) && l<h) l++;
349       while (!isMigratable(nonmig, lens, count, commData[h-1]) && l<h) h--;
350       if (h-l>2) {
351         LDCommData tmp = commData[l+1];
352         commData[l+1] = commData[h-1];
353         commData[h-1] = tmp;
354       }
355       else 
356         break;
357     }
358     if (l==-1 && h==-1) h=0;
359     stats[pe].n_comm = h;
360     if (n_comm != h) CmiPrintf("Removed %d nonmigratable on pe:%d n_comm:%d migratable:%d\n", n_comm-h, pe, n_comm, h);
361   }
362   delete [] nonmig;
363   delete [] lens;
364 }
365 #endif
366
367 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
368 {
369   int i;
370   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
371   
372   DEBUGF(("[%d] in ReceiveMigration %d moves\n",CkMyPe(),m->n_moves));
373   migrates_expected = 0;
374   for(i=0; i < m->n_moves; i++) {
375     MigrateInfo& move = m->moves[i];
376     const int me = CkMyPe();
377     if (move.from_pe == me && move.to_pe != me) {
378       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
379       theLbdb->Migrate(move.obj,move.to_pe);
380     } else if (move.from_pe != me && move.to_pe == me) {
381         //  CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
382       migrates_expected++;
383     }
384   }
385 #if 0
386   if (m->n_moves ==0) {
387     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
388   }
389 #endif
390
391   cur_ld_balancer = m->next_lb;
392   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
393       int num_proc = CkNumPes();
394       for(int proc = 0; proc < num_proc; proc++)
395           avail_vector[proc] = m->avail_vector[proc];
396   }
397
398   if (migrates_expected == 0 || migrates_completed == migrates_expected)
399     MigrationDone(1);
400   delete m;
401 }
402
403
404 void CentralLB::MigrationDone(int balancing)
405 {
406   if (balancing && lb_debug && CkMyPe() == cur_ld_balancer) {
407     double end_lb_time = CmiWallTimer();
408       CkPrintf("[%s] Load balancing step %d finished at %f\n",
409                 lbName(), step(),end_lb_time);
410       double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
411       CkPrintf("[%s] duration %f memUsage: LBManager:%dKB CentralLB:%dKB\n", 
412                 lbName(), end_lb_time - start_lb_time,
413                 (int)lbdbMemsize, (int)(useMem()/1000));
414   }
415   migrates_completed = 0;
416   migrates_expected = -1;
417   // clear load stats
418   if (balancing) theLbdb->ClearLoads();
419   // Increment to next step
420   mystep++;
421   thisProxy [CkMyPe()].ResumeClients();
422 }
423
424 void CentralLB::ResumeClients()
425 {
426   DEBUGF(("Resuming clients on PE %d\n",CkMyPe()));
427   theLbdb->ResumeClients();
428 }
429
430 // default load balancing strategy
431 LBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
432 {
433   work(stats, count);
434   return createMigrateMsg(stats, count);
435 }
436
437 void CentralLB::work(LDStats* stats,int count)
438 {
439   int i;
440   for(int pe=0; pe < count; pe++) {
441     struct ProcStats &proc = stats->procs[pe];
442
443     CkPrintf(
444       "Proc %d Sp %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
445       pe,proc.pe_speed,proc.total_walltime,proc.total_cputime,
446       proc.idletime,proc.bg_walltime,proc.bg_cputime);
447   }
448
449   int osz = stats->n_objs;
450     CkPrintf("------------- Object Data: %d objects -------------\n",
451              stats->n_objs);
452     for(i=0; i < osz; i++) {
453       LDObjData &odata = stats->objData[i];
454       CkPrintf("Object %d\n",i);
455       CkPrintf("     id = %d\n",odata.id().id[0]);
456       CkPrintf("  OM id = %d\n",odata.omID().id);
457       CkPrintf("   Mig. = %d\n",odata.migratable);
458       CkPrintf("    CPU = %f\n",odata.cpuTime);
459       CkPrintf("   Wall = %f\n",odata.wallTime);
460     }
461
462     const int csz = stats->n_comm;
463
464     CkPrintf("------------- Comm Data: %d records -------------\n",
465              csz);
466     LDCommData *cdata = stats->commData;
467     for(i=0; i < csz; i++) {
468       CkPrintf("Link %d\n",i);
469
470       if (cdata[i].from_proc())
471         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
472       else
473         CkPrintf("    sender id = %d:%d\n",
474                  cdata[i].senderOM.id,cdata[i].sender.id[0]);
475
476       if (cdata[i].to_proc())
477         CkPrintf("  receiver PE = %d\n",cdata[i].dest_proc);
478       else      
479         CkPrintf("  receiver id = %d:%d\n",
480                  cdata[i].receiverOM.id,cdata[i].receiver.id[0]);
481       
482       CkPrintf("     messages = %d\n",cdata[i].messages);
483       CkPrintf("        bytes = %d\n",cdata[i].bytes);
484     }
485 }
486
487 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
488 {
489   int sizes=0;
490   LBMigrateMsg* msg = new(&sizes,1) LBMigrateMsg;
491   msg->n_moves = 0;
492
493   return msg;
494 }
495
496 void CentralLB::simulation() {
497   if(step() == LBSimulation::dumpStep)
498   {
499     // here we are supposed to dump the database
500     writeStatsMsgs(LBSimulation::dumpFile);
501     CmiPrintf("LBDump: Dumped the load balancing data.\n");
502     CmiPrintf("Charm++> Exiting...\n");
503     CkExit();
504     return;
505   }
506   else if(LBSimulation::doSimulation)
507   {
508     // here we are supposed to read the data from the dump database
509     readStatsMsgs(LBSimulation::dumpFile);
510
511     LBSimulation simResults(LBSimulation::simProcs);
512
513     // now pass it to the strategy routine
514     double startT = CmiWallTimer();
515     CmiPrintf("%s> Strategy starts ... \n", lbname);
516     LBMigrateMsg* migrateMsg = Strategy(statsData, LBSimulation::simProcs);
517     CmiPrintf("%s> Strategy took %fs. \n", lbname, CmiWallTimer()-startT);
518
519     // now calculate the results of the load balancing simulation
520     FindSimResults(statsData, LBSimulation::simProcs, migrateMsg, &simResults);
521
522     // now we have the simulation data, so print it and exit
523     CmiPrintf("Charm++> LBSim: Simulation of one load balancing step done.\n");
524     simResults.PrintSimulationResults();
525
526     delete migrateMsg;
527     CmiPrintf("Charm++> Exiting...\n");
528     CkExit();
529   }
530 }
531
532 void CentralLB::readStatsMsgs(const char* filename) {
533
534   int i;
535   FILE *f = fopen(filename, "r");
536   if (f==NULL) CmiAbort("Fatal Error> Cannot open LB Dump file!\n");
537
538   // at this stage, we need to rebuild the statsMsgList and
539   // statsDataList structures. For that first deallocate the
540   // old structures
541   for(i = 0; i < stats_msg_count; i++)
542         delete statsMsgsList[i];
543   delete[] statsMsgsList;
544
545   PUP::fromDisk p(f);
546   p|stats_msg_count;
547
548   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
549   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
550
551   // LBSimulation::simProcs must be set
552   statsData->pup(p);
553
554   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
555
556   // file f is closed in the destructor of PUP::fromDisk
557   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
558 }
559
560 void CentralLB::writeStatsMsgs(const char* filename) {
561
562   int i;
563   FILE *f = fopen(filename, "w");
564   if (f == NULL) 
565     CmiAbort("writeStatsMsgs failed to open the output file!\n");
566
567   PUP::toDisk p(f);
568   p|stats_msg_count;
569
570 #if LB_DUMP_MSG
571   for (i = 0; i < stats_msg_count; i++) {
572     CLBStatsMsg *m = statsMsgsList[i];
573     envelope *env=UsrToEnv(m);
574     CkPackMessage(&env); //Pack it
575     m = (CLBStatsMsg *)EnvToUsr(env);
576     CkPupMessage(p, (void **)&m, 2);
577   }
578 #else
579   statsData->pup(p);
580 #endif
581
582   fclose(f);
583
584   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
585 }
586
587 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
588                              LBMigrateMsg *msg, double *peLoads, 
589                              double &minObjLoad, double &maxObjLoad)
590 {
591         int* msgSentCount = new int[count]; // # of messages sent by each PE
592         int* msgRecvCount = new int[count]; // # of messages received by each PE
593         int* byteSentCount = new int[count];// # of bytes sent by each PE
594         int* byteRecvCount = new int[count];// # of bytes reeived by each PE
595         int i, pe;
596
597         for(i = 0; i < count; i++)
598           msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
599         minObjLoad = 1.0e20;    // I suppose no object load is beyond this
600         maxObjLoad = 0.0;
601
602         stats->makeCommHash();
603         // update to_proc according to migration msgs
604         for(i = 0; i < msg->n_moves; i++) {
605           MigrateInfo &mInfo = msg->moves[i];
606           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
607           CmiAssert(idx != -1);
608           stats->to_proc[idx] = mInfo.to_pe;
609         }
610
611         for(pe = 0; pe < count; pe++)
612         {
613           peLoads[pe] = stats->procs[pe].bg_walltime;
614         }
615
616         for(int obj = 0; obj < stats->n_objs; obj++)
617         {
618                 int pe = stats->to_proc[obj];
619                 double &oload = stats->objData[obj].wallTime;
620                 if (oload < minObjLoad) minObjLoad = oload;
621                 if (oload > maxObjLoad) maxObjLoad = oload;
622                 peLoads[pe] += oload;
623         }
624
625         // handling of the communication overheads. 
626         for (int cidx=0; cidx < stats->n_comm; cidx++) {
627           LDCommData& cdata = stats->commData[cidx];
628           int senderPE, receiverPE;
629           if(cdata.from_proc())
630             senderPE = cdata.src_proc;
631           else {
632             int idx = stats->getHash(cdata.sender, cdata.senderOM);
633             CmiAssert(idx != -1);
634             senderPE = stats->to_proc[idx];
635             CmiAssert(senderPE != -1);
636           }
637           if(cdata.to_proc())
638             receiverPE = cdata.dest_proc;
639           else {
640             int idx = stats->getHash(cdata.receiver, cdata.receiverOM);
641             CmiAssert(idx != -1);
642             receiverPE = stats->to_proc[idx];
643             CmiAssert(receiverPE != -1);
644           }
645           if(senderPE != receiverPE)
646           {
647                 msgSentCount[senderPE] += cdata.messages;
648                 byteSentCount[senderPE] += cdata.bytes;
649
650                 msgRecvCount[receiverPE] += cdata.messages;
651                 byteRecvCount[receiverPE] += cdata.bytes;
652           }
653         }
654
655         // now for each processor, add to its load the send and receive overheads
656 #if 1
657         for(i = 0; i < count; i++)
658         {
659                 peLoads[i] += msgRecvCount[i]  * PER_MESSAGE_RECV_OVERHEAD +
660                                   msgSentCount[i]  * PER_MESSAGE_SEND_OVERHEAD +
661                                   byteRecvCount[i] * PER_BYTE_RECV_OVERHEAD +
662                                   byteSentCount[i] * PER_BYTE_SEND_OVERHEAD;
663         }
664 #endif
665         delete msgRecvCount;
666         delete msgSentCount;
667         delete byteRecvCount;
668         delete byteSentCount;
669 }
670
671 void CentralLB::FindSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
672 {
673     CkAssert(simResults != NULL && count == simResults->numPes);
674     // estimate the new loads of the processors. As a first approximation, this is the
675     // get background load
676     for(int pe = 0; pe < count; pe++)
677           simResults->bgLoads[pe] = stats->procs[pe].bg_walltime;
678     // sum of the cpu times of the objects on that processor
679     double startT = CmiWallTimer();
680     getPredictedLoad(stats, count, msg, simResults->peLoads, 
681                      simResults->minObjLoad, simResults->maxObjLoad);
682     CmiPrintf("getPredictedLoad finished in %fs\n", CmiWallTimer()-startT);
683 }
684
685 int CentralLB::useMem() { 
686   return CkNumPes() * (sizeof(CentralLB::LDStats)+sizeof(CLBStatsMsg *)) +
687                         sizeof(CentralLB);
688 }
689
690 inline static int ObjKey(const LDObjid &oid, const int hashSize) {
691   return ((oid.id[0]<<16)|(oid.id[1]<<8)|oid.id[2]) % hashSize;
692 }
693
694 void CentralLB::LDStats::makeCommHash() {
695   if (transTable) return;
696
697   transTable = new LDOId[n_objs];
698   for (int obj=0; obj < n_objs; obj++){
699       LDObjData &oData = objData[obj];
700       transTable[obj].mid.id = oData.omID().id;
701       transTable[obj].oid = oData.id();
702   }
703   int i;
704    
705   hashSize = n_objs*2;
706   objHash = new int[hashSize];
707   for(i=0;i<hashSize;i++)
708         objHash[i] = -1;
709    
710   for(i=0;i<n_objs;i++){
711         LDObjid &oid = transTable[i].oid;
712         int hash = ObjKey(oid, hashSize);
713         while(objHash[hash] != -1)
714             hash = (hash+1)%hashSize;
715         objHash[hash] = i;
716   }
717 }
718
719 void CentralLB::LDStats::deleteCommHash() {
720   if (objHash) delete [] objHash;
721   objHash = NULL;
722   if (transTable) delete [] transTable;
723   transTable = NULL;
724 }
725
726 int CentralLB::LDStats::getHash(const LDObjid &oid, const LDOMid &mid)
727 {
728     int hash = ObjKey(oid, hashSize);
729
730     for(int id=0;id<hashSize;id++){
731         int index = (id+hash)%hashSize;
732         if (LDObjIDEqual(transTable[objHash[index]].oid, oid) &&
733             LDOMidEqual(transTable[objHash[index]].mid, mid))
734             return objHash[index];
735     }
736     //  CkPrintf("not found \n");
737     return -1;
738 }
739
740
741 void CentralLB::LDStats::pup(PUP::er &p)
742 {
743   int i;
744   p(count);  
745   p(n_objs);
746   p(n_comm);
747   if (p.isUnpacking()) {
748     // user can specify simulated processors other than the real # of procs.
749     int maxpe = count>LBSimulation::simProcs?count:LBSimulation::simProcs;
750     procs = new ProcStats[maxpe];
751     objData = new LDObjData[n_objs];
752     commData = new LDCommData[n_comm];
753     from_proc = new int[n_objs];
754     to_proc = new int[n_objs];
755     transTable = NULL;
756     objHash = NULL;
757   }
758   // ignore the background load when unpacking
759   if (p.isUnpacking()) {
760     ProcStats dummy;
761     for (i=0; i<count; i++) p|dummy; 
762   }
763   else
764     for (i=0; i<count; i++) p|procs[i];
765   for (i=0; i<n_objs; i++) p|objData[i]; 
766   p(from_proc, n_objs);
767   p(to_proc, n_objs);
768   for (i=0; i<n_comm; i++) p|commData[i];
769   if (p.isUnpacking())
770     count = LBSimulation::simProcs;
771 }
772
773 int CentralLB::LDStats::useMem() { 
774   // calculate the memory usage of this LB (superclass).
775   return sizeof(LDStats) + sizeof(ProcStats)*count + 
776          (sizeof(LDObjData) + 2*sizeof(int)) * n_objs +
777          sizeof(LDCommData) * n_comm;
778 }
779
780 /**
781   CLBStatsMsg is not a real message now.
782   CLBStatsMsg is used for all processors to fill in their local load and comm
783   statistics and send to processor 0
784 */
785
786 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
787   objData = new LDObjData[osz];
788   commData = new LDCommData[csz];
789   avail_vector = new char[CkNumPes()];
790 }
791
792 CLBStatsMsg::~CLBStatsMsg() {
793   delete [] objData;
794   delete [] commData;
795   delete [] avail_vector;
796 }
797
798 void CLBStatsMsg::pup(PUP::er &p) {
799   int i;
800   p|from_pe;
801   p|serial;
802   p|pe_speed;
803   p|total_walltime; p|total_cputime;
804   p|idletime;
805   p|bg_walltime;   p|bg_cputime;
806   p|n_objs;        
807   if (p.isUnpacking()) objData = new LDObjData[n_objs];
808   for (i=0; i<n_objs; i++) p|objData[i];
809   p|n_comm;
810   if (p.isUnpacking()) commData = new LDCommData[n_comm];
811   for (i=0; i<n_comm; i++) p|commData[i];
812   if (p.isUnpacking()) avail_vector = new char[CkNumPes()];
813   p(avail_vector, CkNumPes());
814   p(next_lb);
815 }
816
817 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
818 // the entry function, it is just used to use to pup.
819 // I don't use CLBStatsMsg directly as marshalled parameter because
820 // I want the data pointer stored and not to be freed by the Charm++.
821 CkMarshalledCLBStatsMessage::~CkMarshalledCLBStatsMessage() {
822   if (msg) delete msg;
823 }
824
825 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
826 {
827   if (p.isUnpacking()) msg = new CLBStatsMsg;
828   else CmiAssert(msg);
829   msg->pup(p);
830 }
831
832 #endif
833
834 /*@}*/