Refinement scheme improved
[charm.git] / src / ck-ldb / RefineSwapLB.C
1 /** \file RefineSwapLB.C
2  *
3  *  Written by Gengbin Zheng
4  *  Updated by Abhinav Bhatele, 2010-12-09 to use ckgraph
5  *
6  *  Status:
7  *    -- Does not support pe_speed's currently
8  *    -- Does not support nonmigratable attribute
9  */
10
11 /**
12  * \addtogroup CkLdb
13 */
14 /*@{*/
15
16 #include "RefineSwapLB.h"
17 #include "ckgraph.h"
18 #include <algorithm>
19 #include <iostream>
20
21 using std::cout;
22 using std::endl;
23
24 CreateLBFunc_Def(RefineSwapLB, "always assign the heaviest obj onto lightest loaded processor.")
25
26 RefineSwapLB::RefineSwapLB(const CkLBOptions &opt): CentralLB(opt)
27 {
28   lbname = "RefineSwapLB";
29   if (CkMyPe()==0)
30     CkPrintf("[%d] RefineSwapLB created\n",CkMyPe());
31 }
32
33 CmiBool RefineSwapLB::QueryBalanceNow(int _step)
34 {
35   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
36   return CmiTrue;
37 }
38
39 class ProcLoadGreater {
40   public:
41     bool operator()(ProcInfo p1, ProcInfo p2) {
42       return (p1.getTotalLoad() > p2.getTotalLoad());
43     }
44 };
45
46 class ProcLoadGreaterIndex {
47  public: 
48   ProcLoadGreaterIndex(ProcArray * parr) : parr(parr) {}
49   bool operator()(int lhs, int rhs) {
50     return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
51   }
52  private:
53   ProcArray *parr;
54 };
55
56 class ObjLoadGreater {
57   public:
58     ObjLoadGreater(ObjGraph* ogr) : ogr(ogr) {}
59     bool operator()(int lhs, int rhs) {
60       return (ogr->vertices[lhs].getVertexLoad() < ogr->vertices[rhs].getVertexLoad());
61     }
62   private:
63     ObjGraph* ogr;
64 };
65
66 inline void addObjToProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
67     pe_obj, int pe_index, int obj_index) {
68
69   // Set the new pe
70   ogr->vertices[obj_index].setNewPe(pe_index);
71
72   // Add obj to the pe obj list
73   pe_obj[pe_index].push_back(obj_index);
74
75   // Update load
76   parr->procs[pe_index].setTotalLoad(parr->procs[pe_index].getTotalLoad() +
77       ogr->vertices[obj_index].getVertexLoad());
78 }
79
80 inline void removeObjFromProc(ProcArray* parr, ObjGraph* ogr, std::vector<int>*
81     pe_obj, int pe_index, int arr_index) {
82
83   // Update load
84   parr->procs[pe_index].setTotalLoad(parr->procs[pe_index].getTotalLoad() -
85       ogr->vertices[pe_obj[pe_index][arr_index]].getVertexLoad());
86
87   // Remove from pe_obj
88   pe_obj[pe_index].erase(pe_obj[pe_index].begin() + arr_index);
89 }
90
91
92 inline int getMax(ProcArray* parr, std::vector<int>& max_pe_heap) {
93   int p_index = max_pe_heap.front();
94   std::pop_heap(max_pe_heap.begin(), max_pe_heap.end(),
95       ProcLoadGreaterIndex(parr));
96   max_pe_heap.pop_back();
97   return p_index;
98 }
99
100 bool refine(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
101     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
102     double avg_load, double threshold) {
103
104   cout << "Picked max pe " << max_pe << " (" << parr->procs[max_pe].getTotalLoad() << ")" << endl;
105   int best_p, best_p_iter, arr_index;
106   bool allocated = false;
107   int pe_considered;
108   int obj_considered;
109   double best_size = 0.0;
110   std::sort(pe_obj[max_pe].begin(), pe_obj[max_pe].end(), ObjLoadGreater(ogr));
111
112   // Iterate over all the min pes and see which is the best object to
113   // transfer.
114
115   for (int i = (pe_obj[max_pe].size()-1); i >= 0; i--) {
116     for (int j = 0; j < min_pe_heap.size(); j++) {
117       obj_considered = pe_obj[max_pe][i];
118       pe_considered = min_pe_heap[j];
119    
120       if (parr->procs[pe_considered].getTotalLoad() + ogr->vertices[obj_considered].getVertexLoad() < (avg_load + threshold)) {
121     //    if (ogr->vertices[obj_considered].getVertexLoad() > best_size) {
122           best_size = ogr->vertices[obj_considered].getVertexLoad();
123           best_p = pe_considered;
124           best_p_iter = j;
125           arr_index = i;
126           allocated = true;
127           break;
128     //    }
129       }
130     }
131   }
132
133   if (allocated) {
134
135     int best_obj = pe_obj[max_pe][arr_index];
136     addObjToProc(parr, ogr, pe_obj, best_p, best_obj);
137     removeObjFromProc(parr, ogr, pe_obj, max_pe, arr_index);
138
139     std::cout << " Moving obj " << best_obj << " (" <<
140       ogr->vertices[best_obj].getVertexLoad() << ") from " << max_pe << " to " <<
141       best_p << " New load " << max_pe << ":" << parr->procs[max_pe].getTotalLoad()
142       << " " << best_p << ":" << parr->procs[best_p].getTotalLoad()<< std::endl; 
143
144     // Update the max heap and min list
145     if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
146       // Reinsert
147       max_pe_heap.push_back(max_pe);
148       std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
149           ProcLoadGreaterIndex(parr));
150     } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
151       // Insert into the list of underloaded procs
152       min_pe_heap.push_back(max_pe);
153     }
154
155     if (parr->procs[best_p].getTotalLoad() > (avg_load - threshold)) {
156       // Remove from list of underloaded procs
157       min_pe_heap.erase(min_pe_heap.begin() + best_p_iter);
158     }
159   }
160   return allocated;
161 }
162
163 bool refineSwap(ProcArray* parr, ObjGraph* ogr, std::vector<int>& max_pe_heap, 
164     std::vector<int>& min_pe_heap, std::vector<int>* pe_obj, int max_pe,
165     double avg_load, double threshold) {
166
167   double diff = 0;
168   bool is_possible = false;
169   int pe_considered;
170   int pe_cons_iter;
171   for (int i = 0; i < min_pe_heap.size(); i++) {
172     pe_considered = min_pe_heap[i];
173     pe_cons_iter = i;
174     std::sort(pe_obj[pe_considered].begin(), pe_obj[pe_considered].end(), ObjLoadGreater(ogr));
175     diff = avg_load - parr->procs[pe_considered].getTotalLoad();
176     int pe_cons_max = pe_obj[pe_considered][pe_obj[pe_considered].size() - 1];
177
178       CkPrintf("Checking to swap maxload pe %d (%d) with minpe %d (%d) + diff %lf \n",
179             max_pe, pe_obj[max_pe][0], pe_considered, pe_cons_max, diff);
180
181     if (ogr->vertices[pe_cons_max].getVertexLoad() <
182         ogr->vertices[pe_obj[max_pe][0]].getVertexLoad()) {
183       if ((diff + ogr->vertices[pe_cons_max].getVertexLoad()) >
184           ogr->vertices[pe_obj[max_pe][0]].getVertexLoad()) {
185         CkPrintf("Possible to swap maxload pe %d (%d) with minpe %d (%d) + diff %lf \n",
186             max_pe, pe_obj[max_pe][0], pe_considered, pe_cons_max, diff);
187         is_possible = true;
188         break;
189       }
190     }
191   }
192
193   if (!is_possible) {
194     return false;
195   }
196
197   bool set = false;
198   for (int i = pe_obj[max_pe].size() - 1; i >= 0; i--) {
199     for (int j = 0; j < pe_obj[pe_considered].size(); j++) {
200       int pe_cons = pe_obj[pe_considered][j];
201       int max_pe_obj = pe_obj[max_pe][i];
202
203       if (ogr->vertices[pe_cons].getVertexLoad() <
204           ogr->vertices[max_pe_obj].getVertexLoad()) {
205         if ((diff + ogr->vertices[pe_cons].getVertexLoad()) >
206             ogr->vertices[max_pe_obj].getVertexLoad()) {
207           CkPrintf("\tSwapping %d with %d\n", max_pe_obj, pe_cons);
208           set = true;
209
210           addObjToProc(parr, ogr, pe_obj, pe_considered, max_pe_obj);
211           removeObjFromProc(parr, ogr, pe_obj, max_pe, i);
212
213           addObjToProc(parr, ogr, pe_obj, max_pe, pe_cons);
214           removeObjFromProc(parr, ogr, pe_obj, pe_considered, j);
215
216           // Update the max heap and min list
217           if (parr->procs[max_pe].getTotalLoad() > (avg_load + threshold)) {
218             // Reinsert
219             max_pe_heap.push_back(max_pe);
220             std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
221                 ProcLoadGreaterIndex(parr));
222           } else if (parr->procs[max_pe].getTotalLoad() < (avg_load - threshold)) {
223             // Insert into the list of underloaded procs
224             min_pe_heap.push_back(max_pe);
225           }
226
227           if (parr->procs[pe_considered].getTotalLoad() > (avg_load - threshold)) {
228             // Remove from list of underloaded procs
229             min_pe_heap.erase(min_pe_heap.begin() + pe_cons_iter);
230           }
231           break;
232         }
233       }
234     }
235     if (set) {
236       break;
237     }
238   }
239   return true;
240
241 }
242
243 void RefineSwapLB::work(LDStats* stats)
244 {
245   /** ========================== INITIALIZATION ============================= */
246   ProcArray *parr = new ProcArray(stats);       // Processor Array
247   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
248
249
250   /** ============================= STRATEGY ================================ */
251   //parr->resetTotalLoad();
252
253   if (_lb_args.debug()>1) 
254     CkPrintf("[%d] In RefineSwapLB strategy\n",CkMyPe());
255
256   int vert;
257   double avg_load = parr->getAverageLoad();
258   double threshold = avg_load * 0.01;
259   double lower_bound_load = avg_load - threshold;
260   double upper_bound_load = avg_load + threshold;
261   cout <<"Average load " << avg_load << endl;
262   
263   std::vector<int> min_pe_heap;
264   std::vector<int> max_pe_heap;
265
266   std::vector<int>* pe_obj = new std::vector<int>[parr->procs.size()];
267
268
269   // Create a datastructure to store the objects in a processor
270   CkPrintf("Object load\n");
271   for (int i = 0; i < ogr->vertices.size(); i++) {
272     pe_obj[ogr->vertices[i].getCurrentPe()].push_back(i);
273     CkPrintf("%d pe %d: %lf\n", i, ogr->vertices[i].getCurrentPe(), ogr->vertices[i].getVertexLoad());
274   }
275
276   // Construct max heap of overloaded processors and min heap of underloaded
277   // processors.
278   for (int i = 0; i < parr->procs.size(); i++) {
279     CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
280     if (parr->procs[i].getTotalLoad() > upper_bound_load) {
281       max_pe_heap.push_back(i);
282     } else if (parr->procs[i].getTotalLoad() < lower_bound_load) {
283       min_pe_heap.push_back(i);
284     }
285   }
286
287   std::make_heap(max_pe_heap.begin(), max_pe_heap.end(), ProcLoadGreaterIndex(parr));
288
289   while (max_pe_heap.size() != 0 && min_pe_heap.size() != 0) {
290     int p_index = getMax(parr, max_pe_heap);
291     ProcInfo &pinfo = parr->procs[p_index];
292
293     bool success = refine(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load, threshold);
294     
295
296     if (!success) {
297       // Swap with something. 
298
299       if (!refineSwap(parr, ogr, max_pe_heap, min_pe_heap, pe_obj, p_index, avg_load,
300             threshold)) {
301         max_pe_heap.push_back(p_index);
302         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
303             ProcLoadGreaterIndex(parr));
304         break;
305       }
306     }
307   }
308
309   std::cout << "Overloaded Processor load"<< endl;
310   for (int p_index = 0; p_index < max_pe_heap.size(); p_index++) {
311     std::cout << max_pe_heap[p_index] << ": " << parr->procs[max_pe_heap[p_index]].getTotalLoad() << std::endl;
312   }
313
314   std::cout << "Underloaded Processor load"<< endl;
315   for (int p_index = 0; p_index < min_pe_heap.size(); p_index++) {
316     std::cout << min_pe_heap[p_index] << ": " << parr->procs[min_pe_heap[p_index]].getTotalLoad() << std::endl;
317   }
318
319
320   std::cout << "Processor load"<< endl;
321   for (int i = 0; i < parr->procs.size(); i++) {
322     CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
323   }
324
325   /** ============================== CLEANUP ================================ */
326   ogr->convertDecisions(stats);         // Send decisions back to LDStats
327   delete[] pe_obj;
328   delete parr;
329   delete ogr;
330 }
331
332 #include "RefineSwapLB.def.h"
333
334 /*@}*/
335