a3fb3f05942a2a6479a62f50f622775394694766
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * 
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  */
10
11 #include "MetaBalancer.h"
12 #include "topology.h"
13
14 #include "limits.h"
15
16 #define VEC_SIZE 50
17 #define IMB_TOLERANCE 1.1
18 #define OUTOFWAY_TOLERANCE 2
19 #define UTILIZATION_THRESHOLD 0.7
20 #define NEGLECT_IDLE 2 // Should never be == 1
21 #define MIN_STATS 6
22 #define STATS_COUNT 8 // The number of stats collected during reduction
23
24 #define DEBAD(x) /*CkPrintf x*/
25 #define DEBADDETAIL(x) /*CkPrintf x*/
26 #define EXTRA_FEATURE 0
27
28 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
29   double lb_data[STATS_COUNT];
30   lb_data[1] = 0.0; // total number of processors contributing
31   lb_data[2] = 0.0; // total load
32   lb_data[3] = 0.0; // max load
33   lb_data[4] = 0.0; // idle time
34   lb_data[5] = 1.0; // utilization
35   lb_data[6] = 0.0; // total load with bg
36   lb_data[7] = 0.0; // max load with bg
37   for (int i = 0; i < nMsg; i++) {
38     CkAssert(msgs[i]->getSize() == STATS_COUNT*sizeof(double));
39     if (msgs[i]->getSize() != STATS_COUNT*sizeof(double)) {
40       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n",
41           msgs[i]->getSize());
42       CkAbort("Incorrect Reduction size in MetaBalancer\n");
43     }
44     double* m = (double *)msgs[i]->getData();
45     // Total count
46     lb_data[1] += m[1];
47     // Avg load
48     lb_data[2] += m[2];
49     // Max load
50     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
51     // Avg idle
52     lb_data[4] += m[4];
53     // Get least utilization
54     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
55     // Get Avg load with bg
56     lb_data[6] += m[6];
57     // Get Max load with bg
58     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
59     if (i == 0) {
60       // Iteration no
61       lb_data[0] = m[0];
62     }
63     if (m[0] != lb_data[0]) {
64       CkPrintf("Error!!! Reduction is intermingled between iteration %lf \
65         and %lf\n", lb_data[0], m[0]);
66       CkAbort("Intermingling iterations in MetaBalancer\n");
67     }
68   }
69   return CkReductionMsg::buildNew(STATS_COUNT*sizeof(double), lb_data);
70 }
71
72 /*global*/ CkReduction::reducerType lbDataCollectionType;
73 /*initcall*/ void registerLBDataCollection(void) {
74   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
75 }
76
77 CkGroupID _metalb;
78 CkGroupID _metalbred;
79
80 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
81
82 // mainchare
83 MetaLBInit::MetaLBInit(CkArgMsg *m) {
84 #if CMK_LBDB_ON
85   _metalb = CProxy_MetaBalancer::ckNew();
86   _metalbred = CProxy_MetaBalancerRedn::ckNew();
87 #endif
88   delete m;
89 }
90
91 // called from init.C
92 void _metabalancerInit() {
93   CkpvInitialize(int, metalbInited);
94   CkpvAccess(metalbInited) = 0;
95 }
96
97 void MetaBalancer::initnodeFn() {
98 }
99
100 // called by my constructor
101 void MetaBalancer::init(void) {
102   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
103   CkpvAccess(metalbInited) = 1;
104   total_load_vec.resize(VEC_SIZE, 0.0);
105   total_count_vec.resize(VEC_SIZE, 0);
106   prev_idle = 0.0;
107   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
108
109   metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
110
111   adaptive_lbdb.lb_iter_no = -1;
112
113   // If metabalancer enabled, initialize the variables
114   adaptive_struct.tentative_period =  INT_MAX;
115   adaptive_struct.final_lb_period =  INT_MAX;
116   adaptive_struct.lb_calculated_period = INT_MAX;
117   adaptive_struct.lb_iteration_no = -1;
118   adaptive_struct.finished_iteration_no = -1;
119   adaptive_struct.global_max_iter_no = 0;
120   adaptive_struct.tentative_max_iter_no = -1;
121   adaptive_struct.in_progress = false;
122   adaptive_struct.lb_strategy_cost = 0.0;
123   adaptive_struct.lb_migration_cost = 0.0;
124   adaptive_struct.lb_msg_send_no = 0;
125   adaptive_struct.lb_msg_recv_no = 0;
126   adaptive_struct.total_syncs_called = 0;
127   adaptive_struct.last_lb_type = -1;
128
129
130   // This is indicating if the load balancing strategy and migration started.
131   // This is mainly used to register callbacks for noobj pes. They would
132   // register as soon as resumefromsync is called. On receiving the handles at
133   // the central pe, it clears the previous handlers and sets lb_in_progress
134   // to false so that it doesn't clear the handles.
135   lb_in_progress = false;
136
137   is_prev_lb_refine = -1;
138   if (_lb_args.metaLbOn()) {
139     periodicCall((void *) this);
140   }
141 }
142
143 void MetaBalancer::pup(PUP::er& p) {
144         CBase_MetaBalancer::pup(p);
145   if (p.isUnpacking()) {
146     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
147     metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
148   }
149   p|prev_idle;
150   p|alpha_beta_cost_to_load;
151   p|is_prev_lb_refine;
152   p|lb_in_progress;
153 }
154
155
156 void MetaBalancer::ResumeClients() {
157   // If metabalancer enabled, initialize the variables
158   adaptive_lbdb.history_data.clear();
159
160   adaptive_struct.tentative_period =  INT_MAX;
161   adaptive_struct.final_lb_period =  INT_MAX;
162   adaptive_struct.lb_calculated_period = INT_MAX;
163   adaptive_struct.lb_iteration_no = -1;
164   adaptive_struct.finished_iteration_no = -1;
165   adaptive_struct.global_max_iter_no = 0;
166   adaptive_struct.tentative_max_iter_no = -1;
167   adaptive_struct.in_progress = false;
168   adaptive_struct.lb_strategy_cost = 0.0;
169   adaptive_struct.lb_migration_cost = 0.0;
170   adaptive_struct.lb_msg_send_no = 0;
171   adaptive_struct.lb_msg_recv_no = 0;
172   adaptive_struct.total_syncs_called = 0;
173
174   prev_idle = 0.0;
175   if (lb_in_progress) {
176     lbdb_no_obj_callback.clear();
177     lb_in_progress = false;
178   }
179   HandleAdaptiveNoObj();
180 }
181
182 int MetaBalancer::get_iteration() {
183   return adaptive_struct.lb_iteration_no;
184 }
185
186 int MetaBalancer::get_finished_iteration() {
187   return adaptive_struct.finished_iteration_no;
188 }
189
190 bool MetaBalancer::AddLoad(int it_n, double load) {
191   int index = it_n % VEC_SIZE;
192   total_count_vec[index]++;
193   adaptive_struct.total_syncs_called++;
194   DEBADDETAIL(("At PE %d Total contribution for iteration %d is %d \
195       total objs %d\n", CkMyPe(), it_n, total_count_vec[index],
196       lbdatabase->getLBDB()->ObjDataCount()));
197
198   if (it_n <= adaptive_struct.finished_iteration_no) {
199     CkAbort("Error!! Received load for iteration that has contributed\n");
200   }
201   if (it_n > adaptive_struct.lb_iteration_no) {
202     adaptive_struct.lb_iteration_no = it_n;
203   }
204   total_load_vec[index] += load;
205   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
206     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
207         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
208     CkAbort("Abort!!! Received more contribution");
209   }
210
211   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
212     double idle_time, bg_walltime, cpu_bgtime;
213     lbdatabase->IdleTime(&idle_time);
214     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
215
216     int sync_for_bg = adaptive_struct.total_syncs_called +
217         lbdatabase->getLBDB()->ObjDataCount();
218     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
219
220     if (it_n < NEGLECT_IDLE) {
221       prev_idle = idle_time;
222     }
223     idle_time -= prev_idle;
224
225     // The chares do not contribute their 0th iteration load. So the total syncs
226     // in reality is total_syncs_called + obj_counts
227     int total_countable_syncs = adaptive_struct.total_syncs_called +
228         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me!
229     if (total_countable_syncs != 0) {
230       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
231     }
232
233     double lb_data[STATS_COUNT];
234     lb_data[0] = it_n;
235     lb_data[1] = 1;
236     lb_data[2] = total_load_vec[index]; // For average load
237     lb_data[3] = total_load_vec[index]; // For max load
238     // Set utilization
239     if (total_load_vec[index] == 0.0) {
240         lb_data[4] = 0.0;
241       lb_data[5] = 0.0;
242     } else {
243       lb_data[4] = total_load_vec[index]/(idle_time + total_load_vec[index]);
244       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
245     }
246     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
247     lb_data[7] = lb_data[6]; // For Max load with bg
248     total_load_vec[index] = 0.0;
249     total_count_vec[index] = 0;
250
251     adaptive_struct.finished_iteration_no = it_n;
252     DEBADDETAIL(("[%d] sends total load %lf idle time %lf utilization %lf at iter %d\n",
253         CkMyPe(), total_load_vec[index], idle_time,
254         lb_data[5], adaptive_struct.finished_iteration_no));
255
256     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
257     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
258   }
259   return true;
260 }
261
262 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
263   double* load = (double *) msg->getData();
264   double avg = load[2]/load[1];
265   double max = load[3];
266   double avg_utilization = load[4]/load[1];
267   double min_utilization = load[5];
268   int iteration_n = (int) load[0];
269   double avg_load_bg = load[6]/load[1];
270   double max_load_bg = load[7];
271         // Set the max and avg to be the load with background
272         max = max_load_bg;
273         avg = avg_load_bg;
274   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Util : %lf \
275       Min Util : %lf for %lf procs\n",iteration_n, avg, max, avg_utilization,
276       min_utilization, load[1]));
277   delete msg;
278
279   // For processors with  no  objs, trigger MetaBalancer reduction
280   if (adaptive_struct.final_lb_period != iteration_n) {
281     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
282       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
283     }
284   }
285
286   // Store the data for this iteration
287   adaptive_lbdb.lb_iter_no = iteration_n;
288   AdaptiveData data;
289   data.iteration = adaptive_lbdb.lb_iter_no;
290   data.max_load = max;
291   data.avg_load = avg;
292   data.min_utilization = min_utilization;
293   data.avg_utilization = avg_utilization;
294   adaptive_lbdb.history_data.push_back(data);
295
296   if (iteration_n == 1) {
297     adaptive_struct.info_first_iter.max_avg_ratio = max/avg;
298   }
299
300
301   if (adaptive_struct.final_lb_period == iteration_n) {
302     thisProxy.MetaLBCallLBOnChares();
303   }
304
305   // If lb period inform is in progress, dont inform again.
306   // If this is the lb data corresponding to the final lb period informed, then
307   // don't recalculate as some of the processors might have already gone into
308   // LB_STAGE.
309   if (adaptive_struct.in_progress || 
310       (adaptive_struct.final_lb_period == iteration_n)) {
311     return;
312   }
313
314         // If the utilization is beyond 90%, then do nothing
315         if (data.avg_utilization >= 0.90) {
316                 return;
317         }
318
319   double utilization_threshold = UTILIZATION_THRESHOLD;
320
321 #if EXTRA_FEATURE
322   DEBAD(("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load));
323   if (alpha_beta_cost_to_load < 0.1) {
324     // Ignore the effect of idle time and there by lesser utilization. So we
325     // assign utilization threshold to be 0.0
326     DEBAD(("Changing the idle load tolerance coz this isn't \
327         communication intensive benchmark\n"));
328     utilization_threshold = 0.0;
329   }
330 #endif
331
332   // First generate the lb period based on the cost of lb. Find out what is the
333   // expected imbalance at the calculated lb period.
334   int period;
335   // This is based on the new max load after load balancing. So technically, it
336   // is calculated based on the shifter up avg curve.
337   double ratio_at_t = 1.0;
338   int tmp_lb_type;
339   double tmp_max_avg_ratio, tmp_comm_ratio;
340   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
341
342   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
343
344   if (generatePlan(period, ratio_at_t)) {
345     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
346       adaptive_struct.lb_calculated_period, period,
347       adaptive_struct.tentative_max_iter_no));
348     // set the imbalance tolerance to be ratio_at_calculated_lb_period
349     if (ratio_at_t != 1.0) {
350       DEBAD(("Changed tolerance to %lf after line eq whereas max/avg is %lf\n",
351         ratio_at_t, max/avg));
352       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
353       // compared with the tolerance
354       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
355     }
356
357     DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n",
358       tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio));
359
360     if ((avg_utilization < utilization_threshold || max/avg >= tolerate_imb) &&
361           adaptive_lbdb.history_data.size() > MIN_STATS) {
362       DEBAD(("Trigger soon even though we calculated lbperiod max/avg(%lf) and \
363                                         utilization ratio (%lf)\n", max/avg, avg_utilization));
364       TriggerSoon(iteration_n, max/avg, tolerate_imb);
365       return;
366     }
367
368     // If the new lb period from linear extrapolation is greater than maximum
369     // iteration known from previously collected data, then inform all the
370     // processors about the new calculated period.
371     if (period > adaptive_struct.tentative_max_iter_no && period !=
372           adaptive_struct.final_lb_period) {
373       adaptive_struct.doCommStrategy = false;
374       adaptive_struct.lb_calculated_period = period;
375       adaptive_struct.in_progress = true;
376       DEBAD(("Sticking to the calculated period %d\n",
377           adaptive_struct.lb_calculated_period));
378       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
379         adaptive_struct.lb_calculated_period);
380       return;
381     }
382     return;
383   }
384
385   DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type,
386       tmp_max_avg_ratio, tmp_comm_ratio));
387
388   // This would be called when linear extrapolation did not provide suitable
389   // period provided there is enough  historical data 
390   if ((avg_utilization < utilization_threshold || max/avg >= tolerate_imb) &&
391                         adaptive_lbdb.history_data.size() > 4) {
392     DEBAD(("Carry out load balancing step at iter max/avg(%lf) and utilization \
393       ratio (%lf)\n", max/avg, avg_utilization));
394     TriggerSoon(iteration_n, max/avg, tolerate_imb);
395     return;
396   }
397
398 }
399
400 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
401     double tolerate_imb) {
402
403   // If the previously calculated_period (not the final decision) is greater
404   // than the iter +1 and if it is greater than the maximum iteration we have
405   // seen so far, then we can inform this
406   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
407       (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
408       (iteration_n + 1 != adaptive_struct.final_lb_period)) {
409     if (imbalance_ratio < tolerate_imb) {
410       adaptive_struct.doCommStrategy = true;
411       DEBAD(("No load imbalance but idle time\n"));
412     } else {
413       adaptive_struct.doCommStrategy = false;
414       DEBAD(("load imbalance \n"));
415     }
416     adaptive_struct.lb_calculated_period = iteration_n + 1;
417     adaptive_struct.in_progress = true;
418     DEBAD(("Informing everyone the lb period is %d\n",
419         adaptive_struct.lb_calculated_period));
420     thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
421         adaptive_struct.lb_calculated_period);
422   }
423 }
424
425 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
426   if (adaptive_lbdb.history_data.size() <= 4) {
427     return false;
428   }
429
430   // Some heuristics for lbperiod
431   // If constant load or almost constant,
432   // then max * new_lb_period > avg * new_lb_period + lb_cost
433   double max = 0.0;
434   double avg = 0.0;
435   AdaptiveData data;
436   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
437     data = adaptive_lbdb.history_data[i];
438     max += data.max_load;
439     avg += data.avg_load;
440     DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
441   }
442 //  max /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
443 //  avg /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
444
445   // If linearly varying load, then find lb_period
446   // area between the max and avg curve
447   // If we can attain perfect balance, then the new load is close to the
448   // average. Hence we pass 1, else pass in some other value which would be the
449   // new max_load after load balancing.
450   int tmp_lb_type;
451   double tmp_max_avg_ratio, tmp_comm_ratio;
452   double tolerate_imb;
453
454 #if EXTRA_FEATURE
455   // First get the data for refine.
456   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
457   tolerate_imb = tmp_max_avg_ratio;
458
459   // If RefineLB does a good job, then find the period considering RefineLB
460   if (tmp_max_avg_ratio <= 1.01) {
461     if (max/avg < tolerate_imb) {
462       DEBAD(("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg,
463           tolerate_imb));
464       tolerate_imb = 1.0;
465     }
466     DEBAD(("Will generate plan for refine %lf imb and %lf overhead\n",
467         tolerate_imb, 0.2));
468     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
469   }
470
471   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
472 #endif
473
474   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
475   tolerate_imb = tmp_max_avg_ratio;
476 //  if (max/avg < tolerate_imb) {
477 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
478 //    tolerate_imb = 1.0;
479 //  }
480   if (max/avg > tolerate_imb) {
481     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
482       return true;
483     }
484   }
485
486   max = 0.0;
487   avg = 0.0;
488   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
489     data = adaptive_lbdb.history_data[i];
490     max += data.max_load;
491     avg += data.avg_load*tolerate_imb;
492   }
493   max /= adaptive_lbdb.history_data.size();
494   avg /= adaptive_lbdb.history_data.size();
495   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
496   period = (int) (cost/(max - avg));
497   DEBAD(("Obtained period %d from constant prediction tolerated \
498                         imbalance(%f)\n", period, tolerate_imb));
499   if (period < 0) { 
500     period = adaptive_struct.final_lb_period;
501     DEBAD(("Obtained -ve period from constant prediction so changing to prev %d\n", period));
502   } 
503   ratio_at_t = max / avg;
504   return true;
505 }
506
507 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
508     double overhead_percent, int& period, double& ratio_at_t) {
509   double mslope, aslope, mc, ac;
510   getLineEq(new_load_percent, aslope, ac, mslope, mc);
511   DEBAD(("new load percent %lf\n", new_load_percent));
512   DEBAD(("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac));
513   double a = (mslope - aslope)/2;
514   double b = (mc - ac);
515   double c = -(adaptive_struct.lb_strategy_cost +
516       adaptive_struct.lb_migration_cost) * overhead_percent;
517   DEBAD(("cost %f\n",
518       (adaptive_struct.lb_strategy_cost+adaptive_struct.lb_migration_cost)));
519   bool got_period = getPeriodForLinear(a, b, c, period);
520   if (!got_period) {
521     return false;
522   }
523
524   if (mslope < 0) {
525     if (period > (-mc/mslope)) {
526       DEBAD(("Max < 0 Period set when max load is -ve\n"));
527       return false;
528     }
529   }
530
531   if (aslope < 0) {
532     if (period > (-ac/aslope)) {
533       DEBAD(("Avg < 0 Period set when avg load is -ve\n"));
534       return false;
535     }
536   }
537
538   int intersection_t = (int) ((mc-ac) / (aslope - mslope));
539   if (intersection_t > 0 && period > intersection_t) {
540     DEBAD(("Avg | Max Period set when curves intersect\n"));
541     return false;
542   }
543   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
544   DEBAD(("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t));
545   return true;
546 }
547
548 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
549   DEBAD(("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c));
550   if (a == 0.0) {
551     period = (int) (-c / b);
552     if (period < 0) {
553       DEBAD(("-ve period for -c/b (%d)\n", period));
554       return false;
555     }
556     DEBAD(("Ideal period for linear load %d\n", period));
557     return true;
558   }
559   int x;
560   double t = (b * b) - (4*a*c);
561   if (t < 0) {
562     DEBAD(("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t)));
563     return false;
564   }
565   t = (-b + sqrt(t)) / (2*a);
566   x = (int) t;
567   if (x < 0) {
568     DEBAD(("Oops!!! x (%d) < 0\n", x));
569     x = 0;
570     return false;
571   }
572   period = x;
573   DEBAD(("Ideal period for linear load %d\n", period));
574   return true;
575 }
576
577 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
578   int total = adaptive_lbdb.history_data.size();
579   int iterations = (int) (1 + adaptive_lbdb.history_data[total - 1].iteration -
580       adaptive_lbdb.history_data[0].iteration);
581   double a1 = 0;
582   double m1 = 0;
583   double a2 = 0;
584   double m2 = 0;
585   AdaptiveData data;
586   int i = 0;
587   for (i = 0; i < total/2; i++) {
588     data = adaptive_lbdb.history_data[i];
589     m1 += data.max_load;
590     a1 += data.avg_load;
591     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
592   }
593   m1 /= i;
594   a1 = (a1 * new_load_percent) / i;
595
596   for (i = total/2; i < total; i++) {
597     data = adaptive_lbdb.history_data[i];
598     m2 += data.max_load;
599     a2 += data.avg_load;
600     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
601   }
602   m2 /= (i - total/2);
603   a2 = (a2 * new_load_percent) / (i - total/2);
604
605   aslope = 2 * (a2 - a1) / iterations;
606   mslope = 2 * (m2 - m1) / iterations;
607   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
608   mc = adaptive_lbdb.history_data[0].max_load;
609
610   ac = a1 - ((aslope * total)/4);
611   mc = m1 - ((mslope * total)/4);
612
613   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
614   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
615
616   return true;
617 }
618
619 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
620   if (req_no < adaptive_struct.lb_msg_recv_no) {
621     DEBAD(("Error!!! Received a request which was already sent or old\n"));
622     return;
623   }
624   DEBADDETAIL(("[%d] Load balance decision made cur iteration: %d period:%d\n",
625                         CkMyPe(), adaptive_struct.lb_iteration_no, period));
626   adaptive_struct.tentative_period = period;
627   adaptive_struct.lb_msg_recv_no = req_no;
628   if (metaRdnGroup == NULL) {
629     metaRdnGroup = (MetaBalancerRedn*)CkLocalBranch(_metalbred);
630   }
631   if (metaRdnGroup != NULL) {
632     metaRdnGroup->getMaxIter(adaptive_struct.lb_iteration_no);
633   }
634 }
635
636 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
637   if (req_no < adaptive_struct.lb_msg_recv_no) {
638     return;
639   }
640   DEBADDETAIL(("[%d] Final Load balance decision made cur iteration: %d \
641                         period:%d \n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
642   adaptive_struct.tentative_period = period;
643   adaptive_struct.final_lb_period = period;
644   lbdatabase->MetaLBResumeWaitingChares(period);
645 }
646
647 void MetaBalancer::MetaLBCallLBOnChares() {
648   lbdatabase->MetaLBCallLBOnChares();
649 }
650
651 void MetaBalancer::ReceiveIterationNo(int local_iter_no) {
652   CmiAssert(CkMyPe() == 0);
653
654   if (local_iter_no > adaptive_struct.global_max_iter_no) {
655     adaptive_struct.global_max_iter_no = local_iter_no;
656   }
657
658   int period;
659
660     if (adaptive_struct.global_max_iter_no > adaptive_struct.tentative_max_iter_no) {
661       adaptive_struct.tentative_max_iter_no = adaptive_struct.global_max_iter_no;
662     }
663     period = (adaptive_struct.tentative_period > adaptive_struct.global_max_iter_no) ?
664                                 adaptive_struct.tentative_period : adaptive_struct.global_max_iter_no + 1;
665     // If no one has gone into load balancing stage, then we can safely change
666     // the period otherwise keep the old period.
667     if (adaptive_struct.global_max_iter_no < adaptive_struct.final_lb_period) {
668       adaptive_struct.tentative_period = period;
669       DEBAD(("Final lb_period CHANGED!%d\n", adaptive_struct.tentative_period));
670     } else {
671       adaptive_struct.tentative_period = adaptive_struct.final_lb_period;
672       DEBAD(("Final lb_period NOT CHANGED!%d\n", adaptive_struct.tentative_period));
673     }
674     thisProxy.LoadBalanceDecisionFinal(adaptive_struct.lb_msg_recv_no, adaptive_struct.tentative_period);
675     adaptive_struct.in_progress = false;
676 }
677
678 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
679   // If tentative and final_lb_period are the same, then the decision has been
680   // made but if not, they are in the middle of consensus, hence return the
681   // lease of the two
682   if (adaptive_struct.tentative_period != adaptive_struct.final_lb_period) {
683     is_tentative = true;
684   } else {
685     is_tentative = false;
686   }
687   if (adaptive_struct.tentative_period < adaptive_struct.final_lb_period) {
688     return adaptive_struct.tentative_period;
689    } else {
690      return adaptive_struct.final_lb_period;
691    }
692 }
693
694 // Called by CentralLB to indicate that the LB strategy and migration is in
695 // progress.
696 void MetaBalancer::ResetAdaptive() {
697   adaptive_lbdb.lb_iter_no = -1;
698   lb_in_progress = true;
699 }
700
701 // This is required for PEs with no objs
702 void MetaBalancer::periodicCall(void *ad) {
703   MetaBalancer *s = (MetaBalancer *)ad;
704   CcdCallFnAfterOnPE((CcdVoidFn)checkForNoObj, (void *)s, 500, CkMyPe());
705 }
706
707 void MetaBalancer::checkForNoObj(void *ad) {
708   MetaBalancer *s = (MetaBalancer *) ad;
709   s->HandleAdaptiveNoObj();
710 }
711
712 // Called by LBDatabase to indicate that no objs are there in this processor
713 void MetaBalancer::HandleAdaptiveNoObj() {
714   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
715     adaptive_struct.finished_iteration_no++;
716     adaptive_struct.lb_iteration_no++;
717     DEBAD(("(%d) --HandleAdaptiveNoObj %d\n", CkMyPe(),
718           adaptive_struct.finished_iteration_no));
719     thisProxy[0].RegisterNoObjCallback(CkMyPe());
720     TriggerAdaptiveReduction();
721   }
722 }
723
724 void MetaBalancer::RegisterNoObjCallback(int index) {
725   // If the load balancing process (migration) is going on and in the meantime
726   // one of the processor finishes everything and finds that there are no objs
727   // in it, then it registers a callback. So clear the old data in case it
728   // hasn't been done.
729   if (lb_in_progress) {
730     lbdb_no_obj_callback.clear();
731     lb_in_progress = false;
732   }
733   lbdb_no_obj_callback.push_back(index);
734   DEBAD(("Registered %d to have no objs.\n", index));
735
736   // If collection has already happened and this is second iteration, then
737   // trigger reduction.
738   if (adaptive_lbdb.lb_iter_no != -1) {
739     DEBAD(("Collection already started now %d so kick in\n",
740         adaptive_struct.finished_iteration_no));
741     //thisProxy[index].TriggerAdaptiveReduction();
742   }
743 }
744
745 void MetaBalancer::TriggerAdaptiveReduction() {
746   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
747     adaptive_struct.finished_iteration_no++;
748     adaptive_struct.lb_iteration_no++;
749     double lb_data[STATS_COUNT];
750     lb_data[0] = adaptive_struct.finished_iteration_no;
751     lb_data[1] = 1;
752     lb_data[2] = 0.0;
753     lb_data[3] = 0.0;
754     lb_data[4] = 0.0;
755     lb_data[5] = 0.0;
756     lb_data[6] = 0.0;
757     lb_data[7] = 0.0;
758
759     DEBAD(("[%d] Triggered adaptive reduction for noobj %d\n", CkMyPe(),
760           adaptive_struct.finished_iteration_no));
761
762     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL),
763         thisProxy[0]);
764     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
765   }
766 }
767
768
769 bool MetaBalancer::isStrategyComm() {
770   return adaptive_struct.doCommStrategy;
771 }
772
773 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
774   adaptive_struct.lb_migration_cost = lb_migration_cost;
775 }
776
777 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
778   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
779 }
780
781 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
782     local_comm, double remote_comm) {
783   adaptive_struct.last_lb_type = lb;
784   if (lb == 0) {
785     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
786   } else if (lb == 1) {
787     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
788   } else if (lb == 2) {
789     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
790   } else if (lb == 3) {
791     adaptive_struct.comm_refine_info.remote_local_ratio =
792     remote_comm/local_comm;
793   }
794 }
795
796 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu,
797     double avg_load) {
798   if (adaptive_struct.last_lb_type == -1) {
799     adaptive_struct.last_lb_type = 0;
800   }
801   int lb = adaptive_struct.last_lb_type;
802   if (lb == 0) {
803     adaptive_struct.greedy_info.max_avg_ratio = max_load/avg_load;
804   } else if (lb == 1) {
805     adaptive_struct.refine_info.max_avg_ratio = max_load/avg_load;
806   } else if (lb == 2) {
807     adaptive_struct.comm_info.max_avg_ratio = max_load/avg_load;
808   } else if (lb == 3) {
809     adaptive_struct.comm_refine_info.max_avg_ratio = max_load/avg_load;
810   }
811 }
812
813 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
814   DEBAD(("Setting alpha beta %lf\n", alpha_beta_to_load));
815   alpha_beta_cost_to_load = alpha_beta_to_load;
816 }
817
818
819 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio,
820     double& remote_local_comm_ratio) {
821   lb_type = adaptive_struct.last_lb_type;
822   lb_max_avg_ratio = 1;
823   remote_local_comm_ratio = 1;
824   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
825
826   // Based on the first iteration
827   lb_max_avg_ratio = adaptive_struct.info_first_iter.max_avg_ratio;
828 }
829
830 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio,
831     double& remote_local_comm_ratio) {
832   if (lb_type == 0) {
833     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
834   } else if (lb_type == 1) {
835     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
836   } else if (lb_type == 2) {
837     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
838   } else if (lb_type == 3) {
839     remote_local_comm_ratio =
840        adaptive_struct.comm_refine_info.remote_local_ratio;
841   }
842 }
843
844 void MetaBalancerRedn::init() {
845   metabalancer = (MetaBalancer *)CkLocalBranch(_metalb);
846 }
847
848 void MetaBalancerRedn::pup(PUP::er& p) {
849         CBase_MetaBalancerRedn::pup(p);
850 }
851
852 void MetaBalancerRedn::ReceiveIterNo(int max_iter) {
853   CkAssert(CkMyPe() == 0);
854   if (metabalancer == NULL) {
855     metabalancer = (MetaBalancer*)CkLocalBranch(_metalb);
856   }
857   if (metabalancer != NULL) {
858     metabalancer->ReceiveIterationNo(max_iter);
859   }
860 }
861
862 void MetaBalancerRedn::getMaxIter(int max_iter) {
863   CkCallback cb(CkReductionTarget(MetaBalancerRedn, ReceiveIterNo), thisProxy[0]);
864   contribute(sizeof(int), &max_iter, CkReduction::max_int, cb);
865 }
866
867 #include "MetaBalancer.def.h"
868
869 /*@}*/