Instead of doing a reduction to get the iteration no, send a msg
authorHarshitha <gplkrsh2@illinois.edu>
Mon, 5 Mar 2012 21:51:59 +0000 (15:51 -0600)
committerHarshitha <gplkrsh2@illinois.edu>
Mon, 5 Mar 2012 21:51:59 +0000 (15:51 -0600)
src/ck-ldb/CentralLB.C
src/ck-ldb/CentralLB.ci
src/ck-ldb/CentralLB.h

index f788332e0b9e431cb56151b937246fdf10a098b8..258b922c7484097c0f25011dfd3a4b7667e40107 100644 (file)
@@ -53,6 +53,11 @@ double cur_max_pe_load = 0.0;
 double cur_avg_pe_load = 0.0;
 double prev_load = 0.0;
 
+bool lb_period_informed = false;
+
+int global_max_iter_no = 0;
+int global_recv_iter_counter = 0;
+
 struct AdaptiveData {
   int iteration;
   double max_load;
@@ -94,9 +99,23 @@ CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
   return CkReductionMsg::buildNew(4*sizeof(double), lb_data);
 }
 
+CkReductionMsg* lbIterMax(int nMsg, CkReductionMsg** msgs) {
+  int max[1];
+  //CkPrintf("\tReduction type no of msg %d\n", nMsg);
+  max[0] = 0;
+  for (int i = 0; i < nMsg; i++) {
+    int *m = (int *)msgs[i]->getData();
+    max[0] = ((m[0] > max[0]) ? m[0] : max[0]);
+  //  CkPrintf("\tReduction type msg %d\n", *m);
+  }
+  return CkReductionMsg::buildNew(sizeof(int), max);
+}
+
 /*global*/ CkReduction::reducerType lbDataCollectionType;
+/*global*/ CkReduction::reducerType lbIterMaxType;
 /*initcall*/ void registerLBDataCollection(void) {
   lbDataCollectionType = CkReduction::addReducer(lbDataCollection);
+  lbIterMaxType = CkReduction::addReducer(lbIterMax);
 }
 
 /*
@@ -237,12 +256,6 @@ void CentralLB::AtSync()
 void CentralLB::ProcessAtSync()
 {
 
-  // Reset all adaptive lb related fields since load balancing is being done.
-  lb_no_iterations = -1;
-  adaptive_lbdb.history_data.clear();
-  prev_load = 0.0;
-  lb_ideal_period = INT_MAX;
-  local_state = OFF;
 
 
 #if CMK_LBDB_ON
@@ -294,16 +307,19 @@ void CentralLB::ProcessAtSyncMin()
 #endif
   
   lb_no_iterations++;
 CkPrintf("ProcessAtSyncMin [%d] lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
-      lb_no_iterations, lb_ideal_period);
// CkPrintf("[%d] ProcessAtSyncMin lb_iteration [%d] lb_ideal_period [%d]\n", CkMyPe(),
//     lb_no_iterations, lb_ideal_period);
 
   // If decision has been made and has reached the lb_period, then do load
   // balancing, else if hasn't reached ideal_period, then resume.
   if (local_state == DECIDED) {
     if (lb_no_iterations < lb_ideal_period) {
-      ResumeClients(1);
+ //     CkPrintf("[%d] Decision is made but lagging\n", CkMyPe());
+      SendMinStats();
+      ResumeClients(0);
     } else {
       local_state = LOAD_BALANCE;
+ //     CkPrintf("[%d] Decision is made and do LB\n", CkMyPe());
       ProcessAtSync();
     }
     return;
@@ -314,7 +330,8 @@ void CentralLB::ProcessAtSyncMin()
   // dont resume client.
   if (local_state == ON) {
     if (lb_no_iterations < lb_ideal_period) {
-      ResumeClients(1);
+      SendMinStats();
+      ResumeClients(0);
     } else {
       local_state = PAUSE;
     }
@@ -322,7 +339,7 @@ void CentralLB::ProcessAtSyncMin()
   }
 
   SendMinStats();
-  ResumeClients(1);
+  ResumeClients(0);
 #endif
 }
 
@@ -332,7 +349,7 @@ void CentralLB::SendMinStats() {
  double idle_time;
  double bg_walltime;
   theLbdb->GetTime(&total_load,&total_load, &idle_time, &bg_walltime, &bg_walltime);
-  CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
// CkPrintf("Total walltime [%d] %lf: %lf: %lf final laod: %lf\n", CkMyPe(), total_load, idle_time, bg_walltime, (total_load - idle_time));
 
   // Since the total_load is cumulative since the last load balancing stage,
   // Hence it is subtracted from the previous load.
@@ -350,6 +367,13 @@ void CentralLB::SendMinStats() {
   CkCallback cb(CkIndex_CentralLB::ReceiveMinStats((CkReductionMsg*)NULL), 
                   thisProxy[0]);
   contribute(4*sizeof(double), lb_data, lbDataCollectionType, cb);
+
+//    int tmp1 = lb_no_iterations;
+//    CkPrintf("[%d] contribution iteration_no: %d\n",CkMyPe(), tmp1);
+//    // Send the current iteration no
+//    CkCallback cb1(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
+//        thisProxy[0]);
+//    contribute(sizeof(int), &tmp1, CkReduction::max_int, cb1);
 }
 
 void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
@@ -358,7 +382,8 @@ void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
   double max = load[1];
   double avg = load[0]/load[2];
   int iteration_n = load[3];
-  CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf\n\n",lb_no_iterations, load[0], load[0]/load[2], load[1]);
+  //CkPrintf("Iteration %d Total load : %lf Avg load: %lf Max load: %lf for %lf procs\n\n",iteration_n, load[0], load[0]/load[2], load[1], load[2]);
+  delete msg;
 
   // Store the data for this iteration
   AdaptiveData data;
@@ -367,17 +392,25 @@ void CentralLB::ReceiveMinStats(CkReductionMsg *msg) {
   data.avg_load = avg;
   adaptive_lbdb.history_data.push_back(data);
 
+  if (lb_period_informed) {
+    return;
+  }
+
   // If the max/avg ratio is greater than the threshold and also this is not the
   // step immediately after load balancing, carry out load balancing
+  //if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
   if (max/avg >= 1.1 && adaptive_lbdb.history_data.size() > 4) {
-    CkPrintf("Carry out load balancing step\n");
+    //CkPrintf("Carry out load balancing step at iter\n");
     lb_ideal_period = iteration_n + 1;
+    lb_period_informed = true;
     thisProxy.LoadBalanceDecision(lb_ideal_period);
     return;
   }
 
   // Generate the plan for the adaptive strategy
   if (generatePlan()) {
+    //CkPrintf("Carry out load balancing step at iter\n");
+    lb_period_informed = true;
     thisProxy.LoadBalanceDecision(lb_ideal_period);
   }
 }
@@ -397,7 +430,7 @@ bool CentralLB::generatePlan() {
     data = adaptive_lbdb.history_data[i];
     max += data.max_load;
     avg += data.avg_load;
-    CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
+    //CkPrintf("max (%d, %lf) avg (%d, %lf)\n", i, data.max_load, i, data.avg_load);
   }
 //  max /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
 //  avg /= (lb_no_iterations - adaptive_lbdb.history_data[0].iteration);
@@ -410,13 +443,13 @@ bool CentralLB::generatePlan() {
   // area between the max and avg curve 
   double mslope, aslope, mc, ac;
   getLineEq(aslope, ac, mslope, mc);
-  CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
+  //CkPrintf("\n max: %fx + %f; avg: %fx + %f\n", mslope, mc, aslope, ac);
   double a = (mslope - aslope)/2;
   double b = (mc - ac);
   double c = -(lb_strategy_cost + lb_migration_cost);
   //c = -2.5;
   lb_ideal_period = getPeriodForLinear(a, b, c);
-  CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
+  //CkPrintf("Ideal period for linear load %d\n", lb_ideal_period);
 
   return true;
 }
@@ -466,20 +499,16 @@ bool CentralLB::getLineEq(double& aslope, double& ac, double& mslope, double& mc
 }
 
 void CentralLB::LoadBalanceDecision(int period) {
-  CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
+  //CkPrintf("[%d] Load balance decision made cur iteration: %d period:%d state: %d\n",CkMyPe(), lb_no_iterations, period, local_state);
   lb_ideal_period = period;
   if (local_state == OFF) {
     local_state = ON;
-
-    // Send the current iteration no
-    CkCallback cb(CkIndex_CentralLB::ReceiveIterationNo((CkReductionMsg*)NULL), 
-        thisProxy[0]);
-    int tmp = lb_no_iterations;
-    contribute(sizeof(int), &tmp, CkReduction::max_int, cb);
+    thisProxy[0].ReceiveIterationNo(lb_no_iterations);
     return;
   }
 
   if (local_state == ON) {
+    CkPrintf("lb will happen at %d\n", lb_ideal_period);
     local_state = DECIDED;
     return;
   }
@@ -490,20 +519,25 @@ void CentralLB::LoadBalanceDecision(int period) {
   if (local_state == PAUSE) {
     if (lb_no_iterations < lb_ideal_period) {
       local_state = DECIDED;
-      ResumeClients(1);
+      ResumeClients(0);
     } else {
       ProcessAtSync();
     }
   }
 }
 
-void CentralLB::ReceiveIterationNo(CkReductionMsg *msg) {
+void CentralLB::ReceiveIterationNo(int local_iter_no) {
   CmiAssert(CkMyPe() == 0);
-  int *it_no = (int *) msg->getData();
-  CkPrintf("^^^^ Received all iteration no and final decision %d^^^^\n", it_no[0]);
-  //thisProxy.LoadBalanceDecision(it_no[0]);
-  lb_ideal_period = (lb_ideal_period > it_no[0]) ? lb_ideal_period : it_no[0] + 1;
-  thisProxy.LoadBalanceDecision(lb_ideal_period);
+  global_recv_iter_counter++;
+  if (local_iter_no > global_max_iter_no) {
+    global_max_iter_no = local_iter_no;
+  }
+  if (CkNumPes() == global_recv_iter_counter) {
+    lb_ideal_period = (lb_ideal_period > global_max_iter_no) ? lb_ideal_period : global_max_iter_no + 1;
+    thisProxy.LoadBalanceDecision(lb_ideal_period);
+    global_max_iter_no = 0;
+    global_recv_iter_counter = 0;
+  }
 }
 
 // called only on 0
@@ -1056,6 +1090,14 @@ void CentralLB::ReceiveMigration(LBMigrateMsg *m)
   CkCallback cb(CkIndex_CentralLB::ProcessReceiveMigration((CkReductionMsg*)NULL),
                   thisProxy);
   contribute(0, NULL, CkReduction::max_int, cb);
+
+  // Reset all adaptive lb related fields since load balancing is being done.
+  lb_no_iterations = -1;
+  adaptive_lbdb.history_data.clear();
+  prev_load = 0.0;
+  lb_ideal_period = INT_MAX;
+  local_state = OFF;
+  lb_period_informed = false;
 }
 
 void CentralLB::ProcessReceiveMigration(CkReductionMsg  *msg)
@@ -1271,6 +1313,7 @@ void CentralLB::ResumeClients(int balancing)
 
   theLbdb->ResumeClients();
   if (balancing)  {
+
     CheckMigrationComplete();
     if (future_migrates_expected == 0 || 
             future_migrates_expected == future_migrates_completed) {
@@ -1305,6 +1348,8 @@ void CentralLB::CheckMigrationComplete()
     lbdone = 0;
     future_migrates_expected = -1;
     future_migrates_completed = 0;
+
+
     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
     // release local barrier  so that the next load balancer can go
     LDOMHandle h;
@@ -1357,7 +1402,6 @@ LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
     lb_strategy_cost = (strat_end_time - strat_start_time);
     CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, lb_strategy_cost);
   }
-
   return msg;
 #else
   return NULL;
index 2189dbe797a5e97685905c75c8a87d852c1801f9..4ad418b31d0464900f86bac56336e4bc8828eda3 100644 (file)
@@ -24,7 +24,7 @@ group [migratable] CentralLB : BaseLB {
   entry void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage data);
   entry void ReceiveCounts(CkReductionMsg *);
   entry void LoadBalanceDecision(int period);
-  entry void ReceiveIterationNo(CkReductionMsg *msg);
+  entry void ReceiveIterationNo(int);
   entry void LoadBalance(void);
   entry void ResumeClients(int);
   entry void ResumeClients(CkReductionMsg *);
index 17cfaccef5e11e90b990c9005e937953fca4a1b6..d69b24c20bfd4ba4d6c74c39f6b0903cdea130f7 100644 (file)
@@ -99,7 +99,7 @@ public:
   void ResumeClients(int);                      // Resuming clients needs
 
   void LoadBalanceDecision(int);
-  void ReceiveIterationNo(CkReductionMsg *msg); // Receives the current iter no
+  void ReceiveIterationNo(int); // Receives the current iter no
 
   void ResumeClients(CkReductionMsg *); // to be resumed via message
   void ReceiveMigration(LBMigrateMsg *);       // Receive migration data