doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / NborBaseLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "BaseLB.h"
7 #include "NborBaseLB.h"
8 #include "LBDBManager.h"
9 #include "NborBaseLB.def.h"
10
11 #define  DEBUGF(x)      // CmiPrintf x;
12
13 //CreateLBFunc_Def(NborBaseLB);
14
15 void NborBaseLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
16 {
17   NborBaseLB *me = (NborBaseLB*)(data);
18
19   me->Migrated(h, waitBarrier);
20 }
21
22 void NborBaseLB::staticAtSync(void* data)
23 {
24   NborBaseLB *me = (NborBaseLB*)(data);
25
26   me->AtSync();
27 }
28
29 NborBaseLB::NborBaseLB(const CkLBOptions &opt): BaseLB(opt)
30 {
31 #if CMK_LBDB_ON
32   lbname = (char *)"NborBaseLB";
33   thisProxy = CProxy_NborBaseLB(thisgroup);
34   receiver = theLbdb->
35     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
36                             (void*)(this));
37   notifier = theLbdb->getLBDB()->
38     NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
39
40
41   // I had to move neighbor initialization outside the constructor
42   // in order to get the virtual functions of any derived classes
43   // so I'll just set them to illegal values here.
44   LBtopoFn topofn = LBTopoLookup(_lbtopo);
45   if (topofn == NULL) {
46     if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
47     CmiAbort("");
48   }
49   topo = topofn(CkNumPes());
50
51   mig_msgs_expected = 0;
52   neighbor_pes = NULL;
53   stats_msg_count = 0;
54   statsMsgsList = NULL;
55   statsDataList = NULL;
56   migrates_completed = 0;
57   migrates_expected = -1;
58   mig_msgs_received = 0;
59   mig_msgs = NULL;
60
61   myStats.pe_speed = theLbdb->ProcessorSpeed();
62 //  char hostname[80];
63 //  gethostname(hostname,79);
64 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.pe_speed);
65   myStats.from_pe = CkMyPe();
66   myStats.n_objs = 0;
67   myStats.objData = NULL;
68   myStats.n_comm = 0;
69   myStats.commData = NULL;
70
71   receive_stats_ready = 0;
72
73   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
74 #endif
75 }
76
77 NborBaseLB::~NborBaseLB()
78 {
79 #if CMK_LBDB_ON
80   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
81   if (theLbdb) {
82     theLbdb->getLBDB()->
83       RemoveNotifyMigrated(notifier);
84     //theLbdb->
85     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
86   }
87   if (statsMsgsList) delete [] statsMsgsList;
88   if (statsDataList) delete [] statsDataList;
89   if (neighbor_pes)  delete [] neighbor_pes;
90   if (mig_msgs)      delete [] mig_msgs;
91 #endif
92 }
93
94 void NborBaseLB::FindNeighbors()
95 {
96   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
97                            // and other things that depend on the number
98                            // of neighbors
99     int maxneighbors = topo->max_neighbors();
100     statsMsgsList = new NLBStatsMsg*[maxneighbors];
101     for(int i=0; i < maxneighbors; i++)
102       statsMsgsList[i] = 0;
103     statsDataList = new LDStats[maxneighbors];
104
105     neighbor_pes = new int[maxneighbors];
106     topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
107     mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
108   }
109
110 }
111
112 void NborBaseLB::AtSync()
113 {
114 #if CMK_LBDB_ON
115   //  CkPrintf("[%d] NborBaseLB At Sync step %d!!!!\n",CkMyPe(),mystep);
116
117   if (neighbor_pes == 0) FindNeighbors();
118   start_lb_time = 0;
119
120   if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
121     MigrationDone(0);
122     return;
123   }
124
125   if (CkMyPe() == 0) {
126     start_lb_time = CkWallTimer();
127     if (_lb_args.debug())
128       CkPrintf("[%s] Load balancing step %d starting at %f\n",
129                lbName(), step(),start_lb_time);
130   }
131
132   NLBStatsMsg* msg = AssembleStats();
133
134   if (mig_msgs_expected > 0) {
135     CkMarshalledNLBStatsMessage marshmsg(msg);
136     thisProxy.ReceiveStats(marshmsg, mig_msgs_expected, neighbor_pes);
137   }
138
139   // Tell our own node that we are ready
140   CkMarshalledNLBStatsMessage mmsg(NULL);
141   thisProxy[CkMyPe()].ReceiveStats(mmsg);
142 #endif
143 }
144
145 NLBStatsMsg* NborBaseLB::AssembleStats()
146 {
147 #if CMK_LBDB_ON
148   // Get my own stats, this is used in NeighborLB for example
149 #if CMK_LB_CPUTIMER
150   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
151   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
152 #else
153   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_walltime);
154   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_walltime);
155 #endif
156   theLbdb->IdleTime(&myStats.idletime);
157
158   myStats.move = QueryMigrateStep(step());
159
160   myStats.n_objs = theLbdb->GetObjDataSz();
161   if (myStats.objData) delete [] myStats.objData;
162   myStats.objData = new LDObjData[myStats.n_objs];
163   theLbdb->GetObjData(myStats.objData);
164
165   myStats.n_comm = theLbdb->GetCommDataSz();
166   if (myStats.commData) delete [] myStats.commData;
167   myStats.commData = new LDCommData[myStats.n_comm];
168   theLbdb->GetCommData(myStats.commData);
169
170   myStats.obj_walltime = 0;
171 #if CMK_LB_CPUTIMER
172   myStats.obj_cputime = 0;
173 #endif
174   for(int i=0; i < myStats.n_objs; i++) {
175     myStats.obj_walltime += myStats.objData[i].wallTime;
176 #if CMK_LB_CPUTIMER
177     myStats.obj_cputime += myStats.objData[i].cpuTime;
178 #endif
179   }    
180
181   const int osz = theLbdb->GetObjDataSz();
182   const int csz = theLbdb->GetCommDataSz();
183   NLBStatsMsg* msg = new NLBStatsMsg(osz, csz);
184
185   msg->from_pe = CkMyPe();
186   // msg->serial = rand();
187   msg->serial = CrnRand();
188   msg->pe_speed = myStats.pe_speed;
189   msg->total_walltime = myStats.total_walltime;
190   msg->idletime = myStats.idletime;
191   msg->bg_walltime = myStats.bg_walltime;
192   msg->obj_walltime = myStats.obj_walltime;
193 #if CMK_LB_CPUTIMER
194   msg->total_cputime = myStats.total_cputime;
195   msg->bg_cputime = myStats.bg_cputime;
196   msg->obj_cputime = myStats.obj_cputime;
197 #endif
198   msg->n_objs = osz;
199   theLbdb->GetObjData(msg->objData);
200   msg->n_comm = csz;
201   theLbdb->GetCommData(msg->commData);
202
203   // cleanup 
204   delete [] myStats.objData;
205   myStats.objData = NULL;
206   myStats.n_objs = 0;
207   delete [] myStats.commData;
208   myStats.commData = NULL;
209   myStats.n_comm = 0;
210
211   //  CkPrintf(
212   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
213   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
214   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
215   //    msg->obj_walltime,msg->obj_cputime);
216
217   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
218   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
219   return msg;
220 #else
221   return NULL;
222 #endif
223 }
224
225 void NborBaseLB::Migrated(LDObjHandle h, int waitBarrier)
226 {
227   migrates_completed++;
228   //  CkPrintf("[%d] An object migrated! %d %d\n",
229   //       CkMyPe(),migrates_completed,migrates_expected);
230   if (migrates_completed == migrates_expected) {
231     MigrationDone(1);
232   }
233 }
234
235 void NborBaseLB::ReceiveStats(CkMarshalledNLBStatsMessage &data)
236 {
237 #if CMK_LBDB_ON
238   NLBStatsMsg *m = data.getMessage();
239   if (neighbor_pes == 0) FindNeighbors();
240
241   if (m == 0) { // This is from our own node
242     receive_stats_ready = 1;
243   } else {
244     const int pe = m->from_pe;
245     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
246     //             pe,stats_msg_count,m->n_objs,m->serial,m);
247     int peslot = NeighborIndex(pe);
248
249     if (peslot == -1 || statsMsgsList[peslot] != 0) {
250       CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
251                pe);
252     } else {
253       statsMsgsList[peslot] = m;
254       statsDataList[peslot].from_pe = m->from_pe;
255       statsDataList[peslot].total_walltime = m->total_walltime;
256       statsDataList[peslot].idletime = m->idletime;
257       statsDataList[peslot].bg_walltime = m->bg_walltime;
258       statsDataList[peslot].pe_speed = m->pe_speed;
259       statsDataList[peslot].obj_walltime = m->obj_walltime;
260 #if CMK_LB_CPUTIMER
261       statsDataList[peslot].total_cputime = m->total_cputime;
262       statsDataList[peslot].bg_cputime = m->bg_cputime;
263       statsDataList[peslot].obj_cputime = m->obj_cputime;
264 #endif
265
266       statsDataList[peslot].n_objs = m->n_objs;
267       statsDataList[peslot].objData = m->objData;
268       statsDataList[peslot].n_comm = m->n_comm;
269       statsDataList[peslot].commData = m->commData;
270
271       if (_lb_args.ignoreBgLoad()) statsDataList[peslot].clearBgLoad();
272
273       stats_msg_count++;
274     }
275   }
276
277   const int clients = mig_msgs_expected;
278   if (stats_msg_count == clients && receive_stats_ready) {
279     double strat_start_time = CkWallTimer();
280     receive_stats_ready = 0;
281     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
282
283     int i;
284
285     // Migrate messages from me to elsewhere
286     for(i=0; i < migrateMsg->n_moves; i++) {
287       MigrateInfo& move = migrateMsg->moves[i];
288       const int me = CkMyPe();
289       if (move.from_pe == me && move.to_pe != me) {
290         theLbdb->Migrate(move.obj,move.to_pe);
291       } else if (move.from_pe != me) {
292         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
293                  me,move.from_pe,move.to_pe);
294       }
295     }
296     
297     // Now, send migrate messages to neighbors
298     if (clients > 0)
299       thisProxy.ReceiveMigration(migrateMsg, clients, neighbor_pes);
300     
301     // Zero out data structures for next cycle
302     for(i=0; i < clients; i++) {
303       delete statsMsgsList[i];
304       statsMsgsList[i]=NULL;
305     }
306     stats_msg_count=0;
307
308     //theLbdb->ClearLoads();
309     if (CkMyPe() == 0) {
310       double strat_end_time = CkWallTimer();
311       if (_lb_args.debug())
312         CkPrintf("[%d] %s Strat elapsed time %f\n",CkMyPe(),lbName(),strat_end_time-strat_start_time);
313     }
314   }
315 #endif  
316 }
317
318 void NborBaseLB::ReceiveMigration(LBMigrateMsg *msg)
319 {
320 #if CMK_LBDB_ON
321   if (neighbor_pes == 0) FindNeighbors();
322
323   if (mig_msgs_received == 0) migrates_expected = 0;
324
325   mig_msgs[mig_msgs_received] = msg;
326   mig_msgs_received++;
327   //  CkPrintf("[%d] Received migration msg %d of %d\n",
328   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
329
330   if (mig_msgs_received > mig_msgs_expected) {
331     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
332              CkMyPe());
333   }
334
335   if (mig_msgs_received != mig_msgs_expected) {
336     return;
337   }
338
339   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
340   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
341     LBMigrateMsg* m = mig_msgs[neigh];
342     for(int i=0; i < m->n_moves; i++) {
343       MigrateInfo& move = m->moves[i];
344       const int me = CkMyPe();
345       if (move.from_pe != me && move.to_pe == me) {
346         migrates_expected++;
347       }
348     }
349     delete m;
350     mig_msgs[neigh]=0;
351   }
352   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
353   //       CkMyPe(),migrates_expected);
354   mig_msgs_received = 0;
355   if (migrates_expected == 0 || migrates_expected == migrates_completed)
356     MigrationDone(1);
357 #endif
358 }
359
360
361 void NborBaseLB::MigrationDone(int balancing)
362 {
363 #if CMK_LBDB_ON
364   migrates_completed = 0;
365   migrates_expected = -1;
366   // Increment to next step
367   theLbdb->incStep();
368
369   theLbdb->ClearLoads();
370
371   // if sync resume invoke a barrier
372   if (balancing && _lb_args.syncResume()) {
373     CkCallback cb(CkIndex_NborBaseLB::ResumeClients((CkReductionMsg*)NULL), 
374                   thisProxy);
375     contribute(0, NULL, CkReduction::sum_int, cb);
376   }
377   else 
378     thisProxy [CkMyPe()].ResumeClients(balancing);
379   //thisProxy [CkMyPe()].ResumeClients();
380 #endif
381 }
382
383 void NborBaseLB::ResumeClients(CkReductionMsg *msg)
384 {
385   ResumeClients(1);
386   delete msg;
387 }
388
389 void NborBaseLB::ResumeClients(int balancing)
390 {
391 #if CMK_LBDB_ON
392   DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
393
394   if (CkMyPe() == 0 && balancing) {
395     double end_lb_time = CkWallTimer();
396     if (_lb_args.debug())
397       CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
398                 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
399   }
400
401   theLbdb->ResumeClients();
402 #endif
403 }
404
405 LBMigrateMsg* NborBaseLB::Strategy(LDStats* stats, int n_nbrs)
406 {
407   for(int j=0; j < n_nbrs; j++) {
408     CkPrintf("[%d] Proc %d Speed %d WALL: Total %f Idle %f Bg %f obj %f",
409     CkMyPe(), stats[j].from_pe, stats[j].pe_speed, stats[j].total_walltime,
410     stats[j].idletime, stats[j].bg_walltime, stats[j].obj_walltime);
411 #if CMK_LB_CPUTIMER
412     CkPrintf(" CPU: Total %f Bg %f obj %f", stats[j].total_cputime,
413     stats[j].bg_cputime, stats[j].obj_cputime);
414 #endif
415     CkPrintf("\n");
416   }
417
418   int sizes=0;
419   LBMigrateMsg* msg = new(sizes,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
420   msg->n_moves = 0;
421
422   return msg;
423 }
424
425 int NborBaseLB::NeighborIndex(int pe)
426 {
427     int peslot = -1;
428     for(int i=0; i < mig_msgs_expected; i++) {
429       if (pe == neighbor_pes[i]) {
430         peslot = i;
431         break;
432       }
433     }
434     return peslot;
435 }
436
437 NLBStatsMsg::NLBStatsMsg(int osz, int csz) {
438   objData = new LDObjData[osz];
439   commData = new LDCommData[csz];
440 }
441
442 NLBStatsMsg::NLBStatsMsg(NLBStatsMsg *src) 
443 {
444   int size;
445   {
446     PUP::sizer  p;
447     src->pup(p);
448     size = p.size();
449   }
450   char *buf = new char[size];
451   {
452     PUP::toMem  p(buf);
453     src->pup(p);
454   }
455   {
456     PUP::fromMem  p(buf);
457     pup(p);
458   }
459   delete [] buf;
460 }
461
462 NLBStatsMsg::~NLBStatsMsg() {
463   delete [] objData;
464   delete [] commData;
465 }
466
467 void NLBStatsMsg::pup(PUP::er &p) {
468   int i;
469   p|from_pe;
470   p|serial;
471   p|pe_speed;
472   p|total_walltime;
473   p|idletime;
474   p|bg_walltime;
475 #if CMK_LB_CPUTIMER
476   p|total_cputime;
477   p|bg_cputime;
478 #endif
479   p|n_objs;
480   if (p.isUnpacking()) objData = new LDObjData[n_objs];
481   for (i=0; i<n_objs; i++) p|objData[i];
482   p|n_comm;
483   if (p.isUnpacking()) commData = new LDCommData[n_comm];
484   for (i=0; i<n_comm; i++) p|commData[i];
485 }
486
487 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
488 // the entry function, it is just used to use to pup.
489 // I don't use CLBStatsMsg directly as marshalled parameter because
490 // I want the data pointer stored and not to be freed by the Charm++.
491 CkMarshalledNLBStatsMessage::~CkMarshalledNLBStatsMessage() {
492   if (msg) delete msg;
493 }
494
495 void CkMarshalledNLBStatsMessage::pup(PUP::er &p)
496 {
497   int isnull;
498   if (p.isPacking()) isnull = (msg==NULL?1:0);
499   p|isnull;
500   if (p.isUnpacking()) {
501     if (!isnull) msg = new NLBStatsMsg;
502     else msg = NULL;
503   }
504   if (msg) msg->pup(p);
505 #if 0
506   if (p.isUnpacking()) msg = new NLBStatsMsg;
507   else CmiAssert(msg);
508   msg->pup(p);
509 #endif
510 }
511
512
513
514 /*@{*/