Using remote/local instead of local/remote
[charm.git] / src / ck-ldb / LBDatabase.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "converse.h"
7
8 /*
9  * This C++ file contains the Charm stub functions
10  */
11
12 #include "LBDatabase.h"
13 #include "LBSimulation.h"
14 #include "topology.h"
15
16 #include "limits.h"
17
18 #include "NullLB.h"
19
20 #define VEC_SIZE 500
21 #define IMB_TOLERANCE 1.1
22 #define IDLE_LOAD_TOLERANCE 0.3
23
24 struct AdaptiveData {
25   int iteration;
26   double max_load;
27   double avg_load;
28   double max_idle_load_ratio;
29   double idle_time;
30 };
31
32 struct AdaptiveLBDatabase {
33   std::vector<AdaptiveData> history_data;
34 } adaptive_lbdb;
35
36 struct AdaptiveLBInfo {
37   double max_avg_ratio;
38   double remote_local_ratio;
39 };
40
41 struct AdaptiveLBStructure {
42   int lb_ideal_period;
43   int lb_calculated_period;
44   int lb_no_iterations;
45   int global_max_iter_no;
46   int global_recv_iter_counter;
47   bool in_progress;
48   double lb_strategy_cost;
49   double lb_migration_cost;
50   bool lb_period_informed;
51   bool doCommStrategy;
52   int lb_msg_send_no;
53   int lb_msg_recv_no;
54   int total_syncs_called;
55   int last_lb_type;
56   AdaptiveLBInfo greedy_info;
57   AdaptiveLBInfo refine_info;
58   AdaptiveLBInfo comm_info;
59   AdaptiveLBInfo comm_refine_info;
60 } adaptive_struct;
61
62
63 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
64   double lb_data[6];
65   lb_data[1] = 0.0;
66   lb_data[2] = 0.0;
67   lb_data[3] = 0.0;
68   lb_data[4] = 0.0;
69   lb_data[5] = 0.0;
70   for (int i = 0; i < nMsg; i++) {
71     CkAssert(msgs[i]->getSize() == 6*sizeof(double));
72     if (msgs[i]->getSize() != 6*sizeof(double)) {
73       CkPrintf("Error!!! Reduction not correct. Msg size is %d\n", msgs[i]->getSize());
74     }
75     double* m = (double *)msgs[i]->getData();
76     // Total count
77     lb_data[1] += m[1];
78     // Avg load
79     lb_data[2] += m[2];
80     // Max load
81     lb_data[3] = ((m[3] > lb_data[3])? m[3] : lb_data[3]);
82     // Avg idle
83     lb_data[4] += m[4];
84     // Max idle
85     lb_data[5] = ((m[5] > lb_data[5]) ? m[5] : lb_data[5]);
86     if (i == 0) {
87       // Iteration no
88       lb_data[0] = m[0];
89     }
90     if (m[0] != lb_data[0]) {
91       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
92       %lf\n", lb_data[0], m[0]);
93     }
94   }
95   return CkReductionMsg::buildNew(6*sizeof(double), lb_data);
96 }
97
98 /*global*/ CkReduction::reducerType lbDataCollectionType;
99 /*initcall*/ void registerLBDataCollection(void) {
100   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
101 }
102
103 CkGroupID _lbdb;
104
105 CkpvDeclare(int, numLoadBalancers);  /**< num of lb created */
106 CkpvDeclare(int, hasNullLB);         /**< true if NullLB is created */
107 CkpvDeclare(int, lbdatabaseInited);  /**< true if lbdatabase is inited */
108
109 // command line options
110 CkLBArgs _lb_args;
111 int _lb_predict=0;
112 int _lb_predict_delay=10;
113 int _lb_predict_window=20;
114
115 // registry class stores all load balancers linked and created at runtime
116 class LBDBRegistry {
117 friend class LBDBInit;
118 friend class LBDatabase;
119 private:
120   // table for all available LBs linked in
121   struct LBDBEntry {
122     const char *name;
123     LBCreateFn  cfn;
124     LBAllocFn   afn;
125     const char *help;
126     int         shown;          // if 0, donot show in help page
127     LBDBEntry(): name(0), cfn(0), afn(0), help(0), shown(1) {}
128     LBDBEntry(int) {}
129     LBDBEntry(const char *n, LBCreateFn cf, LBAllocFn af, 
130               const char *h, int show=1):
131       name(n), cfn(cf), afn(af), help(h), shown(show) {};
132   };
133   CkVec<LBDBEntry> lbtables;            // a list of available LBs linked
134   CkVec<const char *>   compile_lbs;    // load balancers at compile time
135   CkVec<const char *>   runtime_lbs;    // load balancers at run time
136 public:
137   LBDBRegistry() {}
138   void displayLBs()
139   {
140     CmiPrintf("\nAvailable load balancers:\n");
141     for (int i=0; i<lbtables.length(); i++) {
142       LBDBEntry &entry = lbtables[i];
143       if (entry.shown) CmiPrintf("* %s: %s\n", entry.name, entry.help);
144     }
145     CmiPrintf("\n");
146   }
147   void addEntry(const char *name, LBCreateFn fn, LBAllocFn afn, const char *help, int shown) {
148     lbtables.push_back(LBDBEntry(name, fn, afn, help, shown));
149   }
150   void addCompiletimeBalancer(const char *name) {
151     compile_lbs.push_back(name); 
152   }
153   void addRuntimeBalancer(const char *name) {
154     runtime_lbs.push_back(name); 
155   }
156   LBCreateFn search(const char *name) {
157     char *ptr = strpbrk((char *)name, ":,");
158     int slen = ptr!=NULL?ptr-name:strlen(name);
159     for (int i=0; i<lbtables.length(); i++)
160       if (0==strncmp(name, lbtables[i].name, slen)) return lbtables[i].cfn;
161     return NULL;
162   }
163   LBAllocFn getLBAllocFn(const char *name) {
164     char *ptr = strpbrk((char *)name, ":,");
165     int slen = ptr-name;
166     for (int i=0; i<lbtables.length(); i++)
167       if (0==strncmp(name, lbtables[i].name, slen)) return lbtables[i].afn;
168     return NULL;
169   }
170 };
171
172 static LBDBRegistry lbRegistry;
173
174 void LBDefaultCreate(const char *lbname)
175 {
176   lbRegistry.addCompiletimeBalancer(lbname);
177 }
178
179 // default is to show the helper
180 void LBRegisterBalancer(const char *name, LBCreateFn fn, LBAllocFn afn, const char *help, int shown)
181 {
182   lbRegistry.addEntry(name, fn, afn, help, shown);
183 }
184
185 LBAllocFn getLBAllocFn(char *lbname) {
186     return lbRegistry.getLBAllocFn(lbname);
187 }
188
189 LBCreateFn getLBCreateFn(const char *lbname) {
190     return lbRegistry.search(lbname);
191 }
192 // create a load balancer group using the strategy name
193 static void createLoadBalancer(const char *lbname)
194 {
195     LBCreateFn fn = lbRegistry.search(lbname);
196     if (!fn) {    // invalid lb name
197       CmiPrintf("Abort: Unknown load balancer: '%s'!\n", lbname);
198       lbRegistry.displayLBs();    // display help page
199       CkAbort("Abort");
200     }
201     // invoke function to create load balancer 
202     fn();
203 }
204
205 // mainchare
206 LBDBInit::LBDBInit(CkArgMsg *m)
207 {
208 #if CMK_LBDB_ON
209   _lbdb = CProxy_LBDatabase::ckNew();
210
211   // runtime specified load balancer
212   if (lbRegistry.runtime_lbs.size() > 0) {
213     for (int i=0; i<lbRegistry.runtime_lbs.size(); i++) {
214       const char *balancer = lbRegistry.runtime_lbs[i];
215       createLoadBalancer(balancer);
216     }
217   }
218   else if (lbRegistry.compile_lbs.size() > 0) {
219     for (int i=0; i<lbRegistry.compile_lbs.size(); i++) {
220       const char* balancer = lbRegistry.compile_lbs[i];
221       createLoadBalancer(balancer);
222     }
223   }
224   else {
225     // NullLB is the default when none of above lb created
226     // note user may create his own load balancer in his code manually like
227     // in NAMD, but never mind NullLB can disable itself if there is 
228     // a non NULL LB.
229     createLoadBalancer("NullLB");
230   }
231
232   // simulation mode
233   if (LBSimulation::doSimulation) {
234     CmiPrintf("Charm++> Entering Load Balancer Simulation Mode ... \n");
235     CProxy_LBDatabase(_lbdb).ckLocalBranch()->StartLB();
236   }
237 #endif
238   delete m;
239 }
240
241 // called from init.C
242 void _loadbalancerInit()
243 {
244   CkpvInitialize(int, lbdatabaseInited);
245   CkpvAccess(lbdatabaseInited) = 0;
246   CkpvInitialize(int, numLoadBalancers);
247   CkpvAccess(numLoadBalancers) = 0;
248   CkpvInitialize(int, hasNullLB);
249   CkpvAccess(hasNullLB) = 0;
250
251   char **argv = CkGetArgv();
252   char *balancer = NULL;
253   CmiArgGroup("Charm++","Load Balancer");
254   while (CmiGetArgStringDesc(argv, "+balancer", &balancer, "Use this load balancer")) {
255     if (CkMyRank() == 0)                
256       lbRegistry.addRuntimeBalancer(balancer);   /* lbRegistry is a static */
257   }
258
259   // set up init value for LBPeriod time in seconds
260   // it can also be set by calling LDSetLBPeriod()
261   CmiGetArgDoubleDesc(argv,"+LBPeriod", &_lb_args.lbperiod(),"the minimum time period in seconds allowed for two consecutive automatic load balancing");
262   _lb_args.loop() = CmiGetArgFlagDesc(argv, "+LBLoop", "Use multiple load balancing strategies in loop");
263
264   // now called in cldb.c: CldModuleGeneralInit()
265   // registerLBTopos();
266   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
267   //Read the K parameter for RefineKLB
268   CmiGetArgIntDesc(argv, "+LBNumMoves", &_lb_args.percentMovesAllowed() , "Percentage of chares to be moved (used by RefineKLB) [0-100]");
269
270   /**************** FUTURE PREDICTOR ****************/
271   _lb_predict = CmiGetArgFlagDesc(argv, "+LBPredictor", "Turn on LB future predictor");
272   CmiGetArgIntDesc(argv, "+LBPredictorDelay", &_lb_predict_delay, "Number of balance steps before learning a model");
273   CmiGetArgIntDesc(argv, "+LBPredictorWindow", &_lb_predict_window, "Number of steps to use to learn a model");
274   if (_lb_predict_window < _lb_predict_delay) {
275     CmiPrintf("LB> [%d] Argument LBPredictorWindow (%d) less than LBPredictorDelay (%d) , fixing\n", CkMyPe(), _lb_predict_window, _lb_predict_delay);
276     _lb_predict_delay = _lb_predict_window;
277   }
278
279   /******************* SIMULATION *******************/
280   // get the step number at which to dump the LB database
281   CmiGetArgIntDesc(argv, "+LBVersion", &_lb_args.lbversion(), "LB database file version number");
282   CmiGetArgIntDesc(argv, "+LBCentPE", &_lb_args.central_pe(), "CentralLB processor");
283   int _lb_dump_activated = 0;
284   if (CmiGetArgIntDesc(argv, "+LBDump", &LBSimulation::dumpStep, "Dump the LB state from this step"))
285     _lb_dump_activated = 1;
286   if (_lb_dump_activated && LBSimulation::dumpStep < 0) {
287     CmiPrintf("LB> Argument LBDump (%d) negative, setting to 0\n",LBSimulation::dumpStep);
288     LBSimulation::dumpStep = 0;
289   }
290   CmiGetArgIntDesc(argv, "+LBDumpSteps", &LBSimulation::dumpStepSize, "Dump the LB state for this amount of steps");
291   if (LBSimulation::dumpStepSize <= 0) {
292     CmiPrintf("LB> Argument LBDumpSteps (%d) too small, setting to 1\n",LBSimulation::dumpStepSize);
293     LBSimulation::dumpStepSize = 1;
294   }
295   CmiGetArgStringDesc(argv, "+LBDumpFile", &LBSimulation::dumpFile, "Set the LB state file name");
296   // get the simulation flag and number. Now the flag can also be avoided by the presence of the number
297   LBSimulation::doSimulation = CmiGetArgIntDesc(argv, "+LBSim", &LBSimulation::simStep, "Read LB state from LBDumpFile since this step");
298   // check for stupid LBSim parameter
299   if (LBSimulation::doSimulation && LBSimulation::simStep < 0) {
300     CmiPrintf("LB> Argument LBSim (%d) invalid, should be >= 0\n");
301     CkExit();
302     return;
303   }
304   CmiGetArgIntDesc(argv, "+LBSimSteps", &LBSimulation::simStepSize, "Read LB state for this number of steps");
305   if (LBSimulation::simStepSize <= 0) {
306     CmiPrintf("LB> Argument LBSimSteps (%d) too small, setting to 1\n",LBSimulation::simStepSize);
307     LBSimulation::simStepSize = 1;
308   }
309
310
311   LBSimulation::simProcs = 0;
312   CmiGetArgIntDesc(argv, "+LBSimProcs", &LBSimulation::simProcs, "Number of target processors.");
313
314   LBSimulation::showDecisionsOnly = 
315     CmiGetArgFlagDesc(argv, "+LBShowDecisions",
316                       "Write to File: Load Balancing Object to Processor Map decisions during LB Simulation");
317
318   // force a global barrier after migration done
319   _lb_args.syncResume() = CmiGetArgFlagDesc(argv, "+LBSyncResume", 
320                   "LB performs a barrier after migration is finished");
321
322   // both +LBDebug and +LBDebug level should work
323   if (!CmiGetArgIntDesc(argv, "+LBDebug", &_lb_args.debug(), 
324                                           "Turn on LB debugging printouts"))
325     _lb_args.debug() = CmiGetArgFlagDesc(argv, "+LBDebug", 
326                                              "Turn on LB debugging printouts");
327
328   // getting the size of the team with +teamSize
329   if (!CmiGetArgIntDesc(argv, "+teamSize", &_lb_args.teamSize(), 
330                                           "Team size"))
331     _lb_args.teamSize() = 1;
332
333   // ask to print summary/quality of load balancer
334   _lb_args.printSummary() = CmiGetArgFlagDesc(argv, "+LBPrintSummary",
335                 "Print load balancing result summary");
336
337   // to ignore baclground load
338   _lb_args.ignoreBgLoad() = CmiGetArgFlagDesc(argv, "+LBNoBackground", 
339                       "Load balancer ignores the background load.");
340 #ifdef __BIGSIM__
341   _lb_args.ignoreBgLoad() = 1;
342 #endif
343   _lb_args.migObjOnly() = CmiGetArgFlagDesc(argv, "+LBObjOnly", 
344                       "Only load balancing migratable objects, ignoring all others.");
345   if (_lb_args.migObjOnly()) _lb_args.ignoreBgLoad() = 1;
346
347   // assume all CPUs are identical
348   _lb_args.testPeSpeed() = CmiGetArgFlagDesc(argv, "+LBTestPESpeed", 
349                       "Load balancer test all CPUs speed.");
350   _lb_args.samePeSpeed() = CmiGetArgFlagDesc(argv, "+LBSameCpus", 
351                       "Load balancer assumes all CPUs are of same speed.");
352   if (!_lb_args.testPeSpeed()) _lb_args.samePeSpeed() = 1;
353
354   _lb_args.useCpuTime() = CmiGetArgFlagDesc(argv, "+LBUseCpuTime", 
355                       "Load balancer uses CPU time instead of wallclock time.");
356
357   // turn instrumentation off at startup
358   _lb_args.statsOn() = !CmiGetArgFlagDesc(argv, "+LBOff",
359                         "Turn load balancer instrumentation off");
360
361   // turn instrumentation of communicatin off at startup
362   _lb_args.traceComm() = !CmiGetArgFlagDesc(argv, "+LBCommOff",
363                 "Turn load balancer instrumentation of communication off");
364
365   // set alpha and beeta
366   _lb_args.alpha() = PER_MESSAGE_SEND_OVERHEAD_DEFAULT;
367   _lb_args.beeta() = PER_BYTE_SEND_OVERHEAD_DEFAULT;
368   CmiGetArgDoubleDesc(argv,"+LBAlpha", &_lb_args.alpha(),
369                            "per message send overhead");
370   CmiGetArgDoubleDesc(argv,"+LBBeta", &_lb_args.beeta(),
371                            "per byte send overhead");
372
373   if (CkMyPe() == 0) {
374     if (_lb_args.debug()) {
375       CmiPrintf("CharmLB> Verbose level %d, load balancing period: %g seconds\n", _lb_args.debug(), _lb_args.lbperiod());
376     }
377     if (_lb_args.debug() > 1) {
378       CmiPrintf("CharmLB> Topology %s alpha: %es beta: %es.\n", _lbtopo, _lb_args.alpha(), _lb_args.beeta());
379     }
380     if (_lb_args.printSummary())
381       CmiPrintf("CharmLB> Load balancer print summary of load balancing result.\n");
382     if (_lb_args.ignoreBgLoad())
383       CmiPrintf("CharmLB> Load balancer ignores processor background load.\n");
384     if (_lb_args.samePeSpeed())
385       CmiPrintf("CharmLB> Load balancer assumes all CPUs are same.\n");
386     if (_lb_args.useCpuTime())
387       CmiPrintf("CharmLB> Load balancer uses CPU time instead of wallclock time.\n");
388     if (LBSimulation::doSimulation)
389       CmiPrintf("CharmLB> Load balancer running in simulation mode on file '%s' version %d.\n", LBSimulation::dumpFile, _lb_args.lbversion());
390     if (_lb_args.statsOn()==0)
391       CkPrintf("CharmLB> Load balancing instrumentation is off.\n");
392     if (_lb_args.traceComm()==0)
393       CkPrintf("CharmLB> Load balancing instrumentation for communication is off.\n");
394     if (_lb_args.migObjOnly())
395       CkPrintf("LB> Load balancing strategy ignores non-migratable objects.\n");
396   }
397 }
398
399 int LBDatabase::manualOn = 0;
400 char *LBDatabase::avail_vector = NULL;
401 CmiNodeLock avail_vector_lock;
402
403 static LBRealType * _expectedLoad = NULL;
404
405 void LBDatabase::initnodeFn()
406 {
407   int proc;
408   int num_proc = CkNumPes();
409   avail_vector= new char[num_proc];
410   for(proc = 0; proc < num_proc; proc++)
411       avail_vector[proc] = 1;
412   avail_vector_lock = CmiCreateLock();
413
414   _expectedLoad = new LBRealType[num_proc];
415   for (proc=0; proc<num_proc; proc++) _expectedLoad[proc]=0.0;
416 }
417
418 // called my constructor
419 void LBDatabase::init(void) 
420 {
421   //thisProxy = CProxy_LBDatabase(thisgroup);
422   myLDHandle = LDCreate();
423   mystep = 0;
424   nloadbalancers = 0;
425   new_ld_balancer = 0;
426
427   CkpvAccess(lbdatabaseInited) = 1;
428 #if CMK_LBDB_ON
429   if (manualOn) TurnManualLBOn();
430 #endif
431   
432   max_load_vec.resize(VEC_SIZE, 0.0);
433   total_load_vec.resize(VEC_SIZE, 0.0);
434   total_contrib_vec.resize(VEC_SIZE, 0.0);
435   max_iteration = -1;
436
437   // If metabalancer enabled, initialize the variables
438   adaptive_struct.lb_ideal_period =  INT_MAX;
439   adaptive_struct.lb_calculated_period = INT_MAX;
440   adaptive_struct.lb_no_iterations = -1;
441   adaptive_struct.global_max_iter_no = 0;
442   adaptive_struct.global_recv_iter_counter = 0;
443   adaptive_struct.in_progress = false;
444   adaptive_struct.lb_strategy_cost = 0.0;
445   adaptive_struct.lb_migration_cost = 0.0;
446   adaptive_struct.lb_msg_send_no = 0;
447   adaptive_struct.lb_msg_recv_no = 0;
448   adaptive_struct.total_syncs_called = 0;
449   adaptive_struct.last_lb_type = -1;
450
451   is_prev_lb_refine = -1;
452 }
453
454 LBDatabase::LastLBInfo::LastLBInfo()
455 {
456   expectedLoad = _expectedLoad;
457 }
458
459 void LBDatabase::get_avail_vector(char * bitmap) {
460     CmiAssert(bitmap && avail_vector);
461     const int num_proc = CkNumPes();
462     for(int proc = 0; proc < num_proc; proc++){
463       bitmap[proc] = avail_vector[proc];
464     }
465 }
466
467 // new_ld == -1(default) : calcualte a new ld
468 //           -2 : ignore new ld
469 //           >=0: given a new ld
470 void LBDatabase::set_avail_vector(char * bitmap, int new_ld){
471     int assigned = 0;
472     const int num_proc = CkNumPes();
473     if (new_ld == -2) assigned = 1;
474     else if (new_ld >= 0) {
475       CmiAssert(new_ld < num_proc);
476       new_ld_balancer = new_ld;
477       assigned = 1;
478     }
479     CmiAssert(bitmap && avail_vector);
480     for(int count = 0; count < num_proc; count++){
481         avail_vector[count] = bitmap[count];
482         if((bitmap[count] == 1) && !assigned){
483             new_ld_balancer = count;
484             assigned = 1;
485         }
486     }
487 }
488
489 // called in CreateFooLB() when multiple load balancers are created
490 // on PE0, BaseLB of each load balancer applies a ticket number
491 // and broadcast the ticket number to all processors
492 int LBDatabase::getLoadbalancerTicket()  { 
493   int seq = nloadbalancers;
494   nloadbalancers ++;
495   loadbalancers.resize(nloadbalancers); 
496   loadbalancers[seq] = NULL;
497   return seq; 
498 }
499
500 void LBDatabase::addLoadbalancer(BaseLB *lb, int seq) {
501 //  CmiPrintf("[%d] addLoadbalancer for seq %d\n", CkMyPe(), seq);
502   if (seq == -1) return;
503   if (CkMyPe() == 0) {
504     CmiAssert(seq < nloadbalancers);
505     if (loadbalancers[seq]) {
506       CmiPrintf("Duplicate load balancer created at %d\n", seq);
507       CmiAbort("LBDatabase");
508     }
509   }
510   else
511     nloadbalancers ++;
512   loadbalancers.resize(seq+1);
513   loadbalancers[seq] = lb;
514 }
515
516 // switch strategy in order
517 void LBDatabase::nextLoadbalancer(int seq) {
518   if (seq == -1) return;                // -1 means this is the only LB
519   int next = seq+1;
520   if (_lb_args.loop()) {
521     if (next == nloadbalancers) next = 0;
522   }
523   else {
524     if (next == nloadbalancers) next --;  // keep using the last one
525   }
526   if (seq != next) {
527     loadbalancers[seq]->turnOff();
528     CmiAssert(loadbalancers[next]);
529     loadbalancers[next]->turnOn();
530   }
531 }
532
533 // return the seq-th load balancer string name of
534 // it can be specified in either compile time or runtime
535 // runtime has higher priority
536 const char *LBDatabase::loadbalancer(int seq) {
537   if (lbRegistry.runtime_lbs.length()) {
538     CmiAssert(seq < lbRegistry.runtime_lbs.length());
539     return lbRegistry.runtime_lbs[seq];
540   }
541   else {
542     CmiAssert(seq < lbRegistry.compile_lbs.length());
543     return lbRegistry.compile_lbs[seq];
544   }
545 }
546
547 void LBDatabase::pup(PUP::er& p)
548
549         IrrGroup::pup(p); 
550         // the memory should be already allocated
551         int np;
552         if (!p.isUnpacking()) np = CkNumPes();
553         p|np;
554         CmiAssert(avail_vector);
555         // in case number of processors changes
556         if (p.isUnpacking() && np > CkNumPes()) {
557                 CmiLock(avail_vector_lock);
558                 delete [] avail_vector;
559                 avail_vector = new char[np];
560                 for (int i=0; i<np; i++) avail_vector[i] = 1;
561                 CmiUnlock(avail_vector_lock);
562         }
563         p(avail_vector, np);
564         p|mystep;
565         if(p.isUnpacking()) nloadbalancers = 0;
566 }
567
568
569 void LBDatabase::EstObjLoad(const LDObjHandle &_h, double cputime)
570 {
571 #if CMK_LBDB_ON
572   LBDB *const db = (LBDB*)(_h.omhandle.ldb.handle);
573   LBObj *const obj = db->LbObj(_h);
574
575   CmiAssert(obj != NULL);
576   obj->setTiming(cputime);
577 #endif
578 }
579
580 void LBDatabase::ResumeClients() {
581   // If metabalancer enabled, initialize the variables
582   adaptive_lbdb.history_data.clear();
583
584   adaptive_struct.lb_ideal_period =  INT_MAX;
585   adaptive_struct.lb_calculated_period = INT_MAX;
586   adaptive_struct.lb_no_iterations = -1;
587   adaptive_struct.global_max_iter_no = 0;
588   adaptive_struct.global_recv_iter_counter = 0;
589   adaptive_struct.in_progress = false;
590   adaptive_struct.lb_strategy_cost = 0.0;
591   adaptive_struct.lb_migration_cost = 0.0;
592   adaptive_struct.lb_msg_send_no = 0;
593   adaptive_struct.lb_msg_recv_no = 0;
594   adaptive_struct.total_syncs_called = 0;
595   
596   max_load_vec.clear();
597   total_load_vec.clear();
598   total_contrib_vec.clear();
599
600   max_load_vec.resize(VEC_SIZE, 0.0);
601   total_load_vec.resize(VEC_SIZE, 0.0);
602   total_contrib_vec.resize(VEC_SIZE, 0.0);
603
604   LDResumeClients(myLDHandle);
605 }
606
607 bool LBDatabase::AddLoad(int iteration, double load) {
608   total_contrib_vec[iteration]++;
609   adaptive_struct.total_syncs_called++;
610   //CkPrintf("At PE %d Total contribution for iteration %d is %lf total objs %d\n", CkMyPe(), iteration,
611   //total_contrib_vec[iteration], getLBDB()->ObjDataCount());
612
613   if (iteration > adaptive_struct.lb_no_iterations) {
614     adaptive_struct.lb_no_iterations = iteration;
615   }
616   total_load_vec[iteration] += load;
617  // if (max_load_vec[iteration] < load) {
618  //   max_load_vec[iteration] = load;
619  // }
620   if (total_contrib_vec[iteration] == getLBDB()->ObjDataCount()) {
621     double idle_time;
622     IdleTime(&idle_time);
623     //CkPrintf("[%d] Idle time %lf for iteration %d\n", CkMyPe(), idle_time, iteration);
624     // Skips the 0th iteration collection of stats hence...
625     idle_time = idle_time * getLBDB()->ObjDataCount() /
626        (adaptive_struct.total_syncs_called + getLBDB()->ObjDataCount());
627
628     double lb_data[6];
629     lb_data[0] = iteration;
630     lb_data[1] = 1;
631     lb_data[2] = total_load_vec[iteration];
632     //lb_data[2] = max_load_vec[iteration];
633     lb_data[3] = total_load_vec[iteration];
634     //lb_data[3] = getLBDB()->ObjDataCount();
635     lb_data[4] = idle_time;
636     if (total_load_vec[iteration] == 0.0) {
637       lb_data[5] = idle_time;
638     } else {
639       lb_data[5] = idle_time/total_load_vec[iteration];
640     }
641
642    // CkPrintf("[%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
643    //     total_load_vec[iteration], idle_time,
644    //     idle_time/total_load_vec[iteration], adaptive_struct.lb_no_iterations);
645
646     CkCallback cb(CkIndex_LBDatabase::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
647     contribute(6*sizeof(double), lb_data, lbDataCollectionType, cb);
648   }
649   return true;
650 }
651
652 void LBDatabase::ReceiveMinStats(CkReductionMsg *msg) {
653   double* load = (double *) msg->getData();
654   double avg = load[2]/load[1];
655   double max = load[3];
656   double avg_idle = load[4]/load[1];
657   double max_idle_load_ratio = load[5];
658   int iteration_n = load[0];
659   CkPrintf("** [%d] Iteration Avg load: %lf Max load: %lf Avg Idle : %lf Max Idle : %lf for %lf procs\n",iteration_n, avg, max, avg_idle, max_idle_load_ratio, load[1]);
660   delete msg;
661  
662   // Store the data for this iteration
663   adaptive_struct.lb_no_iterations = iteration_n;
664   AdaptiveData data;
665   data.iteration = adaptive_struct.lb_no_iterations;
666   data.max_load = max;
667   data.avg_load = avg;
668   data.max_idle_load_ratio = max_idle_load_ratio;
669   data.idle_time = avg_idle;
670   adaptive_lbdb.history_data.push_back(data);
671
672   // If lb period inform is in progress, dont inform again
673   if (adaptive_struct.in_progress) {
674     return;
675   }
676
677 //  if (adaptive_struct.lb_period_informed) {
678 //    return;
679 //  }
680
681   // If the max/avg ratio is greater than the threshold and also this is not the
682   // step immediately after load balancing, carry out load balancing
683   //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
684   int tmp1;
685   double tmp2, tmp3;
686   GetPrevLBData(tmp1, tmp2, tmp3);
687   double tolerate_imb = IMB_TOLERANCE * tmp2;
688
689   CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp1, tmp2, tmp3);
690
691   if ((max_idle_load_ratio >= IDLE_LOAD_TOLERANCE || max/avg >= tolerate_imb) && adaptive_lbdb.history_data.size() > 4) {
692     CkPrintf("Carry out load balancing step at iter max/avg(%lf) and max_idle_load_ratio ratio (%lf)\n", max/avg, max_idle_load_ratio);
693 //    if (!adaptive_struct.lb_period_informed) {
694 //      // Just for testing
695 //      adaptive_struct.lb_calculated_period = 40;
696 //      adaptive_struct.lb_period_informed = true;
697 //      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
698 //      return;
699 //    }
700
701
702
703     // If the new lb period is less than current set lb period
704     if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
705       if (max/avg < tolerate_imb) {
706         adaptive_struct.doCommStrategy = true;
707         CkPrintf("No load imbalance but idle time\n");
708       } else {
709         adaptive_struct.doCommStrategy = false;
710         CkPrintf("load imbalance \n");
711       }
712       adaptive_struct.lb_calculated_period = iteration_n + 1;
713       adaptive_struct.lb_period_informed = true;
714       adaptive_struct.in_progress = true;
715       CkPrintf("Informing everyone the lb period is %d\n",
716           adaptive_struct.lb_calculated_period);
717       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
718     }
719     return;
720   }
721
722   // Generate the plan for the adaptive strategy
723   int period;
724   if (generatePlan(period)) {
725     //CkPrintf("Carry out load balancing step at iter\n");
726
727     // If the new lb period is less than current set lb period
728     if (adaptive_struct.lb_calculated_period > period) {
729       adaptive_struct.doCommStrategy = false;
730       adaptive_struct.lb_calculated_period = period;
731       adaptive_struct.in_progress = true;
732       adaptive_struct.lb_period_informed = true;
733       CkPrintf("Informing everyone the lb period is %d\n",
734           adaptive_struct.lb_calculated_period);
735       thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
736     }
737   }
738 }
739
740 bool LBDatabase::generatePlan(int& period) {
741   if (adaptive_lbdb.history_data.size() <= 8) {
742     return false;
743   }
744
745   // Some heuristics for lbperiod
746   // If constant load or almost constant,
747   // then max * new_lb_period > avg * new_lb_period + lb_cost
748   double max = 0.0;
749   double avg = 0.0;
750   AdaptiveData data;
751   for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
752     data = adaptive_lbdb.history_data[i];
753     max += data.max_load;
754     avg += data.avg_load;
755     CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
756   }
757 //  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
758 //  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
759 //
760 //  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
761 //  adaptive_struct.lb_migration_cost) / (max - avg);
762 //  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
763 //      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
764 //
765
766   // If linearly varying load, then find lb_period
767   // area between the max and avg curve 
768   // If we can attain perfect balance, then the new load is close to the
769   // average. Hence we pass 1, else pass in some other value which would be the
770   // new max_load after load balancing.
771   int tmp1;
772   double tmp2, tmp3;
773   GetPrevLBData(tmp1, tmp2, tmp3);
774   
775   double tolerate_imb = tmp2;
776   if (max/avg < tolerate_imb) {
777     tolerate_imb = 1.0;
778   }
779
780   return getPeriodForStrategy(tolerate_imb, 1, period);
781
782 //  int refine_period, scratch_period;
783 //  bool obtained_refine, obtained_scratch;
784 //  obtained_refine = getPeriodForStrategy(1, 1, refine_period);
785 //  obtained_scratch = getPeriodForStrategy(1, 1, scratch_period);
786 //
787 //  if (obtained_refine) {
788 //    if (!obtained_scratch) {
789 //      period = refine_period;
790 //      adaptive_struct.isRefine = true;
791 //      return true;
792 //    }
793 //    if (scratch_period < 1.1*refine_period) {
794 //      adaptive_struct.isRefine = false;
795 //      period = scratch_period;
796 //      return true;
797 //    }
798 //    period = refine_period;
799 //    adaptive_struct.isRefine = true;
800 //    return true;
801 //  }
802 //
803 //  if (obtained_scratch) {
804 //    period = scratch_period;
805 //    adaptive_struct.isRefine = false;
806 //    return true;
807 //  }
808 //  return false;
809 }
810
811 bool LBDatabase::getPeriodForStrategy(double new_load_percent, double overhead_percent, int& period) {
812   double mslope, aslope, mc, ac;
813   getLineEq(new_load_percent, aslope, ac, mslope, mc);
814   CkPrintf("new load percent %lf\n", new_load_percent);
815   CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
816   double a = (mslope - aslope)/2;
817   double b = (mc - ac);
818   double c = -(adaptive_struct.lb_strategy_cost +
819       adaptive_struct.lb_migration_cost) * overhead_percent;
820   //c = -2.5;
821   bool got_period = getPeriodForLinear(a, b, c, period);
822   if (!got_period) {
823     return false;
824   }
825   
826   if (mslope < 0) {
827     if (period > (-mc/mslope)) {
828       CkPrintf("Max < 0 Period set when max load is -ve\n");
829       return false;
830     }
831   }
832
833   if (aslope < 0) {
834     if (period > (-ac/aslope)) {
835       CkPrintf("Avg < 0 Period set when avg load is -ve\n");
836       return false;
837     }
838   }
839
840   int intersection_t = (mc-ac) / (aslope - mslope);
841   if (intersection_t > 0 && period > intersection_t) {
842     CkPrintf("Avg | Max Period set when curves intersect\n");
843     return false;
844   }
845   return true;
846 }
847
848 bool LBDatabase::getPeriodForLinear(double a, double b, double c, int& period) {
849   CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
850   if (a == 0.0) {
851     period = (-c / b);
852     CkPrintf("Ideal period for linear load %d\n", period);
853     return true;
854   }
855   int x;
856   double t = (b * b) - (4*a*c);
857   if (t < 0) {
858     CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
859     return false;
860   }
861   t = (-b + sqrt(t)) / (2*a);
862   x = t;
863   if (x < 0) {
864     CkPrintf("boo!!! x (%d) < 0\n", x);
865     x = 0;
866     return false;
867   }
868   period = x;
869   CkPrintf("Ideal period for linear load %d\n", period);
870   return true;
871 }
872
873 bool LBDatabase::getLineEq(double new_load_percent, double& aslope, double& ac, double& mslope, double& mc) {
874   int total = adaptive_lbdb.history_data.size();
875   int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
876       adaptive_lbdb.history_data[0].iteration;
877   double a1 = 0;
878   double m1 = 0;
879   double a2 = 0;
880   double m2 = 0;
881   AdaptiveData data;
882   int i = 0;
883   for (i = 0; i < total/2; i++) {
884     data = adaptive_lbdb.history_data[i];
885     m1 += data.max_load;
886     a1 += data.avg_load;
887   }
888   m1 /= i;
889   a1 = (a1 * new_load_percent) / i;
890
891   for (i = total/2; i < total; i++) {
892     data = adaptive_lbdb.history_data[i];
893     m2 += data.max_load;
894     a2 += data.avg_load;
895   }
896   m2 /= (i - total/2);
897   a2 = (a2 * new_load_percent) / (i - total/2);
898
899   aslope = 2 * (a2 - a1) / iterations;
900   mslope = 2 * (m2 - m1) / iterations;
901   ac = adaptive_lbdb.history_data[0].avg_load * new_load_percent;
902   mc = adaptive_lbdb.history_data[0].max_load;
903
904   //ac = (adaptive_lbdb.history_data[1].avg_load * new_load_percent - aslope);
905   //mc = (adaptive_lbdb.history_data[1].max_load - mslope);
906
907   return true;
908 }
909
910 void LBDatabase::LoadBalanceDecision(int req_no, int period) {
911   if (req_no < adaptive_struct.lb_msg_recv_no) {
912     CkPrintf("Error!!! Received a request which was already sent or old\n");
913     return;
914   }
915   //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
916   adaptive_struct.lb_ideal_period = period;
917   //local_state = ON;
918   adaptive_struct.lb_msg_recv_no = req_no;
919   thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
920 }
921
922 void LBDatabase::LoadBalanceDecisionFinal(int req_no, int period) {
923   if (req_no < adaptive_struct.lb_msg_recv_no) {
924     return;
925   }
926 //  CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d \n",CkMyPe(), adaptive_struct.lb_no_iterations, period);
927   adaptive_struct.lb_ideal_period = period;
928   LDOMAdaptResumeSync(myLDHandle, period);
929
930 //  if (local_state == ON) {
931 //    local_state = DECIDED;
932 //    return;
933 //  }
934
935   // If the state is PAUSE, then its waiting for the final decision from central
936   // processor. If the decision is that the ideal period is in the future,
937   // resume. If the ideal period is now, then carry out load balancing.
938 //  if (local_state == PAUSE) {
939 //    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
940 //      local_state = DECIDED;
941 //      //SendMinStats();
942 //      //FIX ME!!! ResumeClients(0);
943 //    } else {
944 //      local_state = LOAD_BALANCE;
945 //      //FIX ME!!! ProcessAtSync();
946 //    }
947 //    return;
948 //  }
949 //  CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
950 }
951
952
953 void LBDatabase::ReceiveIterationNo(int req_no, int local_iter_no) {
954   CmiAssert(CkMyPe() == 0);
955
956   adaptive_struct.global_recv_iter_counter++;
957   if (local_iter_no > adaptive_struct.global_max_iter_no) {
958     adaptive_struct.global_max_iter_no = local_iter_no;
959   }
960   if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
961     adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
962     thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
963     CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
964     adaptive_struct.in_progress = false;
965     adaptive_struct.global_max_iter_no = 0;
966     adaptive_struct.global_recv_iter_counter = 0;
967   }
968 }
969
970 int LBDatabase::getPredictedLBPeriod() {
971   return adaptive_struct.lb_ideal_period;
972 }
973
974 bool LBDatabase::isStrategyComm() {
975   return adaptive_struct.doCommStrategy;
976 }
977
978 void LBDatabase::SetMigrationCost(double lb_migration_cost) {
979   adaptive_struct.lb_migration_cost = lb_migration_cost;
980 }
981
982 void LBDatabase::SetStrategyCost(double lb_strategy_cost) {
983   adaptive_struct.lb_strategy_cost = lb_strategy_cost;
984 }
985
986 void LBDatabase::UpdateAfterLBData(int lb, double lb_max, double lb_avg, double
987     local_comm, double remote_comm) {
988   adaptive_struct.last_lb_type = lb;
989   if (lb == 0) {
990     adaptive_struct.greedy_info.max_avg_ratio = lb_max/lb_avg;
991   } else if (lb == 1) {
992     adaptive_struct.refine_info.max_avg_ratio = lb_max/lb_avg;
993   } else if (lb == 2) {
994     adaptive_struct.comm_info.remote_local_ratio = remote_comm/local_comm;
995   } else if (lb == 3) {
996     adaptive_struct.comm_refine_info.remote_local_ratio =
997     remote_comm/local_comm;
998   }
999 }
1000
1001 void LBDatabase::GetPrevLBData(int& lb_type, double& lb_max_avg_ratio, double&
1002     remote_local_comm_ratio) {
1003   lb_type = adaptive_struct.last_lb_type;
1004   lb_max_avg_ratio = 1;
1005   remote_local_comm_ratio = 1;
1006   GetLBDataForLB(lb_type, lb_max_avg_ratio, remote_local_comm_ratio);
1007 }
1008
1009 void LBDatabase::GetLBDataForLB(int lb_type, double& lb_max_avg_ratio, double&
1010     remote_local_comm_ratio) {
1011   if (lb_type == 0) {
1012     lb_max_avg_ratio = adaptive_struct.greedy_info.max_avg_ratio;
1013   } else if (lb_type == 1) {
1014     lb_max_avg_ratio = adaptive_struct.refine_info.max_avg_ratio;
1015   } else if (lb_type == 2) {
1016     remote_local_comm_ratio = adaptive_struct.comm_info.remote_local_ratio;
1017   } else if (lb_type == 3) {
1018     remote_local_comm_ratio =
1019        adaptive_struct.comm_refine_info.remote_local_ratio;
1020   }
1021 }
1022
1023 /*
1024   callable from user's code
1025 */
1026 void TurnManualLBOn()
1027 {
1028 #if CMK_LBDB_ON
1029    LBDatabase * myLbdb = LBDatabase::Object();
1030    if (myLbdb) {
1031      myLbdb->TurnManualLBOn();
1032    }
1033    else {
1034      LBDatabase::manualOn = 1;
1035    }
1036 #endif
1037 }
1038
1039 void TurnManualLBOff()
1040 {
1041 #if CMK_LBDB_ON
1042    LBDatabase * myLbdb = LBDatabase::Object();
1043    if (myLbdb) {
1044      myLbdb->TurnManualLBOff();
1045    }
1046    else {
1047      LBDatabase::manualOn = 0;
1048    }
1049 #endif
1050 }
1051
1052 extern "C" void LBTurnInstrumentOn() { 
1053 #if CMK_LBDB_ON
1054   if (CkpvAccess(lbdatabaseInited))
1055     LBDatabase::Object()->CollectStatsOn(); 
1056   else
1057     _lb_args.statsOn() = 1;
1058 #endif
1059 }
1060
1061 extern "C" void LBTurnInstrumentOff() { 
1062 #if CMK_LBDB_ON
1063   if (CkpvAccess(lbdatabaseInited))
1064     LBDatabase::Object()->CollectStatsOff(); 
1065   else
1066     _lb_args.statsOn() = 0;
1067 #endif
1068 }
1069 void LBClearLoads() {
1070 #if CMK_LBDB_ON
1071   LBDatabase::Object()->ClearLoads(); 
1072 #endif
1073 }
1074
1075 void LBTurnPredictorOn(LBPredictorFunction *model) {
1076 #if CMK_LBDB_ON
1077   LBDatabase::Object()->PredictorOn(model);
1078 #endif
1079 }
1080
1081 void LBTurnPredictorOn(LBPredictorFunction *model, int wind) {
1082 #if CMK_LBDB_ON
1083   LBDatabase::Object()->PredictorOn(model, wind);
1084 #endif
1085 }
1086
1087 void LBTurnPredictorOff() {
1088 #if CMK_LBDB_ON
1089   LBDatabase::Object()->PredictorOff();
1090 #endif
1091 }
1092
1093 void LBChangePredictor(LBPredictorFunction *model) {
1094 #if CMK_LBDB_ON
1095   LBDatabase::Object()->ChangePredictor(model);
1096 #endif
1097 }
1098
1099 void LBSetPeriod(double second) {
1100 #if CMK_LBDB_ON
1101   if (CkpvAccess(lbdatabaseInited))
1102     LBDatabase::Object()->SetLBPeriod(second); 
1103   else
1104     _lb_args.lbperiod() = second;
1105 #endif
1106 }
1107
1108 #include "LBDatabase.def.h"
1109
1110 /*@}*/