Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-ldb / RefineSwapLB.C
1 /** \file RefineSwapLB.C
2  *
3  *  Written by Harshitha Menon 
4  *
5  *  Status:
6  *    -- Does not support pe_speed's currently
7  *    -- Does not support nonmigratable attribute
8  */
9
10 /**
11  * \addtogroup CkLdb
12 */
13 /*@{*/
14
15 #include "RefineSwapLB.h"
16 #include "ckgraph.h"
17 #include <algorithm>
18 #include <iostream>
19
20 using std::cout;
21 using std::endl;
22
23 CreateLBFunc_Def(RefineSwapLB,
24     "always assign the heaviest obj onto lightest loaded processor.")
25
26 RefineSwapLB::RefineSwapLB(const CkLBOptions &opt): CentralLB(opt)
27 {
28   lbname = "RefineSwapLB";
29   if (CkMyPe()==0)
30     CkPrintf("[%d] RefineSwapLB created\n",CkMyPe());
31 }
32
33 CmiBool RefineSwapLB::QueryBalanceNow(int _step)
34 {
35   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
36   return CmiTrue;
37 }
38
39 class ProcLoadGreater {
40   public:
41     bool operator()(ProcInfo p1, ProcInfo p2) {
42       return (p1.getTotalLoad() > p2.getTotalLoad());
43     }
44 };
45
46 class ProcLoadGreaterIndex {
47  public: 
48   ProcLoadGreaterIndex(ProcArray * parr) : parr(parr) {}
49   bool operator()(int lhs, int rhs) {
50     return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
51   }
52  private:
53   ProcArray *parr;
54 };
55
56 class ObjLoadGreater {
57   public:
58     ObjLoadGreater(ObjGraph* ogr) : ogr(ogr) {}
59     bool operator()(int lhs, int rhs) {
60       return (ogr->vertices[lhs].getVertexLoad() < ogr->vertices[rhs].getVertexLoad());
61     }
62   private:
63     ObjGraph* ogr;
64 };
65
66 inline void addObjToProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
67     pe_obj, int pe_index, int obj_index) {
68
69   // Set the new pe
70   ogr->vertices[obj_index].setNewPe(pe_index);
71
72   // Add obj to the pe obj list
73   pe_obj[pe_index].push_back(obj_index);
74
75   // Update load
76   parr->procs[pe_index].totalLoad() += ogr->vertices[obj_index].getVertexLoad();
77 }
78
79 inline void removeObjFromProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
80     pe_obj, int pe_index, int arr_index) {
81
82   // Update load
83   parr->procs[pe_index].totalLoad() -=
84       ogr->vertices[pe_obj[pe_index][arr_index]].getVertexLoad();
85
86   // Remove from pe_obj
87   pe_obj[pe_index].erase(pe_obj[pe_index].begin() + arr_index);
88 }
89
90
91 inline int getMax(ProcArray* parr, std::vector<int>& max_pe_heap) {
92   int p_index = max_pe_heap.front();
93   std::pop_heap(max_pe_heap.begin(), max_pe_heap.end(),
94       ProcLoadGreaterIndex(parr));
95   max_pe_heap.pop_back();
96   return p_index;
97 }
98
99 bool refine(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
100     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
101     double avg_load, double threshold) {
102
103   int best_p, best_p_iter, arr_index;
104   bool allocated = false;
105   int pe_considered;
106   int obj_considered;
107   double best_size = 0.0;
108   std::sort(pe_obj[max_pe].begin(), pe_obj[max_pe].end(), ObjLoadGreater(ogr));
109
110   // Iterate over all the min pes and see which is the best object to
111   // transfer.
112
113   for (int i = (pe_obj[max_pe].size()-1); i >= 0; i--) {
114     for (int j = 0; j < min_pe_heap.size(); j++) {
115       obj_considered = pe_obj[max_pe][i];
116       pe_considered = min_pe_heap[j];
117    
118       if (parr->procs[pe_considered].getTotalLoad() +
119           ogr->vertices[obj_considered].getVertexLoad() < (avg_load + threshold)) {
120     //    if (ogr->vertices[obj_considered].getVertexLoad() > best_size) {
121           best_size = ogr->vertices[obj_considered].getVertexLoad();
122           best_p = pe_considered;
123           best_p_iter = j;
124           arr_index = i;
125           allocated = true;
126           break;
127     //    }
128       }
129     }
130   }
131
132   if (allocated) {
133
134     int best_obj = pe_obj[max_pe][arr_index];
135     addObjToProc(parr, ogr, pe_obj, best_p, best_obj);
136     removeObjFromProc(parr, ogr, pe_obj, max_pe, arr_index);
137
138     // Update the max heap and min list
139     if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
140       // Reinsert
141       max_pe_heap.push_back(max_pe);
142       std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
143           ProcLoadGreaterIndex(parr));
144     } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
145       // Insert into the list of underloaded procs
146       min_pe_heap.push_back(max_pe);
147     }
148
149     if (parr->procs[best_p].getTotalLoad() > (avg_load - threshold)) {
150       // Remove from list of underloaded procs
151       min_pe_heap.erase(min_pe_heap.begin() + best_p_iter);
152     }
153   }
154   return allocated;
155 }
156
157 bool IsSwapPossWithPe(ProcArray* parr, ObjGraph* ogr, std::vector<int>* pe_obj,
158     std::vector<int>& max_pe_heap, std::vector<int>& min_pe_heap,
159     int max_pe, int pe_considered, int pe_cons_iter, double diff,
160     double avg_load, double threshold) {
161
162   bool set = false;
163   for (int i = pe_obj[max_pe].size() - 1; i >= 0; i--) {
164     for (int j = 0; j < pe_obj[pe_considered].size(); j++) {
165       int pe_cons = pe_obj[pe_considered][j];
166       int max_pe_obj = pe_obj[max_pe][i];
167      // CkPrintf("\tCandidates %d(%lf) with %d(%lf) : diff (%lf)\n", max_pe_obj,
168      //     ogr->vertices[max_pe_obj].getVertexLoad(), pe_cons,
169      //     ogr->vertices[pe_cons].getVertexLoad(), diff);
170
171       if (ogr->vertices[pe_cons].getVertexLoad() <
172           ogr->vertices[max_pe_obj].getVertexLoad()) {
173         if ((diff + ogr->vertices[pe_cons].getVertexLoad()) >
174             ogr->vertices[max_pe_obj].getVertexLoad()) {
175           //CkPrintf("\tSwapping %d with %d\n", max_pe_obj, pe_cons);
176           set = true;
177
178           addObjToProc(parr, ogr, pe_obj, pe_considered, max_pe_obj);
179           removeObjFromProc(parr, ogr, pe_obj, max_pe, i);
180
181           addObjToProc(parr, ogr, pe_obj, max_pe, pe_cons);
182           removeObjFromProc(parr, ogr, pe_obj, pe_considered, j);
183
184           // Update the max heap and min list
185           if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
186             // Reinsert
187             max_pe_heap.push_back(max_pe);
188             std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
189                 ProcLoadGreaterIndex(parr));
190           } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
191             // Insert into the list of underloaded procs
192             min_pe_heap.push_back(max_pe);
193           }
194
195           if (parr->procs[pe_considered].getTotalLoad() > (avg_load - threshold)) {
196             // Remove from list of underloaded procs
197             min_pe_heap.erase(min_pe_heap.begin() + pe_cons_iter);
198           }
199           break;
200         }
201       }
202     }
203
204     if (set) {
205       break;
206     }
207   }
208   return set;
209 }
210
211 bool refineSwap(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
212     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
213     double avg_load, double threshold) {
214
215   double diff = 0;
216   bool is_possible = false;
217   int pe_considered;
218   int pe_cons_iter;
219   for (int i = 0; i < min_pe_heap.size(); i++) {
220     pe_considered = min_pe_heap[i];
221     pe_cons_iter = i;
222     std::sort(pe_obj[pe_considered].begin(), pe_obj[pe_considered].end(), ObjLoadGreater(ogr));
223     diff = avg_load - parr->procs[pe_considered].getTotalLoad();
224
225 //    CkPrintf("Checking to swap maxload pe %d  with minpe %d  + diff %lf \n",
226 //        max_pe, pe_considered, diff);
227     is_possible = IsSwapPossWithPe(parr, ogr, pe_obj, max_pe_heap, min_pe_heap, max_pe,
228         pe_considered, pe_cons_iter, diff, avg_load, threshold); 
229     if (is_possible) {
230       break;
231     }
232   }
233
234   if (!is_possible) {
235     return false;
236   }
237
238   return true;
239 }
240
241 void RefineSwapLB::work(LDStats* stats)
242 {
243   /** ========================== INITIALIZATION ============================= */
244   ProcArray *parr = new ProcArray(stats);       // Processor Array
245   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
246
247
248   /** ============================= STRATEGY ================================ */
249
250   if (_lb_args.debug()>1) 
251     CkPrintf("[%d] In RefineSwapLB strategy\n",CkMyPe());
252
253   int vert;
254   double avg_load = parr->getAverageLoad();
255   double threshold = avg_load * 0.01;
256   double lower_bound_load = avg_load - threshold;
257   double upper_bound_load = avg_load + threshold;
258   cout <<"Average load " << avg_load << endl;
259   
260   std::vector<int> min_pe_heap;
261   std::vector<int> max_pe_heap;
262
263   std::vector<int>* pe_obj = new std::vector<int>[parr->procs.size()];
264
265
266   // Create a datastructure to store the objects in a processor
267   for (int i = 0; i < ogr->vertices.size(); i++) {
268     pe_obj[ogr->vertices[i].getCurrentPe()].push_back(i);
269 //    CkPrintf("%d pe %d: %lf\n", i, ogr->vertices[i].getCurrentPe(), ogr->vertices[i].getVertexLoad());
270   }
271
272   // Construct max heap of overloaded processors and min heap of underloaded
273   // processors.
274   for (int i = 0; i < parr->procs.size(); i++) {
275     //CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
276     if (parr->procs[i].getTotalLoad() > upper_bound_load) {
277       max_pe_heap.push_back(i);
278     } else if (parr->procs[i].getTotalLoad() < lower_bound_load) {
279       min_pe_heap.push_back(i);
280     }
281   }
282
283   std::make_heap(max_pe_heap.begin(), max_pe_heap.end(), ProcLoadGreaterIndex(parr));
284
285   while (max_pe_heap.size() != 0 && min_pe_heap.size() != 0) {
286     int p_index = getMax(parr, max_pe_heap);
287     ProcInfo &pinfo = parr->procs[p_index];
288
289     bool success = refine(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load, threshold);
290     
291
292     if (!success) {
293       // Swap with something. 
294
295       if (!refineSwap(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load,
296             threshold)) {
297         max_pe_heap.push_back(p_index);
298         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
299             ProcLoadGreaterIndex(parr));
300         break;
301       }
302     }
303   }
304
305   /** ============================== CLEANUP ================================ */
306   ogr->convertDecisions(stats);         // Send decisions back to LDStats
307   delete[] pe_obj;
308   delete parr;
309   delete ogr;
310 }
311
312 #include "RefineSwapLB.def.h"
313
314 /*@}*/
315