all load balancers inherit from BaseLB now.
[charm.git] / src / ck-ldb / NborBaseLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #ifndef  WIN32
9 #include <unistd.h>
10 #endif
11 #include <charm++.h>
12 #include <BaseLB.h>
13 #include "NborBaseLB.h"
14 #include "NborBaseLB.def.h"
15
16 CkGroupID nborBaselb;
17
18 #if CMK_LBDB_ON
19
20 void CreateNborBaseLB()
21 {
22   nborBaselb = CProxy_NborBaseLB::ckNew();
23 }
24
25 void NborBaseLB::staticMigrated(void* data, LDObjHandle h)
26 {
27   NborBaseLB *me = (NborBaseLB*)(data);
28
29   me->Migrated(h);
30 }
31
32 void NborBaseLB::staticAtSync(void* data)
33 {
34   NborBaseLB *me = (NborBaseLB*)(data);
35
36   me->AtSync();
37 }
38
39 NborBaseLB::NborBaseLB()
40 {
41   mystep = 0;
42   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
43   theLbdb->
44     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
45                             (void*)(this));
46   theLbdb->
47     NotifyMigrated((LDMigratedFn)(staticMigrated),
48                    (void*)(this));
49
50
51   // I had to move neighbor initialization outside the constructor
52   // in order to get the virtual functions of any derived classes
53   // so I'll just set them to illegal values here.
54   neighbor_pes = 0;
55   stats_msg_count = 0;
56   statsMsgsList = 0;
57   statsDataList = 0;
58   migrates_completed = 0;
59   migrates_expected = -1;
60   mig_msgs_received = 0;
61   mig_msgs = 0;
62
63   myStats.pe_speed = theLbdb->ProcessorSpeed();
64 //  char hostname[80];
65 //  gethostname(hostname,79);
66 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.pe_speed);
67   myStats.from_pe = CkMyPe();
68   myStats.n_objs = 0;
69   myStats.objData = NULL;
70   myStats.n_comm = 0;
71   myStats.commData = NULL;
72   receive_stats_ready = 0;
73
74   theLbdb->CollectStatsOn();
75 }
76
77 NborBaseLB::~NborBaseLB()
78 {
79   CkPrintf("Going away\n");
80 }
81
82 void NborBaseLB::FindNeighbors()
83 {
84   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
85                            // and other things that depend on the number
86                            // of neighbors
87     statsMsgsList = new NLBStatsMsg*[max_neighbors()];
88     for(int i=0; i < max_neighbors(); i++)
89       statsMsgsList[i] = 0;
90     statsDataList = new LDStats[max_neighbors()];
91
92     neighbor_pes = new int[max_neighbors()];
93     neighbors(neighbor_pes);
94     mig_msgs_expected = num_neighbors();
95     mig_msgs = new NLBMigrateMsg*[num_neighbors()];
96   }
97
98 }
99
100 void NborBaseLB::AtSync()
101 {
102   //  CkPrintf("[%d] NborBaseLB At Sync step %d!!!!\n",CkMyPe(),mystep);
103
104   if (neighbor_pes == 0) FindNeighbors();
105   start_lb_time = 0;
106
107   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
108     MigrationDone();
109     return;
110   }
111
112   if (CkMyPe() == 0) {
113     start_lb_time = CmiWallTimer();
114     CkPrintf("Load balancing step %d starting at %f\n",
115              step(),start_lb_time);
116   }
117
118   NLBStatsMsg* msg = AssembleStats();
119
120   int i;
121   for(i=0; i < num_neighbors()-1; i++) {
122     NLBStatsMsg* m2 = (NLBStatsMsg*) CkCopyMsg((void**)&msg);
123     thisProxy [neighbor_pes[i]].ReceiveStats(m2);
124   }
125   if (0 < num_neighbors()) {
126     thisProxy [neighbor_pes[i]].ReceiveStats(msg);
127   } else delete msg;
128
129   // Tell our own node that we are ready
130   ReceiveStats((NLBStatsMsg*)0);
131 }
132
133 NLBStatsMsg* NborBaseLB::AssembleStats()
134 {
135   // Get stats
136   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
137   theLbdb->IdleTime(&myStats.idletime);
138   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
139
140   myStats.move = QueryMigrateStep(step());
141
142   myStats.n_objs = theLbdb->GetObjDataSz();
143   if (myStats.objData) delete [] myStats.objData;
144   myStats.objData = new LDObjData[myStats.n_objs];
145   theLbdb->GetObjData(myStats.objData);
146
147   myStats.n_comm = theLbdb->GetCommDataSz();
148   if (myStats.commData) delete [] myStats.commData;
149   myStats.commData = new LDCommData[myStats.n_comm];
150   theLbdb->GetCommData(myStats.commData);
151
152   myStats.obj_walltime = myStats.obj_cputime = 0;
153   for(int i=0; i < myStats.n_objs; i++) {
154     myStats.obj_walltime += myStats.objData[i].wallTime;
155     myStats.obj_cputime += myStats.objData[i].cpuTime;
156   }    
157
158   const int osz = theLbdb->GetObjDataSz();
159   const int csz = theLbdb->GetCommDataSz();
160   NLBStatsMsg* msg = new(osz, csz, 0)  NLBStatsMsg;
161
162   msg->from_pe = CkMyPe();
163   // msg->serial = rand();
164   msg->serial = CrnRand();
165   msg->pe_speed = myStats.pe_speed;
166   msg->total_walltime = myStats.total_walltime;
167   msg->total_cputime = myStats.total_cputime;
168   msg->idletime = myStats.idletime;
169   msg->bg_walltime = myStats.bg_walltime;
170   msg->bg_cputime = myStats.bg_cputime;
171   msg->obj_walltime = myStats.obj_walltime;
172   msg->obj_cputime = myStats.obj_cputime;
173   msg->n_objs = osz;
174   theLbdb->GetObjData(msg->objData);
175   msg->n_comm = csz;
176   theLbdb->GetCommData(msg->commData);
177
178   //  CkPrintf(
179   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
180   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
181   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
182   //    msg->obj_walltime,msg->obj_cputime);
183
184   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
185   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
186   return msg;
187 }
188
189 void NborBaseLB::Migrated(LDObjHandle h)
190 {
191   migrates_completed++;
192   //  CkPrintf("[%d] An object migrated! %d %d\n",
193   //       CkMyPe(),migrates_completed,migrates_expected);
194   if (migrates_completed == migrates_expected) {
195     MigrationDone();
196   }
197 }
198
199 void NborBaseLB::ReceiveStats(NLBStatsMsg *m)
200 {
201   if (neighbor_pes == 0) FindNeighbors();
202
203   if (m == 0) { // This is from our own node
204     receive_stats_ready = 1;
205   } else {
206     const int pe = m->from_pe;
207     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
208     //             pe,stats_msg_count,m->n_objs,m->serial,m);
209     int peslot = NeighborIndex(pe);
210
211     if (peslot == -1 || statsMsgsList[peslot] != 0) {
212       CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
213                pe);
214     } else {
215       statsMsgsList[peslot] = m;
216       statsDataList[peslot].from_pe = m->from_pe;
217       statsDataList[peslot].total_walltime = m->total_walltime;
218       statsDataList[peslot].total_cputime = m->total_cputime;
219       statsDataList[peslot].idletime = m->idletime;
220       statsDataList[peslot].bg_walltime = m->bg_walltime;
221       statsDataList[peslot].bg_cputime = m->bg_cputime;
222       statsDataList[peslot].pe_speed = m->pe_speed;
223       statsDataList[peslot].obj_walltime = m->obj_walltime;
224       statsDataList[peslot].obj_cputime = m->obj_cputime;
225
226       statsDataList[peslot].n_objs = m->n_objs;
227       statsDataList[peslot].objData = m->objData;
228       statsDataList[peslot].n_comm = m->n_comm;
229       statsDataList[peslot].commData = m->commData;
230       stats_msg_count++;
231     }
232   }
233
234   const int clients = num_neighbors();
235   if (stats_msg_count == clients && receive_stats_ready) {
236     double strat_start_time = CmiWallTimer();
237     receive_stats_ready = 0;
238     NLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
239
240     int i;
241
242     // Migrate messages from me to elsewhere
243     for(i=0; i < migrateMsg->n_moves; i++) {
244       MigrateInfo& move = migrateMsg->moves[i];
245       const int me = CkMyPe();
246       if (move.from_pe == me && move.to_pe != me) {
247         theLbdb->Migrate(move.obj,move.to_pe);
248       } else if (move.from_pe != me) {
249         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
250                  me,move.from_pe,move.to_pe);
251       }
252     }
253     
254     // Now, send migrate messages to neighbors
255     for(i=1; i < num_neighbors(); i++) {
256       NLBMigrateMsg* m2 = (NLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
257       thisProxy [neighbor_pes[i]].ReceiveMigration(m2);
258     }
259     if (0 < num_neighbors())
260       thisProxy [neighbor_pes[0]].ReceiveMigration(migrateMsg);
261     else delete migrateMsg;
262     
263     // Zero out data structures for next cycle
264     for(i=0; i < clients; i++) {
265       delete statsMsgsList[i];
266       statsMsgsList[i]=0;
267     }
268     stats_msg_count=0;
269
270     theLbdb->ClearLoads();
271     if (CkMyPe() == 0) {
272       double strat_end_time = CmiWallTimer();
273       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
274     }
275   }
276   
277 }
278
279 void NborBaseLB::ReceiveMigration(NLBMigrateMsg *msg)
280 {
281   if (neighbor_pes == 0) FindNeighbors();
282
283   if (mig_msgs_received == 0) migrates_expected = 0;
284
285   mig_msgs[mig_msgs_received] = msg;
286   mig_msgs_received++;
287   //  CkPrintf("[%d] Received migration msg %d of %d\n",
288   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
289
290   if (mig_msgs_received > mig_msgs_expected) {
291     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
292              CkMyPe());
293   }
294
295   if (mig_msgs_received != mig_msgs_expected) {
296     return;
297   }
298
299   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
300   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
301     NLBMigrateMsg* m = mig_msgs[neigh];
302     for(int i=0; i < m->n_moves; i++) {
303       MigrateInfo& move = m->moves[i];
304       const int me = CkMyPe();
305       if (move.from_pe != me && move.to_pe == me) {
306         migrates_expected++;
307       }
308     }
309     delete m;
310     mig_msgs[neigh]=0;
311   }
312   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
313   //       CkMyPe(),migrates_expected);
314   mig_msgs_received = 0;
315   if (migrates_expected == 0 || migrates_expected == migrates_completed)
316     MigrationDone();
317 }
318
319
320 void NborBaseLB::MigrationDone()
321 {
322   if (CkMyPe() == 0 && start_lb_time != 0.0) {
323     double end_lb_time = CmiWallTimer();
324     CkPrintf("Load balancing step %d finished at %f duration %f\n",
325              step(),end_lb_time,end_lb_time - start_lb_time);
326   }
327   migrates_completed = 0;
328   migrates_expected = -1;
329   // Increment to next step
330   mystep++;
331   thisProxy [CkMyPe()].ResumeClients();
332 }
333
334 void NborBaseLB::ResumeClients()
335 {
336   theLbdb->ResumeClients();
337 }
338
339 NLBMigrateMsg* NborBaseLB::Strategy(LDStats* stats,int count)
340 {
341   for(int j=0; j < count; j++) {
342     CkPrintf(
343     "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
344     CkMyPe(),stats[j].from_pe,stats[j].pe_speed,
345     stats[j].total_walltime,stats[j].total_cputime,
346     stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
347     stats[j].obj_walltime,stats[j].obj_cputime);
348   }
349
350   delete [] myStats.objData;
351   myStats.n_objs = 0;
352   delete [] myStats.commData;
353   myStats.n_comm = 0;
354
355   int sizes=0;
356   NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
357   msg->n_moves = 0;
358
359   return msg;
360 }
361
362 int NborBaseLB::NeighborIndex(int pe)
363 {
364     int peslot = -1;
365     for(int i=0; i < num_neighbors(); i++) {
366       if (pe == neighbor_pes[i]) {
367         peslot = i;
368         break;
369       }
370     }
371     return peslot;
372 }
373
374 void* NLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
375 {
376   int totalsize = size + array[0] * sizeof(NborBaseLB::MigrateInfo);
377
378   NLBMigrateMsg* ret =
379     (NLBMigrateMsg*)(CkAllocMsg(msgnum,totalsize,priobits));
380
381   ret->moves = (NborBaseLB::MigrateInfo*) ((char*)(ret)+ size);
382
383   return (void*)(ret);
384 }
385
386 void* NLBMigrateMsg::pack(NLBMigrateMsg* m)
387 {
388   m->moves = (NborBaseLB::MigrateInfo*)
389     ((char*)(m->moves) - (char*)(&m->moves));
390
391   return (void*)(m);
392 }
393
394 NLBMigrateMsg* NLBMigrateMsg::unpack(void *m)
395 {
396   NLBMigrateMsg* ret_val = (NLBMigrateMsg*)(m);
397
398   ret_val->moves = (NborBaseLB::MigrateInfo*)
399     ((char*)(&ret_val->moves) + (size_t)(ret_val->moves));
400
401   return ret_val;
402 }
403
404 #endif