a550a226ba9f7d1be813f3ace1f54a16ec18c427
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * 
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  */
10
11 #include "MetaBalancer.h"
12 #include "topology.h"
13
14 #include "limits.h"
15
16 #define VEC_SIZE 50
17 #define IMB_TOLERANCE 1.1
18 #define OUTOFWAY_TOLERANCE 2
19 #define UTILIZATION_THRESHOLD 0.7
20 #define NEGLECT_IDLE 2 // Should never be == 1
21 #define MIN_STATS 6
22 #define STATS_COUNT 8 // The number of stats collected during reduction
23
24 #define DEBAD(x) CkPrintf x
25 #define DEBADDETAIL(x) /*CkPrintf x*/
26 #define EXTRA_FEATURE 0
27
28 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
29   double lb_data[STATS_COUNT];
30   lb_data[1] = 0.0; // total number of processors contributing
31   lb_data[2] = 0.0; // total load
32   lb_data[3] = 0.0; // max load
33   lb_data[4] = 0.0; // idle time
34   lb_data[5] = 1.0; // utilization
35   lb_data[6] = 0.0; // total load with bg
36   lb_data[7] = 0.0; // max load with bg
37   for (int i = 0; i < nMsg; i++) {
38     CkAssert(msgs[i]->getSize() == STATS_COUNT*sizeof(double));
39     if (msgs[i]->getSize() != STATS_COUNT*sizeof(double)) {
40       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n",
41           msgs[i]->getSize());
42       CkAbort("Incorrect Reduction size in MetaBalancer\n");
43     }
44     double* m = (double *)msgs[i]->getData();
45     // Total count
46     lb_data[1] += m[1];
47     // Avg load
48     lb_data[2] += m[2];
49     // Max load
50     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
51     // Avg idle
52     lb_data[4] += m[4];
53     // Get least utilization
54     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
55     // Get Avg load with bg
56     lb_data[6] += m[6];
57     // Get Max load with bg
58     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
59     if (i == 0) {
60       // Iteration no
61       lb_data[0] = m[0];
62     }
63     if (m[0] != lb_data[0]) {
64       CkPrintf("Error!!! Reduction is intermingled between iteration %lf \
65         and %lf\n", lb_data[0], m[0]);
66       CkAbort("Intermingling iterations in MetaBalancer\n");
67     }
68   }
69   return CkReductionMsg::buildNew(STATS_COUNT*sizeof(double), lb_data);
70 }
71
72 /*global*/ CkReduction::reducerType lbDataCollectionType;
73 /*initcall*/ void registerLBDataCollection(void) {
74   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
75 }
76
77 CkGroupID _metalb;
78
79 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
80
81 // mainchare
82 MetaLBInit::MetaLBInit(CkArgMsg *m) {
83 #if CMK_LBDB_ON
84   _metalb = CProxy_MetaBalancer::ckNew();
85 #endif
86   delete m;
87 }
88
89 // called from init.C
90 void _metabalancerInit() {
91   CkpvInitialize(int, metalbInited);
92   CkpvAccess(metalbInited) = 0;
93 }
94
95 void MetaBalancer::initnodeFn() {
96 }
97
98 // called by my constructor
99 void MetaBalancer::init(void) {
100   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
101   CkpvAccess(metalbInited) = 1;
102   total_load_vec.resize(VEC_SIZE, 0.0);
103   total_count_vec.resize(VEC_SIZE, 0);
104   prev_idle = 0.0;
105   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
106
107   adaptive_lbdb.lb_iter_no = -1;
108
109   // If metabalancer enabled, initialize the variables
110   adaptive_struct.tentative_period =  INT_MAX;
111   adaptive_struct.final_lb_period =  INT_MAX;
112   adaptive_struct.lb_calculated_period = INT_MAX;
113   adaptive_struct.lb_iteration_no = -1;
114   adaptive_struct.global_max_iter_no = 0;
115   adaptive_struct.tentative_max_iter_no = -1;
116   adaptive_struct.global_recv_iter_counter = 0;
117   adaptive_struct.in_progress = false;
118   adaptive_struct.lb_strategy_cost = 0.0;
119   adaptive_struct.lb_migration_cost = 0.0;
120   adaptive_struct.lb_msg_send_no = 0;
121   adaptive_struct.lb_msg_recv_no = 0;
122   adaptive_struct.total_syncs_called = 0;
123   adaptive_struct.last_lb_type = -1;
124
125
126   // This is indicating if the load balancing strategy and migration started.
127   // This is mainly used to register callbacks for noobj pes. They would
128   // register as soon as resumefromsync is called. On receiving the handles at
129   // the central pe, it clears the previous handlers and sets lb_in_progress
130   // to false so that it doesn't clear the handles.
131   lb_in_progress = false;
132
133   is_prev_lb_refine = -1;
134   if (_lb_args.metaLbOn()) {
135     periodicCall((void *) this);
136   }
137 }
138
139 void MetaBalancer::pup(PUP::er& p) {
140         IrrGroup::pup(p);
141   if (p.isUnpacking()) {
142     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
143   }
144   p|prev_idle;
145   p|alpha_beta_cost_to_load;
146   p|is_prev_lb_refine;
147   p|lb_in_progress;
148 }
149
150
151 void MetaBalancer::ResumeClients() {
152   // If metabalancer enabled, initialize the variables
153   adaptive_lbdb.history_data.clear();
154
155   adaptive_struct.tentative_period =  INT_MAX;
156   adaptive_struct.final_lb_period =  INT_MAX;
157   adaptive_struct.lb_calculated_period = INT_MAX;
158   adaptive_struct.lb_iteration_no = -1;
159   adaptive_struct.global_max_iter_no = 0;
160   adaptive_struct.tentative_max_iter_no = -1;
161   adaptive_struct.global_recv_iter_counter = 0;
162   adaptive_struct.in_progress = false;
163   adaptive_struct.lb_strategy_cost = 0.0;
164   adaptive_struct.lb_migration_cost = 0.0;
165   adaptive_struct.lb_msg_send_no = 0;
166   adaptive_struct.lb_msg_recv_no = 0;
167   adaptive_struct.total_syncs_called = 0;
168
169   prev_idle = 0.0;
170   if (lb_in_progress) {
171     lbdb_no_obj_callback.clear();
172     lb_in_progress = false;
173   }
174   HandleAdaptiveNoObj();
175 }
176
177 int MetaBalancer::get_iteration() {
178   return adaptive_struct.lb_iteration_no;
179 }
180
181 bool MetaBalancer::AddLoad(int it_n, double load) {
182   int index = it_n % VEC_SIZE;
183   total_count_vec[index]++;
184   adaptive_struct.total_syncs_called++;
185   DEBADDETAIL(("At PE %d Total contribution for iteration %d is %d \
186       total objs %d\n", CkMyPe(), it_n, total_count_vec[index],
187       lbdatabase->getLBDB()->ObjDataCount()));
188
189   if (it_n < adaptive_struct.lb_iteration_no) {
190     CkAbort("Error!! Received load for previous iteration\n");
191   }
192   if (it_n > adaptive_struct.lb_iteration_no) {
193     adaptive_struct.lb_iteration_no = it_n;
194   }
195   total_load_vec[index] += load;
196   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
197     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
198         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
199     CkAbort("Abort!!! Received more contribution");
200   }
201
202   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
203     double idle_time, bg_walltime, cpu_bgtime;
204     lbdatabase->IdleTime(&idle_time);
205     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
206
207     int sync_for_bg = adaptive_struct.total_syncs_called +
208         lbdatabase->getLBDB()->ObjDataCount();
209     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
210
211     if (it_n < NEGLECT_IDLE) {
212       prev_idle = idle_time;
213     }
214     idle_time -= prev_idle;
215
216     // The chares do not contribute their 0th iteration load. So the total syncs
217     // in reality is total_syncs_called + obj_counts
218     int total_countable_syncs = adaptive_struct.total_syncs_called +
219         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me!
220     if (total_countable_syncs != 0) {
221       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
222     }
223
224     double lb_data[STATS_COUNT];
225     lb_data[0] = it_n;
226     lb_data[1] = 1;
227     lb_data[2] = total_load_vec[index]; // For average load
228     lb_data[3] = total_load_vec[index]; // For max load
229     lb_data[4] = idle_time;
230     // Set utilization
231     if (total_load_vec[index] == 0.0) {
232       lb_data[5] = 0.0;
233     } else {
234       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
235     }
236     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
237     lb_data[7] = lb_data[6]; // For Max load with bg
238     total_load_vec[index] = 0.0;
239     total_count_vec[index] = 0;
240
241     DEBADDETAIL(("[%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n",
242         CkMyPe(), total_load_vec[index], idle_time,
243         idle_time/total_load_vec[index], adaptive_struct.lb_iteration_no));
244
245     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
246     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
247   }
248   return true;
249 }
250
251 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
252   double* load = (double *) msg->getData();
253   double avg = load[2]/load[1];
254   double max = load[3];
255   double avg_idle = load[4]/load[1];
256   double utilization = load[5];
257   int iteration_n = (int) load[0];
258   double avg_load_bg = load[6]/load[1];
259   double max_load_bg = load[7];
260   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf \
261       Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle,
262       utilization, load[1]));
263   delete msg;
264
265   // For processors with  no  objs, trigger MetaBalancer reduction
266   if (adaptive_struct.final_lb_period != iteration_n) {
267     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
268       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
269     }
270   }
271
272   // Store the data for this iteration
273   adaptive_lbdb.lb_iter_no = iteration_n;
274   AdaptiveData data;
275   data.iteration = adaptive_lbdb.lb_iter_no;
276   data.max_load = max;
277   data.avg_load = avg;
278   data.utilization = utilization;
279   data.idle_time = avg_idle;
280   adaptive_lbdb.history_data.push_back(data);
281
282   // If lb period inform is in progress, dont inform again.
283   // If this is the lb data corresponding to the final lb period informed, then
284   // don't recalculate as some of the processors might have already gone into
285   // LB_STAGE.
286   if (adaptive_struct.in_progress || 
287       (adaptive_struct.final_lb_period == iteration_n)) {
288     return;
289   }
290
291   double utilization_threshold = UTILIZATION_THRESHOLD;
292
293 #if EXTRA_FEATURE
294   DEBAD(("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load));
295   if (alpha_beta_cost_to_load < 0.1) {
296     // Ignore the effect of idle time and there by lesser utilization. So we
297     // assign utilization threshold to be 0.0
298     DEBAD(("Changing the idle load tolerance coz this isn't \
299         communication intensive benchmark\n"));
300     utilization_threshold = 0.0;
301   }
302 #endif
303
304   // First generate the lb period based on the cost of lb. Find out what is the
305   // expected imbalance at the calculated lb period.
306   int period;
307   // This is based on the new max load after load balancing. So technically, it
308   // is calculated based on the shifter up avg curve.
309   double ratio_at_t = 1.0;
310   int tmp_lb_type;
311   double tmp_max_avg_ratio, tmp_comm_ratio;
312   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
313   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
314
315   if (generatePlan(period, ratio_at_t)) {
316     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
317       adaptive_struct.lb_calculated_period, period,
318       adaptive_struct.tentative_max_iter_no));
319     // set the imbalance tolerance to be ratio_at_calculated_lb_period
320     if (ratio_at_t != 1.0) {
321       DEBAD(("Changed tolerance to %lf after line eq whereas max/avg is %lf\n",
322         ratio_at_t, max/avg));
323       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
324       // compared with the tolerance
325       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
326     }
327
328     DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n",
329       tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio));
330
331     if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
332           adaptive_lbdb.history_data.size() > MIN_STATS) {
333       DEBAD(("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization));
334       TriggerSoon(iteration_n, max/avg, tolerate_imb);
335       return;
336     }
337
338     // If the new lb period from linear extrapolation is greater than maximum
339     // iteration known from previously collected data, then inform all the
340     // processors about the new calculated period.
341     if (period > adaptive_struct.tentative_max_iter_no && period !=
342           adaptive_struct.final_lb_period) {
343       adaptive_struct.doCommStrategy = false;
344       adaptive_struct.lb_calculated_period = period;
345       adaptive_struct.in_progress = true;
346       DEBAD(("Sticking to the calculated period %d\n",
347           adaptive_struct.lb_calculated_period));
348       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
349         adaptive_struct.lb_calculated_period);
350       return;
351     }
352     return;
353   }
354
355   DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type,
356       tmp_max_avg_ratio, tmp_comm_ratio));
357
358   // This would be called when linear extrapolation did not provide suitable
359   // period provided there is enough  historical data 
360   if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && adaptive_lbdb.history_data.size() > 4) {
361     DEBAD(("Carry out load balancing step at iter max/avg(%lf) and utilization \
362       ratio (%lf)\n", max/avg, utilization));
363     TriggerSoon(iteration_n, max/avg, tolerate_imb);
364     return;
365   }
366
367 }
368
369 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
370     double tolerate_imb) {
371
372   // If the previously calculated_period (not the final decision) is greater
373   // than the iter +1 and if it is greater than the maximum iteration we have
374   // seen so far, then we can inform this
375   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
376       (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
377       (iteration_n + 1 != adaptive_struct.final_lb_period)) {
378     if (imbalance_ratio < tolerate_imb) {
379       adaptive_struct.doCommStrategy = true;
380       DEBAD(("No load imbalance but idle time\n"));
381     } else {
382       adaptive_struct.doCommStrategy = false;
383       DEBAD(("load imbalance \n"));
384     }
385     adaptive_struct.lb_calculated_period = iteration_n + 1;
386     adaptive_struct.in_progress = true;
387     DEBAD(("Informing everyone the lb period is %d\n",
388         adaptive_struct.lb_calculated_period));
389     thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
390         adaptive_struct.lb_calculated_period);
391   }
392 }
393
394 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
395   if (adaptive_lbdb.history_data.size() <= 4) {
396     return false;
397   }
398
399   // Some heuristics for lbperiod
400   // If constant load or almost constant,
401   // then max * new_lb_period > avg * new_lb_period + lb_cost
402   double max = 0.0;
403   double avg = 0.0;
404   AdaptiveData data;
405   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
406     data = adaptive_lbdb.history_data[i];
407     max += data.max_load;
408     avg += data.avg_load;
409     DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
410   }
411 //  max /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
412 //  avg /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
413
414   // If linearly varying load, then find lb_period
415   // area between the max and avg curve
416   // If we can attain perfect balance, then the new load is close to the
417   // average. Hence we pass 1, else pass in some other value which would be the
418   // new max_load after load balancing.
419   int tmp_lb_type;
420   double tmp_max_avg_ratio, tmp_comm_ratio;
421   double tolerate_imb;
422
423 #if EXTRA_FEATURE
424   // First get the data for refine.
425   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
426   tolerate_imb = tmp_max_avg_ratio;
427
428   // If RefineLB does a good job, then find the period considering RefineLB
429   if (tmp_max_avg_ratio <= 1.01) {
430     if (max/avg < tolerate_imb) {
431       DEBAD(("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg,
432           tolerate_imb));
433       tolerate_imb = 1.0;
434     }
435     DEBAD(("Will generate plan for refine %lf imb and %lf overhead\n",
436         tolerate_imb, 0.2));
437     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
438   }
439
440   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
441 #endif
442
443   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
444   tolerate_imb = tmp_max_avg_ratio;
445 //  if (max/avg < tolerate_imb) {
446 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
447 //    tolerate_imb = 1.0;
448 //  }
449   if (max/avg > tolerate_imb) {
450     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
451       return true;
452     }
453   }
454
455   max = 0.0;
456   avg = 0.0;
457   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
458     data = adaptive_lbdb.history_data[i];
459     max += data.max_load;
460     avg += data.avg_load*tolerate_imb;
461   }
462   max /= adaptive_lbdb.history_data.size();
463   avg /= adaptive_lbdb.history_data.size();
464   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
465   period = (int) (cost/(max - avg));
466   DEBAD(("Obtained period %d from constant prediction\n", period));
467   if (period < 0) { 
468     period = adaptive_struct.final_lb_period;
469     DEBAD(("Obtained -ve period from constant prediction so changing to prev %d\n", period));
470   } 
471   ratio_at_t = max / avg;
472   return true;
473 }
474
475 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
476     double overhead_percent, int& period, double& ratio_at_t) {
477   double mslope, aslope, mc, ac;
478   getLineEq(new_load_percent, aslope, ac, mslope, mc);
479   DEBAD(("new load percent %lf\n", new_load_percent));
480   DEBAD(("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac));
481   double a = (mslope - aslope)/2;
482   double b = (mc - ac);
483   double c = -(adaptive_struct.lb_strategy_cost +
484       adaptive_struct.lb_migration_cost) * overhead_percent;
485   CkPrintf("cost %f\n",
486       (adaptive_struct.lb_strategy_cost+adaptive_struct.lb_migration_cost));
487   bool got_period = getPeriodForLinear(a, b, c, period);
488   if (!got_period) {
489     return false;
490   }
491
492   if (mslope < 0) {
493     if (period > (-mc/mslope)) {
494       DEBAD(("Max < 0 Period set when max load is -ve\n"));
495       return false;
496     }
497   }
498
499   if (aslope < 0) {
500     if (period > (-ac/aslope)) {
501       DEBAD(("Avg < 0 Period set when avg load is -ve\n"));
502       return false;
503     }
504   }
505
506   int intersection_t = (int) ((mc-ac) / (aslope - mslope));
507   if (intersection_t > 0 && period > intersection_t) {
508     DEBAD(("Avg | Max Period set when curves intersect\n"));
509     return false;
510   }
511   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
512   DEBAD(("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t));
513   return true;
514 }
515
516 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
517   DEBAD(("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c));
518   if (a == 0.0) {
519     period = (int) (-c / b);
520     if (period < 0) {
521       DEBAD(("-ve period for -c/b (%d)\n", period));
522       return false;
523     }
524     DEBAD(("Ideal period for linear load %d\n", period));
525     return true;
526   }
527   int x;
528   double t = (b * b) - (4*a*c);
529   if (t < 0) {
530     DEBAD(("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t)));
531     return false;
532   }
533   t = (-b + sqrt(t)) / (2*a);
534   x = (int) t;
535   if (x < 0) {
536     DEBAD(("boo!!! x (%d) < 0\n", x));
537     x = 0;
538     return false;
539   }
540   period = x;
541   DEBAD(("Ideal period for linear load %d\n", period));
542   return true;
543 }
544
545 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
546   int total = adaptive_lbdb.history_data.size();
547   int iterations = (int) (1 + adaptive_lbdb.history_data[total - 1].iteration -
548       adaptive_lbdb.history_data[0].iteration);
549   double a1 = 0;
550   double m1 = 0;
551   double a2 = 0;
552   double m2 = 0;
553   AdaptiveData data;
554   int i = 0;
555   for (i = 0; i < total/2; i++) {
556     data = adaptive_lbdb.history_data[i];
557     m1 += data.max_load;
558     a1 += data.avg_load;
559     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
560   }
561   m1 /= i;
562   a1 = (a1 * new_load_percent) / i;
563
564   for (i = total/2; i < total; i++) {
565     data = adaptive_lbdb.history_data[i];
566     m2 += data.max_load;
567     a2 += data.avg_load;
568     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
569   }
570   m2 /= (i - total/2);
571   a2 = (a2 * new_load_percent) / (i - total/2);
572
573   aslope = 2 * (a2 - a1) / iterations;
574   mslope = 2 * (m2 - m1) / iterations;
575   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
576   mc = adaptive_lbdb.history_data[0].max_load;
577
578   ac = a1 - ((aslope * total)/4);
579   mc = m1 - ((mslope * total)/4);
580
581   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
582   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
583
584   return true;
585 }
586
587 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
588   if (req_no < adaptive_struct.lb_msg_recv_no) {
589     DEBAD(("Error!!! Received a request which was already sent or old\n"));
590     return;
591   }
592   DEBADDETAIL(("[%d] Load balance decision made cur iteration: %d period:%d\n",
593                         CkMyPe(), adaptive_struct.lb_iteration_no, period));
594   adaptive_struct.tentative_period = period;
595   adaptive_struct.lb_msg_recv_no = req_no;
596   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_iteration_no);
597 }
598
599 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
600   if (req_no < adaptive_struct.lb_msg_recv_no) {
601     return;
602   }
603   DEBADDETAIL(("[%d] Final Load balance decision made cur iteration: %d \
604                         period:%d \n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
605   adaptive_struct.tentative_period = period;
606   adaptive_struct.final_lb_period = period;
607   lbdatabase->MetaResumeWaitingChares(period);
608 }
609
610
611 void MetaBalancer::ReceiveIterationNo(int req_no, int local_iter_no) {
612   CmiAssert(CkMyPe() == 0);
613
614   adaptive_struct.global_recv_iter_counter++;
615   if (local_iter_no > adaptive_struct.global_max_iter_no) {
616     adaptive_struct.global_max_iter_no = local_iter_no;
617   }
618
619   int period;
620   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
621
622     if (adaptive_struct.global_max_iter_no > adaptive_struct.tentative_max_iter_no) {
623       adaptive_struct.tentative_max_iter_no = adaptive_struct.global_max_iter_no;
624     }
625     period = (adaptive_struct.tentative_period > adaptive_struct.global_max_iter_no) ?
626                                 adaptive_struct.tentative_period : adaptive_struct.global_max_iter_no + 1;
627     // If no one has gone into load balancing stage, then we can safely change
628     // the period otherwise keep the old period.
629     if (adaptive_struct.global_max_iter_no < adaptive_struct.final_lb_period) {
630       adaptive_struct.tentative_period = period;
631       DEBAD(("Final lb_period CHANGED!%d\n", adaptive_struct.tentative_period));
632     } else {
633       adaptive_struct.tentative_period = adaptive_struct.final_lb_period;
634       DEBAD(("Final lb_period NOT CHANGED!%d\n", adaptive_struct.tentative_period));
635     }
636     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.tentative_period);
637     adaptive_struct.in_progress = false;
638     adaptive_struct.global_recv_iter_counter = 0;
639   }
640 }
641
642 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
643   // If tentative and final_lb_period are the same, then the decision has been
644   // made but if not, they are in the middle of consensus, hence return the
645   // lease of the two
646   if (adaptive_struct.tentative_period != adaptive_struct.final_lb_period) {
647     is_tentative = true;
648   } else {
649     is_tentative = false;
650   }
651   if (adaptive_struct.tentative_period < adaptive_struct.final_lb_period) {
652     return adaptive_struct.tentative_period;
653    } else {
654      return adaptive_struct.final_lb_period;
655    }
656 }
657
658 // Called by CentralLB to indicate that the LB strategy and migration is in
659 // progress.
660 void MetaBalancer::ResetAdaptive() {
661   adaptive_lbdb.lb_iter_no = -1;
662   lb_in_progress = true;
663 }
664
665 // This is required for PEs with no objs
666 void MetaBalancer::periodicCall(void *ad) {
667   MetaBalancer *s = (MetaBalancer *)ad;
668   CcdCallFnAfterOnPE((CcdVoidFn)checkForNoObj, (void *)s, 1, CkMyPe());
669 }
670
671 void MetaBalancer::checkForNoObj(void *ad) {
672   MetaBalancer *s = (MetaBalancer *) ad;
673   s->HandleAdaptiveNoObj();
674 }
675
676 // Called by LBDatabase to indicate that no objs are there in this processor
677 void MetaBalancer::HandleAdaptiveNoObj() {
678   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
679     adaptive_struct.lb_iteration_no++;
680     DEBAD(("(%d) --HandleAdaptiveNoObj %d\n", CkMyPe(),
681           adaptive_struct.lb_iteration_no));
682     thisProxy[0].RegisterNoObjCallback(CkMyPe());
683     TriggerAdaptiveReduction();
684   }
685 }
686
687 void MetaBalancer::RegisterNoObjCallback(int index) {
688   // If the load balancing process (migration) is going on and in the meantime
689   // one of the processor finishes everything and finds that there are no objs
690   // in it, then it registers a callback. So clear the old data in case it
691   // hasn't been done.
692   if (lb_in_progress) {
693     lbdb_no_obj_callback.clear();
694     lb_in_progress = false;
695   }
696   lbdb_no_obj_callback.push_back(index);
697   DEBAD(("Registered %d to have no objs.\n", index));
698
699   // If collection has already happened and this is second iteration, then
700   // trigger reduction.
701   if (adaptive_lbdb.lb_iter_no != -1) {
702     DEBAD(("Collection already started now %d so kick in\n",
703         adaptive_struct.lb_iteration_no));
704     thisProxy[index].TriggerAdaptiveReduction();
705   }
706 }
707
708 void MetaBalancer::TriggerAdaptiveReduction() {
709   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
710     adaptive_struct.lb_iteration_no++;
711     double lb_data[STATS_COUNT];
712     lb_data[0] = adaptive_struct.lb_iteration_no;
713     lb_data[1] = 1;
714     lb_data[2] = 0.0;
715     lb_data[3] = 0.0;
716     lb_data[4] = 0.0;
717     lb_data[5] = 0.0;
718     lb_data[6] = 0.0;
719     lb_data[7] = 0.0;
720
721     DEBAD(("[%d] Triggered adaptive reduction for noobj %d\n", CkMyPe(),
722           adaptive_struct.lb_iteration_no));
723
724     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL),
725         thisProxy[0]);
726     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
727   }
728 }
729
730
731 bool MetaBalancer::isStrategyComm() {
732   return adaptive_struct.doCommStrategy;
733 }
734
735 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
736   adaptive_struct.lb_migration_cost = lb_migration_cost;
737 }
738
739 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
740   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
741 }
742
743 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
744     local_comm, double remote_comm) {
745   adaptive_struct.last_lb_type = lb;
746   if (lb == 0) {
747     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
748   } else if (lb == 1) {
749     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
750   } else if (lb == 2) {
751     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
752   } else if (lb == 3) {
753     adaptive_struct.comm_refine_info.remote_local_ratio =
754     remote_comm/local_comm;
755   }
756 }
757
758 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu,
759     double avg_load) {
760   if (adaptive_struct.last_lb_type == -1) {
761     adaptive_struct.last_lb_type = 0;
762   }
763   int lb = adaptive_struct.last_lb_type;
764   if (lb == 0) {
765     adaptive_struct.greedy_info.max_avg_ratio = max_load/avg_load;
766   } else if (lb == 1) {
767     adaptive_struct.refine_info.max_avg_ratio = max_load/avg_load;
768   } else if (lb == 2) {
769     adaptive_struct.comm_info.max_avg_ratio = max_load/avg_load;
770   } else if (lb == 3) {
771     adaptive_struct.comm_refine_info.max_avg_ratio = max_load/avg_load;
772   }
773 }
774
775 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
776   DEBAD(("Setting alpha beta %lf\n", alpha_beta_to_load));
777   alpha_beta_cost_to_load = alpha_beta_to_load;
778 }
779
780
781 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio,
782     double& remote_local_comm_ratio) {
783   lb_type = adaptive_struct.last_lb_type;
784   lb_max_avg_ratio = 1;
785   remote_local_comm_ratio = 1;
786   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
787 }
788
789 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio,
790     double& remote_local_comm_ratio) {
791   if (lb_type == 0) {
792     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
793   } else if (lb_type == 1) {
794     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
795   } else if (lb_type == 2) {
796     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
797   } else if (lb_type == 3) {
798     remote_local_comm_ratio =
799        adaptive_struct.comm_refine_info.remote_local_ratio;
800   }
801 }
802
803 #include "MetaBalancer.def.h"
804
805 /*@}*/