fc925624b4fe2b3fce9eeb90bf25af51a0ff2f5f
[charm.git] / src / ck-ldb / RefinerApprox.C
1 #include "elements.h"
2 #include "ckheap.h"
3 #include "RefinerApprox.h"
4
// Debug switch consulted by the `if(_lb_debug)` diagnostics throughout this
// file; those CkPrintf calls stay silent while the value is 0.
int _lb_debug = 0;
6
7 void RefinerApprox::create(int count, CentralLB::LDStats* stats, int* procs)
8 {
9   int i;
10   // now numComputes is all the computes: both migratable and nonmigratable.
11   // afterwards, nonmigratable computes will be taken off
12
13   numAvail = 0;
14   for(i=0; i < P; i++) 
15   {
16     processors[i].Id = i;
17     processors[i].backgroundLoad = stats->procs[i].bg_walltime;
18 //    processors[i].backgroundLoad = 0;
19     processors[i].computeLoad = 0;
20     processors[i].load = processors[i].backgroundLoad;
21     processors[i].computeSet = new Set();
22     processors[i].pe_speed = stats->procs[i].pe_speed;
23 //    processors[i].utilization = stats->procs[i].utilization;
24     processors[i].available = stats->procs[i].available;
25     if (processors[i].available == CmiTrue) numAvail++;
26   }
27
28   int index=0;
29   for (i=0; i<stats->n_objs; i++)
30   {
31   
32       LDObjData &odata = stats->objData[i];
33       if (odata.migratable == CmiTrue)
34       {
35         computes[index].id = odata.objID();
36         computes[index].Id = i;
37  //       computes[index].handle = odata.handle;
38         computes[index].load = odata.wallTime;
39         computes[index].processor = -1;
40         computes[index].oldProcessor = procs[i];
41         computes[index].migratable = odata.migratable;
42         if (computes[index].oldProcessor >= P)  {
43           if (stats->complete_flag) {
44             CmiPrintf("LB Panic: the old processor %d of obj %d in RefineKLB cannot be found, is this in a simulation mode?\n", computes[index].oldProcessor, i);
45             CmiAbort("Abort!");
46           }
47           else {
48               // an object from outside domain, randomize its location
49             computes[i].oldProcessor = CrnRand()%P;
50           }
51         }
52         index ++;
53       }
54       else
55       {
56         // put this compute into background load
57         processors[procs[i]].backgroundLoad += odata.wallTime;
58         processors[procs[i]].load+= odata.wallTime;
59         numComputes--;
60       }
61   }
62   //for (i=0; i < numComputes; i++)
63     //  processors[computes[i].oldProcessor].computeLoad += computes[i].load;
64 }
65
66 void RefinerApprox::reinitAssignment()
67 {
68     int i;
69     // now numComputes is all the computes: both migratable and nonmigratable.
70     // afterwards, nonmigratable computes will be taken off
71
72     for(i=0;i<P;i++)
73     {
74         Iterator nextCompute;
75         nextCompute.id=0;
76         computeInfo *c = (computeInfo *)
77             processors[i].computeSet->iterator((Iterator *)&nextCompute);
78         while(c)
79         {
80             if(c->oldProcessor!=i)
81             {
82                 deAssign(c,&processors[i]);
83                 if(c->oldProcessor!=-1)
84                 {
85                         assign(c,c->oldProcessor);
86                 }
87                 else
88                 {
89                         assign(c,0);
90                 }
91             }
92             nextCompute.id++;
93             c = (computeInfo *)
94                 processors[i].computeSet->next((Iterator *)&nextCompute);
95         }
96     }
97 }
98
99 void RefinerApprox::multirefine(int num_moves)
100 {
101   computeAverage();
102   double avg = averageLoad;
103   double max = computeMax();
104
105   int numMoves=0;
106   double lbound=avg;
107   double ubound=max;
108
109   double EPSILON=0.01;
110
111   numMoves=refine(avg);
112   
113   if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
114   if(numMoves<=num_moves)
115       return ;
116       
117   if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
118   while(ubound-lbound> EPSILON*avg)
119   {
120       reinitAssignment();
121       double testVal=(ubound+lbound)/2;
122       numMoves=refine(testVal);
123       if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
124       if(numMoves>num_moves)
125       {
126           lbound=testVal;
127       }
128       else
129       {
130           ubound=testVal;
131       }
132       if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
133   }
134   if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
135   return;
136 }
137
138 Set * RefinerApprox::removeBiggestSmallComputes(int num,processorInfo * p,double opt)
139 {
140     int numPComputes=p->computeSet->numElements();
141     double totalSmallLoad=0;
142     maxHeap *h=new maxHeap(numPComputes);
143     Set * removedComputes=new Set();
144     int numSmallPComputes=0;
145         
146     Iterator nextCompute;
147     nextCompute.id=0;
148     computeInfo *c = (computeInfo *)
149             p->computeSet->iterator((Iterator *)&nextCompute);
150
151     for(int i=0;i<numPComputes;i++)
152     {
153         if(c->load < opt/2)
154         {
155             h->insert((InfoRecord *)c);
156             numSmallPComputes++;
157         }
158         nextCompute.id++;
159         c = (computeInfo *)
160             p->computeSet->next((Iterator *)&nextCompute);
161     }
162
163     if(p->backgroundLoad <opt/2)
164     {
165         totalSmallLoad+=p->backgroundLoad;
166     }
167     if(numSmallPComputes<num)
168     {
169         if(_lb_debug)CkPrintf("Error[%d]: Cant remove %d small computes from a total of %d small computes\n",p->Id,num,numSmallPComputes);
170     }
171
172     //while(totalSmallLoad > opt/2)
173     int j;
174     for(j=0;j<num;j++)
175     {
176         computeInfo *rec=(computeInfo *)(h->deleteMax());
177         removedComputes->insert((InfoRecord *)rec);
178         totalSmallLoad-=rec->load;
179     }
180
181     delete h;
182     return removedComputes;
183 }
184
185 Set * RefinerApprox::removeBigComputes(int num,processorInfo * p,double opt)
186 {
187     int numPComputes=p->computeSet->numElements();
188     if(num>numPComputes)
189     {
190         if(_lb_debug)CkPrintf("Error [%d]: Cant remove %d computes out of a total of %d\n",p->Id,num,numPComputes);
191         return new Set();
192     }
193     double totalLoad=p->load;
194     maxHeap *h=new maxHeap(numPComputes);
195     Set * removedComputes=new Set();
196         
197     Iterator nextCompute;
198     nextCompute.id=0;
199     computeInfo *c = (computeInfo *)
200             p->computeSet->iterator((Iterator *)&nextCompute);
201
202     for(int i=0;i<numPComputes;i++)
203     {
204         h->insert((InfoRecord *)c);
205         nextCompute.id++;
206         c = (computeInfo *)
207             p->computeSet->next((Iterator *)&nextCompute);
208     }
209
210     int j;
211     for(j=0;j<num;j++)
212     {
213         computeInfo *rec=(computeInfo *)(h->deleteMax());
214         removedComputes->insert((InfoRecord *)rec);
215         totalLoad-=rec->load;
216     }
217
218     delete h;
219     return removedComputes;
220 }
221
222 double RefinerApprox::getLargestCompute()
223 {
224     double largestC=0;
225     for(int i=0;i<P;i++)
226     {
227         if(processors[i].backgroundLoad > largestC)
228             largestC=processors[i].backgroundLoad;
229
230         Iterator nextCompute;
231         nextCompute.id=0;
232         computeInfo *c = (computeInfo *)
233             processors[i].computeSet->iterator((Iterator *)&nextCompute);
234         while(c)
235         {
236             if(c->load > largestC)
237             {
238                 largestC=c->load;
239             }
240             nextCompute.id++;
241             c = (computeInfo *)
242                 processors[i].computeSet->next((Iterator *)&nextCompute);
243         }
244     }
245     return largestC;
246 }
247
248 int RefinerApprox::getNumLargeComputes(double opt)
249 {
250     int numLarge=0;
251     for(int i=0;i<P;i++)
252     {
253         if(processors[i].backgroundLoad>=(opt/2))
254             numLarge++;
255         Iterator nextCompute;
256         nextCompute.id=0;
257         computeInfo *c = (computeInfo *)
258             processors[i].computeSet->iterator((Iterator *)&nextCompute);
259         int numC=0;
260 //      CkPrintf("Processor %d \n",i);
261         while(c)
262         {
263             numC++;
264 //          CkPrintf("%d  ",numC);
265             if(c->load>(opt/2))
266                 numLarge++;
267       
268             nextCompute.id++;
269             c = (computeInfo *)
270                 processors[i].computeSet->next((Iterator *)&nextCompute);
271         }
272     }
273     return numLarge;
274     
275 }
276
277 int RefinerApprox::computeA(processorInfo *p,double opt)
278 {
279     int numPComputes=p->computeSet->numElements();
280     double totalSmallLoad=0;
281     maxHeap *h=new maxHeap(numPComputes);
282         
283     Iterator nextCompute;
284     nextCompute.id=0;
285     computeInfo *c = (computeInfo *)
286             p->computeSet->iterator((Iterator *)&nextCompute);
287
288     for(int i=0;i<numPComputes;i++)
289     {
290         if(c->load < opt/2)
291         {
292             totalSmallLoad+=c->load;
293             h->insert((InfoRecord *)c);
294         }
295         nextCompute.id++;
296         c = (computeInfo *)
297             p->computeSet->next((Iterator *)&nextCompute);
298     }
299
300     if(p->backgroundLoad <opt/2)
301     {
302         totalSmallLoad+=p->backgroundLoad;
303     }
304
305     int avalue=0;
306     while(totalSmallLoad > opt/2)
307     {
308         avalue++;
309         InfoRecord *rec=h->deleteMax();
310         totalSmallLoad-=rec->load;
311     }
312     delete h;
313     return avalue;
314 }
315
316 int RefinerApprox::computeB(processorInfo *p,double opt)
317 {
318     int numPComputes=p->computeSet->numElements();
319     double totalLoad=p->load;
320     if(p->backgroundLoad > opt)
321     {
322         if(_lb_debug)
323             CkPrintf("Error in computeB: Background load greater than OPT!\n");
324         return 0;
325     }
326     maxHeap *h=new maxHeap(numPComputes);
327         
328     Iterator nextCompute;
329     nextCompute.id=0;
330     computeInfo *c = (computeInfo *)
331             p->computeSet->iterator((Iterator *)&nextCompute);
332
333     for(int i=0;i<numPComputes;i++)
334     {
335         h->insert((InfoRecord*)c);
336         nextCompute.id++;
337         c = (computeInfo *)
338             p->computeSet->next((Iterator *)&nextCompute);
339     }
340
341     int bvalue=0;
342     while(totalLoad > opt)
343     {
344         bvalue++;
345         InfoRecord *rec=h->deleteMax();
346         totalLoad-=rec->load;
347     }
348
349     delete h;
350     return bvalue;
351 }
352
353 int RefinerApprox::refine(double opt)
354 {
355     int i;
356     if(_lb_debug)CkPrintf("RefinerApprox::refine called with %lf\n",opt);
357     if(opt<averageLoad)
358         return INFTY;
359
360     int numLargeComputes=getNumLargeComputes(opt);
361     if(_lb_debug) CkPrintf("Num of large Computes %d for opt = %10f\n",numLargeComputes,opt);
362     if(numLargeComputes>P)
363         return INFTY;
364     if(getLargestCompute()>opt)
365         return INFTY;
366     //CkPrintf("Not returning INFTY\n");
367
368     //a[i]= min. number of small jobs to be removed so that total size
369     //of remaining small jobs is at most opt/2
370     int *a=new int[P];
371     //b[i]= min. number of jobs to be removed so that total size
372     //of remaining jobs (including large jobs) is at most opt
373     int *b=new int[P];
374     bool *largeFree=new bool[P];
375     Set *largeComputes=new Set();
376     Set *smallComputes=new Set();
377     int nmoves=0;
378
379     //Step 1: Remove all but one large computes on each node
380     for(i=0;i<P;i++)
381     {
382         computeInfo *smallestLargeCompute=NULL;
383         largeFree[i]=true;
384         Iterator nextCompute;
385         nextCompute.id=0;
386         computeInfo *c = (computeInfo *)
387             processors[i].computeSet->iterator((Iterator *)&nextCompute);
388         while(c)
389         {
390             if(c->load>opt/2)
391             {
392                 largeFree[i]=false;
393                 largeComputes->insert((InfoRecord*)c);
394                 deAssign(c,&(processors[i]));
395                 if (smallestLargeCompute==NULL)
396                 {
397                     smallestLargeCompute=c;
398                 }
399                 else if(smallestLargeCompute->load > c->load)
400                 {
401                     smallestLargeCompute=c;
402                 }
403             }
404             nextCompute.id++;
405             c = (computeInfo *)
406                 processors[i].computeSet->next((Iterator *)&nextCompute);
407         }
408         //Check if processor's fixed load is itself large
409         if(processors[i].backgroundLoad>opt/2)
410         {
411             largeFree[i]=false;
412         }
413         else 
414         {
415             if(smallestLargeCompute)
416             {
417                 assign(smallestLargeCompute,i);
418                 largeComputes->remove((InfoRecord*)smallestLargeCompute);
419             }
420         }
421         if(!largeFree[i]) 
422         {
423             if(_lb_debug)
424                 CkPrintf("Processor %d not LargeFree !\n",i);
425         }
426
427         //Step 2: Calculate a[i] and b[i] for each proc.
428         a[i]=computeA(&processors[i],opt);
429         b[i]=computeB(&processors[i],opt);
430     }
431
432     //Step 3: Select L_t(=numLargeComputes) procs with minimum c[i] (=a[i]-b[i]) values.
433     //Remove a[i] small jobs from each to get small job load at most opt/2
434     minHeap *cHeap=new minHeap(P);
435     for(i=0;i<P;i++)
436     {
437         InfoRecord *ci=new InfoRecord();
438         ci->load=a[i]-b[i];
439         ci->Id=i;
440         cHeap->insert(ci);
441     }
442
443     //Set of largeFree procs created with (small jobs < opt/2)
444     minHeap *largeFreeLightProcs=new minHeap(P);
445
446     for(i=0;i<numLargeComputes;i++)
447     {
448         if(_lb_debug) CkPrintf("Removing a large compute %d\n",i);
449         //Remove biggest a_k computes from L_t procs
450         InfoRecord *cdata= cHeap->deleteMin();
451         Set *smallComputesRemoved= removeBiggestSmallComputes(a[cdata->Id],&(processors[cdata->Id]),opt);
452         if(largeFree[cdata->Id])
453             largeFreeLightProcs->insert((InfoRecord *)&(processors[cdata->Id]));
454
455         // Keep removed small computes in unassigned set for now
456         Iterator nextCompute;
457         nextCompute.id=0;
458         computeInfo *c=(computeInfo *)
459             smallComputesRemoved->iterator((Iterator*)&nextCompute);
460         while(c)
461         {
462             deAssign(c,&(processors[cdata->Id]));
463             if(c->load > opt/2)
464             {
465                 if (_lb_debug) CkPrintf(" Error : Large compute not expected here\n");
466             }
467             else
468             {
469                 smallComputes->insert((InfoRecord *)c);
470             }
471             nextCompute.id++;
472             c = (computeInfo *)
473                 smallComputesRemoved->next((Iterator *)&nextCompute);
474         }
475         delete smallComputesRemoved;
476         delete cdata;
477     }
478
479     //Step 4 :
480     //Remove biggest b computes from P - L_t procs
481     // Assign removed large computes to large free procs created in Step 3
482     // Keep removed small computes unassigned for now.
483     for(i=numLargeComputes;i<P;i++)
484     {
485         //Remove biggest b computes from P - L_t procs
486         InfoRecord *cdata= cHeap->deleteMin();
487         Set *computesRemoved=removeBigComputes(b[cdata->Id],&(processors[cdata->Id]),opt);
488
489         // Assign removed large computes to large free procs created in Step 3
490         // Keep removed small computes unassigned for now.
491         Iterator nextCompute;
492         nextCompute.id=0;
493         computeInfo *c=(computeInfo *)
494             computesRemoved->iterator((Iterator*)&nextCompute);
495         while(c)
496         {
497             deAssign(c,&(processors[cdata->Id]));
498             if(c->load > opt/2)
499             {
500                 processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
501                 assign(c,targetproc);
502                 largeFree[cdata->Id]=true;
503                 largeFree[targetproc->Id]=false;
504             }
505             else
506             {
507                 smallComputes->insert((InfoRecord *)c);
508             }
509             nextCompute.id++;
510             c = (computeInfo *)
511                 computesRemoved->next((Iterator *)&nextCompute);
512         }
513         delete computesRemoved;
514         delete cdata;
515     }
516     delete cHeap;
517     
518     //Step 5: Arbitrarily assign remaining largeComputes to large-free procs
519     int j=0;
520     Iterator nextCompute;
521     nextCompute.id=0;
522     computeInfo *c=(computeInfo *)
523         largeComputes->iterator((Iterator*)&nextCompute);
524     if(_lb_debug)
525     {
526         if(c) 
527         {
528             CkPrintf("Reassigning Large computes removes in Step 1\n");
529         }
530         else
531         {
532             CkPrintf("No  Large computes removed in Step 1\n");
533         }
534     }
535     while(c)
536     {
537         /*
538         //BUG:: Assign to largeFreeLight Procs instead of largeFree procs
539         while(!(largeFree[j]) && j<P-1)
540         {
541             j++;
542         }
543         if(!largeFree[j])
544         {
545             if(_lb_debug) CkPrintf("Error finding a large free processor in Step 5\n");
546         }
547         assign(c,j);
548         largeFree[j]=false;
549         */
550
551         processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
552         if(_lb_debug)
553         {
554             if(!targetproc)
555                 CkPrintf("Error finding a large free light proc\n");
556         }
557         assign(c,targetproc);
558         largeFree[targetproc->Id]=false;
559         nextCompute.id++;
560         c = (computeInfo *)
561                 largeComputes->next((Iterator *)&nextCompute);
562     }
563
564     //Step 6: Assign remaining small jobs one by one to least loaded proc
565     minHeap *procsLoad=new minHeap(P);
566     for(i=0;i<P;i++)
567     {
568         procsLoad->insert((InfoRecord *) &(processors[i]) );
569     }
570     nextCompute.id=0;
571     c=(computeInfo *)
572         smallComputes->iterator((Iterator*)&nextCompute);
573     while(c)
574     {
575         processorInfo *leastLoadedP=(processorInfo *)procsLoad->deleteMin();
576         assign(c,leastLoadedP);
577         procsLoad->insert((InfoRecord *)  leastLoadedP);
578         nextCompute.id++;
579         c = (computeInfo *)
580                 smallComputes->next((Iterator *)&nextCompute);
581     }
582
583     delete largeFreeLightProcs;
584     delete procsLoad;
585     delete [] a;
586     delete [] b;
587     delete [] largeFree;
588     delete largeComputes;
589     delete smallComputes;
590     return numMoves();
591
592
593 int RefinerApprox::numMoves()
594 {
595     int nmoves=0;
596     for(int i=0;i<numComputes;i++)
597     {
598         if(computes[i].processor!=computes[i].oldProcessor)
599             nmoves++;
600     }
601     return nmoves;
602 }
603
604 void RefinerApprox::Refine(int count, CentralLB::LDStats* stats, 
605                      int* cur_p, int* new_p, int percentMoves)
606 {
607     
608   if(_lb_debug) CkPrintf("\n\n");
609   if(_lb_debug) CkPrintf("[%d] RefinerApprox strategy\n",CkMyPe());
610   P = count;
611   numComputes = stats->n_objs;
612   computes = new computeInfo[numComputes];
613   processors = new processorInfo[count];
614
615   if(_lb_debug) CkPrintf("Total Number of computes : %d\n",numComputes);
616
617   create(count, stats, cur_p);
618   if(_lb_debug) printStats(0);
619
620   int i;
621   for (i=0; i<numComputes; i++)
622   {
623       if(computes[i].oldProcessor!=-1)
624       //if(false)
625       {
626             assign((computeInfo *) &(computes[i]),
627                    (processorInfo *) &(processors[computes[i].oldProcessor]));
628       }
629       else
630       {
631             assign((computeInfo *) &(computes[i]),
632                    (processorInfo *) &(processors[0]));
633       }
634   }
635   
636   if(_lb_debug) CkPrintf("Total Migratable computes : %d\n\n",numComputes);
637   if(_lb_debug) CkPrintf("Total  processors : %d\n",P);
638   if(_lb_debug) CkPrintf("Total  available processors : %d\n",numAvail);
639
640   removeComputes();
641
642   computeAverage();
643
644   if(_lb_debug) CkPrintf("Avearge load : %lf\n",averageLoad);
645   if(_lb_debug) printStats(0);
646
647  
648   int numAllowedMoves=(int)(percentMoves*numComputes/100.0);
649   if(numAllowedMoves<0)
650     numAllowedMoves=0;
651   if(numAllowedMoves>numComputes)
652     numAllowedMoves=numComputes;
653
654   if(_lb_args.debug())
655   {
656     CkPrintf("Percent of allowed moves = %d\n",percentMoves);
657     CkPrintf("Number of allowed moves = %d\n",numAllowedMoves);
658   }
659   //multirefine(numComputes);
660   multirefine(numAllowedMoves);
661
662   int nmoves = 0;
663
664   //Initialize new_p[i] to cur_p[i]
665   //so that non-migratable computes which
666   //are ignored in the calcuation get their
667   //new_p asssigned same as cur_p
668   for(i=0;i<stats->n_objs;i++)
669   {
670       new_p[i]=cur_p[i];
671   }
672
673
674   for (int pe=0; pe < P; pe++) 
675   {
676       Iterator nextCompute;
677       nextCompute.id = 0;
678       computeInfo *c = (computeInfo *)
679           processors[pe].computeSet->iterator((Iterator *)&nextCompute);
680     
681       while(c) 
682       {
683           new_p[c->Id] = c->processor;
684           if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
685
686           nextCompute.id++;
687           c = (computeInfo *) processors[pe].computeSet->
688                      next((Iterator *)&nextCompute);
689       }
690   }
691   if (_lb_debug) CkPrintf("RefinerApprox: moving %d objects. \n", nmoves);
692   delete [] computes;
693   delete [] processors;
694 }
695
696
697 void  RefinerApprox::printStats(int newStats)
698 {
699         
700     CkPrintf("%Proc#\tLoad\tObjLoad\tBgdLoad\n");
701     for(int i=0;i<P;i++)
702     {
703         CkPrintf("%d\t\t%lf\t%lf\t%lf\n",i,processors[i].load,processors[i].computeLoad,processors[i].backgroundLoad);
704     }
705     
706 }
707