27deed0d046ee482bde8e77698926de954cf2ca5
[charm.git] / src / ck-ldb / LBDatabase.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  * This C++ file contains the Charm stub functions
10  */
11
12 #include "LBDatabase.h"
13 #include "LBSimulation.h"
14 #include "topology.h"
15
16 #include "limits.h"
17
18 #include "NullLB.h"
19
20 struct AdaptiveData {
21   int iteration;
22   double max_load;
23   double avg_load;
24   double max_idle_time;
25   double idle_time;
26 };
27
28 struct AdaptiveLBDatabase {
29   std::vector<AdaptiveData> history_data;
30 } adaptive_lbdb;
31
32 struct AdaptiveLBStructure {
33   int lb_ideal_period;
34   int lb_calculated_period;
35   int lb_no_iterations;
36   int global_max_iter_no;
37   int global_recv_iter_counter;
38   bool in_progress;
39   double prev_load;
40   double lb_strategy_cost;
41   double lb_migration_cost;
42   bool lb_period_informed;
43   bool isCommLB;
44   int lb_msg_send_no;
45   int lb_msg_recv_no;
46   int total_syncs_called;
47 } adaptive_struct;
48
49
50 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
51   double lb_data[6];
52   lb_data[1] = 0.0;
53   lb_data[2] = 0.0;
54   lb_data[3] = 0.0;
55   lb_data[4] = 0.0;
56   lb_data[5] = 0.0;
57   for (int i = 0; i < nMsg; i++) {
58     CkAssert(msgs[i]->getSize() == 4*sizeof(double));
59     double* m = (double *)msgs[i]->getData();
60     lb_data[1] += m[1];
61     lb_data[2] = ((m[2] > lb_data[2])? m[2] : lb_data[2]);
62     lb_data[3] += m[3];
63     lb_data[4] = ((m[4] > lb_data[4]) ? m[4] : lb_data[4]);
64     lb_data[5] += m[5];
65     if (i == 0) {
66       lb_data[0] = m[0];
67     }
68     if (m[0] != lb_data[0]) {
69       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
70       %lf\n", lb_data[0], m[0]);
71     }
72   }
73   return CkReductionMsg::buildNew(6*sizeof(double), lb_data);
74 }
75
76 /*global*/ CkReduction::reducerType lbDataCollectionType;
77 /*initcall*/ void registerLBDataCollection(void) {
78   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
79 }
80
81 CkGroupID _lbdb;
82
83 CkpvDeclare(int, numLoadBalancers);  /**< num of lb created */
84 CkpvDeclare(int, hasNullLB);         /**< true if NullLB is created */
85 CkpvDeclare(int, lbdatabaseInited);  /**< true if lbdatabase is inited */
86
87 // command line options
88 CkLBArgs _lb_args;
89 int _lb_predict=0;
90 int _lb_predict_delay=10;
91 int _lb_predict_window=20;
92
93 // registry class stores all load balancers linked and created at runtime
94 class LBDBRegistry {
95 friend class LBDBInit;
96 friend class LBDatabase;
97 private:
98   // table for all available LBs linked in
99   struct LBDBEntry {
100     const char *name;
101     LBCreateFn  cfn;
102     LBAllocFn   afn;
103     const char *help;
104     int         shown;          // if 0, donot show in help page
105     LBDBEntry(): name(0), cfn(0), afn(0), help(0), shown(1) {}
106     LBDBEntry(int) {}
107     LBDBEntry(const char *n, LBCreateFn cf, LBAllocFn af, 
108               const char *h, int show=1):
109       name(n), cfn(cf), afn(af), help(h), shown(show) {};
110   };
111   CkVec<LBDBEntry> lbtables;            // a list of available LBs linked
112   CkVec<const char *>   compile_lbs;    // load balancers at compile time
113   CkVec<const char *>   runtime_lbs;    // load balancers at run time
114 public:
115   LBDBRegistry() {}
116   void displayLBs()
117   {
118     CmiPrintf("\nAvailable load balancers:\n");
119     for (int i=0; i<lbtables.length(); i++) {
120       LBDBEntry &entry = lbtables[i];
121       if (entry.shown) CmiPrintf("* %s: %s\n", entry.name, entry.help);
122     }
123     CmiPrintf("\n");
124   }
125   void addEntry(const char *name, LBCreateFn fn, LBAllocFn afn, const char *help, int shown) {
126     lbtables.push_back(LBDBEntry(name, fn, afn, help, shown));
127   }
128   void addCompiletimeBalancer(const char *name) {
129     compile_lbs.push_back(name); 
130   }
131   void addRuntimeBalancer(const char *name) {
132     runtime_lbs.push_back(name); 
133   }
134   LBCreateFn search(const char *name) {
135     char *ptr = strpbrk((char *)name, ":,");
136     int slen = ptr!=NULL?ptr-name:strlen(name);
137     for (int i=0; i<lbtables.length(); i++)
138       if (0==strncmp(name, lbtables[i].name, slen)) return lbtables[i].cfn;
139     return NULL;
140   }
141   LBAllocFn getLBAllocFn(const char *name) {
142     char *ptr = strpbrk((char *)name, ":,");
143     int slen = ptr-name;
144     for (int i=0; i<lbtables.length(); i++)
145       if (0==strncmp(name, lbtables[i].name, slen)) return lbtables[i].afn;
146     return NULL;
147   }
148 };
149
150 static LBDBRegistry lbRegistry;
151
152 void LBDefaultCreate(const char *lbname)
153 {
154   lbRegistry.addCompiletimeBalancer(lbname);
155 }
156
157 // default is to show the helper
158 void LBRegisterBalancer(const char *name, LBCreateFn fn, LBAllocFn afn, const char *help, int shown)
159 {
160   lbRegistry.addEntry(name, fn, afn, help, shown);
161 }
162
163 LBAllocFn getLBAllocFn(char *lbname) {
164     return lbRegistry.getLBAllocFn(lbname);
165 }
166
167 LBCreateFn getLBCreateFn(const char *lbname) {
168     return lbRegistry.search(lbname);
169 }
170 // create a load balancer group using the strategy name
171 static void createLoadBalancer(const char *lbname)
172 {
173     LBCreateFn fn = lbRegistry.search(lbname);
174     if (!fn) {    // invalid lb name
175       CmiPrintf("Abort: Unknown load balancer: '%s'!\n", lbname);
176       lbRegistry.displayLBs();    // display help page
177       CkAbort("Abort");
178     }
179     // invoke function to create load balancer 
180     fn();
181 }
182
183 // mainchare
184 LBDBInit::LBDBInit(CkArgMsg *m)
185 {
186 #if CMK_LBDB_ON
187   _lbdb = CProxy_LBDatabase::ckNew();
188
189   // runtime specified load balancer
190   if (lbRegistry.runtime_lbs.size() > 0) {
191     for (int i=0; i<lbRegistry.runtime_lbs.size(); i++) {
192       const char *balancer = lbRegistry.runtime_lbs[i];
193       createLoadBalancer(balancer);
194     }
195   }
196   else if (lbRegistry.compile_lbs.size() > 0) {
197     for (int i=0; i<lbRegistry.compile_lbs.size(); i++) {
198       const char* balancer = lbRegistry.compile_lbs[i];
199       createLoadBalancer(balancer);
200     }
201   }
202   else {
203     // NullLB is the default when none of above lb created
204     // note user may create his own load balancer in his code manually like
205     // in NAMD, but never mind NullLB can disable itself if there is 
206     // a non NULL LB.
207     createLoadBalancer("NullLB");
208   }
209
210   // simulation mode
211   if (LBSimulation::doSimulation) {
212     CmiPrintf("Charm++> Entering Load Balancer Simulation Mode ... \n");
213     CProxy_LBDatabase(_lbdb).ckLocalBranch()->StartLB();
214   }
215 #endif
216   delete m;
217 }
218
219 // called from init.C
220 void _loadbalancerInit()
221 {
222   CkpvInitialize(int, lbdatabaseInited);
223   CkpvAccess(lbdatabaseInited) = 0;
224   CkpvInitialize(int, numLoadBalancers);
225   CkpvAccess(numLoadBalancers) = 0;
226   CkpvInitialize(int, hasNullLB);
227   CkpvAccess(hasNullLB) = 0;
228
229   char **argv = CkGetArgv();
230   char *balancer = NULL;
231   CmiArgGroup("Charm++","Load Balancer");
232   while (CmiGetArgStringDesc(argv, "+balancer", &balancer, "Use this load balancer")) {
233     if (CkMyRank() == 0)                
234       lbRegistry.addRuntimeBalancer(balancer);   /* lbRegistry is a static */
235   }
236
237   // set up init value for LBPeriod time in seconds
238   // it can also be set by calling LDSetLBPeriod()
239   CmiGetArgDoubleDesc(argv,"+LBPeriod", &_lb_args.lbperiod(),"the minimum time period in seconds allowed for two consecutive automatic load balancing");
240   _lb_args.loop() = CmiGetArgFlagDesc(argv, "+LBLoop", "Use multiple load balancing strategies in loop");
241
242   // now called in cldb.c: CldModuleGeneralInit()
243   // registerLBTopos();
244   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
245   //Read the K parameter for RefineKLB
246   CmiGetArgIntDesc(argv, "+LBNumMoves", &_lb_args.percentMovesAllowed() , "Percentage of chares to be moved (used by RefineKLB) [0-100]");
247
248   /**************** FUTURE PREDICTOR ****************/
249   _lb_predict = CmiGetArgFlagDesc(argv, "+LBPredictor", "Turn on LB future predictor");
250   CmiGetArgIntDesc(argv, "+LBPredictorDelay", &_lb_predict_delay, "Number of balance steps before learning a model");
251   CmiGetArgIntDesc(argv, "+LBPredictorWindow", &_lb_predict_window, "Number of steps to use to learn a model");
252   if (_lb_predict_window < _lb_predict_delay) {
253     CmiPrintf("LB> [%d] Argument LBPredictorWindow (%d) less than LBPredictorDelay (%d) , fixing\n", CkMyPe(), _lb_predict_window, _lb_predict_delay);
254     _lb_predict_delay = _lb_predict_window;
255   }
256
257   /******************* SIMULATION *******************/
258   // get the step number at which to dump the LB database
259   CmiGetArgIntDesc(argv, "+LBVersion", &_lb_args.lbversion(), "LB database file version number");
260   CmiGetArgIntDesc(argv, "+LBCentPE", &_lb_args.central_pe(), "CentralLB processor");
261   int _lb_dump_activated = 0;
262   if (CmiGetArgIntDesc(argv, "+LBDump", &LBSimulation::dumpStep, "Dump the LB state from this step"))
263     _lb_dump_activated = 1;
264   if (_lb_dump_activated && LBSimulation::dumpStep < 0) {
265     CmiPrintf("LB> Argument LBDump (%d) negative, setting to 0\n",LBSimulation::dumpStep);
266     LBSimulation::dumpStep = 0;
267   }
268   CmiGetArgIntDesc(argv, "+LBDumpSteps", &LBSimulation::dumpStepSize, "Dump the LB state for this amount of steps");
269   if (LBSimulation::dumpStepSize <= 0) {
270     CmiPrintf("LB> Argument LBDumpSteps (%d) too small, setting to 1\n",LBSimulation::dumpStepSize);
271     LBSimulation::dumpStepSize = 1;
272   }
273   CmiGetArgStringDesc(argv, "+LBDumpFile", &LBSimulation::dumpFile, "Set the LB state file name");
274   // get the simulation flag and number. Now the flag can also be avoided by the presence of the number
275   LBSimulation::doSimulation = CmiGetArgIntDesc(argv, "+LBSim", &LBSimulation::simStep, "Read LB state from LBDumpFile since this step");
276   // check for stupid LBSim parameter
277   if (LBSimulation::doSimulation && LBSimulation::simStep < 0) {
278     CmiPrintf("LB> Argument LBSim (%d) invalid, should be >= 0\n");
279     CkExit();
280     return;
281   }
282   CmiGetArgIntDesc(argv, "+LBSimSteps", &LBSimulation::simStepSize, "Read LB state for this number of steps");
283   if (LBSimulation::simStepSize <= 0) {
284     CmiPrintf("LB> Argument LBSimSteps (%d) too small, setting to 1\n",LBSimulation::simStepSize);
285     LBSimulation::simStepSize = 1;
286   }
287
288
289   LBSimulation::simProcs = 0;
290   CmiGetArgIntDesc(argv, "+LBSimProcs", &LBSimulation::simProcs, "Number of target processors.");
291
292   LBSimulation::showDecisionsOnly = 
293     CmiGetArgFlagDesc(argv, "+LBShowDecisions",
294                       "Write to File: Load Balancing Object to Processor Map decisions during LB Simulation");
295
296   // force a global barrier after migration done
297   _lb_args.syncResume() = CmiGetArgFlagDesc(argv, "+LBSyncResume", 
298                   "LB performs a barrier after migration is finished");
299
300   // both +LBDebug and +LBDebug level should work
301   if (!CmiGetArgIntDesc(argv, "+LBDebug", &_lb_args.debug(), 
302                                           "Turn on LB debugging printouts"))
303     _lb_args.debug() = CmiGetArgFlagDesc(argv, "+LBDebug", 
304                                              "Turn on LB debugging printouts");
305
306   // getting the size of the team with +teamSize
307   if (!CmiGetArgIntDesc(argv, "+teamSize", &_lb_args.teamSize(), 
308                                           "Team size"))
309     _lb_args.teamSize() = 1;
310
311   // ask to print summary/quality of load balancer
312   _lb_args.printSummary() = CmiGetArgFlagDesc(argv, "+LBPrintSummary",
313                 "Print load balancing result summary");
314
315   // to ignore baclground load
316   _lb_args.ignoreBgLoad() = CmiGetArgFlagDesc(argv, "+LBNoBackground", 
317                       "Load balancer ignores the background load.");
318 #ifdef __BIGSIM__
319   _lb_args.ignoreBgLoad() = 1;
320 #endif
321   _lb_args.migObjOnly() = CmiGetArgFlagDesc(argv, "+LBObjOnly", 
322                       "Only load balancing migratable objects, ignoring all others.");
323   if (_lb_args.migObjOnly()) _lb_args.ignoreBgLoad() = 1;
324
325   // assume all CPUs are identical
326   _lb_args.testPeSpeed() = CmiGetArgFlagDesc(argv, "+LBTestPESpeed", 
327                       "Load balancer test all CPUs speed.");
328   _lb_args.samePeSpeed() = CmiGetArgFlagDesc(argv, "+LBSameCpus", 
329                       "Load balancer assumes all CPUs are of same speed.");
330   if (!_lb_args.testPeSpeed()) _lb_args.samePeSpeed() = 1;
331
332   _lb_args.useCpuTime() = CmiGetArgFlagDesc(argv, "+LBUseCpuTime", 
333                       "Load balancer uses CPU time instead of wallclock time.");
334
335   // turn instrumentation off at startup
336   _lb_args.statsOn() = !CmiGetArgFlagDesc(argv, "+LBOff",
337                         "Turn load balancer instrumentation off");
338
339   // turn instrumentation of communicatin off at startup
340   _lb_args.traceComm() = !CmiGetArgFlagDesc(argv, "+LBCommOff",
341                 "Turn load balancer instrumentation of communication off");
342
343   // set alpha and beeta
344   _lb_args.alpha() = PER_MESSAGE_SEND_OVERHEAD_DEFAULT;
345   _lb_args.beeta() = PER_BYTE_SEND_OVERHEAD_DEFAULT;
346   CmiGetArgDoubleDesc(argv,"+LBAlpha", &_lb_args.alpha(),
347                            "per message send overhead");
348   CmiGetArgDoubleDesc(argv,"+LBBeta", &_lb_args.beeta(),
349                            "per byte send overhead");
350
351   if (CkMyPe() == 0) {
352     if (_lb_args.debug()) {
353       CmiPrintf("CharmLB> Verbose level %d, load balancing period: %g seconds\n", _lb_args.debug(), _lb_args.lbperiod());
354     }
355     if (_lb_args.debug() > 1) {
356       CmiPrintf("CharmLB> Topology %s alpha: %es beta: %es.\n", _lbtopo, _lb_args.alpha(), _lb_args.beeta());
357     }
358     if (_lb_args.printSummary())
359       CmiPrintf("CharmLB> Load balancer print summary of load balancing result.\n");
360     if (_lb_args.ignoreBgLoad())
361       CmiPrintf("CharmLB> Load balancer ignores processor background load.\n");
362     if (_lb_args.samePeSpeed())
363       CmiPrintf("CharmLB> Load balancer assumes all CPUs are same.\n");
364     if (_lb_args.useCpuTime())
365       CmiPrintf("CharmLB> Load balancer uses CPU time instead of wallclock time.\n");
366     if (LBSimulation::doSimulation)
367       CmiPrintf("CharmLB> Load balancer running in simulation mode on file '%s' version %d.\n", LBSimulation::dumpFile, _lb_args.lbversion());
368     if (_lb_args.statsOn()==0)
369       CkPrintf("CharmLB> Load balancing instrumentation is off.\n");
370     if (_lb_args.traceComm()==0)
371       CkPrintf("CharmLB> Load balancing instrumentation for communication is off.\n");
372     if (_lb_args.migObjOnly())
373       CkPrintf("LB> Load balancing strategy ignores non-migratable objects.\n");
374   }
375 }
376
377 int LBDatabase::manualOn = 0;
378 char *LBDatabase::avail_vector = NULL;
379 CmiNodeLock avail_vector_lock;
380
381 static LBRealType * _expectedLoad = NULL;
382
383 void LBDatabase::initnodeFn()
384 {
385   int proc;
386   int num_proc = CkNumPes();
387   avail_vector= new char[num_proc];
388   for(proc = 0; proc < num_proc; proc++)
389       avail_vector[proc] = 1;
390   avail_vector_lock = CmiCreateLock();
391
392   _expectedLoad = new LBRealType[num_proc];
393   for (proc=0; proc<num_proc; proc++) _expectedLoad[proc]=0.0;
394 }
395
396 // called my constructor
397 void LBDatabase::init(void) 
398 {
399   //thisProxy = CProxy_LBDatabase(thisgroup);
400   myLDHandle = LDCreate();
401   mystep = 0;
402   nloadbalancers = 0;
403   new_ld_balancer = 0;
404
405   CkpvAccess(lbdatabaseInited) = 1;
406 #if CMK_LBDB_ON
407   if (manualOn) TurnManualLBOn();
408 #endif
409   
410   max_load_vec.resize(100, 0.0);
411   total_load_vec.resize(100, 0.0);
412   total_contrib_vec.resize(100, 0.0);
413   max_iteration = -1;
414
415   // If metabalancer enabled, initialize the variables
416   adaptive_struct.lb_ideal_period =  INT_MAX;
417   adaptive_struct.lb_calculated_period = INT_MAX;
418   adaptive_struct.lb_no_iterations = -1;
419   adaptive_struct.global_max_iter_no = 0;
420   adaptive_struct.global_recv_iter_counter = 0;
421   adaptive_struct.in_progress = false;
422   adaptive_struct.prev_load = 0.0;
423   adaptive_struct.lb_strategy_cost = 0.0;
424   adaptive_struct.lb_migration_cost = 0.0;
425   adaptive_struct.lb_msg_send_no = 0;
426   adaptive_struct.lb_msg_recv_no = 0;
427   adaptive_struct.total_syncs_called = 0;
428
429   is_prev_lb_refine = -1;
430 }
431
432 LBDatabase::LastLBInfo::LastLBInfo()
433 {
434   expectedLoad = _expectedLoad;
435 }
436
437 void LBDatabase::get_avail_vector(char * bitmap) {
438     CmiAssert(bitmap && avail_vector);
439     const int num_proc = CkNumPes();
440     for(int proc = 0; proc < num_proc; proc++){
441       bitmap[proc] = avail_vector[proc];
442     }
443 }
444
445 // new_ld == -1(default) : calcualte a new ld
446 //           -2 : ignore new ld
447 //           >=0: given a new ld
448 void LBDatabase::set_avail_vector(char * bitmap, int new_ld){
449     int assigned = 0;
450     const int num_proc = CkNumPes();
451     if (new_ld == -2) assigned = 1;
452     else if (new_ld >= 0) {
453       CmiAssert(new_ld < num_proc);
454       new_ld_balancer = new_ld;
455       assigned = 1;
456     }
457     CmiAssert(bitmap && avail_vector);
458     for(int count = 0; count < num_proc; count++){
459         avail_vector[count] = bitmap[count];
460         if((bitmap[count] == 1) && !assigned){
461             new_ld_balancer = count;
462             assigned = 1;
463         }
464     }
465 }
466
467 // called in CreateFooLB() when multiple load balancers are created
468 // on PE0, BaseLB of each load balancer applies a ticket number
469 // and broadcast the ticket number to all processors
470 int LBDatabase::getLoadbalancerTicket()  { 
471   int seq = nloadbalancers;
472   nloadbalancers ++;
473   loadbalancers.resize(nloadbalancers); 
474   loadbalancers[seq] = NULL;
475   return seq; 
476 }
477
478 void LBDatabase::addLoadbalancer(BaseLB *lb, int seq) {
479 //  CmiPrintf("[%d] addLoadbalancer for seq %d\n", CkMyPe(), seq);
480   if (seq == -1) return;
481   if (CkMyPe() == 0) {
482     CmiAssert(seq < nloadbalancers);
483     if (loadbalancers[seq]) {
484       CmiPrintf("Duplicate load balancer created at %d\n", seq);
485       CmiAbort("LBDatabase");
486     }
487   }
488   else
489     nloadbalancers ++;
490   loadbalancers.resize(seq+1);
491   loadbalancers[seq] = lb;
492 }
493
494 // switch strategy in order
495 void LBDatabase::nextLoadbalancer(int seq) {
496   if (seq == -1) return;                // -1 means this is the only LB
497   int next = seq+1;
498   if (_lb_args.loop()) {
499     if (next == nloadbalancers) next = 0;
500   }
501   else {
502     if (next == nloadbalancers) next --;  // keep using the last one
503   }
504   if (seq != next) {
505     loadbalancers[seq]->turnOff();
506     CmiAssert(loadbalancers[next]);
507     loadbalancers[next]->turnOn();
508   }
509 }
510
511 // return the seq-th load balancer string name of
512 // it can be specified in either compile time or runtime
513 // runtime has higher priority
514 const char *LBDatabase::loadbalancer(int seq) {
515   if (lbRegistry.runtime_lbs.length()) {
516     CmiAssert(seq < lbRegistry.runtime_lbs.length());
517     return lbRegistry.runtime_lbs[seq];
518   }
519   else {
520     CmiAssert(seq < lbRegistry.compile_lbs.length());
521     return lbRegistry.compile_lbs[seq];
522   }
523 }
524
525 void LBDatabase::pup(PUP::er& p)
526
527         IrrGroup::pup(p); 
528         // the memory should be already allocated
529         int np;
530         if (!p.isUnpacking()) np = CkNumPes();
531         p|np;
532         CmiAssert(avail_vector);
533         // in case number of processors changes
534         if (p.isUnpacking() && np > CkNumPes()) {
535                 CmiLock(avail_vector_lock);
536                 delete [] avail_vector;
537                 avail_vector = new char[np];
538                 for (int i=0; i<np; i++) avail_vector[i] = 1;
539                 CmiUnlock(avail_vector_lock);
540         }
541         p(avail_vector, np);
542         p|mystep;
543         if(p.isUnpacking()) nloadbalancers = 0;
544 }
545
546
547 void LBDatabase::EstObjLoad(const LDObjHandle &_h, double cputime)
548 {
549 #if CMK_LBDB_ON
550   LBDB *const db = (LBDB*)(_h.omhandle.ldb.handle);
551   LBObj *const obj = db->LbObj(_h);
552
553   CmiAssert(obj != NULL);
554   obj->setTiming(cputime);
555 #endif
556 }
557
558 void LBDatabase::ResumeClients() {
559   // If metabalancer enabled, initialize the variables
560   adaptive_lbdb.history_data.clear();
561
562   adaptive_struct.lb_ideal_period =  INT_MAX;
563   adaptive_struct.lb_calculated_period = INT_MAX;
564   adaptive_struct.lb_no_iterations = -1;
565   adaptive_struct.global_max_iter_no = 0;
566   adaptive_struct.global_recv_iter_counter = 0;
567   adaptive_struct.in_progress = false;
568   adaptive_struct.prev_load = 0.0;
569   adaptive_struct.lb_strategy_cost = 0.0;
570   adaptive_struct.lb_migration_cost = 0.0;
571   adaptive_struct.lb_msg_send_no = 0;
572   adaptive_struct.lb_msg_recv_no = 0;
573   adaptive_struct.total_syncs_called = 0;
574   
575   max_load_vec.clear();
576   total_load_vec.clear();
577   total_contrib_vec.clear();
578
579   max_load_vec.resize(100, 0.0);
580   total_load_vec.resize(100, 0.0);
581   total_contrib_vec.resize(100, 0.0);
582
583   LDResumeClients(myLDHandle);
584 }
585
586 bool LBDatabase::AddLoad(int iteration, double load) {
587   total_contrib_vec[iteration]++;
588   adaptive_struct.total_syncs_called++;
589   //CkPrintf("At PE %d Total contribution for iteration %d is %lf total objs %d\n", CkMyPe(), iteration,
590   //total_contrib_vec[iteration], getLBDB()->ObjDataCount());
591
592   if (iteration > adaptive_struct.lb_no_iterations) {
593     adaptive_struct.lb_no_iterations = iteration;
594   }
595   total_load_vec[iteration] += load;
596  // if (max_load_vec[iteration] < load) {
597  //   max_load_vec[iteration] = load;
598  // }
599   if (total_contrib_vec[iteration] == getLBDB()->ObjDataCount()) {
600     double idle_time;
601     double tmp;
602     GetTime(&tmp, &tmp, &idle_time, &tmp, &tmp);
603     idle_time = idle_time * iteration * getLBDB()->ObjDataCount() /
604       adaptive_struct.total_syncs_called;
605
606     double lb_data[6];
607     lb_data[0] = iteration;
608     lb_data[1] = total_load_vec[iteration];
609     //lb_data[2] = max_load_vec[iteration];
610     lb_data[2] = total_load_vec[iteration];
611     //lb_data[3] = getLBDB()->ObjDataCount();
612     lb_data[3] = 1;
613     lb_data[4] = idle_time;
614     lb_data[5] = idle_time;
615
616     CkPrintf("[%d] sends total load %lf idle time %lf at iter %d\n", CkMyPe(),
617         total_load_vec[iteration], idle_time, adaptive_struct.lb_no_iterations);
618
619     CkCallback cb(CkIndex_LBDatabase::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
620     contribute(6*sizeof(double), lb_data, lbDataCollectionType, cb);
621   }
622   return true;
623 }
624
625 void LBDatabase::ReceiveMinStats(CkReductionMsg *msg) {
626   double* load = (double *) msg->getData();
627   double max = load[2];
628   double avg = load[1]/load[3];
629   double max_idle = load[5];
630   double avg_idle = load[4]/load[3];
631   int iteration_n = load[0];
632   CkPrintf("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle, max_idle, load[3]);
633   delete msg;
634  
635   // Store the data for this iteration
636   adaptive_struct.lb_no_iterations = iteration_n;
637   AdaptiveData data;
638   data.iteration = adaptive_struct.lb_no_iterations;
639   data.max_load = max;
640   data.avg_load = avg;
641   data.max_idle_time = max_idle;
642   data.idle_time = avg_idle;
643   adaptive_lbdb.history_data.push_back(data);
644
645   // If lb period inform is in progress, dont inform again
646   if (adaptive_struct.in_progress) {
647     return;
648   }
649
650 //  if (adaptive_struct.lb_period_informed) {
651 //    return;
652 //  }
653
654   // If the max/avg ratio is greater than the threshold and also this is not the
655   // step immediately after load balancing, carry out load balancing
656   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
657   if ((max_idle >= 0.1*avg || max/avg >= 1.5) && adaptive_lbdb.history_data.size() > 4) {
658     CkPrintf("Carry out load balancing step at iter max/avg(%lf) and avg_idle/avg_load (%lf)\n", max/avg, avg_idle/avg);
659 //    if (!adaptive_struct.lb_period_informed) {
660 //      // Just for testing
661 //      adaptive_struct.lb_calculated_period = 40;
662 //      adaptive_struct.lb_period_informed = true;
663 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
664 //      return;
665 //    }
666
667     // If the new lb period is less than current set lb period
668     if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
669       if (max/avg < 1.5) {
670         adaptive_struct.isCommLB = true;
671         CkPrintf("No load imbalance but idle time\n");
672       } else {
673         adaptive_struct.isCommLB = false;
674         CkPrintf("Has load imbalance\n");
675       }
676       adaptive_struct.lb_calculated_period = iteration_n + 1;
677       adaptive_struct.lb_period_informed = true;
678       adaptive_struct.in_progress = true;
679       CkPrintf("Informing everyone the lb period is %d\n",
680           adaptive_struct.lb_calculated_period);
681       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
682     }
683     return;
684   }
685
686   // Generate the plan for the adaptive strategy
687   int period;
688   if (generatePlan(period)) {
689     //CkPrintf("Carry out load balancing step at iter\n");
690
691     // If the new lb period is less than current set lb period
692     if (adaptive_struct.lb_calculated_period > period) {
693       adaptive_struct.lb_calculated_period = period;
694       adaptive_struct.in_progress = true;
695       adaptive_struct.lb_period_informed = true;
696       CkPrintf("Informing everyone the lb period is %d\n",
697           adaptive_struct.lb_calculated_period);
698       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
699     }
700   }
701 }
702
703 bool LBDatabase::generatePlan(int& period) {
704   if (adaptive_lbdb.history_data.size() <= 8) {
705     return false;
706   }
707
708   // Some heuristics for lbperiod
709   // If constant load or almost constant,
710   // then max * new_lb_period > avg * new_lb_period + lb_cost
711   double max = 0.0;
712   double avg = 0.0;
713   AdaptiveData data;
714   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
715     data = adaptive_lbdb.history_data[i];
716     max += data.max_load;
717     avg += data.avg_load;
718     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
719   }
720 //  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
721 //  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
722 //
723 //  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
724 //  adaptive_struct.lb_migration_cost) / (max - avg);
725 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
726 //      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
727 //
728
729   // If linearly varying load, then find lb_period
730   // area between the max and avg curve 
731   // If we can attain perfect balance, then the new load is close to the
732   // average. Hence we pass 1, else pass in some other value which would be the
733   // new max_load after load balancing.
734
735
736   return getPeriodForStrategy(1, 1, period);
737
738 //  int refine_period, scratch_period;
739 //  bool obtained_refine, obtained_scratch;
740 //  obtained_refine = getPeriodForStrategy(1, 1, refine_period);
741 //  obtained_scratch = getPeriodForStrategy(1, 1, scratch_period);
742 //
743 //  if (obtained_refine) {
744 //    if (!obtained_scratch) {
745 //      period = refine_period;
746 //      adaptive_struct.isRefine = true;
747 //      return true;
748 //    }
749 //    if (scratch_period < 1.1*refine_period) {
750 //      adaptive_struct.isRefine = false;
751 //      period = scratch_period;
752 //      return true;
753 //    }
754 //    period = refine_period;
755 //    adaptive_struct.isRefine = true;
756 //    return true;
757 //  }
758 //
759 //  if (obtained_scratch) {
760 //    period = scratch_period;
761 //    adaptive_struct.isRefine = false;
762 //    return true;
763 //  }
764 //  return false;
765 }
766
767 bool LBDatabase::getPeriodForStrategy(double new_load_percent, double overhead_percent, int& period) {
768   double mslope, aslope, mc, ac;
769   getLineEq(new_load_percent, aslope, ac, mslope, mc);
770   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
771   double a = (mslope - aslope)/2;
772   double b = (mc - ac);
773   double c = -(adaptive_struct.lb_strategy_cost +
774       adaptive_struct.lb_migration_cost) * overhead_percent;
775   //c = -2.5;
776   bool got_period = getPeriodForLinear(a, b, c, period);
777   if (!got_period) {
778     return false;
779   }
780   
781   if (mslope < 0) {
782     if (period > (-mc/mslope)) {
783       CkPrintf("Max < 0 Period set when max load is -ve\n");
784       return false;
785     }
786   }
787
788   if (aslope < 0) {
789     if (period > (-ac/aslope)) {
790       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
791       return false;
792     }
793   }
794
795   int intersection_t = (mc-ac) / (aslope - mslope);
796   if (intersection_t > 0 && period > intersection_t) {
797     CkPrintf("Avg | Max Period set when curves intersect\n");
798     return false;
799   }
800   return true;
801 }
802
803 bool LBDatabase::getPeriodForLinear(double a, double b, double c, int& period) {
804   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
805   if (a == 0.0) {
806     period = (-c / b);
807     CkPrintf("Ideal period for linear load %d\n", period);
808     return true;
809   }
810   int x;
811   double t = (b * b) - (4*a*c);
812   if (t < 0) {
813     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
814     return false;
815   }
816   t = (-b + sqrt(t)) / (2*a);
817   x = t;
818   if (x < 0) {
819     CkPrintf("boo!!! x (%d) < 0\n", x);
820     x = 0;
821     return false;
822   }
823   period = x;
824   CkPrintf("Ideal period for linear load %d\n", period);
825   return true;
826 }
827
828 bool LBDatabase::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
829   int total = adaptive_lbdb.history_data.size();
830   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
831       adaptive_lbdb.history_data[0].iteration;
832   double a1 = 0;
833   double m1 = 0;
834   double a2 = 0;
835   double m2 = 0;
836   AdaptiveData data;
837   int i = 0;
838   for (i = 0; i < total/2; i++) {
839     data = adaptive_lbdb.history_data[i];
840     m1 += data.max_load;
841     a1 += data.avg_load;
842   }
843   m1 /= i;
844   a1 = (a1 * new_load_percent) / i;
845
846   for (i = total/2; i < total; i++) {
847     data = adaptive_lbdb.history_data[i];
848     m2 += data.max_load;
849     a2 += data.avg_load;
850   }
851   m2 /= (i - total/2);
852   a2 = (a2 * new_load_percent) / (i - total/2);
853
854   aslope = 2 * (a2 - a1) / iterations;
855   mslope = 2 * (m2 - m1) / iterations;
856   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
857   mc = adaptive_lbdb.history_data[0].max_load;
858   return true;
859 }
860
861 void LBDatabase::LoadBalanceDecision(int req_no, int period) {
862   if (req_no < adaptive_struct.lb_msg_recv_no) {
863     CkPrintf("Error!!! Received a request which was already sent or old\n");
864     return;
865   }
866   //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
867   adaptive_struct.lb_ideal_period = period;
868   //local_state = ON;
869   adaptive_struct.lb_msg_recv_no = req_no;
870   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
871 }
872
873 void LBDatabase::LoadBalanceDecisionFinal(int req_no, int period) {
874   if (req_no < adaptive_struct.lb_msg_recv_no) {
875     return;
876   }
877 //  CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), adaptive_struct.lb_no_iterations, period);
878   adaptive_struct.lb_ideal_period = period;
879   LDOMAdaptResumeSync(myLDHandle, period);
880
881 //  if (local_state == ON) {
882 //    local_state = DECIDED;
883 //    return;
884 //  }
885
886   // If the state is PAUSE, then its waiting for the final decision from central
887   // processor. If the decision is that the ideal period is in the future,
888   // resume. If the ideal period is now, then carry out load balancing.
889 //  if (local_state == PAUSE) {
890 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
891 //      local_state = DECIDED;
892 //      //SendMinStats();
893 //      //FIX ME!!! ResumeClients(0);
894 //    } else {
895 //      local_state = LOAD_BALANCE;
896 //      //FIX ME!!! ProcessAtSync();
897 //    }
898 //    return;
899 //  }
900 //  CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
901 }
902
903
904 void LBDatabase::ReceiveIterationNo(int req_no, int local_iter_no) {
905   CmiAssert(CkMyPe() == 0);
906
907   adaptive_struct.global_recv_iter_counter++;
908   if (local_iter_no > adaptive_struct.global_max_iter_no) {
909     adaptive_struct.global_max_iter_no = local_iter_no;
910   }
911   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
912     adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
913     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
914     CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
915     adaptive_struct.in_progress = false;
916     adaptive_struct.global_max_iter_no = 0;
917     adaptive_struct.global_recv_iter_counter = 0;
918   }
919 }
920
921 int LBDatabase::getPredictedLBPeriod() {
922   return adaptive_struct.lb_ideal_period;
923 }
924
925 bool LBDatabase::isStrategyComm() {
926   return adaptive_struct.isCommLB;
927 }
928
929 /*
930   callable from user's code
931 */
932 void TurnManualLBOn()
933 {
934 #if CMK_LBDB_ON
935    LBDatabase * myLbdb = LBDatabase::Object();
936    if (myLbdb) {
937      myLbdb->TurnManualLBOn();
938    }
939    else {
940      LBDatabase::manualOn = 1;
941    }
942 #endif
943 }
944
945 void TurnManualLBOff()
946 {
947 #if CMK_LBDB_ON
948    LBDatabase * myLbdb = LBDatabase::Object();
949    if (myLbdb) {
950      myLbdb->TurnManualLBOff();
951    }
952    else {
953      LBDatabase::manualOn = 0;
954    }
955 #endif
956 }
957
958 extern "C" void LBTurnInstrumentOn() { 
959 #if CMK_LBDB_ON
960   if (CkpvAccess(lbdatabaseInited))
961     LBDatabase::Object()->CollectStatsOn(); 
962   else
963     _lb_args.statsOn() = 1;
964 #endif
965 }
966
967 extern "C" void LBTurnInstrumentOff() { 
968 #if CMK_LBDB_ON
969   if (CkpvAccess(lbdatabaseInited))
970     LBDatabase::Object()->CollectStatsOff(); 
971   else
972     _lb_args.statsOn() = 0;
973 #endif
974 }
975 void LBClearLoads() {
976 #if CMK_LBDB_ON
977   LBDatabase::Object()->ClearLoads(); 
978 #endif
979 }
980
981 void LBTurnPredictorOn(LBPredictorFunction *model) {
982 #if CMK_LBDB_ON
983   LBDatabase::Object()->PredictorOn(model);
984 #endif
985 }
986
987 void LBTurnPredictorOn(LBPredictorFunction *model, int wind) {
988 #if CMK_LBDB_ON
989   LBDatabase::Object()->PredictorOn(model, wind);
990 #endif
991 }
992
993 void LBTurnPredictorOff() {
994 #if CMK_LBDB_ON
995   LBDatabase::Object()->PredictorOff();
996 #endif
997 }
998
999 void LBChangePredictor(LBPredictorFunction *model) {
1000 #if CMK_LBDB_ON
1001   LBDatabase::Object()->ChangePredictor(model);
1002 #endif
1003 }
1004
1005 void LBSetPeriod(double second) {
1006 #if CMK_LBDB_ON
1007   if (CkpvAccess(lbdatabaseInited))
1008     LBDatabase::Object()->SetLBPeriod(second); 
1009   else
1010     _lb_args.lbperiod() = second;
1011 #endif
1012 }
1013
1014 #include "LBDatabase.def.h"
1015
1016 /*@}*/