Refactor metabalancer portion and move it out of lbdatabase
[charm.git] / src / ck-ldb / MetaBalancer.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  * This C++ file contains the Charm stub functions
10  */
11
12 #include "MetaBalancer.h"
13 #include "LBSimulation.h"
14 #include "topology.h"
15
16 #include "limits.h"
17
18 #include "NullLB.h"
19
20 #define VEC_SIZE 50
21 #define IMB_TOLERANCE 1.1
22 #define OUTOFWAY_TOLERANCE 2
23 #define UTILIZATION_THRESHOLD 0.7
24 #define NEGLECT_IDLE 2 // Should never be == 1
25 #define MIN_STATS 6
26
27 #   define DEBAD(x) /*CkPrintf x*/
28 #   define EXTRA_FEATURE 0
29
30 struct AdaptiveData {
31   double iteration;
32   double max_load;
33   double avg_load;
34   double utilization;
35   double idle_time;
36 };
37
38 struct AdaptiveMetaBalancer {
39   std::vector<AdaptiveData> history_data;
40   int lb_iter_no;
41 };
42
43 struct AdaptiveLBInfo {
44   AdaptiveLBInfo() {
45     max_avg_ratio = 1;
46     remote_local_ratio = 1;
47   }
48   double max_avg_ratio;
49   double remote_local_ratio;
50 };
51
52 // TODO: Separate out the datastructure required by just the central and on all
53 // processors
54 struct AdaptiveLBStructure {
55   int tentative_period;
56   int final_lb_period;
57   // This is based on the linear extrapolation
58   int lb_calculated_period;
59   int lb_iteration_no;
60   // This is set when all the processor sends the maximum iteration no
61   int global_max_iter_no;
62   // This keeps track of what was the max iteration no we had previously
63   // received. TODO: Mostly global_max_iter_no should be sufficied.
64   int tentative_max_iter_no;
65   // TODO: Use reduction to collect max iteration. Then we don't need this
66   // counter.
67   int global_recv_iter_counter;
68   // true indicates it is in Inform->ReceiveMaxIter->FinalLBPeriod stage.
69   bool in_progress;
70   double lb_strategy_cost;
71   double lb_migration_cost;
72   bool doCommStrategy;
73   int lb_msg_send_no;
74   int lb_msg_recv_no;
75   // Total AtSync calls from all the chares residing on the processor
76   int total_syncs_called;
77   int last_lb_type;
78   AdaptiveLBInfo greedy_info;
79   AdaptiveLBInfo refine_info;
80   AdaptiveLBInfo comm_info;
81   AdaptiveLBInfo comm_refine_info;
82 };
83
84
85 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
86   double lb_data[8];
87   lb_data[1] = 0.0; // total number of processors contributing
88   lb_data[2] = 0.0; // total load
89   lb_data[3] = 0.0; // max load
90   lb_data[4] = 0.0; // idle time
91   lb_data[5] = 1.0; // utilization
92   lb_data[6] = 0.0; // total load with bg
93   lb_data[7] = 0.0; // max load with bg
94   for (int i = 0; i < nMsg; i++) {
95     CkAssert(msgs[i]->getSize() == 8*sizeof(double));
96     if (msgs[i]->getSize() != 8*sizeof(double)) {
97       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n", msgs[i]->getSize());
98     }
99     double* m = (double *)msgs[i]->getData();
100     // Total count
101     lb_data[1] += m[1];
102     // Avg load
103     lb_data[2] += m[2];
104     // Max load
105     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
106     // Avg idle
107     lb_data[4] += m[4];
108     // Get least utilization
109     lb_data[5] = ((m[5] < lb_data[5]) ? m[5] : lb_data[5]);
110     // Get Avg load with bg
111     lb_data[6] += m[6];
112     // Get Max load with bg
113     lb_data[7] = ((m[7] > lb_data[7])? m[7] : lb_data[7]);
114     if (i == 0) {
115       // Iteration no
116       lb_data[0] = m[0];
117     }
118     if (m[0] != lb_data[0]) {
119       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
120       %lf\n", lb_data[0], m[0]);
121       CkAbort("Intermingling iterations\n");
122     }
123   }
124   return CkReductionMsg::buildNew(8*sizeof(double), lb_data);
125 }
126
127 /*global*/ CkReduction::reducerType lbDataCollectionType;
128 /*initcall*/ void registerLBDataCollection(void) {
129   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
130 }
131
132 CkGroupID _metalb;
133
134 CkpvDeclare(int, metalbInited);  /**< true if metabalancer is inited */
135
136 CkpvDeclare(AdaptiveMetaBalancer, adaptive_lbdb);
137 CkpvDeclare(AdaptiveLBStructure, adaptive_struct);
138
139 // mainchare
140 MetaLBInit::MetaLBInit(CkArgMsg *m)
141 {
142 #if CMK_LBDB_ON
143   _metalb = CProxy_MetaBalancer::ckNew();
144 #endif
145   CkPrintf("META LB Init Called\n");
146   delete m;
147 }
148
149 // called from init.C
150 void _metabalancerInit()
151 {
152   CkpvInitialize(int, metalbInited);
153   CkpvAccess(metalbInited) = 0;
154 }
155
156 void MetaBalancer::initnodeFn()
157 {
158 }
159
160 // called my constructor
161 void MetaBalancer::init(void)
162 {
163   CkPrintf("Metabalancer init %d lbdb\n", _lbdb);
164   lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
165   CkpvAccess(metalbInited) = 1;
166   total_load_vec.resize(VEC_SIZE, 0.0);
167   total_count_vec.resize(VEC_SIZE, 0);
168   max_iteration = -1;
169   prev_idle = 0.0;
170   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
171
172   CkpvInitialize(AdaptiveMetaBalancer, adaptive_lbdb);
173   CkpvInitialize(AdaptiveLBStructure, adaptive_struct);
174
175   CkpvAccess(adaptive_lbdb).lb_iter_no = -1;
176
177   // If metabalancer enabled, initialize the variables
178   CkpvAccess(adaptive_struct).tentative_period =  INT_MAX;
179   CkpvAccess(adaptive_struct).final_lb_period =  INT_MAX;
180   CkpvAccess(adaptive_struct).lb_calculated_period = INT_MAX;
181   CkpvAccess(adaptive_struct).lb_iteration_no = -1;
182   CkpvAccess(adaptive_struct).global_max_iter_no = 0;
183   CkpvAccess(adaptive_struct).tentative_max_iter_no = -1;
184   CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
185   CkpvAccess(adaptive_struct).in_progress = false;
186   CkpvAccess(adaptive_struct).lb_strategy_cost = 0.0;
187   CkpvAccess(adaptive_struct).lb_migration_cost = 0.0;
188   CkpvAccess(adaptive_struct).lb_msg_send_no = 0;
189   CkpvAccess(adaptive_struct).lb_msg_recv_no = 0;
190   CkpvAccess(adaptive_struct).total_syncs_called = 0;
191   CkpvAccess(adaptive_struct).last_lb_type = -1;
192
193
194   // This is indicating if the load balancing strategy and migration started.
195   // This is mainly used to register callbacks for noobj pes. They would
196   // register as soon as resumefromsync is called. On receiving the handles at
197   // the central pe, it clears the previous handlers and sets lb_in_progress
198   // to false so that it doesn't clear the handles.
199   lb_in_progress = false;
200
201   is_prev_lb_refine = -1;
202 }
203
204 void MetaBalancer::pup(PUP::er& p)
205 {
206         IrrGroup::pup(p);
207   if (p.isUnpacking()) {
208     lbdatabase = (LBDatabase *)CkLocalBranch(_lbdb);
209   }
210   // NOTE set lbdatabase using the id
211 }
212
213
214 void MetaBalancer::ResumeClients() {
215   // If metabalancer enabled, initialize the variables
216   CkpvAccess(adaptive_lbdb).history_data.clear();
217
218   CkpvAccess(adaptive_struct).tentative_period =  INT_MAX;
219   CkpvAccess(adaptive_struct).final_lb_period =  INT_MAX;
220   CkpvAccess(adaptive_struct).lb_calculated_period = INT_MAX;
221   CkpvAccess(adaptive_struct).lb_iteration_no = -1;
222   CkpvAccess(adaptive_struct).global_max_iter_no = 0;
223   CkpvAccess(adaptive_struct).tentative_max_iter_no = -1;
224   CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
225   CkpvAccess(adaptive_struct).in_progress = false;
226   CkpvAccess(adaptive_struct).lb_strategy_cost = 0.0;
227   CkpvAccess(adaptive_struct).lb_migration_cost = 0.0;
228   CkpvAccess(adaptive_struct).lb_msg_send_no = 0;
229   CkpvAccess(adaptive_struct).lb_msg_recv_no = 0;
230   CkpvAccess(adaptive_struct).total_syncs_called = 0;
231
232   prev_idle = 0.0;
233   if (lb_in_progress) {
234     lbdb_no_obj_callback.clear();
235     lb_in_progress = false;
236   }
237   // While resuming client, if we find that there are no objects, then handle
238   // the case accordingly.
239   if (lbdatabase->getLBDB()->ObjDataCount() == 0) {
240     CkPrintf("%d processor has 0 objs\n", CkMyPe());
241     HandleAdaptiveNoObj();
242   }
243 }
244
245 int MetaBalancer::get_iteration() {
246   return CkpvAccess(adaptive_struct).lb_iteration_no;
247 }
248
249 bool MetaBalancer::AddLoad(int it_n, double load) {
250   int index = it_n % VEC_SIZE;
251   total_count_vec[index]++;
252   CkpvAccess(adaptive_struct).total_syncs_called++;
253   DEBAD(("At PE %d Total contribution for iteration %d is %d total objs %d\n",
254       CkMyPe(), it_n, total_count_vec[index],
255       lbdatabase->getLBDB()->ObjDataCount()));
256
257   if (it_n < CkpvAccess(adaptive_struct).lb_iteration_no) {
258     CkAbort("Error!! Received load for previous iteration\n");
259   }
260   if (it_n > CkpvAccess(adaptive_struct).lb_iteration_no) {
261     CkpvAccess(adaptive_struct).lb_iteration_no = it_n;
262   }
263   total_load_vec[index] += load;
264   if (total_count_vec[index] > lbdatabase->getLBDB()->ObjDataCount()) {
265     CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
266         total_count_vec[index], lbdatabase->getLBDB()->ObjDataCount());
267     CkAbort("Abort!!! Received more contribution");
268   }
269
270   if (total_count_vec[index] == lbdatabase->getLBDB()->ObjDataCount()) {
271     double idle_time, bg_walltime, cpu_bgtime;
272     lbdatabase->IdleTime(&idle_time);
273     lbdatabase->BackgroundLoad(&bg_walltime, &cpu_bgtime);
274
275     int sync_for_bg = CkpvAccess(adaptive_struct).total_syncs_called +
276         lbdatabase->getLBDB()->ObjDataCount();
277     bg_walltime = bg_walltime * lbdatabase->getLBDB()->ObjDataCount() / sync_for_bg;
278
279     if (it_n < NEGLECT_IDLE) {
280       prev_idle = idle_time;
281     }
282     idle_time -= prev_idle;
283
284     // The chares do not contribute their 0th iteration load. So the total syncs
285     // in reality is total_syncs_called + obj_counts
286     int total_countable_syncs = CkpvAccess(adaptive_struct).total_syncs_called +
287         (1 - NEGLECT_IDLE) * lbdatabase->getLBDB()->ObjDataCount(); // TODO: Fix me! weird!
288     if (total_countable_syncs != 0) {
289       idle_time = idle_time * lbdatabase->getLBDB()->ObjDataCount() / total_countable_syncs;
290     }
291     //CkPrintf("[%d] Idle time %lf and countable %d for iteration %d\n", CkMyPe(), idle_time, total_countable_syncs, iteration);
292
293     double lb_data[8];
294     lb_data[0] = it_n;
295     lb_data[1] = 1;
296     lb_data[2] = total_load_vec[index]; // For average load
297     lb_data[3] = total_load_vec[index]; // For max load
298     lb_data[4] = idle_time;
299     // Set utilization
300     if (total_load_vec[index] == 0.0) {
301       lb_data[5] = 0.0;
302     } else {
303       lb_data[5] = total_load_vec[index]/(idle_time + total_load_vec[index]);
304     }
305     lb_data[6] = lb_data[2] + bg_walltime; // For Avg load with bg
306     lb_data[7] = lb_data[6]; // For Max load with bg
307     total_load_vec[index] = 0.0;
308     total_count_vec[index] = 0;
309
310     //CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
311     //    total_load_vec[iteration], idle_time,
312     //    idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
313
314     CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
315     contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
316   }
317   return true;
318 }
319
320 void MetaBalancer::ReceiveMinStats(CkReductionMsg *msg) {
321   double* load = (double *) msg->getData();
322   double avg = load[2]/load[1];
323   double max = load[3];
324   double avg_idle = load[4]/load[1];
325   double utilization = load[5];
326   int iteration_n = load[0];
327   double avg_load_bg = load[6]/load[1];
328   double max_load_bg = load[7];
329   DEBAD(("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle, utilization, load[1]));
330   CkPrintf("** [%d] Iteration Avg load: %lf Max load: %lf With bg Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_load_bg, max_load_bg, avg_idle, utilization, load[1]);
331   delete msg;
332
333 #if EXTRA_FEATURE
334   if (CkpvAccess(adaptive_struct).final_lb_period != iteration_n) {
335     for (int i = 0; i < lbdb_no_obj_callback.size(); i++) {
336       thisProxy[lbdb_no_obj_callback[i]].TriggerAdaptiveReduction();
337     }
338   }
339 #endif
340
341   // Store the data for this iteration
342   CkpvAccess(adaptive_lbdb).lb_iter_no = iteration_n;
343   AdaptiveData data;
344   data.iteration = CkpvAccess(adaptive_lbdb).lb_iter_no;
345   data.max_load = max;
346   data.avg_load = avg;
347   data.utilization = utilization;
348   data.idle_time = avg_idle;
349   CkpvAccess(adaptive_lbdb).history_data.push_back(data);
350
351   // If lb period inform is in progress, dont inform again.
352   // If this is the lb data corresponding to the final lb period informed, then
353   // don't recalculate as some of the processors might have already gone into
354   // LB_STAGE.
355   if (CkpvAccess(adaptive_struct).in_progress || (CkpvAccess(adaptive_struct).final_lb_period == iteration_n)) {
356     return;
357   }
358
359   double utilization_threshold = UTILIZATION_THRESHOLD;
360
361 #if EXTRA_FEATURE
362   CkPrintf("alpha_beta_to_load %lf\n", alpha_beta_cost_to_load);
363   if (alpha_beta_cost_to_load < 0.1) {
364     // Ignore the effect of idle time and there by lesser utilization. So we
365     // assign utilization threshold to be 0.0
366     CkPrintf("Changing the idle load tolerance coz this isn't communication intensive benchmark\n");
367     utilization_threshold = 0.0;
368   }
369 #endif
370
371   // First generate the lb period based on the cost of lb. Find out what is the
372   // expected imbalance at the calculated lb period.
373   int period;
374   // This is based on the new max load after load balancing. So technically, it
375   // is calculated based on the shifter up avg curve.
376   double ratio_at_t = 1.0;
377   int tmp_lb_type;
378   double tmp_max_avg_ratio, tmp_comm_ratio;
379   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
380   double tolerate_imb = IMB_TOLERANCE * tmp_max_avg_ratio;
381
382   if (generatePlan(period, ratio_at_t)) {
383     DEBAD(("Generated period and calculated %d and period %d max iter %d\n",
384       CkpvAccess(adaptive_struct).lb_calculated_period, period,
385       CkpvAccess(adaptive_struct).tentative_max_iter_no));
386     // set the imbalance tolerance to be ratio_at_calculated_lb_period
387     if (ratio_at_t != 1.0) {
388       CkPrintf("Changed tolerance to %lf after line eq whereas max/avg is %lf\n", ratio_at_t, max/avg);
389       // Since ratio_at_t is shifter up, max/(tmp_max_avg_ratio * avg) should be
390       // compared with the tolerance
391       tolerate_imb = ratio_at_t * tmp_max_avg_ratio * OUTOFWAY_TOLERANCE;
392     }
393
394     CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
395
396     if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
397           CkpvAccess(adaptive_lbdb).history_data.size() > MIN_STATS) {
398       CkPrintf("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
399       TriggerSoon(iteration_n, max/avg, tolerate_imb);
400       return;
401     }
402
403     // If the new lb period from linear extrapolation is greater than maximum
404     // iteration known from previously collected data, then inform all the
405     // processors about the new calculated period.
406     if (period > CkpvAccess(adaptive_struct).tentative_max_iter_no && period !=
407           CkpvAccess(adaptive_struct).final_lb_period) {
408       CkpvAccess(adaptive_struct).doCommStrategy = false;
409       CkpvAccess(adaptive_struct).lb_calculated_period = period;
410       CkpvAccess(adaptive_struct).in_progress = true;
411       CkPrintf("Sticking to the calculated period %d\n",
412         CkpvAccess(adaptive_struct).lb_calculated_period);
413       thisProxy.LoadBalanceDecision(CkpvAccess(adaptive_struct).lb_msg_send_no++,
414         CkpvAccess(adaptive_struct).lb_calculated_period);
415       return;
416     }
417     // TODO: Shouldn't we return from here??
418   }
419
420   CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
421
422   // This would be called when the datasize is not enough to calculate lb period
423   if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && CkpvAccess(adaptive_lbdb).history_data.size() > 4) {
424     CkPrintf("Carry out load balancing step at iter max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
425     TriggerSoon(iteration_n, max/avg, tolerate_imb);
426     return;
427   }
428
429 }
430
431 void MetaBalancer::TriggerSoon(int iteration_n, double imbalance_ratio,
432     double tolerate_imb) {
433
434   // If the previously calculated_period (not the final decision) is greater
435   // than the iter +1 and if it is greater than the maximum iteration we have
436   // seen so far, then we can inform this
437   if ((iteration_n + 1 > CkpvAccess(adaptive_struct).tentative_max_iter_no) &&
438       (iteration_n+1 < CkpvAccess(adaptive_struct).lb_calculated_period) &&
439       (iteration_n + 1 != CkpvAccess(adaptive_struct).final_lb_period)) {
440     if (imbalance_ratio < tolerate_imb) {
441       CkpvAccess(adaptive_struct).doCommStrategy = true;
442       CkPrintf("No load imbalance but idle time\n");
443     } else {
444       CkpvAccess(adaptive_struct).doCommStrategy = false;
445       CkPrintf("load imbalance \n");
446     }
447     CkpvAccess(adaptive_struct).lb_calculated_period = iteration_n + 1;
448     CkpvAccess(adaptive_struct).in_progress = true;
449     CkPrintf("Informing everyone the lb period is %d\n",
450         CkpvAccess(adaptive_struct).lb_calculated_period);
451     thisProxy.LoadBalanceDecision(CkpvAccess(adaptive_struct).lb_msg_send_no++,
452         CkpvAccess(adaptive_struct).lb_calculated_period);
453   }
454 }
455
456 bool MetaBalancer::generatePlan(int& period, double& ratio_at_t) {
457   if (CkpvAccess(adaptive_lbdb).history_data.size() <= 4) {
458     return false;
459   }
460
461   // Some heuristics for lbperiod
462   // If constant load or almost constant,
463   // then max * new_lb_period > avg * new_lb_period + lb_cost
464   double max = 0.0;
465   double avg = 0.0;
466   AdaptiveData data;
467   for (int i = 0; i < CkpvAccess(adaptive_lbdb).history_data.size(); i++) {
468     data = CkpvAccess(adaptive_lbdb).history_data[i];
469     max += data.max_load;
470     avg += data.avg_load;
471     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
472     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
473   }
474 //  max /= (adaptive_struct.lb_iteration_no - CkpvAccess(adaptive_lbdb).history_data[0].iteration);
475 //  avg /= (adaptive_struct.lb_iteration_no - CkpvAccess(adaptive_lbdb).history_data[0].iteration);
476
477   // If linearly varying load, then find lb_period
478   // area between the max and avg curve
479   // If we can attain perfect balance, then the new load is close to the
480   // average. Hence we pass 1, else pass in some other value which would be the
481   // new max_load after load balancing.
482   int tmp_lb_type;
483   double tmp_max_avg_ratio, tmp_comm_ratio;
484   double tolerate_imb;
485
486 #if EXTRA_FEATURE
487   // First get the data for refine.
488   GetLBDataForLB(1, tmp_max_avg_ratio, tmp_comm_ratio);
489   tolerate_imb = tmp_max_avg_ratio;
490
491   // If RefineLB does a good job, then find the period considering RefineLB
492   if (tmp_max_avg_ratio <= 1.01) {
493     if (max/avg < tolerate_imb) {
494       CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
495       tolerate_imb = 1.0;
496     }
497     CkPrintf("Will generate plan for refine %lf imb and %lf overhead\n", tolerate_imb, 0.2);
498     return getPeriodForStrategy(tolerate_imb, 0.2, period, ratio_at_t);
499   }
500
501   GetLBDataForLB(0, tmp_max_avg_ratio, tmp_comm_ratio);
502 #endif
503
504   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
505   tolerate_imb = tmp_max_avg_ratio;
506 //  if (max/avg < tolerate_imb) {
507 //    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
508 //    tolerate_imb = 1.0;
509 //  }
510   if (max/avg > tolerate_imb) {
511     if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
512       return true;
513     }
514   }
515
516   max = 0.0;
517   avg = 0.0;
518   for (int i = 0; i < CkpvAccess(adaptive_lbdb).history_data.size(); i++) {
519     data = CkpvAccess(adaptive_lbdb).history_data[i];
520     max += data.max_load;
521     avg += data.avg_load*tolerate_imb;
522     //DEBAD(("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load));
523     //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
524   }
525   max /= CkpvAccess(adaptive_lbdb).history_data.size();
526   avg /= CkpvAccess(adaptive_lbdb).history_data.size();
527   double cost = CkpvAccess(adaptive_struct).lb_strategy_cost + CkpvAccess(adaptive_struct).lb_migration_cost;
528   period = cost/(max - avg); 
529   CkPrintf("Obtained period %d from constant prediction\n", period);
530   if (period < 0) { 
531     period = CkpvAccess(adaptive_struct).final_lb_period;
532     CkPrintf("Obtained -ve period from constant prediction so changing to prev %d\n", period);
533   } 
534   ratio_at_t = max / avg;
535   return true;
536 }
537
538 bool MetaBalancer::getPeriodForStrategy(double new_load_percent,
539     double overhead_percent, int& period, double& ratio_at_t) {
540   double mslope, aslope, mc, ac;
541   getLineEq(new_load_percent, aslope, ac, mslope, mc);
542   CkPrintf("new load percent %lf\n", new_load_percent);
543   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
544   double a = (mslope - aslope)/2;
545   double b = (mc - ac);
546   double c = -(CkpvAccess(adaptive_struct).lb_strategy_cost +
547       CkpvAccess(adaptive_struct).lb_migration_cost) * overhead_percent;
548   bool got_period = getPeriodForLinear(a, b, c, period);
549   if (!got_period) {
550     return false;
551   }
552
553   if (mslope < 0) {
554     if (period > (-mc/mslope)) {
555       CkPrintf("Max < 0 Period set when max load is -ve\n");
556       return false;
557     }
558   }
559
560   if (aslope < 0) {
561     if (period > (-ac/aslope)) {
562       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
563       return false;
564     }
565   }
566
567   int intersection_t = (mc-ac) / (aslope - mslope);
568   if (intersection_t > 0 && period > intersection_t) {
569     CkPrintf("Avg | Max Period set when curves intersect\n");
570     return false;
571   }
572   ratio_at_t = ((mslope*period + mc)/(aslope*period + ac));
573   CkPrintf("Ratio at t (%lf*%d + %lf) / (%lf*%d+%lf) = %lf\n", mslope, period, mc, aslope, period, ac, ratio_at_t);
574   return true;
575 }
576
577 bool MetaBalancer::getPeriodForLinear(double a, double b, double c, int& period) {
578   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
579   if (a == 0.0) {
580     period = (-c / b);
581     if (period < 0) {
582       CkPrintf("-ve period for -c/b (%d)\n", period);
583       return false;
584     }
585     CkPrintf("Ideal period for linear load %d\n", period);
586     return true;
587   }
588   int x;
589   double t = (b * b) - (4*a*c);
590   if (t < 0) {
591     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
592     return false;
593   }
594   t = (-b + sqrt(t)) / (2*a);
595   x = t;
596   if (x < 0) {
597     CkPrintf("boo!!! x (%d) < 0\n", x);
598     x = 0;
599     return false;
600   }
601   period = x;
602   CkPrintf("Ideal period for linear load %d\n", period);
603   return true;
604 }
605
606 bool MetaBalancer::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
607   int total = CkpvAccess(adaptive_lbdb).history_data.size();
608   int iterations = 1 + CkpvAccess(adaptive_lbdb).history_data[total - 1].iteration -
609       CkpvAccess(adaptive_lbdb).history_data[0].iteration;
610   double a1 = 0;
611   double m1 = 0;
612   double a2 = 0;
613   double m2 = 0;
614   AdaptiveData data;
615   int i = 0;
616   for (i = 0; i < total/2; i++) {
617     data = CkpvAccess(adaptive_lbdb).history_data[i];
618     m1 += data.max_load;
619     a1 += data.avg_load;
620     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
621   }
622   m1 /= i;
623   a1 = (a1 * new_load_percent) / i;
624
625   for (i = total/2; i < total; i++) {
626     data = CkpvAccess(adaptive_lbdb).history_data[i];
627     m2 += data.max_load;
628     a2 += data.avg_load;
629     CkPrintf("max (%d, %lf) avg (%d, %lf) adjusted_avg (%d, %lf)\n", i, data.max_load, i, data.avg_load, i, new_load_percent*data.avg_load);
630   }
631   m2 /= (i - total/2);
632   a2 = (a2 * new_load_percent) / (i - total/2);
633
634   aslope = 2 * (a2 - a1) / iterations;
635   mslope = 2 * (m2 - m1) / iterations;
636   ac = CkpvAccess(adaptive_lbdb).history_data[0].avg_load * new_load_percent;
637   mc = CkpvAccess(adaptive_lbdb).history_data[0].max_load;
638
639   ac = a1 - ((aslope * total)/4);
640   mc = m1 - ((mslope * total)/4);
641
642   //ac = (CkpvAccess(adaptive_lbdb).history_data[1].avg_load * new_load_percent - aslope);
643   //mc = (CkpvAccess(adaptive_lbdb).history_data[1].max_load - mslope);
644
645   return true;
646 }
647
648 void MetaBalancer::LoadBalanceDecision(int req_no, int period) {
649   if (req_no < CkpvAccess(adaptive_struct).lb_msg_recv_no) {
650     CkPrintf("Error!!! Received a request which was already sent or old\n");
651     return;
652   }
653   CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d\n",CkMyPe(), CkpvAccess(adaptive_struct).lb_iteration_no, period);
654   CkpvAccess(adaptive_struct).tentative_period = period;
655   CkpvAccess(adaptive_struct).lb_msg_recv_no = req_no;
656   thisProxy[0].ReceiveIterationNo(req_no, CkpvAccess(adaptive_struct).lb_iteration_no);
657 }
658
659 void MetaBalancer::LoadBalanceDecisionFinal(int req_no, int period) {
660   if (req_no < CkpvAccess(adaptive_struct).lb_msg_recv_no) {
661     return;
662   }
663   DEBAD(("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), CkpvAccess(adaptive_struct).lb_iteration_no, period));
664   CkpvAccess(adaptive_struct).tentative_period = period;
665   CkpvAccess(adaptive_struct).final_lb_period = period;
666   // NOTE LDOMAdaptResumeSync(myLDHandle, period);
667   lbdatabase->AdaptResumeSync(period);
668 }
669
670
671 void MetaBalancer::ReceiveIterationNo(int req_no, int local_iter_no) {
672   CmiAssert(CkMyPe() == 0);
673
674   CkpvAccess(adaptive_struct).global_recv_iter_counter++;
675   if (local_iter_no > CkpvAccess(adaptive_struct).global_max_iter_no) {
676     CkpvAccess(adaptive_struct).global_max_iter_no = local_iter_no;
677   }
678
679   int period;
680   if (CkNumPes() == CkpvAccess(adaptive_struct).global_recv_iter_counter) {
681
682     if (CkpvAccess(adaptive_struct).global_max_iter_no > CkpvAccess(adaptive_struct).tentative_max_iter_no) {
683       CkpvAccess(adaptive_struct).tentative_max_iter_no = CkpvAccess(adaptive_struct).global_max_iter_no;
684     }
685     period = (CkpvAccess(adaptive_struct).tentative_period > CkpvAccess(adaptive_struct).global_max_iter_no) ? CkpvAccess(adaptive_struct).tentative_period : CkpvAccess(adaptive_struct).global_max_iter_no + 1;
686     // If no one has gone into load balancing stage, then we can safely change
687     // the period otherwise keep the old period.
688     if (CkpvAccess(adaptive_struct).global_max_iter_no < CkpvAccess(adaptive_struct).final_lb_period) {
689       CkpvAccess(adaptive_struct).tentative_period = period;
690       CkPrintf("Final lb_period CHANGED!%d\n", CkpvAccess(adaptive_struct).tentative_period);
691     } else {
692       CkpvAccess(adaptive_struct).tentative_period = CkpvAccess(adaptive_struct).final_lb_period;
693       CkPrintf("Final lb_period NOT CHANGED!%d\n", CkpvAccess(adaptive_struct).tentative_period);
694     }
695     thisProxy.LoadBalanceDecisionFinal(req_no, CkpvAccess(adaptive_struct).tentative_period);
696     CkpvAccess(adaptive_struct).in_progress = false;
697     CkpvAccess(adaptive_struct).global_recv_iter_counter = 0;
698   }
699 }
700
701 int MetaBalancer::getPredictedLBPeriod(bool& is_tentative) {
702   // If tentative and final_lb_period are the same, then the decision has been
703   // made but if not, they are in the middle of consensus, hence return the
704   // lease of the two
705   if (CkpvAccess(adaptive_struct).tentative_period != CkpvAccess(adaptive_struct).final_lb_period) {
706     is_tentative = true;
707   } else {
708     is_tentative = false;
709   }
710   if (CkpvAccess(adaptive_struct).tentative_period < CkpvAccess(adaptive_struct).final_lb_period) {
711     return CkpvAccess(adaptive_struct).tentative_period;
712    } else {
713      return CkpvAccess(adaptive_struct).final_lb_period;
714    }
715 }
716
717 // Called by CentralLB to indicate that the LB strategy and migration is in
718 // progress.
719 void MetaBalancer::ResetAdaptive() {
720   CkpvAccess(adaptive_lbdb).lb_iter_no = -1;
721   lb_in_progress = true;
722 }
723
724 void MetaBalancer::HandleAdaptiveNoObj() {
725 #if EXTRA_FEATURE
726   CkpvAccess(adaptive_struct).lb_iteration_no++;
727   //CkPrintf("HandleAdaptiveNoObj %d\n", adaptive_struct.lb_iteration_no);
728   thisProxy[0].RegisterNoObjCallback(CkMyPe());
729   TriggerAdaptiveReduction();
730 #endif
731 }
732
733 void MetaBalancer::RegisterNoObjCallback(int index) {
734 #if EXTRA_FEATURE
735   if (lb_in_progress) {
736     lbdb_no_obj_callback.clear();
737     //CkPrintf("Clearing and registering\n");
738     lb_in_progress = false;
739   }
740   lbdb_no_obj_callback.push_back(index);
741   CkPrintf("Registered %d to have no objs.\n", index);
742
743   // If collection has already happened and this is second iteration, then
744   // trigger reduction.
745   if (CkpvAccess(adaptive_lbdb).lb_iter_no != -1) {
746     //CkPrintf("Collection already started now %d so kick in\n", adaptive_struct.lb_iteration_no);
747     thisProxy[index].TriggerAdaptiveReduction();
748   }
749 #endif
750 }
751
752 void MetaBalancer::TriggerAdaptiveReduction() {
753 #if EXTRA_FEATURE
754   CkpvAccess(adaptive_struct).lb_iteration_no++;
755   //CkPrintf("Trigger adaptive for %d\n", CkpvAccess(adaptive_struct).lb_iteration_no);
756   double lb_data[8];
757   lb_data[0] = CkpvAccess(adaptive_struct).lb_iteration_no;
758   lb_data[1] = 1;
759   lb_data[2] = 0.0;
760   lb_data[3] = 0.0;
761   lb_data[4] = 0.0;
762   lb_data[5] = 0.0;
763   lb_data[6] = 0.0;
764   lb_data[7] = 0.0;
765
766   // CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
767   //     total_load_vec[iteration], idle_time,
768   //     idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
769
770   CkCallback cb(CkIndex_MetaBalancer::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
771   contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
772 #endif
773 }
774
775
776 bool MetaBalancer::isStrategyComm() {
777   return CkpvAccess(adaptive_struct).doCommStrategy;
778 }
779
780 void MetaBalancer::SetMigrationCost(double lb_migration_cost) {
781   CkpvAccess(adaptive_struct).lb_migration_cost = lb_migration_cost;
782 }
783
784 void MetaBalancer::SetStrategyCost(double lb_strategy_cost) {
785   CkpvAccess(adaptive_struct).lb_strategy_cost = lb_strategy_cost;
786 }
787
788 void MetaBalancer::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
789     local_comm, double remote_comm) {
790   CkpvAccess(adaptive_struct).last_lb_type = lb;
791   if (lb == 0) {
792     CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio = lb_max/lb_avg;
793   } else if (lb == 1) {
794     CkpvAccess(adaptive_struct).refine_info.max_avg_ratio = lb_max/lb_avg;
795   } else if (lb == 2) {
796     CkpvAccess(adaptive_struct).comm_info.remote_local_ratio = remote_comm/local_comm;
797   } else if (lb == 3) {
798     CkpvAccess(adaptive_struct).comm_refine_info.remote_local_ratio =
799     remote_comm/local_comm;
800   }
801 }
802
803 void MetaBalancer::UpdateAfterLBData(double max_load, double max_cpu, double
804 avg_load) {
805   if (CkpvAccess(adaptive_struct).last_lb_type == -1) {
806     CkpvAccess(adaptive_struct).last_lb_type = 0;
807   }
808   int lb = CkpvAccess(adaptive_struct).last_lb_type;
809   //CkPrintf("Storing data after lb ratio %lf for lb %d\n", max_load/avg_load, lb);
810   if (lb == 0) {
811     CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio = max_load/avg_load;
812   } else if (lb == 1) {
813     CkpvAccess(adaptive_struct).refine_info.max_avg_ratio = max_load/avg_load;
814   } else if (lb == 2) {
815     CkpvAccess(adaptive_struct).comm_info.max_avg_ratio = max_load/avg_load;
816   } else if (lb == 3) {
817     CkpvAccess(adaptive_struct).comm_refine_info.max_avg_ratio = max_load/avg_load;
818   }
819 }
820
821 void MetaBalancer::UpdateAfterLBComm(double alpha_beta_to_load) {
822   CkPrintf("Setting alpha beta %lf\n", alpha_beta_to_load);
823   alpha_beta_cost_to_load = alpha_beta_to_load;
824 }
825
826
827 void MetaBalancer::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio, double&
828     remote_local_comm_ratio) {
829   lb_type = CkpvAccess(adaptive_struct).last_lb_type;
830   lb_max_avg_ratio = 1;
831   remote_local_comm_ratio = 1;
832   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
833 }
834
835 void MetaBalancer::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio, double&
836     remote_local_comm_ratio) {
837   if (lb_type == 0) {
838     lb_max_avg_ratio = CkpvAccess(adaptive_struct).greedy_info.max_avg_ratio;
839   } else if (lb_type == 1) {
840     lb_max_avg_ratio = CkpvAccess(adaptive_struct).refine_info.max_avg_ratio;
841   } else if (lb_type == 2) {
842     remote_local_comm_ratio = CkpvAccess(adaptive_struct).comm_info.remote_local_ratio;
843   } else if (lb_type == 3) {
844     remote_local_comm_ratio =
845        CkpvAccess(adaptive_struct).comm_refine_info.remote_local_ratio;
846   }
847 }
848
849 #include "MetaBalancer.def.h"
850
851 /*@}*/