I Rearranged the load balancer, so WSLB->NeighborLB, NeighborLB->NborBaseLB,
[charm.git] / src / ck-ldb / NborBaseLB.C
1 #include <unistd.h>
2 #include <charm++.h>
3 #include <LBDatabase.h>
4 #include "NborBaseLB.h"
5 #include "NborBaseLB.def.h"
6
7 CkGroupID nborBaselb;
8
9 #if CMK_LBDB_ON
10
11 void CreateNborBaseLB()
12 {
13   nborBaselb = CProxy_NborBaseLB::ckNew();
14 }
15
16 void NborBaseLB::staticMigrated(void* data, LDObjHandle h)
17 {
18   NborBaseLB *me = static_cast<NborBaseLB*>(data);
19
20   me->Migrated(h);
21 }
22
23 void NborBaseLB::staticAtSync(void* data)
24 {
25   NborBaseLB *me = static_cast<NborBaseLB*>(data);
26
27   me->AtSync();
28 }
29
30 NborBaseLB::NborBaseLB()
31 {
32   mystep = 0;
33   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
34   theLbdb->
35     AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
36                             static_cast<void*>(this));
37   theLbdb->
38     NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
39                    static_cast<void*>(this));
40
41
42   // I had to move neighbor initialization outside the constructor
43   // in order to get the virtual functions of any derived classes
44   // so I'll just set them to illegal values here.
45   neighbor_pes = 0;
46   stats_msg_count = 0;
47   statsMsgsList = 0;
48   statsDataList = 0;
49   migrates_completed = 0;
50   migrates_expected = -1;
51   mig_msgs_received = 0;
52   mig_msgs = 0;
53
54   myStats.proc_speed = theLbdb->ProcessorSpeed();
55 //  char hostname[80];
56 //  gethostname(hostname,79);
57 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
58   myStats.obj_data_sz = 0;
59   myStats.comm_data_sz = 0;
60   receive_stats_ready = 0;
61
62   theLbdb->CollectStatsOn();
63 }
64
65 NborBaseLB::~NborBaseLB()
66 {
67   CkPrintf("Going away\n");
68 }
69
70 void NborBaseLB::FindNeighbors()
71 {
72   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
73                            // and other things that depend on the number
74                            // of neighbors
75     statsMsgsList = new NLBStatsMsg*[num_neighbors()];
76     for(int i=0; i < num_neighbors(); i++)
77       statsMsgsList[i] = 0;
78     statsDataList = new LDStats[num_neighbors()];
79
80     neighbor_pes = new int[num_neighbors()];
81     neighbors(neighbor_pes);
82     mig_msgs_expected = num_neighbors();
83     mig_msgs = new NLBMigrateMsg*[num_neighbors()];
84   }
85
86 }
87
88 void NborBaseLB::AtSync()
89 {
90   //  CkPrintf("[%d] NborBaseLB At Sync step %d!!!!\n",CkMyPe(),mystep);
91
92   if (CkMyPe() == 0) {
93     start_lb_time = CmiWallTimer();
94     CkPrintf("Load balancing step %d starting at %f\n",
95              step(),start_lb_time);
96   }
97
98   if (neighbor_pes == 0) FindNeighbors();
99
100   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
101     MigrationDone();
102     return;
103   }
104
105   NLBStatsMsg* msg = AssembleStats();
106
107   int i;
108   for(i=1; i < num_neighbors(); i++) {
109     NLBStatsMsg* m2 = (NLBStatsMsg*) CkCopyMsg((void**)&msg);
110     CProxy_NborBaseLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
111   }
112   if (0 < num_neighbors()) {
113     CProxy_NborBaseLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
114   } else delete msg;
115
116   // Tell our own node that we are ready
117   ReceiveStats((NLBStatsMsg*)0);
118 }
119
120 NLBStatsMsg* NborBaseLB::AssembleStats()
121 {
122   // Get stats
123   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
124   theLbdb->IdleTime(&myStats.idletime);
125   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
126   myStats.obj_data_sz = theLbdb->GetObjDataSz();
127   myStats.objData = new LDObjData[myStats.obj_data_sz];
128   theLbdb->GetObjData(myStats.objData);
129
130   myStats.comm_data_sz = theLbdb->GetCommDataSz();
131   myStats.commData = new LDCommData[myStats.comm_data_sz];
132   theLbdb->GetCommData(myStats.commData);
133
134   myStats.obj_walltime = myStats.obj_cputime = 0;
135   for(int i=0; i < myStats.obj_data_sz; i++) {
136     myStats.obj_walltime += myStats.objData[i].wallTime;
137     myStats.obj_cputime += myStats.objData[i].cpuTime;
138   }    
139
140   NLBStatsMsg* msg = new NLBStatsMsg;
141
142   msg->from_pe = CkMyPe();
143   msg->serial = rand();
144   msg->proc_speed = myStats.proc_speed;
145   msg->total_walltime = myStats.total_walltime;
146   msg->total_cputime = myStats.total_cputime;
147   msg->idletime = myStats.idletime;
148   msg->bg_walltime = myStats.bg_walltime;
149   msg->bg_cputime = myStats.bg_cputime;
150   msg->obj_walltime = myStats.obj_walltime;
151   msg->obj_cputime = myStats.obj_cputime;
152
153   //  CkPrintf(
154   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
155   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
156   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
157   //    msg->obj_walltime,msg->obj_cputime);
158
159   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
160   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
161   return msg;
162 }
163
164 void NborBaseLB::Migrated(LDObjHandle h)
165 {
166   migrates_completed++;
167   //  CkPrintf("[%d] An object migrated! %d %d\n",
168   //       CkMyPe(),migrates_completed,migrates_expected);
169   if (migrates_completed == migrates_expected) {
170     MigrationDone();
171   }
172 }
173
174 void NborBaseLB::ReceiveStats(NLBStatsMsg *m)
175 {
176   if (neighbor_pes == 0) FindNeighbors();
177
178   if (m == 0) { // This is from our own node
179     receive_stats_ready = 1;
180   } else {
181     const int pe = m->from_pe;
182     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
183     //             pe,stats_msg_count,m->n_objs,m->serial,m);
184     int peslot = -1;
185     for(int i=0; i < num_neighbors(); i++) {
186       if (pe == neighbor_pes[i]) {
187         peslot = i;
188         break;
189       }
190     }
191     if (peslot == -1 || statsMsgsList[peslot] != 0) {
192       CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
193                pe);
194     } else {
195       statsMsgsList[peslot] = m;
196       statsDataList[peslot].from_pe = m->from_pe;
197       statsDataList[peslot].total_walltime = m->total_walltime;
198       statsDataList[peslot].total_cputime = m->total_cputime;
199       statsDataList[peslot].idletime = m->idletime;
200       statsDataList[peslot].bg_walltime = m->bg_walltime;
201       statsDataList[peslot].bg_cputime = m->bg_cputime;
202       statsDataList[peslot].proc_speed = m->proc_speed;
203       statsDataList[peslot].obj_walltime = m->obj_walltime;
204       statsDataList[peslot].obj_cputime = m->obj_cputime;
205       stats_msg_count++;
206     }
207   }
208
209   const int clients = num_neighbors();
210   if (stats_msg_count == clients && receive_stats_ready) {
211     double strat_start_time = CmiWallTimer();
212     receive_stats_ready = 0;
213     NLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
214
215     int i;
216
217     // Migrate messages from me to elsewhere
218     for(i=0; i < migrateMsg->n_moves; i++) {
219       MigrateInfo& move = migrateMsg->moves[i];
220       const int me = CkMyPe();
221       if (move.from_pe == me && move.to_pe != me) {
222         theLbdb->Migrate(move.obj,move.to_pe);
223       } else if (move.from_pe != me) {
224         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
225                  me,move.from_pe,move.to_pe);
226       }
227     }
228     
229     // Now, send migrate messages to neighbors
230     for(i=1; i < num_neighbors(); i++) {
231       NLBMigrateMsg* m2 = (NLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
232       CProxy_NborBaseLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
233     }
234     if (0 < num_neighbors())
235       CProxy_NborBaseLB(thisgroup).ReceiveMigration(migrateMsg,
236                                                     neighbor_pes[0]);
237     else delete migrateMsg;
238     
239     // Zero out data structures for next cycle
240     for(i=0; i < clients; i++) {
241       delete statsMsgsList[i];
242       statsMsgsList[i]=0;
243     }
244     stats_msg_count=0;
245
246     theLbdb->ClearLoads();
247     if (CkMyPe() == 0) {
248       double strat_end_time = CmiWallTimer();
249       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
250     }
251   }
252   
253 }
254
255 void NborBaseLB::ReceiveMigration(NLBMigrateMsg *msg)
256 {
257   if (neighbor_pes == 0) FindNeighbors();
258
259   if (mig_msgs_received == 0) migrates_expected = 0;
260
261   mig_msgs[mig_msgs_received] = msg;
262   mig_msgs_received++;
263   //  CkPrintf("[%d] Received migration msg %d of %d\n",
264   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
265
266   if (mig_msgs_received > mig_msgs_expected) {
267     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
268              CkMyPe());
269   }
270
271   if (mig_msgs_received != mig_msgs_expected) {
272     return;
273   }
274
275   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
276   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
277     NLBMigrateMsg* m = mig_msgs[neigh];
278     for(int i=0; i < m->n_moves; i++) {
279       MigrateInfo& move = m->moves[i];
280       const int me = CkMyPe();
281       if (move.from_pe != me && move.to_pe == me) {
282         migrates_expected++;
283       }
284     }
285     delete m;
286     mig_msgs[neigh]=0;
287   }
288   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
289   //       CkMyPe(),migrates_expected);
290   mig_msgs_received = 0;
291   if (migrates_expected == 0 || migrates_expected == migrates_completed)
292     MigrationDone();
293 }
294
295
296 void NborBaseLB::MigrationDone()
297 {
298   if (CkMyPe() == 0) {
299     double end_lb_time = CmiWallTimer();
300     CkPrintf("Load balancing step %d finished at %f duration %f\n",
301              step(),end_lb_time,end_lb_time - start_lb_time);
302   }
303   migrates_completed = 0;
304   migrates_expected = -1;
305   // Increment to next step
306   mystep++;
307   CProxy_NborBaseLB(thisgroup).ResumeClients(CkMyPe());
308 }
309
310 void NborBaseLB::ResumeClients()
311 {
312   theLbdb->ResumeClients();
313 }
314
315 NLBMigrateMsg* NborBaseLB::Strategy(LDStats* stats,int count)
316 {
317   for(int j=0; j < count; j++) {
318     CkPrintf(
319     "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
320     CkMyPe(),stats[j].from_pe,stats[j].proc_speed,
321     stats[j].total_walltime,stats[j].total_cputime,
322     stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
323     stats[j].obj_walltime,stats[j].obj_cputime);
324   }
325
326   delete [] myStats.objData;
327   myStats.obj_data_sz = 0;
328   delete [] myStats.commData;
329   myStats.comm_data_sz = 0;
330
331   int sizes=0;
332   NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
333   msg->n_moves = 0;
334
335   return msg;
336 }
337
338 void* NLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
339 {
340   int totalsize = size + array[0] * sizeof(NborBaseLB::MigrateInfo);
341
342   NLBMigrateMsg* ret =
343     static_cast<NLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
344
345   ret->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
346     (reinterpret_cast<char*>(ret)+ size);
347
348   return static_cast<void*>(ret);
349 }
350
351 void* NLBMigrateMsg::pack(NLBMigrateMsg* m)
352 {
353   m->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
354     (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
355
356   return static_cast<void*>(m);
357 }
358
359 NLBMigrateMsg* NLBMigrateMsg::unpack(void *m)
360 {
361   NLBMigrateMsg* ret_val = static_cast<NLBMigrateMsg*>(m);
362
363   ret_val->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
364     (reinterpret_cast<char*>(&ret_val->moves) 
365      + reinterpret_cast<size_t>(ret_val->moves));
366
367   return ret_val;
368 }
369
370 #endif