doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * Author: Harshitha Menon
3  * Date created: 2012
4 */
5 /*@{*/
6
7 #include "converse.h"
8
9 /*
10  */
11
12 #include "MetaBalancer.h"
13 #include "topology.h"
14
15 #include "limits.h"
16
17 #define VEC_SIZE 50
18 #define IMB_TOLERANCE 1.1
19 #define OUTOFWAY_TOLERANCE 2
20 #define UTILIZATION_THRESHOLD 0.7
21 #define NEGLECT_IDLE 2 // Should never be == 1
22 #define MIN_STATS 6
23 #define STATS_COUNT 8 // The number of stats collected during reduction
24
25 #define DEBAD(x) /*CkPrintf x*/
26 #define DEBADDETAIL(x) /*CkPrintf x*/
27 #define EXTRA_FEATURE 0
28
29 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
30   double lb_data[STATS_COUNT];
31   lb_data[1] = 0.0; // total number of processors contributing
32   lb_data[2] = 0.0; // total load
33   lb_data[3] = 0.0; // max load
34   lb_data[4] = 0.0; // idle time
35   lb_data[5] = 1.0; // utilization
36   lb_data[6] = 0.0; // total load with bg
37   lb_data[7] = 0.0; // max load with bg
38   for (int i = 0; i < nMsg; i++) {
39     CkAssert(msgs[i]->getSize() == STATS_COUNT*sizeof(double));
40     if (msgs[i]->getSize() != STATS_COUNT*sizeof(double)) {
41       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n",
42           msgs[i]->getSize());
43       CkAbort("Incorrect Reduction size in MetaBalancer\n");
44     }
45     double* m = (double *)msgs[i]->getData();
46     // Total count
47     lb_data[1] += m[1];
48     // Avg load
49     lb_data[2] += m[2];
50     // Max load
51     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
52     // Avg idle
53     lb_data[4] += m[4];
54     // Get least utilization
55     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
56     // Get Avg load with bg
57     lb_data[6] += m[6];
58     // Get Max load with bg
59     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
60     if (i == 0) {
61       // Iteration no
62       lb_data[0] = m[0];
63     }
64     if (m[0] != lb_data[0]) {
65       CkPrintf("Error!!! Reduction is intermingled between iteration %lf \
66         and %lf\n", lb_data[0], m[0]);
67       CkAbort("Intermingling iterations in MetaBalancer\n");
68     }
69   }
70   return CkReductionMsg::buildNew(STATS_COUNT*sizeof(double), lb_data);
71 }
72
73 /*global*/ CkReduction::reducerType lbDataCollectionType;
74 /*initcall*/ void registerLBDataCollection(void) {
75   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
76 }
77
78 CkGroupID _metalb;
79 CkGroupID _metalbred;
80
81 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
82
83 double _nobj_timer = 10.0;
84
85 // mainchare
86 MetaLBInit::MetaLBInit(CkArgMsg *m) {
87 #if CMK_LBDB_ON
88   _metalbred = CProxy_MetaBalancerRedn::ckNew();
89   _metalb = CProxy_MetaBalancer::ckNew();
90 #endif
91   delete m;
92 }
93
94 // called from init.C
95 void _metabalancerInit() {
96   CkpvInitialize(int, metalbInited);
97   CkpvAccess(metalbInited) = 0;
98   char **argv = CkGetArgv();
99         CmiGetArgDoubleDesc(argv, "+MetaLBNoObjTimer", &_nobj_timer,
100                         "Time in seconds before triggering reduction for no objs");
101 }
102
103 void MetaBalancer::initnodeFn() {
104 }
105
106 // called by my constructor
107 void MetaBalancer::init(void) {
108   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
109   CkpvAccess(metalbInited) = 1;
110   total_load_vec.resize(VEC_SIZE, 0.0);
111   total_count_vec.resize(VEC_SIZE, 0);
112   prev_idle = 0.0;
113   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
114
115   metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
116
117   adaptive_lbdb.lb_iter_no = -1;
118
119   // If metabalancer enabled, initialize the variables
120   adaptive_struct.tentative_period =  INT_MAX;
121   adaptive_struct.final_lb_period =  INT_MAX;
122   adaptive_struct.lb_calculated_period = INT_MAX;
123   adaptive_struct.lb_iteration_no = -1;
124   adaptive_struct.finished_iteration_no = -1;
125   adaptive_struct.global_max_iter_no = 0;
126   adaptive_struct.tentative_max_iter_no = -1;
127   adaptive_struct.in_progress = false;
128   adaptive_struct.lb_strategy_cost = 0.0;
129   adaptive_struct.lb_migration_cost = 0.0;
130   adaptive_struct.lb_msg_send_no = 0;
131   adaptive_struct.lb_msg_recv_no = 0;
132   adaptive_struct.total_syncs_called = 0;
133   adaptive_struct.last_lb_type = -1;
134
135
136   // This is indicating if the load balancing strategy and migration started.
137   // This is mainly used to register callbacks for noobj pes. They would
138   // register as soon as resumefromsync is called. On receiving the handles at
139   // the central pe, it clears the previous handlers and sets lb_in_progress
140   // to false so that it doesn't clear the handles.
141   lb_in_progress = false;
142
143   is_prev_lb_refine = -1;
144   if (_lb_args.metaLbOn()) {
145     periodicCall((void *) this);
146   }
147 }
148
149 void MetaBalancer::pup(PUP::er& p) {
150         CBase_MetaBalancer::pup(p);
151   if (p.isUnpacking()) {
152     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
153     metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
154   }
155   p|prev_idle;
156   p|alpha_beta_cost_to_load;
157   p|is_prev_lb_refine;
158   p|lb_in_progress;
159 }
160
161
162 void MetaBalancer::ResumeClients() {
163   // If metabalancer enabled, initialize the variables
164   adaptive_lbdb.history_data.clear();
165
166   adaptive_struct.tentative_period =  INT_MAX;
167   adaptive_struct.final_lb_period =  INT_MAX;
168   adaptive_struct.lb_calculated_period = INT_MAX;
169   adaptive_struct.lb_iteration_no = -1;
170   adaptive_struct.finished_iteration_no = -1;
171   adaptive_struct.global_max_iter_no = 0;
172   adaptive_struct.tentative_max_iter_no = -1;
173   adaptive_struct.in_progress = false;
174   adaptive_struct.lb_strategy_cost = 0.0;
175   adaptive_struct.lb_migration_cost = 0.0;
176   adaptive_struct.lb_msg_send_no = 0;
177   adaptive_struct.lb_msg_recv_no = 0;
178   adaptive_struct.total_syncs_called = 0;
179
180   prev_idle = 0.0;
181   if (lb_in_progress) {
182     lbdb_no_obj_callback.clear();
183     lb_in_progress = false;
184   }
185   HandleAdaptiveNoObj();
186 }
187
188 int MetaBalancer::get_iteration() {
189   return adaptive_struct.lb_iteration_no;
190 }
191
192 int MetaBalancer::get_finished_iteration() {
193   return adaptive_struct.finished_iteration_no;
194 }
195
196 bool MetaBalancer::AddLoad(int it_n, double load) {
197 #if CMK_LBDB_ON
198   int index = it_n % VEC_SIZE;
199   total_count_vec[index]++;
200   adaptive_struct.total_syncs_called++;
201   DEBADDETAIL(("At PE %d Total contribution for iteration %d is %d \
202       total objs %d\n", CkMyPe(), it_n, total_count_vec[index],
203       lbdatabase->getLBDB()->ObjDataCount()));
204
205   if (it_n <= adaptive_struct.finished_iteration_no) {
206     CkAbort("Error!! Received load for iteration that has contributed\n");
207   }
208   if (it_n > adaptive_struct.lb_iteration_no) {
209     adaptive_struct.lb_iteration_no = it_n;
210   }
211   total_load_vec[index] += load;
212   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
213     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
214         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
215     CkAbort("Abort!!! Received more contribution");
216   }
217
218   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
219     double idle_time, bg_walltime, cpu_bgtime;
220     lbdatabase->IdleTime(&idle_time);
221     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
222
223     int sync_for_bg = adaptive_struct.total_syncs_called +
224         lbdatabase->getLBDB()->ObjDataCount();
225     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
226
227     if (it_n < NEGLECT_IDLE) {
228       prev_idle = idle_time;
229     }
230     idle_time -= prev_idle;
231
232     // The chares do not contribute their 0th iteration load. So the total syncs
233     // in reality is total_syncs_called + obj_counts
234     int total_countable_syncs = adaptive_struct.total_syncs_called +
235         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me!
236     if (total_countable_syncs != 0) {
237       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
238     }
239
240     double lb_data[STATS_COUNT];
241     lb_data[0] = it_n;
242     lb_data[1] = 1;
243     lb_data[2] = total_load_vec[index]; // For average load
244     lb_data[3] = total_load_vec[index]; // For max load
245     // Set utilization
246     if (total_load_vec[index] == 0.0) {
247         lb_data[4] = 0.0;
248       lb_data[5] = 0.0;
249     } else {
250       lb_data[4] = total_load_vec[index]/(idle_time + total_load_vec[index]);
251       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
252     }
253     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
254     lb_data[7] = lb_data[6]; // For Max load with bg
255     total_load_vec[index] = 0.0;
256     total_count_vec[index] = 0;
257
258     adaptive_struct.finished_iteration_no = it_n;
259     DEBADDETAIL(("[%d] sends total load %lf idle time %lf utilization %lf at iter %d\n",
260         CkMyPe(), total_load_vec[index], idle_time,
261         lb_data[5], adaptive_struct.finished_iteration_no));
262
263     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
264     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
265   }
266 #endif
267   return true;
268 }
269
270 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
271   double* load = (double *) msg->getData();
272   double avg = load[2]/load[1];
273   double max = load[3];
274   double avg_utilization = load[4]/load[1];
275   double min_utilization = load[5];
276   int iteration_n = (int) load[0];
277   double avg_load_bg = load[6]/load[1];
278   double max_load_bg = load[7];
279         // Set the max and avg to be the load with background
280         max = max_load_bg;
281         avg = avg_load_bg;
282   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Util : %lf \
283       Min Util : %lf for %lf procs\n",iteration_n, avg, max, avg_utilization,
284       min_utilization, load[1]));
285   delete msg;
286
287   // For processors with  no  objs, trigger MetaBalancer reduction
288   if (adaptive_struct.final_lb_period != iteration_n) {
289     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
290       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
291     }
292   }
293
294   // Store the data for this iteration
295   adaptive_lbdb.lb_iter_no = iteration_n;
296   AdaptiveData data;
297   data.iteration = adaptive_lbdb.lb_iter_no;
298   data.max_load = max;
299   data.avg_load = avg;
300   data.min_utilization = min_utilization;
301   data.avg_utilization = avg_utilization;
302   adaptive_lbdb.history_data.push_back(data);
303
304   if (iteration_n == 1) {
305     adaptive_struct.info_first_iter.max_avg_ratio = max/avg;
306   }
307
308
309   if (adaptive_struct.final_lb_period == iteration_n) {
310     thisProxy.MetaLBCallLBOnChares();
311   }
312
313   // If lb period inform is in progress, dont inform again.
314   // If this is the lb data corresponding to the final lb period informed, then
315   // don't recalculate as some of the processors might have already gone into
316   // LB_STAGE.
317   if (adaptive_struct.in_progress || 
318       (adaptive_struct.final_lb_period == iteration_n)) {
319     return;
320   }
321
322         // If the utilization is beyond 90%, then do nothing
323         if (data.avg_utilization >= 0.90) {
324                 return;
325         }
326
327   double utilization_threshold = UTILIZATION_THRESHOLD;
328
329 #if EXTRA_FEATURE
330   DEBAD(("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load));
331   if (alpha_beta_cost_to_load < 0.1) {
332     // Ignore the effect of idle time and there by lesser utilization. So we
333     // assign utilization threshold to be 0.0
334     DEBAD(("Changing the idle load tolerance coz this isn't \
335         communication intensive benchmark\n"));
336     utilization_threshold = 0.0;
337   }
338 #endif
339
340   // First generate the lb period based on the cost of lb. Find out what is the
341   // expected imbalance at the calculated lb period.
342   int period;
343   // This is based on the new max load after load balancing. So technically, it
344   // is calculated based on the shifter up avg curve.
345   double ratio_at_t = 1.0;
346   int tmp_lb_type;
347   double tmp_max_avg_ratio, tmp_comm_ratio;
348   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
349
350   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
351
352   if (generatePlan(period, ratio_at_t)) {
353     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
354       adaptive_struct.lb_calculated_period, period,
355       adaptive_struct.tentative_max_iter_no));
356     // set the imbalance tolerance to be ratio_at_calculated_lb_period
357     if (ratio_at_t != 1.0) {
358       DEBAD(("Changed tolerance to %lf after line eq whereas max/avg is %lf\n",
359         ratio_at_t, max/avg));
360       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
361       // compared with the tolerance
362       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
363     }
364
365     DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n",
366       tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio));
367
368 #if EXTRA_FEATURE
369     if ((avg_utilization < utilization_threshold || max/avg >= tolerate_imb) &&
370           adaptive_lbdb.history_data.size() > MIN_STATS) {
371       DEBAD(("Trigger soon even though we calculated lbperiod max/avg(%lf) and \
372                                         utilization ratio (%lf)\n", max/avg, avg_utilization));
373       TriggerSoon(iteration_n, max/avg, tolerate_imb);
374       return;
375     }
376 #endif
377
378     // If the new lb period from linear extrapolation is greater than maximum
379     // iteration known from previously collected data, then inform all the
380     // processors about the new calculated period.
381     if (period > adaptive_struct.tentative_max_iter_no && period !=
382           adaptive_struct.final_lb_period) {
383       adaptive_struct.doCommStrategy = false;
384       adaptive_struct.lb_calculated_period = period;
385       adaptive_struct.in_progress = true;
386       DEBAD(("Sticking to the calculated period %d\n",
387           adaptive_struct.lb_calculated_period));
388       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
389         adaptive_struct.lb_calculated_period);
390       return;
391     }
392     return;
393   }
394
395   DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type,
396       tmp_max_avg_ratio, tmp_comm_ratio));
397
398 #if EXTRA_FEATURE
399   // This would be called when linear extrapolation did not provide suitable
400   // period provided there is enough  historical data 
401   if ((avg_utilization < utilization_threshold || max/avg >= tolerate_imb) &&
402                         adaptive_lbdb.history_data.size() > 4) {
403     DEBAD(("Carry out load balancing step at iter max/avg(%lf) and utilization \
404       ratio (%lf)\n", max/avg, avg_utilization));
405     TriggerSoon(iteration_n, max/avg, tolerate_imb);
406     return;
407   }
408 #endif
409
410 }
411
412 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
413     double tolerate_imb) {
414
415   // If the previously calculated_period (not the final decision) is greater
416   // than the iter +1 and if it is greater than the maximum iteration we have
417   // seen so far, then we can inform this
418   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
419       (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
420       (iteration_n + 1 != adaptive_struct.final_lb_period)) {
421     if (imbalance_ratio < tolerate_imb) {
422       adaptive_struct.doCommStrategy = true;
423       DEBAD(("No load imbalance but idle time\n"));
424     } else {
425       adaptive_struct.doCommStrategy = false;
426       DEBAD(("load imbalance \n"));
427     }
428     adaptive_struct.lb_calculated_period = iteration_n + 1;
429     adaptive_struct.in_progress = true;
430     DEBAD(("Informing everyone the lb period is %d\n",
431         adaptive_struct.lb_calculated_period));
432     thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
433         adaptive_struct.lb_calculated_period);
434   }
435 }
436
437 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
438   if (adaptive_lbdb.history_data.size() <= 4) {
439     return false;
440   }
441
442   // Some heuristics for lbperiod
443   // If constant load or almost constant,
444   // then max * new_lb_period > avg * new_lb_period + lb_cost
445   double max = 0.0;
446   double avg = 0.0;
447   AdaptiveData data;
448   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
449     data = adaptive_lbdb.history_data[i];
450     max += data.max_load;
451     avg += data.avg_load;
452     DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
453   }
454 //  max /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
455 //  avg /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
456
457   // If linearly varying load, then find lb_period
458   // area between the max and avg curve
459   // If we can attain perfect balance, then the new load is close to the
460   // average. Hence we pass 1, else pass in some other value which would be the
461   // new max_load after load balancing.
462   int tmp_lb_type;
463   double tmp_max_avg_ratio, tmp_comm_ratio;
464   double tolerate_imb;
465
466 #if EXTRA_FEATURE
467   // First get the data for refine.
468   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
469   tolerate_imb = tmp_max_avg_ratio;
470
471   // If RefineLB does a good job, then find the period considering RefineLB
472   if (tmp_max_avg_ratio <= 1.01) {
473     if (max/avg < tolerate_imb) {
474       DEBAD(("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg,
475           tolerate_imb));
476       tolerate_imb = 1.0;
477     }
478     DEBAD(("Will generate plan for refine %lf imb and %lf overhead\n",
479         tolerate_imb, 0.2));
480     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
481   }
482
483   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
484 #endif
485
486   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
487   tolerate_imb = tmp_max_avg_ratio;
488 //  if (max/avg < tolerate_imb) {
489 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
490 //    tolerate_imb = 1.0;
491 //  }
492   if (max/avg > tolerate_imb) {
493     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
494       return true;
495     }
496   }
497
498   max = 0.0;
499   avg = 0.0;
500   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
501     data = adaptive_lbdb.history_data[i];
502     max += data.max_load;
503     avg += data.avg_load*tolerate_imb;
504   }
505   max /= adaptive_lbdb.history_data.size();
506   avg /= adaptive_lbdb.history_data.size();
507   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
508   period = (int) (cost/(max - avg));
509   DEBAD(("Obtained period %d from constant prediction tolerated \
510                         imbalance(%f)\n", period, tolerate_imb));
511   if (period < 0) { 
512     period = adaptive_struct.final_lb_period;
513     DEBAD(("Obtained -ve period from constant prediction so changing to prev %d\n", period));
514   } 
515   ratio_at_t = max / avg;
516   return true;
517 }
518
519 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
520     double overhead_percent, int& period, double& ratio_at_t) {
521   double mslope, aslope, mc, ac;
522   getLineEq(new_load_percent, aslope, ac, mslope, mc);
523   DEBAD(("new load percent %lf\n", new_load_percent));
524   DEBAD(("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac));
525   double a = (mslope - aslope)/2;
526   double b = (mc - ac);
527   double c = -(adaptive_struct.lb_strategy_cost +
528       adaptive_struct.lb_migration_cost) * overhead_percent;
529   DEBAD(("cost %f\n",
530       (adaptive_struct.lb_strategy_cost+adaptive_struct.lb_migration_cost)));
531   bool got_period = getPeriodForLinear(a, b, c, period);
532   if (!got_period) {
533     return false;
534   }
535
536   if (mslope < 0) {
537     if (period > (-mc/mslope)) {
538       DEBAD(("Max < 0 Period set when max load is -ve\n"));
539       return false;
540     }
541   }
542
543   if (aslope < 0) {
544     if (period > (-ac/aslope)) {
545       DEBAD(("Avg < 0 Period set when avg load is -ve\n"));
546       return false;
547     }
548   }
549
550   int intersection_t = (int) ((mc-ac) / (aslope - mslope));
551   if (intersection_t > 0 && period > intersection_t) {
552     DEBAD(("Avg | Max Period set when curves intersect\n"));
553     return false;
554   }
555   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
556   DEBAD(("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t));
557   return true;
558 }
559
560 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
561   DEBAD(("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c));
562   if (a == 0.0) {
563     period = (int) (-c / b);
564     if (period < 0) {
565       DEBAD(("-ve period for -c/b (%d)\n", period));
566       return false;
567     }
568     DEBAD(("Ideal period for linear load %d\n", period));
569     return true;
570   }
571   int x;
572   double t = (b * b) - (4*a*c);
573   if (t < 0) {
574     DEBAD(("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t)));
575     return false;
576   }
577   t = (-b + sqrt(t)) / (2*a);
578   x = (int) t;
579   if (x < 0) {
580     DEBAD(("Oops!!! x (%d) < 0\n", x));
581     x = 0;
582     return false;
583   }
584   period = x;
585   DEBAD(("Ideal period for linear load %d\n", period));
586   return true;
587 }
588
589 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
590   int total = adaptive_lbdb.history_data.size();
591   int iterations = (int) (1 + adaptive_lbdb.history_data[total - 1].iteration -
592       adaptive_lbdb.history_data[0].iteration);
593   double a1 = 0;
594   double m1 = 0;
595   double a2 = 0;
596   double m2 = 0;
597   AdaptiveData data;
598   int i = 0;
599   for (i = 0; i < total/2; i++) {
600     data = adaptive_lbdb.history_data[i];
601     m1 += data.max_load;
602     a1 += data.avg_load;
603     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
604   }
605   m1 /= i;
606   a1 = (a1 * new_load_percent) / i;
607
608   for (i = total/2; i < total; i++) {
609     data = adaptive_lbdb.history_data[i];
610     m2 += data.max_load;
611     a2 += data.avg_load;
612     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
613   }
614   m2 /= (i - total/2);
615   a2 = (a2 * new_load_percent) / (i - total/2);
616
617   aslope = 2 * (a2 - a1) / iterations;
618   mslope = 2 * (m2 - m1) / iterations;
619   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
620   mc = adaptive_lbdb.history_data[0].max_load;
621
622   ac = a1 - ((aslope * total)/4);
623   mc = m1 - ((mslope * total)/4);
624
625   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
626   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
627
628   return true;
629 }
630
631 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
632   if (req_no < adaptive_struct.lb_msg_recv_no) {
633     DEBAD(("Error!!! Received a request which was already sent or old\n"));
634     return;
635   }
636   DEBADDETAIL(("[%d] Load balance decision made cur iteration: %d period:%d\n",
637                         CkMyPe(), adaptive_struct.lb_iteration_no, period));
638   adaptive_struct.tentative_period = period;
639   adaptive_struct.lb_msg_recv_no = req_no;
640   if (metaRdnGroup == NULL) {
641     metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
642   }
643   if (metaRdnGroup != NULL) {
644     metaRdnGroup->getMaxIter(adaptive_struct.lb_iteration_no);
645   }
646 }
647
648 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
649   if (req_no < adaptive_struct.lb_msg_recv_no) {
650     return;
651   }
652   DEBADDETAIL(("[%d] Final Load balance decision made cur iteration: %d \
653                         period:%d \n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
654   adaptive_struct.tentative_period = period;
655   adaptive_struct.final_lb_period = period;
656   lbdatabase->MetaLBResumeWaitingChares(period);
657 }
658
659 void MetaBalancer::MetaLBCallLBOnChares() {
660   lbdatabase->MetaLBCallLBOnChares();
661 }
662
663 void MetaBalancer::ReceiveIterationNo(int local_iter_no) {
664   CmiAssert(CkMyPe() == 0);
665
666   if (local_iter_no > adaptive_struct.global_max_iter_no) {
667     adaptive_struct.global_max_iter_no = local_iter_no;
668   }
669
670   int period;
671
672     if (adaptive_struct.global_max_iter_no > adaptive_struct.tentative_max_iter_no) {
673       adaptive_struct.tentative_max_iter_no = adaptive_struct.global_max_iter_no;
674     }
675     period = (adaptive_struct.tentative_period > adaptive_struct.global_max_iter_no) ?
676                                 adaptive_struct.tentative_period : adaptive_struct.global_max_iter_no + 1;
677     // If no one has gone into load balancing stage, then we can safely change
678     // the period otherwise keep the old period.
679     if (adaptive_struct.global_max_iter_no < adaptive_struct.final_lb_period) {
680       adaptive_struct.tentative_period = period;
681       DEBAD(("Final lb_period CHANGED!%d\n", adaptive_struct.tentative_period));
682     } else {
683       adaptive_struct.tentative_period = adaptive_struct.final_lb_period;
684       DEBAD(("Final lb_period NOT CHANGED!%d\n", adaptive_struct.tentative_period));
685     }
686     thisProxy.LoadBalanceDecisionFinal(adaptive_struct.lb_msg_recv_no, adaptive_struct.tentative_period);
687     adaptive_struct.in_progress = false;
688 }
689
690 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
691   // If tentative and final_lb_period are the same, then the decision has been
692   // made but if not, they are in the middle of consensus, hence return the
693   // lease of the two
694   if (adaptive_struct.tentative_period != adaptive_struct.final_lb_period) {
695     is_tentative = true;
696   } else {
697     is_tentative = false;
698   }
699   if (adaptive_struct.tentative_period < adaptive_struct.final_lb_period) {
700     return adaptive_struct.tentative_period;
701    } else {
702      return adaptive_struct.final_lb_period;
703    }
704 }
705
706 // Called by CentralLB to indicate that the LB strategy and migration is in
707 // progress.
708 void MetaBalancer::ResetAdaptive() {
709   adaptive_lbdb.lb_iter_no = -1;
710   lb_in_progress = true;
711 }
712
713 // This is required for PEs with no objs
714 void MetaBalancer::periodicCall(void *ad) {
715   MetaBalancer *s = (MetaBalancer *)ad;
716   CcdCallFnAfterOnPE((CcdVoidFn)checkForNoObj, (void *)s, _nobj_timer, CkMyPe());
717 }
718
719 void MetaBalancer::checkForNoObj(void *ad) {
720   MetaBalancer *s = (MetaBalancer *) ad;
721   s->HandleAdaptiveNoObj();
722 }
723
724 // Called by LBDatabase to indicate that no objs are there in this processor
725 void MetaBalancer::HandleAdaptiveNoObj() {
726 #if CMK_LBDB_ON
727   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
728     adaptive_struct.finished_iteration_no++;
729     adaptive_struct.lb_iteration_no++;
730     DEBAD(("(%d) --HandleAdaptiveNoObj %d\n", CkMyPe(),
731           adaptive_struct.finished_iteration_no));
732     thisProxy[0].RegisterNoObjCallback(CkMyPe());
733     TriggerAdaptiveReduction();
734   }
735 #endif
736 }
737
738 void MetaBalancer::RegisterNoObjCallback(int index) {
739   // If the load balancing process (migration) is going on and in the meantime
740   // one of the processor finishes everything and finds that there are no objs
741   // in it, then it registers a callback. So clear the old data in case it
742   // hasn't been done.
743   if (lb_in_progress) {
744     lbdb_no_obj_callback.clear();
745     lb_in_progress = false;
746   }
747   lbdb_no_obj_callback.push_back(index);
748   DEBAD(("Registered %d to have no objs.\n", index));
749
750   // If collection has already happened and this is second iteration, then
751   // trigger reduction.
752   if (adaptive_lbdb.lb_iter_no != -1) {
753     DEBAD(("Collection already started now %d so kick in\n",
754         adaptive_struct.finished_iteration_no));
755     //thisProxy[index].TriggerAdaptiveReduction();
756   }
757 }
758
759 void MetaBalancer::TriggerAdaptiveReduction() {
760 #if CMK_LBDB_ON
761   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
762     adaptive_struct.finished_iteration_no++;
763     adaptive_struct.lb_iteration_no++;
764     double lb_data[STATS_COUNT];
765     lb_data[0] = adaptive_struct.finished_iteration_no;
766     lb_data[1] = 1;
767     lb_data[2] = 0.0;
768     lb_data[3] = 0.0;
769     lb_data[4] = 0.0;
770     lb_data[5] = 0.0;
771     lb_data[6] = 0.0;
772     lb_data[7] = 0.0;
773
774     DEBAD(("[%d] Triggered adaptive reduction for noobj %d\n", CkMyPe(),
775           adaptive_struct.finished_iteration_no));
776
777     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL),
778         thisProxy[0]);
779     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
780   }
781 #endif
782 }
783
784
785 bool MetaBalancer::isStrategyComm() {
786   return adaptive_struct.doCommStrategy;
787 }
788
789 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
790   adaptive_struct.lb_migration_cost = lb_migration_cost;
791 }
792
793 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
794   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
795 }
796
797 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
798     local_comm, double remote_comm) {
799   adaptive_struct.last_lb_type = lb;
800   if (lb == 0) {
801     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
802   } else if (lb == 1) {
803     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
804   } else if (lb == 2) {
805     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
806   } else if (lb == 3) {
807     adaptive_struct.comm_refine_info.remote_local_ratio =
808     remote_comm/local_comm;
809   }
810 }
811
812 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu,
813     double avg_load) {
814   if (adaptive_struct.last_lb_type == -1) {
815     adaptive_struct.last_lb_type = 0;
816   }
817   int lb = adaptive_struct.last_lb_type;
818   if (lb == 0) {
819     adaptive_struct.greedy_info.max_avg_ratio = max_load/avg_load;
820   } else if (lb == 1) {
821     adaptive_struct.refine_info.max_avg_ratio = max_load/avg_load;
822   } else if (lb == 2) {
823     adaptive_struct.comm_info.max_avg_ratio = max_load/avg_load;
824   } else if (lb == 3) {
825     adaptive_struct.comm_refine_info.max_avg_ratio = max_load/avg_load;
826   }
827 }
828
829 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
830   DEBAD(("Setting alpha beta %lf\n", alpha_beta_to_load));
831   alpha_beta_cost_to_load = alpha_beta_to_load;
832 }
833
834
835 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio,
836     double& remote_local_comm_ratio) {
837   lb_type = adaptive_struct.last_lb_type;
838   lb_max_avg_ratio = 1;
839   remote_local_comm_ratio = 1;
840   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
841
842   // Based on the first iteration
843   lb_max_avg_ratio = adaptive_struct.info_first_iter.max_avg_ratio;
844 }
845
846 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio,
847     double& remote_local_comm_ratio) {
848   if (lb_type == 0) {
849     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
850   } else if (lb_type == 1) {
851     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
852   } else if (lb_type == 2) {
853     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
854   } else if (lb_type == 3) {
855     remote_local_comm_ratio =
856        adaptive_struct.comm_refine_info.remote_local_ratio;
857   }
858 }
859
860 void MetaBalancerRedn::init() {
861 //  metabalancer = (MetaBalancer *)CkLocalBranch(_metalb);
862   metabalancer = NULL;
863 }
864
865 void MetaBalancerRedn::pup(PUP::er& p) {
866         CBase_MetaBalancerRedn::pup(p);
867 }
868
869 void MetaBalancerRedn::ReceiveIterNo(int max_iter) {
870   CkAssert(CkMyPe() == 0);
871   if (metabalancer == NULL) {
872     metabalancer = (MetaBalancer*)CkLocalBranch(_metalb);
873   }
874   if (metabalancer != NULL) {
875     metabalancer->ReceiveIterationNo(max_iter);
876   }
877 }
878
879 void MetaBalancerRedn::getMaxIter(int max_iter) {
880   CkCallback cb(CkReductionTarget(MetaBalancerRedn, ReceiveIterNo), thisProxy[0]);
881   contribute(sizeof(int), &max_iter, CkReduction::max_int, cb);
882 }
883
884 #include "MetaBalancer.def.h"
885
886 /*@}*/