e93caa25c0b95adf53d543a1b6ed5688faf83303
[charm.git] / src / ck-ldb / DistributedLB.C
1 /**
2  * Author: gplkrsh2@illinois.edu (Harshitha Menon)
3  * A distributed load balancer.
4 */
5
6 #include "DistributedLB.h"
7
8 #include "elements.h"
9
10 extern int quietModeRequested;
11
12 CreateLBFunc_Def(DistributedLB, "The distributed load balancer")
13
14 using std::vector;
15
16 DistributedLB::DistributedLB(CkMigrateMessage *m) : CBase_DistributedLB(m) {
17 }
18
19 DistributedLB::DistributedLB(const CkLBOptions &opt) : CBase_DistributedLB(opt) {
20   lbname = "DistributedLB";
21   if (CkMyPe() == 0 && !quietModeRequested) {
22     CkPrintf("CharmLB> DistributedLB created: threshold %lf, max phases %i\n",
23         kTargetRatio, kMaxPhases);
24   }
25   InitLB(opt);
26 }
27
28 void DistributedLB::initnodeFn()
29 {
30   _registerCommandLineOpt("+DistLBTargetRatio");
31   _registerCommandLineOpt("+DistLBMaxPhases");
32 }
33
34 void DistributedLB::turnOn()
35 {
36 #if CMK_LBDB_ON
37   theLbdb->getLBDB()->
38     TurnOnBarrierReceiver(receiver);
39   theLbdb->getLBDB()->
40     TurnOnNotifyMigrated(notifier);
41   theLbdb->getLBDB()->
42     TurnOnStartLBFn(startLbFnHdl);
43 #endif
44 }
45
46 void DistributedLB::turnOff()
47 {
48 #if CMK_LBDB_ON
49   theLbdb->getLBDB()->
50     TurnOffBarrierReceiver(receiver);
51   theLbdb->getLBDB()->
52     TurnOffNotifyMigrated(notifier);
53   theLbdb->getLBDB()->
54     TurnOffStartLBFn(startLbFnHdl);
55 #endif
56 }
57
58 void DistributedLB::InitLB(const CkLBOptions &opt) {
59   thisProxy = CProxy_DistributedLB(thisgroup);
60   if (opt.getSeqNo() > 0 || (_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
61     turnOff();
62
63   // Set constants
64   kUseAck = true;
65   kPartialInfoCount = -1;
66   kMaxPhases = _lb_args.maxDistPhases();
67   kTargetRatio = _lb_args.targetRatio();
68 }
69
70 void DistributedLB::Strategy(const DistBaseLB::LDStats* const stats) {
71   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
72     start_time = CmiWallTimer();
73     CkPrintf("In DistributedLB strategy at %lf\n", start_time);
74   }
75
76   // Set constants for this iteration (these depend on CkNumPes() or number of
77   // objects, so may not be constant for the entire program)
78   kMaxObjPickTrials = stats->n_objs;
79   // Maximum number of times we will try to find a PE to transfer an object
80   // successfully
81   kMaxTrials = CkNumPes();
82   // Max gossip messages sent from each PE
83   kMaxGossipMsgCount = 2 * CmiLog2(CkNumPes());
84
85   // Reset member variables for this LB iteration
86   phase_number = 0;
87   my_stats = stats;
88
89         my_load = 0.0;
90         for (int i = 0; i < my_stats->n_objs; i++) {
91                 my_load += my_stats->objData[i].wallTime; 
92   }
93   init_load = my_load;
94   b_load = my_stats->total_walltime - (my_stats->idletime + my_load);
95
96         pe_no.clear();
97         loads.clear();
98         distribution.clear();
99   lb_started = false;
100   gossip_msg_count = 0;
101   negack_count = 0;
102
103   total_migrates = 0;
104   total_migrates_ack = 0;
105
106   srand((unsigned)(CmiWallTimer()*1.0e06) + CkMyPe());
107
108   // Do a reduction to obtain load information for the system
109   CkReduction::tupleElement tupleRedn[] = {
110     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::sum_double),
111     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double)
112   };
113   CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, 2);
114   CkCallback cb(CkIndex_DistributedLB::LoadReduction(NULL), thisProxy);
115   msg->setCallback(cb);
116   contribute(msg);
117 }
118
119 /*
120 * Once the reduction callback is obtained for average load in the system, the
121 * gossiping starts. Only the underloaded processors gossip.
122 * Termination of gossip is via QD and callback is DoneGossip.
123 */
124 void DistributedLB::LoadReduction(CkReductionMsg* redn_msg) {
125   int count;
126   CkReduction::tupleElement* results;
127   redn_msg->toTuple(&results, &count);
128   delete redn_msg;
129
130   // Set the initial global load stats and print when LBDebug is on
131   avg_load = *(double*)results[0].data / CkNumPes();
132   max_load = *(double*)results[1].data;
133   load_ratio = max_load / avg_load;
134
135   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
136     CkPrintf("DistributedLB>>>Before LB: max = %lf, avg = %lf, ratio = %lf\n",
137         max_load, avg_load, load_ratio);
138   }
139
140   // If there are no overloaded processors, immediately terminate
141   if (load_ratio <= kTargetRatio) {
142     if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
143       CkPrintf("DistributedLB>>>Load ratio already within the target of %lf, ending early.\n",
144           kTargetRatio);
145     }
146     PackAndSendMigrateMsgs();
147     delete [] results;
148     return;
149   }
150
151   // Set transfer threshold for the gossip phase, for which only PEs lower than
152   // the target ratio are considered underloaded.
153   transfer_threshold = kTargetRatio * avg_load;
154
155   // If my load is under the acceptance threshold, then I am underloaded and
156   // can receive more work. So assuming there exists an overloaded PE that can
157   // donate work, I will start gossipping my load information.
158   if (my_load < transfer_threshold) {
159                 double r_loads[1];
160                 int r_pe_no[1];
161     r_loads[0] = my_load;
162     r_pe_no[0] = CkMyPe();
163     GossipLoadInfo(CkMyPe(), 1, r_pe_no, r_loads);
164   }
165
166   // Start quiescence detection at PE 0.
167   if (CkMyPe() == 0) {
168     CkCallback cb(CkIndex_DistributedLB::DoneGossip(), thisProxy);
169     CkStartQD(cb);
170   }
171   delete [] results;
172 }
173
174 /*
175 * Gossip load information between peers. Receive the gossip message.
176 */
177 void DistributedLB::GossipLoadInfo(int from_pe, int n,
178     int remote_pe_no[], double remote_loads[]) {
179   // Placeholder temp vectors for the sorted pe and their load 
180   vector<int> p_no;
181   vector<double> l;
182
183   int i = 0;
184   int j = 0;
185   int m = pe_no.size();
186
187   // Merge (using merge sort) information received with the information at hand
188   // Since the initial list is sorted, the merging is linear in the size of the
189   // list. 
190   while (i < m && j < n) {
191     if (pe_no[i] < remote_pe_no[j]) {
192       p_no.push_back(pe_no[i]);
193       l.push_back(loads[i]);
194       i++;
195     } else {
196       p_no.push_back(remote_pe_no[j]);
197       l.push_back(remote_loads[j]);
198       if (pe_no[i] == remote_pe_no[j]) {
199         i++;
200       }
201       j++;
202     }
203   }
204
205   if (i == m && j != n) {
206     while (j < n) {
207       p_no.push_back(remote_pe_no[j]);
208       l.push_back(remote_loads[j]);
209       j++;
210     }
211   } else if (j == n && i != m) {
212     while (i < m) {
213       p_no.push_back(pe_no[i]);
214       l.push_back(loads[i]);
215       i++;
216     }
217   }
218
219   // After the merge sort, swap. Now pe_no and loads have updated information
220   pe_no.swap(p_no);
221   loads.swap(l);
222
223   SendLoadInfo();
224 }
225
226 /*
227 * Construct the gossip message and send to peers
228 */
229 void DistributedLB::SendLoadInfo() {
230   // TODO: Keep it 0.8*log
231   // This PE has already sent the maximum set threshold for gossip messages.
232   // Hence don't send out any more messages. This is to prevent flooding.
233   if (gossip_msg_count > kMaxGossipMsgCount) {
234     return;
235   }
236
237   // Pick two random neighbors to send the message to
238   int rand_nbor1;
239   int rand_nbor2 = -1;
240   do {
241     rand_nbor1 = rand() % CkNumPes();
242   } while (rand_nbor1 == CkMyPe());
243   // Pick the second neighbor which is not the same as the first one.
244   if(CkNumPes() > 2)
245     do {
246       rand_nbor2 = rand() % CkNumPes();
247     } while ((rand_nbor2 == CkMyPe()) || (rand_nbor2 == rand_nbor1));
248
249   // kPartialInfoCount indicates how much information is send in gossip. If it
250   // is set to -1, it means use all the information available.
251   int info_count = (kPartialInfoCount >= 0) ? kPartialInfoCount : pe_no.size();
252   int* p = new int[info_count];
253   double* l = new double[info_count];
254   for (int i = 0; i < info_count; i++) {
255     p[i] = pe_no[i];
256     l[i] = loads[i];
257   }
258
259   thisProxy[rand_nbor1].GossipLoadInfo(CkMyPe(), info_count, p, l);
260
261   if(CkNumPes() > 2)
262     thisProxy[rand_nbor2].GossipLoadInfo(CkMyPe(), info_count, p, l);
263
264   // Increment the outgoind msg count
265   gossip_msg_count++;
266
267   delete[] p;
268   delete[] l;
269 }
270
271 /*
272 * Callback invoked when gossip is done and QD is detected
273 */
274 void DistributedLB::DoneGossip() {
275   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
276     double end_time = CmiWallTimer();
277     CkPrintf("DistributedLB>>>Gossip finished at %lf (%lf elapsed)\n",
278         end_time, end_time - start_time);
279   }
280   // Set a new transfer threshold for the actual load balancing phase. It starts
281   // high so that load is initially only transferred from the most loaded PEs.
282   // In subsequent phases it gets relaxed to allow less overloaded PEs to
283   // transfer load as well.
284   transfer_threshold = (max_load + avg_load) / 2;
285   lb_started = true;
286   underloaded_pe_count = pe_no.size();
287   Setup();
288   StartNextLBPhase();
289 }
290
291 void DistributedLB::StartNextLBPhase() {
292   if (underloaded_pe_count == 0 || my_load <= transfer_threshold) {
293     // If this PE has no information about underloaded processors, or it has
294     // no objects to donate to underloaded processors then do nothing.
295     DoneWithLBPhase();
296   } else {
297     // Otherwise this PE has work to donate, and should attempt to do so.
298     LoadBalance();
299   }
300 }
301
302 void DistributedLB::DoneWithLBPhase() {
303   phase_number++;
304
305   int count = 1;
306   if (_lb_args.debug() >= 1) count = 3;
307   CkReduction::tupleElement tupleRedn[] = {
308     CkReduction::tupleElement(sizeof(double), &my_load, CkReduction::max_double),
309     CkReduction::tupleElement(sizeof(double), &total_migrates, CkReduction::sum_int),
310     CkReduction::tupleElement(sizeof(double), &negack_count, CkReduction::max_int)
311   };
312   CkReductionMsg* msg = CkReductionMsg::buildFromTuple(tupleRedn, count);
313   CkCallback cb(CkIndex_DistributedLB::AfterLBReduction(NULL), thisProxy);
314   msg->setCallback(cb);
315   contribute(msg);
316 }
317
318 void DistributedLB::AfterLBReduction(CkReductionMsg* redn_msg) {
319   int count, migrations, max_nacks;
320   CkReduction::tupleElement* results;
321   redn_msg->toTuple(&results, &count);
322   delete redn_msg;
323
324   // Update load stats and print if in debug mode
325   max_load = *(double*)results[0].data;
326   double old_ratio = load_ratio;
327   load_ratio = max_load / avg_load;
328   if (count > 1) migrations = *(int*)results[1].data;
329   if (count > 2) max_nacks = *(int*)results[2].data;
330
331   if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
332     CkPrintf("DistributedLB>>>After phase %i: max = %lf, avg = %lf, ratio = %lf\n",
333         phase_number, max_load, avg_load, load_ratio);
334   }
335
336   // Try more phases as long as our load ratio is still worse than our target,
337   // our transfer threshold hasn't decayed below the target, and we haven't hit
338   // the maximum number of phases yet.
339   if (load_ratio > kTargetRatio &&
340       transfer_threshold > kTargetRatio * avg_load &&
341       phase_number < kMaxPhases) {
342     // Relax the transfer ratio to allow more phases based on whether or not the
343     // previous phase was able to reduce the load ratio at all.
344     if (std::abs(load_ratio - old_ratio) < 0.01) {
345       // The previous phase didn't meaningfully reduce the max load, so relax
346       // the transfer threshold.
347       transfer_threshold = (transfer_threshold + avg_load) / 2;
348     } else {
349       // The previous phase did reduce the max load, so update the transfer
350       // threshold based on the new max load.
351       transfer_threshold = (max_load + avg_load) / 2;
352     }
353     StartNextLBPhase();
354   } else {
355     if (CkMyPe() == 0 && _lb_args.debug() >= 1) {
356       double end_time = CmiWallTimer();
357       CkPrintf("DistributedLB>>>Balancing completed at %lf (%lf elapsed)\n",
358           end_time, end_time - start_time);
359       CkPrintf("DistributedLB>>>%i total migrations with %i negative ack max\n",
360           migrations, max_nacks);
361     }
362     Cleanup();
363     PackAndSendMigrateMsgs();
364     if (!(_lb_args.metaLbOn() && _lb_args.metaLbModelDir() != nullptr))
365       theLbdb->nextLoadbalancer(seqno);
366   }
367   delete [] results;
368 }
369
370 /*
371 * Perform load balancing based on the partial information obtained from the
372 * information propagation stage (gossip).
373 */
374 void DistributedLB::LoadBalance() {
375   CkVec<int> obj_no;
376   CkVec<int> obj_pe_no;
377
378   // Balance load and add objs to be transferred to obj_no and pe to be
379   // transferred to in obj_pe_no
380   MapObjsToPe(objs, obj_no, obj_pe_no);
381   total_migrates += obj_no.length();
382   total_migrates_ack = obj_no.length();
383
384   // If there is no migration, then this is done
385   if (obj_no.length() == 0) {
386     DoneWithLBPhase();
387         }
388 }
389
390 void DistributedLB::Setup() {
391   objs_count = 0;
392   double avg_objload = 0.0;
393   double max_objload = 0.0;
394   // Count the number of objs that are migratable and whose load is not 0.
395   for(int i=0; i < my_stats->n_objs; i++) {
396     if (my_stats->objData[i].migratable &&
397       my_stats->objData[i].wallTime > 0.000001) {
398       objs_count++;
399     }
400   }
401  
402   // Create a min heap of objs. The idea is to transfer smaller objs. The reason
403   // is that since we are making probabilistic transfer of load, sending small
404   // objs will result in better load balance.
405   objs = new minHeap(objs_count);
406   for(int i=0; i < my_stats->n_objs; i++) {
407     if (my_stats->objData[i].migratable &&
408         my_stats->objData[i].wallTime > 0.0001) {
409       InfoRecord* item = new InfoRecord;
410       item->load = my_stats->objData[i].wallTime;
411       item->Id = i;
412       objs->insert(item);
413     }
414   }
415
416   // Calculate the probabilities and cdf for PEs based on their load
417   // distribution
418         CalculateCumulateDistribution();
419 }
420
421 void DistributedLB::Cleanup() {
422
423   // Delete the object records from the heap
424   InfoRecord* obj;
425   while (NULL!=(obj=objs->deleteMin())) {
426     delete obj;
427   }
428   delete objs;
429 }
430
431 /*
432 * Map objects to PE for load balance. It takes in a min heap of objects which
433 * can be transferred and finds suitable receiver PEs. The mapping is stored in
434 * obj_no and the corresponding entry in obj_pe_no indicates the receiver PE.
435 */
436 void DistributedLB::MapObjsToPe(minHeap *objs, CkVec<int> &obj_no,
437     CkVec<int> &obj_pe_no) {
438   int p_id;
439   double p_load;
440   int rand_pe;
441
442   // While my load is more than the threshold, try to transfer objs
443   while (my_load > transfer_threshold) {
444     // If there is only one object, then nothing can be done to balance it.
445     if (objs_count < 2) break;
446
447     // Flag to indicate whether successful in finding a transfer
448     bool success = false;
449
450     // Get the smallest object
451     InfoRecord* obj = objs->deleteMin();
452     // No more objects to retrieve
453     if (obj == 0) break;
454
455     // Pick random PE based on the probability and the find is successful only
456     // if on transferring the object, that PE does not become overloaded
457     do {
458       rand_pe = PickRandReceiverPeIdx();
459       if (rand_pe == -1) break;
460       p_id = pe_no[rand_pe];
461       p_load = loads[rand_pe];
462       if (p_load + obj->load < transfer_threshold) {
463         success = true;
464       }
465       kMaxTrials--;
466     } while (!success && (kMaxTrials > 0));
467
468     kMaxObjPickTrials--;
469
470     // No successful in finding a suitable PE to transfer the object
471     if (!success) {
472       objs->insert(obj);
473       break;
474     }
475
476     // Found an object and a suitable PE to transfer it to. Decrement the obj
477     // count and update the loads.
478     obj_no.insertAtEnd(obj->Id);
479     obj_pe_no.insertAtEnd(p_id);
480     objs_count--;
481     loads[rand_pe] += obj->load;
482     my_load -= obj->load;
483
484     // Send information to the receiver PE about this obj. This is necessary for
485     // ack as well as finding out how many objs are migrating in
486                 thisProxy[p_id].InformMigration(obj->Id, CkMyPe(),
487         my_stats->objData[obj->Id].wallTime, false);
488
489     // This object is assigned, so we delete it from the heap
490     delete obj;
491   }
492 }
493
494 /*
495 * Receive information about inbound object including the id, from_pe and its
496 * load. 
497 *
498 * obj_id is the index of the object in the original PE.
499 * from_pe is the originating PE
500 * obj_load is the load of this object
501 * force flag indicates that this PE is forced to accept the object after
502 * multiple trials and ack should not be sent.
503 */
504 void DistributedLB::InformMigration(int obj_id, int from_pe, double obj_load,
505     bool force) {
506   // If not using ack based scheme or adding this obj does not make this PE
507   // overloaded, then accept the migrated obj and return. 
508   if (!kUseAck || my_load + obj_load <= transfer_threshold) {
509     migrates_expected++;
510     // add to my load and reply true
511     my_load += obj_load;
512     thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), true);
513     return;
514   }
515
516   // We are using ack based scheme and turns out accepting this obj will make me
517   // overloaded but if it is a forced one, then accept it else return negative
518   // acknowledgement.
519   if (force) {
520     migrates_expected++;
521     // add to my load and reply with positive ack
522     my_load += obj_load;
523   } else {
524     // If my_load + obj_load is > threshold, then reply with negative ack 
525     //CkPrintf("[%d] Cannot accept obj with load %lf my_load %lf and init_load %lf migrates_expected %d\n", CkMyPe(), obj_load, my_load, init_load, migrates_expected);
526     thisProxy[from_pe].RecvAck(obj_id, CkMyPe(), false);
527   }
528 }
529
530 /*
531 * Receive an ack message which the message whether the assigned object can be
532 * assigned or not. If all the acks have been received, then create migration
533 * message.
534 */
535 void DistributedLB::RecvAck(int obj_id, int assigned_pe, bool can_accept) {
536   total_migrates_ack--;
537
538   // If it is a positive ack, then create a migrate msg for that object
539   if (can_accept) {
540     MigrateInfo* migrateMe = new MigrateInfo;
541     migrateMe->obj = my_stats->objData[obj_id].handle;
542     migrateMe->from_pe = CkMyPe();
543     migrateMe->to_pe = assigned_pe;
544     migrateInfo.push_back(migrateMe);
545   } else if (negack_count > 0.01*underloaded_pe_count) {
546     // If received negative acks more than the specified threshold, then drop it
547     negack_count++;
548     total_migrates--;
549     objs_count++;
550     my_load += my_stats->objData[obj_id].wallTime;
551   } else {
552     // Try to transfer again. Add the object back to a heap, update my load and
553     // try to find a suitable PE now.
554     total_migrates--;
555     negack_count++;
556     objs_count++;
557     my_load += my_stats->objData[obj_id].wallTime;
558
559     minHeap* objs = new minHeap(1);
560     InfoRecord* item = new InfoRecord;
561     item->load = my_stats->objData[obj_id].wallTime;
562     item->Id = obj_id;
563     objs->insert(item);
564
565     CkVec<int> obj_no;
566     CkVec<int> obj_pe_no;
567     MapObjsToPe(objs, obj_no, obj_pe_no);
568
569     // If a PE could be found to transfer this object, MapObjsToPe sends a
570     // message to it. Wait for the ack.
571     // Maybe at this point we can try to force it or just drop it.
572     if (obj_pe_no.size() > 0) {
573       total_migrates_ack++;
574       total_migrates++;
575     }
576     InfoRecord* obj;
577     while (NULL!=(obj=objs->deleteMin())) {
578       delete obj;
579     }
580   }
581
582   // Whenever all the acks have been received, create migration msg, go into the
583   // barrier and wait for everyone to finish their load balancing phase
584   if (total_migrates_ack == 0) {
585     DoneWithLBPhase();
586   }
587 }
588
589 void DistributedLB::PackAndSendMigrateMsgs() {
590   LBMigrateMsg* msg = new(total_migrates,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
591   msg->n_moves = total_migrates;
592   for(int i=0; i < total_migrates; i++) {
593     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
594     msg->moves[i] = *item;
595     delete item;
596     migrateInfo[i] = 0;
597   }
598   migrateInfo.clear();
599   ProcessMigrationDecision(msg);
600 }
601
602 /*
603 * Pick a random PE based on the probability distribution.
604 */
605 int DistributedLB::PickRandReceiverPeIdx() const {
606   // The min loaded PEs have probabilities inversely proportional to their load.
607   // A cumulative distribution is calculated and a PE is randomly selected based
608   // on the cdf.
609   // Generate a random number and return the index of min loaded PE whose cdf is
610   // greater than the random number.
611         double no = (double) rand()/(double) RAND_MAX;
612         for (int i = 0; i < underloaded_pe_count; i++) {
613                 if (distribution[i] >= no) {
614                         return i;
615                 }
616         }
617         return -1;
618 }
619
620 /*
621 * The PEs have probabilities inversely proportional to their load. Construct a
622 * CDF based on this.
623 */
624 void DistributedLB::CalculateCumulateDistribution() {
625   // The min loaded PEs have probabilities inversely proportional to their load.
626         double cumulative = 0.0;
627         for (int i = 0; i < underloaded_pe_count; i++) {
628                 cumulative += (transfer_threshold - loads[i])/transfer_threshold;
629                 distribution.push_back(cumulative);
630         }
631
632   for (int i = 0; i < underloaded_pe_count; i++) {
633     distribution[i] = distribution[i]/cumulative;
634   }
635 }
636
637 #include "DistributedLB.def.h"