RefineSwapLB
authorHarshitha Menon <gplkrsh2@illinois.edu>
Mon, 31 Oct 2011 16:42:39 +0000 (11:42 -0500)
committerHarshitha Menon <gplkrsh2@illinois.edu>
Mon, 31 Oct 2011 16:42:39 +0000 (11:42 -0500)
src/ck-ldb/RefineSwapLB.C [new file with mode: 0644]
src/ck-ldb/RefineSwapLB.ci [new file with mode: 0644]
src/ck-ldb/RefineSwapLB.h [new file with mode: 0644]

diff --git a/src/ck-ldb/RefineSwapLB.C b/src/ck-ldb/RefineSwapLB.C
new file mode 100644 (file)
index 0000000..af07a30
--- /dev/null
@@ -0,0 +1,269 @@
+/** \file RefineSwapLB.C
+ *
+ *  Written by Gengbin Zheng
+ *  Updated by Abhinav Bhatele, 2010-12-09 to use ckgraph
+ *
+ *  Status:
+ *    -- Does not support pe_speed's currently
+ *    -- Does not support nonmigratable attribute
+ */
+
+/**
+ * \addtogroup CkLdb
+*/
+/*@{*/
+
+#include "RefineSwapLB.h"
+#include "ckgraph.h"
+#include <algorithm>
+#include <iostream>
+
+using std::cout;
+using std::endl;
+
+CreateLBFunc_Def(RefineSwapLB, "always assign the heaviest obj onto lightest loaded processor.")
+
+RefineSwapLB::RefineSwapLB(const CkLBOptions &opt): CentralLB(opt)
+{
+  lbname = "RefineSwapLB";
+  if (CkMyPe()==0)
+    CkPrintf("[%d] RefineSwapLB created\n",CkMyPe());
+}
+
+CmiBool RefineSwapLB::QueryBalanceNow(int _step)
+{
+  //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
+  return CmiTrue;
+}
+
+class ProcLoadGreater {
+  public:
+    bool operator()(ProcInfo p1, ProcInfo p2) {
+      return (p1.getTotalLoad() > p2.getTotalLoad());
+    }
+};
+
+class ProcLoadGreaterIndex {
+ public: 
+  ProcLoadGreaterIndex(ProcArray * parr) : parr(parr) {}
+  bool operator()(int lhs, int rhs) {
+    return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
+  }
+ private:
+  ProcArray *parr;
+};
+
+class ObjLoadGreater {
+  public:
+    bool operator()(Vertex v1, Vertex v2) {
+      return (v1.getVertexLoad() > v2.getVertexLoad());
+    }
+};
+
+
+void RefineSwapLB::work(LDStats* stats)
+{
+  /** ========================== INITIALIZATION ============================= */
+  ProcArray *parr = new ProcArray(stats);       // Processor Array
+  ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
+
+
+  /** ============================= STRATEGY ================================ */
+  //parr->resetTotalLoad();
+
+  if (_lb_args.debug()>1) 
+    CkPrintf("[%d] In RefineSwapLB strategy\n",CkMyPe());
+
+  int vert;
+  double avg_load = parr->getAverageLoad();
+  double threshold = avg_load * 0.01;
+  double lower_bound_load = avg_load - threshold;
+  double upper_bound_load = avg_load + threshold;
+  cout <<"Average load " << avg_load << endl;
+  
+  std::vector<int> min_pe_heap;
+  std::vector<int> max_pe_heap;
+
+  std::vector<int>* pe_obj = new std::vector<int>[parr->procs.size()];
+
+  // Construct max heap of overloaded processors and min heap of underloaded
+  // processors.
+  for (int i = 0; i < parr->procs.size(); i++) {
+    if (parr->procs[i].getTotalLoad() > upper_bound_load) {
+      max_pe_heap.push_back(i);
+    } else if (parr->procs[i].getTotalLoad() < lower_bound_load) {
+      min_pe_heap.push_back(i);
+    }
+  }
+
+  // Create a datastructure to store the objects in a processor
+  for (int i = 0; i < ogr->vertices.size(); i++) {
+    pe_obj[ogr->vertices[i].getCurrentPe()].push_back(i);
+  }
+
+  std::make_heap(max_pe_heap.begin(), max_pe_heap.end(), ProcLoadGreaterIndex(parr));
+
+  int best_p;
+  int best_obj;
+  int iter_location;
+  int obj_iter_location;
+  int min_pe_iter;
+  int min_weight_obj;
+  int min_iter_location;
+  while (max_pe_heap.size() != 0 && min_pe_heap.size() != 0) {
+    int ideal_transfer_pe = 0;
+    int p_index = max_pe_heap.front();
+    double best_size = 0.0;
+    double obj_wg_min = 100.0;
+    int allocated = false;
+    int obj_considered;
+    int pe_considered;
+    ProcInfo &pinfo = parr->procs[p_index];
+    std::pop_heap(max_pe_heap.begin(), max_pe_heap.end(),
+        ProcLoadGreaterIndex(parr));
+    max_pe_heap.pop_back();
+    cout << "Picked max pe " << p_index << " (" <<
+        parr->procs[p_index].getTotalLoad() << ")" << endl;
+
+    // Iterate over all the min pes and see which is the best object to
+    // transfer.
+    for (int j = 0; j < min_pe_heap.size(); j++) {
+      for (int i = 0; i < pe_obj[p_index].size(); i++) {
+        obj_considered = pe_obj[p_index][i];
+        pe_considered = min_pe_heap[j];
+
+        if (pe_obj[pe_considered].size() > pe_obj[ideal_transfer_pe].size()) {
+          ideal_transfer_pe = pe_considered;
+        }
+        if (ogr->vertices[obj_considered].getVertexLoad() < obj_wg_min) {
+          min_weight_obj = obj_considered;
+          min_iter_location = i;
+        }
+
+        if (parr->procs[pe_considered].getTotalLoad() + ogr->vertices[obj_considered].getVertexLoad() < (avg_load + threshold)) {
+          if (ogr->vertices[obj_considered].getVertexLoad() > best_size) {
+            best_obj = obj_considered;
+            best_size = ogr->vertices[obj_considered].getVertexLoad();
+            best_p = pe_considered;
+            iter_location = i;
+            min_pe_iter = j;
+            allocated = true;
+          }
+        }
+      }
+    }
+
+    if (allocated) {
+      // Set the new pe
+      ogr->vertices[best_obj].setNewPe(best_p);
+
+      // Remove from pe_obj
+      pe_obj[p_index].erase(pe_obj[p_index].begin() + iter_location);
+      pe_obj[best_p].push_back(best_obj);
+
+      // Update load of underloaded and overloaded
+      parr->procs[p_index].setTotalLoad(parr->procs[p_index].getTotalLoad() -
+          best_size);
+      parr->procs[best_p].setTotalLoad(parr->procs[best_p].getTotalLoad() +
+          best_size);
+
+      std::cout << " Moving obj " << best_obj << " (" << best_size << ") from " << p_index << " to " <<
+            best_p << " New load " << p_index << ":" << parr->procs[p_index].getTotalLoad()
+            << " " << best_p << ":" << parr->procs[best_p].getTotalLoad()<< std::endl; 
+
+      // Update the max heap and min list
+      if (parr->procs[p_index].getTotalLoad() > (avg_load + threshold)) {
+        // Reinsert
+        max_pe_heap.push_back(p_index);
+        std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
+            ProcLoadGreaterIndex(parr));
+      } else if (parr->procs[p_index].getTotalLoad() < (avg_load - threshold)) {
+        // Insert into the list of underloaded procs
+        min_pe_heap.push_back(p_index);
+      }
+
+      if (parr->procs[best_p].getTotalLoad() > (avg_load - threshold)) {
+        // Remove from list of underloaded procs
+        min_pe_heap.erase(min_pe_heap.begin() + min_pe_iter);
+      }
+    } else {
+      // Swap with something. 
+      // TODO:
+      cout << " Swapping needs to be done" << endl;
+      best_size = 0.0;
+      int temp_iter = -1;
+      for (int i = 0; i < pe_obj[ideal_transfer_pe].size(); i++) {
+        obj_considered = pe_obj[ideal_transfer_pe][i];
+        // Find the max weight obj from the ideal transfer pe.
+          if (ogr->vertices[obj_considered].getVertexLoad() > best_size &&
+              ogr->vertices[min_weight_obj].getVertexLoad() < ogr->vertices[obj_considered].getVertexLoad()) {
+            best_obj = obj_considered;
+            best_size = ogr->vertices[obj_considered].getVertexLoad();
+            temp_iter = i;
+          }
+      }
+          // Swap max weight obj from 
+
+      // Set the new pe
+      ogr->vertices[obj_considered].setNewPe(p_index);
+      ogr->vertices[min_weight_obj].setNewPe(ideal_transfer_pe);
+
+      // Remove from pe_obj
+      pe_obj[p_index].erase(pe_obj[p_index].begin() + min_iter_location);
+      pe_obj[ideal_transfer_pe].erase(pe_obj[ideal_transfer_pe].begin() + temp_iter);
+      pe_obj[p_index].push_back(obj_considered);
+      pe_obj[ideal_transfer_pe].push_back(min_weight_obj);
+
+      // Update load of underloaded and overloaded
+      parr->procs[p_index].setTotalLoad(parr->procs[p_index].getTotalLoad() +
+          ogr->vertices[obj_considered].getVertexLoad() - ogr->vertices[min_weight_obj].getVertexLoad());
+      parr->procs[ideal_transfer_pe].setTotalLoad(parr->procs[best_p].getTotalLoad() -
+          ogr->vertices[obj_considered].getVertexLoad() + ogr->vertices[min_weight_obj].getVertexLoad());
+
+      // Update the max heap and min list
+      if (parr->procs[p_index].getTotalLoad() > (avg_load + threshold)) {
+        // Reinsert
+        max_pe_heap.push_back(p_index);
+        std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
+            ProcLoadGreaterIndex(parr));
+      } else if (parr->procs[p_index].getTotalLoad() < (avg_load - threshold)) {
+        // Insert into the list of underloaded procs
+        min_pe_heap.push_back(p_index);
+      }
+
+      if (parr->procs[ideal_transfer_pe].getTotalLoad() > (avg_load + threshold)) {
+        // Reinsert
+        max_pe_heap.push_back(ideal_transfer_pe);
+        std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
+            ProcLoadGreaterIndex(parr));
+        min_pe_heap.erase(min_pe_heap.begin() + min_pe_iter);
+      } else if (parr->procs[ideal_transfer_pe].getTotalLoad() > (avg_load - threshold)) {
+        // Remove from list of underloaded procs
+        min_pe_heap.erase(min_pe_heap.begin() + min_pe_iter);
+      }
+
+       cout << "  Swapping " << obj_considered << " (" <<
+       ogr->vertices[obj_considered].getVertexLoad() << ") " <<  min_weight_obj
+       << "(" << ogr->vertices[min_weight_obj].getVertexLoad() << ")"
+       <<  " from " << ideal_transfer_pe << " (" <<
+       parr->procs[ideal_transfer_pe].getTotalLoad() <<
+       ") to " << p_index << " (" << parr->procs[p_index].getTotalLoad() << ")"<< endl; 
+    }
+  }
+
+  std::cout << "Overloaded Processor load"<< endl;
+  for (int i = 0; i < max_pe_heap.size(); i++) {
+    std::cout << max_pe_heap[i] << ": " << parr->procs[max_pe_heap[i]].getTotalLoad() << std::endl;
+  }
+
+  /** ============================== CLEANUP ================================ */
+  ogr->convertDecisions(stats);         // Send decisions back to LDStats
+  delete[] pe_obj;
+  delete parr;
+  delete ogr;
+}
+
+#include "RefineSwapLB.def.h"
+
+/*@}*/
+
diff --git a/src/ck-ldb/RefineSwapLB.ci b/src/ck-ldb/RefineSwapLB.ci
new file mode 100644 (file)
index 0000000..704e490
--- /dev/null
@@ -0,0 +1,9 @@
+module RefineSwapLB {
+
+extern module CentralLB;
+initnode void lbinit(void);
+group [migratable] RefineSwapLB : CentralLB {
+  entry void RefineSwapLB(const CkLBOptions &);  
+};
+
+};
diff --git a/src/ck-ldb/RefineSwapLB.h b/src/ck-ldb/RefineSwapLB.h
new file mode 100644 (file)
index 0000000..08f26df
--- /dev/null
@@ -0,0 +1,40 @@
+/**
+ * \addtogroup CkLdb
+*/
+/*@{*/
+
+#ifndef _GREEDYLB_H_
+#define _GREEDYLB_H_
+
+#include "CentralLB.h"
+#include "RefineSwapLB.decl.h"
+
+void CreateRefineSwapLB();
+BaseLB * AllocateRefineSwapLB();
+
+class RefineSwapLB : public CentralLB {
+
+public:
+  struct HeapData {
+    double load;
+    int    pe;
+    int    id;
+  };
+
+  RefineSwapLB(const CkLBOptions &);
+  RefineSwapLB(CkMigrateMessage *m):CentralLB(m) { lbname = "RefineSwapLB"; }
+  void work(LDStats* stats);
+private:
+       enum           HeapCmp {GT = '>', LT = '<'};
+       void           Heapify(HeapData*, int, int, HeapCmp);
+       void           HeapSort(HeapData*, int, HeapCmp);
+       void           BuildHeap(HeapData*, int, HeapCmp);
+       CmiBool        Compare(double, double, HeapCmp);
+       HeapData*      BuildCpuArray(BaseLB::LDStats*, int, int *);  
+       HeapData*      BuildObjectArray(BaseLB::LDStats*, int, int *);      
+       CmiBool        QueryBalanceNow(int step);
+};
+
+#endif /* _HEAPCENTLB_H_ */
+
+/*@}*/