ab423e45dc4244adb806feba23d516077957689d
[charm.git] / src / ck-ldb / DistributedLB.C
1 /**
2  * Author: gplkrsh2@illinois.edu (Harshitha Menon)
3  * A distributed load balancer.
4 */
5
6 #include "DistributedLB.h"
7
8 #include "elements.h"
9
10 extern int quietModeRequested;
11
12 CreateLBFunc_Def(DistributedLB, "The distributed load balancer")
13
14 using std::vector;
15
16 DistributedLB::DistributedLB(CkMigrateMessage *m) : CBase_DistributedLB(m) {
17 }
18
19 DistributedLB::DistributedLB(const CkLBOptions &opt) : CBase_DistributedLB(opt) {
20   lbname = "DistributedLB";
21   if (CkMyPe() == 0 && !quietModeRequested) {
22     CkPrintf("CharmLB> DistributedLB created: threshold %lf, max phases %i\n",
23         kTargetRatio, kMaxPhases);
24   }
25   InitLB(opt);
26 }
27
28 void DistributedLB::initnodeFn()
29 {
30   _registerCommandLineOpt("+DistLBTargetRatio");
31   _registerCommandLineOpt("+DistLBMaxPhases");
32 }
33
34 void DistributedLB::turnOn()
35 {
36 #if CMK_LBDB_ON
37   theLbdb->getLBDB()->
38     TurnOnBarrierReceiver(receiver);
39   theLbdb->getLBDB()->
40     TurnOnNotifyMigrated(notifier);
41   theLbdb->getLBDB()->
42     TurnOnStartLBFn(startLbFnHdl);
43 #endif
44 }
45
46 void DistributedLB::turnOff()
47 {
48 #if CMK_LBDB_ON
49   theLbdb->getLBDB()->
50     TurnOffBarrierReceiver(receiver);
51   theLbdb->getLBDB()->
52     TurnOffNotifyMigrated(notifier);
53   theLbdb->getLBDB()->
54     TurnOffStartLBFn(startLbFnHdl);
55 #endif
56 }
57
58 void DistributedLB::InitLB(const CkLBOptions &opt) {
59   thisProxy = CProxy_DistributedLB(thisgroup);
60   if (opt.getSeqNo() > 0 || (_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
61     turnOff();
62
63   // Set constants
64   kUseAck = true;
65   kPartialInfoCount = -1;
66   kMaxPhases = _lb_args.maxDistPhases();
67   kTargetRatio = _lb_args.targetRatio();
68 }
69
70 void DistributedLB::Strategy(const DistBaseLB::LDStats* const stats) {
71   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
72     start_time = CmiWallTimer();
73     CkPrintf("In DistributedLB strategy at %lf\n", start_time);
74   }
75
76   // Set constants for this iteration (these depend on CkNumPes() or number of
77   // objects, so may not be constant for the entire program)
78   kMaxObjPickTrials = stats->n_objs;
79   // Maximum number of times we will try to find a PE to transfer an object
80   // successfully
81   kMaxTrials = CkNumPes();
82   // Max gossip messages sent from each PE
83   kMaxGossipMsgCount = 2 * CmiLog2(CkNumPes());
84
85   // Reset member variables for this LB iteration
86   phase_number = 0;
87   my_stats = stats;
88
89         my_load = 0.0;
90         for (int i = 0; i < my_stats->n_objs; i++) {
91                 my_load += my_stats->objData[i].wallTime; 
92   }
93   init_load = my_load;
94   b_load = my_stats->total_walltime - (my_stats->idletime + my_load);
95
96         pe_no.clear();
97         loads.clear();
98         distribution.clear();
99   lb_started = false;
100   gossip_msg_count = 0;
101   negack_count = 0;
102
103   total_migrates = 0;
104   total_migrates_ack = 0;
105
106   srand((unsigned)(CmiWallTimer()*1.0e06) + CkMyPe());
107
108   // Do a reduction to obtain load information for the system
109   CkReduction::tupleElement tupleRedn[] = {
110     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::sum_double),
111     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double)
112   };
113   CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, 2);
114   CkCallback cb(CkIndex_DistributedLB::LoadReduction(NULL), thisProxy);
115   msg->setCallback(cb);
116   contribute(msg);
117 }
118
119 /*
120 * Once the reduction callback is obtained for average load in the system, the
121 * gossiping starts. Only the underloaded processors gossip.
122 * Termination of gossip is via QD and callback is DoneGossip.
123 */
124 void DistributedLB::LoadReduction(CkReductionMsg* redn_msg) {
125   int count;
126   CkReduction::tupleElement* results;
127   redn_msg->toTuple(&results, &count);
128   delete redn_msg;
129
130   // Set the initial global load stats and print when LBDebug is on
131   avg_load = *(double*)results[0].data / CkNumPes();
132   max_load = *(double*)results[1].data;
133   load_ratio = max_load / avg_load;
134
135   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
136     CkPrintf("DistributedLB>>>Before LB: max = %lf, avg = %lf, ratio = %lf\n",
137         max_load, avg_load, load_ratio);
138   }
139
140   // If there are no overloaded processors, immediately terminate
141   if (load_ratio <= kTargetRatio) {
142     if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
143       CkPrintf("DistributedLB>>>Load ratio already within the target of %lf, ending early.\n",
144           kTargetRatio);
145     }
146     PackAndSendMigrateMsgs();
147     return;
148   }
149
150   // Set transfer threshold for the gossip phase, for which only PEs lower than
151   // the target ratio are considered underloaded.
152   transfer_threshold = kTargetRatio * avg_load;
153
154   // If my load is under the acceptance threshold, then I am underloaded and
155   // can receive more work. So assuming there exists an overloaded PE that can
156   // donate work, I will start gossipping my load information.
157   if (my_load < transfer_threshold) {
158                 double r_loads[1];
159                 int r_pe_no[1];
160     r_loads[0] = my_load;
161     r_pe_no[0] = CkMyPe();
162     GossipLoadInfo(CkMyPe(), 1, r_pe_no, r_loads);
163   }
164
165   // Start quiescence detection at PE 0.
166   if (CkMyPe() == 0) {
167     CkCallback cb(CkIndex_DistributedLB::DoneGossip(), thisProxy);
168     CkStartQD(cb);
169   }
170 }
171
172 /*
173 * Gossip load information between peers. Receive the gossip message.
174 */
175 void DistributedLB::GossipLoadInfo(int from_pe, int n,
176     int remote_pe_no[], double remote_loads[]) {
177   // Placeholder temp vectors for the sorted pe and their load 
178   vector<int> p_no;
179   vector<double> l;
180
181   int i = 0;
182   int j = 0;
183   int m = pe_no.size();
184
185   // Merge (using merge sort) information received with the information at hand
186   // Since the initial list is sorted, the merging is linear in the size of the
187   // list. 
188   while (i < m && j < n) {
189     if (pe_no[i] < remote_pe_no[j]) {
190       p_no.push_back(pe_no[i]);
191       l.push_back(loads[i]);
192       i++;
193     } else {
194       p_no.push_back(remote_pe_no[j]);
195       l.push_back(remote_loads[j]);
196       if (pe_no[i] == remote_pe_no[j]) {
197         i++;
198       }
199       j++;
200     }
201   }
202
203   if (i == m && j != n) {
204     while (j < n) {
205       p_no.push_back(remote_pe_no[j]);
206       l.push_back(remote_loads[j]);
207       j++;
208     }
209   } else if (j == n && i != m) {
210     while (i < m) {
211       p_no.push_back(pe_no[i]);
212       l.push_back(loads[i]);
213       i++;
214     }
215   }
216
217   // After the merge sort, swap. Now pe_no and loads have updated information
218   pe_no.swap(p_no);
219   loads.swap(l);
220
221   SendLoadInfo();
222 }
223
224 /*
225 * Construct the gossip message and send to peers
226 */
227 void DistributedLB::SendLoadInfo() {
228   // TODO: Keep it 0.8*log
229   // This PE has already sent the maximum set threshold for gossip messages.
230   // Hence don't send out any more messages. This is to prevent flooding.
231   if (gossip_msg_count > kMaxGossipMsgCount) {
232     return;
233   }
234
235   // Pick two random neighbors to send the message to
236   int rand_nbor1;
237   int rand_nbor2 = -1;
238   do {
239     rand_nbor1 = rand() % CkNumPes();
240   } while (rand_nbor1 == CkMyPe());
241   // Pick the second neighbor which is not the same as the first one.
242   if(CkNumPes() > 2)
243     do {
244       rand_nbor2 = rand() % CkNumPes();
245     } while ((rand_nbor2 == CkMyPe()) || (rand_nbor2 == rand_nbor1));
246
247   // kPartialInfoCount indicates how much information is send in gossip. If it
248   // is set to -1, it means use all the information available.
249   int info_count = (kPartialInfoCount >= 0) ? kPartialInfoCount : pe_no.size();
250   int* p = new int[info_count];
251   double* l = new double[info_count];
252   for (int i = 0; i < info_count; i++) {
253     p[i] = pe_no[i];
254     l[i] = loads[i];
255   }
256
257   thisProxy[rand_nbor1].GossipLoadInfo(CkMyPe(), info_count, p, l);
258
259   if(CkNumPes() > 2)
260     thisProxy[rand_nbor2].GossipLoadInfo(CkMyPe(), info_count, p, l);
261
262   // Increment the outgoind msg count
263   gossip_msg_count++;
264
265   delete[] p;
266   delete[] l;
267 }
268
269 /*
270 * Callback invoked when gossip is done and QD is detected
271 */
272 void DistributedLB::DoneGossip() {
273   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
274     double end_time = CmiWallTimer();
275     CkPrintf("DistributedLB>>>Gossip finished at %lf (%lf elapsed)\n",
276         end_time, end_time - start_time);
277   }
278   // Set a new transfer threshold for the actual load balancing phase. It starts
279   // high so that load is initially only transferred from the most loaded PEs.
280   // In subsequent phases it gets relaxed to allow less overloaded PEs to
281   // transfer load as well.
282   transfer_threshold = (max_load + avg_load) / 2;
283   lb_started = true;
284   underloaded_pe_count = pe_no.size();
285   Setup();
286   StartNextLBPhase();
287 }
288
289 void DistributedLB::StartNextLBPhase() {
290   if (underloaded_pe_count == 0 || my_load <= transfer_threshold) {
291     // If this PE has no information about underloaded processors, or it has
292     // no objects to donate to underloaded processors then do nothing.
293     DoneWithLBPhase();
294   } else {
295     // Otherwise this PE has work to donate, and should attempt to do so.
296     LoadBalance();
297   }
298 }
299
300 void DistributedLB::DoneWithLBPhase() {
301   phase_number++;
302
303   int count = 1;
304   if (_lb_args.debug() >= 1) count = 3;
305   CkReduction::tupleElement tupleRedn[] = {
306     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double),
307     CkReduction::tupleElement(sizeof(double), &total_migrates, CkReduction::sum_int),
308     CkReduction::tupleElement(sizeof(double), &negack_count, CkReduction::max_int)
309   };
310   CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, count);
311   CkCallback cb(CkIndex_DistributedLB::AfterLBReduction(NULL), thisProxy);
312   msg->setCallback(cb);
313   contribute(msg);
314 }
315
316 void DistributedLB::AfterLBReduction(CkReductionMsg* redn_msg) {
317   int count, migrations, max_nacks;
318   CkReduction::tupleElement* results;
319   redn_msg->toTuple(&results, &count);
320   delete redn_msg;
321
322   // Update load stats and print if in debug mode
323   max_load = *(double*)results[0].data;
324   double old_ratio = load_ratio;
325   load_ratio = max_load / avg_load;
326   if (count > 1) migrations = *(int*)results[1].data;
327   if (count > 2) max_nacks = *(int*)results[2].data;
328
329   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
330     CkPrintf("DistributedLB>>>After phase %i: max = %lf, avg = %lf, ratio = %lf\n",
331         phase_number, max_load, avg_load, load_ratio);
332   }
333
334   // Try more phases as long as our load ratio is still worse than our target,
335   // our transfer threshold hasn't decayed below the target, and we haven't hit
336   // the maximum number of phases yet.
337   if (load_ratio > kTargetRatio &&
338       transfer_threshold > kTargetRatio * avg_load &&
339       phase_number < kMaxPhases) {
340     // Relax the transfer ratio to allow more phases based on whether or not the
341     // previous phase was able to reduce the load ratio at all.
342     if (std::abs(load_ratio - old_ratio) < 0.01) {
343       // The previous phase didn't meaningfully reduce the max load, so relax
344       // the transfer threshold.
345       transfer_threshold = (transfer_threshold + avg_load) / 2;
346     } else {
347       // The previous phase did reduce the max load, so update the transfer
348       // threshold based on the new max load.
349       transfer_threshold = (max_load + avg_load) / 2;
350     }
351     StartNextLBPhase();
352   } else {
353     if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
354       double end_time = CmiWallTimer();
355       CkPrintf("DistributedLB>>>Balancing completed at %lf (%lf elapsed)\n",
356           end_time, end_time - start_time);
357       CkPrintf("DistributedLB>>>%i total migrations with %i negative ack max\n",
358           migrations, max_nacks);
359     }
360     Cleanup();
361     PackAndSendMigrateMsgs();
362     if (!(_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
363       theLbdb->nextLoadbalancer(seqno);
364   }
365 }
366
367 /*
368 * Perform load balancing based on the partial information obtained from the
369 * information propagation stage (gossip).
370 */
371 void DistributedLB::LoadBalance() {
372   CkVec<int> obj_no;
373   CkVec<int> obj_pe_no;
374
375   // Balance load and add objs to be transferred to obj_no and pe to be
376   // transferred to in obj_pe_no
377   MapObjsToPe(objs, obj_no, obj_pe_no);
378   total_migrates += obj_no.length();
379   total_migrates_ack = obj_no.length();
380
381   // If there is no migration, then this is done
382   if (obj_no.length() == 0) {
383     DoneWithLBPhase();
384         }
385 }
386
387 void DistributedLB::Setup() {
388   objs_count = 0;
389   double avg_objload = 0.0;
390   double max_objload = 0.0;
391   // Count the number of objs that are migratable and whose load is not 0.
392   for(int i=0; i < my_stats->n_objs; i++) {
393     if (my_stats->objData[i].migratable &&
394       my_stats->objData[i].wallTime > 0.000001) {
395       objs_count++;
396     }
397   }
398  
399   // Create a min heap of objs. The idea is to transfer smaller objs. The reason
400   // is that since we are making probabilistic transfer of load, sending small
401   // objs will result in better load balance.
402   objs = new minHeap(objs_count);
403   for(int i=0; i < my_stats->n_objs; i++) {
404     if (my_stats->objData[i].migratable &&
405         my_stats->objData[i].wallTime > 0.0001) {
406       InfoRecord* item = new InfoRecord;
407       item->load = my_stats->objData[i].wallTime;
408       item->Id = i;
409       objs->insert(item);
410     }
411   }
412
413   // Calculate the probabilities and cdf for PEs based on their load
414   // distribution
415         CalculateCumulateDistribution();
416 }
417
418 void DistributedLB::Cleanup() {
419
420   // Delete the object records from the heap
421   InfoRecord* obj;
422   while (NULL!=(obj=objs->deleteMin())) {
423     delete obj;
424   }
425   delete objs;
426 }
427
428 /*
429 * Map objects to PE for load balance. It takes in a min heap of objects which
430 * can be transferred and finds suitable receiver PEs. The mapping is stored in
431 * obj_no and the corresponding entry in obj_pe_no indicates the receiver PE.
432 */
433 void DistributedLB::MapObjsToPe(minHeap *objs, CkVec<int> &obj_no,
434     CkVec<int> &obj_pe_no) {
435   int p_id;
436   double p_load;
437   int rand_pe;
438
439   // While my load is more than the threshold, try to transfer objs
440   while (my_load > transfer_threshold) {
441     // If there is only one object, then nothing can be done to balance it.
442     if (objs_count < 2) break;
443
444     // Flag to indicate whether successful in finding a transfer
445     bool success = false;
446
447     // Get the smallest object
448     InfoRecord* obj = objs->deleteMin();
449     // No more objects to retrieve
450     if (obj == 0) break;
451
452     // Pick random PE based on the probability and the find is successful only
453     // if on transferring the object, that PE does not become overloaded
454     do {
455       rand_pe = PickRandReceiverPeIdx();
456       if (rand_pe == -1) break;
457       p_id = pe_no[rand_pe];
458       p_load = loads[rand_pe];
459       if (p_load + obj->load < transfer_threshold) {
460         success = true;
461       }
462       kMaxTrials--;
463     } while (!success && (kMaxTrials > 0));
464
465     kMaxObjPickTrials--;
466
467     // No successful in finding a suitable PE to transfer the object
468     if (!success) {
469       objs->insert(obj);
470       break;
471     }
472
473     // Found an object and a suitable PE to transfer it to. Decrement the obj
474     // count and update the loads.
475     obj_no.insertAtEnd(obj->Id);
476     obj_pe_no.insertAtEnd(p_id);
477     objs_count--;
478     loads[rand_pe] += obj->load;
479     my_load -= obj->load;
480
481     // Send information to the receiver PE about this obj. This is necessary for
482     // ack as well as finding out how many objs are migrating in
483                 thisProxy[p_id].InformMigration(obj->Id, CkMyPe(),
484         my_stats->objData[obj->Id].wallTime, false);
485
486     // This object is assigned, so we delete it from the heap
487     delete obj;
488   }
489 }
490
491 /*
492 * Receive information about inbound object including the id, from_pe and its
493 * load. 
494 *
495 * obj_id is the index of the object in the original PE.
496 * from_pe is the originating PE
497 * obj_load is the load of this object
498 * force flag indicates that this PE is forced to accept the object after
499 * multiple trials and ack should not be sent.
500 */
501 void DistributedLB::InformMigration(int obj_id, int from_pe, double obj_load,
502     bool force) {
503   // If not using ack based scheme or adding this obj does not make this PE
504   // overloaded, then accept the migrated obj and return. 
505   if (!kUseAck || my_load + obj_load <= transfer_threshold) {
506     migrates_expected++;
507     // add to my load and reply true
508     my_load += obj_load;
509     thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), true);
510     return;
511   }
512
513   // We are using ack based scheme and turns out accepting this obj will make me
514   // overloaded but if it is a forced one, then accept it else return negative
515   // acknowledgement.
516   if (force) {
517     migrates_expected++;
518     // add to my load and reply with positive ack
519     my_load += obj_load;
520   } else {
521     // If my_load + obj_load is > threshold, then reply with negative ack 
522     //CkPrintf("[%d] Cannot accept obj with load %lf my_load %lf and init_load %lf migrates_expected %d\n", CkMyPe(), obj_load, my_load, init_load, migrates_expected);
523     thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), false);
524   }
525 }
526
527 /*
528 * Receive an ack message which the message whether the assigned object can be
529 * assigned or not. If all the acks have been received, then create migration
530 * message.
531 */
532 void DistributedLB::RecvAck(int obj_id, int assigned_pe, bool can_accept) {
533   total_migrates_ack--;
534
535   // If it is a positive ack, then create a migrate msg for that object
536   if (can_accept) {
537     MigrateInfo* migrateMe = new MigrateInfo;
538     migrateMe->obj = my_stats->objData[obj_id].handle;
539     migrateMe->from_pe = CkMyPe();
540     migrateMe->to_pe = assigned_pe;
541     migrateInfo.push_back(migrateMe);
542   } else if (negack_count > 0.01*underloaded_pe_count) {
543     // If received negative acks more than the specified threshold, then drop it
544     negack_count++;
545     total_migrates--;
546     objs_count++;
547     my_load += my_stats->objData[obj_id].wallTime;
548   } else {
549     // Try to transfer again. Add the object back to a heap, update my load and
550     // try to find a suitable PE now.
551     total_migrates--;
552     negack_count++;
553     objs_count++;
554     my_load += my_stats->objData[obj_id].wallTime;
555
556     minHeap* objs = new minHeap(1);
557     InfoRecord* item = new InfoRecord;
558     item->load = my_stats->objData[obj_id].wallTime;
559     item->Id = obj_id;
560     objs->insert(item);
561
562     CkVec<int> obj_no;
563     CkVec<int> obj_pe_no;
564     MapObjsToPe(objs, obj_no, obj_pe_no);
565
566     // If a PE could be found to transfer this object, MapObjsToPe sends a
567     // message to it. Wait for the ack.
568     // Maybe at this point we can try to force it or just drop it.
569     if (obj_pe_no.size() > 0) {
570       total_migrates_ack++;
571       total_migrates++;
572     }
573     InfoRecord* obj;
574     while (NULL!=(obj=objs->deleteMin())) {
575       delete obj;
576     }
577   }
578
579   // Whenever all the acks have been received, create migration msg, go into the
580   // barrier and wait for everyone to finish their load balancing phase
581   if (total_migrates_ack == 0) {
582     DoneWithLBPhase();
583   }
584 }
585
586 void DistributedLB::PackAndSendMigrateMsgs() {
587   LBMigrateMsg* msg = new(total_migrates,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
588   msg->n_moves = total_migrates;
589   for(int i=0; i < total_migrates; i++) {
590     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
591     msg->moves[i] = *item;
592     delete item;
593     migrateInfo[i] = 0;
594   }
595   migrateInfo.clear();
596   ProcessMigrationDecision(msg);
597 }
598
599 /*
600 * Pick a random PE based on the probability distribution.
601 */
602 int DistributedLB::PickRandReceiverPeIdx() const {
603   // The min loaded PEs have probabilities inversely proportional to their load.
604   // A cumulative distribution is calculated and a PE is randomly selected based
605   // on the cdf.
606   // Generate a random number and return the index of min loaded PE whose cdf is
607   // greater than the random number.
608         double no = (double) rand()/(double) RAND_MAX;
609         for (int i = 0; i < underloaded_pe_count; i++) {
610                 if (distribution[i] >= no) {
611                         return i;
612                 }
613         }
614         return -1;
615 }
616
617 /*
618 * The PEs have probabilities inversely proportional to their load. Construct a
619 * CDF based on this.
620 */
621 void DistributedLB::CalculateCumulateDistribution() {
622   // The min loaded PEs have probabilities inversely proportional to their load.
623         double cumulative = 0.0;
624         for (int i = 0; i < underloaded_pe_count; i++) {
625                 cumulative += (transfer_threshold - loads[i])/transfer_threshold;
626                 distribution.push_back(cumulative);
627         }
628
629   for (int i = 0; i < underloaded_pe_count; i++) {
630     distribution[i] = distribution[i]/cumulative;
631   }
632 }
633
634 #include "DistributedLB.def.h"