Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-ldb / Refiner.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 /** This code is derived from RefineLB.C, and RefineLB.C should
7  be rewritten to use this, so there is no code duplication
8 */
9
10 #include "Refiner.h"
11
12 int* Refiner::AllocProcs(int count, BaseLB::LDStats* stats)
13 {
14   return new int[stats->n_objs];
15 }
16
17 void Refiner::FreeProcs(int* bufs)
18 {
19   delete [] bufs;
20 }
21
22 void Refiner::create(int count, BaseLB::LDStats* stats, int* procs)
23 {
24   int i;
25
26   // now numComputes is all the computes: both migratable and nonmigratable.
27   // afterwards, nonmigratable computes will be taken off
28
29   numAvail = 0;
30   for(i=0; i < P; i++) {
31     processors[i].Id = i;
32     processors[i].backgroundLoad = stats->procs[i].bg_walltime;
33     processors[i].load = processors[i].backgroundLoad;
34     processors[i].computeLoad = 0;
35     processors[i].computeSet = new Set();
36     processors[i].pe_speed = stats->procs[i].pe_speed;
37 //    processors[i].utilization = stats->procs[i].utilization;
38     processors[i].available = stats->procs[i].available;
39     if (processors[i].available == CmiTrue) numAvail++;
40   }
41
42   for (i=0; i<stats->n_objs; i++)
43   {
44         LDObjData &odata = stats->objData[i];
45         computes[i].Id = i;
46         computes[i].id = odata.objID();
47 //        computes[i].handle = odata.handle;
48         computes[i].load = odata.wallTime;     // was cpuTime
49         computes[i].processor = -1;
50         computes[i].oldProcessor = procs[i];
51         computes[i].migratable = odata.migratable;
52         if (computes[i].oldProcessor >= P)  {
53           if (stats->complete_flag)
54             CmiAbort("LB Panic: the old processor in RefineLB cannot be found, is this in a simulation mode?");
55           else {
56               // an object from outside domain, randomize its location
57             computes[i].oldProcessor = CrnRand()%P;
58           }
59         }
60   }
61 //  for (i=0; i < numComputes; i++)
62 //      processors[computes[i].oldProcessor].computeLoad += computes[i].load;
63 }
64
65 void Refiner::assign(computeInfo *c, int processor)
66 {
67   assign(c, &(processors[processor]));
68 }
69
70 void Refiner::assign(computeInfo *c, processorInfo *p)
71 {
72    c->processor = p->Id;
73    p->computeSet->insert((InfoRecord *) c);
74    p->computeLoad += c->load;
75    p->load = p->computeLoad + p->backgroundLoad;
76 }
77
78 void  Refiner::deAssign(computeInfo *c, processorInfo *p)
79 {
80    c->processor = -1;
81    p->computeSet->remove(c);
82    p->computeLoad -= c->load;
83    p->load = p->computeLoad + p->backgroundLoad;
84 }
85
86 void Refiner::computeAverage()
87 {
88   int i;
89   double total = 0.;
90   for (i=0; i<numComputes; i++) total += computes[i].load;
91
92   for (i=0; i<P; i++)
93     if (processors[i].available == CmiTrue) 
94         total += processors[i].backgroundLoad;
95
96   averageLoad = total/numAvail;
97 }
98
99 double Refiner::computeMax()
100 {
101   int i;
102   double max = -1.0;
103   for (i=0; i<P; i++) {
104     if (processors[i].available == CmiTrue && processors[i].load > max)
105       max = processors[i].load;
106   }
107   return max;
108 }
109
110 int Refiner::isHeavy(processorInfo *p)
111 {
112   if (p->available == CmiTrue) 
113      return p->load > overLoad*averageLoad;
114   else {
115      return p->computeSet->numElements() != 0;
116   }
117 }
118
119 int Refiner::isLight(processorInfo *p)
120 {
121   if (p->available == CmiTrue) 
122      return p->load < averageLoad;
123   else 
124      return 0;
125 }
126
127 // move the compute jobs out from unavailable PE
128 void Refiner::removeComputes()
129 {
130   int first;
131   Iterator nextCompute;
132
133   if (numAvail < P) {
134     if (numAvail == 0) CmiAbort("No processor available!");
135     for (first=0; first<P; first++)
136       if (processors[first].available == CmiTrue) break;
137     for (int i=0; i<P; i++) {
138       if (processors[i].available == CmiFalse) {
139           computeInfo *c = (computeInfo *)
140                    processors[i].computeSet->iterator((Iterator *)&nextCompute);
141           while (c) {
142             deAssign(c, &processors[i]);
143             assign(c, &processors[first]);
144             nextCompute.id++;
145             c = (computeInfo *)
146                    processors[i].computeSet->next((Iterator *)&nextCompute);
147           }
148       }
149     }
150   }
151 }
152
153 int Refiner::refine()
154 {
155   int i;
156   int finish = 1;
157   maxHeap *heavyProcessors = new maxHeap(P);
158
159   Set *lightProcessors = new Set();
160   for (i=0; i<P; i++) {
161     if (isHeavy(&processors[i])) {  
162       //      CkPrintf("Processor %d is HEAVY: load:%f averageLoad:%f!\n",
163       //               i, processors[i].load, averageLoad);
164       heavyProcessors->insert((InfoRecord *) &(processors[i]));
165     } else if (isLight(&processors[i])) {
166       //      CkPrintf("Processor %d is LIGHT: load:%f averageLoad:%f!\n",
167       //               i, processors[i].load, averageLoad);
168       lightProcessors->insert((InfoRecord *) &(processors[i]));
169     }
170   }
171   int done = 0;
172
173   while (!done) {
174     double bestSize;
175     computeInfo *bestCompute;
176     processorInfo *bestP;
177     
178     processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
179     if (!donor) break;
180
181     //find the best pair (c,receiver)
182     Iterator nextProcessor;
183     processorInfo *p = (processorInfo *) 
184       lightProcessors->iterator((Iterator *) &nextProcessor);
185     bestSize = 0;
186     bestP = 0;
187     bestCompute = 0;
188
189     while (p) {
190       Iterator nextCompute;
191       nextCompute.id = 0;
192       computeInfo *c = (computeInfo *) 
193         donor->computeSet->iterator((Iterator *)&nextCompute);
194       // iout << iINFO << "Considering Procsessor : " 
195       //      << p->Id << "\n" << endi;
196       while (c) {
197         if (!c->migratable) {
198           nextCompute.id++;
199           c = (computeInfo *) 
200             donor->computeSet->next((Iterator *)&nextCompute);
201           continue;
202         }
203         //CkPrintf("c->load: %f p->load:%f overLoad*averageLoad:%f \n",
204         //c->load, p->load, overLoad*averageLoad);
205         if ( c->load + p->load < overLoad*averageLoad) {
206           // iout << iINFO << "Considering Compute : " 
207           //      << c->Id << " with load " 
208           //      << c->load << "\n" << endi;
209           if(c->load > bestSize) {
210             bestSize = c->load;
211             bestCompute = c;
212             bestP = p;
213           }
214         }
215         nextCompute.id++;
216         c = (computeInfo *) 
217           donor->computeSet->next((Iterator *)&nextCompute);
218       }
219       p = (processorInfo *) 
220         lightProcessors->next((Iterator *) &nextProcessor);
221     }
222
223     if (bestCompute) {
224       //      CkPrintf("Assign: [%d] with load: %f from %d to %d \n",
225       //               bestCompute->id.id[0], bestCompute->load, 
226       //               donor->Id, bestP->Id);
227       deAssign(bestCompute, donor);      
228       assign(bestCompute, bestP);
229     } else {
230       finish = 0;
231       break;
232     }
233
234     if (bestP->load > averageLoad)
235       lightProcessors->remove(bestP);
236     
237     if (isHeavy(donor))
238       heavyProcessors->insert((InfoRecord *) donor);
239     else if (isLight(donor))
240       lightProcessors->insert((InfoRecord *) donor);
241   }  
242
243   delete heavyProcessors;
244   delete lightProcessors;
245
246   return finish;
247 }
248
249 int Refiner::multirefine()
250 {
251   computeAverage();
252   double avg = averageLoad;
253   double max = computeMax();
254
255   const double overloadStep = 0.01;
256   const double overloadStart = 1.001;
257   double dCurOverload = max / avg;
258                                                                                 
259   int minOverload = 0;
260   int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
261   double dMinOverload = minOverload * overloadStep + overloadStart;
262   double dMaxOverload = maxOverload * overloadStep + overloadStart;
263   int curOverload;
264   int refineDone = 0;
265   if (_lb_args.debug()>=1)
266     CmiPrintf("dMinOverload: %f dMaxOverload: %f\n", dMinOverload, dMaxOverload);
267                                                                                 
268   overLoad = dMinOverload;
269   if (refine())
270     refineDone = 1;
271   else {
272     overLoad = dMaxOverload;
273     if (!refine()) {
274       CmiPrintf("ERROR: Could not refine at max overload\n");
275       refineDone = 1;
276     }
277   }
278                                                                                 
279   // Scan up, until we find a refine that works
280   while (!refineDone) {
281     if (maxOverload - minOverload <= 1)
282       refineDone = 1;
283     else {
284       curOverload = (maxOverload + minOverload ) / 2;
285                                                                                 
286       overLoad = curOverload * overloadStep + overloadStart;
287       if (_lb_args.debug()>=1)
288       CmiPrintf("Testing curOverload %d = %f [min,max]= %d, %d\n", curOverload, overLoad, minOverload, maxOverload);
289       if (refine())
290         maxOverload = curOverload;
291       else
292         minOverload = curOverload;
293     }
294   }
295   return 1;
296 }
297
298 void Refiner::Refine(int count, BaseLB::LDStats* stats, 
299                      int* cur_p, int* new_p)
300 {
301   //  CkPrintf("[%d] Refiner strategy\n",CkMyPe());
302
303   P = count;
304   numComputes = stats->n_objs;
305   computes = new computeInfo[numComputes];
306   processors = new processorInfo[count];
307
308   create(count, stats, cur_p);
309
310   int i;
311   for (i=0; i<numComputes; i++)
312     assign((computeInfo *) &(computes[i]),
313            (processorInfo *) &(processors[computes[i].oldProcessor]));
314
315   removeComputes();
316
317   computeAverage();
318
319   if (_lb_args.debug()>2)  {
320     CkPrintf("Old PE load (bg load): ");
321     for (i=0; i<count; i++) CkPrintf("%d:%f(%f) ", i, processors[i].load, processors[i].backgroundLoad);
322     CkPrintf("\n");
323   }
324
325   multirefine();
326
327   int nmoves = 0;
328   for (int pe=0; pe < P; pe++) {
329     Iterator nextCompute;
330     nextCompute.id = 0;
331     computeInfo *c = (computeInfo *)
332       processors[pe].computeSet->iterator((Iterator *)&nextCompute);
333     while(c) {
334       new_p[c->Id] = c->processor;
335       if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
336 //      if (c->oldProcessor != c->processor)
337 //      CkPrintf("Refiner::Refine: from %d to %d\n", c->oldProcessor, c->processor);
338       nextCompute.id++;
339       c = (computeInfo *) processors[pe].computeSet->
340                      next((Iterator *)&nextCompute);
341     }
342   }
343   if (_lb_args.debug()>2)  {
344     CkPrintf("New PE load: ");
345     for (i=0; i<count; i++) CkPrintf("%f ", processors[i].load);
346     CkPrintf("\n");
347   }
348   if (_lb_args.debug()>1) 
349     CkPrintf("Refiner: moving %d obejcts. \n", nmoves);
350   delete [] computes;
351   delete [] processors;
352 }
353
354
355 /*@}*/