Asynchronous collection of lb stats and informing lb period
authorHarshitha <gplkrsh2@illinois.edu>
Mon, 5 Mar 2012 01:54:27 +0000 (19:54 -0600)
committerHarshitha <gplkrsh2@illinois.edu>
Mon, 5 Mar 2012 01:54:27 +0000 (19:54 -0600)
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralLB.ci
src/ck-ldb/CentralLB.h

index af1be0bc100f71a7b9813dbb698e544cf78591e5..c4ea7d1ed67651873f757796db75e8fe10c7572a 100644 (file)
@@ -48,13 +48,10 @@ double lb_migration_cost = 0.0;
 double lb_strategy_cost = 0.0;
 
 int lb_no_iterations = -1;
-
 double cur_max_pe_load = 0.0;
 double cur_avg_pe_load = 0.0;
 double prev_load = 0.0;
 
-bool islb_done = false;
-
 struct AdaptiveData {
   int iteration;
   double max_load;
@@ -65,24 +62,35 @@ struct AdaptiveLBDatabase {
   std::vector<AdaptiveData> history_data;
 } adaptive_lbdb;
 
+enum state {
+  OFF,
+  ON,
+  PAUSE,
+  DECIDED,
+  LOAD_BALANCE
+} local_state;
+
 CreateLBFunc_Def(CentralLB, "CentralLB base class")
 
 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
                             LBMigrateMsg *, LBInfo &info, int considerComm);
 
 CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
-  double lb_data[3];
+  double lb_data[4];
   lb_data[0] = 0;
   lb_data[1] = 0;
   lb_data[2] = 0;
   for (int i = 0; i < nMsg; i++) {
-    CkAssert(msgs[i]->getSize() == 3*sizeof(double));
+    CkAssert(msgs[i]->getSize() == 4*sizeof(double));
     double* m = (double *)msgs[i]->getData();
     lb_data[0] += m[0];
     lb_data[1] = ((m[1] > lb_data[1])? m[1] : lb_data[1]);
     lb_data[2] += m[2];
+    if (i == 0) {
+      lb_data[3] = m[3];
+    }
   }
-  return CkReductionMsg::buildNew(3*sizeof(double), lb_data);
+  return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
 }
 
 /*global*/ CkReduction::reducerType lbDataCollectionType;
@@ -225,11 +233,6 @@ void CentralLB::AtSync()
 
 #include "ComlibStrategy.h"
 
-void CentralLB::ProcessAtSync(int ideal_lb_period) {
-  lb_ideal_period = ideal_lb_period;
-  ProcessAtSync();
-}
-
 void CentralLB::ProcessAtSync()
 {
 
@@ -237,6 +240,9 @@ void CentralLB::ProcessAtSync()
   lb_no_iterations = -1;
   adaptive_lbdb.history_data.clear();
   prev_load = 0.0;
+  lb_ideal_period = INT_MAX;
+  local_state = OFF;
+
 
 #if CMK_LBDB_ON
   if (reduction_started) return;              // reducton in progress
@@ -290,20 +296,32 @@ void CentralLB::ProcessAtSyncMin()
   CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
       lb_no_iterations, lb_ideal_period);
 
-  if (lb_no_iterations >= lb_ideal_period) {
-//    CkPrintf("Since the time is past the collection period, send MinStats of [%d]\n", CkMyPe());
-    SendMinStats();
-  } else {
-
-    SendMinStats();
-// double total_walltime;
-// double idle_time;
-// double bg_walltime;
-//  theLbdb->GetTime(&total_walltime,&total_walltime, &idle_time, &bg_walltime, &bg_walltime);
-//    CkPrintf("No need to send MinStats of [%d] load: %lf\n\n", CkMyPe(),
-//    total_walltime);
-//    ResumeClients(0);
+  // If decision has been made and has reached the lb_period, then do load
+  // balancing, else if hasn't reached ideal_period, then resume.
+  if (local_state == DECIDED) {
+    if (lb_no_iterations < lb_ideal_period) {
+      ResumeClients(1);
+    } else {
+      local_state = LOAD_BALANCE;
+      ProcessAtSync();
+    }
+    return;
   }
+   
+  // If the state is ON and not DECIDED, then if havn't reached lb_period, then
+  // move ahead. If has reached lb_ideal_period, then change state to PAUSE and
+  // dont resume client.
+  if (local_state == ON) {
+    if (lb_no_iterations < lb_ideal_period) {
+      ResumeClients(1);
+    } else {
+      local_state = PAUSE;
+    }
+    return;
+  }
+
+  SendMinStats();
+  ResumeClients(1);
 #endif
 }
 
@@ -322,14 +340,15 @@ void CentralLB::SendMinStats() {
   total_load -= prev_load;
   prev_load = tmp; 
 
-  double lb_data[3];
+  double lb_data[4];
   lb_data[0] = total_load;
   lb_data[1] = total_load;
   lb_data[2] = 1;
+  lb_data[3] = lb_no_iterations;
 
   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
                   thisProxy[0]);
-  contribute(3*sizeof(double), lb_data, lbDataCollectionType, cb);
+  contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
 }
 
 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
@@ -337,14 +356,9 @@ void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
   double* load = (LBRealType *) msg->getData();
   double max = load[1];
   double avg = load[0]/load[2];
+  int iteration_n = load[3];
   CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
 
-//  if (lb_no_iterations == 0 || lb_no_iterations < lb_ideal_period) {
-  if (lb_no_iterations == 0) {
-    thisProxy.ResumeClients(0);
-    return;
-  }
-
   // Store the data for this iteration
   AdaptiveData data;
   data.iteration = lb_no_iterations;
@@ -354,18 +368,16 @@ void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
 
   // If the max/avg ratio is greater than the threshold and also this is not the
   // step immediately after load balancing, carry out load balancing
-  if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 5) {
+  if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
     CkPrintf("Carry out load balancing step\n");
-    islb_done = true;
-    thisProxy.ProcessAtSync();
+    lb_ideal_period = iteration_n + 1;
+    thisProxy.LoadBalanceDecision(lb_ideal_period);
     return;
   }
 
   // Generate the plan for the adaptive strategy
   if (generatePlan()) {
-    thisProxy.ResumeClients(lb_ideal_period, 0);
-  } else {
-    thisProxy.ResumeClients(0);
+    thisProxy.LoadBalanceDecision(lb_ideal_period);
   }
 }
 
@@ -452,6 +464,47 @@ bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc
   return true;
 }
 
+void CentralLB::LoadBalanceDecision(int period) {
+  CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
+  lb_ideal_period = period;
+  if (local_state == OFF) {
+    local_state = ON;
+
+    // Send the current iteration no
+    CkCallback cb(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
+        thisProxy[0]);
+    int tmp = lb_no_iterations;
+    contribute(sizeof(int), &tmp, CkReduction::max_int, cb);
+    return;
+  }
+
+  if (local_state == ON) {
+    local_state = DECIDED;
+    return;
+  }
+
+  // If the state is PAUSE, then its waiting for the final decision from central
+  // processor. If the decision is that the ideal period is in the future,
+  // resume. If the ideal period is now, then carry out load balancing.
+  if (local_state == PAUSE) {
+    if (lb_no_iterations < lb_ideal_period) {
+      local_state = DECIDED;
+      ResumeClients(1);
+    } else {
+      ProcessAtSync();
+    }
+  }
+}
+
+void CentralLB::ReceiveIterationNo(CkReductionMsg *msg) {
+  CmiAssert(CkMyPe() == 0);
+  int *it_no = (int *) msg->getData();
+  CkPrintf("^^^^ Received all iteration no and final decision %d^^^^\n", it_no[0]);
+  //thisProxy.LoadBalanceDecision(it_no[0]);
+  lb_ideal_period = (lb_ideal_period > it_no[0]) ? lb_ideal_period : it_no[0] + 1;
+  thisProxy.LoadBalanceDecision(lb_ideal_period);
+}
+
 // called only on 0
 void CentralLB::ReceiveCounts(CkReductionMsg  *msg)
 {
@@ -1199,11 +1252,6 @@ void CentralLB::ResumeClients(CkReductionMsg *msg)
   delete msg;
 }
 
-void CentralLB::ResumeClients(int ideal_period, int balancing) {
-  lb_ideal_period = ideal_period;
-  ResumeClients(balancing);
-}
-
 void CentralLB::ResumeClients(int balancing)
 {
 #if CMK_LBDB_ON
index 755c02872c83ed4b831f4aeaa889ccd94a565deb..2189dbe797a5e97685905c75c8a87d852c1801f9 100644 (file)
@@ -17,20 +17,21 @@ group [migratable] CentralLB : BaseLB {
   entry void CentralLB(const CkLBOptions &);  
   entry void ProcessAtSyncMin(void);
   entry void ProcessAtSync(void);
-  entry void ProcessAtSync(int);
   entry void SendStats();
   entry void SendMinStats();
   entry void ReceiveStats(CkMarshalledCLBStatsMessage data);
+  entry void ReceiveMinStats(CkReductionMsg *msg);
   entry void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage data);
   entry void ReceiveCounts(CkReductionMsg *);
-  entry void ReceiveMinStats(CkReductionMsg *msg);
+  entry void LoadBalanceDecision(int period);
+  entry void ReceiveIterationNo(CkReductionMsg *msg);
   entry void LoadBalance(void);
   entry void ResumeClients(int);
-  entry void ResumeClients(int, int);
   entry void ResumeClients(CkReductionMsg *);
   entry void ReceiveMigration(LBMigrateMsg*);  
   entry void ProcessReceiveMigration(CkReductionMsg  *);
   entry void MissMigrate(int);
+
 };
 
 };
index 5c3e3a45f68ba3967e652102a96d4d9dbb9187e9..17cfaccef5e11e90b990c9005e937953fca4a1b6 100644 (file)
@@ -84,7 +84,6 @@ public:
 
   static void staticAtSync(void*);
   void AtSync(void); // Everything is at the PE barrier
-  void ProcessAtSync(int lb_period);
   void ProcessAtSync(void); // Receive a message from AtSync to avoid
                             // making projections output look funny
   void ProcessAtSyncMin(void);
@@ -98,9 +97,11 @@ public:
   void depositData(CLBStatsMsg *m);
   void LoadBalance(void); 
   void ResumeClients(int);                      // Resuming clients needs
-                                               // to be resumed via message
-  void ResumeClients(CkReductionMsg *);
-  void ResumeClients(int, int);
+
+  void LoadBalanceDecision(int);
+  void ReceiveIterationNo(CkReductionMsg *msg); // Receives the current iter no
+
+  void ResumeClients(CkReductionMsg *); // to be resumed via message
   void ReceiveMigration(LBMigrateMsg *);       // Receive migration data
   void ProcessReceiveMigration(CkReductionMsg  *);
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))