added interface to allow Centralized load balancer to be able to ignore some of the...
[charm.git] / src / ck-ldb / CentralPredictor.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <math.h>
14 #include <charm++.h>
15 #include "CentralLB.h"
16
17 void CentralLB::staticPredictorOn(void* data, void *model)
18 {
19   CentralLB *me = (CentralLB*)(data);
20   me->predictorOn((LBPredictorFunction*)model);
21 }
22
23 void CentralLB::staticPredictorOnWin(void* data, void *model, int wind)
24 {
25   CentralLB *me = (CentralLB*)(data);
26   me->predictorOn((LBPredictorFunction*)model, wind);
27 }
28
29 void CentralLB::staticPredictorOff(void* data)
30 {
31   CentralLB *me = (CentralLB*)(data);
32   me->predictorOff();
33 }
34
35 void CentralLB::staticChangePredictor(void* data, void *model)
36 {
37   CentralLB *me = (CentralLB*)(data);
38   me->changePredictor((LBPredictorFunction*)model);
39 }
40
41 /*
42 // definitions for the implemented predictor model
43 #define NR_THRESHOLD            0.0001
44 #define INITIALIZE_PARAMS(x)    {x[0]=0; x[1]=1; x[2]=0; x[3]=0; x[4]=0; x[5]=0;}
45 #define PRED_FUNCTION(x, param) param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5]))
46
47 #define ADD_DIFFERENTIAL(diff,collec,index,param)  {diff[0]+=PRED_DERIVATE_0(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
48 diff[1]+=PRED_DERIVATE_1(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
49 diff[2]+=PRED_DERIVATE_2(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
50 diff[3]+=PRED_DERIVATE_3(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
51 diff[4]+=PRED_DERIVATE_4(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
52 diff[5]+=PRED_DERIVATE_5(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param);}
53
54 #define PRED_DERIVATE_0(y,x,p)  (y-PRED_FUNCTION(x,p))
55 #define PRED_DERIVATE_1(y,x,p)  (y-PRED_FUNCTION(x,p))*x
56 #define PRED_DERIVATE_2(y,x,p)  (y-PRED_FUNCTION(x,p))*x*x
57 #define PRED_DERIVATE_3(y,x,p)  (y-PRED_FUNCTION(x,p))*sin(p[4]*(x+p[5]))
58 #define PRED_DERIVATE_4(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*(x+p[5])*cos(p[4]*(x+p[5]))
59 #define PRED_DERIVATE_5(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*p[4]*cos(p[4]*(x+p[5]))
60
61 #define PRINT_MODEL(param)   "LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]
62 #define LEARNING_PARAM   1
63
64 static void Newton_Raphson(CentralLB::FutureModel *mod, int object) {
65 #if CMK_LBDB_ON
66   int i,j;
67   double differential[NUM_PARAMETERS];
68   double error = 0;
69   int loop=0;
70   char mystring[10]="my";
71   for (i=0; i<_lb_predict_delay-1; ++i) {
72     CkPrintf("y,x = %f %f\n",mod->collection[i+1].objData[object].cpuTime,mod->collection[i].objData[object].cpuTime,PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]));
73     error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
74   }
75
76   while (error > NR_THRESHOLD) {
77     CkPrintf("error: %f\n",error);
78     if (++loop==10) CmiAbort(mystring);
79     for (j=0; j<NUM_PARAMETERS; ++j) differential[j] = 0;
80     for (i=0; i<_lb_predict_delay-1; ++i) {
81       ADD_DIFFERENTIAL(differential, mod->collection, i, mod->parameters[object]);
82
83       //        differential[i] += PRED_DERIVATE(i, mod->collection[j+1].objData[object].cpuTime, mod->collection[j].objData[object].cpuTime, mod->parameters[object]);
84       mod->parameters[object][i] += LEARNING_PARAM * differential[i];
85     }
86     CkPrintf(PRINT_MODEL(mod->parameters[object]));
87     error = 0;
88     for (i=0; i<_lb_predict_delay-1; ++i) error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
89   }
90
91 #endif
92 }
93 */
94
95 #define MAX_CHISQ_ITER 10000
96 #define SMALL_NUMBER   0.00001    // to avoid singular matrix in gaussj
97
98 /*
99 #define NUM_PARAMETERS 6
100 #define MY_FUNCTION    hypothesis_function
101
102 // FUNCTIONS TO DEFINE A SPECIFIC MODEL
103
104 inline int future_numPar() {return 6;}
105
106 inline void initialize_params(double *x) {double normall=1.0/pow(2,31); x[0]=rand()*normall; x[1]=rand()*normall; x[2]=rand()*normall; x[3]=rand()*normall; x[4]=rand()*normall; x[5]=rand()*normall;}
107
108 // compute the prediction function for the variable x with parameters param
109 inline double pred_function(double x, double *param) {return (param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5])));}
110
111 inline void print_future_model(double *param) {CkPrintf("LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]);}
112
113 // compute the prediction function and its derivatives respect to the parameters
114 void hypothesis_function(double x, double *param, double &y, double *dyda) {
115 #if CMK_LBDB_ON
116   double tmp;
117
118   y = pred_function(x, param);
119
120   dyda[0] = 1;
121   dyda[1] = x;
122   dyda[2] = x*x;
123   tmp = param[4] * (x+param[5]);
124   dyda[3] = sin(tmp);
125   dyda[4] = param[3] * (x+param[5]) * cos(tmp);
126   dyda[5] = param[3] * param[4] *cos(tmp);
127
128 #endif
129 }
130 */
131 // END OF FUNCTIONS TO DEFINE A SPECIFIC MODEL
132
133 void gaussj(double **a, double *b, int n) {
134 #if CMK_LBDB_ON
135   int i,j,k;
136   int irow, icol;
137   double big, dum, pivinv;
138   // arrays for bookkeeping on the pivoting
139   int *indxc, *indxr, *ipiv;
140
141   indxc = new int[n];
142   indxr = new int[n];
143   ipiv = new int[n];
144   for (j=0; j<n; ++j) ipiv[j]=0;
145   // main loop over the columns to be reduced
146   for (i=0; i<n; ++i) {
147     big = 0;
148     // outer loop of the search for a pivot element
149     for (j=0; j<n; ++j)
150       if (ipiv[j] != 1)
151         for (k=0; k<n; ++k) {
152           if (ipiv[k] == 0 && fabs(a[j][k]) >= big) {
153             big = fabs(a[j][k]);
154             irow=j;
155             icol=k;
156           }
157         }
158     ++(ipiv[icol]);
159
160     if (irow != icol) {
161       for (j=0; j<n; ++j) {dum=a[irow][j]; a[irow][j]=a[icol][j]; a[icol][j]=dum;}
162       dum=b[irow]; b[irow]=b[icol]; b[icol]=dum;
163     }
164     // we are now ready to divide the pivot row by the pivot element, located at irow, icol
165     indxr[i]=irow;
166     indxc[i]=icol;
167     if (a[icol][icol] == 0) {
168       a[icol][icol] = SMALL_NUMBER;
169       CkPrintf("LB: Singular Matrix\n");
170     }
171     pivinv = 1.0/a[icol][icol];
172     a[icol][icol] = 1;
173     for (j=0; j<n; ++j) a[icol][j] *= pivinv;
174     b[icol] *= pivinv;
175     for (j=0; j<n; ++j)
176       if (j != icol) {
177         dum = a[j][icol];
178         a[j][icol] = 0;
179         for (k=0; k<n; ++k) a[j][k] -= a[icol][k]*dum;
180         b[j] -= b[icol]*dum;
181       }
182   }
183   // unscramble the matrix
184   for (i=n-1; i>=0; --i) {
185     if (indxr[i] != indxc[i])
186       for (j=0; j<n; ++j) {dum=a[j][indxr[i]]; a[j][indxr[i]]=a[j][indxc[i]]; a[j][indxc[i]]=dum;}
187   }
188   delete[] indxr;
189   delete[] indxc;
190   delete[] ipiv;
191
192 #endif
193 }
194
195 void Marquardt_coefficients(double *x, double *y, double *param, double **alpha, double *beta, double &chisq, LBPredictorFunction *predict) {
196 #if CMK_LBDB_ON
197   int i,j,k,l,m;
198   double ymod, dy;
199   double *dyda = new double[predict->num_params];
200
201   for (i=0; i<predict->num_params; ++i) {
202     for (j=0; j<=i; ++j) alpha[i][j] = 0;
203     beta[i]=0;
204   }
205   chisq = 0;
206
207   // summation loop over all data
208   for (i=0; i<predict->num_params; ++i) {
209     predict->function(x[i], param, ymod, dyda);
210     dy = y[i] - ymod;
211     for (j=0, l=0; l<predict->num_params; ++l) {
212       for (k=0, m=0; m<l+1; ++m) {
213         alpha[j][k++] += dyda[l]*dyda[m];
214       }
215       beta[j++] += dy*dyda[l];
216     }
217     chisq += dy*dy;
218   }
219
220   // fill the symmetric side
221   for (j=1; j<predict->num_params; ++j) {
222     for (k=0; k<j; ++k) alpha[k][j] = alpha[j][k];
223   }
224
225   delete[] dyda;
226 #endif
227 }
228
229 bool Marquardt_solver(CentralLB::FutureModel *mod, int object) {
230 #if CMK_LBDB_ON
231   double chisq, ochisq;
232   double lambda = 0.001;
233   int i,j;
234   int iterations=0;
235   bool allow_stop = false;
236
237   double *oneda = new double[mod->predictor->num_params];
238   double *atry = new double[mod->predictor->num_params];
239   double *beta = new double[mod->predictor->num_params];
240   double *da = new double[mod->predictor->num_params];
241   double **covar = new double*[mod->predictor->num_params];
242   double **alpha = new double*[mod->predictor->num_params];
243   double *x = new double[mod->cur_stats-1];
244   double *y = new double[mod->cur_stats-1];
245   double **temp = new double*[mod->predictor->num_params];
246
247   for (i=0; i<mod->predictor->num_params; ++i) {
248     alpha[i] = new double[mod->predictor->num_params];
249     covar[i] = new double[mod->predictor->num_params];
250     temp[i] = new double[mod->predictor->num_params];
251     atry[i] = mod->parameters[object][i];
252   }
253   for (i=0; i<mod->cur_stats-2; ++i) {
254     x[i] = mod->collection[i].objData[object].wallTime;
255     y[i] = mod->collection[i+1].objData[object].wallTime;
256   }
257
258   Marquardt_coefficients(x,y,mod->parameters[object],alpha,beta,chisq,mod->predictor);
259   ochisq = chisq;
260
261   while (chisq > 0.01 || !allow_stop) {
262     if (++iterations > MAX_CHISQ_ITER) {
263       // something wrong!!!
264       return false;
265     }
266     // alter linearized fitting matrix, by augmenting diagonal elements
267     for (i=0; i<mod->predictor->num_params; ++i) {
268       for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = alpha[i][j];
269       covar[i][i] = alpha[i][i] * (1 + lambda);
270       for (j=0; j<mod->predictor->num_params; ++j) temp[i][j] = covar[i][j];
271       oneda[i] = beta[i];
272     }
273
274     // matrix solution
275     gaussj(temp, oneda, mod->predictor->num_params);
276     for (i=0; i<mod->predictor->num_params; ++i) {
277       for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = temp[i][j];
278       da[i] = oneda[i];
279     }
280
281     // did the trial succeed?
282     for (i=0, j=0; j<mod->predictor->num_params; ++j) atry[j] = mod->parameters[object][j] + da[i++];
283     Marquardt_coefficients(x,y,atry,covar,da,chisq,mod->predictor);
284     if (chisq < ochisq) {  // success, accept the new solution
285       lambda *= 0.1;
286       allow_stop = true;
287       for (i=0; i<mod->predictor->num_params; ++i) {
288         for (j=0; j<mod->predictor->num_params; ++j) alpha[i][j] = covar[i][j];
289         beta[i] = da[i];
290         mod->parameters[object][i] = atry[i];
291       }
292     } else {  // failure, increase lamda
293       lambda *= 10;
294       allow_stop = false;
295     }
296     ochisq = chisq;
297   }
298   for (i=0; i<mod->predictor->num_params; ++i) {
299     delete[] alpha[i];
300     delete[] covar[i];
301     delete[] temp[i];
302   }
303   delete[] oneda;
304   delete[] atry;
305   delete[] beta;
306   delete[] da;
307   delete[] covar;
308   delete[] alpha;
309   delete[] x;
310   delete[] y;
311   delete[] temp;
312
313   return true;
314 #endif
315 }
316
317 // routine that update LDStats given a predictor model
318 void CentralLB::FuturePredictor(CentralLB::LDStats* stats) {
319 #if CMK_LBDB_ON
320   bool model_done;
321   int i;
322
323   if (predicted_model->cur_stats < _lb_predict_delay) {
324     // not yet ready to create the model, just store the relevant statistic
325     predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
326     predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
327     predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
328     predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
329     predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
330     for (i=0; i<stats->n_objs; ++i)
331       predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
332     for (i=0; i<stats->n_comm; ++i)
333       predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];
334     ++predicted_model->cur_stats;
335     ++predicted_model->start_stats;
336
337   } else {
338
339     if (predicted_model->parameters == NULL) {     // time to create the new prediction model
340       // allocate parameters
341       predicted_model->model_valid = new bool[stats->n_objs];
342       predicted_model->parameters = new double*[stats->n_objs];
343       for (i=0; i<stats->n_objs; ++i) predicted_model->parameters[i] = new double[predicted_model->predictor->num_params];
344       for (i=0; i<stats->n_objs; ++i) {
345         // initialization
346         predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
347         predicted_model->predictor->print(predicted_model->parameters[i]);
348
349         model_done = Marquardt_solver(predicted_model, i);
350         // always initialize to false for conservativity
351         predicted_model->model_valid[i] = false;
352         CkPrintf("LB: Model for object %d %s\n",i,model_done?"found":"not found");
353         predicted_model->predictor->print(predicted_model->parameters[i]);
354       }
355
356       if (predicted_model->model_valid) {
357         CkPrintf("LB: New model completely constructed\n");
358       } else {
359         CkPrintf("LB: Construction of new model failed\n");
360       }
361
362     } else {     // model already constructed, update it
363
364       double *error_model = new double[stats->n_objs];
365       double *error_default = new double[stats->n_objs];
366
367       CkPrintf("Error in estimation:\n");
368       for (i=0; i<stats->n_objs; ++i) {
369         error_model[i] = stats->objData[i].wallTime-predicted_model->predictor->predict(predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime,predicted_model->parameters[i]);
370         error_default[i] = stats->objData[i].wallTime-predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime;
371         CkPrintf("object %d: real time=%f, model error=%f, default error=%f\n",i,stats->objData[i].wallTime,error_model[i],error_default[i]);
372       }
373
374       // save statistics in the last position
375       if (predicted_model->start_stats >= predicted_model->n_stats) predicted_model->start_stats -= predicted_model->n_stats;
376       if (predicted_model->cur_stats < predicted_model->n_stats) ++predicted_model->cur_stats;
377
378       if (predicted_model->collection[predicted_model->start_stats].objData != NULL) {
379         delete predicted_model->collection[predicted_model->start_stats].objData;
380         delete predicted_model->collection[predicted_model->start_stats].commData;
381       }
382       predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
383       predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
384
385       predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
386       predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
387       predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
388       for (i=0; i<stats->n_objs; ++i)
389         predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
390       for (i=0; i<stats->n_comm; ++i)
391         predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];      
392       ++predicted_model->start_stats;      
393
394       // check if model is ok
395       // the check can be performed even if the model is not valid since it will
396       // releave which objects are wrongly updated and will try to fix them
397
398       // the update of the model is done if the model does not approximate
399       // sufficiently well the underlining function or if the time-invariante
400       // approach is performing better
401       for (i=0; i<stats->n_objs; ++i) {
402         //if (fabs(error_model[i]) > 0.2*stats->objData[i].wallTime || fabs(error_model[i]) > fabs(error_default[i])) {
403         if (fabs(error_model[i]) > fabs(error_default[i])) {  // no absolute error check
404           predicted_model->model_valid[i] = false;
405           // model wrong, rebuild it now
406           predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
407           model_done = Marquardt_solver(predicted_model, i);
408           CkPrintf("LB: Updated model for object %d %s",i,model_done?"success":"failed. ");
409           predicted_model->predictor->print(predicted_model->parameters[i]);
410         }
411         if (fabs(error_model[i]) < fabs(error_default[i])) predicted_model->model_valid[i] = true;
412       }
413
414     }
415
416     // use the model to update statistics
417     double *param;
418     for (int i=0; i<stats->n_objs; ++i) {
419       if (predicted_model->model_valid[i]) {
420         param = predicted_model->parameters[i];
421         stats->objData[i].cpuTime = predicted_model->predictor->predict(stats->objData[i].cpuTime, param);
422         stats->objData[i].wallTime = predicted_model->predictor->predict(stats->objData[i].wallTime, param);
423       }
424     }
425
426   }
427
428 #endif
429 }
430
431 /*@}*/