more cleanup
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * 
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  */
10
11 #include "MetaBalancer.h"
12 #include "topology.h"
13
14 #include "limits.h"
15
16 #define VEC_SIZE 50
17 #define IMB_TOLERANCE 1.1
18 #define OUTOFWAY_TOLERANCE 2
19 #define UTILIZATION_THRESHOLD 0.7
20 #define NEGLECT_IDLE 2 // Should never be == 1
21 #define MIN_STATS 6
22 #define STATS_COUNT 8 // The number of stats collected during reduction
23
24 #   define DEBAD(x) /*CkPrintf x*/
25 #   define EXTRA_FEATURE 0
26
27 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
28   double lb_data[STATS_COUNT];
29   lb_data[1] = 0.0; // total number of processors contributing
30   lb_data[2] = 0.0; // total load
31   lb_data[3] = 0.0; // max load
32   lb_data[4] = 0.0; // idle time
33   lb_data[5] = 1.0; // utilization
34   lb_data[6] = 0.0; // total load with bg
35   lb_data[7] = 0.0; // max load with bg
36   for (int i = 0; i < nMsg; i++) {
37     CkAssert(msgs[i]->getSize() == STATS_COUNT*sizeof(double));
38     if (msgs[i]->getSize() != STATS_COUNT*sizeof(double)) {
39       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n", msgs[i]->getSize());
40       CkAbort("Incorrect Reduction size in MetaBalancer\n");
41     }
42     double* m = (double *)msgs[i]->getData();
43     // Total count
44     lb_data[1] += m[1];
45     // Avg load
46     lb_data[2] += m[2];
47     // Max load
48     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
49     // Avg idle
50     lb_data[4] += m[4];
51     // Get least utilization
52     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
53     // Get Avg load with bg
54     lb_data[6] += m[6];
55     // Get Max load with bg
56     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
57     if (i == 0) {
58       // Iteration no
59       lb_data[0] = m[0];
60     }
61     if (m[0] != lb_data[0]) {
62       CkPrintf("Error!!! Reduction is intermingled between iteration %lf \
63         and %lf\n", lb_data[0], m[0]);
64       CkAbort("Intermingling iterations in MetaBalancer\n");
65     }
66   }
67   return CkReductionMsg::buildNew(STATS_COUNT*sizeof(double), lb_data);
68 }
69
70 /*global*/ CkReduction::reducerType lbDataCollectionType;
71 /*initcall*/ void registerLBDataCollection(void) {
72   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
73 }
74
75 CkGroupID _metalb;
76
77 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
78
79 // mainchare
80 MetaLBInit::MetaLBInit(CkArgMsg *m) {
81 #if CMK_LBDB_ON
82   _metalb = CProxy_MetaBalancer::ckNew();
83 #endif
84   delete m;
85 }
86
87 // called from init.C
88 void _metabalancerInit() {
89   CkpvInitialize(int, metalbInited);
90   CkpvAccess(metalbInited) = 0;
91 }
92
93 void MetaBalancer::initnodeFn() {
94 }
95
96 // called by my constructor
97 void MetaBalancer::init(void) {
98   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
99   CkpvAccess(metalbInited) = 1;
100   total_load_vec.resize(VEC_SIZE, 0.0);
101   total_count_vec.resize(VEC_SIZE, 0);
102   max_iteration = -1;
103   prev_idle = 0.0;
104   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
105
106   adaptive_lbdb.lb_iter_no = -1;
107
108   // If metabalancer enabled, initialize the variables
109   adaptive_struct.tentative_period =  INT_MAX;
110   adaptive_struct.final_lb_period =  INT_MAX;
111   adaptive_struct.lb_calculated_period = INT_MAX;
112   adaptive_struct.lb_iteration_no = -1;
113   adaptive_struct.global_max_iter_no = 0;
114   adaptive_struct.tentative_max_iter_no = -1;
115   adaptive_struct.global_recv_iter_counter = 0;
116   adaptive_struct.in_progress = false;
117   adaptive_struct.lb_strategy_cost = 0.0;
118   adaptive_struct.lb_migration_cost = 0.0;
119   adaptive_struct.lb_msg_send_no = 0;
120   adaptive_struct.lb_msg_recv_no = 0;
121   adaptive_struct.total_syncs_called = 0;
122   adaptive_struct.last_lb_type = -1;
123
124
125   // This is indicating if the load balancing strategy and migration started.
126   // This is mainly used to register callbacks for noobj pes. They would
127   // register as soon as resumefromsync is called. On receiving the handles at
128   // the central pe, it clears the previous handlers and sets lb_in_progress
129   // to false so that it doesn't clear the handles.
130   lb_in_progress = false;
131
132   is_prev_lb_refine = -1;
133 }
134
135 void MetaBalancer::pup(PUP::er& p) {
136         IrrGroup::pup(p);
137   if (p.isUnpacking()) {
138     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
139   }
140 }
141
142
143 void MetaBalancer::ResumeClients() {
144   // If metabalancer enabled, initialize the variables
145   adaptive_lbdb.history_data.clear();
146
147   adaptive_struct.tentative_period =  INT_MAX;
148   adaptive_struct.final_lb_period =  INT_MAX;
149   adaptive_struct.lb_calculated_period = INT_MAX;
150   adaptive_struct.lb_iteration_no = -1;
151   adaptive_struct.global_max_iter_no = 0;
152   adaptive_struct.tentative_max_iter_no = -1;
153   adaptive_struct.global_recv_iter_counter = 0;
154   adaptive_struct.in_progress = false;
155   adaptive_struct.lb_strategy_cost = 0.0;
156   adaptive_struct.lb_migration_cost = 0.0;
157   adaptive_struct.lb_msg_send_no = 0;
158   adaptive_struct.lb_msg_recv_no = 0;
159   adaptive_struct.total_syncs_called = 0;
160
161   prev_idle = 0.0;
162   if (lb_in_progress) {
163     lbdb_no_obj_callback.clear();
164     lb_in_progress = false;
165   }
166 }
167
168 int MetaBalancer::get_iteration() {
169   return adaptive_struct.lb_iteration_no;
170 }
171
172 bool MetaBalancer::AddLoad(int it_n, double load) {
173   int index = it_n % VEC_SIZE;
174   total_count_vec[index]++;
175   adaptive_struct.total_syncs_called++;
176   DEBAD(("At PE %d Total contribution for iteration %d is %d total objs %d\n",
177       CkMyPe(), it_n, total_count_vec[index],
178       lbdatabase->getLBDB()->ObjDataCount()));
179
180   if (it_n < adaptive_struct.lb_iteration_no) {
181     CkAbort("Error!! Received load for previous iteration\n");
182   }
183   if (it_n > adaptive_struct.lb_iteration_no) {
184     adaptive_struct.lb_iteration_no = it_n;
185   }
186   total_load_vec[index] += load;
187   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
188     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
189         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
190     CkAbort("Abort!!! Received more contribution");
191   }
192
193   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
194     double idle_time, bg_walltime, cpu_bgtime;
195     lbdatabase->IdleTime(&idle_time);
196     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
197
198     int sync_for_bg = adaptive_struct.total_syncs_called +
199         lbdatabase->getLBDB()->ObjDataCount();
200     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
201
202     if (it_n < NEGLECT_IDLE) {
203       prev_idle = idle_time;
204     }
205     idle_time -= prev_idle;
206
207     // The chares do not contribute their 0th iteration load. So the total syncs
208     // in reality is total_syncs_called + obj_counts
209     int total_countable_syncs = adaptive_struct.total_syncs_called +
210         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me! weird!
211     if (total_countable_syncs != 0) {
212       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
213     }
214
215     double lb_data[STATS_COUNT];
216     lb_data[0] = it_n;
217     lb_data[1] = 1;
218     lb_data[2] = total_load_vec[index]; // For average load
219     lb_data[3] = total_load_vec[index]; // For max load
220     lb_data[4] = idle_time;
221     // Set utilization
222     if (total_load_vec[index] == 0.0) {
223       lb_data[5] = 0.0;
224     } else {
225       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
226     }
227     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
228     lb_data[7] = lb_data[6]; // For Max load with bg
229     total_load_vec[index] = 0.0;
230     total_count_vec[index] = 0;
231
232     DEBAD(("[%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n",
233         CkMyPe(), total_load_vec[index], idle_time,
234         idle_time/total_load_vec[index], adaptive_struct.lb_iteration_no));
235
236     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
237     contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
238   }
239   return true;
240 }
241
242 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
243   double* load = (double *) msg->getData();
244   double avg = load[2]/load[1];
245   double max = load[3];
246   double avg_idle = load[4]/load[1];
247   double utilization = load[5];
248   int iteration_n = (int) load[0];
249   double avg_load_bg = load[6]/load[1];
250   double max_load_bg = load[7];
251   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf \
252       Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle,
253       utilization, load[1]));
254   delete msg;
255
256   // For processors with  no  objs, trigger MetaBalancer reduction
257   if (adaptive_struct.final_lb_period != iteration_n) {
258     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
259       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
260     }
261   }
262
263   // Store the data for this iteration
264   adaptive_lbdb.lb_iter_no = iteration_n;
265   AdaptiveData data;
266   data.iteration = adaptive_lbdb.lb_iter_no;
267   data.max_load = max;
268   data.avg_load = avg;
269   data.utilization = utilization;
270   data.idle_time = avg_idle;
271   adaptive_lbdb.history_data.push_back(data);
272
273   // If lb period inform is in progress, dont inform again.
274   // If this is the lb data corresponding to the final lb period informed, then
275   // don't recalculate as some of the processors might have already gone into
276   // LB_STAGE.
277   if (adaptive_struct.in_progress || (adaptive_struct.final_lb_period == iteration_n)) {
278     return;
279   }
280
281   double utilization_threshold = UTILIZATION_THRESHOLD;
282
283 #if EXTRA_FEATURE
284   DEBAD(("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load));
285   if (alpha_beta_cost_to_load < 0.1) {
286     // Ignore the effect of idle time and there by lesser utilization. So we
287     // assign utilization threshold to be 0.0
288     DEBAD(("Changing the idle load tolerance coz this isn't communication intensive benchmark\n"));
289     utilization_threshold = 0.0;
290   }
291 #endif
292
293   // First generate the lb period based on the cost of lb. Find out what is the
294   // expected imbalance at the calculated lb period.
295   int period;
296   // This is based on the new max load after load balancing. So technically, it
297   // is calculated based on the shifter up avg curve.
298   double ratio_at_t = 1.0;
299   int tmp_lb_type;
300   double tmp_max_avg_ratio, tmp_comm_ratio;
301   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
302   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
303
304   if (generatePlan(period, ratio_at_t)) {
305     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
306       adaptive_struct.lb_calculated_period, period,
307       adaptive_struct.tentative_max_iter_no));
308     // set the imbalance tolerance to be ratio_at_calculated_lb_period
309     if (ratio_at_t != 1.0) {
310       DEBAD(("Changed tolerance to %lf after line eq whereas max/avg is %lf\n", ratio_at_t, max/avg));
311       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
312       // compared with the tolerance
313       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
314     }
315
316     DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio));
317
318     if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
319           adaptive_lbdb.history_data.size() > MIN_STATS) {
320       DEBAD(("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization));
321       TriggerSoon(iteration_n, max/avg, tolerate_imb);
322       return;
323     }
324
325     // If the new lb period from linear extrapolation is greater than maximum
326     // iteration known from previously collected data, then inform all the
327     // processors about the new calculated period.
328     if (period > adaptive_struct.tentative_max_iter_no && period !=
329           adaptive_struct.final_lb_period) {
330       adaptive_struct.doCommStrategy = false;
331       adaptive_struct.lb_calculated_period = period;
332       adaptive_struct.in_progress = true;
333       CkPrintf("Sticking to the calculated period %d\n",
334         adaptive_struct.lb_calculated_period);
335       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
336         adaptive_struct.lb_calculated_period);
337       return;
338     }
339     return;
340   }
341
342   DEBAD(("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type,
343       tmp_max_avg_ratio, tmp_comm_ratio));
344
345   // This would be called when linear extrapolation did not provide suitable
346   // period provided there is enough  historical data 
347   if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && adaptive_lbdb.history_data.size() > 4) {
348     DEBAD(("Carry out load balancing step at iter max/avg(%lf) and utilization \
349       ratio (%lf)\n", max/avg, utilization));
350     TriggerSoon(iteration_n, max/avg, tolerate_imb);
351     return;
352   }
353
354 }
355
356 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
357     double tolerate_imb) {
358
359   // If the previously calculated_period (not the final decision) is greater
360   // than the iter +1 and if it is greater than the maximum iteration we have
361   // seen so far, then we can inform this
362   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
363       (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
364       (iteration_n + 1 != adaptive_struct.final_lb_period)) {
365     if (imbalance_ratio < tolerate_imb) {
366       adaptive_struct.doCommStrategy = true;
367       DEBAD(("No load imbalance but idle time\n"));
368     } else {
369       adaptive_struct.doCommStrategy = false;
370       DEBAD(("load imbalance \n"));
371     }
372     adaptive_struct.lb_calculated_period = iteration_n + 1;
373     adaptive_struct.in_progress = true;
374     DEBAD(("Informing everyone the lb period is %d\n",
375         adaptive_struct.lb_calculated_period));
376     thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++,
377         adaptive_struct.lb_calculated_period);
378   }
379 }
380
381 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
382   if (adaptive_lbdb.history_data.size() <= 4) {
383     return false;
384   }
385
386   // Some heuristics for lbperiod
387   // If constant load or almost constant,
388   // then max * new_lb_period > avg * new_lb_period + lb_cost
389   double max = 0.0;
390   double avg = 0.0;
391   AdaptiveData data;
392   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
393     data = adaptive_lbdb.history_data[i];
394     max += data.max_load;
395     avg += data.avg_load;
396     DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
397   }
398 //  max /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
399 //  avg /= (adaptive_struct.lb_iteration_no - adaptive_lbdb.history_data[0].iteration);
400
401   // If linearly varying load, then find lb_period
402   // area between the max and avg curve
403   // If we can attain perfect balance, then the new load is close to the
404   // average. Hence we pass 1, else pass in some other value which would be the
405   // new max_load after load balancing.
406   int tmp_lb_type;
407   double tmp_max_avg_ratio, tmp_comm_ratio;
408   double tolerate_imb;
409
410 #if EXTRA_FEATURE
411   // First get the data for refine.
412   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
413   tolerate_imb = tmp_max_avg_ratio;
414
415   // If RefineLB does a good job, then find the period considering RefineLB
416   if (tmp_max_avg_ratio <= 1.01) {
417     if (max/avg < tolerate_imb) {
418       DEBAD(("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg,
419           tolerate_imb));
420       tolerate_imb = 1.0;
421     }
422     DEBAD(("Will generate plan for refine %lf imb and %lf overhead\n",
423         tolerate_imb, 0.2));
424     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
425   }
426
427   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
428 #endif
429
430   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
431   tolerate_imb = tmp_max_avg_ratio;
432 //  if (max/avg < tolerate_imb) {
433 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
434 //    tolerate_imb = 1.0;
435 //  }
436   if (max/avg > tolerate_imb) {
437     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
438       return true;
439     }
440   }
441
442   max = 0.0;
443   avg = 0.0;
444   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
445     data = adaptive_lbdb.history_data[i];
446     max += data.max_load;
447     avg += data.avg_load*tolerate_imb;
448   }
449   max /= adaptive_lbdb.history_data.size();
450   avg /= adaptive_lbdb.history_data.size();
451   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
452   period = (int) cost/(max - avg); 
453   DEBAD(("Obtained period %d from constant prediction\n", period));
454   if (period < 0) { 
455     period = adaptive_struct.final_lb_period;
456     DEBAD(("Obtained -ve period from constant prediction so changing to prev %d\n", period));
457   } 
458   ratio_at_t = max / avg;
459   return true;
460 }
461
462 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
463     double overhead_percent, int& period, double& ratio_at_t) {
464   double mslope, aslope, mc, ac;
465   getLineEq(new_load_percent, aslope, ac, mslope, mc);
466   DEBAD(("new load percent %lf\n", new_load_percent));
467   DEBAD(("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac));
468   double a = (mslope - aslope)/2;
469   double b = (mc - ac);
470   double c = -(adaptive_struct.lb_strategy_cost +
471       adaptive_struct.lb_migration_cost) * overhead_percent;
472       CkPrintf("cost %f\n",
473       (adaptive_struct.lb_strategy_cost+adaptive_struct.lb_migration_cost));
474   bool got_period = getPeriodForLinear(a, b, c, period);
475   if (!got_period) {
476     return false;
477   }
478
479   if (mslope < 0) {
480     if (period > (-mc/mslope)) {
481       DEBAD(("Max < 0 Period set when max load is -ve\n"));
482       return false;
483     }
484   }
485
486   if (aslope < 0) {
487     if (period > (-ac/aslope)) {
488       DEBAD(("Avg < 0 Period set when avg load is -ve\n"));
489       return false;
490     }
491   }
492
493   int intersection_t = (int) (mc-ac) / (aslope - mslope);
494   if (intersection_t > 0 && period > intersection_t) {
495     DEBAD(("Avg | Max Period set when curves intersect\n"));
496     return false;
497   }
498   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
499   DEBAD(("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t));
500   return true;
501 }
502
503 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
504   DEBAD(("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c));
505   if (a == 0.0) {
506     period = (int) (-c / b);
507     if (period < 0) {
508       DEBAD(("-ve period for -c/b (%d)\n", period));
509       return false;
510     }
511     DEBAD(("Ideal period for linear load %d\n", period));
512     return true;
513   }
514   int x;
515   double t = (b * b) - (4*a*c);
516   if (t < 0) {
517     DEBAD(("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t)));
518     return false;
519   }
520   t = (-b + sqrt(t)) / (2*a);
521   x = (int) t;
522   if (x < 0) {
523     DEBAD(("boo!!! x (%d) < 0\n", x));
524     x = 0;
525     return false;
526   }
527   period = x;
528   DEBAD(("Ideal period for linear load %d\n", period));
529   return true;
530 }
531
532 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
533   int total = adaptive_lbdb.history_data.size();
534   int iterations = (int) (1 + adaptive_lbdb.history_data[total - 1].iteration -
535       adaptive_lbdb.history_data[0].iteration);
536   double a1 = 0;
537   double m1 = 0;
538   double a2 = 0;
539   double m2 = 0;
540   AdaptiveData data;
541   int i = 0;
542   for (i = 0; i < total/2; i++) {
543     data = adaptive_lbdb.history_data[i];
544     m1 += data.max_load;
545     a1 += data.avg_load;
546     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
547   }
548   m1 /= i;
549   a1 = (a1 * new_load_percent) / i;
550
551   for (i = total/2; i < total; i++) {
552     data = adaptive_lbdb.history_data[i];
553     m2 += data.max_load;
554     a2 += data.avg_load;
555     DEBAD(("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load));
556   }
557   m2 /= (i - total/2);
558   a2 = (a2 * new_load_percent) / (i - total/2);
559
560   aslope = 2 * (a2 - a1) / iterations;
561   mslope = 2 * (m2 - m1) / iterations;
562   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
563   mc = adaptive_lbdb.history_data[0].max_load;
564
565   ac = a1 - ((aslope * total)/4);
566   mc = m1 - ((mslope * total)/4);
567
568   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
569   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
570
571   return true;
572 }
573
574 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
575   if (req_no < adaptive_struct.lb_msg_recv_no) {
576     DEBAD(("Error!!! Received a request which was already sent or old\n"));
577     return;
578   }
579   DEBAD(("[%d] Load balance decision made cur iteration: %d period:%d\n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
580   adaptive_struct.tentative_period = period;
581   adaptive_struct.lb_msg_recv_no = req_no;
582   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_iteration_no);
583 }
584
585 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
586   if (req_no < adaptive_struct.lb_msg_recv_no) {
587     return;
588   }
589   DEBAD(("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), adaptive_struct.lb_iteration_no, period));
590   adaptive_struct.tentative_period = period;
591   adaptive_struct.final_lb_period = period;
592   lbdatabase->AdaptResumeSync(period);
593 }
594
595
596 void MetaBalancer::ReceiveIterationNo(int req_no, int local_iter_no) {
597   CmiAssert(CkMyPe() == 0);
598
599   adaptive_struct.global_recv_iter_counter++;
600   if (local_iter_no > adaptive_struct.global_max_iter_no) {
601     adaptive_struct.global_max_iter_no = local_iter_no;
602   }
603
604   int period;
605   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
606
607     if (adaptive_struct.global_max_iter_no > adaptive_struct.tentative_max_iter_no) {
608       adaptive_struct.tentative_max_iter_no = adaptive_struct.global_max_iter_no;
609     }
610     period = (adaptive_struct.tentative_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.tentative_period : adaptive_struct.global_max_iter_no + 1;
611     // If no one has gone into load balancing stage, then we can safely change
612     // the period otherwise keep the old period.
613     if (adaptive_struct.global_max_iter_no < adaptive_struct.final_lb_period) {
614       adaptive_struct.tentative_period = period;
615       DEBAD(("Final lb_period CHANGED!%d\n", adaptive_struct.tentative_period));
616     } else {
617       adaptive_struct.tentative_period = adaptive_struct.final_lb_period;
618       DEBAD(("Final lb_period NOT CHANGED!%d\n", adaptive_struct.tentative_period));
619     }
620     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.tentative_period);
621     adaptive_struct.in_progress = false;
622     adaptive_struct.global_recv_iter_counter = 0;
623   }
624 }
625
626 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
627   // If tentative and final_lb_period are the same, then the decision has been
628   // made but if not, they are in the middle of consensus, hence return the
629   // lease of the two
630   if (adaptive_struct.tentative_period != adaptive_struct.final_lb_period) {
631     is_tentative = true;
632   } else {
633     is_tentative = false;
634   }
635   if (adaptive_struct.tentative_period < adaptive_struct.final_lb_period) {
636     return adaptive_struct.tentative_period;
637    } else {
638      return adaptive_struct.final_lb_period;
639    }
640 }
641
642 // Called by CentralLB to indicate that the LB strategy and migration is in
643 // progress.
644 void MetaBalancer::ResetAdaptive() {
645   adaptive_lbdb.lb_iter_no = -1;
646   lb_in_progress = true;
647 }
648
649 // Called by LBDatabase to indicate that no objs are there in this processor
650 void MetaBalancer::HandleAdaptiveNoObj() {
651   adaptive_struct.lb_iteration_no++;
652   DEBAD(("(%d) --HandleAdaptiveNoObj %d\n", CkMyPe(),
653       adaptive_struct.lb_iteration_no));
654   thisProxy[0].RegisterNoObjCallback(CkMyPe());
655   TriggerAdaptiveReduction();
656 }
657
658 void MetaBalancer::RegisterNoObjCallback(int index) {
659   // If the load balancing process (migration) is going on and in the meantime
660   // one of the processor finishes everything and finds that there are no objs
661   // in it, then it registers a callback. So clear the old data in case it
662   // hasn't been done.
663   if (lb_in_progress) {
664     lbdb_no_obj_callback.clear();
665     lb_in_progress = false;
666   }
667   lbdb_no_obj_callback.push_back(index);
668   DEBAD(("Registered %d to have no objs.\n", index));
669
670   // If collection has already happened and this is second iteration, then
671   // trigger reduction.
672   if (adaptive_lbdb.lb_iter_no != -1) {
673     DEBAD(("Collection already started now %d so kick in\n",
674         adaptive_struct.lb_iteration_no));
675     thisProxy[index].TriggerAdaptiveReduction();
676   }
677 }
678
679 void MetaBalancer::TriggerAdaptiveReduction() {
680   adaptive_struct.lb_iteration_no++;
681   double lb_data[STATS_COUNT];
682   lb_data[0] = adaptive_struct.lb_iteration_no;
683   lb_data[1] = 1;
684   lb_data[2] = 0.0;
685   lb_data[3] = 0.0;
686   lb_data[4] = 0.0;
687   lb_data[5] = 0.0;
688   lb_data[6] = 0.0;
689   lb_data[7] = 0.0;
690
691   DEBAD(("[%d] Triggered adaptive reduction for noobj %d\n", CkMyPe(),
692       adaptive_struct.lb_iteration_no));
693
694   CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL),
695       thisProxy[0]);
696   contribute(STATS_COUNT*sizeof(double), lb_data, lbDataCollectionType, cb);
697 }
698
699
700 bool MetaBalancer::isStrategyComm() {
701   return adaptive_struct.doCommStrategy;
702 }
703
704 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
705   adaptive_struct.lb_migration_cost = lb_migration_cost;
706 }
707
708 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
709   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
710 }
711
712 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
713     local_comm, double remote_comm) {
714   adaptive_struct.last_lb_type = lb;
715   if (lb == 0) {
716     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
717   } else if (lb == 1) {
718     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
719   } else if (lb == 2) {
720     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
721   } else if (lb == 3) {
722     adaptive_struct.comm_refine_info.remote_local_ratio =
723     remote_comm/local_comm;
724   }
725 }
726
727 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu, double
728 avg_load) {
729   if (adaptive_struct.last_lb_type == -1) {
730     adaptive_struct.last_lb_type = 0;
731   }
732   int lb = adaptive_struct.last_lb_type;
733   if (lb == 0) {
734     adaptive_struct.greedy_info.max_avg_ratio = max_load/avg_load;
735   } else if (lb == 1) {
736     adaptive_struct.refine_info.max_avg_ratio = max_load/avg_load;
737   } else if (lb == 2) {
738     adaptive_struct.comm_info.max_avg_ratio = max_load/avg_load;
739   } else if (lb == 3) {
740     adaptive_struct.comm_refine_info.max_avg_ratio = max_load/avg_load;
741   }
742 }
743
744 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
745   DEBAD(("Setting alpha beta %lf\n", alpha_beta_to_load));
746   alpha_beta_cost_to_load = alpha_beta_to_load;
747 }
748
749
750 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio, double&
751     remote_local_comm_ratio) {
752   lb_type = adaptive_struct.last_lb_type;
753   lb_max_avg_ratio = 1;
754   remote_local_comm_ratio = 1;
755   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
756 }
757
758 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio, double&
759     remote_local_comm_ratio) {
760   if (lb_type == 0) {
761     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
762   } else if (lb_type == 1) {
763     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
764   } else if (lb_type == 2) {
765     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
766   } else if (lb_type == 3) {
767     remote_local_comm_ratio =
768        adaptive_struct.comm_refine_info.remote_local_ratio;
769   }
770 }
771
772 #include "MetaBalancer.def.h"
773
774 /*@}*/