A few bug fixes related to the mlogft machine.
[charm.git] / src / ck-ldb / CentralLB.h
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #ifndef CENTRALLB_H
14 #define CENTRALLB_H
15
16 #include <math.h>
17 #include "BaseLB.h"
18 #include "CentralLB.decl.h"
19
20 extern CkGroupID loadbalancer;
21
22 void CreateCentralLB();
23
24 class CLBStatsMsg;
25 class LBSimulation;
26
27 /// for backward compatibility
28 typedef LBMigrateMsg  CLBMigrateMsg;
29
30 class LBInfo
31 {
32 public:
33   double *peLoads;      // total load: object + background
34   double *objLoads;     // total obj load
35   double *comLoads;     // total comm load
36   double *bgLoads;      // background load
37   int    numPes;
38   int    msgCount;      // total non-local communication
39   CmiUInt8  msgBytes;   // total non-local communication
40   double minObjLoad, maxObjLoad;
41   LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL), 
42             bgLoads(NULL), msgCount(0), msgBytes(0),
43             numPes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
44   LBInfo(double *pl, int count): peLoads(pl), objLoads(NULL), 
45             comLoads(NULL), bgLoads(NULL), msgCount(0), msgBytes(0),
46             numPes(count), minObjLoad(0.0), maxObjLoad(0.0) {}
47   LBInfo(int count);
48   ~LBInfo();
49   void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
50   void clear();
51   void print();
52   void getSummary(double &maxLoad, double &maxCpuLoad, double &totalLoad);
53 };
54
55 /** added by Abhinav
56  * class for computing the parent and children of a processor 
57  */
58 class SpanningTree
59 {
60         public:
61                 int arity;
62                 int parent;
63                 int numChildren;
64                 SpanningTree();
65                 void calcParent(int n);
66                 void calcNumChildren(int n);
67 };
68
69 class CentralLB : public BaseLB
70 {
71 private:
72   CLBStatsMsg *statsMsg;
73   int count_msgs;
74   void initLB(const CkLBOptions &);
75 public:
76   CkMarshalledCLBStatsMessage bufMsg;
77   SpanningTree st;
78   CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); 
79 #ifdef _FAULT_MLOG_
80         lbDecisionCount= resumeCount=0;
81 #endif
82
83   CentralLB(CkMigrateMessage *m):BaseLB(m) {}
84   virtual ~CentralLB();
85
86   void pup(PUP::er &p);
87
88   void turnOn();
89   void turnOff();
90
91   static void staticAtSync(void*);
92   void AtSync(void); // Everything is at the PE barrier
93   void ProcessAtSync(void); // Receive a message from AtSync to avoid
94                             // making projections output look funny
95
96   void SendStats();
97   void ReceiveCounts(CkReductionMsg *);
98   void ReceiveStats(CkMarshalledCLBStatsMessage &msg);  // Receive stats on PE 0
99   void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg); // Receive stats using a tree structure  
100   
101   void depositData(CLBStatsMsg *m);
102   void LoadBalance(void); 
103   void ResumeClients(int);                      // Resuming clients needs
104                                                 // to be resumed via message
105   void ResumeClients(CkReductionMsg *);
106   void ReceiveMigration(LBMigrateMsg *);        // Receive migration data
107 #ifdef _FAULT_MLOG_
108         void ReceiveDummyMigration(int _step);
109 #endif
110   void MissMigrate(int waitForBarrier);
111
112   // manual predictor start/stop
113   static void staticPredictorOn(void* data, void* model);
114   static void staticPredictorOnWin(void* data, void* model, int wind);
115   static void staticPredictorOff(void* data);
116   static void staticChangePredictor(void* data, void* model);
117
118   // manual start load balancing
119   inline void StartLB() { thisProxy.ProcessAtSync(); }
120   static void staticStartLB(void* data);
121
122   // Migrated-element callback
123   static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
124   void Migrated(LDObjHandle h, int waitBarrier=1);
125
126   void MigrationDone(int balancing);  // Call when migration is complete
127   void CheckMigrationComplete();      // Call when all migration is complete
128
129   // IMPLEMENTATION FOR FUTURE PREDICTOR
130   void FuturePredictor(LDStats* stats);
131
132   struct FutureModel {
133     int n_stats;    // total number of statistics allocated
134     int cur_stats;   // number of statistics currently present
135     int start_stats; // next stat to be written
136     LDStats *collection;
137     int n_objs;     // each object has its own parameters
138     LBPredictorFunction *predictor;
139     double **parameters;
140     bool *model_valid;
141
142     FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
143          n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
144
145     FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
146          parameters(NULL) {
147       collection = new LDStats[n];
148       //for (int i=0;i<n;++i) collection[i].objData=NULL;
149       predictor = new DefaultFunction();
150     }
151
152     FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
153       collection = new LDStats[n];
154       //for (int i=0;i<n;++i) collection[i].objData=NULL;
155       predictor = myfunc;
156     }
157
158     ~FutureModel() {
159       delete[] collection;
160       for (int i=0;i<n_objs;++i) delete[] parameters[i];
161       delete[] parameters;
162       delete predictor;
163     }
164
165     void changePredictor(LBPredictorFunction *new_predictor) {
166       delete predictor;
167       int i;
168       // gain control of the provided predictor;
169       predictor = new_predictor;
170       for (i=0;i<n_objs;++i) delete[] parameters[i];
171       for (i=0;i<n_objs;++i) {
172         parameters[i] = new double[new_predictor->num_params];
173         model_valid = false;
174       }
175     }
176   };
177
178   // create new predictor, if one already existing, delete it first
179   // if "pred" == 0 then the default function is used
180   void predictorOn(LBPredictorFunction *pred) {
181     predictorOn(pred, _lb_predict_window);
182   }
183   void predictorOn(LBPredictorFunction *pred, int window_size) {
184     if (predicted_model) PredictorPrintf("Predictor already allocated");
185     else {
186       _lb_predict_window = window_size;
187       if (pred) predicted_model = new FutureModel(window_size, pred);
188       else predicted_model = new FutureModel(window_size);
189       _lb_predict = CmiTrue;
190     }
191     PredictorPrintf("Predictor turned on, window size %d\n",window_size);
192   }
193
194   // deallocate the predictor
195   void predictorOff() {
196     if (predicted_model) delete predicted_model;
197     predicted_model = 0;
198     _lb_predict = CmiFalse;
199     PredictorPrintf("Predictor turned off\n");
200   }
201
202   // change the function of the predictor, at runtime
203   // it will do nothing if it does not exist
204   void changePredictor(LBPredictorFunction *new_predictor) {
205     if (predicted_model) {
206       predicted_model->changePredictor(new_predictor);
207       PredictorPrintf("Predictor model changed\n");
208     }
209   }
210   // END IMPLEMENTATION FOR FUTURE PREDICTOR
211
212   LBMigrateMsg* callStrategy(LDStats* stats,int count){
213     return Strategy(stats,count);
214   };
215
216   int cur_ld_balancer;
217
218   void readStatsMsgs(const char* filename);
219   void writeStatsMsgs(const char* filename);
220
221   void preprocess(LDStats* stats,int count);
222   virtual LBMigrateMsg* Strategy(LDStats* stats,int count);
223   virtual void work(LDStats* stats,int count);
224   virtual LBMigrateMsg * createMigrateMsg(LDStats* stats,int count);
225 protected:
226   virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
227   virtual CmiBool QueryDumpData() { return CmiFalse; };  
228   virtual void LoadbalanceDone(int balancing) {}
229
230   void simulationRead();
231   void simulationWrite();
232   void findSimResults(LDStats* stats, int count, 
233                       LBMigrateMsg* msg, LBSimulation* simResults);
234   void removeNonMigratable(LDStats* statsDataList, int count);
235
236 private:  
237   CProxy_CentralLB thisProxy;
238   int myspeed;
239   int stats_msg_count;
240   CLBStatsMsg **statsMsgsList;
241   LDStats *statsData;
242   int migrates_completed;
243   int migrates_expected;
244   int future_migrates_completed;
245   int future_migrates_expected;
246   int lbdone;
247   double start_lb_time;
248
249   FutureModel *predicted_model;
250
251   void BuildStatsMsg();
252   void buildStats();
253
254 public:
255   int useMem();
256 #ifdef _FAULT_MLOG_
257     int savedBalancing;
258     void endMigrationDone(int balancing);
259     int lbDecisionCount ,resumeCount;
260 #endif
261 };
262
263 #ifdef _FAULT_MLOG_ 
264     void resumeCentralLbAfterChkpt(void *lb);
265 #endif
266
267 // CLBStatsMsg is not directly sent in the entry function
268 // CkMarshalledCLBStatsMessage is used instead to use the pup defined here.
269 //class CLBStatsMsg: public CMessage_CLBStatsMsg {
270 class CLBStatsMsg {
271 public:
272   int from_pe;
273   int pe_speed;
274   double total_walltime;
275   double total_cputime;
276   double idletime;
277   double bg_walltime;
278   double bg_cputime;
279   int n_objs;
280   LDObjData *objData;
281   int n_comm;
282   LDCommData *commData;
283
284   char * avail_vector;
285   int next_lb;
286 #ifdef _FAULT_MLOG_
287         int step;
288 #endif
289
290 public:
291   CLBStatsMsg(int osz, int csz);
292   CLBStatsMsg(): from_pe(0), pe_speed(0), 
293                  total_walltime(0.0), total_cputime(0.0),
294                  idletime(0.0), bg_walltime(0.0), bg_cputime(0.0),
295                  n_objs(0), objData(NULL), n_comm(0), commData(NULL), 
296                  avail_vector(NULL), next_lb(0)  {}
297   ~CLBStatsMsg();
298   void pup(PUP::er &p);
299 }; 
300
301
302 // compute load distribution info
303 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
304
305 #endif /* CENTRALLB_H */
306
307 /*@}*/
308
309