ignore idle timers for BigSim, changed CmiWallTimer to CkWallTimer() to better handle...
[charm.git] / src / ck-ldb / NborBaseLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "BaseLB.h"
15 #include "NborBaseLB.h"
16 #include "LBDBManager.h"
17 #include "NborBaseLB.def.h"
18
19 //CreateLBFunc_Def(NborBaseLB);
20
21 void NborBaseLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
22 {
23   NborBaseLB *me = (NborBaseLB*)(data);
24
25   me->Migrated(h, waitBarrier);
26 }
27
28 void NborBaseLB::staticAtSync(void* data)
29 {
30   NborBaseLB *me = (NborBaseLB*)(data);
31
32   me->AtSync();
33 }
34
35 NborBaseLB::NborBaseLB(const CkLBOptions &opt): BaseLB(opt)
36 {
37 #if CMK_LBDB_ON
38   lbname = (char *)"NborBaseLB";
39   mystep = 0;
40   thisProxy = CProxy_NborBaseLB(thisgroup);
41   receiver = theLbdb->
42     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
43                             (void*)(this));
44   notifier = theLbdb->getLBDB()->
45     NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
46
47
48   // I had to move neighbor initialization outside the constructor
49   // in order to get the virtual functions of any derived classes
50   // so I'll just set them to illegal values here.
51   LBtopoFn topofn = LBTopoLookup(_lbtopo);
52   if (topofn == NULL) {
53     if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
54     CmiAbort("");
55   }
56   topo = topofn();
57
58   mig_msgs_expected = 0;
59   neighbor_pes = NULL;
60   stats_msg_count = 0;
61   statsMsgsList = NULL;
62   statsDataList = NULL;
63   migrates_completed = 0;
64   migrates_expected = -1;
65   mig_msgs_received = 0;
66   mig_msgs = NULL;
67
68   myStats.pe_speed = theLbdb->ProcessorSpeed();
69 //  char hostname[80];
70 //  gethostname(hostname,79);
71 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.pe_speed);
72   myStats.from_pe = CkMyPe();
73   myStats.n_objs = 0;
74   myStats.objData = NULL;
75   myStats.n_comm = 0;
76   myStats.commData = NULL;
77   receive_stats_ready = 0;
78
79   theLbdb->CollectStatsOn();
80 #endif
81 }
82
83 NborBaseLB::~NborBaseLB()
84 {
85 #if CMK_LBDB_ON
86   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
87   if (theLbdb) {
88     theLbdb->getLBDB()->
89       RemoveNotifyMigrated(notifier);
90     //theLbdb->
91     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
92   }
93   if (statsMsgsList) delete [] statsMsgsList;
94   if (statsDataList) delete [] statsDataList;
95   if (neighbor_pes)  delete [] neighbor_pes;
96   if (mig_msgs)      delete [] mig_msgs;
97 #endif
98 }
99
100 void NborBaseLB::FindNeighbors()
101 {
102   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
103                            // and other things that depend on the number
104                            // of neighbors
105     int maxneighbors = topo->max_neighbors();
106     statsMsgsList = new NLBStatsMsg*[maxneighbors];
107     for(int i=0; i < maxneighbors; i++)
108       statsMsgsList[i] = 0;
109     statsDataList = new LDStats[maxneighbors];
110
111     neighbor_pes = new int[maxneighbors];
112     topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
113     mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
114   }
115
116 }
117
118 void NborBaseLB::AtSync()
119 {
120 #if CMK_LBDB_ON
121   //  CkPrintf("[%d] NborBaseLB At Sync step %d!!!!\n",CkMyPe(),mystep);
122
123   if (neighbor_pes == 0) FindNeighbors();
124   start_lb_time = 0;
125
126   if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
127     MigrationDone();
128     return;
129   }
130
131   if (CkMyPe() == 0) {
132     start_lb_time = CkWallTimer();
133     if (_lb_args.debug())
134       CkPrintf("Load balancing step %d starting at %f\n",
135                step(),start_lb_time);
136   }
137
138   NLBStatsMsg* msg = AssembleStats();
139
140   if (mig_msgs_expected > 0) {
141     CkMarshalledNLBStatsMessage marshmsg(msg);
142     thisProxy.ReceiveStats(marshmsg, mig_msgs_expected, neighbor_pes);
143   }
144
145   // Tell our own node that we are ready
146   CkMarshalledNLBStatsMessage mmsg(NULL);
147   thisProxy[CkMyPe()].ReceiveStats(mmsg);
148 #endif
149 }
150
151 NLBStatsMsg* NborBaseLB::AssembleStats()
152 {
153 #if CMK_LBDB_ON
154   // Get stats
155   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
156   theLbdb->IdleTime(&myStats.idletime);
157   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
158
159   myStats.move = QueryMigrateStep(step());
160
161   myStats.n_objs = theLbdb->GetObjDataSz();
162   if (myStats.objData) delete [] myStats.objData;
163   myStats.objData = new LDObjData[myStats.n_objs];
164   theLbdb->GetObjData(myStats.objData);
165
166   myStats.n_comm = theLbdb->GetCommDataSz();
167   if (myStats.commData) delete [] myStats.commData;
168   myStats.commData = new LDCommData[myStats.n_comm];
169   theLbdb->GetCommData(myStats.commData);
170
171   myStats.obj_walltime = myStats.obj_cputime = 0;
172   for(int i=0; i < myStats.n_objs; i++) {
173     myStats.obj_walltime += myStats.objData[i].wallTime;
174     myStats.obj_cputime += myStats.objData[i].cpuTime;
175   }    
176
177   const int osz = theLbdb->GetObjDataSz();
178   const int csz = theLbdb->GetCommDataSz();
179   NLBStatsMsg* msg = new NLBStatsMsg(osz, csz);
180
181   msg->from_pe = CkMyPe();
182   // msg->serial = rand();
183   msg->serial = CrnRand();
184   msg->pe_speed = myStats.pe_speed;
185   msg->total_walltime = myStats.total_walltime;
186   msg->total_cputime = myStats.total_cputime;
187   msg->idletime = myStats.idletime;
188   msg->bg_walltime = myStats.bg_walltime;
189   msg->bg_cputime = myStats.bg_cputime;
190   msg->obj_walltime = myStats.obj_walltime;
191   msg->obj_cputime = myStats.obj_cputime;
192   msg->n_objs = osz;
193   theLbdb->GetObjData(msg->objData);
194   msg->n_comm = csz;
195   theLbdb->GetCommData(msg->commData);
196
197   //  CkPrintf(
198   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
199   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
200   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
201   //    msg->obj_walltime,msg->obj_cputime);
202
203   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
204   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
205   return msg;
206 #else
207   return NULL;
208 #endif
209 }
210
211 void NborBaseLB::Migrated(LDObjHandle h, int waitBarrier)
212 {
213   migrates_completed++;
214   //  CkPrintf("[%d] An object migrated! %d %d\n",
215   //       CkMyPe(),migrates_completed,migrates_expected);
216   if (migrates_completed == migrates_expected) {
217     MigrationDone();
218   }
219 }
220
221 void NborBaseLB::ReceiveStats(CkMarshalledNLBStatsMessage &data)
222 {
223 #if CMK_LBDB_ON
224   NLBStatsMsg *m = data.getMessage();
225   if (neighbor_pes == 0) FindNeighbors();
226
227   if (m == 0) { // This is from our own node
228     receive_stats_ready = 1;
229   } else {
230     const int pe = m->from_pe;
231     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
232     //             pe,stats_msg_count,m->n_objs,m->serial,m);
233     int peslot = NeighborIndex(pe);
234
235     if (peslot == -1 || statsMsgsList[peslot] != 0) {
236       CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
237                pe);
238     } else {
239       statsMsgsList[peslot] = m;
240       statsDataList[peslot].from_pe = m->from_pe;
241       statsDataList[peslot].total_walltime = m->total_walltime;
242       statsDataList[peslot].total_cputime = m->total_cputime;
243       statsDataList[peslot].idletime = m->idletime;
244       statsDataList[peslot].bg_walltime = m->bg_walltime;
245       statsDataList[peslot].bg_cputime = m->bg_cputime;
246       statsDataList[peslot].pe_speed = m->pe_speed;
247       statsDataList[peslot].obj_walltime = m->obj_walltime;
248       statsDataList[peslot].obj_cputime = m->obj_cputime;
249
250       statsDataList[peslot].n_objs = m->n_objs;
251       statsDataList[peslot].objData = m->objData;
252       statsDataList[peslot].n_comm = m->n_comm;
253       statsDataList[peslot].commData = m->commData;
254       stats_msg_count++;
255     }
256   }
257
258   const int clients = mig_msgs_expected;
259   if (stats_msg_count == clients && receive_stats_ready) {
260     double strat_start_time = CkWallTimer();
261     receive_stats_ready = 0;
262     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
263
264     int i;
265
266     // Migrate messages from me to elsewhere
267     for(i=0; i < migrateMsg->n_moves; i++) {
268       MigrateInfo& move = migrateMsg->moves[i];
269       const int me = CkMyPe();
270       if (move.from_pe == me && move.to_pe != me) {
271         theLbdb->Migrate(move.obj,move.to_pe);
272       } else if (move.from_pe != me) {
273         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
274                  me,move.from_pe,move.to_pe);
275       }
276     }
277     
278     // Now, send migrate messages to neighbors
279     if (clients > 0)
280       thisProxy.ReceiveMigration(migrateMsg, clients, neighbor_pes);
281     
282     // Zero out data structures for next cycle
283     for(i=0; i < clients; i++) {
284       delete statsMsgsList[i];
285       statsMsgsList[i]=NULL;
286     }
287     stats_msg_count=0;
288
289     theLbdb->ClearLoads();
290     if (CkMyPe() == 0) {
291       double strat_end_time = CkWallTimer();
292       if (_lb_args.debug())
293         CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
294     }
295   }
296 #endif  
297 }
298
299 void NborBaseLB::ReceiveMigration(LBMigrateMsg *msg)
300 {
301 #if CMK_LBDB_ON
302   if (neighbor_pes == 0) FindNeighbors();
303
304   if (mig_msgs_received == 0) migrates_expected = 0;
305
306   mig_msgs[mig_msgs_received] = msg;
307   mig_msgs_received++;
308   //  CkPrintf("[%d] Received migration msg %d of %d\n",
309   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
310
311   if (mig_msgs_received > mig_msgs_expected) {
312     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
313              CkMyPe());
314   }
315
316   if (mig_msgs_received != mig_msgs_expected) {
317     return;
318   }
319
320   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
321   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
322     LBMigrateMsg* m = mig_msgs[neigh];
323     for(int i=0; i < m->n_moves; i++) {
324       MigrateInfo& move = m->moves[i];
325       const int me = CkMyPe();
326       if (move.from_pe != me && move.to_pe == me) {
327         migrates_expected++;
328       }
329     }
330     delete m;
331     mig_msgs[neigh]=0;
332   }
333   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
334   //       CkMyPe(),migrates_expected);
335   mig_msgs_received = 0;
336   if (migrates_expected == 0 || migrates_expected == migrates_completed)
337     MigrationDone();
338 #endif
339 }
340
341
342 void NborBaseLB::MigrationDone()
343 {
344 #if CMK_LBDB_ON
345   if (CkMyPe() == 0 && start_lb_time != 0.0) {
346     double end_lb_time = CkWallTimer();
347     if (_lb_args.debug())
348       CkPrintf("Load balancing step %d finished at %f duration %f\n",
349                 step(),end_lb_time,end_lb_time - start_lb_time);
350   }
351   migrates_completed = 0;
352   migrates_expected = -1;
353   // Increment to next step
354   mystep++;
355   thisProxy [CkMyPe()].ResumeClients();
356 #endif
357 }
358
359 void NborBaseLB::ResumeClients()
360 {
361 #if CMK_LBDB_ON
362   theLbdb->ResumeClients();
363 #endif
364 }
365
366 LBMigrateMsg* NborBaseLB::Strategy(LDStats* stats,int count)
367 {
368   for(int j=0; j < count; j++) {
369     CkPrintf(
370     "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
371     CkMyPe(),stats[j].from_pe,stats[j].pe_speed,
372     stats[j].total_walltime,stats[j].total_cputime,
373     stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
374     stats[j].obj_walltime,stats[j].obj_cputime);
375   }
376
377   delete [] myStats.objData;
378   myStats.n_objs = 0;
379   delete [] myStats.commData;
380   myStats.n_comm = 0;
381
382   int sizes=0;
383   LBMigrateMsg* msg = new(sizes,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
384   msg->n_moves = 0;
385
386   return msg;
387 }
388
389 int NborBaseLB::NeighborIndex(int pe)
390 {
391     int peslot = -1;
392     for(int i=0; i < mig_msgs_expected; i++) {
393       if (pe == neighbor_pes[i]) {
394         peslot = i;
395         break;
396       }
397     }
398     return peslot;
399 }
400
401 NLBStatsMsg::NLBStatsMsg(int osz, int csz) {
402   objData = new LDObjData[osz];
403   commData = new LDCommData[csz];
404 }
405
406 NLBStatsMsg::NLBStatsMsg(NLBStatsMsg *src) 
407 {
408   int size;
409   {
410     PUP::sizer  p;
411     src->pup(p);
412     size = p.size();
413   }
414   char *buf = new char[size];
415   {
416     PUP::toMem  p(buf);
417     src->pup(p);
418   }
419   {
420     PUP::fromMem  p(buf);
421     pup(p);
422   }
423   delete [] buf;
424 }
425
426 NLBStatsMsg::~NLBStatsMsg() {
427   delete [] objData;
428   delete [] commData;
429 }
430
431 void NLBStatsMsg::pup(PUP::er &p) {
432   int i;
433   p|from_pe;
434   p|serial;
435   p|pe_speed;
436   p|total_walltime; p|total_cputime;
437   p|idletime;
438   p|bg_walltime;   p|bg_cputime;
439   p|n_objs;
440   if (p.isUnpacking()) objData = new LDObjData[n_objs];
441   for (i=0; i<n_objs; i++) p|objData[i];
442   p|n_comm;
443   if (p.isUnpacking()) commData = new LDCommData[n_comm];
444   for (i=0; i<n_comm; i++) p|commData[i];
445 }
446
447 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
448 // the entry function, it is just used to use to pup.
449 // I don't use CLBStatsMsg directly as marshalled parameter because
450 // I want the data pointer stored and not to be freed by the Charm++.
451 CkMarshalledNLBStatsMessage::~CkMarshalledNLBStatsMessage() {
452   if (msg) delete msg;
453 }
454
455 void CkMarshalledNLBStatsMessage::pup(PUP::er &p)
456 {
457   int isnull;
458   if (p.isPacking()) isnull = (msg==NULL?1:0);
459   p|isnull;
460   if (p.isUnpacking()) {
461     if (!isnull) msg = new NLBStatsMsg;
462     else msg = NULL;
463   }
464   if (msg) msg->pup(p);
465 #if 0
466   if (p.isUnpacking()) msg = new NLBStatsMsg;
467   else CmiAssert(msg);
468   msg->pup(p);
469 #endif
470 }
471
472
473
474 /*@{*/