ignore idle timers for BigSim, changed CmiWallTimer to CkWallTimer() to better handle...
[charm.git] / src / ck-ldb / WSLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #ifndef  WIN32
14 #include <unistd.h>
15 #endif
16 #include "charm++.h"
17 #include <BaseLB.h>
18 #include <cklists.h>
19 #include "heap.h"
20 #include "WSLB.h"
21 #include "LBDBManager.h"
22
23 // Temporary vacating flags
24 // Set PROC to -1 to disable
25
26 #define VACATE_PROC -1
27 //#define VACATE_PROC (CkNumPes()/2)
28 #define VACATE_AFTER 30
29 #define UNVACATE_AFTER 15
30
31 CreateLBFunc_Def(WSLB);
32
33 static void lbinit(void) {
34   LBRegisterBalancer("WSLB", 
35                      CreateWSLB, 
36                      AllocateWSLB, 
37                      "Workstation load balancer");
38 }
39
40
41 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
42 {
43   WSLB *me = (WSLB*)(data);
44
45   me->Migrated(h, waitBarrier);
46 }
47
48 void WSLB::staticAtSync(void* data)
49 {
50   WSLB *me = (WSLB*)(data);
51
52   me->AtSync();
53 }
54
55 WSLB::WSLB(const CkLBOptions &opt) : BaseLB(opt) 
56 {
57 #if CMK_LBDB_ON
58   thisProxy = CProxy_WSLB(thisgroup);
59   lbname = "WSLB";
60   if (CkMyPe() == 0)
61     CkPrintf("[%d] WSLB created\n",CkMyPe());
62
63   mystep = 0;
64   theLbdb->
65     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
66   notifier = theLbdb->getLBDB()->
67     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
68
69
70    LBtopoFn topofn = LBTopoLookup(_lbtopo);
71   if (topofn == NULL) {
72     if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
73     CmiAbort("");
74   }
75   topo = topofn();
76
77   // I had to move neighbor initialization outside the constructor
78   // in order to get the virtual functions of any derived classes
79   // so I'll just set them to illegal values here.
80   neighbor_pes = NULL;
81   stats_msg_count = 0;
82   statsMsgsList = NULL;
83   statsDataList = NULL;
84   migrates_completed = 0;
85   migrates_expected = -1;
86   mig_msgs_received = 0;
87   mig_msgs = NULL;
88
89   myStats.proc_speed = theLbdb->ProcessorSpeed();
90 //  char hostname[80];
91 //  gethostname(hostname,79);
92 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
93   myStats.obj_data_sz = 0;
94   myStats.comm_data_sz = 0;
95   receive_stats_ready = 0;
96
97   vacate = CmiFalse;
98   usage = 1.0;
99   usage_int_err = 0.;
100
101   theLbdb->CollectStatsOn();
102 #endif
103 }
104
105 WSLB::~WSLB()
106 {
107 #if CMK_LBDB_ON
108   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
109   if (theLbdb) {
110     theLbdb->getLBDB()->
111       RemoveNotifyMigrated(notifier);
112     //theLbdb->
113     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
114   }
115   if (statsMsgsList) delete [] statsMsgsList;
116   if (statsDataList) delete [] statsDataList;
117   if (neighbor_pes)  delete [] neighbor_pes;
118   if (mig_msgs)      delete [] mig_msgs;
119 #endif
120 }
121
122 void WSLB::FindNeighbors()
123 {
124   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
125                            // and other things that depend on the number
126                            // of neighbors
127     int maxneighbors = topo->max_neighbors();
128     statsMsgsList = new WSLBStatsMsg*[maxneighbors];
129     for(int i=0; i < maxneighbors; i++)
130       statsMsgsList[i] = 0;
131     statsDataList = new LDStats[maxneighbors];
132
133     neighbor_pes = new int[maxneighbors];
134     topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
135     mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
136   }
137
138 }
139
140 void WSLB::AtSync()
141 {
142 #if CMK_LBDB_ON
143   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
144
145   if (CkMyPe() == 0) {
146     start_lb_time = CkWallTimer();
147     CkPrintf("Load balancing step %d starting at %f\n",
148              step(),start_lb_time);
149   }
150
151   if (neighbor_pes == 0) FindNeighbors();
152
153   if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
154     MigrationDone();
155     return;
156   }
157
158   WSLBStatsMsg* msg = AssembleStats();
159
160   thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
161
162   // Tell our own node that we are ready
163   ReceiveStats((WSLBStatsMsg*)0);
164 #endif
165 }
166
167 WSLBStatsMsg* WSLB::AssembleStats()
168 {
169 #if CMK_LBDB_ON
170   // Get stats
171   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
172   theLbdb->IdleTime(&myStats.idletime);
173   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
174   myStats.obj_data_sz = theLbdb->GetObjDataSz();
175   myStats.objData = new LDObjData[myStats.obj_data_sz];
176   theLbdb->GetObjData(myStats.objData);
177
178   myStats.comm_data_sz = theLbdb->GetCommDataSz();
179   myStats.commData = new LDCommData[myStats.comm_data_sz];
180   theLbdb->GetCommData(myStats.commData);
181
182   myStats.obj_walltime = myStats.obj_cputime = 0;
183   for(int i=0; i < myStats.obj_data_sz; i++) {
184     myStats.obj_walltime += myStats.objData[i].wallTime;
185     myStats.obj_cputime += myStats.objData[i].cpuTime;
186   }    
187
188   WSLBStatsMsg* msg = new WSLBStatsMsg;
189
190   // Calculate usage percentage
191   double myload = myStats.total_walltime - myStats.idletime;
192   double myusage;
193 //   for(i=0; i < myStats.obj_data_sz; i++) {
194 //     myobjcpu += myStats.objData[i].cpuTime;
195 //     myobjwall += myStats.objData[i].wallTime;
196 //   }
197 //   if (myobjwall > 0)
198 //     myusage = myobjcpu / myobjwall;
199 //   else
200
201   if (myload > 0)
202     myusage = myStats.total_cputime / myload;
203   else myusage = 1.0;
204   // Apply proportional-integral control on usage changes
205   const double usage_err = myusage - usage;
206   usage_int_err += usage_err;
207   usage += usage_err * 0.1 + usage_int_err * 0.01;
208   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
209  
210   // Allow usage to decrease quickly, but increase slowly
211   //   if (myusage > usage)
212   //     usage += (myusage-usage) * 0.1;
213   //   else usage = myusage;
214  
215
216   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
217   //       CkMyPe(),myload,myusage,usage);
218
219   msg->from_pe = CkMyPe();
220   // msg->serial = rand();
221   msg->serial = CrnRand();
222   msg->proc_speed = myStats.proc_speed;
223   msg->total_walltime = myStats.total_walltime;
224   msg->total_cputime = myStats.total_cputime;
225   msg->idletime = myStats.idletime;
226   msg->bg_walltime = myStats.bg_walltime;
227   msg->bg_cputime = myStats.bg_cputime;
228   msg->obj_walltime = myStats.obj_walltime;
229   msg->obj_cputime = myStats.obj_cputime;
230   msg->vacate_me = vacate;
231   msg->usage = usage;
232
233   if (_lb_args.debug()) {
234     CkPrintf(
235       "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
236       CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
237       msg->idletime,msg->bg_walltime,msg->bg_cputime,
238       msg->obj_walltime,msg->obj_cputime);
239   }
240
241   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
242   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
243   return msg;
244 #else
245   return NULL;
246 #endif
247 }
248
249 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
250 {
251 #if CMK_LBDB_ON
252   migrates_completed++;
253   //  CkPrintf("[%d] An object migrated! %d %d\n",
254   //       CkMyPe(),migrates_completed,migrates_expected);
255   if (migrates_completed == migrates_expected) {
256     MigrationDone();
257   }
258 #endif
259 }
260
261 void WSLB::ReceiveStats(WSLBStatsMsg *m)
262 {
263 #if CMK_LBDB_ON
264   if (neighbor_pes == 0) FindNeighbors();
265
266   if (m == 0) { // This is from our own node
267     receive_stats_ready = 1;
268   } else {
269     const int pe = m->from_pe;
270     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
271     //             pe,stats_msg_count,m->n_objs,m->serial,m);
272     int peslot = -1;
273     for(int i=0; i < mig_msgs_expected; i++) {
274       if (pe == neighbor_pes[i]) {
275         peslot = i;
276         break;
277       }
278     }
279     if (peslot == -1 || statsMsgsList[peslot] != 0) {
280       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
281                pe);
282     } else {
283       statsMsgsList[peslot] = m;
284       statsDataList[peslot].from_pe = m->from_pe;
285       statsDataList[peslot].total_walltime = m->total_walltime;
286       statsDataList[peslot].total_cputime = m->total_cputime;
287       statsDataList[peslot].idletime = m->idletime;
288       statsDataList[peslot].bg_walltime = m->bg_walltime;
289       statsDataList[peslot].bg_cputime = m->bg_cputime;
290       statsDataList[peslot].proc_speed = m->proc_speed;
291       statsDataList[peslot].obj_walltime = m->obj_walltime;
292       statsDataList[peslot].obj_cputime = m->obj_cputime;
293       statsDataList[peslot].vacate_me = m->vacate_me;
294       statsDataList[peslot].usage = m->usage;
295       stats_msg_count++;
296     }
297   }
298
299   const int clients = mig_msgs_expected;
300   if (stats_msg_count == clients && receive_stats_ready) {
301     double strat_start_time = CkWallTimer();
302     receive_stats_ready = 0;
303     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
304
305     int i;
306
307     // Migrate messages from me to elsewhere
308     for(i=0; i < migrateMsg->n_moves; i++) {
309       MigrateInfo& move = migrateMsg->moves[i];
310       const int me = CkMyPe();
311       if (move.from_pe == me && move.to_pe != me) {
312         theLbdb->Migrate(move.obj,move.to_pe);
313       } else if (move.from_pe != me) {
314         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
315                  me,move.from_pe,move.to_pe);
316       }
317     }
318     
319     // Now, send migrate messages to neighbors
320     thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
321     
322     // Zero out data structures for next cycle
323     for(i=0; i < clients; i++) {
324       delete statsMsgsList[i];
325       statsMsgsList[i]=0;
326     }
327     stats_msg_count=0;
328
329     theLbdb->ClearLoads();
330     if (CkMyPe() == 0) {
331       double strat_end_time = CkWallTimer();
332       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
333     }
334   }
335 #endif  
336 }
337
338 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
339 {
340 #if CMK_LBDB_ON
341   if (neighbor_pes == 0) FindNeighbors();
342
343   if (mig_msgs_received == 0) migrates_expected = 0;
344
345   mig_msgs[mig_msgs_received] = msg;
346   mig_msgs_received++;
347   //  CkPrintf("[%d] Received migration msg %d of %d\n",
348   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
349
350   if (mig_msgs_received > mig_msgs_expected) {
351     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
352              CkMyPe());
353   }
354
355   if (mig_msgs_received != mig_msgs_expected) {
356     return;
357   }
358
359   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
360   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
361     LBMigrateMsg* m = mig_msgs[neigh];
362     for(int i=0; i < m->n_moves; i++) {
363       MigrateInfo& move = m->moves[i];
364       const int me = CkMyPe();
365       if (move.from_pe != me && move.to_pe == me) {
366         migrates_expected++;
367       }
368     }
369     delete m;
370     mig_msgs[neigh]=0;
371   }
372   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
373   //       CkMyPe(),migrates_expected);
374   mig_msgs_received = 0;
375   if (migrates_expected == 0 || migrates_expected == migrates_completed)
376     MigrationDone();
377 #endif
378 }
379
380
381 void WSLB::MigrationDone()
382 {
383 #if CMK_LBDB_ON
384   if (CkMyPe() == 0) {
385     double end_lb_time = CkWallTimer();
386     CkPrintf("Load balancing step %d finished at %f duration %f\n",
387              step(),end_lb_time,end_lb_time - start_lb_time);
388   }
389   migrates_completed = 0;
390   migrates_expected = -1;
391   // Increment to next step
392   mystep++;
393   thisProxy [CkMyPe()].ResumeClients();
394 #endif
395 }
396
397 void WSLB::ResumeClients()
398 {
399 #if CMK_LBDB_ON
400   theLbdb->ResumeClients();
401 #endif
402 }
403
404 CmiBool WSLB::QueryBalanceNow(int step)
405 {
406 #if CMK_LBDB_ON
407   double now = CkWallTimer();
408
409   if (step==0)
410     first_step_time = now;
411   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
412            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
413     if (vacate == CmiFalse) 
414       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
415     vacate = CmiTrue;
416   } else {
417     if (vacate == CmiTrue)
418       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
419     vacate = CmiFalse;
420   }
421 #endif
422   return CmiTrue;
423 }
424
425 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
426 {
427 #if CMK_LBDB_ON
428   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
429   // Compute the average load to see if we are overloaded relative
430   // to our neighbors
431   const double load_factor = 1.05;
432   double objload;
433
434   double myload = myStats.total_walltime - myStats.idletime;
435   double avgload = myload;
436   int unvacated_neighbors = 0;
437   int i;
438   for(i=0; i < count; i++) {
439     // If the neighbor is vacating, skip him
440     if (stats[i].vacate_me)
441       continue;
442
443     // Scale times we need appropriately for relative proc speeds
444     double hisload = stats[i].total_walltime - stats[i].idletime;
445     const double hisusage = stats[i].usage;
446
447     const double scale =  (myStats.proc_speed * usage) 
448       / (stats[i].proc_speed * hisusage);
449
450     hisload *= scale;
451     stats[i].total_walltime *= scale;
452     stats[i].idletime *= scale;
453
454     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
455     //       CkMyPe(),i,hisload,hisusage);
456     avgload += hisload;
457     unvacated_neighbors++;
458   }
459   if (vacate && unvacated_neighbors == 0)
460     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
461
462   avgload /= (unvacated_neighbors+1);
463
464   CkVec<MigrateInfo*> migrateInfo;
465
466   // If we want to vacate, we always dump our load, otherwise
467   // only if we are overloaded
468
469   if (vacate || myload > avgload) {
470     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
471     //       CkMyPe(),myload,avgload);
472
473     // First, build heaps of other processors and my objects
474     // Then assign objects to other processors until either
475     //   - The smallest remaining object would put me below average, or
476     //   - I only have 1 object left, or
477     //   - The smallest remaining object would put someone else 
478     //     above average
479
480     // Build heaps
481     minHeap procs(count);
482     for(i=0; i < count; i++) {
483       // If all my neighbors vacate, I won't have anyone to give work 
484       // to
485       if (!stats[i].vacate_me) {
486         InfoRecord* item = new InfoRecord;
487         item->load = stats[i].total_walltime - stats[i].idletime;
488         item->Id =  stats[i].from_pe;
489         procs.insert(item);
490       }
491     }
492       
493     maxHeap objs(myStats.obj_data_sz);
494     for(i=0; i < myStats.obj_data_sz; i++) {
495       InfoRecord* item = new InfoRecord;
496       item->load = myStats.objData[i].wallTime;
497       item->Id = i;
498       objs.insert(item);
499     }
500
501     int objs_here = myStats.obj_data_sz;
502     do {
503       //      if (objs_here <= 1) break;  // For now, always leave 1 object
504
505       InfoRecord* p;
506       InfoRecord* obj;
507
508       // Get the lightest-loaded processor
509       p = procs.deleteMin();
510       if (p == 0) {
511         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
512         break;
513       }
514
515       // Get the biggest object
516       CmiBool objfound = CmiFalse;
517       do {
518         obj = objs.deleteMax();
519         if (obj == 0) break;
520
521         objload = load_factor * obj->load;
522
523         double new_p_load = p->load + objload;
524         double my_new_load = myload - objload;
525
526         // If we're vacating, the biggest object is always good.
527         // Otherwise, only take it if it doesn't produce overload
528         if (vacate || new_p_load < my_new_load) {
529           objfound = CmiTrue;
530         } else {
531           // This object is too big, so throw it away
532 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
533 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
534           delete obj;
535         }
536       } while (!objfound);
537
538       if (!objfound) {
539         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
540         break;
541       }
542
543       const int me = CkMyPe();
544       // Apparently we can give this object to this processor
545       if (_lb_args.debug())
546       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
547                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
548
549       MigrateInfo* migrateMe = new MigrateInfo;
550       migrateMe->obj = myStats.objData[obj->Id].handle;
551       migrateMe->from_pe = me;
552       migrateMe->to_pe = p->Id;
553       migrateInfo.insertAtEnd(migrateMe);
554
555       objs_here--;
556       
557       // We may want to assign more to this processor, so lets
558       // update it and put it back in the heap
559       p->load += objload;
560       myload -= objload;
561       procs.insert(p);
562       
563       // This object is assigned, so we delete it from the heap
564       delete obj;
565
566     } while(vacate || myload > avgload);
567
568     // Now empty out the heaps
569     InfoRecord* p;
570     while (NULL!=(p=procs.deleteMin()))
571       delete p;
572     InfoRecord* obj;
573     while (NULL!=(obj=objs.deleteMax()))
574       delete obj;
575   }  
576
577   // Now build the message to actually perform the migrations
578   int migrate_count=migrateInfo.length();
579   //  if (migrate_count) {
580   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
581   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
582   //  }
583   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
584   msg->n_moves = migrate_count;
585   for(i=0; i < migrate_count; i++) {
586     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
587     msg->moves[i] = *item;
588     delete item;
589     migrateInfo[i] = 0;
590   }
591
592   return msg;
593 #else
594   return NULL;
595 #endif
596 };
597
598 #include "WSLB.def.h"
599
600
601 /*@}*/