Merge branch 'harshitha/adaptive_lb' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / ck-ldb / RefineSwapLB.C
1 /** \file RefineSwapLB.C
2  *
3  *  Written by Gengbin Zheng
4  *  Updated by Abhinav Bhatele, 2010-12-09 to use ckgraph
5  *
6  *  Status:
7  *    -- Does not support pe_speed's currently
8  *    -- Does not support nonmigratable attribute
9  */
10
11 /**
12  * \addtogroup CkLdb
13 */
14 /*@{*/
15
16 #include "RefineSwapLB.h"
17 #include "ckgraph.h"
18 #include <algorithm>
19 #include <iostream>
20
21 using std::cout;
22 using std::endl;
23
24 CreateLBFunc_Def(RefineSwapLB, "always assign the heaviest obj onto lightest loaded processor.")
25
26 RefineSwapLB::RefineSwapLB(const CkLBOptions &opt): CentralLB(opt)
27 {
28   lbname = "RefineSwapLB";
29   if (CkMyPe()==0)
30     CkPrintf("[%d] RefineSwapLB created\n",CkMyPe());
31 }
32
33 CmiBool RefineSwapLB::QueryBalanceNow(int _step)
34 {
35   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
36   return CmiTrue;
37 }
38
39 class ProcLoadGreater {
40   public:
41     bool operator()(ProcInfo p1, ProcInfo p2) {
42       return (p1.getTotalLoad() > p2.getTotalLoad());
43     }
44 };
45
46 class ProcLoadGreaterIndex {
47  public: 
48   ProcLoadGreaterIndex(ProcArray * parr) : parr(parr) {}
49   bool operator()(int lhs, int rhs) {
50     return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
51   }
52  private:
53   ProcArray *parr;
54 };
55
56 class ObjLoadGreater {
57   public:
58     ObjLoadGreater(ObjGraph* ogr) : ogr(ogr) {}
59     bool operator()(int lhs, int rhs) {
60       return (ogr->vertices[lhs].getVertexLoad() < ogr->vertices[rhs].getVertexLoad());
61     }
62   private:
63     ObjGraph* ogr;
64 };
65
66 inline void addObjToProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
67     pe_obj, int pe_index, int obj_index) {
68
69   // Set the new pe
70   ogr->vertices[obj_index].setNewPe(pe_index);
71
72   // Add obj to the pe obj list
73   pe_obj[pe_index].push_back(obj_index);
74
75   // Update load
76   parr->procs[pe_index].setTotalLoad(parr->procs[pe_index].getTotalLoad() +
77       ogr->vertices[obj_index].getVertexLoad());
78 }
79
80 inline void removeObjFromProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
81     pe_obj, int pe_index, int arr_index) {
82
83   // Update load
84   parr->procs[pe_index].setTotalLoad(parr->procs[pe_index].getTotalLoad() -
85       ogr->vertices[pe_obj[pe_index][arr_index]].getVertexLoad());
86
87   // Remove from pe_obj
88   pe_obj[pe_index].erase(pe_obj[pe_index].begin() + arr_index);
89 }
90
91
92 inline int getMax(ProcArray* parr, std::vector<int>& max_pe_heap) {
93   int p_index = max_pe_heap.front();
94   std::pop_heap(max_pe_heap.begin(), max_pe_heap.end(),
95       ProcLoadGreaterIndex(parr));
96   max_pe_heap.pop_back();
97   return p_index;
98 }
99
100 bool refine(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
101     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
102     double avg_load, double threshold) {
103
104   //cout << "Picked max pe " << max_pe << " (" << parr->procs[max_pe].getTotalLoad() << ")" << endl;
105   int best_p, best_p_iter, arr_index;
106   bool allocated = false;
107   int pe_considered;
108   int obj_considered;
109   double best_size = 0.0;
110   std::sort(pe_obj[max_pe].begin(), pe_obj[max_pe].end(), ObjLoadGreater(ogr));
111
112   // Iterate over all the min pes and see which is the best object to
113   // transfer.
114
115   for (int i = (pe_obj[max_pe].size()-1); i >= 0; i--) {
116     for (int j = 0; j < min_pe_heap.size(); j++) {
117       obj_considered = pe_obj[max_pe][i];
118       pe_considered = min_pe_heap[j];
119    
120       if (parr->procs[pe_considered].getTotalLoad() + ogr->vertices[obj_considered].getVertexLoad() < (avg_load + threshold)) {
121     //    if (ogr->vertices[obj_considered].getVertexLoad() > best_size) {
122           best_size = ogr->vertices[obj_considered].getVertexLoad();
123           best_p = pe_considered;
124           best_p_iter = j;
125           arr_index = i;
126           allocated = true;
127           break;
128     //    }
129       }
130     }
131   }
132
133   if (allocated) {
134
135     int best_obj = pe_obj[max_pe][arr_index];
136     addObjToProc(parr, ogr, pe_obj, best_p, best_obj);
137     removeObjFromProc(parr, ogr, pe_obj, max_pe, arr_index);
138
139    // std::cout << " Moving obj " << best_obj << " (" <<
140    //   ogr->vertices[best_obj].getVertexLoad() << ") from " << max_pe << " to " <<
141    //   best_p << " New load " << max_pe << ":" << parr->procs[max_pe].getTotalLoad()
142    //   << " " << best_p << ":" << parr->procs[best_p].getTotalLoad()<< std::endl; 
143
144     // Update the max heap and min list
145     if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
146       // Reinsert
147       max_pe_heap.push_back(max_pe);
148       std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
149           ProcLoadGreaterIndex(parr));
150     } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
151       // Insert into the list of underloaded procs
152       min_pe_heap.push_back(max_pe);
153     }
154
155     if (parr->procs[best_p].getTotalLoad() > (avg_load - threshold)) {
156       // Remove from list of underloaded procs
157       min_pe_heap.erase(min_pe_heap.begin() + best_p_iter);
158     }
159   }
160   return allocated;
161 }
162
163 bool IsSwapPossWithPe(ProcArray* parr, ObjGraph* ogr, std::vector<int>* pe_obj,
164     std::vector<int>& max_pe_heap, std::vector<int>& min_pe_heap,
165     int max_pe, int pe_considered, int pe_cons_iter, double diff,
166     double avg_load, double threshold) {
167
168   bool set = false;
169   for (int i = pe_obj[max_pe].size() - 1; i >= 0; i--) {
170     for (int j = 0; j < pe_obj[pe_considered].size(); j++) {
171       int pe_cons = pe_obj[pe_considered][j];
172       int max_pe_obj = pe_obj[max_pe][i];
173      // CkPrintf("\tCandidates %d(%lf) with %d(%lf) : diff (%lf)\n", max_pe_obj,
174      //     ogr->vertices[max_pe_obj].getVertexLoad(), pe_cons,
175      //     ogr->vertices[pe_cons].getVertexLoad(), diff);
176
177       if (ogr->vertices[pe_cons].getVertexLoad() <
178           ogr->vertices[max_pe_obj].getVertexLoad()) {
179         if ((diff + ogr->vertices[pe_cons].getVertexLoad()) >
180             ogr->vertices[max_pe_obj].getVertexLoad()) {
181           //CkPrintf("\tSwapping %d with %d\n", max_pe_obj, pe_cons);
182           set = true;
183
184           addObjToProc(parr, ogr, pe_obj, pe_considered, max_pe_obj);
185           removeObjFromProc(parr, ogr, pe_obj, max_pe, i);
186
187           addObjToProc(parr, ogr, pe_obj, max_pe, pe_cons);
188           removeObjFromProc(parr, ogr, pe_obj, pe_considered, j);
189
190           // Update the max heap and min list
191           if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
192             // Reinsert
193             max_pe_heap.push_back(max_pe);
194             std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
195                 ProcLoadGreaterIndex(parr));
196           } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
197             // Insert into the list of underloaded procs
198             min_pe_heap.push_back(max_pe);
199           }
200
201           if (parr->procs[pe_considered].getTotalLoad() > (avg_load - threshold)) {
202             // Remove from list of underloaded procs
203             min_pe_heap.erase(min_pe_heap.begin() + pe_cons_iter);
204           }
205           break;
206         }
207       }
208     }
209
210     if (set) {
211       break;
212     }
213   }
214   return set;
215 }
216
217 bool refineSwap(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
218     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
219     double avg_load, double threshold) {
220
221   double diff = 0;
222   bool is_possible = false;
223   int pe_considered;
224   int pe_cons_iter;
225   for (int i = 0; i < min_pe_heap.size(); i++) {
226     pe_considered = min_pe_heap[i];
227     pe_cons_iter = i;
228     std::sort(pe_obj[pe_considered].begin(), pe_obj[pe_considered].end(), ObjLoadGreater(ogr));
229     diff = avg_load - parr->procs[pe_considered].getTotalLoad();
230
231 //    CkPrintf("Checking to swap maxload pe %d  with minpe %d  + diff %lf \n",
232 //        max_pe, pe_considered, diff);
233     is_possible = IsSwapPossWithPe(parr, ogr, pe_obj, max_pe_heap, min_pe_heap, max_pe,
234         pe_considered, pe_cons_iter, diff, avg_load, threshold); 
235     if (is_possible) {
236       break;
237     }
238   }
239
240   if (!is_possible) {
241     return false;
242   }
243
244   return true;
245 }
246
247 void RefineSwapLB::work(LDStats* stats)
248 {
249   /** ========================== INITIALIZATION ============================= */
250   ProcArray *parr = new ProcArray(stats);       // Processor Array
251   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
252
253
254   /** ============================= STRATEGY ================================ */
255   //parr->resetTotalLoad();
256
257   if (_lb_args.debug()>1) 
258     CkPrintf("[%d] In RefineSwapLB strategy\n",CkMyPe());
259
260   int vert;
261   double avg_load = parr->getAverageLoad();
262   double threshold = avg_load * 0.01;
263   double lower_bound_load = avg_load - threshold;
264   double upper_bound_load = avg_load + threshold;
265   cout <<"Average load " << avg_load << endl;
266   
267   std::vector<int> min_pe_heap;
268   std::vector<int> max_pe_heap;
269
270   std::vector<int>* pe_obj = new std::vector<int>[parr->procs.size()];
271
272
273   // Create a datastructure to store the objects in a processor
274 //  CkPrintf("Object load\n");
275   for (int i = 0; i < ogr->vertices.size(); i++) {
276     pe_obj[ogr->vertices[i].getCurrentPe()].push_back(i);
277 //    CkPrintf("%d pe %d: %lf\n", i, ogr->vertices[i].getCurrentPe(), ogr->vertices[i].getVertexLoad());
278   }
279
280   // Construct max heap of overloaded processors and min heap of underloaded
281   // processors.
282   for (int i = 0; i < parr->procs.size(); i++) {
283     //CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
284     if (parr->procs[i].getTotalLoad() > upper_bound_load) {
285       max_pe_heap.push_back(i);
286     } else if (parr->procs[i].getTotalLoad() < lower_bound_load) {
287       min_pe_heap.push_back(i);
288     }
289   }
290
291   std::make_heap(max_pe_heap.begin(), max_pe_heap.end(), ProcLoadGreaterIndex(parr));
292
293   while (max_pe_heap.size() != 0 && min_pe_heap.size() != 0) {
294     int p_index = getMax(parr, max_pe_heap);
295     ProcInfo &pinfo = parr->procs[p_index];
296
297     bool success = refine(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load, threshold);
298     
299
300     if (!success) {
301       // Swap with something. 
302
303       if (!refineSwap(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load,
304             threshold)) {
305         max_pe_heap.push_back(p_index);
306         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
307             ProcLoadGreaterIndex(parr));
308         break;
309       }
310     }
311   }
312
313   //std::cout << "Overloaded Processor load " << avg_load << endl;
314   for (int p_index = 0; p_index < max_pe_heap.size(); p_index++) {
315     //std::cout << max_pe_heap[p_index] << ": " << parr->procs[max_pe_heap[p_index]].getTotalLoad() << std::endl;
316   }
317
318   //std::cout << "Underloaded Processor load"<< endl;
319   for (int p_index = 0; p_index < min_pe_heap.size(); p_index++) {
320     //std::cout << min_pe_heap[p_index] << ": " << parr->procs[min_pe_heap[p_index]].getTotalLoad() << std::endl;
321   }
322
323
324   //std::cout << "Processor load"<< endl;
325   for (int i = 0; i < parr->procs.size(); i++) {
326     //CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
327   }
328
329   /** ============================== CLEANUP ================================ */
330   ogr->convertDecisions(stats);         // Send decisions back to LDStats
331   delete[] pe_obj;
332   delete parr;
333   delete ogr;
334 }
335
336 #include "RefineSwapLB.def.h"
337
338 /*@}*/
339