3236fb7968f806ea9ccbe5956c06e666325d234b
[charm.git] / src / ck-ldb / RefineSwapLB.C
1 /** \file RefineSwapLB.C
2  *
3  *  Written by Gengbin Zheng
4  *  Updated by Abhinav Bhatele, 2010-12-09 to use ckgraph
5  *
6  *  Status:
7  *    -- Does not support pe_speed's currently
8  *    -- Does not support nonmigratable attribute
9  */
10
11 /**
12  * \addtogroup CkLdb
13 */
14 /*@{*/
15
16 #include "RefineSwapLB.h"
17 #include "ckgraph.h"
18 #include <algorithm>
19 #include <iostream>
20
21 using std::cout;
22 using std::endl;
23
// Invokes the framework macro CreateLBFunc_Def for RefineSwapLB, passing the
// strategy's human-readable description string.
// NOTE(review): the macro is defined outside this file; presumably it emits
// the factory/registration glue for this balancer -- confirm against the
// macro definition.
CreateLBFunc_Def(RefineSwapLB, "always assign the heaviest obj onto lightest loaded processor.")
25
26 RefineSwapLB::RefineSwapLB(const CkLBOptions &opt): CentralLB(opt)
27 {
28   lbname = "RefineSwapLB";
29   if (CkMyPe()==0)
30     CkPrintf("[%d] RefineSwapLB created\n",CkMyPe());
31 }
32
33 CmiBool RefineSwapLB::QueryBalanceNow(int _step)
34 {
35   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
36   return CmiTrue;
37 }
38
39 class ProcLoadGreater {
40   public:
41     bool operator()(ProcInfo p1, ProcInfo p2) {
42       return (p1.getTotalLoad() > p2.getTotalLoad());
43     }
44 };
45
46 class ProcLoadGreaterIndex {
47  public: 
48   ProcLoadGreaterIndex(ProcArray * parr) : parr(parr) {}
49   bool operator()(int lhs, int rhs) {
50     return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
51   }
52  private:
53   ProcArray *parr;
54 };
55
56 class ObjLoadGreater {
57   public:
58     bool operator()(Vertex v1, Vertex v2) {
59       return (v1.getVertexLoad() > v2.getVertexLoad());
60     }
61 };
62
63
64 void RefineSwapLB::work(LDStats* stats)
65 {
66   /** ========================== INITIALIZATION ============================= */
67   ProcArray *parr = new ProcArray(stats);       // Processor Array
68   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
69
70
71   /** ============================= STRATEGY ================================ */
72   //parr->resetTotalLoad();
73
74   if (_lb_args.debug()>1) 
75     CkPrintf("[%d] In RefineSwapLB strategy\n",CkMyPe());
76
77   int vert;
78   double avg_load = parr->getAverageLoad();
79   double threshold = avg_load * 0.01;
80   double lower_bound_load = avg_load - threshold;
81   double upper_bound_load = avg_load + threshold;
82   cout <<"Average load " << avg_load << endl;
83   
84   std::vector<int> min_pe_heap;
85   std::vector<int> max_pe_heap;
86
87   std::vector<int>* pe_obj = new std::vector<int>[parr->procs.size()];
88
89   // Construct max heap of overloaded processors and min heap of underloaded
90   // processors.
91   for (int i = 0; i < parr->procs.size(); i++) {
92     CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
93     if (parr->procs[i].getTotalLoad() > upper_bound_load) {
94       max_pe_heap.push_back(i);
95     } else if (parr->procs[i].getTotalLoad() < lower_bound_load) {
96       min_pe_heap.push_back(i);
97     }
98   }
99
100   // Create a datastructure to store the objects in a processor
101   CkPrintf("Object load\n");
102   for (int i = 0; i < ogr->vertices.size(); i++) {
103     pe_obj[ogr->vertices[i].getCurrentPe()].push_back(i);
104     CkPrintf("%d: %lf\n", ogr->vertices[i].getCurrentPe(), ogr->vertices[i].getVertexLoad());
105   }
106
107   std::make_heap(max_pe_heap.begin(), max_pe_heap.end(), ProcLoadGreaterIndex(parr));
108
109   int best_p;
110   int best_obj;
111   int iter_location;
112   int obj_iter_location;
113   int min_pe_iter;
114   int min_weight_obj;
115   int min_iter_location;
116   while (max_pe_heap.size() != 0 && min_pe_heap.size() != 0) {
117     int ideal_transfer_pe = 0;
118     int p_index = max_pe_heap.front();
119     double best_size = 0.0;
120     double obj_wg_min = 100.0;
121     int allocated = false;
122     int obj_considered;
123     int pe_considered;
124     ProcInfo &pinfo = parr->procs[p_index];
125     std::pop_heap(max_pe_heap.begin(), max_pe_heap.end(),
126         ProcLoadGreaterIndex(parr));
127     max_pe_heap.pop_back();
128     cout << "Picked max pe " << p_index << " (" <<
129         parr->procs[p_index].getTotalLoad() << ")" << endl;
130     int second_phase_pe_considered_iter = 0;
131
132     // Iterate over all the min pes and see which is the best object to
133     // transfer.
134     for (int j = 0; j < min_pe_heap.size(); j++) {
135       for (int i = 0; i < pe_obj[p_index].size(); i++) {
136         obj_considered = pe_obj[p_index][i];
137         pe_considered = min_pe_heap[j];
138
139         if (parr->procs[pe_considered].getTotalLoad() < parr->procs[ideal_transfer_pe].getTotalLoad()) {
140           ideal_transfer_pe = pe_considered;
141           second_phase_pe_considered_iter = j;
142         }
143         if (ogr->vertices[obj_considered].getVertexLoad() < obj_wg_min) {
144           min_weight_obj = obj_considered;
145           min_iter_location = i;
146         }
147
148         if (parr->procs[pe_considered].getTotalLoad() + ogr->vertices[obj_considered].getVertexLoad() < (avg_load + threshold)) {
149           if (ogr->vertices[obj_considered].getVertexLoad() > best_size) {
150             best_obj = obj_considered;
151             best_size = ogr->vertices[obj_considered].getVertexLoad();
152             best_p = pe_considered;
153             iter_location = i;
154             min_pe_iter = j;
155             allocated = true;
156           }
157         }
158       }
159     }
160
161     if (allocated) {
162       // Set the new pe
163       ogr->vertices[best_obj].setNewPe(best_p);
164
165       // Remove from pe_obj
166       pe_obj[p_index].erase(pe_obj[p_index].begin() + iter_location);
167       pe_obj[best_p].push_back(best_obj);
168
169       // Update load of underloaded and overloaded
170       parr->procs[p_index].setTotalLoad(parr->procs[p_index].getTotalLoad() -
171           best_size);
172       parr->procs[best_p].setTotalLoad(parr->procs[best_p].getTotalLoad() +
173           best_size);
174
175       std::cout << " Moving obj " << best_obj << " (" << best_size << ") from " << p_index << " to " <<
176             best_p << " New load " << p_index << ":" << parr->procs[p_index].getTotalLoad()
177             << " " << best_p << ":" << parr->procs[best_p].getTotalLoad()<< std::endl; 
178
179       // Update the max heap and min list
180       if (parr->procs[p_index].getTotalLoad() > (avg_load + threshold)) {
181         // Reinsert
182         max_pe_heap.push_back(p_index);
183         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
184             ProcLoadGreaterIndex(parr));
185       } else if (parr->procs[p_index].getTotalLoad() < (avg_load - threshold)) {
186         // Insert into the list of underloaded procs
187         min_pe_heap.push_back(p_index);
188       }
189
190       if (parr->procs[best_p].getTotalLoad() > (avg_load - threshold)) {
191         // Remove from list of underloaded procs
192         min_pe_heap.erase(min_pe_heap.begin() + min_pe_iter);
193       }
194     } else {
195       // Swap with something. 
196       // TODO:
197 //      cout << " Swapping needs to be done min weight pe : " << ideal_transfer_pe
198 //      << " load " << parr->procs[ideal_transfer_pe].getTotalLoad() << " diff "
199 //      << avg_load - parr->procs[ideal_transfer_pe].getTotalLoad() << endl;
200     //  max_pe_heap.push_back(p_index);
201     //  std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
202     //      ProcLoadGreaterIndex(parr));
203       double diff_load = avg_load - parr->procs[ideal_transfer_pe].getTotalLoad(); 
204
205       int possibility_x = 0;
206       int possibility_y = 0;
207       bool is_possible = false;
208       for (int i = 0; i < pe_obj[p_index].size(); i++) {
209         for (int j = 0; j < pe_obj[ideal_transfer_pe].size(); j++) {
210           if ((ogr->vertices[pe_obj[p_index][i]].getVertexLoad() >
211                 ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad())) {
212            // CkPrintf("%d (%lf) : %d(%lf) \n", pe_obj[p_index][i],
213            //     ogr->vertices[pe_obj[p_index][i]].getVertexLoad(),
214            //     pe_obj[ideal_transfer_pe][j],
215            //     ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad());
216            // CkPrintf("\t %lf : %lf \n",
217            // ogr->vertices[pe_obj[p_index][i]].getVertexLoad(),
218            // ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad() + diff_load);
219
220             if ((ogr->vertices[pe_obj[p_index][i]].getVertexLoad() - diff_load) <
221                 ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad()){
222               is_possible = true;
223               possibility_x = i;
224               possibility_y = j;
225             }
226           }
227         }
228       }
229       if (!is_possible) {
230         max_pe_heap.push_back(p_index);
231         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
232             ProcLoadGreaterIndex(parr));
233         //for (int i = 0; i < pe_obj[p_index].size(); i++) {
234         //  for (int j = 0; j < pe_obj[ideal_transfer_pe].size(); j++) {
235         //    CkPrintf("\t :( %d (%lf) : %d(%lf) \n", pe_obj[p_index][i],
236         //        ogr->vertices[pe_obj[p_index][i]].getVertexLoad(),
237         //        pe_obj[ideal_transfer_pe][j],
238         //        ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad());
239         //    CkPrintf("\t\t %lf : %lf \n",
240         //        ogr->vertices[pe_obj[p_index][i]].getVertexLoad(),
241         //        ogr->vertices[pe_obj[ideal_transfer_pe][j]].getVertexLoad() + diff_load);
242         //  }
243         //}
244
245         break;
246       }
247      // CkPrintf(" Possibility of swap %d (%lf) : %d(%lf) \n",
248      //     pe_obj[p_index][possibility_x],
249      //     ogr->vertices[pe_obj[p_index][possibility_x]].getVertexLoad(),
250      //     pe_obj[ideal_transfer_pe][possibility_y],
251      //     ogr->vertices[pe_obj[ideal_transfer_pe][possibility_y]].getVertexLoad());
252
253
254       pe_obj[ideal_transfer_pe].push_back(pe_obj[p_index][possibility_x]);
255       parr->procs[p_index].setTotalLoad(parr->procs[p_index].getTotalLoad() -
256           ogr->vertices[pe_obj[p_index][possibility_x]].getVertexLoad());
257       parr->procs[ideal_transfer_pe].setTotalLoad(parr->procs[ideal_transfer_pe].getTotalLoad() +
258           ogr->vertices[pe_obj[p_index][possibility_x]].getVertexLoad());
259       pe_obj[p_index].erase(pe_obj[p_index].begin() + possibility_x);
260
261       pe_obj[p_index].push_back(pe_obj[ideal_transfer_pe][possibility_y]);
262       // Update load of underloaded and overloaded
263       parr->procs[p_index].setTotalLoad(parr->procs[p_index].getTotalLoad() +
264           ogr->vertices[pe_obj[ideal_transfer_pe][possibility_y]].getVertexLoad());
265       parr->procs[ideal_transfer_pe].setTotalLoad(parr->procs[ideal_transfer_pe].getTotalLoad() -
266           ogr->vertices[pe_obj[ideal_transfer_pe][possibility_y]].getVertexLoad());
267       pe_obj[ideal_transfer_pe].erase(pe_obj[ideal_transfer_pe].begin() + possibility_y);
268
269
270       // Update the max heap and min list
271       if (parr->procs[p_index].getTotalLoad() > (avg_load + threshold)) {
272         // Reinsert
273         max_pe_heap.push_back(p_index);
274         std::push_heap(max_pe_heap.begin(), max_pe_heap.end(),
275             ProcLoadGreaterIndex(parr));
276       } else if (parr->procs[p_index].getTotalLoad() < (avg_load - threshold)) {
277         // Insert into the list of underloaded procs
278         min_pe_heap.push_back(p_index);
279       }
280
281       if (parr->procs[ideal_transfer_pe].getTotalLoad() > (avg_load - threshold)) {
282         // Remove from list of underloaded procs
283         min_pe_heap.erase(min_pe_heap.begin() + second_phase_pe_considered_iter);
284       }
285
286     }
287   }
288
289   std::cout << "Overloaded Processor load"<< endl;
290   for (int p_index = 0; p_index < max_pe_heap.size(); p_index++) {
291     std::cout << max_pe_heap[p_index] << ": " << parr->procs[max_pe_heap[p_index]].getTotalLoad() << std::endl;
292
293        
294   }
295
296   std::cout << "Processor load"<< endl;
297   for (int i = 0; i < parr->procs.size(); i++) {
298     CkPrintf("%d : %lf\n", i, parr->procs[i].getTotalLoad());
299   }
300
301   /** ============================== CLEANUP ================================ */
302   ogr->convertDecisions(stats);         // Send decisions back to LDStats
303   delete[] pe_obj;
304   delete parr;
305   delete ogr;
306 }
307
308 #include "RefineSwapLB.def.h"
309
310 /*@}*/
311