doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / CommAwareRefineLB.C
1 /** \file CommAwareRefineLB.C
2  *
3  *  Written by Harshitha Menon
4  *  
5  *  This Loadbalancer strategy is Refine but taking into consideration the
6  *  Communication between the processors.
7  *  The following are the steps in the loadbalancing strategy
8  *
9  *  1. Construct a max heap of processor load whose load is greater than avg
10  *  2. Construct a sorted array of processor load whose load is less than avg
11  *  3. Pick the heaviest processor from the heap, randomly select a chare in
12  *  that processor and decide on which processor in the underloaded processor
13  *  list to transfer it to based on the one with which it is 
14  *  heavily communicating.
15  *  4. If the load of the processors after the transfer is less than the avg
16  *  load, then push it into the underloaded processor list, else push it into
17  *  the max heap.
18  */
19
20 /**
21  * \addtogroup CkLdb
22 */
23 /*@{*/
24
25 #include "CommAwareRefineLB.h"
26 #include "ckgraph.h"
27 #include <algorithm>
28 #include <map>
29
30 #include <time.h>
31
32 #define THRESHOLD 0.02
33 #define SWAP_MULTIPLIER 5 
34
35 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid);
36 inline void printMapping(std::vector<Vertex> &vertices);
37 inline void removeFromArray(int pe_id, std::vector<int> &array);
38 inline int popFromProcHeap(std::vector<int> & parr_above_avg, ProcArray *parr);
39 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, std::vector<int> *parr_objs, ObjGraph *ogr, ProcArray* parr);
40 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
41                            std::vector<int> &parr_above_avg, std::vector<int> &parr_below_avg,
42                            bool* proc_load_info, ProcArray *parr);
43 inline void getPossiblePes(std::vector<int>& possible_pes, int randomly_obj_id,
44     ObjGraph *ogr, ProcArray* parr);
45
46 double upper_threshold;
47 double lower_threshold;
48
49 CreateLBFunc_Def(CommAwareRefineLB, "always assign the heaviest obj onto lightest loaded processor.")
50
51 CommAwareRefineLB::CommAwareRefineLB(const CkLBOptions &opt): CentralLB(opt)
52 {
53   lbname = "CommAwareRefineLB";
54   if (CkMyPe()==0)
55     CkPrintf("[%d] CommAwareRefineLB created\n",CkMyPe());
56 }
57
58 CmiBool CommAwareRefineLB::QueryBalanceNow(int _step)
59 {
60   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
61   return CmiTrue;
62 }
63
64 class ProcLoadGreater {
65   public:
66     ProcLoadGreater(ProcArray *parr) : parr(parr) {
67     }
68     bool operator()(int lhs, int rhs) {
69       return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
70     }
71
72   private:
73     ProcArray *parr;
74 };
75
76 class ObjLoadGreater {
77   public:
78     bool operator()(Vertex v1, Vertex v2) {
79       return (v1.getVertexLoad() > v2.getVertexLoad());
80     }
81 };
82
83 class PeCommInfo {
84   public:
85     PeCommInfo() : num_msg(0), num_bytes(0) {
86     }
87
88     PeCommInfo(int pe_id) : pe_id(pe_id), num_msg(0) , num_bytes(0) {
89     }
90     int pe_id;
91     int num_msg;
92     int num_bytes;
93     // TODO: Should probably have a communication cost
94 };
95
96 // Consists of communication information of an object with is maintained
97 // as a list of PeCommInfo containing the processor id and the bytes transferred
98 class ObjPeCommInfo {
99   public:
100     ObjPeCommInfo() {
101     }
102
103     int obj_id;
104     std::vector<PeCommInfo> pcomm;
105 };
106
107 class ProcCommGreater {
108   public:
109     bool operator()(PeCommInfo p1, PeCommInfo p2) {
110       // TODO(Harshitha): Should probably consider total communication cost
111       return (p1.num_bytes > p2.num_bytes);
112     }
113 };
114
115 void PrintProcLoad(ProcArray *parr) {
116   int vert;
117   double pe_load;
118   for (vert = 0; vert < parr->procs.size(); vert++) {
119     pe_load = parr->procs[vert].getTotalLoad();
120     if (pe_load > upper_threshold) {
121       CkPrintf("Above load : %d load : %E overhead : %E\n",
122         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
123         parr->procs[vert].overhead());
124     } else if (pe_load < lower_threshold) {
125       CkPrintf("Below load : %d load : %E overhead : %E\n",
126         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
127         parr->procs[vert].overhead());
128     } else {
129       CkPrintf("Within avg load : %d load : %E overhead : %E\n",
130         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
131         parr->procs[vert].overhead());
132     }
133   }
134 }
135
136 void PrintProcObj(ProcArray *parr, std::vector<int>* parr_objs) {
137   int i, j;
138   CkPrintf("---------------------\n");
139   for (i = 0; i < parr->procs.size(); i++) {
140     CkPrintf("[%d] contains ", i);
141     for (j = 0; j < parr_objs[i].size(); j++) {
142       CkPrintf(" %d, ", parr_objs[i][j]);
143     }
144     CkPrintf("\n");
145   }
146   CkPrintf("---------------------\n");
147 }
148
149
150 void CommAwareRefineLB::work(LDStats* stats) {
151   /** ========================== INITIALIZATION ============================= */
152   ProcArray *parr = new ProcArray(stats);       // Processor Array
153   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
154   double avgload = parr->getAverageLoad();      // Average load of processors
155
156   // Sets to false if it is overloaded, else to true
157   bool* proc_load_info = new bool[parr->procs.size()];
158   memset(proc_load_info, false, parr->procs.size());
159
160   // Create an array of vectors for each processor mapping to the objects in
161   // that processor
162   std::vector<int>* parr_objs = new std::vector<int>[parr->procs.size()];
163
164   upper_threshold = avgload + (avgload * THRESHOLD);
165   //lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
166   lower_threshold = avgload;
167
168   int less_loaded_counter = 0;
169
170   srand(time(NULL));
171   /** ============================= STRATEGY ================================ */
172
173   if (_lb_args.debug()>1) 
174     CkPrintf("[%d] In CommAwareRefineLB strategy\n",CkMyPe());
175
176   CkPrintf("Average load %E\n", avgload);
177
178   int vert, i, j;
179   int curr_pe;
180
181   // Iterate over all the chares and construct the peid, vector<chareid> array
182   for(vert = 0; vert < ogr->vertices.size(); vert++) {
183     curr_pe = ogr->vertices[vert].getCurrentPe();
184     parr_objs[curr_pe].push_back(vert);
185     ogr->vertices[vert].setNewPe(curr_pe);
186   }
187
188   std::vector<int> parr_above_avg;
189   std::vector<int> parr_below_avg;
190
191   double pe_load;  
192
193   // Insert into parr_above_avg if the processor fits under the criteria of
194   // overloaded processor.
195   // Insert the processor id into parr_below_avg if the processor is underloaded 
196   for (vert = 0; vert < parr->procs.size(); vert++) {
197     pe_load = parr->procs[vert].getTotalLoad();
198     if (pe_load > upper_threshold) {
199       // Pushing ProcInfo into this list
200       parr_above_avg.push_back(vert);
201     } else if (pe_load < lower_threshold) {
202       parr_below_avg.push_back(parr->procs[vert].getProcId());
203       proc_load_info[parr->procs[vert].getProcId()] = true;
204       less_loaded_counter++;
205     }
206   }
207
208   std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
209       ProcLoadGreater(parr));
210
211   int random;
212   int randomly_obj_id;
213   bool obj_allocated;
214   int num_tries;
215   // Allow as many swaps as there are chares
216   int total_swaps = ogr->vertices.size() * SWAP_MULTIPLIER;
217   int possible_pe;
218   double obj_load;
219
220   // Keep on loadbalancing until the number of above avg processors is 0
221   while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
222     // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
223     //    parr_below_avg.size(), total_swaps);
224     obj_allocated = false;
225     num_tries = 0;
226
227     // Pop the heaviest processor
228     int p_index = popFromProcHeap(parr_above_avg, parr);
229     ProcInfo& p = parr->procs[p_index];
230
231     while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
232
233       // It might so happen that due to overhead load, it might not have any
234       // more objects in its list
235       if (parr_objs[p.getProcId()].size() == 0) {
236         // CkPrintf("No obj left to be allocated\n");
237         obj_allocated = true;
238         break;
239       }
240
241       int randd = rand();
242       random = randd % parr_objs[p.getProcId()].size();
243       randomly_obj_id = parr_objs[p.getProcId()][random];
244       obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
245
246       // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
247       //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
248       std::vector<int> possible_pes;
249       getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
250       for (i = 0; i < possible_pes.size(); i++) {
251
252         // If the heaviest communicating processor is there in the list, then
253         // assign it to that.
254         possible_pe = possible_pes[i];
255
256         if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
257          // CkPrintf("**  Transfered %d(Load %lf) from %d:%d(Load %lf) to %d:%d(Load %lf)\n",
258          //     randomly_obj_id, obj_load, CkNodeOf(p.getProcId()), p.getProcId(), p.getTotalLoad(),
259          //     CkNodeOf(possible_pe), possible_pe,
260          //     parr->procs[possible_pe].getTotalLoad());
261
262           handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
263           obj_allocated = true;
264           total_swaps--;
265           updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
266               parr_above_avg, parr_below_avg, proc_load_info, parr);
267
268           break;
269         }
270       }
271
272       // Since there is no processor in the least loaded list with which this
273       // chare communicates, pick a random least loaded processor.
274       if (!obj_allocated) {
275         //CkPrintf(":( Could not transfer to the nearest communicating ones\n");
276         for (int x = 0; x < parr_below_avg.size(); x++) {
277           int random_pe = parr_below_avg[x];
278           if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
279             obj_allocated = true;
280             total_swaps--;
281             handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
282             updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
283                 parr_above_avg, parr_below_avg, proc_load_info, parr);
284             break;
285           }
286           num_tries++;
287         }
288       }
289     }
290
291     if (!obj_allocated) {
292       //CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
293       // parr_above_avg.push_back(p_index);
294       // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
295       //     ProcLoadGreater(parr));
296     }
297   }
298
299   //CkPrintf("CommAwareRefine> After lb max load: %lf avg load: %lf\n", max_load, avg_load/parr->procs.size());
300
301   /** ============================== CLEANUP ================================ */
302   ogr->convertDecisions(stats);         // Send decisions back to LDStats
303   delete parr;
304   delete ogr;
305   delete proc_load_info;
306   delete[] parr_objs;
307 }
308
309 inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid) {
310   for (int i = 0; i < parr_objs.size(); i++) {
311     if (parr_objs[i] == remove_objid) {
312       parr_objs.erase(parr_objs.begin() + i);
313       return;
314     }
315   }
316 }
317
318 inline void printMapping(std::vector<Vertex> &vertices) {
319   for (int i = 0; i < vertices.size(); i++) {
320     CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
321         vertices[i].getNewPe());
322   }
323 }
324
325 inline void removeFromArray(int pe_id, std::vector<int> &array) {
326   for (int i = 0; i < array.size(); i++) {
327     if (array[i] == pe_id) {
328       array.erase(array.begin() + i);
329     }
330   }
331 }
332
333 inline int popFromProcHeap(std::vector<int> & parr_above_avg, ProcArray *parr) {
334   int p_index = parr_above_avg.front();
335   std::pop_heap(parr_above_avg.begin(), parr_above_avg.end(),
336       ProcLoadGreater(parr));
337   parr_above_avg.pop_back();
338   return p_index;
339 }
340
341     
342 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, std::vector<int>* parr_objs, ObjGraph *ogr, ProcArray* parr) {
343   ogr->vertices[randomly_obj_id].setNewPe(possible_pe);
344   parr_objs[possible_pe].push_back(randomly_obj_id);
345   ProcInfo &possible_pe_procinfo = parr->procs[possible_pe];
346
347   p.totalLoad() -= ogr->vertices[randomly_obj_id].getVertexLoad();
348   possible_pe_procinfo.totalLoad() += ogr->vertices[randomly_obj_id].getVertexLoad();
349   eraseObjFromParrObjs(parr_objs[p.getProcId()], randomly_obj_id);
350   //CkPrintf("After transfered %d from %d : Load %E to %d : Load %E\n", randomly_obj_id, p.getProcId(), p.getTotalLoad(),
351   //    possible_pe, possible_pe_procinfo.getTotalLoad());
352 }
353
354 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
355                            std::vector<int>& parr_above_avg, std::vector<int>& parr_below_avg,
356                            bool* proc_load_info, ProcArray *parr) {
357
358   ProcInfo& p = parr->procs[p_index];
359   ProcInfo& possible_pe_procinfo = parr->procs[possible_pe];
360
361   // If the updated load is still greater than the average by the
362   // threshold value, then push it back to the max heap
363   if (p.getTotalLoad() > upper_threshold) {
364     parr_above_avg.push_back(p_index);
365     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
366         ProcLoadGreater(parr));
367     //CkPrintf("\t Pushing pe : %d to max heap\n", p.getProcId());
368   } else if (p.getTotalLoad() < lower_threshold) {
369     parr_below_avg.push_back(p_index);
370     proc_load_info[p_index] = true;
371     //CkPrintf("\t Adding pe : %d to less loaded\n", p.getProcId());
372   }
373
374   // If the newly assigned processor's load is greater than the average
375   // by the threshold value, then push it into the max heap.
376   if (possible_pe_procinfo.getTotalLoad() > upper_threshold) {
377     // TODO: It should be the index in procarray :(
378     parr_above_avg.push_back(possible_pe);
379     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
380         ProcLoadGreater(parr));
381     removeFromArray(possible_pe, parr_below_avg);
382     proc_load_info[possible_pe] = false;
383     //CkPrintf("\t Pusing pe : %d to max heap\n", possible_pe);
384   } else if (possible_pe_procinfo.getTotalLoad() < lower_threshold) {
385   } else {
386     removeFromArray(possible_pe, parr_below_avg);
387     proc_load_info[possible_pe] = false;
388     //CkPrintf("\t Removing from lower list pe : %d\n", possible_pe);
389   }
390
391 }
392
393 inline void getPossiblePes(std::vector<int>& possible_pes, int vert,
394     ObjGraph *ogr, ProcArray* parr) {
395   std::map<int, int> tmp_map_pid_index;
396   int counter = 0;
397   int index;
398   int i, j, nbrid;
399   ObjPeCommInfo objpcomm;
400  // CkPrintf("%d sends msgs to %d and recv msgs from %d\n", vert,
401  //   ogr->vertices[vert].sendToList.size(),
402  //   ogr->vertices[vert].recvFromList.size());
403   
404   for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
405     nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
406     j = ogr->vertices[nbrid].getNewPe(); // Fix me!! New PE
407     // TODO: Should it index with vertexId?
408     if (tmp_map_pid_index.count(j) == 0) {
409       tmp_map_pid_index[j] = counter;
410       PeCommInfo pecomminf(j);
411       // TODO: Shouldn't it use vertexId instead of vert?
412       objpcomm.pcomm.push_back(pecomminf);
413       counter++;
414     }
415     index = tmp_map_pid_index[j];
416
417     objpcomm.pcomm[index].num_msg +=
418       ogr->vertices[vert].sendToList[i].getNumMsgs();
419     objpcomm.pcomm[index].num_bytes +=
420       ogr->vertices[vert].sendToList[i].getNumBytes();
421   }
422
423   for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
424     nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
425     j = ogr->vertices[nbrid].getNewPe();
426
427     if (tmp_map_pid_index.count(j) == 0) {
428       tmp_map_pid_index[j] = counter;
429       PeCommInfo pecomminf(j);
430       // TODO: Shouldn't it use vertexId instead of vert?
431       objpcomm.pcomm.push_back(pecomminf);
432       counter++;
433     }
434     index = tmp_map_pid_index[j];
435
436     objpcomm.pcomm[index].num_msg +=
437       ogr->vertices[vert].sendToList[i].getNumMsgs();
438     objpcomm.pcomm[index].num_bytes +=
439       ogr->vertices[vert].sendToList[i].getNumBytes();
440   }
441
442   // Sort the pe communication vector for this chare
443   std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
444       ProcCommGreater());
445
446   int pe_id;
447   int node_id;
448   int node_size;
449   int node_first;
450   //CkPrintf("%d talks to %d pes and possible pes are :\n", vert,
451   //    objpcomm.pcomm.size());
452   for (i = 0; i < objpcomm.pcomm.size(); i++) {
453     pe_id = objpcomm.pcomm[i].pe_id;
454     node_id = CkNodeOf(pe_id);
455     node_size = CkNodeSize(node_id);
456     node_first = CkNodeFirst(node_id);
457    // CkPrintf("smp details pe_id %d, node_id %d, node_size %d, node_first %d\n",
458    //   pe_id, node_id, node_size, node_first);
459     for (j = 0; j < node_size; j++) {
460       possible_pes.push_back(node_first + j);
461       //CkPrintf("\t %d:%d (comm: %d)\n",node_id, node_first+j, objpcomm.pcomm[i].num_bytes); 
462     }
463   }
464 }
465
466
467 #include "CommAwareRefineLB.def.h"
468
469 /*@}*/
470