Changes to AdaptiveLB strategy
[charm.git] / src / ck-ldb / CentralLB.h
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #ifndef CENTRALLB_H
7 #define CENTRALLB_H
8
9 #include "BaseLB.h"
10 #include "CentralLB.decl.h"
11
12 extern CkGroupID loadbalancer;
13
14 void CreateCentralLB();
15
16 class CLBStatsMsg;
17 class LBSimulation;
18
19 /// for backward compatibility
20 typedef LBMigrateMsg  CLBMigrateMsg;
21
22 class LBInfo
23 {
24 public:
25   LBRealType *peLoads;  // total load: object + background
26   LBRealType *objLoads;         // total obj load
27   LBRealType *comLoads;         // total comm load
28   LBRealType *bgLoads;  // background load
29   int    numPes;
30   int    msgCount;      // total non-local communication
31   CmiUInt8  msgBytes;   // total non-local communication
32   LBRealType minObjLoad, maxObjLoad;
33   LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL), 
34             bgLoads(NULL), numPes(0), msgCount(0),
35             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
36   LBInfo(LBRealType *pl, int count): peLoads(pl), objLoads(NULL), 
37             comLoads(NULL), bgLoads(NULL), numPes(count), msgCount(0),
38             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
39   LBInfo(int count);
40   ~LBInfo();
41   void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
42   void clear();
43   void print();
44   void getSummary(LBRealType &maxLoad, LBRealType &maxCpuLoad, LBRealType &totalLoad);
45 };
46
47 /** added by Abhinav
48  * class for computing the parent and children of a processor 
49  */
50 class SpanningTree
51 {
52         public:
53                 int arity;
54                 int parent;
55                 int numChildren;
56                 SpanningTree();
57                 void calcParent(int n);
58                 void calcNumChildren(int n);
59 };
60
61 class CentralLB : public BaseLB
62 {
63 private:
64   CLBStatsMsg *statsMsg;
65   int count_msgs;
66   void initLB(const CkLBOptions &);
67 public:
68   CkMarshalledCLBStatsMessage bufMsg;
69   SpanningTree st;
70   CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); 
71 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
72         lbDecisionCount= resumeCount=0;
73 #endif
74
75   CentralLB(CkMigrateMessage *m):BaseLB(m) {}
76   virtual ~CentralLB();
77
78   void pup(PUP::er &p);
79
80   void turnOn();
81   void turnOff();
82
83   static void staticAtSync(void*);
84   void AtSync(void); // Everything is at the PE barrier
85   void ProcessAtSync(void); // Receive a message from AtSync to avoid
86                             // making projections output look funny
87   //void ProcessAtSyncMin(void);
88   void SendStats();
89   //void SendMinStats();
90   void ReceiveCounts(CkReductionMsg *);
91   //void ReceiveMinStats(CkReductionMsg *);
92   void ReceiveStats(CkMarshalledCLBStatsMessage &msg);  // Receive stats on PE 0
93   void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg); // Receive stats using a tree structure  
94   
95   void depositData(CLBStatsMsg *m);
96   void LoadBalance(void); 
97   void ResumeClients(int);                      // Resuming clients needs
98
99  // void LoadBalanceDecision(int, int);
100  // void LoadBalanceDecisionFinal(int, int);
101  // void ReceiveIterationNo(int, int); // Receives the current iter no
102
103   void ResumeClients(CkReductionMsg *); // to be resumed via message
104   void ReceiveMigration(LBMigrateMsg *);        // Receive migration data
105   void ProcessReceiveMigration(CkReductionMsg  *);
106 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
107         void ReceiveDummyMigration(int _step);
108 #endif
109   void MissMigrate(int waitForBarrier);
110
111   // manual predictor start/stop
112   static void staticPredictorOn(void* data, void* model);
113   static void staticPredictorOnWin(void* data, void* model, int wind);
114   static void staticPredictorOff(void* data);
115   static void staticChangePredictor(void* data, void* model);
116
117   // manual start load balancing
118   inline void StartLB() { thisProxy.ProcessAtSync(); }
119   static void staticStartLB(void* data);
120
121   // Migrated-element callback
122   static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
123   void Migrated(LDObjHandle h, int waitBarrier=1);
124
125   void MigrationDone(int balancing);  // Call when migration is complete
126   void CheckMigrationComplete();      // Call when all migration is complete
127
128   // IMPLEMENTATION FOR FUTURE PREDICTOR
129   void FuturePredictor(LDStats* stats);
130
131   struct FutureModel {
132     int n_stats;    // total number of statistics allocated
133     int cur_stats;   // number of statistics currently present
134     int start_stats; // next stat to be written
135     LDStats *collection;
136     int n_objs;     // each object has its own parameters
137     LBPredictorFunction *predictor;
138     double **parameters;
139     bool *model_valid;
140
141     FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
142          n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
143
144     FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
145          parameters(NULL) {
146       collection = new LDStats[n];
147       //for (int i=0;i<n;++i) collection[i].objData=NULL;
148       predictor = new DefaultFunction();
149     }
150
151     FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
152       collection = new LDStats[n];
153       //for (int i=0;i<n;++i) collection[i].objData=NULL;
154       predictor = myfunc;
155     }
156
157     ~FutureModel() {
158       delete[] collection;
159       for (int i=0;i<n_objs;++i) delete[] parameters[i];
160       delete[] parameters;
161       delete predictor;
162     }
163
164     void changePredictor(LBPredictorFunction *new_predictor) {
165       delete predictor;
166       int i;
167       // gain control of the provided predictor;
168       predictor = new_predictor;
169       for (i=0;i<n_objs;++i) delete[] parameters[i];
170       for (i=0;i<n_objs;++i) {
171         parameters[i] = new double[new_predictor->num_params];
172         model_valid = false;
173       }
174     }
175   };
176
177   // create new predictor, if one already existing, delete it first
178   // if "pred" == 0 then the default function is used
179   void predictorOn(LBPredictorFunction *pred) {
180     predictorOn(pred, _lb_predict_window);
181   }
182   void predictorOn(LBPredictorFunction *pred, int window_size) {
183     if (predicted_model) PredictorPrintf("Predictor already allocated");
184     else {
185       _lb_predict_window = window_size;
186       if (pred) predicted_model = new FutureModel(window_size, pred);
187       else predicted_model = new FutureModel(window_size);
188       _lb_predict = CmiTrue;
189     }
190     PredictorPrintf("Predictor turned on, window size %d\n",window_size);
191   }
192
193   // deallocate the predictor
194   void predictorOff() {
195     if (predicted_model) delete predicted_model;
196     predicted_model = 0;
197     _lb_predict = CmiFalse;
198     PredictorPrintf("Predictor turned off\n");
199   }
200
201   // change the function of the predictor, at runtime
202   // it will do nothing if it does not exist
203   void changePredictor(LBPredictorFunction *new_predictor) {
204     if (predicted_model) {
205       predicted_model->changePredictor(new_predictor);
206       PredictorPrintf("Predictor model changed\n");
207     }
208   }
209   // END IMPLEMENTATION FOR FUTURE PREDICTOR
210
211   LBMigrateMsg* callStrategy(LDStats* stats,int count){
212     return Strategy(stats);
213   };
214
215   int cur_ld_balancer;
216
217   void readStatsMsgs(const char* filename);
218   void writeStatsMsgs(const char* filename);
219
220   void preprocess(LDStats* stats);
221   virtual LBMigrateMsg* Strategy(LDStats* stats);
222   virtual void work(LDStats* stats);
223   virtual LBMigrateMsg * createMigrateMsg(LDStats* stats);
224   virtual LBMigrateMsg * extractMigrateMsg(LBMigrateMsg *m, int p);
225
226   // Not to be used -- maintained for legacy applications
227   virtual LBMigrateMsg* Strategy(LDStats* stats, int nprocs) {
228     return Strategy(stats);
229   }
230
231 protected:
232   virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
233   virtual CmiBool QueryDumpData() { return CmiFalse; };  
234   virtual void LoadbalanceDone(int balancing) {}
235
236   void simulationRead();
237   void simulationWrite();
238   void findSimResults(LDStats* stats, int count, 
239                       LBMigrateMsg* msg, LBSimulation* simResults);
240   void removeNonMigratable(LDStats* statsDataList, int count);
241
242   virtual void UpdateLBDBWithData(int is_prev_lb_refine, double lb_max,
243       double lb_avg) {
244     theLbdb->UpdateAfterLBData(is_prev_lb_refine, lb_max, lb_avg);
245   }
246
247   virtual void GetPrevLBData(int& is_prev_lb_refine, double& lb_max,
248       double& lb_avg) {
249     theLbdb->GetPrevLBData(is_prev_lb_refine, lb_max, lb_avg);
250   }
251
252
253 private:  
254   CProxy_CentralLB thisProxy;
255   int myspeed;
256   int stats_msg_count;
257   CLBStatsMsg **statsMsgsList;
258   LDStats *statsData;
259   int migrates_completed;
260   int migrates_expected;
261   int future_migrates_completed;
262   int future_migrates_expected;
263   int lbdone;
264   double start_lb_time;
265   LBMigrateMsg   *storedMigrateMsg;
266   int  reduction_started;
267
268
269   FutureModel *predicted_model;
270
271   void BuildStatsMsg();
272   void buildStats();
273   bool generatePlan(int& period);
274   bool getLineEq(double& aslope, double& ac, double& mslope, double& mc);
275   bool getPeriodForLinear(double a, double b, double c, int& period);
276
277 public:
278   int useMem();
279 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
280     int savedBalancing;
281     void endMigrationDone(int balancing);
282     int lbDecisionCount ,resumeCount;
283 #endif
284 };
285
286 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
287     void resumeCentralLbAfterChkpt(void *lb);
288 #endif
289
290 // CLBStatsMsg is not directly sent in the entry function
291 // CkMarshalledCLBStatsMessage is used instead to use the pup defined here.
292 //class CLBStatsMsg: public CMessage_CLBStatsMsg {
293 class CLBStatsMsg {
294 public:
295   int from_pe;
296   int pe_speed;
297   LBRealType total_walltime;
298   LBRealType idletime;
299   LBRealType bg_walltime;
300 #if CMK_LB_CPUTIMER
301   LBRealType total_cputime;
302   LBRealType bg_cputime;
303 #endif
304   int n_objs;
305   LDObjData *objData;
306   int n_comm;
307   LDCommData *commData;
308
309   char * avail_vector;
310   int next_lb;
311 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
312         int step;
313 #endif
314
315 public:
316   CLBStatsMsg(int osz, int csz);
317   CLBStatsMsg(): from_pe(0), pe_speed(0), total_walltime(0.0), idletime(0.0),
318                  bg_walltime(0.0), n_objs(0), objData(NULL), n_comm(0),
319 #if CMK_LB_CPUTIMER
320                  total_cputime(0.0), bg_cputime(0.0),
321 #endif
322                  commData(NULL), avail_vector(NULL), next_lb(0) {}
323   ~CLBStatsMsg();
324   void pup(PUP::er &p);
325 }; 
326
327
328 // compute load distribution info
329 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
330
331 #endif /* CENTRALLB_H */
332
333 /*@}*/
334
335