33b7993158a4cdc85ac61c00e638154e86fa030e
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  * This C++ file contains the Charm stub functions
10  */
11
12 #include "MetaBalancer.h"
13 #include "topology.h"
14
15 #include "limits.h"
16
17 #define VEC_SIZE 50
18 #define IMB_TOLERANCE 1.1
19 #define OUTOFWAY_TOLERANCE 2
20 #define UTILIZATION_THRESHOLD 0.7
21 #define NEGLECT_IDLE 2 // Should never be == 1
22 #define MIN_STATS 6
23
24 #   define DEBAD(x) /*CkPrintf x*/
25 #   define EXTRA_FEATURE 0
26
27 struct AdaptiveData {
28   double iteration;
29   double max_load;
30   double avg_load;
31   double utilization;
32   double idle_time;
33 };
34
35 struct AdaptiveMetaBalancer {
36   std::vector<AdaptiveData> history_data;
37   int lb_iter_no;
38 };
39
40 struct AdaptiveLBInfo {
41   AdaptiveLBInfo() {
42     max_avg_ratio = 1;
43     remote_local_ratio = 1;
44   }
45   double max_avg_ratio;
46   double remote_local_ratio;
47 };
48
49 // TODO: Separate out the datastructure required by just the central and on all
50 // processors
51 struct AdaptiveLBStructure {
52   int tentative_period;
53   int final_lb_period;
54   // This is based on the linear extrapolation
55   int lb_calculated_period;
56   int lb_iteration_no;
57   // This is set when all the processor sends the maximum iteration no
58   int global_max_iter_no;
59   // This keeps track of what was the max iteration no we had previously
60   // received. TODO: Mostly global_max_iter_no should be sufficied.
61   int tentative_max_iter_no;
62   // TODO: Use reduction to collect max iteration. Then we don't need this
63   // counter.
64   int global_recv_iter_counter;
65   // true indicates it is in Inform->ReceiveMaxIter->FinalLBPeriod stage.
66   bool in_progress;
67   double lb_strategy_cost;
68   double lb_migration_cost;
69   bool doCommStrategy;
70   int lb_msg_send_no;
71   int lb_msg_recv_no;
72   // Total AtSync calls from all the chares residing on the processor
73   int total_syncs_called;
74   int last_lb_type;
75   AdaptiveLBInfo greedy_info;
76   AdaptiveLBInfo refine_info;
77   AdaptiveLBInfo comm_info;
78   AdaptiveLBInfo comm_refine_info;
79 };
80
81
82 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
83   double lb_data[8];
84   lb_data[1] = 0.0; // total number of processors contributing
85   lb_data[2] = 0.0; // total load
86   lb_data[3] = 0.0; // max load
87   lb_data[4] = 0.0; // idle time
88   lb_data[5] = 1.0; // utilization
89   lb_data[6] = 0.0; // total load with bg
90   lb_data[7] = 0.0; // max load with bg
91   for (int i = 0; i < nMsg; i++) {
92     CkAssert(msgs[i]->getSize() == 8*sizeof(double));
93     if (msgs[i]->getSize() != 8*sizeof(double)) {
94       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n", msgs[i]->getSize());
95     }
96     double* m = (double *)msgs[i]->getData();
97     // Total count
98     lb_data[1] += m[1];
99     // Avg load
100     lb_data[2] += m[2];
101     // Max load
102     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
103     // Avg idle
104     lb_data[4] += m[4];
105     // Get least utilization
106     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
107     // Get Avg load with bg
108     lb_data[6] += m[6];
109     // Get Max load with bg
110     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
111     if (i == 0) {
112       // Iteration no
113       lb_data[0] = m[0];
114     }
115     if (m[0] != lb_data[0]) {
116       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
117       %lf\n", lb_data[0], m[0]);
118       CkAbort("Intermingling iterations\n");
119     }
120   }
121   return CkReductionMsg::buildNew(8*sizeof(double), lb_data);
122 }
123
124 /*global*/ CkReduction::reducerType lbDataCollectionType;
125 /*initcall*/ void registerLBDataCollection(void) {
126   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
127 }
128
129 CkGroupID _metalb;
130
131 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
132
133 CkpvDeclare(AdaptiveMetaBalancer, adaptive_lbdb);
134 CkpvDeclare(AdaptiveLBStructure, adaptive_struct);
135
136 // mainchare
137 MetaLBInit::MetaLBInit(CkArgMsg *m)
138 {
139 #if CMK_LBDB_ON
140   _metalb = CProxy_MetaBalancer::ckNew();
141 #endif
142   CkPrintf("META LB Init Called\n");
143   delete m;
144 }
145
146 // called from init.C
147 void _metabalancerInit()
148 {
149   CkpvInitialize(int, metalbInited);
150   CkpvAccess(metalbInited) = 0;
151 }
152
153 void MetaBalancer::initnodeFn()
154 {
155 }
156
157 // called my constructor
158 void MetaBalancer::init(void)
159 {
160   CkPrintf("Metabalancer init %d lbdb\n", _lbdb);
161   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
162   CkpvAccess(metalbInited) = 1;
163   total_load_vec.resize(VEC_SIZE, 0.0);
164   total_count_vec.resize(VEC_SIZE, 0);
165   max_iteration = -1;
166   prev_idle = 0.0;
167   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
168
169   CkpvInitialize(AdaptiveMetaBalancer, adaptive_lbdb);
170   CkpvInitialize(AdaptiveLBStructure, adaptive_struct);
171
172   CkpvAccess(adaptive_lbdb).lb_iter_no = -1;
173
174   // If metabalancer enabled, initialize the variables
175   CkpvAccess(adaptive_struct).tentative_period =  INT_MAX;
176   CkpvAccess(adaptive_struct).final_lb_period =  INT_MAX;
177   CkpvAccess(adaptive_struct).lb_calculated_period = INT_MAX;
178   CkpvAccess(adaptive_struct).lb_iteration_no = -1;
179   CkpvAccess(adaptive_struct).global_max_iter_no = 0;
180   CkpvAccess(adaptive_struct).tentative_max_iter_no = -1;
181   CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
182   CkpvAccess(adaptive_struct).in_progress = false;
183   CkpvAccess(adaptive_struct).lb_strategy_cost = 0.0;
184   CkpvAccess(adaptive_struct).lb_migration_cost = 0.0;
185   CkpvAccess(adaptive_struct).lb_msg_send_no = 0;
186   CkpvAccess(adaptive_struct).lb_msg_recv_no = 0;
187   CkpvAccess(adaptive_struct).total_syncs_called = 0;
188   CkpvAccess(adaptive_struct).last_lb_type = -1;
189
190
191   // This is indicating if the load balancing strategy and migration started.
192   // This is mainly used to register callbacks for noobj pes. They would
193   // register as soon as resumefromsync is called. On receiving the handles at
194   // the central pe, it clears the previous handlers and sets lb_in_progress
195   // to false so that it doesn't clear the handles.
196   lb_in_progress = false;
197
198   is_prev_lb_refine = -1;
199 }
200
201 void MetaBalancer::pup(PUP::er& p)
202 {
203         IrrGroup::pup(p);
204   if (p.isUnpacking()) {
205     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
206   }
207   // NOTE set lbdatabase using the id
208 }
209
210
211 void MetaBalancer::ResumeClients() {
212   // If metabalancer enabled, initialize the variables
213   CkpvAccess(adaptive_lbdb).history_data.clear();
214
215   CkpvAccess(adaptive_struct).tentative_period =  INT_MAX;
216   CkpvAccess(adaptive_struct).final_lb_period =  INT_MAX;
217   CkpvAccess(adaptive_struct).lb_calculated_period = INT_MAX;
218   CkpvAccess(adaptive_struct).lb_iteration_no = -1;
219   CkpvAccess(adaptive_struct).global_max_iter_no = 0;
220   CkpvAccess(adaptive_struct).tentative_max_iter_no = -1;
221   CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
222   CkpvAccess(adaptive_struct).in_progress = false;
223   CkpvAccess(adaptive_struct).lb_strategy_cost = 0.0;
224   CkpvAccess(adaptive_struct).lb_migration_cost = 0.0;
225   CkpvAccess(adaptive_struct).lb_msg_send_no = 0;
226   CkpvAccess(adaptive_struct).lb_msg_recv_no = 0;
227   CkpvAccess(adaptive_struct).total_syncs_called = 0;
228
229   prev_idle = 0.0;
230   if (lb_in_progress) {
231     lbdb_no_obj_callback.clear();
232     lb_in_progress = false;
233   }
234   // While resuming client, if we find that there are no objects, then handle
235   // the case accordingly.
236   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
237     CkPrintf("%d processor has 0 objs\n", CkMyPe());
238     HandleAdaptiveNoObj();
239   }
240 }
241
242 int MetaBalancer::get_iteration() {
243   return CkpvAccess(adaptive_struct).lb_iteration_no;
244 }
245
246 bool MetaBalancer::AddLoad(int it_n, double load) {
247   int index = it_n % VEC_SIZE;
248   total_count_vec[index]++;
249   CkpvAccess(adaptive_struct).total_syncs_called++;
250   DEBAD(("At PE %d Total contribution for iteration %d is %d total objs %d\n",
251       CkMyPe(), it_n, total_count_vec[index],
252       lbdatabase->getLBDB()->ObjDataCount()));
253
254   if (it_n < CkpvAccess(adaptive_struct).lb_iteration_no) {
255     CkAbort("Error!! Received load for previous iteration\n");
256   }
257   if (it_n > CkpvAccess(adaptive_struct).lb_iteration_no) {
258     CkpvAccess(adaptive_struct).lb_iteration_no = it_n;
259   }
260   total_load_vec[index] += load;
261   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
262     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
263         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
264     CkAbort("Abort!!! Received more contribution");
265   }
266
267   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
268     double idle_time, bg_walltime, cpu_bgtime;
269     lbdatabase->IdleTime(&idle_time);
270     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
271
272     int sync_for_bg = CkpvAccess(adaptive_struct).total_syncs_called +
273         lbdatabase->getLBDB()->ObjDataCount();
274     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
275
276     if (it_n < NEGLECT_IDLE) {
277       prev_idle = idle_time;
278     }
279     idle_time -= prev_idle;
280
281     // The chares do not contribute their 0th iteration load. So the total syncs
282     // in reality is total_syncs_called + obj_counts
283     int total_countable_syncs = CkpvAccess(adaptive_struct).total_syncs_called +
284         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me! weird!
285     if (total_countable_syncs != 0) {
286       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
287     }
288     //CkPrintf("[%d] Idle time %lf and countable %d for iteration %d\n", CkMyPe(), idle_time, total_countable_syncs, iteration);
289
290     double lb_data[8];
291     lb_data[0] = it_n;
292     lb_data[1] = 1;
293     lb_data[2] = total_load_vec[index]; // For average load
294     lb_data[3] = total_load_vec[index]; // For max load
295     lb_data[4] = idle_time;
296     // Set utilization
297     if (total_load_vec[index] == 0.0) {
298       lb_data[5] = 0.0;
299     } else {
300       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
301     }
302     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
303     lb_data[7] = lb_data[6]; // For Max load with bg
304     total_load_vec[index] = 0.0;
305     total_count_vec[index] = 0;
306
307     //CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
308     //    total_load_vec[iteration], idle_time,
309     //    idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
310
311     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
312     contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
313   }
314   return true;
315 }
316
317 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
318   double* load = (double *) msg->getData();
319   double avg = load[2]/load[1];
320   double max = load[3];
321   double avg_idle = load[4]/load[1];
322   double utilization = load[5];
323   int iteration_n = load[0];
324   double avg_load_bg = load[6]/load[1];
325   double max_load_bg = load[7];
326   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle, utilization, load[1]));
327   CkPrintf("** [%d] Iteration Avg load: %lf Max load: %lf With bg Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_load_bg, max_load_bg, avg_idle, utilization, load[1]);
328   delete msg;
329
330 #if EXTRA_FEATURE
331   if (CkpvAccess(adaptive_struct).final_lb_period != iteration_n) {
332     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
333       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
334     }
335   }
336 #endif
337
338   // Store the data for this iteration
339   CkpvAccess(adaptive_lbdb).lb_iter_no = iteration_n;
340   AdaptiveData data;
341   data.iteration = CkpvAccess(adaptive_lbdb).lb_iter_no;
342   data.max_load = max;
343   data.avg_load = avg;
344   data.utilization = utilization;
345   data.idle_time = avg_idle;
346   CkpvAccess(adaptive_lbdb).history_data.push_back(data);
347
348   // If lb period inform is in progress, dont inform again.
349   // If this is the lb data corresponding to the final lb period informed, then
350   // don't recalculate as some of the processors might have already gone into
351   // LB_STAGE.
352   if (CkpvAccess(adaptive_struct).in_progress || (CkpvAccess(adaptive_struct).final_lb_period == iteration_n)) {
353     return;
354   }
355
356   double utilization_threshold = UTILIZATION_THRESHOLD;
357
358 #if EXTRA_FEATURE
359   CkPrintf("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load);
360   if (alpha_beta_cost_to_load < 0.1) {
361     // Ignore the effect of idle time and there by lesser utilization. So we
362     // assign utilization threshold to be 0.0
363     CkPrintf("Changing the idle load tolerance coz this isn't communication intensive benchmark\n");
364     utilization_threshold = 0.0;
365   }
366 #endif
367
368   // First generate the lb period based on the cost of lb. Find out what is the
369   // expected imbalance at the calculated lb period.
370   int period;
371   // This is based on the new max load after load balancing. So technically, it
372   // is calculated based on the shifter up avg curve.
373   double ratio_at_t = 1.0;
374   int tmp_lb_type;
375   double tmp_max_avg_ratio, tmp_comm_ratio;
376   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
377   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
378
379   if (generatePlan(period, ratio_at_t)) {
380     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
381       CkpvAccess(adaptive_struct).lb_calculated_period, period,
382       CkpvAccess(adaptive_struct).tentative_max_iter_no));
383     // set the imbalance tolerance to be ratio_at_calculated_lb_period
384     if (ratio_at_t != 1.0) {
385       CkPrintf("Changed tolerance to %lf after line eq whereas max/avg is %lf\n", ratio_at_t, max/avg);
386       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
387       // compared with the tolerance
388       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
389     }
390
391     CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
392
393     if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
394           CkpvAccess(adaptive_lbdb).history_data.size() > MIN_STATS) {
395       CkPrintf("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
396       TriggerSoon(iteration_n, max/avg, tolerate_imb);
397       return;
398     }
399
400     // If the new lb period from linear extrapolation is greater than maximum
401     // iteration known from previously collected data, then inform all the
402     // processors about the new calculated period.
403     if (period > CkpvAccess(adaptive_struct).tentative_max_iter_no && period !=
404           CkpvAccess(adaptive_struct).final_lb_period) {
405       CkpvAccess(adaptive_struct).doCommStrategy = false;
406       CkpvAccess(adaptive_struct).lb_calculated_period = period;
407       CkpvAccess(adaptive_struct).in_progress = true;
408       CkPrintf("Sticking to the calculated period %d\n",
409         CkpvAccess(adaptive_struct).lb_calculated_period);
410       thisProxy.LoadBalanceDecision(CkpvAccess(adaptive_struct).lb_msg_send_no++,
411         CkpvAccess(adaptive_struct).lb_calculated_period);
412       return;
413     }
414     // TODO: Shouldn't we return from here??
415   }
416
417   CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
418
419   // This would be called when the datasize is not enough to calculate lb period
420   if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && CkpvAccess(adaptive_lbdb).history_data.size() > 4) {
421     CkPrintf("Carry out load balancing step at iter max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
422     TriggerSoon(iteration_n, max/avg, tolerate_imb);
423     return;
424   }
425
426 }
427
428 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
429     double tolerate_imb) {
430
431   // If the previously calculated_period (not the final decision) is greater
432   // than the iter +1 and if it is greater than the maximum iteration we have
433   // seen so far, then we can inform this
434   if ((iteration_n + 1 > CkpvAccess(adaptive_struct).tentative_max_iter_no) &&
435       (iteration_n+1 < CkpvAccess(adaptive_struct).lb_calculated_period) &&
436       (iteration_n + 1 != CkpvAccess(adaptive_struct).final_lb_period)) {
437     if (imbalance_ratio < tolerate_imb) {
438       CkpvAccess(adaptive_struct).doCommStrategy = true;
439       CkPrintf("No load imbalance but idle time\n");
440     } else {
441       CkpvAccess(adaptive_struct).doCommStrategy = false;
442       CkPrintf("load imbalance \n");
443     }
444     CkpvAccess(adaptive_struct).lb_calculated_period = iteration_n + 1;
445     CkpvAccess(adaptive_struct).in_progress = true;
446     CkPrintf("Informing everyone the lb period is %d\n",
447         CkpvAccess(adaptive_struct).lb_calculated_period);
448     thisProxy.LoadBalanceDecision(CkpvAccess(adaptive_struct).lb_msg_send_no++,
449         CkpvAccess(adaptive_struct).lb_calculated_period);
450   }
451 }
452
453 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
454   if (CkpvAccess(adaptive_lbdb).history_data.size() <= 4) {
455     return false;
456   }
457
458   // Some heuristics for lbperiod
459   // If constant load or almost constant,
460   // then max * new_lb_period > avg * new_lb_period + lb_cost
461   double max = 0.0;
462   double avg = 0.0;
463   AdaptiveData data;
464   for (int i = 0; i < CkpvAccess(adaptive_lbdb).history_data.size(); i++) {
465     data = CkpvAccess(adaptive_lbdb).history_data[i];
466     max += data.max_load;
467     avg += data.avg_load;
468     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
469     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
470   }
471 //  max /= (adaptive_struct.lb_iteration_no - CkpvAccess(adaptive_lbdb).history_data[0].iteration);
472 //  avg /= (adaptive_struct.lb_iteration_no - CkpvAccess(adaptive_lbdb).history_data[0].iteration);
473
474   // If linearly varying load, then find lb_period
475   // area between the max and avg curve
476   // If we can attain perfect balance, then the new load is close to the
477   // average. Hence we pass 1, else pass in some other value which would be the
478   // new max_load after load balancing.
479   int tmp_lb_type;
480   double tmp_max_avg_ratio, tmp_comm_ratio;
481   double tolerate_imb;
482
483 #if EXTRA_FEATURE
484   // First get the data for refine.
485   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
486   tolerate_imb = tmp_max_avg_ratio;
487
488   // If RefineLB does a good job, then find the period considering RefineLB
489   if (tmp_max_avg_ratio <= 1.01) {
490     if (max/avg < tolerate_imb) {
491       CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
492       tolerate_imb = 1.0;
493     }
494     CkPrintf("Will generate plan for refine %lf imb and %lf overhead\n", tolerate_imb, 0.2);
495     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
496   }
497
498   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
499 #endif
500
501   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
502   tolerate_imb = tmp_max_avg_ratio;
503 //  if (max/avg < tolerate_imb) {
504 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
505 //    tolerate_imb = 1.0;
506 //  }
507   if (max/avg > tolerate_imb) {
508     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
509       return true;
510     }
511   }
512
513   max = 0.0;
514   avg = 0.0;
515   for (int i = 0; i < CkpvAccess(adaptive_lbdb).history_data.size(); i++) {
516     data = CkpvAccess(adaptive_lbdb).history_data[i];
517     max += data.max_load;
518     avg += data.avg_load*tolerate_imb;
519     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
520     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
521   }
522   max /= CkpvAccess(adaptive_lbdb).history_data.size();
523   avg /= CkpvAccess(adaptive_lbdb).history_data.size();
524   double cost = CkpvAccess(adaptive_struct).lb_strategy_cost + CkpvAccess(adaptive_struct).lb_migration_cost;
525   period = cost/(max - avg); 
526   CkPrintf("Obtained period %d from constant prediction\n", period);
527   if (period < 0) { 
528     period = CkpvAccess(adaptive_struct).final_lb_period;
529     CkPrintf("Obtained -ve period from constant prediction so changing to prev %d\n", period);
530   } 
531   ratio_at_t = max / avg;
532   return true;
533 }
534
535 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
536     double overhead_percent, int& period, double& ratio_at_t) {
537   double mslope, aslope, mc, ac;
538   getLineEq(new_load_percent, aslope, ac, mslope, mc);
539   CkPrintf("new load percent %lf\n", new_load_percent);
540   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
541   double a = (mslope - aslope)/2;
542   double b = (mc - ac);
543   double c = -(CkpvAccess(adaptive_struct).lb_strategy_cost +
544       CkpvAccess(adaptive_struct).lb_migration_cost) * overhead_percent;
545   bool got_period = getPeriodForLinear(a, b, c, period);
546   if (!got_period) {
547     return false;
548   }
549
550   if (mslope < 0) {
551     if (period > (-mc/mslope)) {
552       CkPrintf("Max < 0 Period set when max load is -ve\n");
553       return false;
554     }
555   }
556
557   if (aslope < 0) {
558     if (period > (-ac/aslope)) {
559       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
560       return false;
561     }
562   }
563
564   int intersection_t = (mc-ac) / (aslope - mslope);
565   if (intersection_t > 0 && period > intersection_t) {
566     CkPrintf("Avg | Max Period set when curves intersect\n");
567     return false;
568   }
569   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
570   CkPrintf("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t);
571   return true;
572 }
573
574 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
575   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
576   if (a == 0.0) {
577     period = (-c / b);
578     if (period < 0) {
579       CkPrintf("-ve period for -c/b (%d)\n", period);
580       return false;
581     }
582     CkPrintf("Ideal period for linear load %d\n", period);
583     return true;
584   }
585   int x;
586   double t = (b * b) - (4*a*c);
587   if (t < 0) {
588     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
589     return false;
590   }
591   t = (-b + sqrt(t)) / (2*a);
592   x = t;
593   if (x < 0) {
594     CkPrintf("boo!!! x (%d) < 0\n", x);
595     x = 0;
596     return false;
597   }
598   period = x;
599   CkPrintf("Ideal period for linear load %d\n", period);
600   return true;
601 }
602
603 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
604   int total = CkpvAccess(adaptive_lbdb).history_data.size();
605   int iterations = 1 + CkpvAccess(adaptive_lbdb).history_data[total - 1].iteration -
606       CkpvAccess(adaptive_lbdb).history_data[0].iteration;
607   double a1 = 0;
608   double m1 = 0;
609   double a2 = 0;
610   double m2 = 0;
611   AdaptiveData data;
612   int i = 0;
613   for (i = 0; i < total/2; i++) {
614     data = CkpvAccess(adaptive_lbdb).history_data[i];
615     m1 += data.max_load;
616     a1 += data.avg_load;
617     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
618   }
619   m1 /= i;
620   a1 = (a1 * new_load_percent) / i;
621
622   for (i = total/2; i < total; i++) {
623     data = CkpvAccess(adaptive_lbdb).history_data[i];
624     m2 += data.max_load;
625     a2 += data.avg_load;
626     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
627   }
628   m2 /= (i - total/2);
629   a2 = (a2 * new_load_percent) / (i - total/2);
630
631   aslope = 2 * (a2 - a1) / iterations;
632   mslope = 2 * (m2 - m1) / iterations;
633   ac = CkpvAccess(adaptive_lbdb).history_data[0].avg_load * new_load_percent;
634   mc = CkpvAccess(adaptive_lbdb).history_data[0].max_load;
635
636   ac = a1 - ((aslope * total)/4);
637   mc = m1 - ((mslope * total)/4);
638
639   //ac = (CkpvAccess(adaptive_lbdb).history_data[1].avg_load * new_load_percent - aslope);
640   //mc = (CkpvAccess(adaptive_lbdb).history_data[1].max_load - mslope);
641
642   return true;
643 }
644
645 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
646   if (req_no < CkpvAccess(adaptive_struct).lb_msg_recv_no) {
647     CkPrintf("Error!!! Received a request which was already sent or old\n");
648     return;
649   }
650   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d\n",CkMyPe(), CkpvAccess(adaptive_struct).lb_iteration_no, period);
651   CkpvAccess(adaptive_struct).tentative_period = period;
652   CkpvAccess(adaptive_struct).lb_msg_recv_no = req_no;
653   thisProxy[0].ReceiveIterationNo(req_no, CkpvAccess(adaptive_struct).lb_iteration_no);
654 }
655
656 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
657   if (req_no < CkpvAccess(adaptive_struct).lb_msg_recv_no) {
658     return;
659   }
660   DEBAD(("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), CkpvAccess(adaptive_struct).lb_iteration_no, period));
661   CkpvAccess(adaptive_struct).tentative_period = period;
662   CkpvAccess(adaptive_struct).final_lb_period = period;
663   // NOTE LDOMAdaptResumeSync(myLDHandle, period);
664   lbdatabase->AdaptResumeSync(period);
665 }
666
667
668 void MetaBalancer::ReceiveIterationNo(int req_no, int local_iter_no) {
669   CmiAssert(CkMyPe() == 0);
670
671   CkpvAccess(adaptive_struct).global_recv_iter_counter++;
672   if (local_iter_no > CkpvAccess(adaptive_struct).global_max_iter_no) {
673     CkpvAccess(adaptive_struct).global_max_iter_no = local_iter_no;
674   }
675
676   int period;
677   if (CkNumPes() == CkpvAccess(adaptive_struct).global_recv_iter_counter) {
678
679     if (CkpvAccess(adaptive_struct).global_max_iter_no > CkpvAccess(adaptive_struct).tentative_max_iter_no) {
680       CkpvAccess(adaptive_struct).tentative_max_iter_no = CkpvAccess(adaptive_struct).global_max_iter_no;
681     }
682     period = (CkpvAccess(adaptive_struct).tentative_period > CkpvAccess(adaptive_struct).global_max_iter_no) ? CkpvAccess(adaptive_struct).tentative_period : CkpvAccess(adaptive_struct).global_max_iter_no + 1;
683     // If no one has gone into load balancing stage, then we can safely change
684     // the period otherwise keep the old period.
685     if (CkpvAccess(adaptive_struct).global_max_iter_no < CkpvAccess(adaptive_struct).final_lb_period) {
686       CkpvAccess(adaptive_struct).tentative_period = period;
687       CkPrintf("Final lb_period CHANGED!%d\n", CkpvAccess(adaptive_struct).tentative_period);
688     } else {
689       CkpvAccess(adaptive_struct).tentative_period = CkpvAccess(adaptive_struct).final_lb_period;
690       CkPrintf("Final lb_period NOT CHANGED!%d\n", CkpvAccess(adaptive_struct).tentative_period);
691     }
692     thisProxy.LoadBalanceDecisionFinal(req_no, CkpvAccess(adaptive_struct).tentative_period);
693     CkpvAccess(adaptive_struct).in_progress = false;
694     CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
695   }
696 }
697
698 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
699   // If tentative and final_lb_period are the same, then the decision has been
700   // made but if not, they are in the middle of consensus, hence return the
701   // lease of the two
702   if (CkpvAccess(adaptive_struct).tentative_period != CkpvAccess(adaptive_struct).final_lb_period) {
703     is_tentative = true;
704   } else {
705     is_tentative = false;
706   }
707   if (CkpvAccess(adaptive_struct).tentative_period < CkpvAccess(adaptive_struct).final_lb_period) {
708     return CkpvAccess(adaptive_struct).tentative_period;
709    } else {
710      return CkpvAccess(adaptive_struct).final_lb_period;
711    }
712 }
713
714 // Called by CentralLB to indicate that the LB strategy and migration is in
715 // progress.
716 void MetaBalancer::ResetAdaptive() {
717   CkpvAccess(adaptive_lbdb).lb_iter_no = -1;
718   lb_in_progress = true;
719 }
720
721 void MetaBalancer::HandleAdaptiveNoObj() {
722 #if EXTRA_FEATURE
723   CkpvAccess(adaptive_struct).lb_iteration_no++;
724   //CkPrintf("HandleAdaptiveNoObj %d\n", adaptive_struct.lb_iteration_no);
725   thisProxy[0].RegisterNoObjCallback(CkMyPe());
726   TriggerAdaptiveReduction();
727 #endif
728 }
729
730 void MetaBalancer::RegisterNoObjCallback(int index) {
731 #if EXTRA_FEATURE
732   if (lb_in_progress) {
733     lbdb_no_obj_callback.clear();
734     //CkPrintf("Clearing and registering\n");
735     lb_in_progress = false;
736   }
737   lbdb_no_obj_callback.push_back(index);
738   CkPrintf("Registered %d to have no objs.\n", index);
739
740   // If collection has already happened and this is second iteration, then
741   // trigger reduction.
742   if (CkpvAccess(adaptive_lbdb).lb_iter_no != -1) {
743     //CkPrintf("Collection already started now %d so kick in\n", adaptive_struct.lb_iteration_no);
744     thisProxy[index].TriggerAdaptiveReduction();
745   }
746 #endif
747 }
748
749 void MetaBalancer::TriggerAdaptiveReduction() {
750 #if EXTRA_FEATURE
751   CkpvAccess(adaptive_struct).lb_iteration_no++;
752   //CkPrintf("Trigger adaptive for %d\n", CkpvAccess(adaptive_struct).lb_iteration_no);
753   double lb_data[8];
754   lb_data[0] = CkpvAccess(adaptive_struct).lb_iteration_no;
755   lb_data[1] = 1;
756   lb_data[2] = 0.0;
757   lb_data[3] = 0.0;
758   lb_data[4] = 0.0;
759   lb_data[5] = 0.0;
760   lb_data[6] = 0.0;
761   lb_data[7] = 0.0;
762
763   // CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
764   //     total_load_vec[iteration], idle_time,
765   //     idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
766
767   CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
768   contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
769 #endif
770 }
771
772
773 bool MetaBalancer::isStrategyComm() {
774   return CkpvAccess(adaptive_struct).doCommStrategy;
775 }
776
777 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
778   CkpvAccess(adaptive_struct).lb_migration_cost = lb_migration_cost;
779 }
780
781 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
782   CkpvAccess(adaptive_struct).lb_strategy_cost = lb_strategy_cost;
783 }
784
785 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
786     local_comm, double remote_comm) {
787   CkpvAccess(adaptive_struct).last_lb_type = lb;
788   if (lb == 0) {
789     CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio = lb_max/lb_avg;
790   } else if (lb == 1) {
791     CkpvAccess(adaptive_struct).refine_info.max_avg_ratio = lb_max/lb_avg;
792   } else if (lb == 2) {
793     CkpvAccess(adaptive_struct).comm_info.remote_local_ratio = remote_comm/local_comm;
794   } else if (lb == 3) {
795     CkpvAccess(adaptive_struct).comm_refine_info.remote_local_ratio =
796     remote_comm/local_comm;
797   }
798 }
799
800 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu, double
801 avg_load) {
802   if (CkpvAccess(adaptive_struct).last_lb_type == -1) {
803     CkpvAccess(adaptive_struct).last_lb_type = 0;
804   }
805   int lb = CkpvAccess(adaptive_struct).last_lb_type;
806   //CkPrintf("Storing data after lb ratio %lf for lb %d\n", max_load/avg_load, lb);
807   if (lb == 0) {
808     CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio = max_load/avg_load;
809   } else if (lb == 1) {
810     CkpvAccess(adaptive_struct).refine_info.max_avg_ratio = max_load/avg_load;
811   } else if (lb == 2) {
812     CkpvAccess(adaptive_struct).comm_info.max_avg_ratio = max_load/avg_load;
813   } else if (lb == 3) {
814     CkpvAccess(adaptive_struct).comm_refine_info.max_avg_ratio = max_load/avg_load;
815   }
816 }
817
818 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
819   CkPrintf("Setting alpha beta %lf\n", alpha_beta_to_load);
820   alpha_beta_cost_to_load = alpha_beta_to_load;
821 }
822
823
824 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio, double&
825     remote_local_comm_ratio) {
826   lb_type = CkpvAccess(adaptive_struct).last_lb_type;
827   lb_max_avg_ratio = 1;
828   remote_local_comm_ratio = 1;
829   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
830 }
831
832 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio, double&
833     remote_local_comm_ratio) {
834   if (lb_type == 0) {
835     lb_max_avg_ratio = CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio;
836   } else if (lb_type == 1) {
837     lb_max_avg_ratio = CkpvAccess(adaptive_struct).refine_info.max_avg_ratio;
838   } else if (lb_type == 2) {
839     remote_local_comm_ratio = CkpvAccess(adaptive_struct).comm_info.remote_local_ratio;
840   } else if (lb_type == 3) {
841     remote_local_comm_ratio =
842        CkpvAccess(adaptive_struct).comm_refine_info.remote_local_ratio;
843   }
844 }
845
846 #include "MetaBalancer.def.h"
847
848 /*@}*/