Adding node awareness to CommAwareRefineLB
authorHarshitha <gplkrsh2@illinois.edu>
Sat, 28 Apr 2012 19:21:05 +0000 (14:21 -0500)
committerHarshitha <gplkrsh2@illinois.edu>
Sat, 28 Apr 2012 19:21:05 +0000 (14:21 -0500)
src/ck-ldb/CommAwareRefineLB.C

index 71498fc8db8d3a06d5b05ee80fb66e667df74edf..33d88b1ad21d225b03df9e678fde5f3483408174 100644 (file)
@@ -29,7 +29,7 @@
 
 #include <time.h>
 
-#define THRESHOLD 0.1
+#define THRESHOLD 0.02
 #define SWAP_MULTIPLIER 5 
 
 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid);
@@ -40,6 +40,8 @@ inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, st
 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
                            std::vector<int> &parr_above_avg, std::vector<int> &parr_below_avg,
                            bool* proc_load_info, ProcArray *parr);
+inline void getPossiblePes(std::vector<int>& possible_pes, int randomly_obj_id,
+    ObjGraph *ogr, ProcArray* parr);
 
 double upper_threshold;
 double lower_threshold;
@@ -145,12 +147,10 @@ void PrintProcObj(ProcArray *parr, std::vector<int>* parr_objs) {
 }
 
 
-void CommAwareRefineLB::work(LDStats* stats)
-{
+void CommAwareRefineLB::work(LDStats* stats) {
   /** ========================== INITIALIZATION ============================= */
   ProcArray *parr = new ProcArray(stats);       // Processor Array
   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
-  std::vector<int> vertexid_pe(ogr->vertices.size()); // Stores the cur pe of obj
   double avgload = parr->getAverageLoad();      // Average load of processors
 
   // Sets to false if it is overloaded, else to true
@@ -161,11 +161,9 @@ void CommAwareRefineLB::work(LDStats* stats)
   // that processor
   std::vector<int>* parr_objs = new std::vector<int>[parr->procs.size()];
 
-  // Create an array for each chare with processor communication info
-  ObjPeCommInfo* objpcomm = new ObjPeCommInfo[ogr->vertices.size()];
-
   upper_threshold = avgload + (avgload * THRESHOLD);
-  lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
+  //lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
+  lower_threshold = avgload;
 
   int less_loaded_counter = 0;
 
@@ -184,14 +182,13 @@ void CommAwareRefineLB::work(LDStats* stats)
   for(vert = 0; vert < ogr->vertices.size(); vert++) {
     curr_pe = ogr->vertices[vert].getCurrentPe();
     parr_objs[curr_pe].push_back(vert);
-    vertexid_pe[ogr->vertices[vert].getVertexId()] = curr_pe;
+    ogr->vertices[vert].setNewPe(curr_pe);
   }
 
-
   std::vector<int> parr_above_avg;
   std::vector<int> parr_below_avg;
 
-  double pe_load;
+  double pe_load;  
 
   // Insert into parr_above_avg if the processor fits under the criteria of
   // overloaded processor.
@@ -211,53 +208,6 @@ void CommAwareRefineLB::work(LDStats* stats)
   std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
       ProcLoadGreater(parr));
 
-  // Construct the datastructure that stores communication information for each
-  // chare with respect to processors
-  for (vert = 0; vert < ogr->vertices.size(); vert++) {
-    std::map<int, int> tmp_map_pid_index;
-    int counter = objpcomm[vert].pcomm.size();
-    int index;
-    for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
-      j = vertexid_pe[ogr->vertices[vert].sendToList[i].getNeighborId()];
-      // TODO: Should it index with vertexId?
-      if (tmp_map_pid_index.count(j) == 0) {
-        tmp_map_pid_index[j] = counter;
-        PeCommInfo pecomminf(j);
-        // TODO: Shouldn't it use vertexId instead of vert?
-        objpcomm[vert].pcomm.push_back(pecomminf);
-        counter++;
-      }
-      index = tmp_map_pid_index[j];
-    
-      objpcomm[vert].pcomm[index].num_msg +=
-        ogr->vertices[vert].sendToList[i].getNumMsgs();
-      objpcomm[vert].pcomm[index].num_bytes +=
-        ogr->vertices[vert].sendToList[i].getNumBytes();
-    }
-
-    for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
-      j = vertexid_pe[ogr->vertices[vert].recvFromList[i].getNeighborId()];
-
-      if (tmp_map_pid_index.count(j) == 0) {
-        tmp_map_pid_index[j] = counter;
-        PeCommInfo pecomminf(j);
-        // TODO: Shouldn't it use vertexId instead of vert?
-        objpcomm[vert].pcomm.push_back(pecomminf);
-        counter++;
-      }
-      index = tmp_map_pid_index[j];
-
-      objpcomm[vert].pcomm[index].num_msg +=
-        ogr->vertices[vert].sendToList[i].getNumMsgs();
-      objpcomm[vert].pcomm[index].num_bytes +=
-        ogr->vertices[vert].sendToList[i].getNumBytes();
-    }
-
-    // Sort the pe communication vector for this chare
-    std::sort(objpcomm[vert].pcomm.begin(), objpcomm[vert].pcomm.end(),
-        ProcCommGreater());
-  }
-
   int random;
   int randomly_obj_id;
   bool obj_allocated;
@@ -269,88 +219,91 @@ void CommAwareRefineLB::work(LDStats* stats)
 
   // Keep on loadbalancing until the number of above avg processors is 0
   while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
-      // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
-      //    parr_below_avg.size(), total_swaps);
-      obj_allocated = false;
-      num_tries = 0;
-
-      // Pop the heaviest processor
-      int p_index = popFromProcHeap(parr_above_avg, parr);
-      ProcInfo& p = parr->procs[p_index];
+    // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
+    //    parr_below_avg.size(), total_swaps);
+    obj_allocated = false;
+    num_tries = 0;
+
+    // Pop the heaviest processor
+    int p_index = popFromProcHeap(parr_above_avg, parr);
+    ProcInfo& p = parr->procs[p_index];
+
+    while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
+
+      // It might so happen that due to overhead load, it might not have any
+      // more objects in its list
+      if (parr_objs[p.getProcId()].size() == 0) {
+        // CkPrintf("No obj left to be allocated\n");
+        obj_allocated = true;
+        break;
+      }
 
-      while (!obj_allocated && num_tries < 5) {
+      int randd = rand();
+      random = randd % parr_objs[p.getProcId()].size();
+      randomly_obj_id = parr_objs[p.getProcId()][random];
+      obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
 
-        // It might so happen that due to overhead load, it might not have any
-        // more objects in its list
-        if (parr_objs[p.getProcId()].size() == 0) {
-          // CkPrintf("No obj left to be allocated\n");
-          obj_allocated = true;
-          break;
-        }
+      // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
+      //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
+      std::vector<int> possible_pes;
+      getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
+      for (i = 0; i < possible_pes.size(); i++) {
 
-        int randd = rand();
-        random = randd % parr_objs[p.getProcId()].size();
-        randomly_obj_id = parr_objs[p.getProcId()][random];
-        obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
+        // If the heaviest communicating processor is there in the list, then
+        // assign it to that.
+        possible_pe = possible_pes[i];
 
-        // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
-        //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
-        for (i = 0; i < objpcomm[randomly_obj_id].pcomm.size(); i++) {
+        if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
+         // CkPrintf("**  Transfered %d(Load %lf) from %d:%d(Load %lf) to %d:%d(Load %lf)\n",
+         //     randomly_obj_id, obj_load, CkNodeOf(p.getProcId()), p.getProcId(), p.getTotalLoad(),
+         //     CkNodeOf(possible_pe), possible_pe,
+         //     parr->procs[possible_pe].getTotalLoad());
 
-          // If the heaviest communicating processor is there in the list, then
-          // assign it to that.
-          possible_pe = objpcomm[randomly_obj_id].pcomm[i].pe_id;
+          handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
+          obj_allocated = true;
+          total_swaps--;
+          updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
+              parr_above_avg, parr_below_avg, proc_load_info, parr);
 
-          if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
-            //CkPrintf("** Iter[%d] : Transfered %d from %d : Load %E to %d : Load %E Comm : %d\n",
-            //    i, randomly_obj_id, p.getProcId(), p.getTotalLoad(),
-            //    possible_pe, parr->procs[possible_pe].getTotalLoad(),
-            //    objpcomm[randomly_obj_id].pcomm[i].num_msg);
+          break;
+        }
+      }
 
-            handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
+      // Since there is no processor in the least loaded list with which this
+      // chare communicates, pick a random least loaded processor.
+      if (!obj_allocated) {
+        //CkPrintf(":( Could not transfer to the nearest communicating ones\n");
+        for (int x = 0; x < parr_below_avg.size(); x++) {
+          int random_pe = parr_below_avg[x];
+          if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
             obj_allocated = true;
             total_swaps--;
-            updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
+            handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
+            updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
                 parr_above_avg, parr_below_avg, proc_load_info, parr);
-
             break;
           }
-        }
-
-        // Since there is no processor in the least loaded list with which this
-        // chare communicates, pick a random least loaded processor.
-        if (!obj_allocated) {
-          // CkPrintf(":( Could not transfer to the nearest communicating ones\n");
-          for (int x = 0; x < parr_below_avg.size(); x++) {
-            int random_pe = parr_below_avg[x];
-            if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
-              obj_allocated = true;
-              total_swaps--;
-              handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
-              updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
-                  parr_above_avg, parr_below_avg, proc_load_info, parr);
-              break;
-            }
-            num_tries++;
-          }
+          num_tries++;
         }
       }
+    }
 
-      if (!obj_allocated) {
-        CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
-       // parr_above_avg.push_back(p_index);
-       // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
-       //     ProcLoadGreater(parr));
-      }
+    if (!obj_allocated) {
+      //CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
+      // parr_above_avg.push_back(p_index);
+      // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
+      //     ProcLoadGreater(parr));
     }
+  }
 
+  //CkPrintf("CommAwareRefine> After lb max load: %lf avg load: %lf\n", max_load, avg_load/parr->procs.size());
 
   /** ============================== CLEANUP ================================ */
   ogr->convertDecisions(stats);         // Send decisions back to LDStats
   delete parr;
   delete ogr;
+  delete proc_load_info;
   delete[] parr_objs;
-  delete[] objpcomm;
 }
 
 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid) {
@@ -362,7 +315,6 @@ inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid)
   }
 }
 
-
 inline void printMapping(std::vector<Vertex> &vertices) {
   for (int i = 0; i < vertices.size(); i++) {
     CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
@@ -438,6 +390,80 @@ inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold,
 
 }
 
+inline void getPossiblePes(std::vector<int>& possible_pes, int vert,
+    ObjGraph *ogr, ProcArray* parr) {
+  std::map<int, int> tmp_map_pid_index;
+  int counter = 0;
+  int index;
+  int i, j, nbrid;
+  ObjPeCommInfo objpcomm;
+ // CkPrintf("%d sends msgs to %d and recv msgs from %d\n", vert,
+ //   ogr->vertices[vert].sendToList.size(),
+ //   ogr->vertices[vert].recvFromList.size());
+  
+  for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
+    nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
+    j = ogr->vertices[nbrid].getNewPe(); // Fix me!! New PE
+    // TODO: Should it index with vertexId?
+    if (tmp_map_pid_index.count(j) == 0) {
+      tmp_map_pid_index[j] = counter;
+      PeCommInfo pecomminf(j);
+      // TODO: Shouldn't it use vertexId instead of vert?
+      objpcomm.pcomm.push_back(pecomminf);
+      counter++;
+    }
+    index = tmp_map_pid_index[j];
+
+    objpcomm.pcomm[index].num_msg +=
+      ogr->vertices[vert].sendToList[i].getNumMsgs();
+    objpcomm.pcomm[index].num_bytes +=
+      ogr->vertices[vert].sendToList[i].getNumBytes();
+  }
+
+  for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
+    nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
+    j = ogr->vertices[nbrid].getNewPe();
+
+    if (tmp_map_pid_index.count(j) == 0) {
+      tmp_map_pid_index[j] = counter;
+      PeCommInfo pecomminf(j);
+      // TODO: Shouldn't it use vertexId instead of vert?
+      objpcomm.pcomm.push_back(pecomminf);
+      counter++;
+    }
+    index = tmp_map_pid_index[j];
+
+    objpcomm.pcomm[index].num_msg +=
+      ogr->vertices[vert].sendToList[i].getNumMsgs();
+    objpcomm.pcomm[index].num_bytes +=
+      ogr->vertices[vert].sendToList[i].getNumBytes();
+  }
+
+  // Sort the pe communication vector for this chare
+  std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
+      ProcCommGreater());
+
+  int pe_id;
+  int node_id;
+  int node_size;
+  int node_first;
+  //CkPrintf("%d talks to %d pes and possible pes are :\n", vert,
+  //    objpcomm.pcomm.size());
+  for (i = 0; i < objpcomm.pcomm.size(); i++) {
+    pe_id = objpcomm.pcomm[i].pe_id;
+    node_id = CkNodeOf(pe_id);
+    node_size = CkNodeSize(node_id);
+    node_first = CkNodeFirst(node_id);
+   // CkPrintf("smp details pe_id %d, node_id %d, node_size %d, node_first %d\n",
+   //   pe_id, node_id, node_size, node_first);
+    for (j = 0; j < node_size; j++) {
+      possible_pes.push_back(node_first + j);
+      //CkPrintf("\t %d:%d (comm: %d)\n",node_id, node_first+j, objpcomm.pcomm[i].num_bytes); 
+    }
+  }
+}
+
+
 #include "CommAwareRefineLB.def.h"
 
 /*@}*/