Cleaning up
authorHarshitha <gplkrsh2@illinois.edu>
Tue, 22 May 2012 21:55:05 +0000 (16:55 -0500)
committerHarshitha <gplkrsh2@illinois.edu>
Tue, 22 May 2012 21:55:05 +0000 (16:55 -0500)
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-ldb/CentralLB.C
src/ck-ldb/LBDatabase.C
src/ck-ldb/LBDatabase.h

index a6c076c17f812a154011d63effaadefa2cbef925..ad7f808b358be06ef5e4362974df38c5d82398a9 100644 (file)
@@ -973,11 +973,7 @@ void CkMigratable::commonInit(void) {
        usesAtSync=CmiFalse;
        usesAutoMeasure=CmiTrue;
        barrierRegistered=CmiFalse;
-  atsync_iteration = -1;
-  //CkPrintf("%s in init and off\n", idx2str(thisIndexMax));
-  local_state = OFF;
-  prev_load = 0.0;
-  lb_done = false;
+  clearAdaptiveData();
        /*
        FAULT_EVAC
        */
@@ -999,6 +995,7 @@ void CkMigratable::pup(PUP::er &p) {
        Chare::pup(p);
        p|thisIndexMax;
        p(usesAtSync);
+  p(can_reset);
        p(usesAutoMeasure);
 #if CMK_LBDB_ON 
        int readyMigrate;
@@ -1078,12 +1075,18 @@ double CkMigratable::getObjTime() {
 }
 
 void CkMigratable::clearAdaptiveData() {
-  local_state = OFF;
-  atsync_iteration = -1;
-  prev_load = 0.0;
+  if (can_reset) {
+    local_state = OFF;
+    atsync_iteration = -1;
+    prev_load = 0.0;
+    can_reset = false;
+  }
 }
 
 void CkMigratable::recvLBPeriod(void *data) {
+  if (atsync_iteration < 0) {
+    return;
+  }
   int lb_period = *((int *) data);
  DEBAD(("\t[obj %s] Received the LB Period %d current iter %d state %d on PE %d\n",
      idx2str(thisIndexMax), lb_period, atsync_iteration, local_state, CkMyPe()));
@@ -1098,17 +1101,13 @@ void CkMigratable::recvLBPeriod(void *data) {
     if (atsync_iteration < lb_period) {
     //  CkPrintf("---[pe %s] pause and decided\n", idx2str(thisIndexMax));
       local_state = DECIDED;
-      lb_done = false;
       ResumeFromSync();
       return;
     }
    // CkPrintf("---[pe %s] load balance\n", idx2str(thisIndexMax));
     local_state = LOAD_BALANCE;
 
-   // local_state = OFF;
-   // atsync_iteration = -1;
-   // prev_load = 0.0;
-    lb_done = true;
+    can_reset = true;
     myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
     return;
   }
@@ -1145,8 +1144,12 @@ void CkMigratable::AtSync(int waitForMigration)
   // model-based load balancing, ask user to provide cpu load
   if (usesAutoMeasure == CmiFalse) UserSetLBLoad();
   
-  //  lb_done = true;
   //   myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
+  if (atsync_iteration == -1) {
+    can_reset = false;
+    local_state = OFF;
+    prev_load = 0.0;
+  }
 
   atsync_iteration++;
   // CkPrintf("[pe %s] atsync_iter %d && predicted period %d state: %d\n",
@@ -1171,17 +1174,13 @@ void CkMigratable::AtSync(int waitForMigration)
 
   bool is_tentative;
   if (atsync_iteration < myRec->getLBDB()->getPredictedLBPeriod(is_tentative)) {
-    lb_done = false;
     ResumeFromSync();
   } else if (is_tentative) {
     local_state = PAUSE;
   } else if (local_state == DECIDED) {
     DEBAD(("[pe %s] Went to load balance iter %d\n", idx2str(thisIndexMax), atsync_iteration));
     local_state = LOAD_BALANCE;
-   // local_state = OFF;
-   // atsync_iteration = -1;
-   // prev_load = 0.0;
-    lb_done = true;
+    can_reset = true;
     myRec->getLBDB()->AtLocalBarrier(ldBarrierHandle);
   } else {
     DEBAD(("[pe %s] Went to pause state iter %d\n", idx2str(thisIndexMax), atsync_iteration));
index d0e577cec39899b6b38f4ca8f48fd71704a88841..733e9a435eb9457a6bdb8c80106a7e4dc05eabcd 100644 (file)
@@ -286,7 +286,7 @@ private:
     LOAD_BALANCE
   } local_state;
   double  prev_load;
-  double lb_done;
+  bool can_reset;
 
 public:
   CkArrayIndex thisIndexMax;
@@ -299,7 +299,6 @@ public:
 
   virtual int ckGetChareType(void) const;// {return thisChareType;}
   const CkArrayIndex &ckGetArrayIndex(void) const {return myRec->getIndex();}
-  bool didLB() {return lb_done;}
 
 #if CMK_LBDB_ON  //For load balancing:
   //Suspend load balancer measurements (e.g., before CthSuspend)
index 65fdf4e2591a11fe2d40ffa3c79ebf0bdaabdebb..96ee229ecb9670f06a6c0f4eb984fdeda9a37bbb 100644 (file)
@@ -1505,25 +1505,25 @@ LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
   LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
   info.getSummary(mLoad, mCpuLoad, totalLoad);
   //CkPrintf("CharmLB> Max load w/o comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
-  info.print();
-  theLbdb->UpdateAfterLBData(mLoad, mCpuLoad, totalLoad/clients);
+  //info.print();
+  //theLbdb->UpdateAfterLBData(mLoad, mCpuLoad, totalLoad/clients);
 
-  getPredictedLoadWithMsg(stats, clients, msg, info,1);
-  info.getSummary(mLoad, mCpuLoad, totalLoadWComm);
-  info.print();
+  //getPredictedLoadWithMsg(stats, clients, msg, info,1);
+  //info.getSummary(mLoad, mCpuLoad, totalLoadWComm);
+  //info.print();
   //CkPrintf("CharmLB> Max load with comm %lf Max cpu load %lf Avg load %lf\n", mLoad, mCpuLoad, totalLoad/clients);
-  int nmsgs, nbytes;
-  stats->computeNonlocalComm(nmsgs, nbytes);
-  CkPrintf("CharmLB> Non local communication %d msg and %d bytes\n", nmsgs, nbytes);
+  //int nmsgs, nbytes;
+  //stats->computeNonlocalComm(nmsgs, nbytes);
+  //CkPrintf("CharmLB> Non local communication %d msg and %d bytes\n", nmsgs, nbytes);
 
 
-  long msg_n;
-  long long bytes_n;
-  stats->computeComm(msg_n, bytes_n);
-  CkPrintf("CharmLB> Total communication %ld msg and %lld bytes\n", nmsgs, nbytes);
+  //long msg_n;
+  //long long bytes_n;
+  //stats->computeComm(msg_n, bytes_n);
+  //CkPrintf("CharmLB> Total communication %ld msg and %lld bytes\n", nmsgs, nbytes);
 
-  double alpha_beta_cost = (msg_n * alpha) + (bytes_n * beta);
-  theLbdb->UpdateAfterLBComm(alpha_beta_cost/totalLoad);
+  //double alpha_beta_cost = (msg_n * alpha) + (bytes_n * beta);
+  //theLbdb->UpdateAfterLBComm(alpha_beta_cost/totalLoad);
 
   if (_lb_args.debug()) {
     double strat_end_time = CkWallTimer();
@@ -1535,7 +1535,6 @@ LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
     CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, cur_ld_balancer, msg->n_moves, env->getTotalsize()/1024.0/1024.0);
     CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
              lbname, cur_ld_balancer, strat_end_time, strat_end_time-strat_start_time);
-    // FIX ME!!! adaptive_struct.lb_strategy_cost = (strat_end_time - strat_start_time);
     //CkPrintf("Strategy cost %f %f %f\n", strat_end_time, strat_start_time, adaptive_struct.lb_strategy_cost);
     theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
   }
index 4f5471dd759c9ec637b1d48dc0e0769a5272ea26..8360fd128aff603a5ad823a6d9691103cf4280f4 100644 (file)
 
 #include "NullLB.h"
 
-#define VEC_SIZE 500
+#define VEC_SIZE 50
 #define IMB_TOLERANCE 1.1
 #define OUTOFWAY_TOLERANCE 2
 #define UTILIZATION_THRESHOLD 0.7
 #define NEGLECT_IDLE 2 // Should never be == 1
+#define MIN_STATS 6
 
 #   define DEBAD(x) /*CkPrintf x*/
 #   define EXTRA_FEATURE 0
@@ -117,6 +118,7 @@ CkReductionMsg* lbDataCollection(int nMsg, CkReductionMsg** msgs) {
     if (m[0] != lb_data[0]) {
       CkPrintf("Error!!! Reduction is intermingled between iteration %lf and\
       %lf\n", lb_data[0], m[0]);
+      CkAbort("Intermingling iterations\n");
     }
   }
   return CkReductionMsg::buildNew(8*sizeof(double), lb_data);
@@ -460,10 +462,10 @@ void LBDatabase::init(void)
 
   total_load_vec.resize(VEC_SIZE, 0.0);
   total_count_vec.resize(VEC_SIZE, 0);
-  purge_index = 0;
   max_iteration = -1;
   prev_idle = 0.0;
   alpha_beta_cost_to_load = 1.0; // Some random value. TODO: Find the actual
+  adaptive_lbdb.lb_iter_no = -1;
 
   // If metabalancer enabled, initialize the variables
   adaptive_struct.tentative_period =  INT_MAX;
@@ -637,21 +639,16 @@ void LBDatabase::ResumeClients() {
   adaptive_struct.lb_msg_recv_no = 0;
   adaptive_struct.total_syncs_called = 0;
 
-  total_load_vec.clear();
-  total_count_vec.clear();
-  purge_index = 0;
   prev_idle = 0.0;
   if (lb_in_progress) {
     lbdb_no_obj_callback.clear();
     lb_in_progress = false;
   }
 
-  total_load_vec.resize(VEC_SIZE, 0.0);
-  total_count_vec.resize(VEC_SIZE, 0);
-
   // While resuming client, if we find that there are no objects, then handle
   // the case accordingly.
   if (getLBDB()->ObjDataCount() == 0) {
+    CkPrintf("%d processor has 0 objs\n", CkMyPe());
     HandleAdaptiveNoObj();
   }
   LDResumeClients(myLDHandle);
@@ -669,6 +666,12 @@ bool LBDatabase::AddLoad(int it_n, double load) {
     adaptive_struct.lb_iteration_no = it_n;
   }
   total_load_vec[index] += load;
+  if (total_count_vec[index] > getLBDB()->ObjDataCount()) {
+    CkPrintf("iteration %d received %d contributions and expected %d\n", it_n,
+        total_count_vec[index], getLBDB()->ObjDataCount());
+    CkAbort("Abort!!! Received more contribution");
+  }
+
   if (total_count_vec[index] == getLBDB()->ObjDataCount()) {
     double idle_time, bg_walltime, cpu_bgtime;
     IdleTime(&idle_time);
@@ -795,7 +798,8 @@ void LBDatabase::ReceiveMinStats(CkReductionMsg *msg) {
 
     CkPrintf("Prev LB Data Type %d, max/avg %lf, local/remote %lf\n", tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
 
-    if ((utilization < utilization_threshold || max/avg >= tolerate_imb) && adaptive_lbdb.history_data.size() > 6) {
+    if ((utilization < utilization_threshold || max/avg >= tolerate_imb) &&
+          adaptive_lbdb.history_data.size() > MIN_STATS) {
       CkPrintf("Trigger soon even though we calculated lbperiod max/avg(%lf) and utilization ratio (%lf)\n", max/avg, utilization);
       TriggerSoon(iteration_n, max/avg, tolerate_imb);
       return;
@@ -804,7 +808,8 @@ void LBDatabase::ReceiveMinStats(CkReductionMsg *msg) {
     // If the new lb period from linear extrapolation is greater than maximum
     // iteration known from previously collected data, then inform all the
     // processors about the new calculated period.
-    if (period > adaptive_struct.tentative_max_iter_no) {
+    if (period > adaptive_struct.tentative_max_iter_no && period !=
+          adaptive_struct.final_lb_period) {
       adaptive_struct.doCommStrategy = false;
       adaptive_struct.lb_calculated_period = period;
       adaptive_struct.in_progress = true;
@@ -835,7 +840,8 @@ void LBDatabase::TriggerSoon(int iteration_n, double imbalance_ratio,
   // than the iter +1 and if it is greater than the maximum iteration we have
   // seen so far, then we can inform this
   if ((iteration_n + 1 > adaptive_struct.tentative_max_iter_no) &&
-      (iteration_n+1 < adaptive_struct.lb_calculated_period)) {
+      (iteration_n+1 < adaptive_struct.lb_calculated_period) &&
+      (iteration_n + 1 != adaptive_struct.final_lb_period)) {
     if (imbalance_ratio < tolerate_imb) {
       adaptive_struct.doCommStrategy = true;
       CkPrintf("No load imbalance but idle time\n");
@@ -902,13 +908,14 @@ bool LBDatabase::generatePlan(int& period, double& ratio_at_t) {
 
   GetPrevLBData(tmp_lb_type, tmp_max_avg_ratio, tmp_comm_ratio);
   tolerate_imb = tmp_max_avg_ratio;
-  if (max/avg < tolerate_imb) {
-    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
-    tolerate_imb = 1.0;
-  }
-
-  if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
-    return true;
+//  if (max/avg < tolerate_imb) {
+//    CkPrintf("Resorting to imb = 1.0 coz max/avg (%lf) < imb(%lf)\n", max/avg, tolerate_imb);
+//    tolerate_imb = 1.0;
+//  }
+  if (max/avg > tolerate_imb) {
+    if (getPeriodForStrategy(tolerate_imb, 1, period, ratio_at_t)) {
+      return true;
+    }
   }
 
   max = 0.0;
@@ -924,6 +931,11 @@ bool LBDatabase::generatePlan(int& period, double& ratio_at_t) {
   avg /= adaptive_lbdb.history_data.size();
   double cost = adaptive_struct.lb_strategy_cost + adaptive_struct.lb_migration_cost;
   period = cost/(max - avg); 
+  CkPrintf("Obtained period %d from constant prediction\n", period);
+  if (period < 0) { 
+    period = adaptive_struct.final_lb_period;
+    CkPrintf("Obtained -ve period from constant prediction so changing to prev %d\n", period);
+  } 
   ratio_at_t = max / avg;
   return true;
 }
@@ -1145,20 +1157,22 @@ void LBDatabase::TriggerAdaptiveReduction() {
 #if EXTRA_FEATURE
   adaptive_struct.lb_iteration_no++;
   //CkPrintf("Trigger adaptive for %d\n", adaptive_struct.lb_iteration_no);
-  double lb_data[6];
+  double lb_data[8];
   lb_data[0] = adaptive_struct.lb_iteration_no;
   lb_data[1] = 1;
   lb_data[2] = 0.0;
   lb_data[3] = 0.0;
   lb_data[4] = 0.0;
   lb_data[5] = 0.0;
+  lb_data[6] = 0.0;
+  lb_data[7] = 0.0;
 
   // CkPrintf("   [%d] sends total load %lf idle time %lf ratio of idle/load %lf at iter %d\n", CkMyPe(),
   //     total_load_vec[iteration], idle_time,
   //     idle_time/total_load_vec[iteration], adaptive_struct.lb_iteration_no);
 
   CkCallback cb(CkIndex_LBDatabase::ReceiveMinStats((CkReductionMsg*)NULL), thisProxy[0]);
-  contribute(6*sizeof(double), lb_data, lbDataCollectionType, cb);
+  contribute(8*sizeof(double), lb_data, lbDataCollectionType, cb);
 #endif
 }
 
index 3244a6ae60b46e1cbe2b6284fa31ab14d03f2796..f1b14a009bbd96b620a6e22960d36a4cebdc1dc6 100644 (file)
@@ -404,7 +404,6 @@ private:
   std::vector<int> total_count_vec;
   std::vector<int> lbdb_no_obj_callback;
   int max_iteration;
-  int purge_index;
 
   double after_lb_max;
   double after_lb_avg;