9b3b1bb601c8294a7077d1338005b51665f7a5e3
[charm.git] / src / ck-ldb / WSLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #ifndef  WIN32
14 #include <unistd.h>
15 #endif
16 #include <charm++.h>
17 #include <BaseLB.h>
18 #include <cklists.h>
19 #include "heap.h"
20 #include "WSLB.h"
21
22 // Temporary vacating flags
23 // Set PROC to -1 to disable
24
25 #define VACATE_PROC -1
26 //#define VACATE_PROC (CkNumPes()/2)
27 #define VACATE_AFTER 30
28 #define UNVACATE_AFTER 15
29
30 CreateLBFunc_Def(WSLB);
31
32
33 void WSLB::staticMigrated(void* data, LDObjHandle h)
34 {
35   WSLB *me = (WSLB*)(data);
36
37   me->Migrated(h);
38 }
39
40 void WSLB::staticAtSync(void* data)
41 {
42   WSLB *me = (WSLB*)(data);
43
44   me->AtSync();
45 }
46
47 WSLB::WSLB(const CkLBOptions &opt) : BaseLB(opt) 
48 {
49 #if CMK_LBDB_ON
50   thisProxy = CProxy_WSLB(thisgroup);
51   lbname = "WSLB";
52   if (CkMyPe() == 0)
53     CkPrintf("[%d] WSLB created\n",CkMyPe());
54
55   mystep = 0;
56   theLbdb->
57     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
58   theLbdb->
59     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
60
61
62   // I had to move neighbor initialization outside the constructor
63   // in order to get the virtual functions of any derived classes
64   // so I'll just set them to illegal values here.
65   neighbor_pes = 0;
66   stats_msg_count = 0;
67   statsMsgsList = 0;
68   statsDataList = 0;
69   migrates_completed = 0;
70   migrates_expected = -1;
71   mig_msgs_received = 0;
72   mig_msgs = 0;
73
74   myStats.proc_speed = theLbdb->ProcessorSpeed();
75 //  char hostname[80];
76 //  gethostname(hostname,79);
77 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
78   myStats.obj_data_sz = 0;
79   myStats.comm_data_sz = 0;
80   receive_stats_ready = 0;
81
82   vacate = CmiFalse;
83   usage = 1.0;
84   usage_int_err = 0.;
85
86   theLbdb->CollectStatsOn();
87 #endif
88 }
89
90 WSLB::~WSLB()
91 {
92   CkPrintf("Going away\n");
93 }
94
95 void WSLB::FindNeighbors()
96 {
97   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
98                            // and other things that depend on the number
99                            // of neighbors
100     statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
101     for(int i=0; i < num_neighbors(); i++)
102       statsMsgsList[i] = 0;
103     statsDataList = new LDStats[num_neighbors()];
104
105     neighbor_pes = new int[num_neighbors()];
106     neighbors(neighbor_pes);
107     mig_msgs_expected = num_neighbors();
108     mig_msgs = new LBMigrateMsg*[num_neighbors()];
109   }
110
111 }
112
113 void WSLB::AtSync()
114 {
115 #if CMK_LBDB_ON
116   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
117
118   if (CkMyPe() == 0) {
119     start_lb_time = CmiWallTimer();
120     CkPrintf("Load balancing step %d starting at %f\n",
121              step(),start_lb_time);
122   }
123
124   if (neighbor_pes == 0) FindNeighbors();
125
126   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
127     MigrationDone();
128     return;
129   }
130
131   WSLBStatsMsg* msg = AssembleStats();
132
133   thisProxy.ReceiveStats(msg,num_neighbors(),neighbor_pes);
134
135   // Tell our own node that we are ready
136   ReceiveStats((WSLBStatsMsg*)0);
137 #endif
138 }
139
140 WSLBStatsMsg* WSLB::AssembleStats()
141 {
142 #if CMK_LBDB_ON
143   // Get stats
144   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
145   theLbdb->IdleTime(&myStats.idletime);
146   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
147   myStats.obj_data_sz = theLbdb->GetObjDataSz();
148   myStats.objData = new LDObjData[myStats.obj_data_sz];
149   theLbdb->GetObjData(myStats.objData);
150
151   myStats.comm_data_sz = theLbdb->GetCommDataSz();
152   myStats.commData = new LDCommData[myStats.comm_data_sz];
153   theLbdb->GetCommData(myStats.commData);
154
155   myStats.obj_walltime = myStats.obj_cputime = 0;
156   for(int i=0; i < myStats.obj_data_sz; i++) {
157     myStats.obj_walltime += myStats.objData[i].wallTime;
158     myStats.obj_cputime += myStats.objData[i].cpuTime;
159   }    
160
161   WSLBStatsMsg* msg = new WSLBStatsMsg;
162
163   // Calculate usage percentage
164   double myload = myStats.total_walltime - myStats.idletime;
165   double myusage;
166 //   for(i=0; i < myStats.obj_data_sz; i++) {
167 //     myobjcpu += myStats.objData[i].cpuTime;
168 //     myobjwall += myStats.objData[i].wallTime;
169 //   }
170 //   if (myobjwall > 0)
171 //     myusage = myobjcpu / myobjwall;
172 //   else
173
174   if (myload > 0)
175     myusage = myStats.total_cputime / myload;
176   else myusage = 1.0;
177   // Apply proportional-integral control on usage changes
178   const double usage_err = myusage - usage;
179   usage_int_err += usage_err;
180   usage += usage_err * 0.1 + usage_int_err * 0.01;
181   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
182  
183   // Allow usage to decrease quickly, but increase slowly
184   //   if (myusage > usage)
185   //     usage += (myusage-usage) * 0.1;
186   //   else usage = myusage;
187  
188
189   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
190   //       CkMyPe(),myload,myusage,usage);
191
192   msg->from_pe = CkMyPe();
193   // msg->serial = rand();
194   msg->serial = CrnRand();
195   msg->proc_speed = myStats.proc_speed;
196   msg->total_walltime = myStats.total_walltime;
197   msg->total_cputime = myStats.total_cputime;
198   msg->idletime = myStats.idletime;
199   msg->bg_walltime = myStats.bg_walltime;
200   msg->bg_cputime = myStats.bg_cputime;
201   msg->obj_walltime = myStats.obj_walltime;
202   msg->obj_cputime = myStats.obj_cputime;
203   msg->vacate_me = vacate;
204   msg->usage = usage;
205
206   //  CkPrintf(
207   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
208   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
209   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
210   //    msg->obj_walltime,msg->obj_cputime);
211
212   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
213   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
214   return msg;
215 #else
216   return NULL;
217 #endif
218 }
219
220 void WSLB::Migrated(LDObjHandle h)
221 {
222 #if CMK_LBDB_ON
223   migrates_completed++;
224   //  CkPrintf("[%d] An object migrated! %d %d\n",
225   //       CkMyPe(),migrates_completed,migrates_expected);
226   if (migrates_completed == migrates_expected) {
227     MigrationDone();
228   }
229 #endif
230 }
231
232 void WSLB::ReceiveStats(WSLBStatsMsg *m)
233 {
234 #if CMK_LBDB_ON
235   if (neighbor_pes == 0) FindNeighbors();
236
237   if (m == 0) { // This is from our own node
238     receive_stats_ready = 1;
239   } else {
240     const int pe = m->from_pe;
241     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
242     //             pe,stats_msg_count,m->n_objs,m->serial,m);
243     int peslot = -1;
244     for(int i=0; i < num_neighbors(); i++) {
245       if (pe == neighbor_pes[i]) {
246         peslot = i;
247         break;
248       }
249     }
250     if (peslot == -1 || statsMsgsList[peslot] != 0) {
251       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
252                pe);
253     } else {
254       statsMsgsList[peslot] = m;
255       statsDataList[peslot].from_pe = m->from_pe;
256       statsDataList[peslot].total_walltime = m->total_walltime;
257       statsDataList[peslot].total_cputime = m->total_cputime;
258       statsDataList[peslot].idletime = m->idletime;
259       statsDataList[peslot].bg_walltime = m->bg_walltime;
260       statsDataList[peslot].bg_cputime = m->bg_cputime;
261       statsDataList[peslot].proc_speed = m->proc_speed;
262       statsDataList[peslot].obj_walltime = m->obj_walltime;
263       statsDataList[peslot].obj_cputime = m->obj_cputime;
264       statsDataList[peslot].vacate_me = m->vacate_me;
265       statsDataList[peslot].usage = m->usage;
266       stats_msg_count++;
267     }
268   }
269
270   const int clients = num_neighbors();
271   if (stats_msg_count == clients && receive_stats_ready) {
272     double strat_start_time = CmiWallTimer();
273     receive_stats_ready = 0;
274     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
275
276     int i;
277
278     // Migrate messages from me to elsewhere
279     for(i=0; i < migrateMsg->n_moves; i++) {
280       MigrateInfo& move = migrateMsg->moves[i];
281       const int me = CkMyPe();
282       if (move.from_pe == me && move.to_pe != me) {
283         theLbdb->Migrate(move.obj,move.to_pe);
284       } else if (move.from_pe != me) {
285         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
286                  me,move.from_pe,move.to_pe);
287       }
288     }
289     
290     // Now, send migrate messages to neighbors
291     thisProxy.ReceiveMigration(migrateMsg,
292         num_neighbors(),neighbor_pes);
293     
294     // Zero out data structures for next cycle
295     for(i=0; i < clients; i++) {
296       delete statsMsgsList[i];
297       statsMsgsList[i]=0;
298     }
299     stats_msg_count=0;
300
301     theLbdb->ClearLoads();
302     if (CkMyPe() == 0) {
303       double strat_end_time = CmiWallTimer();
304       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
305     }
306   }
307 #endif  
308 }
309
310 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
311 {
312 #if CMK_LBDB_ON
313   if (neighbor_pes == 0) FindNeighbors();
314
315   if (mig_msgs_received == 0) migrates_expected = 0;
316
317   mig_msgs[mig_msgs_received] = msg;
318   mig_msgs_received++;
319   //  CkPrintf("[%d] Received migration msg %d of %d\n",
320   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
321
322   if (mig_msgs_received > mig_msgs_expected) {
323     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
324              CkMyPe());
325   }
326
327   if (mig_msgs_received != mig_msgs_expected) {
328     return;
329   }
330
331   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
332   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
333     LBMigrateMsg* m = mig_msgs[neigh];
334     for(int i=0; i < m->n_moves; i++) {
335       MigrateInfo& move = m->moves[i];
336       const int me = CkMyPe();
337       if (move.from_pe != me && move.to_pe == me) {
338         migrates_expected++;
339       }
340     }
341     delete m;
342     mig_msgs[neigh]=0;
343   }
344   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
345   //       CkMyPe(),migrates_expected);
346   mig_msgs_received = 0;
347   if (migrates_expected == 0 || migrates_expected == migrates_completed)
348     MigrationDone();
349 #endif
350 }
351
352
353 void WSLB::MigrationDone()
354 {
355 #if CMK_LBDB_ON
356   if (CkMyPe() == 0) {
357     double end_lb_time = CmiWallTimer();
358     CkPrintf("Load balancing step %d finished at %f duration %f\n",
359              step(),end_lb_time,end_lb_time - start_lb_time);
360   }
361   migrates_completed = 0;
362   migrates_expected = -1;
363   // Increment to next step
364   mystep++;
365   thisProxy [CkMyPe()].ResumeClients();
366 #endif
367 }
368
369 void WSLB::ResumeClients()
370 {
371 #if CMK_LBDB_ON
372   theLbdb->ResumeClients();
373 #endif
374 }
375
376 CmiBool WSLB::QueryBalanceNow(int step)
377 {
378 #if CMK_LBDB_ON
379   double now = CmiWallTimer();
380
381   if (step==0)
382     first_step_time = now;
383   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
384            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
385     if (vacate == CmiFalse) 
386       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
387     vacate = CmiTrue;
388   } else {
389     if (vacate == CmiTrue)
390       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
391     vacate = CmiFalse;
392   }
393 #endif
394   return CmiTrue;
395 }
396
397 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
398 {
399 #if CMK_LBDB_ON
400   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
401   // Compute the average load to see if we are overloaded relative
402   // to our neighbors
403   const double load_factor = 1.05;
404   double objload;
405
406   double myload = myStats.total_walltime - myStats.idletime;
407   double avgload = myload;
408   int unvacated_neighbors = 0;
409   int i;
410   for(i=0; i < count; i++) {
411     // If the neighbor is vacating, skip him
412     if (stats[i].vacate_me)
413       continue;
414
415     // Scale times we need appropriately for relative proc speeds
416     double hisload = stats[i].total_walltime - stats[i].idletime;
417     const double hisusage = stats[i].usage;
418
419     const double scale =  (myStats.proc_speed * usage) 
420       / (stats[i].proc_speed * hisusage);
421
422     hisload *= scale;
423     stats[i].total_walltime *= scale;
424     stats[i].idletime *= scale;
425
426     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
427     //       CkMyPe(),i,hisload,hisusage);
428     avgload += hisload;
429     unvacated_neighbors++;
430   }
431   if (vacate && unvacated_neighbors == 0)
432     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
433
434   avgload /= (unvacated_neighbors+1);
435
436   CkVec<MigrateInfo*> migrateInfo;
437
438   // If we want to vacate, we always dump our load, otherwise
439   // only if we are overloaded
440
441   if (vacate || myload > avgload) {
442     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
443     //       CkMyPe(),myload,avgload);
444
445     // First, build heaps of other processors and my objects
446     // Then assign objects to other processors until either
447     //   - The smallest remaining object would put me below average, or
448     //   - I only have 1 object left, or
449     //   - The smallest remaining object would put someone else 
450     //     above average
451
452     // Build heaps
453     minHeap procs(count);
454     for(i=0; i < count; i++) {
455       // If all my neighbors vacate, I won't have anyone to give work 
456       // to
457       if (!stats[i].vacate_me) {
458         InfoRecord* item = new InfoRecord;
459         item->load = stats[i].total_walltime - stats[i].idletime;
460         item->Id =  stats[i].from_pe;
461         procs.insert(item);
462       }
463     }
464       
465     maxHeap objs(myStats.obj_data_sz);
466     for(i=0; i < myStats.obj_data_sz; i++) {
467       InfoRecord* item = new InfoRecord;
468       item->load = myStats.objData[i].wallTime;
469       item->Id = i;
470       objs.insert(item);
471     }
472
473     int objs_here = myStats.obj_data_sz;
474     do {
475       //      if (objs_here <= 1) break;  // For now, always leave 1 object
476
477       InfoRecord* p;
478       InfoRecord* obj;
479
480       // Get the lightest-loaded processor
481       p = procs.deleteMin();
482       if (p == 0) {
483         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
484         break;
485       }
486
487       // Get the biggest object
488       CmiBool objfound = CmiFalse;
489       do {
490         obj = objs.deleteMax();
491         if (obj == 0) break;
492
493         objload = load_factor * obj->load;
494
495         double new_p_load = p->load + objload;
496         double my_new_load = myload - objload;
497
498         // If we're vacating, the biggest object is always good.
499         // Otherwise, only take it if it doesn't produce overload
500         if (vacate || new_p_load < my_new_load) {
501           objfound = CmiTrue;
502         } else {
503           // This object is too big, so throw it away
504 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
505 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
506           delete obj;
507         }
508       } while (!objfound);
509
510       if (!objfound) {
511         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
512         break;
513       }
514
515       const int me = CkMyPe();
516       // Apparently we can give this object to this processor
517       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
518                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
519
520       MigrateInfo* migrateMe = new MigrateInfo;
521       migrateMe->obj = myStats.objData[obj->Id].handle;
522       migrateMe->from_pe = me;
523       migrateMe->to_pe = p->Id;
524       migrateInfo.insertAtEnd(migrateMe);
525
526       objs_here--;
527       
528       // We may want to assign more to this processor, so lets
529       // update it and put it back in the heap
530       p->load += objload;
531       myload -= objload;
532       procs.insert(p);
533       
534       // This object is assigned, so we delete it from the heap
535       delete obj;
536
537     } while(vacate || myload > avgload);
538
539     // Now empty out the heaps
540     InfoRecord* p;
541     while (NULL!=(p=procs.deleteMin()))
542       delete p;
543     InfoRecord* obj;
544     while (NULL!=(obj=objs.deleteMax()))
545       delete obj;
546   }  
547
548   // Now build the message to actually perform the migrations
549   int migrate_count=migrateInfo.length();
550   //  if (migrate_count) {
551   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
552   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
553   //  }
554   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
555   msg->n_moves = migrate_count;
556   for(i=0; i < migrate_count; i++) {
557     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
558     msg->moves[i] = *item;
559     delete item;
560     migrateInfo[i] = 0;
561   }
562
563   return msg;
564 #else
565   return NULL;
566 #endif
567 };
568
569 #include "WSLB.def.h"
570
571
572 /*@}*/