added interface to allow Centralized load balancer to be able to ignore some of the...
authorGengbin Zheng <gzheng@illinois.edu>
Mon, 26 Jan 2004 00:45:50 +0000 (00:45 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Mon, 26 Jan 2004 00:45:50 +0000 (00:45 +0000)
moved future predictor code into a separate file CentralPredictor.C.

src/ck-ldb/BaseLB.h
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralPredictor.C [new file with mode: 0644]
src/ck-ldb/LBDatabase.h
src/ck-ldb/LBObj.h
src/ck-ldb/RandRefLB.C
src/ck-ldb/lbdb.C
src/ck-ldb/lbdb.h

index 967f67f8bc15007aaa57fdc39fa153b4b6ef6d41..938e69018f5fe94323ee20c12d1d76bef97096e5 100644 (file)
@@ -50,6 +50,8 @@ struct MigrateInfo {
     LDObjHandle obj;
     int from_pe;
     int to_pe;
+    int ignore_arrival;            // if an object is available for immediate migrate
+    MigrateInfo():  ignore_arrival(0) {}
 };
 
 /**
index 5f0d0467557435848219d58f1a04d0438d40ea63..565dd67cb0823075db4caa440c5d184de56892e3 100644 (file)
@@ -10,7 +10,6 @@
 */
 /*@{*/
 
-#include <math.h>
 #include <charm++.h>
 #include "envelope.h"
 #include "CentralLB.h"
@@ -58,31 +57,6 @@ void CentralLB::staticAtSync(void* data)
   me->AtSync();
 }
 
-void CentralLB::staticPredictorOn(void* data, void *model)
-{
-  CentralLB *me = (CentralLB*)(data);
-  me->predictorOn((LBPredictorFunction*)model);
-}
-
-void CentralLB::staticPredictorOnWin(void* data, void *model, int wind)
-{
-  CentralLB *me = (CentralLB*)(data);
-  me->predictorOn((LBPredictorFunction*)model, wind);
-}
-
-void CentralLB::staticPredictorOff(void* data)
-{
-  CentralLB *me = (CentralLB*)(data);
-  me->predictorOff();
-}
-
-void CentralLB::staticChangePredictor(void* data, void *model)
-{
-  CentralLB *me = (CentralLB*)(data);
-  me->changePredictor((LBPredictorFunction*)model);
-}
-
-
 void CentralLB::initLB(const CkLBOptions &opt)
 {
 #if CMK_LBDB_ON
@@ -108,6 +82,7 @@ void CentralLB::initLB(const CkLBOptions &opt)
 
   statsData = new LDStats;
 
+  // for future predictor
   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
   else predicted_model=0;
   // register user interface callbacks
@@ -448,10 +423,11 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
       theLbdb->Migrate(move.obj,move.to_pe);
     } else if (move.from_pe != me && move.to_pe == me) {
-       //  CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
-      migrates_expected++;
+      // CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
+      if (!move.ignore_arrival) migrates_expected++;
     }
   }
+  // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
 #if 0
   if (m->n_moves ==0) {
     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
@@ -581,6 +557,7 @@ LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats,int count)
       migrateMe->obj = objData.handle;
       migrateMe->from_pe = frompe;
       migrateMe->to_pe = tope;
+      migrateMe->ignore_arrival = objData.ignoreArrival;
       migrateInfo.insertAtEnd(migrateMe);
     }
   }
@@ -745,396 +722,6 @@ void CentralLB::writeStatsMsgs(const char* filename)
 #endif
 }
 
-/*
-// definitions for the implemented predictor model
-#define NR_THRESHOLD            0.0001
-#define INITIALIZE_PARAMS(x)    {x[0]=0; x[1]=1; x[2]=0; x[3]=0; x[4]=0; x[5]=0;}
-#define PRED_FUNCTION(x, param) param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5]))
-
-#define ADD_DIFFERENTIAL(diff,collec,index,param)  {diff[0]+=PRED_DERIVATE_0(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
-diff[1]+=PRED_DERIVATE_1(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
-diff[2]+=PRED_DERIVATE_2(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
-diff[3]+=PRED_DERIVATE_3(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
-diff[4]+=PRED_DERIVATE_4(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
-diff[5]+=PRED_DERIVATE_5(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param);}
-
-#define PRED_DERIVATE_0(y,x,p)  (y-PRED_FUNCTION(x,p))
-#define PRED_DERIVATE_1(y,x,p)  (y-PRED_FUNCTION(x,p))*x
-#define PRED_DERIVATE_2(y,x,p)  (y-PRED_FUNCTION(x,p))*x*x
-#define PRED_DERIVATE_3(y,x,p)  (y-PRED_FUNCTION(x,p))*sin(p[4]*(x+p[5]))
-#define PRED_DERIVATE_4(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*(x+p[5])*cos(p[4]*(x+p[5]))
-#define PRED_DERIVATE_5(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*p[4]*cos(p[4]*(x+p[5]))
-
-#define PRINT_MODEL(param)   "LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]
-#define LEARNING_PARAM   1
-
-static void Newton_Raphson(CentralLB::FutureModel *mod, int object) {
-#if CMK_LBDB_ON
-  int i,j;
-  double differential[NUM_PARAMETERS];
-  double error = 0;
-  int loop=0;
-  char mystring[10]="my";
-  for (i=0; i<_lb_predict_delay-1; ++i) {
-    CkPrintf("y,x = %f %f\n",mod->collection[i+1].objData[object].cpuTime,mod->collection[i].objData[object].cpuTime,PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]));
-    error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
-  }
-
-  while (error > NR_THRESHOLD) {
-    CkPrintf("error: %f\n",error);
-    if (++loop==10) CmiAbort(mystring);
-    for (j=0; j<NUM_PARAMETERS; ++j) differential[j] = 0;
-    for (i=0; i<_lb_predict_delay-1; ++i) {
-      ADD_DIFFERENTIAL(differential, mod->collection, i, mod->parameters[object]);
-
-      //       differential[i] += PRED_DERIVATE(i, mod->collection[j+1].objData[object].cpuTime, mod->collection[j].objData[object].cpuTime, mod->parameters[object]);
-      mod->parameters[object][i] += LEARNING_PARAM * differential[i];
-    }
-    CkPrintf(PRINT_MODEL(mod->parameters[object]));
-    error = 0;
-    for (i=0; i<_lb_predict_delay-1; ++i) error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
-  }
-
-#endif
-}
-*/
-
-#define MAX_CHISQ_ITER 10000
-#define SMALL_NUMBER   0.00001    // to avoid singular matrix in gaussj
-
-/*
-#define NUM_PARAMETERS 6
-#define MY_FUNCTION    hypothesis_function
-
-// FUNCTIONS TO DEFINE A SPECIFIC MODEL
-
-inline int future_numPar() {return 6;}
-
-inline void initialize_params(double *x) {double normall=1.0/pow(2,31); x[0]=rand()*normall; x[1]=rand()*normall; x[2]=rand()*normall; x[3]=rand()*normall; x[4]=rand()*normall; x[5]=rand()*normall;}
-
-// compute the prediction function for the variable x with parameters param
-inline double pred_function(double x, double *param) {return (param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5])));}
-
-inline void print_future_model(double *param) {CkPrintf("LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]);}
-
-// compute the prediction function and its derivatives respect to the parameters
-void hypothesis_function(double x, double *param, double &y, double *dyda) {
-#if CMK_LBDB_ON
-  double tmp;
-
-  y = pred_function(x, param);
-
-  dyda[0] = 1;
-  dyda[1] = x;
-  dyda[2] = x*x;
-  tmp = param[4] * (x+param[5]);
-  dyda[3] = sin(tmp);
-  dyda[4] = param[3] * (x+param[5]) * cos(tmp);
-  dyda[5] = param[3] * param[4] *cos(tmp);
-
-#endif
-}
-*/
-// END OF FUNCTIONS TO DEFINE A SPECIFIC MODEL
-
-void gaussj(double **a, double *b, int n) {
-#if CMK_LBDB_ON
-  int i,j,k;
-  int irow, icol;
-  double big, dum, pivinv;
-  // arrays for bookkeeping on the pivoting
-  int *indxc, *indxr, *ipiv;
-
-  indxc = new int[n];
-  indxr = new int[n];
-  ipiv = new int[n];
-  for (j=0; j<n; ++j) ipiv[j]=0;
-  // main loop over the columns to be reduced
-  for (i=0; i<n; ++i) {
-    big = 0;
-    // outer loop of the search for a pivot element
-    for (j=0; j<n; ++j)
-      if (ipiv[j] != 1)
-       for (k=0; k<n; ++k) {
-         if (ipiv[k] == 0 && fabs(a[j][k]) >= big) {
-           big = fabs(a[j][k]);
-           irow=j;
-           icol=k;
-         }
-       }
-    ++(ipiv[icol]);
-
-    if (irow != icol) {
-      for (j=0; j<n; ++j) {dum=a[irow][j]; a[irow][j]=a[icol][j]; a[icol][j]=dum;}
-      dum=b[irow]; b[irow]=b[icol]; b[icol]=dum;
-    }
-    // we are now ready to divide the pivot row by the pivot element, located at irow, icol
-    indxr[i]=irow;
-    indxc[i]=icol;
-    if (a[icol][icol] == 0) {
-      a[icol][icol] = SMALL_NUMBER;
-      CkPrintf("LB: Singular Matrix\n");
-    }
-    pivinv = 1.0/a[icol][icol];
-    a[icol][icol] = 1;
-    for (j=0; j<n; ++j) a[icol][j] *= pivinv;
-    b[icol] *= pivinv;
-    for (j=0; j<n; ++j)
-      if (j != icol) {
-       dum = a[j][icol];
-       a[j][icol] = 0;
-       for (k=0; k<n; ++k) a[j][k] -= a[icol][k]*dum;
-       b[j] -= b[icol]*dum;
-      }
-  }
-  // unscramble the matrix
-  for (i=n-1; i>=0; --i) {
-    if (indxr[i] != indxc[i])
-      for (j=0; j<n; ++j) {dum=a[j][indxr[i]]; a[j][indxr[i]]=a[j][indxc[i]]; a[j][indxc[i]]=dum;}
-  }
-  delete[] indxr;
-  delete[] indxc;
-  delete[] ipiv;
-
-#endif
-}
-
-void Marquardt_coefficients(double *x, double *y, double *param, double **alpha, double *beta, double &chisq, LBPredictorFunction *predict) {
-#if CMK_LBDB_ON
-  int i,j,k,l,m;
-  double ymod, dy;
-  double *dyda = new double[predict->num_params];
-
-  for (i=0; i<predict->num_params; ++i) {
-    for (j=0; j<=i; ++j) alpha[i][j] = 0;
-    beta[i]=0;
-  }
-  chisq = 0;
-
-  // summation loop over all data
-  for (i=0; i<predict->num_params; ++i) {
-    predict->function(x[i], param, ymod, dyda);
-    dy = y[i] - ymod;
-    for (j=0, l=0; l<predict->num_params; ++l) {
-      for (k=0, m=0; m<l+1; ++m) {
-       alpha[j][k++] += dyda[l]*dyda[m];
-      }
-      beta[j++] += dy*dyda[l];
-    }
-    chisq += dy*dy;
-  }
-
-  // fill the symmetric side
-  for (j=1; j<predict->num_params; ++j) {
-    for (k=0; k<j; ++k) alpha[k][j] = alpha[j][k];
-  }
-
-  delete[] dyda;
-#endif
-}
-
-bool Marquardt_solver(CentralLB::FutureModel *mod, int object) {
-#if CMK_LBDB_ON
-  double chisq, ochisq;
-  double lambda = 0.001;
-  int i,j;
-  int iterations=0;
-  bool allow_stop = false;
-
-  double *oneda = new double[mod->predictor->num_params];
-  double *atry = new double[mod->predictor->num_params];
-  double *beta = new double[mod->predictor->num_params];
-  double *da = new double[mod->predictor->num_params];
-  double **covar = new double*[mod->predictor->num_params];
-  double **alpha = new double*[mod->predictor->num_params];
-  double *x = new double[mod->cur_stats-1];
-  double *y = new double[mod->cur_stats-1];
-  double **temp = new double*[mod->predictor->num_params];
-
-  for (i=0; i<mod->predictor->num_params; ++i) {
-    alpha[i] = new double[mod->predictor->num_params];
-    covar[i] = new double[mod->predictor->num_params];
-    temp[i] = new double[mod->predictor->num_params];
-    atry[i] = mod->parameters[object][i];
-  }
-  for (i=0; i<mod->cur_stats-2; ++i) {
-    x[i] = mod->collection[i].objData[object].wallTime;
-    y[i] = mod->collection[i+1].objData[object].wallTime;
-  }
-
-  Marquardt_coefficients(x,y,mod->parameters[object],alpha,beta,chisq,mod->predictor);
-  ochisq = chisq;
-
-  while (chisq > 0.01 || !allow_stop) {
-    if (++iterations > MAX_CHISQ_ITER) {
-      // something wrong!!!
-      return false;
-    }
-    // alter linearized fitting matrix, by augmenting diagonal elements
-    for (i=0; i<mod->predictor->num_params; ++i) {
-      for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = alpha[i][j];
-      covar[i][i] = alpha[i][i] * (1 + lambda);
-      for (j=0; j<mod->predictor->num_params; ++j) temp[i][j] = covar[i][j];
-      oneda[i] = beta[i];
-    }
-
-    // matrix solution
-    gaussj(temp, oneda, mod->predictor->num_params);
-    for (i=0; i<mod->predictor->num_params; ++i) {
-      for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = temp[i][j];
-      da[i] = oneda[i];
-    }
-
-    // did the trial succeed?
-    for (i=0, j=0; j<mod->predictor->num_params; ++j) atry[j] = mod->parameters[object][j] + da[i++];
-    Marquardt_coefficients(x,y,atry,covar,da,chisq,mod->predictor);
-    if (chisq < ochisq) {  // success, accept the new solution
-      lambda *= 0.1;
-      allow_stop = true;
-      for (i=0; i<mod->predictor->num_params; ++i) {
-       for (j=0; j<mod->predictor->num_params; ++j) alpha[i][j] = covar[i][j];
-       beta[i] = da[i];
-       mod->parameters[object][i] = atry[i];
-      }
-    } else {  // failure, increase lamda
-      lambda *= 10;
-      allow_stop = false;
-    }
-    ochisq = chisq;
-  }
-  for (i=0; i<mod->predictor->num_params; ++i) {
-    delete[] alpha[i];
-    delete[] covar[i];
-    delete[] temp[i];
-  }
-  delete[] oneda;
-  delete[] atry;
-  delete[] beta;
-  delete[] da;
-  delete[] covar;
-  delete[] alpha;
-  delete[] x;
-  delete[] y;
-  delete[] temp;
-
-  return true;
-#endif
-}
-
-// routine that update LDStats given a predictor model
-void CentralLB::FuturePredictor(CentralLB::LDStats* stats) {
-#if CMK_LBDB_ON
-  bool model_done;
-  int i;
-
-  if (predicted_model->cur_stats < _lb_predict_delay) {
-    // not yet ready to create the model, just store the relevant statistic
-    predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
-    predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
-    predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
-    predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
-    predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
-    for (i=0; i<stats->n_objs; ++i)
-      predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
-    for (i=0; i<stats->n_comm; ++i)
-      predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];
-    ++predicted_model->cur_stats;
-    ++predicted_model->start_stats;
-
-  } else {
-
-    if (predicted_model->parameters == NULL) {     // time to create the new prediction model
-      // allocate parameters
-      predicted_model->model_valid = new bool[stats->n_objs];
-      predicted_model->parameters = new double*[stats->n_objs];
-      for (i=0; i<stats->n_objs; ++i) predicted_model->parameters[i] = new double[predicted_model->predictor->num_params];
-      for (i=0; i<stats->n_objs; ++i) {
-       // initialization
-       predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
-       predicted_model->predictor->print(predicted_model->parameters[i]);
-
-       model_done = Marquardt_solver(predicted_model, i);
-       // always initialize to false for conservativity
-       predicted_model->model_valid[i] = false;
-       CkPrintf("LB: Model for object %d %s\n",i,model_done?"found":"not found");
-       predicted_model->predictor->print(predicted_model->parameters[i]);
-      }
-
-      if (predicted_model->model_valid) {
-       CkPrintf("LB: New model completely constructed\n");
-      } else {
-       CkPrintf("LB: Construction of new model failed\n");
-      }
-
-    } else {     // model already constructed, update it
-
-      double *error_model = new double[stats->n_objs];
-      double *error_default = new double[stats->n_objs];
-
-      CkPrintf("Error in estimation:\n");
-      for (i=0; i<stats->n_objs; ++i) {
-       error_model[i] = stats->objData[i].wallTime-predicted_model->predictor->predict(predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime,predicted_model->parameters[i]);
-       error_default[i] = stats->objData[i].wallTime-predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime;
-       CkPrintf("object %d: real time=%f, model error=%f, default error=%f\n",i,stats->objData[i].wallTime,error_model[i],error_default[i]);
-      }
-
-      // save statistics in the last position
-      if (predicted_model->start_stats >= predicted_model->n_stats) predicted_model->start_stats -= predicted_model->n_stats;
-      if (predicted_model->cur_stats < predicted_model->n_stats) ++predicted_model->cur_stats;
-
-      if (predicted_model->collection[predicted_model->start_stats].objData != NULL) {
-       delete predicted_model->collection[predicted_model->start_stats].objData;
-       delete predicted_model->collection[predicted_model->start_stats].commData;
-      }
-      predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
-      predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
-
-      predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
-      predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
-      predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
-      for (i=0; i<stats->n_objs; ++i)
-       predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
-      for (i=0; i<stats->n_comm; ++i)
-       predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];      
-      ++predicted_model->start_stats;      
-
-      // check if model is ok
-      // the check can be performed even if the model is not valid since it will
-      // releave which objects are wrongly updated and will try to fix them
-
-      // the update of the model is done if the model does not approximate
-      // sufficiently well the underlining function or if the time-invariante
-      // approach is performing better
-      for (i=0; i<stats->n_objs; ++i) {
-        //if (fabs(error_model[i]) > 0.2*stats->objData[i].wallTime || fabs(error_model[i]) > fabs(error_default[i])) {
-        if (fabs(error_model[i]) > fabs(error_default[i])) {  // no absolute error check
-         predicted_model->model_valid[i] = false;
-         // model wrong, rebuild it now
-         predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
-         model_done = Marquardt_solver(predicted_model, i);
-         CkPrintf("LB: Updated model for object %d %s",i,model_done?"success":"failed. ");
-         predicted_model->predictor->print(predicted_model->parameters[i]);
-       }
-       if (fabs(error_model[i]) < fabs(error_default[i])) predicted_model->model_valid[i] = true;
-      }
-
-    }
-
-    // use the model to update statistics
-    double *param;
-    for (int i=0; i<stats->n_objs; ++i) {
-      if (predicted_model->model_valid[i]) {
-       param = predicted_model->parameters[i];
-       stats->objData[i].cpuTime = predicted_model->predictor->predict(stats->objData[i].cpuTime, param);
-       stats->objData[i].wallTime = predicted_model->predictor->predict(stats->objData[i].wallTime, param);
-      }
-    }
-
-  }
-
-#endif
-}
-
 // calculate the predicted wallclock/cpu load for every processors
 // considering communication overhead if considerComm is true
 static void getPredictedLoad(CentralLB::LDStats* stats, int count, 
@@ -1186,7 +773,7 @@ static void getPredictedLoad(CentralLB::LDStats* stats, int count,
              senderPE = cdata.src_proc;
            else {
              int idx = stats->getHash(cdata.sender);
-             CmiAssert(idx != -1);
+             if (idx == -1) continue;    // sender has just migrated?
              senderPE = stats->to_proc[idx];
              CmiAssert(senderPE != -1);
            }
@@ -1194,7 +781,7 @@ static void getPredictedLoad(CentralLB::LDStats* stats, int count,
              receiverPE = cdata.receiver.proc();
            else {
              int idx = stats->getHash(cdata.receiver.get_destObj());
-             CmiAssert(idx != -1);
+             if (idx == -1) continue;    // receiver has just been removed?
              receiverPE = stats->to_proc[idx];
              CmiAssert(receiverPE != -1);
            }
diff --git a/src/ck-ldb/CentralPredictor.C b/src/ck-ldb/CentralPredictor.C
new file mode 100644 (file)
index 0000000..e04f79b
--- /dev/null
@@ -0,0 +1,431 @@
+/*****************************************************************************
+ * $Source$
+ * $Author$
+ * $Date$
+ * $Revision$
+ *****************************************************************************/
+
+/**
+ * \addtogroup CkLdb
+*/
+/*@{*/
+
+#include <math.h>
+#include <charm++.h>
+#include "CentralLB.h"
+
+void CentralLB::staticPredictorOn(void* data, void *model)
+{
+  CentralLB *me = (CentralLB*)(data);
+  me->predictorOn((LBPredictorFunction*)model);
+}
+
+void CentralLB::staticPredictorOnWin(void* data, void *model, int wind)
+{
+  CentralLB *me = (CentralLB*)(data);
+  me->predictorOn((LBPredictorFunction*)model, wind);
+}
+
+void CentralLB::staticPredictorOff(void* data)
+{
+  CentralLB *me = (CentralLB*)(data);
+  me->predictorOff();
+}
+
+void CentralLB::staticChangePredictor(void* data, void *model)
+{
+  CentralLB *me = (CentralLB*)(data);
+  me->changePredictor((LBPredictorFunction*)model);
+}
+
+/*
+// definitions for the implemented predictor model
+#define NR_THRESHOLD            0.0001
+#define INITIALIZE_PARAMS(x)    {x[0]=0; x[1]=1; x[2]=0; x[3]=0; x[4]=0; x[5]=0;}
+#define PRED_FUNCTION(x, param) param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5]))
+
+#define ADD_DIFFERENTIAL(diff,collec,index,param)  {diff[0]+=PRED_DERIVATE_0(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
+diff[1]+=PRED_DERIVATE_1(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
+diff[2]+=PRED_DERIVATE_2(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
+diff[3]+=PRED_DERIVATE_3(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
+diff[4]+=PRED_DERIVATE_4(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param); \
+diff[5]+=PRED_DERIVATE_5(collec[index+1].objData[object].cpuTime,collec[index].objData[object].cpuTime,param);}
+
+#define PRED_DERIVATE_0(y,x,p)  (y-PRED_FUNCTION(x,p))
+#define PRED_DERIVATE_1(y,x,p)  (y-PRED_FUNCTION(x,p))*x
+#define PRED_DERIVATE_2(y,x,p)  (y-PRED_FUNCTION(x,p))*x*x
+#define PRED_DERIVATE_3(y,x,p)  (y-PRED_FUNCTION(x,p))*sin(p[4]*(x+p[5]))
+#define PRED_DERIVATE_4(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*(x+p[5])*cos(p[4]*(x+p[5]))
+#define PRED_DERIVATE_5(y,x,p)  (y-PRED_FUNCTION(x,p))*p[3]*p[4]*cos(p[4]*(x+p[5]))
+
+#define PRINT_MODEL(param)   "LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]
+#define LEARNING_PARAM   1
+
+static void Newton_Raphson(CentralLB::FutureModel *mod, int object) {
+#if CMK_LBDB_ON
+  int i,j;
+  double differential[NUM_PARAMETERS];
+  double error = 0;
+  int loop=0;
+  char mystring[10]="my";
+  for (i=0; i<_lb_predict_delay-1; ++i) {
+    CkPrintf("y,x = %f %f\n",mod->collection[i+1].objData[object].cpuTime,mod->collection[i].objData[object].cpuTime,PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]));
+    error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
+  }
+
+  while (error > NR_THRESHOLD) {
+    CkPrintf("error: %f\n",error);
+    if (++loop==10) CmiAbort(mystring);
+    for (j=0; j<NUM_PARAMETERS; ++j) differential[j] = 0;
+    for (i=0; i<_lb_predict_delay-1; ++i) {
+      ADD_DIFFERENTIAL(differential, mod->collection, i, mod->parameters[object]);
+
+      //       differential[i] += PRED_DERIVATE(i, mod->collection[j+1].objData[object].cpuTime, mod->collection[j].objData[object].cpuTime, mod->parameters[object]);
+      mod->parameters[object][i] += LEARNING_PARAM * differential[i];
+    }
+    CkPrintf(PRINT_MODEL(mod->parameters[object]));
+    error = 0;
+    for (i=0; i<_lb_predict_delay-1; ++i) error += pow(mod->collection[i+1].objData[object].cpuTime - PRED_FUNCTION(mod->collection[i].objData[object].cpuTime, mod->parameters[object]), 2);
+  }
+
+#endif
+}
+*/
+
+#define MAX_CHISQ_ITER 10000
+#define SMALL_NUMBER   0.00001    // to avoid singular matrix in gaussj
+
+/*
+#define NUM_PARAMETERS 6
+#define MY_FUNCTION    hypothesis_function
+
+// FUNCTIONS TO DEFINE A SPECIFIC MODEL
+
+inline int future_numPar() {return 6;}
+
+inline void initialize_params(double *x) {double normall=1.0/pow(2,31); x[0]=rand()*normall; x[1]=rand()*normall; x[2]=rand()*normall; x[3]=rand()*normall; x[4]=rand()*normall; x[5]=rand()*normall;}
+
+// compute the prediction function for the variable x with parameters param
+inline double pred_function(double x, double *param) {return (param[0] + param[1]*x + param[2]*x*x + param[3]*sin(param[4]*(x+param[5])));}
+
+inline void print_future_model(double *param) {CkPrintf("LB: %f + %fx + %fx^2 + %fsin%f(x+%f)\n",param[0],param[1],param[2],param[3],param[4],param[5]);}
+
+// compute the prediction function and its derivatives respect to the parameters
+void hypothesis_function(double x, double *param, double &y, double *dyda) {
+#if CMK_LBDB_ON
+  double tmp;
+
+  y = pred_function(x, param);
+
+  dyda[0] = 1;
+  dyda[1] = x;
+  dyda[2] = x*x;
+  tmp = param[4] * (x+param[5]);
+  dyda[3] = sin(tmp);
+  dyda[4] = param[3] * (x+param[5]) * cos(tmp);
+  dyda[5] = param[3] * param[4] *cos(tmp);
+
+#endif
+}
+*/
+// END OF FUNCTIONS TO DEFINE A SPECIFIC MODEL
+
+void gaussj(double **a, double *b, int n) {
+#if CMK_LBDB_ON
+  int i,j,k;
+  int irow, icol;
+  double big, dum, pivinv;
+  // arrays for bookkeeping on the pivoting
+  int *indxc, *indxr, *ipiv;
+
+  indxc = new int[n];
+  indxr = new int[n];
+  ipiv = new int[n];
+  for (j=0; j<n; ++j) ipiv[j]=0;
+  // main loop over the columns to be reduced
+  for (i=0; i<n; ++i) {
+    big = 0;
+    // outer loop of the search for a pivot element
+    for (j=0; j<n; ++j)
+      if (ipiv[j] != 1)
+       for (k=0; k<n; ++k) {
+         if (ipiv[k] == 0 && fabs(a[j][k]) >= big) {
+           big = fabs(a[j][k]);
+           irow=j;
+           icol=k;
+         }
+       }
+    ++(ipiv[icol]);
+
+    if (irow != icol) {
+      for (j=0; j<n; ++j) {dum=a[irow][j]; a[irow][j]=a[icol][j]; a[icol][j]=dum;}
+      dum=b[irow]; b[irow]=b[icol]; b[icol]=dum;
+    }
+    // we are now ready to divide the pivot row by the pivot element, located at irow, icol
+    indxr[i]=irow;
+    indxc[i]=icol;
+    if (a[icol][icol] == 0) {
+      a[icol][icol] = SMALL_NUMBER;
+      CkPrintf("LB: Singular Matrix\n");
+    }
+    pivinv = 1.0/a[icol][icol];
+    a[icol][icol] = 1;
+    for (j=0; j<n; ++j) a[icol][j] *= pivinv;
+    b[icol] *= pivinv;
+    for (j=0; j<n; ++j)
+      if (j != icol) {
+       dum = a[j][icol];
+       a[j][icol] = 0;
+       for (k=0; k<n; ++k) a[j][k] -= a[icol][k]*dum;
+       b[j] -= b[icol]*dum;
+      }
+  }
+  // unscramble the matrix
+  for (i=n-1; i>=0; --i) {
+    if (indxr[i] != indxc[i])
+      for (j=0; j<n; ++j) {dum=a[j][indxr[i]]; a[j][indxr[i]]=a[j][indxc[i]]; a[j][indxc[i]]=dum;}
+  }
+  delete[] indxr;
+  delete[] indxc;
+  delete[] ipiv;
+
+#endif
+}
+
+void Marquardt_coefficients(double *x, double *y, double *param, double **alpha, double *beta, double &chisq, LBPredictorFunction *predict) {
+#if CMK_LBDB_ON
+  int i,j,k,l,m;
+  double ymod, dy;
+  double *dyda = new double[predict->num_params];
+
+  for (i=0; i<predict->num_params; ++i) {
+    for (j=0; j<=i; ++j) alpha[i][j] = 0;
+    beta[i]=0;
+  }
+  chisq = 0;
+
+  // summation loop over all data
+  for (i=0; i<predict->num_params; ++i) {
+    predict->function(x[i], param, ymod, dyda);
+    dy = y[i] - ymod;
+    for (j=0, l=0; l<predict->num_params; ++l) {
+      for (k=0, m=0; m<l+1; ++m) {
+       alpha[j][k++] += dyda[l]*dyda[m];
+      }
+      beta[j++] += dy*dyda[l];
+    }
+    chisq += dy*dy;
+  }
+
+  // fill the symmetric side
+  for (j=1; j<predict->num_params; ++j) {
+    for (k=0; k<j; ++k) alpha[k][j] = alpha[j][k];
+  }
+
+  delete[] dyda;
+#endif
+}
+
+bool Marquardt_solver(CentralLB::FutureModel *mod, int object) {
+#if CMK_LBDB_ON
+  double chisq, ochisq;
+  double lambda = 0.001;
+  int i,j;
+  int iterations=0;
+  bool allow_stop = false;
+
+  double *oneda = new double[mod->predictor->num_params];
+  double *atry = new double[mod->predictor->num_params];
+  double *beta = new double[mod->predictor->num_params];
+  double *da = new double[mod->predictor->num_params];
+  double **covar = new double*[mod->predictor->num_params];
+  double **alpha = new double*[mod->predictor->num_params];
+  double *x = new double[mod->cur_stats-1];
+  double *y = new double[mod->cur_stats-1];
+  double **temp = new double*[mod->predictor->num_params];
+
+  for (i=0; i<mod->predictor->num_params; ++i) {
+    alpha[i] = new double[mod->predictor->num_params];
+    covar[i] = new double[mod->predictor->num_params];
+    temp[i] = new double[mod->predictor->num_params];
+    atry[i] = mod->parameters[object][i];
+  }
+  for (i=0; i<mod->cur_stats-2; ++i) {
+    x[i] = mod->collection[i].objData[object].wallTime;
+    y[i] = mod->collection[i+1].objData[object].wallTime;
+  }
+
+  Marquardt_coefficients(x,y,mod->parameters[object],alpha,beta,chisq,mod->predictor);
+  ochisq = chisq;
+
+  while (chisq > 0.01 || !allow_stop) {
+    if (++iterations > MAX_CHISQ_ITER) {
+      // something wrong!!!
+      return false;
+    }
+    // alter linearized fitting matrix, by augmenting diagonal elements
+    for (i=0; i<mod->predictor->num_params; ++i) {
+      for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = alpha[i][j];
+      covar[i][i] = alpha[i][i] * (1 + lambda);
+      for (j=0; j<mod->predictor->num_params; ++j) temp[i][j] = covar[i][j];
+      oneda[i] = beta[i];
+    }
+
+    // matrix solution
+    gaussj(temp, oneda, mod->predictor->num_params);
+    for (i=0; i<mod->predictor->num_params; ++i) {
+      for (j=0; j<mod->predictor->num_params; ++j) covar[i][j] = temp[i][j];
+      da[i] = oneda[i];
+    }
+
+    // did the trial succeed?
+    for (i=0, j=0; j<mod->predictor->num_params; ++j) atry[j] = mod->parameters[object][j] + da[i++];
+    Marquardt_coefficients(x,y,atry,covar,da,chisq,mod->predictor);
+    if (chisq < ochisq) {  // success, accept the new solution
+      lambda *= 0.1;
+      allow_stop = true;
+      for (i=0; i<mod->predictor->num_params; ++i) {
+       for (j=0; j<mod->predictor->num_params; ++j) alpha[i][j] = covar[i][j];
+       beta[i] = da[i];
+       mod->parameters[object][i] = atry[i];
+      }
+    } else {  // failure, increase lamda
+      lambda *= 10;
+      allow_stop = false;
+    }
+    ochisq = chisq;
+  }
+  for (i=0; i<mod->predictor->num_params; ++i) {
+    delete[] alpha[i];
+    delete[] covar[i];
+    delete[] temp[i];
+  }
+  delete[] oneda;
+  delete[] atry;
+  delete[] beta;
+  delete[] da;
+  delete[] covar;
+  delete[] alpha;
+  delete[] x;
+  delete[] y;
+  delete[] temp;
+
+  return true;
+#endif
+}
+
+// routine that update LDStats given a predictor model
+void CentralLB::FuturePredictor(CentralLB::LDStats* stats) {
+#if CMK_LBDB_ON
+  bool model_done;
+  int i;
+
+  if (predicted_model->cur_stats < _lb_predict_delay) {
+    // not yet ready to create the model, just store the relevant statistic
+    predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
+    predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
+    predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
+    predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
+    predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
+    for (i=0; i<stats->n_objs; ++i)
+      predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
+    for (i=0; i<stats->n_comm; ++i)
+      predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];
+    ++predicted_model->cur_stats;
+    ++predicted_model->start_stats;
+
+  } else {
+
+    if (predicted_model->parameters == NULL) {     // time to create the new prediction model
+      // allocate parameters
+      predicted_model->model_valid = new bool[stats->n_objs];
+      predicted_model->parameters = new double*[stats->n_objs];
+      for (i=0; i<stats->n_objs; ++i) predicted_model->parameters[i] = new double[predicted_model->predictor->num_params];
+      for (i=0; i<stats->n_objs; ++i) {
+       // initialization
+       predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
+       predicted_model->predictor->print(predicted_model->parameters[i]);
+
+       model_done = Marquardt_solver(predicted_model, i);
+       // always initialize to false for conservativity
+       predicted_model->model_valid[i] = false;
+       CkPrintf("LB: Model for object %d %s\n",i,model_done?"found":"not found");
+       predicted_model->predictor->print(predicted_model->parameters[i]);
+      }
+
+      if (predicted_model->model_valid) {
+       CkPrintf("LB: New model completely constructed\n");
+      } else {
+       CkPrintf("LB: Construction of new model failed\n");
+      }
+
+    } else {     // model already constructed, update it
+
+      double *error_model = new double[stats->n_objs];
+      double *error_default = new double[stats->n_objs];
+
+      CkPrintf("Error in estimation:\n");
+      for (i=0; i<stats->n_objs; ++i) {
+       error_model[i] = stats->objData[i].wallTime-predicted_model->predictor->predict(predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime,predicted_model->parameters[i]);
+       error_default[i] = stats->objData[i].wallTime-predicted_model->collection[(predicted_model->start_stats-1)%predicted_model->n_stats].objData[i].wallTime;
+       CkPrintf("object %d: real time=%f, model error=%f, default error=%f\n",i,stats->objData[i].wallTime,error_model[i],error_default[i]);
+      }
+
+      // save statistics in the last position
+      if (predicted_model->start_stats >= predicted_model->n_stats) predicted_model->start_stats -= predicted_model->n_stats;
+      if (predicted_model->cur_stats < predicted_model->n_stats) ++predicted_model->cur_stats;
+
+      if (predicted_model->collection[predicted_model->start_stats].objData != NULL) {
+       delete predicted_model->collection[predicted_model->start_stats].objData;
+       delete predicted_model->collection[predicted_model->start_stats].commData;
+      }
+      predicted_model->collection[predicted_model->start_stats].objData = new LDObjData[stats->n_objs];
+      predicted_model->collection[predicted_model->start_stats].commData = new LDCommData[stats->n_comm];
+
+      predicted_model->collection[predicted_model->start_stats].n_objs = stats->n_objs;
+      predicted_model->collection[predicted_model->start_stats].n_migrateobjs = stats->n_migrateobjs;
+      predicted_model->collection[predicted_model->start_stats].n_comm = stats->n_comm;
+      for (i=0; i<stats->n_objs; ++i)
+       predicted_model->collection[predicted_model->start_stats].objData[i] = stats->objData[i];
+      for (i=0; i<stats->n_comm; ++i)
+       predicted_model->collection[predicted_model->start_stats].commData[i] = stats->commData[i];      
+      ++predicted_model->start_stats;      
+
+      // check if model is ok
+      // the check can be performed even if the model is not valid since it will
+      // releave which objects are wrongly updated and will try to fix them
+
+      // the update of the model is done if the model does not approximate
+      // sufficiently well the underlining function or if the time-invariante
+      // approach is performing better
+      for (i=0; i<stats->n_objs; ++i) {
+        //if (fabs(error_model[i]) > 0.2*stats->objData[i].wallTime || fabs(error_model[i]) > fabs(error_default[i])) {
+        if (fabs(error_model[i]) > fabs(error_default[i])) {  // no absolute error check
+         predicted_model->model_valid[i] = false;
+         // model wrong, rebuild it now
+         predicted_model->predictor->initialize_params(predicted_model->parameters[i]);
+         model_done = Marquardt_solver(predicted_model, i);
+         CkPrintf("LB: Updated model for object %d %s",i,model_done?"success":"failed. ");
+         predicted_model->predictor->print(predicted_model->parameters[i]);
+       }
+       if (fabs(error_model[i]) < fabs(error_default[i])) predicted_model->model_valid[i] = true;
+      }
+
+    }
+
+    // use the model to update statistics
+    double *param;
+    for (int i=0; i<stats->n_objs; ++i) {
+      if (predicted_model->model_valid[i]) {
+       param = predicted_model->parameters[i];
+       stats->objData[i].cpuTime = predicted_model->predictor->predict(stats->objData[i].cpuTime, param);
+       stats->objData[i].wallTime = predicted_model->predictor->predict(stats->objData[i].wallTime, param);
+      }
+    }
+
+  }
+
+#endif
+}
+
+/*@}*/
index ef89290a6ba404a4a3a08f35b55aec04e90ee6d0..224606f490e042d44555b8462286b2b33db2b867 100644 (file)
@@ -168,6 +168,7 @@ public:
   inline void EstObjLoad(LDObjHandle h, double load) { LDEstObjLoad(h,load); };
   inline void NonMigratable(LDObjHandle h) { LDNonMigratable(h); };
   inline void Migratable(LDObjHandle h) { LDMigratable(h); };
+  inline void UseReadyMigrate(LDObjHandle h, CmiBool flag) { LDReadyMigrate(h, flag); };
   inline void DumpDatabase(void) { LDDumpDatabase(myLDHandle); };
 
   /*
index c127fc1b0ae213fd80810db23368459ec3d65c21..e6e8dd6152ebe504422785d0b28bd8edaecd35e7 100644 (file)
@@ -25,6 +25,7 @@ public:
   LBObj(LBDB *_parentDB, const LDObjHandle &_h, void *usr_ptr = NULL, CmiBool _migratable=CmiTrue) {
     data.handle = _h;
     data.migratable = _migratable;
+    data.ignoreArrival = CmiFalse;
     data.cpuTime = 0.;
     data.wallTime = 0.;
     userData = usr_ptr;
@@ -68,6 +69,7 @@ public:
   inline LDOMHandle &parentOM() { return data.handle.omhandle; }
   inline const LDObjHandle &GetLDObjHandle() const { return data.handle; }
   inline void SetMigratable(CmiBool mig) { data.migratable = mig; }
+  inline void SetReadyMigrate(CmiBool ready) { data.ignoreArrival = ready; }
   inline LDObjData &ObjData() { return data; };
   inline void lastKnownLoad(double *c, double *w) {*c=lastCpuTime; *w=lastWallTime; }
   inline void *getUserData() { return  userData; }
index b4b40b9ee49ab9c09d37f8ff24fff030b1f2713b..9835f440a09654e12a08cefdc06d57cdef05b024 100644 (file)
@@ -36,8 +36,6 @@ RandRefLB::RandRefLB(const CkLBOptions &opt): RandCentLB(opt)
 void RandRefLB::work(CentralLB::LDStats* stats, int count)
 {
   //  CkPrintf("[%d] RandRefLB strategy\n",CkMyPe());
-
-  CkVec<MigrateInfo*> migrateInfo;
   int obj;
 
   RandCentLB::work(stats, count);
index a2320c333b782141b7d5c5a529eb8b8782df7130..fa560f352511838ba68c9146f8cccff7067c8d21 100644 (file)
@@ -253,6 +253,14 @@ extern "C" void LDMigratable(const LDObjHandle &h)
   obj->SetMigratable(CmiTrue);
 }
 
+extern "C" void LDReadyMigrate(const LDObjHandle &h, CmiBool isReady)
+{
+  LBDB *const db = (LBDB*)(h.omhandle.ldb.handle);
+  LBObj *const obj = db->LbObj(h);
+
+  obj->SetReadyMigrate(isReady);
+}
+
 extern "C" void LDClearLoads(LDHandle _db)
 {
   LBDB *const db = (LBDB*)(_db.handle);
index 96f42ab64dc32efb924628eb7670bc753612c73a..0734a4f2bca0a971539efa85a35a828f6091c693 100644 (file)
@@ -101,6 +101,7 @@ typedef struct {
   double cpuTime;
   double wallTime;
   CmiBool migratable;
+  CmiBool ignoreArrival;
 #ifdef __cplusplus
   inline const LDOMHandle &omHandle() const { return handle.omhandle; }
   inline const LDOMid &omID() const { return handle.omhandle.id; }
@@ -237,6 +238,7 @@ void LDMessage(LDObjHandle from,
 void LDEstObjLoad(LDObjHandle h, double load);
 void LDNonMigratable(const LDObjHandle &h);
 void LDMigratable(const LDObjHandle &h);
+void LDReadyMigrate(const LDObjHandle &h, CmiBool);
 void LDDumpDatabase(LDHandle _lbdb);
 
 /*
@@ -370,6 +372,7 @@ inline void LDObjData::pup(PUP::er &p) {
   p|cpuTime;
   p|wallTime;
   p|migratable;
+  p|ignoreArrival;
 }
 PUPmarshall(LDObjData);
 inline void LDCommDesc::pup(PUP::er &p) {