Adaptive LB
authorHarshitha <gplkrsh2@illinois.edu>
Fri, 18 Nov 2011 18:01:28 +0000 (12:01 -0600)
committerHarshitha <gplkrsh2@illinois.edu>
Fri, 18 Nov 2011 18:01:28 +0000 (12:01 -0600)
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralLB.ci
src/ck-ldb/CentralLB.h

index 1257ea7efcc06b66428244108a44fc414c2f5bb5..06a1cb8675e407b963f1cd7cde0157fd45fe52b9 100644 (file)
@@ -42,11 +42,40 @@ CkGroupID loadbalancer;
 int * lb_ptr;
 int load_balancer_created;
 
+double lb_iteration_start_time = 0.0;
+double lb_ideal_trigger_lb_time = 0.0;
+double lb_migration_cost = 0.0;
+double lb_strategy_cost = 0.0;
+int lb_no_iterations = 0;
+double lb_min_stat_coll_period = 0.0;
+double cur_max_pe_load = 0.0;
+double cur_avg_pe_load = 0.0;
+
 CreateLBFunc_Def(CentralLB, "CentralLB base class")
 
 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
                             LBMigrateMsg *, LBInfo &info, int considerComm);
 
+CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
+  double lb_data[3];
+  lb_data[0] = 0;
+  lb_data[1] = 0;
+  lb_data[2] = 0;
+  for (int i = 0; i < nMsg; i++) {
+    CkAssert(msgs[i]->getSize() == 3*sizeof(double));
+    double* m = (double *)msgs[i]->getData();
+    lb_data[0] += m[0];
+    lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
+    lb_data[2] += m[2];
+  }
+  return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
+}
+
+/*global*/ CkReduction::reducerType lbDataCollectionType;
+/*initcall*/ void registerLBDataCollection(void) {
+  lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
+}
+
 /*
 void CreateCentralLB()
 {
@@ -161,6 +190,7 @@ void CentralLB::turnOff()
 
 void CentralLB::AtSync()
 {
+  CkPrintf("AtSync CEntral LB\n");
 #if CMK_LBDB_ON
   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
 
@@ -174,7 +204,7 @@ void CentralLB::AtSync()
     return;
   }
   if(CmiNodeAlive(CkMyPe())){
-    thisProxy [CkMyPe()].ProcessAtSync();
+    thisProxy [CkMyPe()].ProcessAtSyncMin();
   }
 #endif
 }
@@ -216,6 +246,96 @@ void CentralLB::ProcessAtSync()
 #endif
 }
 
+void CentralLB::ProcessAtSyncMin()
+{
+#if CMK_LBDB_ON
+  if (reduction_started) return;              // reducton in progress
+
+  CmiAssert(CmiNodeAlive(CkMyPe()));
+  if (CkMyPe() == cur_ld_balancer) {
+    start_lb_time = CkWallTimer();
+  }
+
+
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+       initMlogLBStep(thisgroup);
+#endif
+  
+  CkPrintf("ProcessAtSyncMin [%d]\n", CkMyPe());
+
+//  if (CkWallTimer() > lb_min_stat_coll_period) {
+//    CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
+    SendMinStats();
+//  } else {
+//    CkPrintf("No need to send MinStats of [%d]\n", CkMyPe());
+//    ResumeClients(1);
+//  }
+#endif
+}
+
+void CentralLB::SendMinStats() {
+
+ double total_walltime;
+ double idle_time;
+ double bg_walltime;
+  theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
+  CkPrintf("Total walltime [%d] %lf\n", CkMyPe(), total_walltime);
+
+  double lb_data[3];
+  lb_data[0] = total_walltime;
+  lb_data[1] = total_walltime;
+  lb_data[2] = 1;
+
+  CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
+                  thisProxy[0]);
+  contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
+
+//  {
+//  // enfore the barrier to wait until centralLB says no
+//  LDOMHandle h;
+//  h.id.id.idx = 0;
+//  theLbdb->getLBDB()->RegisteringObjects(h);
+//  }
+}
+
+void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
+  CmiAssert(CkMyPe() == 0);
+  lb_no_iterations++;
+  double* load = (LBRealType *) msg->getData();
+  double max = load[1];
+  double avg = load[0]/load[2];
+  CkPrintf("Total load : %lf Avg load: %lf Max load: %lf\n", load[0], load[0]/load[2], load[1]);
+  cur_max_pe_load = max;
+  cur_avg_pe_load = avg;
+
+  // Some heuristics for lbperiod
+  int ideal_period = (lb_strategy_cost + lb_migration_cost) * lb_no_iterations / (max - avg);
+  lb_ideal_trigger_lb_time = CkWallTimer() + (ideal_period * max/lb_no_iterations);
+  lb_min_stat_coll_period = CkWallTimer() + (ideal_period * max / lb_no_iterations);
+  CkPrintf("max : %lf, avg: %lf, strat cost: %lf, migration_cost: %lf, idealperiod : %d trigger  lb: %lf current time: %lf\n",
+      max/lb_no_iterations, avg/lb_no_iterations,
+      lb_strategy_cost, lb_migration_cost, ideal_period, lb_ideal_trigger_lb_time,
+      CkWallTimer());
+  CkPrintf("Walltime: %lf, lb trigger time %f, lb min stats coll time %f\n", CkWallTimer(), lb_ideal_trigger_lb_time, lb_min_stat_coll_period);
+  //  thisProxy.ResumeClients(0);
+  
+  thisProxy.ReceiveIdealLBPeriod(lb_ideal_trigger_lb_time, lb_min_stat_coll_period);
+
+//  if (max/avg > 1.01) { 
+//    CkPrintf("Carrying out loadbalancing\n");
+//    thisProxy.ProcessAtSync();
+//    lb_no_iterations = 0;
+//  } else {
+    CkPrintf("Not very overloaded\n");
+         thisProxy.ResumeClients(1);
+//  }
+}
+
+void CentralLB::ReceiveIdealLBPeriod(double lb_ideal_trigger_lbtime,
+    double lb_min_stat_collperiod) {
+  lb_ideal_trigger_lb_time = lb_ideal_trigger_lbtime;
+  lb_min_stat_coll_period = lb_min_stat_collperiod;
+}
 // called only on 0
 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
 {
@@ -288,6 +408,7 @@ void CentralLB::BuildStatsMsg()
 #endif
 }
 
+
 // called on every processor
 void CentralLB::SendStats()
 {
@@ -859,6 +980,7 @@ void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
   }
 #endif
+  CkPrintf("Can set the LBPeriod here [%d]\n", CkMyPe());
   cur_ld_balancer = m->next_lb;
   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
@@ -1008,6 +1130,11 @@ void CentralLB::CheckMigrationComplete()
                 lbname, cur_ld_balancer, step()-1, end_lb_time,
                end_lb_time-start_lb_time);
     }
+
+    lb_migration_cost = (CkWallTimer() - start_lb_time);
+    lb_iteration_start_time = CkWallTimer();
+    CkPrintf("lb_iteration_start_time[%d] %lf\n", CkMyPe(), lb_iteration_start_time);
+
     lbdone = 0;
     future_migrates_expected = -1;
     future_migrates_completed = 0;
@@ -1060,6 +1187,8 @@ LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
              lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
+    lb_strategy_cost = (strat_end_time - strat_start_time);
+    CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
   }
 
   return msg;
index 92806016babd64a12f95c112b23dee8d9939c311..8f2c908af9a541599c46b535ddd68c79a0df2f47 100644 (file)
@@ -11,13 +11,19 @@ readonly CkGroupID loadbalancer;
 
 initnode void lbinit(void);
 
+initcall void registerLBDataCollection(void);
+
 group [migratable] CentralLB : BaseLB {
   entry void CentralLB(const CkLBOptions &);  
+  entry void ProcessAtSyncMin(void);
   entry void ProcessAtSync(void);
   entry void SendStats();
+  entry void SendMinStats();
   entry void ReceiveStats(CkMarshalledCLBStatsMessage data);
   entry void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage data);
   entry void ReceiveCounts(CkReductionMsg *);
+  entry void ReceiveMinStats(CkReductionMsg *msg);
+  entry void ReceiveIdealLBPeriod(double lb_ideal_trigger_lb_time, double lb_min_stat_coll_period);
   entry void LoadBalance(void);
   entry void ResumeClients(int);
   entry void ResumeClients(CkReductionMsg *);
index 30f227f139eddcd0e5ef99f25c8255137aa09907..b36d6871847a5f8c23c80e9279668bf87acd0fbb 100644 (file)
@@ -84,9 +84,12 @@ public:
   void AtSync(void); // Everything is at the PE barrier
   void ProcessAtSync(void); // Receive a message from AtSync to avoid
                             // making projections output look funny
-
+  void ProcessAtSyncMin(void);
   void SendStats();
+  void SendMinStats();
   void ReceiveCounts(CkReductionMsg *);
+  void ReceiveMinStats(CkReductionMsg *);
+  void ReceiveIdealLBPeriod(double lb_ideal_trigger_lb_time, double lb_min_stat_coll_period);
   void ReceiveStats(CkMarshalledCLBStatsMessage &msg); // Receive stats on PE 0
   void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg); // Receive stats using a tree structure