doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / WSLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #ifndef  WIN32
7 #include <unistd.h>
8 #endif
9
10 #include "elements.h"
11 #include "ckheap.h"
12 #include "WSLB.h"
13 #include "LBDBManager.h"
14
15 // Temporary vacating flags
16 // Set PROC to -1 to disable
17
18 #define VACATE_PROC -1
19 //#define VACATE_PROC (CkNumPes()/2)
20 #define VACATE_AFTER 30
21 #define UNVACATE_AFTER 15
22
23 CreateLBFunc_Def(WSLB, "Workstation load balancer")
24
25 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
26 {
27   WSLB *me = (WSLB*)(data);
28
29   me->Migrated(h, waitBarrier);
30 }
31
32 void WSLB::staticAtSync(void* data)
33 {
34   WSLB *me = (WSLB*)(data);
35
36   me->AtSync();
37 }
38
39 WSLB::WSLB(const CkLBOptions &opt) : BaseLB(opt) 
40 {
41 #if CMK_LBDB_ON
42   thisProxy = CProxy_WSLB(thisgroup);
43   lbname = "WSLB";
44   if (CkMyPe() == 0)
45     CkPrintf("[%d] WSLB created\n",CkMyPe());
46
47   mystep = 0;
48   theLbdb->
49     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
50   notifier = theLbdb->getLBDB()->
51     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
52
53
54    LBtopoFn topofn = LBTopoLookup(_lbtopo);
55   if (topofn == NULL) {
56     if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
57     CmiAbort("");
58   }
59   topo = topofn(CkNumPes());
60
61   // I had to move neighbor initialization outside the constructor
62   // in order to get the virtual functions of any derived classes
63   // so I'll just set them to illegal values here.
64   neighbor_pes = NULL;
65   stats_msg_count = 0;
66   statsMsgsList = NULL;
67   statsDataList = NULL;
68   migrates_completed = 0;
69   migrates_expected = -1;
70   mig_msgs_received = 0;
71   mig_msgs = NULL;
72
73   myStats.proc_speed = theLbdb->ProcessorSpeed();
74 //  char hostname[80];
75 //  gethostname(hostname,79);
76 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
77   myStats.obj_data_sz = 0;
78   myStats.comm_data_sz = 0;
79   receive_stats_ready = 0;
80
81   vacate = CmiFalse;
82   usage = 1.0;
83   usage_int_err = 0.;
84
85   theLbdb->CollectStatsOn();
86 #endif
87 }
88
89 WSLB::~WSLB()
90 {
91 #if CMK_LBDB_ON
92   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
93   if (theLbdb) {
94     theLbdb->getLBDB()->
95       RemoveNotifyMigrated(notifier);
96     //theLbdb->
97     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
98   }
99   if (statsMsgsList) delete [] statsMsgsList;
100   if (statsDataList) delete [] statsDataList;
101   if (neighbor_pes)  delete [] neighbor_pes;
102   if (mig_msgs)      delete [] mig_msgs;
103 #endif
104 }
105
106 void WSLB::FindNeighbors()
107 {
108   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
109                            // and other things that depend on the number
110                            // of neighbors
111     int maxneighbors = topo->max_neighbors();
112     statsMsgsList = new WSLBStatsMsg*[maxneighbors];
113     for(int i=0; i < maxneighbors; i++)
114       statsMsgsList[i] = 0;
115     statsDataList = new LDStats[maxneighbors];
116
117     neighbor_pes = new int[maxneighbors];
118     topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
119     mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
120   }
121
122 }
123
124 void WSLB::AtSync()
125 {
126 #if CMK_LBDB_ON
127   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
128
129   if (CkMyPe() == 0) {
130     start_lb_time = CkWallTimer();
131     CkPrintf("Load balancing step %d starting at %f\n",
132              step(),start_lb_time);
133   }
134
135   if (neighbor_pes == 0) FindNeighbors();
136
137   if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
138     MigrationDone();
139     return;
140   }
141
142   WSLBStatsMsg* msg = AssembleStats();
143
144   thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
145
146   // Tell our own node that we are ready
147   ReceiveStats((WSLBStatsMsg*)0);
148 #endif
149 }
150
151 WSLBStatsMsg* WSLB::AssembleStats()
152 {
153 #if CMK_LBDB_ON
154   // Get stats
155   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
156   theLbdb->IdleTime(&myStats.idletime);
157   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
158   myStats.obj_data_sz = theLbdb->GetObjDataSz();
159   myStats.objData = new LDObjData[myStats.obj_data_sz];
160   theLbdb->GetObjData(myStats.objData);
161
162   myStats.comm_data_sz = theLbdb->GetCommDataSz();
163   myStats.commData = new LDCommData[myStats.comm_data_sz];
164   theLbdb->GetCommData(myStats.commData);
165
166   myStats.obj_walltime = myStats.obj_cputime = 0;
167   for(int i=0; i < myStats.obj_data_sz; i++) {
168     myStats.obj_walltime += myStats.objData[i].wallTime;
169     myStats.obj_cputime += myStats.objData[i].cpuTime;
170   }    
171
172   WSLBStatsMsg* msg = new WSLBStatsMsg;
173
174   // Calculate usage percentage
175   double myload = myStats.total_walltime - myStats.idletime;
176   double myusage;
177 //   for(i=0; i < myStats.obj_data_sz; i++) {
178 //     myobjcpu += myStats.objData[i].cpuTime;
179 //     myobjwall += myStats.objData[i].wallTime;
180 //   }
181 //   if (myobjwall > 0)
182 //     myusage = myobjcpu / myobjwall;
183 //   else
184
185   if (myload > 0)
186     myusage = myStats.total_cputime / myload;
187   else myusage = 1.0;
188   // Apply proportional-integral control on usage changes
189   const double usage_err = myusage - usage;
190   usage_int_err += usage_err;
191   usage += usage_err * 0.1 + usage_int_err * 0.01;
192   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
193  
194   // Allow usage to decrease quickly, but increase slowly
195   //   if (myusage > usage)
196   //     usage += (myusage-usage) * 0.1;
197   //   else usage = myusage;
198  
199
200   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
201   //       CkMyPe(),myload,myusage,usage);
202
203   msg->from_pe = CkMyPe();
204   // msg->serial = rand();
205   msg->serial = CrnRand();
206   msg->proc_speed = myStats.proc_speed;
207   msg->total_walltime = myStats.total_walltime;
208   msg->total_cputime = myStats.total_cputime;
209   msg->idletime = myStats.idletime;
210   msg->bg_walltime = myStats.bg_walltime;
211   msg->bg_cputime = myStats.bg_cputime;
212   msg->obj_walltime = myStats.obj_walltime;
213   msg->obj_cputime = myStats.obj_cputime;
214   msg->vacate_me = vacate;
215   msg->usage = usage;
216
217   if (_lb_args.debug()) {
218     CkPrintf(
219       "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
220       CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
221       msg->idletime,msg->bg_walltime,msg->bg_cputime,
222       msg->obj_walltime,msg->obj_cputime);
223   }
224
225   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
226   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
227   return msg;
228 #else
229   return NULL;
230 #endif
231 }
232
233 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
234 {
235 #if CMK_LBDB_ON
236   migrates_completed++;
237   //  CkPrintf("[%d] An object migrated! %d %d\n",
238   //       CkMyPe(),migrates_completed,migrates_expected);
239   if (migrates_completed == migrates_expected) {
240     MigrationDone();
241   }
242 #endif
243 }
244
245 void WSLB::ReceiveStats(WSLBStatsMsg *m)
246 {
247 #if CMK_LBDB_ON
248   if (neighbor_pes == 0) FindNeighbors();
249
250   if (m == 0) { // This is from our own node
251     receive_stats_ready = 1;
252   } else {
253     const int pe = m->from_pe;
254     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
255     //             pe,stats_msg_count,m->n_objs,m->serial,m);
256     int peslot = -1;
257     for(int i=0; i < mig_msgs_expected; i++) {
258       if (pe == neighbor_pes[i]) {
259         peslot = i;
260         break;
261       }
262     }
263     if (peslot == -1 || statsMsgsList[peslot] != 0) {
264       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
265                pe);
266     } else {
267       statsMsgsList[peslot] = m;
268       statsDataList[peslot].from_pe = m->from_pe;
269       statsDataList[peslot].total_walltime = m->total_walltime;
270       statsDataList[peslot].total_cputime = m->total_cputime;
271       statsDataList[peslot].idletime = m->idletime;
272       statsDataList[peslot].bg_walltime = m->bg_walltime;
273       statsDataList[peslot].bg_cputime = m->bg_cputime;
274       statsDataList[peslot].proc_speed = m->proc_speed;
275       statsDataList[peslot].obj_walltime = m->obj_walltime;
276       statsDataList[peslot].obj_cputime = m->obj_cputime;
277       statsDataList[peslot].vacate_me = m->vacate_me;
278       statsDataList[peslot].usage = m->usage;
279       stats_msg_count++;
280     }
281   }
282
283   const int clients = mig_msgs_expected;
284   if (stats_msg_count == clients && receive_stats_ready) {
285     double strat_start_time = CkWallTimer();
286     receive_stats_ready = 0;
287     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
288
289     int i;
290
291     // Migrate messages from me to elsewhere
292     for(i=0; i < migrateMsg->n_moves; i++) {
293       MigrateInfo& move = migrateMsg->moves[i];
294       const int me = CkMyPe();
295       if (move.from_pe == me && move.to_pe != me) {
296         theLbdb->Migrate(move.obj,move.to_pe);
297       } else if (move.from_pe != me) {
298         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
299                  me,move.from_pe,move.to_pe);
300       }
301     }
302     
303     // Now, send migrate messages to neighbors
304     thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
305     
306     // Zero out data structures for next cycle
307     for(i=0; i < clients; i++) {
308       delete statsMsgsList[i];
309       statsMsgsList[i]=0;
310     }
311     stats_msg_count=0;
312
313     theLbdb->ClearLoads();
314     if (CkMyPe() == 0) {
315       double strat_end_time = CkWallTimer();
316       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
317     }
318   }
319 #endif  
320 }
321
322 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
323 {
324 #if CMK_LBDB_ON
325   if (neighbor_pes == 0) FindNeighbors();
326
327   if (mig_msgs_received == 0) migrates_expected = 0;
328
329   mig_msgs[mig_msgs_received] = msg;
330   mig_msgs_received++;
331   //  CkPrintf("[%d] Received migration msg %d of %d\n",
332   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
333
334   if (mig_msgs_received > mig_msgs_expected) {
335     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
336              CkMyPe());
337   }
338
339   if (mig_msgs_received != mig_msgs_expected) {
340     return;
341   }
342
343   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
344   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
345     LBMigrateMsg* m = mig_msgs[neigh];
346     for(int i=0; i < m->n_moves; i++) {
347       MigrateInfo& move = m->moves[i];
348       const int me = CkMyPe();
349       if (move.from_pe != me && move.to_pe == me) {
350         migrates_expected++;
351       }
352     }
353     delete m;
354     mig_msgs[neigh]=0;
355   }
356   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
357   //       CkMyPe(),migrates_expected);
358   mig_msgs_received = 0;
359   if (migrates_expected == 0 || migrates_expected == migrates_completed)
360     MigrationDone();
361 #endif
362 }
363
364
365 void WSLB::MigrationDone()
366 {
367 #if CMK_LBDB_ON
368   if (CkMyPe() == 0) {
369     double end_lb_time = CkWallTimer();
370     CkPrintf("Load balancing step %d finished at %f duration %f\n",
371              step(),end_lb_time,end_lb_time - start_lb_time);
372   }
373   migrates_completed = 0;
374   migrates_expected = -1;
375   // Increment to next step
376   mystep++;
377   thisProxy [CkMyPe()].ResumeClients();
378 #endif
379 }
380
381 void WSLB::ResumeClients()
382 {
383 #if CMK_LBDB_ON
384   theLbdb->ResumeClients();
385 #endif
386 }
387
388 CmiBool WSLB::QueryBalanceNow(int step)
389 {
390 #if CMK_LBDB_ON
391   double now = CkWallTimer();
392
393   if (step==0)
394     first_step_time = now;
395   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
396            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
397     if (vacate == CmiFalse) 
398       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
399     vacate = CmiTrue;
400   } else {
401     if (vacate == CmiTrue)
402       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
403     vacate = CmiFalse;
404   }
405 #endif
406   return CmiTrue;
407 }
408
409 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
410 {
411 #if CMK_LBDB_ON
412   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
413   // Compute the average load to see if we are overloaded relative
414   // to our neighbors
415   const double load_factor = 1.05;
416   double objload;
417
418   double myload = myStats.total_walltime - myStats.idletime;
419   double avgload = myload;
420   int unvacated_neighbors = 0;
421   int i;
422   for(i=0; i < count; i++) {
423     // If the neighbor is vacating, skip him
424     if (stats[i].vacate_me)
425       continue;
426
427     // Scale times we need appropriately for relative proc speeds
428     double hisload = stats[i].total_walltime - stats[i].idletime;
429     const double hisusage = stats[i].usage;
430
431     const double scale =  (myStats.proc_speed * usage) 
432       / (stats[i].proc_speed * hisusage);
433
434     hisload *= scale;
435     stats[i].total_walltime *= scale;
436     stats[i].idletime *= scale;
437
438     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
439     //       CkMyPe(),i,hisload,hisusage);
440     avgload += hisload;
441     unvacated_neighbors++;
442   }
443   if (vacate && unvacated_neighbors == 0)
444     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
445
446   avgload /= (unvacated_neighbors+1);
447
448   CkVec<MigrateInfo*> migrateInfo;
449
450   // If we want to vacate, we always dump our load, otherwise
451   // only if we are overloaded
452
453   if (vacate || myload > avgload) {
454     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
455     //       CkMyPe(),myload,avgload);
456
457     // First, build heaps of other processors and my objects
458     // Then assign objects to other processors until either
459     //   - The smallest remaining object would put me below average, or
460     //   - I only have 1 object left, or
461     //   - The smallest remaining object would put someone else 
462     //     above average
463
464     // Build heaps
465     minHeap procs(count);
466     for(i=0; i < count; i++) {
467       // If all my neighbors vacate, I won't have anyone to give work 
468       // to
469       if (!stats[i].vacate_me) {
470         InfoRecord* item = new InfoRecord;
471         item->load = stats[i].total_walltime - stats[i].idletime;
472         item->Id =  stats[i].from_pe;
473         procs.insert(item);
474       }
475     }
476       
477     maxHeap objs(myStats.obj_data_sz);
478     for(i=0; i < myStats.obj_data_sz; i++) {
479       InfoRecord* item = new InfoRecord;
480       item->load = myStats.objData[i].wallTime;
481       item->Id = i;
482       objs.insert(item);
483     }
484
485     int objs_here = myStats.obj_data_sz;
486     do {
487       //      if (objs_here <= 1) break;  // For now, always leave 1 object
488
489       InfoRecord* p;
490       InfoRecord* obj;
491
492       // Get the lightest-loaded processor
493       p = procs.deleteMin();
494       if (p == 0) {
495         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
496         break;
497       }
498
499       // Get the biggest object
500       CmiBool objfound = CmiFalse;
501       do {
502         obj = objs.deleteMax();
503         if (obj == 0) break;
504
505         objload = load_factor * obj->load;
506
507         double new_p_load = p->load + objload;
508         double my_new_load = myload - objload;
509
510         // If we're vacating, the biggest object is always good.
511         // Otherwise, only take it if it doesn't produce overload
512         if (vacate || new_p_load < my_new_load) {
513           objfound = CmiTrue;
514         } else {
515           // This object is too big, so throw it away
516 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
517 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
518           delete obj;
519         }
520       } while (!objfound);
521
522       if (!objfound) {
523         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
524         break;
525       }
526
527       const int me = CkMyPe();
528       // Apparently we can give this object to this processor
529       if (_lb_args.debug())
530       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
531                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
532
533       MigrateInfo* migrateMe = new MigrateInfo;
534       migrateMe->obj = myStats.objData[obj->Id].handle;
535       migrateMe->from_pe = me;
536       migrateMe->to_pe = p->Id;
537       migrateInfo.insertAtEnd(migrateMe);
538
539       objs_here--;
540       
541       // We may want to assign more to this processor, so lets
542       // update it and put it back in the heap
543       p->load += objload;
544       myload -= objload;
545       procs.insert(p);
546       
547       // This object is assigned, so we delete it from the heap
548       delete obj;
549
550     } while(vacate || myload > avgload);
551
552     // Now empty out the heaps
553     InfoRecord* p;
554     while (NULL!=(p=procs.deleteMin()))
555       delete p;
556     InfoRecord* obj;
557     while (NULL!=(obj=objs.deleteMax()))
558       delete obj;
559   }  
560
561   // Now build the message to actually perform the migrations
562   int migrate_count=migrateInfo.length();
563   //  if (migrate_count) {
564   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
565   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
566   //  }
567   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
568   msg->n_moves = migrate_count;
569   for(i=0; i < migrate_count; i++) {
570     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
571     msg->moves[i] = *item;
572     delete item;
573     migrateInfo[i] = 0;
574   }
575
576   return msg;
577 #else
578   return NULL;
579 #endif
580 }
581
582 #include "WSLB.def.h"
583
584
585 /*@}*/