7e28a43468cdc626c130185488b8a9845f695a58
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  * This C++ file contains the Charm stub functions
10  */
11
12 #include "MetaBalancer.h"
13 #include "topology.h"
14
15 #include "limits.h"
16
17 #define VEC_SIZE 50
18 #define IMB_TOLERANCE 1.1
19 #define OUTOFWAY_TOLERANCE 2
20 #define UTILIZATION_THRESHOLD 0.7
21 #define NEGLECT_IDLE 2 // Should never be == 1
22 #define MIN_STATS 6
23
24 #   define DEBAD(x) /*CkPrintf x*/
25 #   define EXTRA_FEATURE 0
26
27 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
28   double lb_data[8];
29   lb_data[1] = 0.0; // total number of processors contributing
30   lb_data[2] = 0.0; // total load
31   lb_data[3] = 0.0; // max load
32   lb_data[4] = 0.0; // idle time
33   lb_data[5] = 1.0; // utilization
34   lb_data[6] = 0.0; // total load with bg
35   lb_data[7] = 0.0; // max load with bg
36   for (int i = 0; i < nMsg; i++) {
37     CkAssert(msgs[i]->getSize() == 8*sizeof(double));
38     if (msgs[i]->getSize() != 8*sizeof(double)) {
39       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n", msgs[i]->getSize());
40     }
41     double* m = (double *)msgs[i]->getData();
42     // Total count
43     lb_data[1] += m[1];
44     // Avg load
45     lb_data[2] += m[2];
46     // Max load
47     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
48     // Avg idle
49     lb_data[4] += m[4];
50     // Get least utilization
51     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
52     // Get Avg load with bg
53     lb_data[6] += m[6];
54     // Get Max load with bg
55     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
56     if (i == 0) {
57       // Iteration no
58       lb_data[0] = m[0];
59     }
60     if (m[0] != lb_data[0]) {
61       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
62       %lf\n", lb_data[0], m[0]);
63       CkAbort("Intermingling iterations\n");
64     }
65   }
66   return CkReductionMsg::buildNew(8*sizeof(double), lb_data);
67 }
68
69 /*global*/ CkReduction::reducerType lbDataCollectionType;
70 /*initcall*/ void registerLBDataCollection(void) {
71   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
72 }
73
74 CkGroupID _metalb;
75
76 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
77
78 // mainchare
79 MetaLBInit::MetaLBInit(CkArgMsg *m)
80 {
81 #if CMK_LBDB_ON
82   _metalb = CProxy_MetaBalancer::ckNew();
83 #endif
84   CkPrintf("META LB Init Called\n");
85   delete m;
86 }
87
88 // called from init.C
89 void _metabalancerInit()
90 {
91   CkpvInitialize(int, metalbInited);
92   CkpvAccess(metalbInited) = 0;
93 }
94
95 void MetaBalancer::initnodeFn()
96 {
97 }
98
99 // called my constructor
100 void MetaBalancer::init(void)
101 {
102   CkPrintf("Metabalancer init %d lbdb\n", _lbdb);
103   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
104   CkpvAccess(metalbInited) = 1;
105   total_load_vec.resize(VEC_SIZE, 0.0);
106   total_count_vec.resize(VEC_SIZE, 0);
107   max_iteration = -1;
108   prev_idle = 0.0;
109   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
110
111   adaptive_lbdb.lb_iter_no = -1;
112
113   // If metabalancer enabled, initialize the variables
114   adaptive_struct.tentative_period =  INT_MAX;
115   adaptive_struct.final_lb_period =  INT_MAX;
116   adaptive_struct.lb_calculated_period = INT_MAX;
117   adaptive_struct.lb_iteration_no = -1;
118   adaptive_struct.global_max_iter_no = 0;
119   adaptive_struct.tentative_max_iter_no = -1;
120   adaptive_struct.global_recv_iter_counter = 0;
121   adaptive_struct.in_progress = false;
122   adaptive_struct.lb_strategy_cost = 0.0;
123   adaptive_struct.lb_migration_cost = 0.0;
124   adaptive_struct.lb_msg_send_no = 0;
125   adaptive_struct.lb_msg_recv_no = 0;
126   adaptive_struct.total_syncs_called = 0;
127   adaptive_struct.last_lb_type = -1;
128
129
130   // This is indicating if the load balancing strategy and migration started.
131   // This is mainly used to register callbacks for noobj pes. They would
132   // register as soon as resumefromsync is called. On receiving the handles at
133   // the central pe, it clears the previous handlers and sets lb_in_progress
134   // to false so that it doesn't clear the handles.
135   lb_in_progress = false;
136
137   is_prev_lb_refine = -1;
138 }
139
140 void MetaBalancer::pup(PUP::er& p)
141 {
142         IrrGroup::pup(p);
143   if (p.isUnpacking()) {
144     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
145   }
146   // NOTE set lbdatabase using the id
147 }
148
149
150 void MetaBalancer::ResumeClients() {
151   // If metabalancer enabled, initialize the variables
152   adaptive_lbdb.history_data.clear();
153
154   adaptive_struct.tentative_period =  INT_MAX;
155   adaptive_struct.final_lb_period =  INT_MAX;
156   adaptive_struct.lb_calculated_period = INT_MAX;
157   adaptive_struct.lb_iteration_no = -1;
158   adaptive_struct.global_max_iter_no = 0;
159   adaptive_struct.tentative_max_iter_no = -1;
160   adaptive_struct.global_recv_iter_counter = 0;
161   adaptive_struct.in_progress = false;
162   adaptive_struct.lb_strategy_cost = 0.0;
163   adaptive_struct.lb_migration_cost = 0.0;
164   adaptive_struct.lb_msg_send_no = 0;
165   adaptive_struct.lb_msg_recv_no = 0;
166   adaptive_struct.total_syncs_called = 0;
167
168   prev_idle = 0.0;
169   if (lb_in_progress) {
170     lbdb_no_obj_callback.clear();
171     lb_in_progress = false;
172   }
173   // While resuming client, if we find that there are no objects, then handle
174   // the case accordingly.
175   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
176     CkPrintf("%d processor has 0 objs\n", CkMyPe());
177     HandleAdaptiveNoObj();
178   }
179 }
180
181 int MetaBalancer::get_iteration() {
182   return adaptive_struct.lb_iteration_no;
183 }
184
185 bool MetaBalancer::AddLoad(int it_n, double load) {
186   int index = it_n % VEC_SIZE;
187   total_count_vec[index]++;
188   adaptive_struct.total_syncs_called++;
189   DEBAD(("At PE %d Total contribution for iteration %d is %d total objs %d\n",
190       CkMyPe(), it_n, total_count_vec[index],
191       lbdatabase->getLBDB()->ObjDataCount()));
192
193   if (it_n < adaptive_struct.lb_iteration_no) {
194     CkAbort("Error!! Received load for previous iteration\n");
195   }
196   if (it_n > adaptive_struct.lb_iteration_no) {
197     adaptive_struct.lb_iteration_no = it_n;
198   }
199   total_load_vec[index] += load;
200   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
201     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
202         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
203     CkAbort("Abort!!! Received more contribution");
204   }
205
206   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
207     double idle_time, bg_walltime, cpu_bgtime;
208     lbdatabase->IdleTime(&idle_time);
209     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
210
211     int sync_for_bg = adaptive_struct.total_syncs_called +
212         lbdatabase->getLBDB()->ObjDataCount();
213     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
214
215     if (it_n < NEGLECT_IDLE) {
216       prev_idle = idle_time;
217     }
218     idle_time -= prev_idle;
219
220     // The chares do not contribute their 0th iteration load. So the total syncs
221     // in reality is total_syncs_called + obj_counts
222     int total_countable_syncs = adaptive_struct.total_syncs_called +
223         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me! weird!
224     if (total_countable_syncs != 0) {
225       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
226     }
227     //CkPrintf("[%d] Idle time %lf and countable %d for iteration %d\n", CkMyPe(), idle_time, total_countable_syncs, iteration);
228
229     double lb_data[8];
230     lb_data[0] = it_n;
231     lb_data[1] = 1;
232     lb_data[2] = total_load_vec[index]; // For average load
233     lb_data[3] = total_load_vec[index]; // For max load
234     lb_data[4] = idle_time;
235     // Set utilization
236     if (total_load_vec[index] == 0.0) {
237       lb_data[5] = 0.0;
238     } else {
239       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
240     }
241     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
242     lb_data[7] = lb_data[6]; // For Max load with bg
243     total_load_vec[index] = 0.0;
244     total_count_vec[index] = 0;
245
246     //CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
247     //    total_load_vec[iteration], idle_time,
248     //    idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
249
250     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
251     contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
252   }
253   return true;
254 }
255
256 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
257   double* load = (double *) msg->getData();
258   double avg = load[2]/load[1];
259   double max = load[3];
260   double avg_idle = load[4]/load[1];
261   double utilization = load[5];
262   int iteration_n = load[0];
263   double avg_load_bg = load[6]/load[1];
264   double max_load_bg = load[7];
265   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle, utilization, load[1]));
266   CkPrintf("** [%d] Iteration Avg load: %lf Max load: %lf With bg Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_load_bg, max_load_bg, avg_idle, utilization, load[1]);
267   delete msg;
268
269 #if EXTRA_FEATURE
270   if (adaptive_struct.final_lb_period != iteration_n) {
271     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
272       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
273     }
274   }
275 #endif
276
277   // Store the data for this iteration
278   adaptive_lbdb.lb_iter_no = iteration_n;
279   AdaptiveData data;
280   data.iteration = adaptive_lbdb.lb_iter_no;
281   data.max_load = max;
282   data.avg_load = avg;
283   data.utilization = utilization;
284   data.idle_time = avg_idle;
285   adaptive_lbdb.history_data.push_back(data);
286
287   // If lb period inform is in progress, dont inform again.
288   // If this is the lb data corresponding to the final lb period informed, then
289   // don't recalculate as some of the processors might have already gone into
290   // LB_STAGE.
291   if (adaptive_struct.in_progress || (adaptive_struct.final_lb_period == iteration_n)) {
292     return;
293   }
294
295   double utilization_threshold = UTILIZATION_THRESHOLD;
296
297 #if EXTRA_FEATURE
298   CkPrintf("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load);
299   if (alpha_beta_cost_to_load < 0.1) {
300     // Ignore the effect of idle time and there by lesser utilization. So we
301     // assign utilization threshold to be 0.0
302     CkPrintf("Changing the idle load tolerance coz this isn't communication intensive benchmark\n");
303     utilization_threshold = 0.0;
304   }
305 #endif
306
307   // First generate the lb period based on the cost of lb. Find out what is the
308   // expected imbalance at the calculated lb period.
309   int period;
310   // This is based on the new max load after load balancing. So technically, it
311   // is calculated based on the shifter up avg curve.
312   double ratio_at_t = 1.0;
313   int tmp_lb_type;
314   double tmp_max_avg_ratio, tmp_comm_ratio;
315   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
316   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
317
318   if (generatePlan(period, ratio_at_t)) {
319     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
320       adaptive_struct.lb_calculated_period, period,
321       adaptive_struct.tentative_max_iter_no));
322     // set the imbalance tolerance to be ratio_at_calculated_lb_period
323     if (ratio_at_t != 1.0) {
324       CkPrintf("Changed tolerance to %lf after line eq whereas max/avg is %lf\n", ratio_at_t, max/avg);
325       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
326       // compared with the tolerance
327       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
328     }
329
330     CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
331
332     if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
333           adaptive_lbdb.history_data.size() > MIN_STATS) {
334       CkPrintf("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
335       TriggerSoon(iteration_n, max/avg, tolerate_imb);
336       return;
337     }
338
339     // If the new lb period from linear extrapolation is greater than maximum
340     // iteration known from previously collected data, then inform all the
341     // processors about the new calculated period.
342     if (period > adaptive_struct.tentative_max_iter_no && period !=
343           adaptive_struct.final_lb_period) {
344       adaptive_struct.doCommStrategy = false;
345       adaptive_struct.lb_calculated_period = period;
346       adaptive_struct.in_progress = true;
347       CkPrintf("Sticking to the calculated period %d\n",
348         adaptive_struct.lb_calculated_period);
349       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
350         adaptive_struct.lb_calculated_period);
351       return;
352     }
353     // TODO: Shouldn't we return from here??
354   }
355
356   CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
357
358   // This would be called when the datasize is not enough to calculate lb period
359   if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && adaptive_lbdb.history_data.size() > 4) {
360     CkPrintf("Carry out load balancing step at iter max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
361     TriggerSoon(iteration_n, max/avg, tolerate_imb);
362     return;
363   }
364
365 }
366
367 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
368     double tolerate_imb) {
369
370   // If the previously calculated_period (not the final decision) is greater
371   // than the iter +1 and if it is greater than the maximum iteration we have
372   // seen so far, then we can inform this
373   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
374       (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
375       (iteration_n + 1 != adaptive_struct.final_lb_period)) {
376     if (imbalance_ratio < tolerate_imb) {
377       adaptive_struct.doCommStrategy = true;
378       CkPrintf("No load imbalance but idle time\n");
379     } else {
380       adaptive_struct.doCommStrategy = false;
381       CkPrintf("load imbalance \n");
382     }
383     adaptive_struct.lb_calculated_period = iteration_n + 1;
384     adaptive_struct.in_progress = true;
385     CkPrintf("Informing everyone the lb period is %d\n",
386         adaptive_struct.lb_calculated_period);
387     thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
388         adaptive_struct.lb_calculated_period);
389   }
390 }
391
392 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
393   if (adaptive_lbdb.history_data.size() <= 4) {
394     return false;
395   }
396
397   // Some heuristics for lbperiod
398   // If constant load or almost constant,
399   // then max * new_lb_period > avg * new_lb_period + lb_cost
400   double max = 0.0;
401   double avg = 0.0;
402   AdaptiveData data;
403   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
404     data = adaptive_lbdb.history_data[i];
405     max += data.max_load;
406     avg += data.avg_load;
407     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
408     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
409   }
410 //  max /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
411 //  avg /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
412
413   // If linearly varying load, then find lb_period
414   // area between the max and avg curve
415   // If we can attain perfect balance, then the new load is close to the
416   // average. Hence we pass 1, else pass in some other value which would be the
417   // new max_load after load balancing.
418   int tmp_lb_type;
419   double tmp_max_avg_ratio, tmp_comm_ratio;
420   double tolerate_imb;
421
422 #if EXTRA_FEATURE
423   // First get the data for refine.
424   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
425   tolerate_imb = tmp_max_avg_ratio;
426
427   // If RefineLB does a good job, then find the period considering RefineLB
428   if (tmp_max_avg_ratio <= 1.01) {
429     if (max/avg < tolerate_imb) {
430       CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
431       tolerate_imb = 1.0;
432     }
433     CkPrintf("Will generate plan for refine %lf imb and %lf overhead\n", tolerate_imb, 0.2);
434     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
435   }
436
437   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
438 #endif
439
440   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
441   tolerate_imb = tmp_max_avg_ratio;
442 //  if (max/avg < tolerate_imb) {
443 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
444 //    tolerate_imb = 1.0;
445 //  }
446   if (max/avg > tolerate_imb) {
447     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
448       return true;
449     }
450   }
451
452   max = 0.0;
453   avg = 0.0;
454   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
455     data = adaptive_lbdb.history_data[i];
456     max += data.max_load;
457     avg += data.avg_load*tolerate_imb;
458     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
459     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
460   }
461   max /= adaptive_lbdb.history_data.size();
462   avg /= adaptive_lbdb.history_data.size();
463   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
464   period = cost/(max - avg); 
465   CkPrintf("Obtained period %d from constant prediction\n", period);
466   if (period < 0) { 
467     period = adaptive_struct.final_lb_period;
468     CkPrintf("Obtained -ve period from constant prediction so changing to prev %d\n", period);
469   } 
470   ratio_at_t = max / avg;
471   return true;
472 }
473
474 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
475     double overhead_percent, int& period, double& ratio_at_t) {
476   double mslope, aslope, mc, ac;
477   getLineEq(new_load_percent, aslope, ac, mslope, mc);
478   CkPrintf("new load percent %lf\n", new_load_percent);
479   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
480   double a = (mslope - aslope)/2;
481   double b = (mc - ac);
482   double c = -(adaptive_struct.lb_strategy_cost +
483       adaptive_struct.lb_migration_cost) * overhead_percent;
484   bool got_period = getPeriodForLinear(a, b, c, period);
485   if (!got_period) {
486     return false;
487   }
488
489   if (mslope < 0) {
490     if (period > (-mc/mslope)) {
491       CkPrintf("Max < 0 Period set when max load is -ve\n");
492       return false;
493     }
494   }
495
496   if (aslope < 0) {
497     if (period > (-ac/aslope)) {
498       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
499       return false;
500     }
501   }
502
503   int intersection_t = (mc-ac) / (aslope - mslope);
504   if (intersection_t > 0 && period > intersection_t) {
505     CkPrintf("Avg | Max Period set when curves intersect\n");
506     return false;
507   }
508   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
509   CkPrintf("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t);
510   return true;
511 }
512
513 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
514   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
515   if (a == 0.0) {
516     period = (-c / b);
517     if (period < 0) {
518       CkPrintf("-ve period for -c/b (%d)\n", period);
519       return false;
520     }
521     CkPrintf("Ideal period for linear load %d\n", period);
522     return true;
523   }
524   int x;
525   double t = (b * b) - (4*a*c);
526   if (t < 0) {
527     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
528     return false;
529   }
530   t = (-b + sqrt(t)) / (2*a);
531   x = t;
532   if (x < 0) {
533     CkPrintf("boo!!! x (%d) < 0\n", x);
534     x = 0;
535     return false;
536   }
537   period = x;
538   CkPrintf("Ideal period for linear load %d\n", period);
539   return true;
540 }
541
542 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
543   int total = adaptive_lbdb.history_data.size();
544   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
545       adaptive_lbdb.history_data[0].iteration;
546   double a1 = 0;
547   double m1 = 0;
548   double a2 = 0;
549   double m2 = 0;
550   AdaptiveData data;
551   int i = 0;
552   for (i = 0; i < total/2; i++) {
553     data = adaptive_lbdb.history_data[i];
554     m1 += data.max_load;
555     a1 += data.avg_load;
556     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
557   }
558   m1 /= i;
559   a1 = (a1 * new_load_percent) / i;
560
561   for (i = total/2; i < total; i++) {
562     data = adaptive_lbdb.history_data[i];
563     m2 += data.max_load;
564     a2 += data.avg_load;
565     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
566   }
567   m2 /= (i - total/2);
568   a2 = (a2 * new_load_percent) / (i - total/2);
569
570   aslope = 2 * (a2 - a1) / iterations;
571   mslope = 2 * (m2 - m1) / iterations;
572   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
573   mc = adaptive_lbdb.history_data[0].max_load;
574
575   ac = a1 - ((aslope * total)/4);
576   mc = m1 - ((mslope * total)/4);
577
578   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
579   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
580
581   return true;
582 }
583
584 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
585   if (req_no < adaptive_struct.lb_msg_recv_no) {
586     CkPrintf("Error!!! Received a request which was already sent or old\n");
587     return;
588   }
589   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d\n",CkMyPe(), adaptive_struct.lb_iteration_no, period);
590   adaptive_struct.tentative_period = period;
591   adaptive_struct.lb_msg_recv_no = req_no;
592   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_iteration_no);
593 }
594
595 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
596   if (req_no < adaptive_struct.lb_msg_recv_no) {
597     return;
598   }
599   DEBAD(("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
600   adaptive_struct.tentative_period = period;
601   adaptive_struct.final_lb_period = period;
602   // NOTE LDOMAdaptResumeSync(myLDHandle, period);
603   lbdatabase->AdaptResumeSync(period);
604 }
605
606
607 void MetaBalancer::ReceiveIterationNo(int req_no, int local_iter_no) {
608   CmiAssert(CkMyPe() == 0);
609
610   adaptive_struct.global_recv_iter_counter++;
611   if (local_iter_no > adaptive_struct.global_max_iter_no) {
612     adaptive_struct.global_max_iter_no = local_iter_no;
613   }
614
615   int period;
616   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
617
618     if (adaptive_struct.global_max_iter_no > adaptive_struct.tentative_max_iter_no) {
619       adaptive_struct.tentative_max_iter_no = adaptive_struct.global_max_iter_no;
620     }
621     period = (adaptive_struct.tentative_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.tentative_period : adaptive_struct.global_max_iter_no + 1;
622     // If no one has gone into load balancing stage, then we can safely change
623     // the period otherwise keep the old period.
624     if (adaptive_struct.global_max_iter_no < adaptive_struct.final_lb_period) {
625       adaptive_struct.tentative_period = period;
626       CkPrintf("Final lb_period CHANGED!%d\n", adaptive_struct.tentative_period);
627     } else {
628       adaptive_struct.tentative_period = adaptive_struct.final_lb_period;
629       CkPrintf("Final lb_period NOT CHANGED!%d\n", adaptive_struct.tentative_period);
630     }
631     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.tentative_period);
632     adaptive_struct.in_progress = false;
633     adaptive_struct.global_recv_iter_counter = 0;
634   }
635 }
636
637 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
638   // If tentative and final_lb_period are the same, then the decision has been
639   // made but if not, they are in the middle of consensus, hence return the
640   // lease of the two
641   if (adaptive_struct.tentative_period != adaptive_struct.final_lb_period) {
642     is_tentative = true;
643   } else {
644     is_tentative = false;
645   }
646   if (adaptive_struct.tentative_period < adaptive_struct.final_lb_period) {
647     return adaptive_struct.tentative_period;
648    } else {
649      return adaptive_struct.final_lb_period;
650    }
651 }
652
653 // Called by CentralLB to indicate that the LB strategy and migration is in
654 // progress.
655 void MetaBalancer::ResetAdaptive() {
656   adaptive_lbdb.lb_iter_no = -1;
657   lb_in_progress = true;
658 }
659
660 void MetaBalancer::HandleAdaptiveNoObj() {
661 #if EXTRA_FEATURE
662   adaptive_struct.lb_iteration_no++;
663   //CkPrintf("HandleAdaptiveNoObj %d\n", adaptive_struct.lb_iteration_no);
664   thisProxy[0].RegisterNoObjCallback(CkMyPe());
665   TriggerAdaptiveReduction();
666 #endif
667 }
668
669 void MetaBalancer::RegisterNoObjCallback(int index) {
670 #if EXTRA_FEATURE
671   if (lb_in_progress) {
672     lbdb_no_obj_callback.clear();
673     //CkPrintf("Clearing and registering\n");
674     lb_in_progress = false;
675   }
676   lbdb_no_obj_callback.push_back(index);
677   CkPrintf("Registered %d to have no objs.\n", index);
678
679   // If collection has already happened and this is second iteration, then
680   // trigger reduction.
681   if (adaptive_lbdb.lb_iter_no != -1) {
682     //CkPrintf("Collection already started now %d so kick in\n", adaptive_struct.lb_iteration_no);
683     thisProxy[index].TriggerAdaptiveReduction();
684   }
685 #endif
686 }
687
688 void MetaBalancer::TriggerAdaptiveReduction() {
689 #if EXTRA_FEATURE
690   adaptive_struct.lb_iteration_no++;
691   //CkPrintf("Trigger adaptive for %d\n", adaptive_struct.lb_iteration_no);
692   double lb_data[8];
693   lb_data[0] = adaptive_struct.lb_iteration_no;
694   lb_data[1] = 1;
695   lb_data[2] = 0.0;
696   lb_data[3] = 0.0;
697   lb_data[4] = 0.0;
698   lb_data[5] = 0.0;
699   lb_data[6] = 0.0;
700   lb_data[7] = 0.0;
701
702   // CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
703   //     total_load_vec[iteration], idle_time,
704   //     idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
705
706   CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
707   contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
708 #endif
709 }
710
711
712 bool MetaBalancer::isStrategyComm() {
713   return adaptive_struct.doCommStrategy;
714 }
715
716 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
717   adaptive_struct.lb_migration_cost = lb_migration_cost;
718 }
719
720 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
721   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
722 }
723
724 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
725     local_comm, double remote_comm) {
726   adaptive_struct.last_lb_type = lb;
727   if (lb == 0) {
728     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
729   } else if (lb == 1) {
730     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
731   } else if (lb == 2) {
732     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
733   } else if (lb == 3) {
734     adaptive_struct.comm_refine_info.remote_local_ratio =
735     remote_comm/local_comm;
736   }
737 }
738
739 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu, double
740 avg_load) {
741   if (adaptive_struct.last_lb_type == -1) {
742     adaptive_struct.last_lb_type = 0;
743   }
744   int lb = adaptive_struct.last_lb_type;
745   //CkPrintf("Storing data after lb ratio %lf for lb %d\n", max_load/avg_load, lb);
746   if (lb == 0) {
747     adaptive_struct.greedy_info.max_avg_ratio = max_load/avg_load;
748   } else if (lb == 1) {
749     adaptive_struct.refine_info.max_avg_ratio = max_load/avg_load;
750   } else if (lb == 2) {
751     adaptive_struct.comm_info.max_avg_ratio = max_load/avg_load;
752   } else if (lb == 3) {
753     adaptive_struct.comm_refine_info.max_avg_ratio = max_load/avg_load;
754   }
755 }
756
757 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
758   CkPrintf("Setting alpha beta %lf\n", alpha_beta_to_load);
759   alpha_beta_cost_to_load = alpha_beta_to_load;
760 }
761
762
763 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio, double&
764     remote_local_comm_ratio) {
765   lb_type = adaptive_struct.last_lb_type;
766   lb_max_avg_ratio = 1;
767   remote_local_comm_ratio = 1;
768   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
769 }
770
771 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio, double&
772     remote_local_comm_ratio) {
773   if (lb_type == 0) {
774     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
775   } else if (lb_type == 1) {
776     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
777   } else if (lb_type == 2) {
778     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
779   } else if (lb_type == 3) {
780     remote_local_comm_ratio =
781        adaptive_struct.comm_refine_info.remote_local_ratio;
782   }
783 }
784
785 #include "MetaBalancer.def.h"
786
787 /*@}*/