a modification in LDStats database, changed pointer to array to CkVec for easier...
[charm.git] / src / ck-ldb / CentralLB.h
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #ifndef CENTRALLB_H
14 #define CENTRALLB_H
15
16 #include <math.h>
17 #include "BaseLB.h"
18 #include "CentralLB.decl.h"
19
20 extern CkGroupID loadbalancer;
21
22 void CreateCentralLB();
23
24 class CLBStatsMsg;
25 class LBSimulation;
26
27 /// for backward compatibility
28 typedef LBMigrateMsg  CLBMigrateMsg;
29
30 class LBInfo
31 {
32 public:
33   double *peLoads;      // total load: object + background
34   double *objLoads;     // total obj load
35   double *comLoads;     // total comm load
36   double *bgLoads;      // background load
37   int    numPes;
38   double minObjLoad, maxObjLoad;
39   LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL), bgLoads(NULL), numPes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
40   LBInfo(double *pl, int count): peLoads(pl), objLoads(NULL), comLoads(NULL), bgLoads(NULL), numPes(count), minObjLoad(0.0), maxObjLoad(0.0) {}
41   LBInfo(int count);
42   ~LBInfo();
43   void clear();
44   void print();
45 };
46
47 class CentralLB : public BaseLB
48 {
49 private:
50   void initLB(const CkLBOptions &);
51 public:
52   CentralLB(const CkLBOptions & opt):BaseLB(opt) { initLB(opt); } 
53   CentralLB(CkMigrateMessage *m):BaseLB(m) {}
54   virtual ~CentralLB();
55
56   void pup(PUP::er &p);
57
58   void turnOn();
59   void turnOff();
60   inline int step() { return theLbdb->step(); }
61
62   static void staticAtSync(void*);
63   void AtSync(void); // Everything is at the PE barrier
64   void ProcessAtSync(void); // Receive a message from AtSync to avoid
65                             // making projections output look funny
66
67   void ReceiveStats(CkMarshalledCLBStatsMessage &msg);  // Receive stats on PE 0
68   void LoadBalance(void); 
69   void ResumeClients(int);                      // Resuming clients needs
70                                                 // to be resumed via message
71   void ResumeClients(CkReductionMsg *);
72   void ReceiveMigration(LBMigrateMsg *);        // Receive migration data
73   void MissMigrate(int waitForBarrier);
74
75   // manual predictor start/stop
76   static void staticPredictorOn(void* data, void* model);
77   static void staticPredictorOnWin(void* data, void* model, int wind);
78   static void staticPredictorOff(void* data);
79   static void staticChangePredictor(void* data, void* model);
80
81   // manual start load balancing
82   inline void StartLB() { thisProxy.ProcessAtSync(); }
83   static void staticStartLB(void* data);
84
85   // Migrated-element callback
86   static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
87   void Migrated(LDObjHandle h, int waitBarrier=1);
88
89   void MigrationDone(int balancing);  // Call when migration is complete
90   void CheckMigrationComplete();      // Call when all migration is complete
91
92   // IMPLEMENTATION FOR FUTURE PREDICTOR
93   void FuturePredictor(LDStats* stats);
94
95   struct FutureModel {
96     int n_stats;    // total number of statistics allocated
97     int cur_stats;   // number of statistics currently present
98     int start_stats; // next stat to be written
99     LDStats *collection;
100     int n_objs;     // each object has its own parameters
101     LBPredictorFunction *predictor;
102     double **parameters;
103     bool *model_valid;
104
105     FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
106          n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
107
108     FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
109          parameters(NULL) {
110       collection = new LDStats[n];
111       //for (int i=0;i<n;++i) collection[i].objData=NULL;
112       predictor = new DefaultFunction();
113     }
114
115     FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
116       collection = new LDStats[n];
117       //for (int i=0;i<n;++i) collection[i].objData=NULL;
118       predictor = myfunc;
119     }
120
121     ~FutureModel() {
122       delete[] collection;
123       for (int i=0;i<n_objs;++i) delete[] parameters[i];
124       delete[] parameters;
125       delete predictor;
126     }
127
128     void changePredictor(LBPredictorFunction *new_predictor) {
129       delete predictor;
130       int i;
131       // gain control of the provided predictor;
132       predictor = new_predictor;
133       for (i=0;i<n_objs;++i) delete[] parameters[i];
134       for (i=0;i<n_objs;++i) {
135         parameters[i] = new double[new_predictor->num_params];
136         model_valid = false;
137       }
138     }
139   };
140
141   // create new predictor, if one already existing, delete it first
142   // if "pred" == 0 then the default function is used
143   void predictorOn(LBPredictorFunction *pred) {
144     predictorOn(pred, _lb_predict_window);
145   }
146   void predictorOn(LBPredictorFunction *pred, int window_size) {
147     if (predicted_model) PredictorPrintf("Predictor already allocated");
148     else {
149       _lb_predict_window = window_size;
150       if (pred) predicted_model = new FutureModel(window_size, pred);
151       else predicted_model = new FutureModel(window_size);
152       _lb_predict = CmiTrue;
153     }
154     PredictorPrintf("Predictor turned on, window size %d\n",window_size);
155   }
156
157   // deallocate the predictor
158   void predictorOff() {
159     if (predicted_model) delete predicted_model;
160     predicted_model = 0;
161     _lb_predict = CmiFalse;
162     PredictorPrintf("Predictor turned off\n");
163   }
164
165   // change the function of the predictor, at runtime
166   // it will do nothing if it does not exist
167   void changePredictor(LBPredictorFunction *new_predictor) {
168     if (predicted_model) {
169       predicted_model->changePredictor(new_predictor);
170       PredictorPrintf("Predictor model changed\n");
171     }
172   }
173   // END IMPLEMENTATION FOR FUTURE PREDICTOR
174
175   LBMigrateMsg* callStrategy(LDStats* stats,int count){
176     return Strategy(stats,count);
177   };
178
179   int cur_ld_balancer;
180
181   void readStatsMsgs(const char* filename);
182   void writeStatsMsgs(const char* filename);
183
184   void preprocess(LDStats* stats,int count);
185   virtual LBMigrateMsg* Strategy(LDStats* stats,int count);
186   virtual void work(LDStats* stats,int count);
187   virtual LBMigrateMsg * createMigrateMsg(LDStats* stats,int count);
188 protected:
189   virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
190   virtual CmiBool QueryDumpData() { return CmiFalse; };  
191   virtual void LoadbalanceDone(int balancing) {}
192
193   void simulationRead();
194   void simulationWrite();
195   void findSimResults(LDStats* stats, int count, 
196                       LBMigrateMsg* msg, LBSimulation* simResults);
197   void removeNonMigratable(LDStats* statsDataList, int count);
198
199 private:  
200   CProxy_CentralLB thisProxy;
201   int myspeed;
202   int stats_msg_count;
203   CLBStatsMsg **statsMsgsList;
204   LDStats *statsData;
205   int migrates_completed;
206   int migrates_expected;
207   int future_migrates_completed;
208   int future_migrates_expected;
209   int lbdone;
210   double start_lb_time;
211
212   FutureModel *predicted_model;
213
214   void buildStats();
215
216 public:
217   int useMem();
218 };
219
220 // CLBStatsMsg is not directly sent in the entry function
221 // CkMarshalledCLBStatsMessage is used instead to use the pup defined here.
222 //class CLBStatsMsg: public CMessage_CLBStatsMsg {
223 class CLBStatsMsg {
224 public:
225   int from_pe;
226   int serial;
227   int pe_speed;
228   double total_walltime;
229   double total_cputime;
230   double idletime;
231   double bg_walltime;
232   double bg_cputime;
233   int n_objs;
234   LDObjData *objData;
235   int n_comm;
236   LDCommData *commData;
237
238   char * avail_vector;
239   int next_lb;
240 public:
241   CLBStatsMsg(int osz, int csz);
242   CLBStatsMsg()  {}
243   ~CLBStatsMsg();
244   void pup(PUP::er &p);
245 }; 
246
247
248 // compute load distribution info
249 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
250
251 #endif /* CENTRALLB_H */
252
253 /*@}*/
254
255