register WSLB.
[charm.git] / src / ck-ldb / WSLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #ifndef  WIN32
14 #include <unistd.h>
15 #endif
16 #include <charm++.h>
17 #include <BaseLB.h>
18 #include <cklists.h>
19 #include "heap.h"
20 #include "WSLB.h"
21
22 // Temporary vacating flags
23 // Set PROC to -1 to disable
24
25 #define VACATE_PROC -1
26 //#define VACATE_PROC (CkNumPes()/2)
27 #define VACATE_AFTER 30
28 #define UNVACATE_AFTER 15
29
30 CreateLBFunc_Def(WSLB);
31
32 static void lbinit(void) {
33   LBRegisterBalancer("WSLB", 
34                      CreateWSLB, 
35                      AllocateWSLB, 
36                      "Workstation load balancer");
37 }
38
39
40 void WSLB::staticMigrated(void* data, LDObjHandle h)
41 {
42   WSLB *me = (WSLB*)(data);
43
44   me->Migrated(h);
45 }
46
47 void WSLB::staticAtSync(void* data)
48 {
49   WSLB *me = (WSLB*)(data);
50
51   me->AtSync();
52 }
53
54 WSLB::WSLB(const CkLBOptions &opt) : BaseLB(opt) 
55 {
56 #if CMK_LBDB_ON
57   thisProxy = CProxy_WSLB(thisgroup);
58   lbname = "WSLB";
59   if (CkMyPe() == 0)
60     CkPrintf("[%d] WSLB created\n",CkMyPe());
61
62   mystep = 0;
63   theLbdb->
64     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
65   theLbdb->
66     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
67
68
69   // I had to move neighbor initialization outside the constructor
70   // in order to get the virtual functions of any derived classes
71   // so I'll just set them to illegal values here.
72   neighbor_pes = 0;
73   stats_msg_count = 0;
74   statsMsgsList = 0;
75   statsDataList = 0;
76   migrates_completed = 0;
77   migrates_expected = -1;
78   mig_msgs_received = 0;
79   mig_msgs = 0;
80
81   myStats.proc_speed = theLbdb->ProcessorSpeed();
82 //  char hostname[80];
83 //  gethostname(hostname,79);
84 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
85   myStats.obj_data_sz = 0;
86   myStats.comm_data_sz = 0;
87   receive_stats_ready = 0;
88
89   vacate = CmiFalse;
90   usage = 1.0;
91   usage_int_err = 0.;
92
93   theLbdb->CollectStatsOn();
94 #endif
95 }
96
97 WSLB::~WSLB()
98 {
99   CkPrintf("Going away\n");
100 }
101
102 void WSLB::FindNeighbors()
103 {
104   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
105                            // and other things that depend on the number
106                            // of neighbors
107     statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
108     for(int i=0; i < num_neighbors(); i++)
109       statsMsgsList[i] = 0;
110     statsDataList = new LDStats[num_neighbors()];
111
112     neighbor_pes = new int[num_neighbors()];
113     neighbors(neighbor_pes);
114     mig_msgs_expected = num_neighbors();
115     mig_msgs = new LBMigrateMsg*[num_neighbors()];
116   }
117
118 }
119
120 void WSLB::AtSync()
121 {
122 #if CMK_LBDB_ON
123   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
124
125   if (CkMyPe() == 0) {
126     start_lb_time = CmiWallTimer();
127     CkPrintf("Load balancing step %d starting at %f\n",
128              step(),start_lb_time);
129   }
130
131   if (neighbor_pes == 0) FindNeighbors();
132
133   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
134     MigrationDone();
135     return;
136   }
137
138   WSLBStatsMsg* msg = AssembleStats();
139
140   thisProxy.ReceiveStats(msg,num_neighbors(),neighbor_pes);
141
142   // Tell our own node that we are ready
143   ReceiveStats((WSLBStatsMsg*)0);
144 #endif
145 }
146
147 WSLBStatsMsg* WSLB::AssembleStats()
148 {
149 #if CMK_LBDB_ON
150   // Get stats
151   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
152   theLbdb->IdleTime(&myStats.idletime);
153   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
154   myStats.obj_data_sz = theLbdb->GetObjDataSz();
155   myStats.objData = new LDObjData[myStats.obj_data_sz];
156   theLbdb->GetObjData(myStats.objData);
157
158   myStats.comm_data_sz = theLbdb->GetCommDataSz();
159   myStats.commData = new LDCommData[myStats.comm_data_sz];
160   theLbdb->GetCommData(myStats.commData);
161
162   myStats.obj_walltime = myStats.obj_cputime = 0;
163   for(int i=0; i < myStats.obj_data_sz; i++) {
164     myStats.obj_walltime += myStats.objData[i].wallTime;
165     myStats.obj_cputime += myStats.objData[i].cpuTime;
166   }    
167
168   WSLBStatsMsg* msg = new WSLBStatsMsg;
169
170   // Calculate usage percentage
171   double myload = myStats.total_walltime - myStats.idletime;
172   double myusage;
173 //   for(i=0; i < myStats.obj_data_sz; i++) {
174 //     myobjcpu += myStats.objData[i].cpuTime;
175 //     myobjwall += myStats.objData[i].wallTime;
176 //   }
177 //   if (myobjwall > 0)
178 //     myusage = myobjcpu / myobjwall;
179 //   else
180
181   if (myload > 0)
182     myusage = myStats.total_cputime / myload;
183   else myusage = 1.0;
184   // Apply proportional-integral control on usage changes
185   const double usage_err = myusage - usage;
186   usage_int_err += usage_err;
187   usage += usage_err * 0.1 + usage_int_err * 0.01;
188   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
189  
190   // Allow usage to decrease quickly, but increase slowly
191   //   if (myusage > usage)
192   //     usage += (myusage-usage) * 0.1;
193   //   else usage = myusage;
194  
195
196   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
197   //       CkMyPe(),myload,myusage,usage);
198
199   msg->from_pe = CkMyPe();
200   // msg->serial = rand();
201   msg->serial = CrnRand();
202   msg->proc_speed = myStats.proc_speed;
203   msg->total_walltime = myStats.total_walltime;
204   msg->total_cputime = myStats.total_cputime;
205   msg->idletime = myStats.idletime;
206   msg->bg_walltime = myStats.bg_walltime;
207   msg->bg_cputime = myStats.bg_cputime;
208   msg->obj_walltime = myStats.obj_walltime;
209   msg->obj_cputime = myStats.obj_cputime;
210   msg->vacate_me = vacate;
211   msg->usage = usage;
212
213   if (_lb_debug) {
214     CkPrintf(
215       "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
216       CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
217       msg->idletime,msg->bg_walltime,msg->bg_cputime,
218       msg->obj_walltime,msg->obj_cputime);
219   }
220
221   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
222   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
223   return msg;
224 #else
225   return NULL;
226 #endif
227 }
228
229 void WSLB::Migrated(LDObjHandle h)
230 {
231 #if CMK_LBDB_ON
232   migrates_completed++;
233   //  CkPrintf("[%d] An object migrated! %d %d\n",
234   //       CkMyPe(),migrates_completed,migrates_expected);
235   if (migrates_completed == migrates_expected) {
236     MigrationDone();
237   }
238 #endif
239 }
240
241 void WSLB::ReceiveStats(WSLBStatsMsg *m)
242 {
243 #if CMK_LBDB_ON
244   if (neighbor_pes == 0) FindNeighbors();
245
246   if (m == 0) { // This is from our own node
247     receive_stats_ready = 1;
248   } else {
249     const int pe = m->from_pe;
250     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
251     //             pe,stats_msg_count,m->n_objs,m->serial,m);
252     int peslot = -1;
253     for(int i=0; i < num_neighbors(); i++) {
254       if (pe == neighbor_pes[i]) {
255         peslot = i;
256         break;
257       }
258     }
259     if (peslot == -1 || statsMsgsList[peslot] != 0) {
260       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
261                pe);
262     } else {
263       statsMsgsList[peslot] = m;
264       statsDataList[peslot].from_pe = m->from_pe;
265       statsDataList[peslot].total_walltime = m->total_walltime;
266       statsDataList[peslot].total_cputime = m->total_cputime;
267       statsDataList[peslot].idletime = m->idletime;
268       statsDataList[peslot].bg_walltime = m->bg_walltime;
269       statsDataList[peslot].bg_cputime = m->bg_cputime;
270       statsDataList[peslot].proc_speed = m->proc_speed;
271       statsDataList[peslot].obj_walltime = m->obj_walltime;
272       statsDataList[peslot].obj_cputime = m->obj_cputime;
273       statsDataList[peslot].vacate_me = m->vacate_me;
274       statsDataList[peslot].usage = m->usage;
275       stats_msg_count++;
276     }
277   }
278
279   const int clients = num_neighbors();
280   if (stats_msg_count == clients && receive_stats_ready) {
281     double strat_start_time = CmiWallTimer();
282     receive_stats_ready = 0;
283     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
284
285     int i;
286
287     // Migrate messages from me to elsewhere
288     for(i=0; i < migrateMsg->n_moves; i++) {
289       MigrateInfo& move = migrateMsg->moves[i];
290       const int me = CkMyPe();
291       if (move.from_pe == me && move.to_pe != me) {
292         theLbdb->Migrate(move.obj,move.to_pe);
293       } else if (move.from_pe != me) {
294         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
295                  me,move.from_pe,move.to_pe);
296       }
297     }
298     
299     // Now, send migrate messages to neighbors
300     thisProxy.ReceiveMigration(migrateMsg,
301         num_neighbors(),neighbor_pes);
302     
303     // Zero out data structures for next cycle
304     for(i=0; i < clients; i++) {
305       delete statsMsgsList[i];
306       statsMsgsList[i]=0;
307     }
308     stats_msg_count=0;
309
310     theLbdb->ClearLoads();
311     if (CkMyPe() == 0) {
312       double strat_end_time = CmiWallTimer();
313       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
314     }
315   }
316 #endif  
317 }
318
319 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
320 {
321 #if CMK_LBDB_ON
322   if (neighbor_pes == 0) FindNeighbors();
323
324   if (mig_msgs_received == 0) migrates_expected = 0;
325
326   mig_msgs[mig_msgs_received] = msg;
327   mig_msgs_received++;
328   //  CkPrintf("[%d] Received migration msg %d of %d\n",
329   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
330
331   if (mig_msgs_received > mig_msgs_expected) {
332     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
333              CkMyPe());
334   }
335
336   if (mig_msgs_received != mig_msgs_expected) {
337     return;
338   }
339
340   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
341   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
342     LBMigrateMsg* m = mig_msgs[neigh];
343     for(int i=0; i < m->n_moves; i++) {
344       MigrateInfo& move = m->moves[i];
345       const int me = CkMyPe();
346       if (move.from_pe != me && move.to_pe == me) {
347         migrates_expected++;
348       }
349     }
350     delete m;
351     mig_msgs[neigh]=0;
352   }
353   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
354   //       CkMyPe(),migrates_expected);
355   mig_msgs_received = 0;
356   if (migrates_expected == 0 || migrates_expected == migrates_completed)
357     MigrationDone();
358 #endif
359 }
360
361
362 void WSLB::MigrationDone()
363 {
364 #if CMK_LBDB_ON
365   if (CkMyPe() == 0) {
366     double end_lb_time = CmiWallTimer();
367     CkPrintf("Load balancing step %d finished at %f duration %f\n",
368              step(),end_lb_time,end_lb_time - start_lb_time);
369   }
370   migrates_completed = 0;
371   migrates_expected = -1;
372   // Increment to next step
373   mystep++;
374   thisProxy [CkMyPe()].ResumeClients();
375 #endif
376 }
377
378 void WSLB::ResumeClients()
379 {
380 #if CMK_LBDB_ON
381   theLbdb->ResumeClients();
382 #endif
383 }
384
385 CmiBool WSLB::QueryBalanceNow(int step)
386 {
387 #if CMK_LBDB_ON
388   double now = CmiWallTimer();
389
390   if (step==0)
391     first_step_time = now;
392   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
393            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
394     if (vacate == CmiFalse) 
395       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
396     vacate = CmiTrue;
397   } else {
398     if (vacate == CmiTrue)
399       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
400     vacate = CmiFalse;
401   }
402 #endif
403   return CmiTrue;
404 }
405
406 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
407 {
408 #if CMK_LBDB_ON
409   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
410   // Compute the average load to see if we are overloaded relative
411   // to our neighbors
412   const double load_factor = 1.05;
413   double objload;
414
415   double myload = myStats.total_walltime - myStats.idletime;
416   double avgload = myload;
417   int unvacated_neighbors = 0;
418   int i;
419   for(i=0; i < count; i++) {
420     // If the neighbor is vacating, skip him
421     if (stats[i].vacate_me)
422       continue;
423
424     // Scale times we need appropriately for relative proc speeds
425     double hisload = stats[i].total_walltime - stats[i].idletime;
426     const double hisusage = stats[i].usage;
427
428     const double scale =  (myStats.proc_speed * usage) 
429       / (stats[i].proc_speed * hisusage);
430
431     hisload *= scale;
432     stats[i].total_walltime *= scale;
433     stats[i].idletime *= scale;
434
435     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
436     //       CkMyPe(),i,hisload,hisusage);
437     avgload += hisload;
438     unvacated_neighbors++;
439   }
440   if (vacate && unvacated_neighbors == 0)
441     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
442
443   avgload /= (unvacated_neighbors+1);
444
445   CkVec<MigrateInfo*> migrateInfo;
446
447   // If we want to vacate, we always dump our load, otherwise
448   // only if we are overloaded
449
450   if (vacate || myload > avgload) {
451     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
452     //       CkMyPe(),myload,avgload);
453
454     // First, build heaps of other processors and my objects
455     // Then assign objects to other processors until either
456     //   - The smallest remaining object would put me below average, or
457     //   - I only have 1 object left, or
458     //   - The smallest remaining object would put someone else 
459     //     above average
460
461     // Build heaps
462     minHeap procs(count);
463     for(i=0; i < count; i++) {
464       // If all my neighbors vacate, I won't have anyone to give work 
465       // to
466       if (!stats[i].vacate_me) {
467         InfoRecord* item = new InfoRecord;
468         item->load = stats[i].total_walltime - stats[i].idletime;
469         item->Id =  stats[i].from_pe;
470         procs.insert(item);
471       }
472     }
473       
474     maxHeap objs(myStats.obj_data_sz);
475     for(i=0; i < myStats.obj_data_sz; i++) {
476       InfoRecord* item = new InfoRecord;
477       item->load = myStats.objData[i].wallTime;
478       item->Id = i;
479       objs.insert(item);
480     }
481
482     int objs_here = myStats.obj_data_sz;
483     do {
484       //      if (objs_here <= 1) break;  // For now, always leave 1 object
485
486       InfoRecord* p;
487       InfoRecord* obj;
488
489       // Get the lightest-loaded processor
490       p = procs.deleteMin();
491       if (p == 0) {
492         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
493         break;
494       }
495
496       // Get the biggest object
497       CmiBool objfound = CmiFalse;
498       do {
499         obj = objs.deleteMax();
500         if (obj == 0) break;
501
502         objload = load_factor * obj->load;
503
504         double new_p_load = p->load + objload;
505         double my_new_load = myload - objload;
506
507         // If we're vacating, the biggest object is always good.
508         // Otherwise, only take it if it doesn't produce overload
509         if (vacate || new_p_load < my_new_load) {
510           objfound = CmiTrue;
511         } else {
512           // This object is too big, so throw it away
513 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
514 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
515           delete obj;
516         }
517       } while (!objfound);
518
519       if (!objfound) {
520         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
521         break;
522       }
523
524       const int me = CkMyPe();
525       // Apparently we can give this object to this processor
526       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
527                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
528
529       MigrateInfo* migrateMe = new MigrateInfo;
530       migrateMe->obj = myStats.objData[obj->Id].handle;
531       migrateMe->from_pe = me;
532       migrateMe->to_pe = p->Id;
533       migrateInfo.insertAtEnd(migrateMe);
534
535       objs_here--;
536       
537       // We may want to assign more to this processor, so lets
538       // update it and put it back in the heap
539       p->load += objload;
540       myload -= objload;
541       procs.insert(p);
542       
543       // This object is assigned, so we delete it from the heap
544       delete obj;
545
546     } while(vacate || myload > avgload);
547
548     // Now empty out the heaps
549     InfoRecord* p;
550     while (NULL!=(p=procs.deleteMin()))
551       delete p;
552     InfoRecord* obj;
553     while (NULL!=(obj=objs.deleteMax()))
554       delete obj;
555   }  
556
557   // Now build the message to actually perform the migrations
558   int migrate_count=migrateInfo.length();
559   //  if (migrate_count) {
560   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
561   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
562   //  }
563   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
564   msg->n_moves = migrate_count;
565   for(i=0; i < migrate_count; i++) {
566     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
567     msg->moves[i] = *item;
568     delete item;
569     migrateInfo[i] = 0;
570   }
571
572   return msg;
573 #else
574   return NULL;
575 #endif
576 };
577
578 #include "WSLB.def.h"
579
580
581 /*@}*/