6c3771cbd9182b3291f313060ae4a23ac975d33d
[charm.git] / src / ck-ldb / WSLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #ifndef  WIN32
9 #include <unistd.h>
10 #endif
11 #include <charm++.h>
12 #include <LBDatabase.h>
13 #include <cklists.h>
14 #include "heap.h"
15 #include "WSLB.h"
16
17 CkGroupID wslb;
18
19 #if CMK_LBDB_ON
20
21 // Temporary vacating flags
22 // Set PROC to -1 to disable
23
24 #define VACATE_PROC -1
25 //#define VACATE_PROC (CkNumPes()/2)
26 #define VACATE_AFTER 30
27 #define UNVACATE_AFTER 15
28
29 void CreateWSLB()
30 {
31   wslb = CProxy_WSLB::ckNew();
32 }
33
34 void WSLB::staticMigrated(void* data, LDObjHandle h)
35 {
36   WSLB *me = (WSLB*)(data);
37
38   me->Migrated(h);
39 }
40
41 void WSLB::staticAtSync(void* data)
42 {
43   WSLB *me = (WSLB*)(data);
44
45   me->AtSync();
46 }
47
48 WSLB::WSLB()  :thisproxy(thisgroup)
49 {
50   if (CkMyPe() == 0)
51     CkPrintf("[%d] WSLB created\n",CkMyPe());
52
53   mystep = 0;
54   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
55   theLbdb->
56     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
57   theLbdb->
58     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
59
60
61   // I had to move neighbor initialization outside the constructor
62   // in order to get the virtual functions of any derived classes
63   // so I'll just set them to illegal values here.
64   neighbor_pes = 0;
65   stats_msg_count = 0;
66   statsMsgsList = 0;
67   statsDataList = 0;
68   migrates_completed = 0;
69   migrates_expected = -1;
70   mig_msgs_received = 0;
71   mig_msgs = 0;
72
73   myStats.proc_speed = theLbdb->ProcessorSpeed();
74 //  char hostname[80];
75 //  gethostname(hostname,79);
76 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
77   myStats.obj_data_sz = 0;
78   myStats.comm_data_sz = 0;
79   receive_stats_ready = 0;
80
81   vacate = CmiFalse;
82   usage = 1.0;
83   usage_int_err = 0.;
84
85   theLbdb->CollectStatsOn();
86 }
87
88 WSLB::~WSLB()
89 {
90   CkPrintf("Going away\n");
91 }
92
93 void WSLB::FindNeighbors()
94 {
95   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
96                            // and other things that depend on the number
97                            // of neighbors
98     statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
99     for(int i=0; i < num_neighbors(); i++)
100       statsMsgsList[i] = 0;
101     statsDataList = new LDStats[num_neighbors()];
102
103     neighbor_pes = new int[num_neighbors()];
104     neighbors(neighbor_pes);
105     mig_msgs_expected = num_neighbors();
106     mig_msgs = new WSLBMigrateMsg*[num_neighbors()];
107   }
108
109 }
110
111 void WSLB::AtSync()
112 {
113   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
114
115   if (CkMyPe() == 0) {
116     start_lb_time = CmiWallTimer();
117     CkPrintf("Load balancing step %d starting at %f\n",
118              step(),start_lb_time);
119   }
120
121   if (neighbor_pes == 0) FindNeighbors();
122
123   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
124     MigrationDone();
125     return;
126   }
127
128   WSLBStatsMsg* msg = AssembleStats();
129
130   thisproxy.ReceiveStats(msg,num_neighbors(),neighbor_pes);
131
132   // Tell our own node that we are ready
133   ReceiveStats((WSLBStatsMsg*)0);
134 }
135
136 WSLBStatsMsg* WSLB::AssembleStats()
137 {
138   // Get stats
139   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
140   theLbdb->IdleTime(&myStats.idletime);
141   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
142   myStats.obj_data_sz = theLbdb->GetObjDataSz();
143   myStats.objData = new LDObjData[myStats.obj_data_sz];
144   theLbdb->GetObjData(myStats.objData);
145
146   myStats.comm_data_sz = theLbdb->GetCommDataSz();
147   myStats.commData = new LDCommData[myStats.comm_data_sz];
148   theLbdb->GetCommData(myStats.commData);
149
150   myStats.obj_walltime = myStats.obj_cputime = 0;
151   for(int i=0; i < myStats.obj_data_sz; i++) {
152     myStats.obj_walltime += myStats.objData[i].wallTime;
153     myStats.obj_cputime += myStats.objData[i].cpuTime;
154   }    
155
156   WSLBStatsMsg* msg = new WSLBStatsMsg;
157
158   // Calculate usage percentage
159   double myload = myStats.total_walltime - myStats.idletime;
160   double myusage;
161 //   for(i=0; i < myStats.obj_data_sz; i++) {
162 //     myobjcpu += myStats.objData[i].cpuTime;
163 //     myobjwall += myStats.objData[i].wallTime;
164 //   }
165 //   if (myobjwall > 0)
166 //     myusage = myobjcpu / myobjwall;
167 //   else
168
169   if (myload > 0)
170     myusage = myStats.total_cputime / myload;
171   else myusage = 1.0;
172   // Apply proportional-integral control on usage changes
173   const double usage_err = myusage - usage;
174   usage_int_err += usage_err;
175   usage += usage_err * 0.1 + usage_int_err * 0.01;
176   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
177  
178   // Allow usage to decrease quickly, but increase slowly
179   //   if (myusage > usage)
180   //     usage += (myusage-usage) * 0.1;
181   //   else usage = myusage;
182  
183
184   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
185   //       CkMyPe(),myload,myusage,usage);
186
187   msg->from_pe = CkMyPe();
188   // msg->serial = rand();
189   msg->serial = CrnRand();
190   msg->proc_speed = myStats.proc_speed;
191   msg->total_walltime = myStats.total_walltime;
192   msg->total_cputime = myStats.total_cputime;
193   msg->idletime = myStats.idletime;
194   msg->bg_walltime = myStats.bg_walltime;
195   msg->bg_cputime = myStats.bg_cputime;
196   msg->obj_walltime = myStats.obj_walltime;
197   msg->obj_cputime = myStats.obj_cputime;
198   msg->vacate_me = vacate;
199   msg->usage = usage;
200
201   //  CkPrintf(
202   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
203   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
204   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
205   //    msg->obj_walltime,msg->obj_cputime);
206
207   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
208   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
209   return msg;
210 }
211
212 void WSLB::Migrated(LDObjHandle h)
213 {
214   migrates_completed++;
215   //  CkPrintf("[%d] An object migrated! %d %d\n",
216   //       CkMyPe(),migrates_completed,migrates_expected);
217   if (migrates_completed == migrates_expected) {
218     MigrationDone();
219   }
220 }
221
222 void WSLB::ReceiveStats(WSLBStatsMsg *m)
223 {
224   if (neighbor_pes == 0) FindNeighbors();
225
226   if (m == 0) { // This is from our own node
227     receive_stats_ready = 1;
228   } else {
229     const int pe = m->from_pe;
230     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
231     //             pe,stats_msg_count,m->n_objs,m->serial,m);
232     int peslot = -1;
233     for(int i=0; i < num_neighbors(); i++) {
234       if (pe == neighbor_pes[i]) {
235         peslot = i;
236         break;
237       }
238     }
239     if (peslot == -1 || statsMsgsList[peslot] != 0) {
240       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
241                pe);
242     } else {
243       statsMsgsList[peslot] = m;
244       statsDataList[peslot].from_pe = m->from_pe;
245       statsDataList[peslot].total_walltime = m->total_walltime;
246       statsDataList[peslot].total_cputime = m->total_cputime;
247       statsDataList[peslot].idletime = m->idletime;
248       statsDataList[peslot].bg_walltime = m->bg_walltime;
249       statsDataList[peslot].bg_cputime = m->bg_cputime;
250       statsDataList[peslot].proc_speed = m->proc_speed;
251       statsDataList[peslot].obj_walltime = m->obj_walltime;
252       statsDataList[peslot].obj_cputime = m->obj_cputime;
253       statsDataList[peslot].vacate_me = m->vacate_me;
254       statsDataList[peslot].usage = m->usage;
255       stats_msg_count++;
256     }
257   }
258
259   const int clients = num_neighbors();
260   if (stats_msg_count == clients && receive_stats_ready) {
261     double strat_start_time = CmiWallTimer();
262     receive_stats_ready = 0;
263     WSLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
264
265     int i;
266
267     // Migrate messages from me to elsewhere
268     for(i=0; i < migrateMsg->n_moves; i++) {
269       MigrateInfo& move = migrateMsg->moves[i];
270       const int me = CkMyPe();
271       if (move.from_pe == me && move.to_pe != me) {
272         theLbdb->Migrate(move.obj,move.to_pe);
273       } else if (move.from_pe != me) {
274         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
275                  me,move.from_pe,move.to_pe);
276       }
277     }
278     
279     // Now, send migrate messages to neighbors
280     thisproxy.ReceiveMigration(migrateMsg,
281         num_neighbors(),neighbor_pes);
282     
283     // Zero out data structures for next cycle
284     for(i=0; i < clients; i++) {
285       delete statsMsgsList[i];
286       statsMsgsList[i]=0;
287     }
288     stats_msg_count=0;
289
290     theLbdb->ClearLoads();
291     if (CkMyPe() == 0) {
292       double strat_end_time = CmiWallTimer();
293       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
294     }
295   }
296   
297 }
298
299 void WSLB::ReceiveMigration(WSLBMigrateMsg *msg)
300 {
301   if (neighbor_pes == 0) FindNeighbors();
302
303   if (mig_msgs_received == 0) migrates_expected = 0;
304
305   mig_msgs[mig_msgs_received] = msg;
306   mig_msgs_received++;
307   //  CkPrintf("[%d] Received migration msg %d of %d\n",
308   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
309
310   if (mig_msgs_received > mig_msgs_expected) {
311     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
312              CkMyPe());
313   }
314
315   if (mig_msgs_received != mig_msgs_expected) {
316     return;
317   }
318
319   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
320   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
321     WSLBMigrateMsg* m = mig_msgs[neigh];
322     for(int i=0; i < m->n_moves; i++) {
323       MigrateInfo& move = m->moves[i];
324       const int me = CkMyPe();
325       if (move.from_pe != me && move.to_pe == me) {
326         migrates_expected++;
327       }
328     }
329     delete m;
330     mig_msgs[neigh]=0;
331   }
332   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
333   //       CkMyPe(),migrates_expected);
334   mig_msgs_received = 0;
335   if (migrates_expected == 0 || migrates_expected == migrates_completed)
336     MigrationDone();
337 }
338
339
340 void WSLB::MigrationDone()
341 {
342   if (CkMyPe() == 0) {
343     double end_lb_time = CmiWallTimer();
344     CkPrintf("Load balancing step %d finished at %f duration %f\n",
345              step(),end_lb_time,end_lb_time - start_lb_time);
346   }
347   migrates_completed = 0;
348   migrates_expected = -1;
349   // Increment to next step
350   mystep++;
351   thisproxy [CkMyPe()].ResumeClients();
352 }
353
354 void WSLB::ResumeClients()
355 {
356   theLbdb->ResumeClients();
357 }
358
359 CmiBool WSLB::QueryBalanceNow(int step)
360 {
361   double now = CmiWallTimer();
362
363   if (step==0)
364     first_step_time = now;
365   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
366            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
367     if (vacate == CmiFalse) 
368       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
369     vacate = CmiTrue;
370   } else {
371     if (vacate == CmiTrue)
372       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
373     vacate = CmiFalse;
374   }
375
376   return CmiTrue;
377 }
378
379 WSLBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
380 {
381   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
382   // Compute the average load to see if we are overloaded relative
383   // to our neighbors
384   const double load_factor = 1.05;
385   double objload;
386
387   double myload = myStats.total_walltime - myStats.idletime;
388   double avgload = myload;
389   int unvacated_neighbors = 0;
390   int i;
391   for(i=0; i < count; i++) {
392     // If the neighbor is vacating, skip him
393     if (stats[i].vacate_me)
394       continue;
395
396     // Scale times we need appropriately for relative proc speeds
397     double hisload = stats[i].total_walltime - stats[i].idletime;
398     const double hisusage = stats[i].usage;
399
400     const double scale =  (myStats.proc_speed * usage) 
401       / (stats[i].proc_speed * hisusage);
402
403     hisload *= scale;
404     stats[i].total_walltime *= scale;
405     stats[i].idletime *= scale;
406
407     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
408     //       CkMyPe(),i,hisload,hisusage);
409     avgload += hisload;
410     unvacated_neighbors++;
411   }
412   if (vacate && unvacated_neighbors == 0)
413     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
414
415   avgload /= (unvacated_neighbors+1);
416
417   CkVec<MigrateInfo*> migrateInfo;
418
419   // If we want to vacate, we always dump our load, otherwise
420   // only if we are overloaded
421
422   if (vacate || myload > avgload) {
423     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
424     //       CkMyPe(),myload,avgload);
425
426     // First, build heaps of other processors and my objects
427     // Then assign objects to other processors until either
428     //   - The smallest remaining object would put me below average, or
429     //   - I only have 1 object left, or
430     //   - The smallest remaining object would put someone else 
431     //     above average
432
433     // Build heaps
434     minHeap procs(count);
435     for(i=0; i < count; i++) {
436       // If all my neighbors vacate, I won't have anyone to give work 
437       // to
438       if (!stats[i].vacate_me) {
439         InfoRecord* item = new InfoRecord;
440         item->load = stats[i].total_walltime - stats[i].idletime;
441         item->Id =  stats[i].from_pe;
442         procs.insert(item);
443       }
444     }
445       
446     maxHeap objs(myStats.obj_data_sz);
447     for(i=0; i < myStats.obj_data_sz; i++) {
448       InfoRecord* item = new InfoRecord;
449       item->load = myStats.objData[i].wallTime;
450       item->Id = i;
451       objs.insert(item);
452     }
453
454     int objs_here = myStats.obj_data_sz;
455     do {
456       //      if (objs_here <= 1) break;  // For now, always leave 1 object
457
458       InfoRecord* p;
459       InfoRecord* obj;
460
461       // Get the lightest-loaded processor
462       p = procs.deleteMin();
463       if (p == 0) {
464         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
465         break;
466       }
467
468       // Get the biggest object
469       CmiBool objfound = CmiFalse;
470       do {
471         obj = objs.deleteMax();
472         if (obj == 0) break;
473
474         objload = load_factor * obj->load;
475
476         double new_p_load = p->load + objload;
477         double my_new_load = myload - objload;
478
479         // If we're vacating, the biggest object is always good.
480         // Otherwise, only take it if it doesn't produce overload
481         if (vacate || new_p_load < my_new_load) {
482           objfound = CmiTrue;
483         } else {
484           // This object is too big, so throw it away
485 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
486 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
487           delete obj;
488         }
489       } while (!objfound);
490
491       if (!objfound) {
492         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
493         break;
494       }
495
496       const int me = CkMyPe();
497       // Apparently we can give this object to this processor
498       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
499                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
500
501       MigrateInfo* migrateMe = new MigrateInfo;
502       migrateMe->obj = myStats.objData[obj->Id].handle;
503       migrateMe->from_pe = me;
504       migrateMe->to_pe = p->Id;
505       migrateInfo.insertAtEnd(migrateMe);
506
507       objs_here--;
508       
509       // We may want to assign more to this processor, so lets
510       // update it and put it back in the heap
511       p->load += objload;
512       myload -= objload;
513       procs.insert(p);
514       
515       // This object is assigned, so we delete it from the heap
516       delete obj;
517
518     } while(vacate || myload > avgload);
519
520     // Now empty out the heaps
521     InfoRecord* p;
522     while (p=procs.deleteMin())
523       delete p;
524     InfoRecord* obj;
525     while (obj=objs.deleteMax())
526       delete obj;
527   }  
528
529   // Now build the message to actually perform the migrations
530   int migrate_count=migrateInfo.length();
531   //  if (migrate_count) {
532   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
533   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
534   //  }
535   WSLBMigrateMsg* msg = new(&migrate_count,1) WSLBMigrateMsg;
536   msg->n_moves = migrate_count;
537   for(i=0; i < migrate_count; i++) {
538     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
539     msg->moves[i] = *item;
540     delete item;
541     migrateInfo[i] = 0;
542   }
543
544   return msg;
545 };
546
547 void* WSLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
548 {
549   int totalsize = size + array[0] * sizeof(WSLB::MigrateInfo);
550
551   WSLBMigrateMsg* ret =
552     (WSLBMigrateMsg*)(CkAllocMsg(msgnum,totalsize,priobits));
553
554   ret->moves = (WSLB::MigrateInfo*) ((char*)(ret)+ size);
555
556   return (void*)(ret);
557 }
558
559 void* WSLBMigrateMsg::pack(WSLBMigrateMsg* m)
560 {
561   m->moves = (WSLB::MigrateInfo*)((char*)(m->moves) - (char*)(&m->moves));
562
563   return (void*)(m);
564 }
565
566 WSLBMigrateMsg* WSLBMigrateMsg::unpack(void *m)
567 {
568   WSLBMigrateMsg* ret_val = (WSLBMigrateMsg*)(m);
569
570   ret_val->moves = (WSLB::MigrateInfo*)
571     ((char*)(&ret_val->moves) + (size_t)(ret_val->moves));
572
573   return ret_val;
574 }
575
576 #include "WSLB.def.h"
577 #endif
578
579