Merging
[charm.git] / src / ck-ldb / CentralLB.C
index 22d856c778d3665102d1f9b57ceb795e7caa67a7..ba78bef06e254b0112293ddfd99bacf55fa79da3 100644 (file)
@@ -1,3 +1,4 @@
+
 /**
  * \addtogroup CkLdb
 */
@@ -10,6 +11,9 @@
 #include "LBDBManager.h"
 #include "LBSimulation.h"
 
+//#include "limits.h"
+#include <vector>
+
 #define  DEBUGF(x)       // CmiPrintf x;
 #define  DEBUG(x)        // x;
 
@@ -42,11 +46,71 @@ CkGroupID loadbalancer;
 int * lb_ptr;
 int load_balancer_created;
 
+//struct AdaptiveData {
+//  int iteration;
+//  double max_load;
+//  double avg_load;
+//};
+//
+//struct AdaptiveLBDatabase {
+//  std::vector<AdaptiveData> history_data;
+//} adaptive_lbdb;
+//
+//enum state {
+//  OFF,
+//  ON,
+//  PAUSE,
+//  DECIDED,
+//  LOAD_BALANCE
+//} local_state;
+//
+//struct AdaptiveLBStructure {
+//  int lb_ideal_period;
+//  int lb_calculated_period;
+//  int lb_no_iterations;
+//  int global_max_iter_no;
+//  int global_recv_iter_counter;
+//  bool in_progress;
+//  double prev_load;
+//  double lb_strategy_cost;
+//  double lb_migration_cost;
+//  bool lb_period_informed;
+//  int lb_msg_send_no;
+//  int lb_msg_recv_no;
+//} adaptive_struct;
+
 CreateLBFunc_Def(CentralLB, "CentralLB base class")
 
 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
                             LBMigrateMsg *, LBInfo &info, int considerComm);
 
+//CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
+//  double lb_data[4];
+//  lb_data[0] = 0;
+//  lb_data[1] = 0;
+//  lb_data[2] = 0;
+//  for (int i = 0; i < nMsg; i++) {
+//    CkAssert(msgs[i]->getSize() == 4*sizeof(double));
+//    double* m = (double *)msgs[i]->getData();
+//    lb_data[0] += m[0];
+//    lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
+//    lb_data[2] += m[2];
+//    if (i == 0) {
+//      lb_data[3] = m[3];
+//    }
+//    if (m[3] != lb_data[3]) {
+//      CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
+//      %lf\n", lb_data[3], m[3]);
+//    }
+//  }
+//  return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
+//}
+//
+///*global*/ CkReduction::reducerType lbDataCollectionType;
+///*initcall*/ void registerLBDataCollection(void) {
+//  lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
+//}
+
 /*
 void CreateCentralLB()
 {
@@ -117,6 +181,20 @@ void CentralLB::initLB(const CkLBOptions &opt)
   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
 
   load_balancer_created = 1;
+
+  // If metabalancer enabled, initialize the variables
+ // adaptive_struct.lb_ideal_period =  INT_MAX;
+ // adaptive_struct.lb_calculated_period = INT_MAX;
+ // adaptive_struct.lb_no_iterations = -1;
+ // adaptive_struct.global_max_iter_no = 0;
+ // adaptive_struct.global_recv_iter_counter = 0;
+ // adaptive_struct.in_progress = false;
+ // adaptive_struct.prev_load = 0.0;
+ // adaptive_struct.lb_strategy_cost = 0.0;
+ // adaptive_struct.lb_migration_cost = 0.0;
+ // adaptive_struct.lb_msg_send_no = 0;
+ // adaptive_struct.lb_msg_recv_no = 0;
+ // local_state = OFF;
 #endif
 }
 
@@ -161,8 +239,9 @@ void CentralLB::turnOff()
 
 void CentralLB::AtSync()
 {
+//  CkPrintf("AtSync CEntral LB [%d]\n", CkMyPe());
 #if CMK_LBDB_ON
-  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
+//  DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
 
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
        CpvAccess(_currentObj)=this;
@@ -183,6 +262,9 @@ void CentralLB::AtSync()
 
 void CentralLB::ProcessAtSync()
 {
+
+
+
 #if CMK_LBDB_ON
   if (reduction_started) return;              // reducton in progress
 
@@ -190,6 +272,14 @@ void CentralLB::ProcessAtSync()
   if (CkMyPe() == cur_ld_balancer) {
     start_lb_time = CkWallTimer();
   }
+ double total_load;
+ double idle_time;
+ double bg_walltime;
+ theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
+ theLbdb->IdleTime(&idle_time);
+ CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(),
+    total_load, idle_time, bg_walltime, (total_load - idle_time - bg_walltime));
+
 
 
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
@@ -216,6 +306,340 @@ void CentralLB::ProcessAtSync()
 #endif
 }
 
+//void CentralLB::ProcessAtSyncMin()
+//{
+//#if CMK_LBDB_ON
+//  if (reduction_started) return;              // reduction in progress
+//
+//  CmiAssert(CmiNodeAlive(CkMyPe()));
+//  if (CkMyPe() == cur_ld_balancer) {
+//    start_lb_time = CkWallTimer();
+//  }
+//
+//
+//#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+//     initMlogLBStep(thisgroup);
+//#endif
+//  
+//  adaptive_struct.lb_no_iterations++;
+// // CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] adaptive_struct.lb_ideal_period [%d]\n", CkMyPe(),
+// //     adaptive_struct.lb_no_iterations, adaptive_struct.lb_ideal_period);
+//
+//  // If the decision has been made and lb_ideal_period has been reached, do
+//  // load balancing; otherwise send the minimal stats and resume the clients.
+//  if (local_state == DECIDED) {
+//    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
+// //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
+//      SendMinStats();
+//      ResumeClients(0);
+//    } else {
+//      local_state = LOAD_BALANCE;
+// //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
+//      ProcessAtSync();
+//    }
+//    return;
+//  }
+//   
+//  // If the state is ON (not yet DECIDED) and lb_ideal_period has not been
+//  // reached, move ahead. Once lb_ideal_period is reached, change the state to
+//  // PAUSE and do not resume the clients.
+//  if (local_state == ON) {
+//    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
+//      SendMinStats();
+//      ResumeClients(0);
+//    } else {
+//      local_state = PAUSE;
+//    }
+//    return;
+//  }
+//
+//  SendMinStats();
+//  ResumeClients(0);
+//#endif
+//}
+//
+//void CentralLB::SendMinStats() {
+//
+// double total_load;
+// double idle_time;
+// double bg_walltime;
+//  theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
+// // CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
+//
+//  // total_load is cumulative since the last load balancing stage, so the
+//  // previously recorded load (prev_load) is subtracted from it.
+//  total_load -= idle_time;
+//  double tmp = total_load;
+//  total_load -= adaptive_struct.prev_load;
+//  adaptive_struct.prev_load = tmp; 
+//
+//  double lb_data[4];
+//  lb_data[0] = total_load;
+//  lb_data[1] = total_load;
+//  lb_data[2] = 1;
+//  lb_data[3] = adaptive_struct.lb_no_iterations;
+//  //CkPrintf("[%d] sends total load %lf at iter %d\n", CkMyPe(), total_load, adaptive_struct.lb_no_iterations);
+//
+//  if (adaptive_struct.lb_no_iterations != 0) {
+//    CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
+//        thisProxy[0]);
+////    contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
+//  }
+//
+////    int tmp1 = adaptive_struct.lb_no_iterations;
+////    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
+////    // Send the current iteration no
+////    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
+////        thisProxy[0]);
+////    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
+//}
+//
+//void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
+//  CmiAssert(CkMyPe() == 0);
+//  double* load = (LBRealType *) msg->getData();
+//  double max = load[1];
+//  double avg = load[0]/load[2];
+//  int iteration_n = load[3];
+//  CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
+//  CkPrintf("Current calculated period %d\n", adaptive_struct.lb_calculated_period);
+//  delete msg;
+//
+//  // Store the data for this iteration
+//  AdaptiveData data;
+//  data.iteration = adaptive_struct.lb_no_iterations;
+//  data.max_load = max;
+//  data.avg_load = avg;
+//  adaptive_lbdb.history_data.push_back(data);
+//
+//  // If informing the lb period is already in progress, don't inform again
+//  if (adaptive_struct.in_progress) {
+//    return;
+//  }
+//
+////  if (adaptive_struct.lb_period_informed) {
+////    return;
+////  }
+//
+//  // If the max/avg ratio is greater than the threshold and also this is not the
+//  // step immediately after load balancing, carry out load balancing
+//  //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
+//  if (max/avg >= 1.5 && adaptive_lbdb.history_data.size() > 4) {
+//    CkPrintf("Carry out load balancing step at iter max/avg(%lf) > 1.1\n", max/avg);
+////    if (!adaptive_struct.lb_period_informed) {
+////      // Just for testing
+////      adaptive_struct.lb_calculated_period = 40;
+////      adaptive_struct.lb_period_informed = true;
+////      thisProxy.LoadBalanceDecision(adaptive_struct.lb_calculated_period);
+////      return;
+////    }
+//
+//    // If the new lb period is less than current set lb period
+//    if (adaptive_struct.lb_calculated_period > iteration_n + 1) {
+//      adaptive_struct.lb_calculated_period = iteration_n + 1;
+//      adaptive_struct.lb_period_informed = true;
+//      adaptive_struct.in_progress = true;
+//      CkPrintf("Informing everyone the lb period is %d\n",
+//          adaptive_struct.lb_calculated_period);
+//      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
+//    }
+//    return;
+//  }
+//
+//  // Generate the plan for the adaptive strategy
+//  int period;
+//  if (generatePlan(period)) {
+//    //CkPrintf("Carry out load balancing step at iter\n");
+//
+//    // If the new lb period is less than current set lb period
+//    if (adaptive_struct.lb_calculated_period > period) {
+//      adaptive_struct.lb_calculated_period = period;
+//      adaptive_struct.in_progress = true;
+//      adaptive_struct.lb_period_informed = true;
+//      CkPrintf("Informing everyone the lb period is %d\n",
+//          adaptive_struct.lb_calculated_period);
+//      thisProxy.LoadBalanceDecision(adaptive_struct.lb_msg_send_no++, adaptive_struct.lb_calculated_period);
+//    }
+//  }
+//}
+//
+//bool CentralLB::generatePlan(int& period) {
+//  if (adaptive_lbdb.history_data.size() <= 8) {
+//    return false;
+//  }
+//
+//  // Some heuristics for lbperiod
+//  // If constant load or almost constant,
+//  // then max * new_lb_period > avg * new_lb_period + lb_cost
+//  double max = 0.0;
+//  double avg = 0.0;
+//  AdaptiveData data;
+//  for (int i = 0; i < adaptive_lbdb.history_data.size(); i++) {
+//    data = adaptive_lbdb.history_data[i];
+//    max += data.max_load;
+//    avg += data.avg_load;
+//    CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
+//  }
+////  max /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
+////  avg /= (adaptive_struct.lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
+////
+////  adaptive_struct.lb_ideal_period = (adaptive_struct.lb_strategy_cost +
+////  adaptive_struct.lb_migration_cost) / (max - avg);
+////  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d \n",
+////      max, avg, adaptive_struct.lb_strategy_cost, adaptive_struct.lb_migration_cost, adaptive_struct.lb_ideal_period);
+////
+//  // If linearly varying load, then find lb_period
+//  // area between the max and avg curve 
+//  double mslope, aslope, mc, ac;
+//  getLineEq(aslope, ac, mslope, mc);
+//  CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
+//  double a = (mslope - aslope)/2;
+//  double b = (mc - ac);
+//  double c = -(adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost);
+//  //c = -2.5;
+//  bool got_period = getPeriodForLinear(a, b, c, period);
+//  if (!got_period) {
+//    return false;
+//  }
+//  
+//  if (mslope < 0) {
+//    if (period > (-mc/mslope)) {
+//      CkPrintf("Max < 0 Period set when max load is -ve\n");
+//      return false;
+//    }
+//  }
+//
+//  if (aslope < 0) {
+//    if (period > (-ac/aslope)) {
+//      CkPrintf("Avg < 0 Period set when avg load is -ve\n");
+//      return false;
+//    }
+//  }
+//
+//  int intersection_t = (mc-ac) / (aslope - mslope);
+//  if (intersection_t > 0 && period > intersection_t) {
+//    CkPrintf("Avg | Max Period set when curves intersect\n");
+//    return false;
+//  }
+//  return true;
+//}
+//
+//bool CentralLB::getPeriodForLinear(double a, double b, double c, int& period) {
+//  CkPrintf("Quadratic Equation %lf X^2 + %lf X + %lf\n", a, b, c);
+//  if (a == 0.0) {
+//    period = (-c / b);
+//    CkPrintf("Ideal period for linear load %d\n", period);
+//    return true;
+//  }
+//  int x;
+//  double t = (b * b) - (4*a*c);
+//  if (t < 0) {
+//    CkPrintf("(b * b) - (4*a*c) is -ve sqrt : %lf\n", sqrt(t));
+//    return false;
+//  }
+//  t = (-b + sqrt(t)) / (2*a);
+//  x = t;
+//  if (x < 0) {
+//    CkPrintf("boo!!! x (%d) < 0\n", x);
+//    x = 0;
+//    return false;
+//  }
+//  period = x;
+//  CkPrintf("Ideal period for linear load %d\n", period);
+//  return true;
+//}
+//
+//bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc) {
+//  int total = adaptive_lbdb.history_data.size();
+//  int iterations = 1 + adaptive_lbdb.history_data[total - 1].iteration -
+//      adaptive_lbdb.history_data[0].iteration;
+//  double a1 = 0;
+//  double m1 = 0;
+//  double a2 = 0;
+//  double m2 = 0;
+//  AdaptiveData data;
+//  int i = 0;
+//  for (i = 0; i < total/2; i++) {
+//    data = adaptive_lbdb.history_data[i];
+//    m1 += data.max_load;
+//    a1 += data.avg_load;
+//  }
+//  m1 /= i;
+//  a1 /= i;
+//
+//  for (i = total/2; i < total; i++) {
+//    data = adaptive_lbdb.history_data[i];
+//    m2 += data.max_load;
+//    a2 += data.avg_load;
+//  }
+//  m2 /= (i - total/2);
+//  a2 /= (i - total/2);
+//
+//  aslope = 2 * (a2 - a1) / iterations;
+//  mslope = 2 * (m2 - m1) / iterations;
+//  ac = adaptive_lbdb.history_data[0].avg_load;
+//  mc = adaptive_lbdb.history_data[0].max_load;
+//  return true;
+//}
+//
+//void CentralLB::LoadBalanceDecision(int req_no, int period) {
+//  if (req_no < adaptive_struct.lb_msg_recv_no) {
+//    CkPrintf("Error!!! Received a request which was already sent or old\n");
+//    return;
+//  }
+//  //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
+//  adaptive_struct.lb_ideal_period = period;
+//  local_state = ON;
+//  adaptive_struct.lb_msg_recv_no = req_no;
+//  thisProxy[0].ReceiveIterationNo(req_no, adaptive_struct.lb_no_iterations);
+//}
+//
+//void CentralLB::LoadBalanceDecisionFinal(int req_no, int period) {
+//  if (req_no < adaptive_struct.lb_msg_recv_no) {
+//    return;
+//  }
+//  //CkPrintf("[%d] Final Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), adaptive_struct.lb_no_iterations, period, local_state);
+//  adaptive_struct.lb_ideal_period = period;
+//
+//  if (local_state == ON) {
+//    local_state = DECIDED;
+//    return;
+//  }
+//
+//  // If the state is PAUSE, then it's waiting for the final decision from the
+//  // central processor. If the decision is that the ideal period is in the
+//  // future, resume. If the ideal period is now, then carry out load balancing.
+//  if (local_state == PAUSE) {
+//    if (adaptive_struct.lb_no_iterations < adaptive_struct.lb_ideal_period) {
+//      local_state = DECIDED;
+//      SendMinStats();
+//      ResumeClients(0);
+//    } else {
+//      local_state = LOAD_BALANCE;
+//      ProcessAtSync();
+//    }
+//    return;
+//  }
+//  CkPrintf("Error!!! Final decision received but the state is invalid %d\n", local_state);
+//}
+//
+//
+//void CentralLB::ReceiveIterationNo(int req_no, int local_iter_no) {
+//  CmiAssert(CkMyPe() == 0);
+//
+//  adaptive_struct.global_recv_iter_counter++;
+//  if (local_iter_no > adaptive_struct.global_max_iter_no) {
+//    adaptive_struct.global_max_iter_no = local_iter_no;
+//  }
+//  if (CkNumPes() == adaptive_struct.global_recv_iter_counter) {
+//    adaptive_struct.lb_ideal_period = (adaptive_struct.lb_ideal_period > adaptive_struct.global_max_iter_no) ? adaptive_struct.lb_ideal_period : adaptive_struct.global_max_iter_no + 1;
+//    thisProxy.LoadBalanceDecisionFinal(req_no, adaptive_struct.lb_ideal_period);
+//    CkPrintf("Final lb_period %d\n", adaptive_struct.lb_ideal_period);
+//    adaptive_struct.in_progress = false;
+//    adaptive_struct.global_max_iter_no = 0;
+//    adaptive_struct.global_recv_iter_counter = 0;
+//  }
+//}
+
 // called only on 0
 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
 {
@@ -288,6 +712,7 @@ void CentralLB::BuildStatsMsg()
 #endif
 }
 
+
 // called on every processor
 void CentralLB::SendStats()
 {
@@ -770,6 +1195,7 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
                   thisProxy);
   contribute(0, NULL, CkReduction::max_int, cb);
+
 #endif
 }
 
@@ -986,6 +1412,7 @@ void CentralLB::ResumeClients(int balancing)
 
   theLbdb->ResumeClients();
   if (balancing)  {
+
     CheckMigrationComplete();
     if (future_migrates_expected == 0 || 
             future_migrates_expected == future_migrates_completed) {
@@ -1014,9 +1441,15 @@ void CentralLB::CheckMigrationComplete()
                 lbname, cur_ld_balancer, step()-1, end_lb_time,
                end_lb_time-start_lb_time);
     }
+
+    //FIX ME!!! adaptive_struct.lb_migration_cost = (CkWallTimer() - start_lb_time);
+    theLbdb->SetMigrationCost(CkWallTimer() - start_lb_time);
+
     lbdone = 0;
     future_migrates_expected = -1;
     future_migrates_completed = 0;
+
+
     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
     // release local barrier  so that the next load balancer can go
     LDOMHandle h;
@@ -1066,8 +1499,10 @@ LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
              lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
+    // FIX ME!!! adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
+    //CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
+    theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
   }
-
   return msg;
 #else
   return NULL;