I Rearranged the load balancer, so WSLB->NeighborLB, NeighborLB->NborBaseLB,
[charm.git] / src / ck-ldb / WSLB.C
1 #include <unistd.h>
2 #include <charm++.h>
3 #include <LBDatabase.h>
4 #include <CkLists.h>
5 #include "heap.h"
6 #include "WSLB.h"
7 #include "WSLB.def.h"
8
9 CkGroupID wslb;
10
11 #if CMK_LBDB_ON
12
13 void CreateWSLB()
14 {
15   wslb = CProxy_WSLB::ckNew();
16 }
17
18 void WSLB::staticMigrated(void* data, LDObjHandle h)
19 {
20   WSLB *me = static_cast<WSLB*>(data);
21
22   me->Migrated(h);
23 }
24
25 void WSLB::staticAtSync(void* data)
26 {
27   WSLB *me = static_cast<WSLB*>(data);
28
29   me->AtSync();
30 }
31
32 WSLB::WSLB()
33 {
34   mystep = 0;
35   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
36   theLbdb->
37     AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
38                             static_cast<void*>(this));
39   theLbdb->
40     NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
41                    static_cast<void*>(this));
42
43
44   // I had to move neighbor initialization outside the constructor
45   // in order to get the virtual functions of any derived classes
46   // so I'll just set them to illegal values here.
47   neighbor_pes = 0;
48   stats_msg_count = 0;
49   statsMsgsList = 0;
50   statsDataList = 0;
51   migrates_completed = 0;
52   migrates_expected = -1;
53   mig_msgs_received = 0;
54   mig_msgs = 0;
55
56   myStats.proc_speed = theLbdb->ProcessorSpeed();
57 //  char hostname[80];
58 //  gethostname(hostname,79);
59 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
60   myStats.obj_data_sz = 0;
61   myStats.comm_data_sz = 0;
62   receive_stats_ready = 0;
63
64   theLbdb->CollectStatsOn();
65 }
66
67 WSLB::~WSLB()
68 {
69   CkPrintf("Going away\n");
70 }
71
72 void WSLB::FindNeighbors()
73 {
74   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
75                            // and other things that depend on the number
76                            // of neighbors
77     statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
78     for(int i=0; i < num_neighbors(); i++)
79       statsMsgsList[i] = 0;
80     statsDataList = new LDStats[num_neighbors()];
81
82     neighbor_pes = new int[num_neighbors()];
83     neighbors(neighbor_pes);
84     mig_msgs_expected = num_neighbors();
85     mig_msgs = new WSLBMigrateMsg*[num_neighbors()];
86   }
87
88 }
89
90 void WSLB::AtSync()
91 {
92   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
93
94   if (CkMyPe() == 0) {
95     start_lb_time = CmiWallTimer();
96     CkPrintf("Load balancing step %d starting at %f\n",
97              step(),start_lb_time);
98   }
99
100   if (neighbor_pes == 0) FindNeighbors();
101
102   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
103     MigrationDone();
104     return;
105   }
106
107   WSLBStatsMsg* msg = AssembleStats();
108
109   int i;
110   for(i=1; i < num_neighbors(); i++) {
111     WSLBStatsMsg* m2 = (WSLBStatsMsg*) CkCopyMsg((void**)&msg);
112     CProxy_WSLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
113   }
114   if (0 < num_neighbors()) {
115     CProxy_WSLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
116   } else delete msg;
117
118   // Tell our own node that we are ready
119   ReceiveStats((WSLBStatsMsg*)0);
120 }
121
122 WSLBStatsMsg* WSLB::AssembleStats()
123 {
124   // Get stats
125   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
126   theLbdb->IdleTime(&myStats.idletime);
127   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
128   myStats.obj_data_sz = theLbdb->GetObjDataSz();
129   myStats.objData = new LDObjData[myStats.obj_data_sz];
130   theLbdb->GetObjData(myStats.objData);
131
132   myStats.comm_data_sz = theLbdb->GetCommDataSz();
133   myStats.commData = new LDCommData[myStats.comm_data_sz];
134   theLbdb->GetCommData(myStats.commData);
135
136   myStats.obj_walltime = myStats.obj_cputime = 0;
137   for(int i=0; i < myStats.obj_data_sz; i++) {
138     myStats.obj_walltime += myStats.objData[i].wallTime;
139     myStats.obj_cputime += myStats.objData[i].cpuTime;
140   }    
141
142   WSLBStatsMsg* msg = new WSLBStatsMsg;
143
144   msg->from_pe = CkMyPe();
145   msg->serial = rand();
146   msg->proc_speed = myStats.proc_speed;
147   msg->total_walltime = myStats.total_walltime;
148   msg->total_cputime = myStats.total_cputime;
149   msg->idletime = myStats.idletime;
150   msg->bg_walltime = myStats.bg_walltime;
151   msg->bg_cputime = myStats.bg_cputime;
152   msg->obj_walltime = myStats.obj_walltime;
153   msg->obj_cputime = myStats.obj_cputime;
154
155   //  CkPrintf(
156   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
157   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
158   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
159   //    msg->obj_walltime,msg->obj_cputime);
160
161   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
162   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
163   return msg;
164 }
165
166 void WSLB::Migrated(LDObjHandle h)
167 {
168   migrates_completed++;
169   //  CkPrintf("[%d] An object migrated! %d %d\n",
170   //       CkMyPe(),migrates_completed,migrates_expected);
171   if (migrates_completed == migrates_expected) {
172     MigrationDone();
173   }
174 }
175
176 void WSLB::ReceiveStats(WSLBStatsMsg *m)
177 {
178   if (neighbor_pes == 0) FindNeighbors();
179
180   if (m == 0) { // This is from our own node
181     receive_stats_ready = 1;
182   } else {
183     const int pe = m->from_pe;
184     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
185     //             pe,stats_msg_count,m->n_objs,m->serial,m);
186     int peslot = -1;
187     for(int i=0; i < num_neighbors(); i++) {
188       if (pe == neighbor_pes[i]) {
189         peslot = i;
190         break;
191       }
192     }
193     if (peslot == -1 || statsMsgsList[peslot] != 0) {
194       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
195                pe);
196     } else {
197       statsMsgsList[peslot] = m;
198       statsDataList[peslot].from_pe = m->from_pe;
199       statsDataList[peslot].total_walltime = m->total_walltime;
200       statsDataList[peslot].total_cputime = m->total_cputime;
201       statsDataList[peslot].idletime = m->idletime;
202       statsDataList[peslot].bg_walltime = m->bg_walltime;
203       statsDataList[peslot].bg_cputime = m->bg_cputime;
204       statsDataList[peslot].proc_speed = m->proc_speed;
205       statsDataList[peslot].obj_walltime = m->obj_walltime;
206       statsDataList[peslot].obj_cputime = m->obj_cputime;
207       stats_msg_count++;
208     }
209   }
210
211   const int clients = num_neighbors();
212   if (stats_msg_count == clients && receive_stats_ready) {
213     double strat_start_time = CmiWallTimer();
214     receive_stats_ready = 0;
215     WSLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
216
217     int i;
218
219     // Migrate messages from me to elsewhere
220     for(i=0; i < migrateMsg->n_moves; i++) {
221       MigrateInfo& move = migrateMsg->moves[i];
222       const int me = CkMyPe();
223       if (move.from_pe == me && move.to_pe != me) {
224         theLbdb->Migrate(move.obj,move.to_pe);
225       } else if (move.from_pe != me) {
226         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
227                  me,move.from_pe,move.to_pe);
228       }
229     }
230     
231     // Now, send migrate messages to neighbors
232     for(i=1; i < num_neighbors(); i++) {
233       WSLBMigrateMsg* m2 = (WSLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
234       CProxy_WSLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
235     }
236     if (0 < num_neighbors())
237       CProxy_WSLB(thisgroup).ReceiveMigration(migrateMsg,
238                                                     neighbor_pes[0]);
239     else delete migrateMsg;
240     
241     // Zero out data structures for next cycle
242     for(i=0; i < clients; i++) {
243       delete statsMsgsList[i];
244       statsMsgsList[i]=0;
245     }
246     stats_msg_count=0;
247
248     theLbdb->ClearLoads();
249     if (CkMyPe() == 0) {
250       double strat_end_time = CmiWallTimer();
251       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
252     }
253   }
254   
255 }
256
257 void WSLB::ReceiveMigration(WSLBMigrateMsg *msg)
258 {
259   if (neighbor_pes == 0) FindNeighbors();
260
261   if (mig_msgs_received == 0) migrates_expected = 0;
262
263   mig_msgs[mig_msgs_received] = msg;
264   mig_msgs_received++;
265   //  CkPrintf("[%d] Received migration msg %d of %d\n",
266   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
267
268   if (mig_msgs_received > mig_msgs_expected) {
269     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
270              CkMyPe());
271   }
272
273   if (mig_msgs_received != mig_msgs_expected) {
274     return;
275   }
276
277   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
278   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
279     WSLBMigrateMsg* m = mig_msgs[neigh];
280     for(int i=0; i < m->n_moves; i++) {
281       MigrateInfo& move = m->moves[i];
282       const int me = CkMyPe();
283       if (move.from_pe != me && move.to_pe == me) {
284         migrates_expected++;
285       }
286     }
287     delete m;
288     mig_msgs[neigh]=0;
289   }
290   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
291   //       CkMyPe(),migrates_expected);
292   mig_msgs_received = 0;
293   if (migrates_expected == 0 || migrates_expected == migrates_completed)
294     MigrationDone();
295 }
296
297
298 void WSLB::MigrationDone()
299 {
300   if (CkMyPe() == 0) {
301     double end_lb_time = CmiWallTimer();
302     CkPrintf("Load balancing step %d finished at %f duration %f\n",
303              step(),end_lb_time,end_lb_time - start_lb_time);
304   }
305   migrates_completed = 0;
306   migrates_expected = -1;
307   // Increment to next step
308   mystep++;
309   CProxy_WSLB(thisgroup).ResumeClients(CkMyPe());
310 }
311
312 void WSLB::ResumeClients()
313 {
314   theLbdb->ResumeClients();
315 }
316
317 WSLBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
318 {
319   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
320   // Compute the average load to see if we are overloaded relative
321   // to our neighbors
322   double myload = myStats.total_walltime - myStats.idletime;
323   double avgload = myload;
324   int i;
325   for(i=0; i < count; i++) {
326     // Scale times we need appropriately for relative proc speeds
327     const double scale =  ((double)myStats.proc_speed) 
328       / stats[i].proc_speed;
329
330     stats[i].total_walltime *= scale;
331     stats[i].idletime *= scale;
332
333     avgload += (stats[i].total_walltime - stats[i].idletime);
334   }
335   avgload /= (count+1);
336
337   CkVector migrateInfo;
338
339   if (myload > avgload) {
340     //CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
341     //               CkMyPe(),myload,avgload);
342
343     // First, build heaps of other processors and my objects
344     // Then assign objects to other processors until either
345     //   - The smallest remaining object would put me below average, or
346     //   - I only have 1 object left, or
347     //   - The smallest remaining object would put someone else 
348     //     above average
349
350     // Build heaps
351     minHeap procs(count);
352     for(i=0; i < count; i++) {
353       InfoRecord* item = new InfoRecord;
354       item->load = stats[i].total_walltime - stats[i].idletime;
355       item->Id =  stats[i].from_pe;
356       procs.insert(item);
357     }
358       
359     maxHeap objs(myStats.obj_data_sz);
360     for(i=0; i < myStats.obj_data_sz; i++) {
361       InfoRecord* item = new InfoRecord;
362       item->load = myStats.objData[i].wallTime;
363       item->Id = i;
364       objs.insert(item);
365     }
366
367     int objs_here = myStats.obj_data_sz;
368     do {
369       if (objs_here <= 1) break;  // For now, always leave 1 object
370
371       InfoRecord* p;
372       InfoRecord* obj;
373
374       // Get the lightest-loaded processor
375       p = procs.deleteMin();
376       if (p == 0) {
377         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
378         break;
379       }
380
381       // Get the biggest object
382       CmiBool objfound = CmiFalse;
383       do {
384         obj = objs.deleteMax();
385         if (obj == 0) break;
386
387         double new_p_load = p->load + obj->load;
388         double my_new_load = myload - obj->load;
389         if (new_p_load < my_new_load) {
390 //      if (new_p_load < avgload) {
391           objfound = CmiTrue;
392         } else {
393           // This object is too big, so throw it away
394 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
395 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
396           delete obj;
397         }
398       } while (!objfound);
399
400       if (!objfound) {
401         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
402         break;
403       }
404
405       const int me = CkMyPe();
406       // Apparently we can give this object to this processor
407       //      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
408       //               CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
409
410       MigrateInfo* migrateMe = new MigrateInfo;
411       migrateMe->obj = myStats.objData[obj->Id].handle;
412       migrateMe->from_pe = me;
413       migrateMe->to_pe = p->Id;
414       migrateInfo.push_back((void*)migrateMe);
415
416       objs_here--;
417       
418       // We may want to assign more to this processor, so lets
419       // update it and put it back in the heap
420       p->load += obj->load;
421       myload -= obj->load;
422       procs.insert(p);
423       
424       // This object is assigned, so we delete it from the heap
425       delete obj;
426
427     } while(myload > avgload);
428
429     // Now empty out the heaps
430     while (InfoRecord* p=procs.deleteMin())
431       delete p;
432     while (InfoRecord* obj=objs.deleteMax())
433       delete obj;
434   }  
435
436   // Now build the message to actually perform the migrations
437   int migrate_count=migrateInfo.size();
438   //  if (migrate_count > 0) {
439   //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
440   //  }
441   WSLBMigrateMsg* msg = new(&migrate_count,1) WSLBMigrateMsg;
442   msg->n_moves = migrate_count;
443   for(i=0; i < migrate_count; i++) {
444     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
445     msg->moves[i] = *item;
446     delete item;
447     migrateInfo[i] = 0;
448   }
449
450   return msg;
451 };
452
453 void* WSLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
454 {
455   int totalsize = size + array[0] * sizeof(WSLB::MigrateInfo);
456
457   WSLBMigrateMsg* ret =
458     static_cast<WSLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
459
460   ret->moves = reinterpret_cast<WSLB::MigrateInfo*>
461     (reinterpret_cast<char*>(ret)+ size);
462
463   return static_cast<void*>(ret);
464 }
465
466 void* WSLBMigrateMsg::pack(WSLBMigrateMsg* m)
467 {
468   m->moves = reinterpret_cast<WSLB::MigrateInfo*>
469     (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
470
471   return static_cast<void*>(m);
472 }
473
474 WSLBMigrateMsg* WSLBMigrateMsg::unpack(void *m)
475 {
476   WSLBMigrateMsg* ret_val = static_cast<WSLBMigrateMsg*>(m);
477
478   ret_val->moves = reinterpret_cast<WSLB::MigrateInfo*>
479     (reinterpret_cast<char*>(&ret_val->moves) 
480      + reinterpret_cast<size_t>(ret_val->moves));
481
482   return ret_val;
483 }
484
485 #endif