Merging
[charm.git] / src / ck-ldb / CentralLB.h
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #ifndef CENTRALLB_H
7 #define CENTRALLB_H
8
9 #include "BaseLB.h"
10 #include "CentralLB.decl.h"
11
12 extern CkGroupID loadbalancer;
13
14 void CreateCentralLB();
15
16 class CLBStatsMsg;
17 class LBSimulation;
18
19 /// for backward compatibility
20 typedef LBMigrateMsg  CLBMigrateMsg;
21
22 class LBInfo
23 {
24 public:
25   LBRealType *peLoads;  // total load: object + background
26   LBRealType *objLoads;         // total obj load
27   LBRealType *comLoads;         // total comm load
28   LBRealType *bgLoads;  // background load
29   int    numPes;
30   int    msgCount;      // total non-local communication
31   CmiUInt8  msgBytes;   // total non-local communication
32   LBRealType minObjLoad, maxObjLoad;
33   LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL), 
34             bgLoads(NULL), numPes(0), msgCount(0),
35             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
36   LBInfo(LBRealType *pl, int count): peLoads(pl), objLoads(NULL), 
37             comLoads(NULL), bgLoads(NULL), numPes(count), msgCount(0),
38             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
39   LBInfo(int count);
40   ~LBInfo();
41   void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
42   void clear();
43   void print();
44   void getSummary(LBRealType &maxLoad, LBRealType &maxCpuLoad, LBRealType &totalLoad);
45 };
46
47 /** added by Abhinav
48  * class for computing the parent and children of a processor 
49  */
50 class SpanningTree
51 {
52         public:
53                 int arity;
54                 int parent;
55                 int numChildren;
56                 SpanningTree();
57                 void calcParent(int n);
58                 void calcNumChildren(int n);
59 };
60
61 class CentralLB : public BaseLB
62 {
63 private:
64   CLBStatsMsg *statsMsg;
65   int count_msgs;
66   void initLB(const CkLBOptions &);
67 public:
68   CkMarshalledCLBStatsMessage bufMsg;
69   SpanningTree st;
70   CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); 
71 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
72         lbDecisionCount= resumeCount=0;
73 #endif
74
75   CentralLB(CkMigrateMessage *m):BaseLB(m) {}
76   virtual ~CentralLB();
77
78   void pup(PUP::er &p);
79
80   void turnOn();
81   void turnOff();
82
83   static void staticAtSync(void*);
84   void AtSync(void); // Everything is at the PE barrier
85   void ProcessAtSync(void); // Receive a message from AtSync to avoid
86                             // making projections output look funny
87   //void ProcessAtSyncMin(void);
88   void SendStats();
89   //void SendMinStats();
90   void ReceiveCounts(CkReductionMsg *);
91   //void ReceiveMinStats(CkReductionMsg *);
92   void ReceiveStats(CkMarshalledCLBStatsMessage &msg);  // Receive stats on PE 0
93   void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg); // Receive stats using a tree structure  
94   
95   void depositData(CLBStatsMsg *m);
96   void LoadBalance(void); 
97   void ResumeClients(int);                      // Resuming clients needs
98
99  // void LoadBalanceDecision(int, int);
100  // void LoadBalanceDecisionFinal(int, int);
101  // void ReceiveIterationNo(int, int); // Receives the current iter no
102
103   void ResumeClients(CkReductionMsg *); // to be resumed via message
104   void ReceiveMigration(LBMigrateMsg *);        // Receive migration data
105   void ProcessReceiveMigration(CkReductionMsg  *);
106 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
107         void ReceiveDummyMigration(int _step);
108 #endif
109   void MissMigrate(int waitForBarrier);
110
111   // manual predictor start/stop
112   static void staticPredictorOn(void* data, void* model);
113   static void staticPredictorOnWin(void* data, void* model, int wind);
114   static void staticPredictorOff(void* data);
115   static void staticChangePredictor(void* data, void* model);
116
117   // manual start load balancing
118   inline void StartLB() { thisProxy.ProcessAtSync(); }
119   static void staticStartLB(void* data);
120
121   // Migrated-element callback
122   static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
123   void Migrated(LDObjHandle h, int waitBarrier=1);
124
125   void MigrationDone(int balancing);  // Call when migration is complete
126   void CheckMigrationComplete();      // Call when all migration is complete
127
128   // IMPLEMENTATION FOR FUTURE PREDICTOR
129   void FuturePredictor(LDStats* stats);
130
131   struct FutureModel {
132     int n_stats;    // total number of statistics allocated
133     int cur_stats;   // number of statistics currently present
134     int start_stats; // next stat to be written
135     LDStats *collection;
136     int n_objs;     // each object has its own parameters
137     LBPredictorFunction *predictor;
138     double **parameters;
139     bool *model_valid;
140
141     FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
142          n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
143
144     FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
145          parameters(NULL) {
146       collection = new LDStats[n];
147       //for (int i=0;i<n;++i) collection[i].objData=NULL;
148       predictor = new DefaultFunction();
149     }
150
151     FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
152       collection = new LDStats[n];
153       //for (int i=0;i<n;++i) collection[i].objData=NULL;
154       predictor = myfunc;
155     }
156
157     ~FutureModel() {
158       delete[] collection;
159       for (int i=0;i<n_objs;++i) delete[] parameters[i];
160       delete[] parameters;
161       delete predictor;
162     }
163
164     void changePredictor(LBPredictorFunction *new_predictor) {
165       delete predictor;
166       int i;
167       // gain control of the provided predictor;
168       predictor = new_predictor;
169       for (i=0;i<n_objs;++i) delete[] parameters[i];
170       for (i=0;i<n_objs;++i) {
171         parameters[i] = new double[new_predictor->num_params];
172         model_valid[i] = false;
173       }
174     }
175   };
176
177   // create new predictor, if one already existing, delete it first
178   // if "pred" == 0 then the default function is used
179   void predictorOn(LBPredictorFunction *pred) {
180     predictorOn(pred, _lb_predict_window);
181   }
182   void predictorOn(LBPredictorFunction *pred, int window_size) {
183     if (predicted_model) PredictorPrintf("Predictor already allocated");
184     else {
185       _lb_predict_window = window_size;
186       if (pred) predicted_model = new FutureModel(window_size, pred);
187       else predicted_model = new FutureModel(window_size);
188       _lb_predict = CmiTrue;
189     }
190     PredictorPrintf("Predictor turned on, window size %d\n",window_size);
191   }
192
193   // deallocate the predictor
194   void predictorOff() {
195     if (predicted_model) delete predicted_model;
196     predicted_model = 0;
197     _lb_predict = CmiFalse;
198     PredictorPrintf("Predictor turned off\n");
199   }
200
201   // change the function of the predictor, at runtime
202   // it will do nothing if it does not exist
203   void changePredictor(LBPredictorFunction *new_predictor) {
204     if (predicted_model) {
205       predicted_model->changePredictor(new_predictor);
206       PredictorPrintf("Predictor model changed\n");
207     }
208   }
209   // END IMPLEMENTATION FOR FUTURE PREDICTOR
210
211   LBMigrateMsg* callStrategy(LDStats* stats,int count){
212     return Strategy(stats);
213   };
214
215   int cur_ld_balancer;
216
217   void readStatsMsgs(const char* filename);
218   void writeStatsMsgs(const char* filename);
219
220   void preprocess(LDStats* stats);
221   virtual LBMigrateMsg* Strategy(LDStats* stats);
222   virtual void work(LDStats* stats);
223   virtual LBMigrateMsg * createMigrateMsg(LDStats* stats);
224   virtual LBMigrateMsg * extractMigrateMsg(LBMigrateMsg *m, int p);
225
226   // Not to be used -- maintained for legacy applications
227   virtual LBMigrateMsg* Strategy(LDStats* stats, int nprocs) {
228     return Strategy(stats);
229   }
230
231 protected:
232   virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
233   virtual CmiBool QueryDumpData() { return CmiFalse; };  
234   virtual void LoadbalanceDone(int balancing) {}
235
236   void simulationRead();
237   void simulationWrite();
238   void findSimResults(LDStats* stats, int count, 
239                       LBMigrateMsg* msg, LBSimulation* simResults);
240   void removeNonMigratable(LDStats* statsDataList, int count);
241
242   virtual void UpdateLBDBWithData(int is_prev_lb_refine, double lb_max,
243       double lb_avg, double local_comm, double remote_comm) {
244     theLbdb->UpdateAfterLBData(is_prev_lb_refine, lb_max, lb_avg, local_comm,
245         remote_comm);
246   }
247
248   virtual void GetPrevLBData(int& is_prev_lb_refine, double& lb_max_avg_ratio,
249       double& local_remote_comm_ratio) {
250     theLbdb->GetPrevLBData(is_prev_lb_refine, lb_max_avg_ratio,
251         local_remote_comm_ratio);
252   }
253
254   virtual void GetLBDataForLB(int prev_lb, double& lb_max_avg_ratio, double&
255       local_remote_comm_ratio) {
256     theLbdb->GetLBDataForLB(prev_lb, lb_max_avg_ratio, local_remote_comm_ratio);
257   }
258
259
260 private:  
261   CProxy_CentralLB thisProxy;
262   int myspeed;
263   int stats_msg_count;
264   CLBStatsMsg **statsMsgsList;
265   LDStats *statsData;
266   int migrates_completed;
267   int migrates_expected;
268   int future_migrates_completed;
269   int future_migrates_expected;
270   int lbdone;
271   double start_lb_time;
272   LBMigrateMsg   *storedMigrateMsg;
273   int  reduction_started;
274
275
276   FutureModel *predicted_model;
277
278   void BuildStatsMsg();
279   void buildStats();
280   bool generatePlan(int& period);
281   bool getLineEq(double& aslope, double& ac, double& mslope, double& mc);
282   bool getPeriodForLinear(double a, double b, double c, int& period);
283
284 public:
285   int useMem();
286 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
287     int savedBalancing;
288     void endMigrationDone(int balancing);
289     int lbDecisionCount ,resumeCount;
290 #endif
291 };
292
293 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
294     void resumeCentralLbAfterChkpt(void *lb);
295 #endif
296
297 // CLBStatsMsg is not directly sent in the entry function
298 // CkMarshalledCLBStatsMessage is used instead to use the pup defined here.
299 //class CLBStatsMsg: public CMessage_CLBStatsMsg {
300 class CLBStatsMsg {
301 public:
302   int from_pe;
303   int pe_speed;
304   LBRealType total_walltime;
305   LBRealType idletime;
306   LBRealType bg_walltime;
307 #if CMK_LB_CPUTIMER
308   LBRealType total_cputime;
309   LBRealType bg_cputime;
310 #endif
311   int n_objs;
312   LDObjData *objData;
313   int n_comm;
314   LDCommData *commData;
315
316   char * avail_vector;
317   int next_lb;
318 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
319         int step;
320 #endif
321
322 public:
323   CLBStatsMsg(int osz, int csz);
324   CLBStatsMsg(): from_pe(0), pe_speed(0), total_walltime(0.0), idletime(0.0),
325                  bg_walltime(0.0), n_objs(0), objData(NULL), n_comm(0),
326 #if CMK_LB_CPUTIMER
327                  total_cputime(0.0), bg_cputime(0.0),
328 #endif
329                  commData(NULL), avail_vector(NULL), next_lb(0) {}
330   ~CLBStatsMsg();
331   void pup(PUP::er &p);
332 }; 
333
334
335 // compute load distribution info
336 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
337
338 #endif /* CENTRALLB_H */
339
340 /*@}*/
341
342