doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / RefinerApprox.C
1 #include "elements.h"
2 #include "ckheap.h"
3 #include "RefinerApprox.h"
4
5 int _lb_debug=0;
6
7 void RefinerApprox::create(int count, CentralLB::LDStats* stats, int* procs)
8 {
9   int i;
10   // now numComputes is all the computes: both migratable and nonmigratable.
11   // afterwards, nonmigratable computes will be taken off
12
13   numAvail = 0;
14   for(i=0; i < P; i++) 
15   {
16     processors[i].Id = i;
17     processors[i].backgroundLoad = stats->procs[i].bg_walltime;
18 //    processors[i].backgroundLoad = 0;
19     processors[i].computeLoad = 0;
20     processors[i].load = processors[i].backgroundLoad;
21     processors[i].computeSet = new Set();
22     processors[i].pe_speed = stats->procs[i].pe_speed;
23 //    processors[i].utilization = stats->procs[i].utilization;
24     processors[i].available = stats->procs[i].available;
25     if (processors[i].available == CmiTrue) numAvail++;
26   }
27
28   int index=0;
29   for (i=0; i<stats->n_objs; i++)
30   {
31   
32       LDObjData &odata = stats->objData[i];
33       if (odata.migratable == CmiTrue)
34       {
35         computes[index].id = odata.objID();
36         computes[index].Id = i;
37  //       computes[index].handle = odata.handle;
38         computes[index].load = odata.wallTime;
39         computes[index].processor = -1;
40         computes[index].oldProcessor = procs[i];
41         computes[index].migratable = odata.migratable;
42         if (computes[index].oldProcessor >= P)  {
43           if (stats->complete_flag) {
44             CmiPrintf("LB Panic: the old processor %d of obj %d in RefineKLB cannot be found, is this in a simulation mode?\n", computes[index].oldProcessor, i);
45             CmiAbort("Abort!");
46           }
47           else {
48               // an object from outside domain, randomize its location
49             computes[i].oldProcessor = CrnRand()%P;
50           }
51         }
52         index ++;
53       }
54       else
55       {
56         // put this compute into background load
57         processors[procs[i]].backgroundLoad += odata.wallTime;
58         processors[procs[i]].load+= odata.wallTime;
59         numComputes--;
60       }
61   }
62   //for (i=0; i < numComputes; i++)
63     //  processors[computes[i].oldProcessor].computeLoad += computes[i].load;
64 }
65
66 void RefinerApprox::reinitAssignment()
67 {
68     int i;
69     // now numComputes is all the computes: both migratable and nonmigratable.
70     // afterwards, nonmigratable computes will be taken off
71
72     for(i=0;i<P;i++)
73     {
74         Iterator nextCompute;
75         nextCompute.id=0;
76         computeInfo *c = (computeInfo *)
77             processors[i].computeSet->iterator((Iterator *)&nextCompute);
78         while(c)
79         {
80             if(c->oldProcessor!=i)
81             {
82                 deAssign(c,&processors[i]);
83                 if(c->oldProcessor!=-1)
84                 {
85                         assign(c,c->oldProcessor);
86                 }
87                 else
88                 {
89                         assign(c,0);
90                 }
91             }
92             nextCompute.id++;
93             c = (computeInfo *)
94                 processors[i].computeSet->next((Iterator *)&nextCompute);
95         }
96     }
97 }
98
99 void RefinerApprox::multirefine(int num_moves)
100 {
101   computeAverage();
102   double avg = averageLoad;
103   double max = computeMax();
104
105   int numMoves=0;
106   double lbound=avg;
107   double ubound=max;
108
109   double EPSILON=0.01;
110
111   numMoves=refine(avg);
112   
113   if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
114   if(numMoves<=num_moves)
115       return ;
116       
117   if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
118   while(ubound-lbound> EPSILON*avg)
119   {
120       reinitAssignment();
121       double testVal=(ubound+lbound)/2;
122       numMoves=refine(testVal);
123       if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
124       if(numMoves>num_moves)
125       {
126           lbound=testVal;
127       }
128       else
129       {
130           ubound=testVal;
131       }
132       if(_lb_debug)CkPrintf("[ %lf , %lf ] = %lf > %lf\n",lbound,ubound,ubound-lbound,EPSILON*avg);
133   }
134   if(_lb_debug) CkPrintf("Refined within %d moves\n",numMoves);
135   return;
136 }
137
138 Set * RefinerApprox::removeBiggestSmallComputes(int num,processorInfo * p,double opt)
139 {
140     int numPComputes=p->computeSet->numElements();
141     double totalSmallLoad=0;
142     maxHeap *h=new maxHeap(numPComputes);
143     Set * removedComputes=new Set();
144     int numSmallPComputes=0;
145         
146     Iterator nextCompute;
147     nextCompute.id=0;
148     computeInfo *c = (computeInfo *)
149             p->computeSet->iterator((Iterator *)&nextCompute);
150
151     for(int i=0;i<numPComputes;i++)
152     {
153         if(c->load < opt/2)
154         {
155             h->insert((InfoRecord *)c);
156             numSmallPComputes++;
157         }
158         nextCompute.id++;
159         c = (computeInfo *)
160             p->computeSet->next((Iterator *)&nextCompute);
161     }
162
163     if(p->backgroundLoad <opt/2)
164     {
165         totalSmallLoad+=p->backgroundLoad;
166     }
167     if(numSmallPComputes<num)
168     {
169         if(_lb_debug)CkPrintf("Error[%d]: Cant remove %d small computes from a total of %d small computes\n",p->Id,num,numSmallPComputes);
170     }
171
172     //while(totalSmallLoad > opt/2)
173     int j;
174     for(j=0;j<num;j++)
175     {
176         computeInfo *rec=(computeInfo *)(h->deleteMax());
177         removedComputes->insert((InfoRecord *)rec);
178         totalSmallLoad-=rec->load;
179     }
180
181     delete h;
182     return removedComputes;
183 }
184
185 Set * RefinerApprox::removeBigComputes(int num,processorInfo * p,double opt)
186 {
187     int numPComputes=p->computeSet->numElements();
188     if(num>numPComputes)
189     {
190         if(_lb_debug)CkPrintf("Error [%d]: Cant remove %d computes out of a total of %d\n",p->Id,num,numPComputes);
191         return new Set();
192     }
193     double totalLoad=p->load;
194     maxHeap *h=new maxHeap(numPComputes);
195     Set * removedComputes=new Set();
196         
197     Iterator nextCompute;
198     nextCompute.id=0;
199     computeInfo *c = (computeInfo *)
200             p->computeSet->iterator((Iterator *)&nextCompute);
201
202     for(int i=0;i<numPComputes;i++)
203     {
204         h->insert((InfoRecord *)c);
205         nextCompute.id++;
206         c = (computeInfo *)
207             p->computeSet->next((Iterator *)&nextCompute);
208     }
209
210     int j;
211     for(j=0;j<num;j++)
212     {
213         computeInfo *rec=(computeInfo *)(h->deleteMax());
214         removedComputes->insert((InfoRecord *)rec);
215         totalLoad-=rec->load;
216     }
217
218     delete h;
219     return removedComputes;
220 }
221
222 double RefinerApprox::getLargestCompute()
223 {
224     double largestC=0;
225     for(int i=0;i<P;i++)
226     {
227         if(processors[i].backgroundLoad > largestC)
228             largestC=processors[i].backgroundLoad;
229
230         Iterator nextCompute;
231         nextCompute.id=0;
232         computeInfo *c = (computeInfo *)
233             processors[i].computeSet->iterator((Iterator *)&nextCompute);
234         while(c)
235         {
236             if(c->load > largestC)
237             {
238                 largestC=c->load;
239             }
240             nextCompute.id++;
241             c = (computeInfo *)
242                 processors[i].computeSet->next((Iterator *)&nextCompute);
243         }
244     }
245     return largestC;
246 }
247
248 int RefinerApprox::getNumLargeComputes(double opt)
249 {
250     int numLarge=0;
251     for(int i=0;i<P;i++)
252     {
253         if(processors[i].backgroundLoad>=(opt/2))
254             numLarge++;
255         Iterator nextCompute;
256         nextCompute.id=0;
257         computeInfo *c = (computeInfo *)
258             processors[i].computeSet->iterator((Iterator *)&nextCompute);
259         int numC=0;
260 //      CkPrintf("Processor %d \n",i);
261         while(c)
262         {
263             numC++;
264 //          CkPrintf("%d  ",numC);
265             if(c->load>(opt/2))
266                 numLarge++;
267       
268             nextCompute.id++;
269             c = (computeInfo *)
270                 processors[i].computeSet->next((Iterator *)&nextCompute);
271         }
272     }
273     return numLarge;
274     
275 }
276
277 int RefinerApprox::computeA(processorInfo *p,double opt)
278 {
279     int numPComputes=p->computeSet->numElements();
280     double totalSmallLoad=0;
281     maxHeap *h=new maxHeap(numPComputes);
282         
283     Iterator nextCompute;
284     nextCompute.id=0;
285     computeInfo *c = (computeInfo *)
286             p->computeSet->iterator((Iterator *)&nextCompute);
287
288     for(int i=0;i<numPComputes;i++)
289     {
290         if(c->load < opt/2)
291         {
292             totalSmallLoad+=c->load;
293             h->insert((InfoRecord *)c);
294         }
295         nextCompute.id++;
296         c = (computeInfo *)
297             p->computeSet->next((Iterator *)&nextCompute);
298     }
299
300     if(p->backgroundLoad <opt/2)
301     {
302         totalSmallLoad+=p->backgroundLoad;
303     }
304
305     int avalue=0;
306     while(totalSmallLoad > opt/2)
307     {
308         avalue++;
309         InfoRecord *rec=h->deleteMax();
310         totalSmallLoad-=rec->load;
311     }
312     delete h;
313     return avalue;
314 }
315
316 int RefinerApprox::computeB(processorInfo *p,double opt)
317 {
318     int numPComputes=p->computeSet->numElements();
319     double totalLoad=p->load;
320     if(p->backgroundLoad > opt)
321     {
322         if(_lb_debug)
323             CkPrintf("Error in computeB: Background load greater than OPT!\n");
324         return 0;
325     }
326     maxHeap *h=new maxHeap(numPComputes);
327         
328     Iterator nextCompute;
329     nextCompute.id=0;
330     computeInfo *c = (computeInfo *)
331             p->computeSet->iterator((Iterator *)&nextCompute);
332
333     for(int i=0;i<numPComputes;i++)
334     {
335         h->insert((InfoRecord*)c);
336         nextCompute.id++;
337         c = (computeInfo *)
338             p->computeSet->next((Iterator *)&nextCompute);
339     }
340
341     int bvalue=0;
342     while(totalLoad > opt)
343     {
344         bvalue++;
345         InfoRecord *rec=h->deleteMax();
346         totalLoad-=rec->load;
347     }
348
349     delete h;
350     return bvalue;
351 }
352
353 int RefinerApprox::refine(double opt)
354 {
355     int i;
356     if(_lb_debug)CkPrintf("RefinerApprox::refine called with %lf\n",opt);
357     if(opt<averageLoad)
358         return INFTY;
359
360     int numLargeComputes=getNumLargeComputes(opt);
361     if(_lb_debug) CkPrintf("Num of large Computes %d for opt = %10f\n",numLargeComputes,opt);
362     if(numLargeComputes>P)
363         return INFTY;
364     if(getLargestCompute()>opt)
365         return INFTY;
366     //CkPrintf("Not returning INFTY\n");
367
368     //a[i]= min. number of small jobs to be removed so that total size
369     //of remaining small jobs is at most opt/2
370     int *a=new int[P];
371     //b[i]= min. number of jobs to be removed so that total size
372     //of remaining jobs (including large jobs) is at most opt
373     int *b=new int[P];
374     bool *largeFree=new bool[P];
375     Set *largeComputes=new Set();
376     Set *smallComputes=new Set();
377
378     //Step 1: Remove all but one large computes on each node
379     for(i=0;i<P;i++)
380     {
381         computeInfo *smallestLargeCompute=NULL;
382         largeFree[i]=true;
383         Iterator nextCompute;
384         nextCompute.id=0;
385         computeInfo *c = (computeInfo *)
386             processors[i].computeSet->iterator((Iterator *)&nextCompute);
387         while(c)
388         {
389             if(c->load>opt/2)
390             {
391                 largeFree[i]=false;
392                 largeComputes->insert((InfoRecord*)c);
393                 deAssign(c,&(processors[i]));
394                 if (smallestLargeCompute==NULL)
395                 {
396                     smallestLargeCompute=c;
397                 }
398                 else if(smallestLargeCompute->load > c->load)
399                 {
400                     smallestLargeCompute=c;
401                 }
402             }
403             nextCompute.id++;
404             c = (computeInfo *)
405                 processors[i].computeSet->next((Iterator *)&nextCompute);
406         }
407         //Check if processor's fixed load is itself large
408         if(processors[i].backgroundLoad>opt/2)
409         {
410             largeFree[i]=false;
411         }
412         else 
413         {
414             if(smallestLargeCompute)
415             {
416                 assign(smallestLargeCompute,i);
417                 largeComputes->remove((InfoRecord*)smallestLargeCompute);
418             }
419         }
420         if(!largeFree[i]) 
421         {
422             if(_lb_debug)
423                 CkPrintf("Processor %d not LargeFree !\n",i);
424         }
425
426         //Step 2: Calculate a[i] and b[i] for each proc.
427         a[i]=computeA(&processors[i],opt);
428         b[i]=computeB(&processors[i],opt);
429     }
430
431     //Step 3: Select L_t(=numLargeComputes) procs with minimum c[i] (=a[i]-b[i]) values.
432     //Remove a[i] small jobs from each to get small job load at most opt/2
433     minHeap *cHeap=new minHeap(P);
434     for(i=0;i<P;i++)
435     {
436         InfoRecord *ci=new InfoRecord();
437         ci->load=a[i]-b[i];
438         ci->Id=i;
439         cHeap->insert(ci);
440     }
441
442     //Set of largeFree procs created with (small jobs < opt/2)
443     minHeap *largeFreeLightProcs=new minHeap(P);
444
445     for(i=0;i<numLargeComputes;i++)
446     {
447         if(_lb_debug) CkPrintf("Removing a large compute %d\n",i);
448         //Remove biggest a_k computes from L_t procs
449         InfoRecord *cdata= cHeap->deleteMin();
450         Set *smallComputesRemoved= removeBiggestSmallComputes(a[cdata->Id],&(processors[cdata->Id]),opt);
451         if(largeFree[cdata->Id])
452             largeFreeLightProcs->insert((InfoRecord *)&(processors[cdata->Id]));
453
454         // Keep removed small computes in unassigned set for now
455         Iterator nextCompute;
456         nextCompute.id=0;
457         computeInfo *c=(computeInfo *)
458             smallComputesRemoved->iterator((Iterator*)&nextCompute);
459         while(c)
460         {
461             deAssign(c,&(processors[cdata->Id]));
462             if(c->load > opt/2)
463             {
464                 if (_lb_debug) CkPrintf(" Error : Large compute not expected here\n");
465             }
466             else
467             {
468                 smallComputes->insert((InfoRecord *)c);
469             }
470             nextCompute.id++;
471             c = (computeInfo *)
472                 smallComputesRemoved->next((Iterator *)&nextCompute);
473         }
474         delete smallComputesRemoved;
475         delete cdata;
476     }
477
478     //Step 4 :
479     //Remove biggest b computes from P - L_t procs
480     // Assign removed large computes to large free procs created in Step 3
481     // Keep removed small computes unassigned for now.
482     for(i=numLargeComputes;i<P;i++)
483     {
484         //Remove biggest b computes from P - L_t procs
485         InfoRecord *cdata= cHeap->deleteMin();
486         Set *computesRemoved=removeBigComputes(b[cdata->Id],&(processors[cdata->Id]),opt);
487
488         // Assign removed large computes to large free procs created in Step 3
489         // Keep removed small computes unassigned for now.
490         Iterator nextCompute;
491         nextCompute.id=0;
492         computeInfo *c=(computeInfo *)
493             computesRemoved->iterator((Iterator*)&nextCompute);
494         while(c)
495         {
496             deAssign(c,&(processors[cdata->Id]));
497             if(c->load > opt/2)
498             {
499                 processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
500                 assign(c,targetproc);
501                 largeFree[cdata->Id]=true;
502                 largeFree[targetproc->Id]=false;
503             }
504             else
505             {
506                 smallComputes->insert((InfoRecord *)c);
507             }
508             nextCompute.id++;
509             c = (computeInfo *)
510                 computesRemoved->next((Iterator *)&nextCompute);
511         }
512         delete computesRemoved;
513         delete cdata;
514     }
515     delete cHeap;
516     
517     //Step 5: Arbitrarily assign remaining largeComputes to large-free procs
518
519     Iterator nextCompute;
520     nextCompute.id=0;
521     computeInfo *c=(computeInfo *)
522       largeComputes->iterator((Iterator*)&nextCompute);
523     if(_lb_debug)
524     {
525         if(c) 
526         {
527             CkPrintf("Reassigning Large computes removes in Step 1\n");
528         }
529         else
530         {
531             CkPrintf("No  Large computes removed in Step 1\n");
532         }
533     }
534     while(c)
535     {
536         /*
537         //BUG:: Assign to largeFreeLight Procs instead of largeFree procs
538         while(!(largeFree[j]) && j<P-1)
539         {
540             j++;
541         }
542         if(!largeFree[j])
543         {
544             if(_lb_debug) CkPrintf("Error finding a large free processor in Step 5\n");
545         }
546         assign(c,j);
547         largeFree[j]=false;
548         */
549
550         processorInfo *targetproc=(processorInfo *)largeFreeLightProcs->deleteMin();
551         if(_lb_debug)
552         {
553             if(!targetproc)
554                 CkPrintf("Error finding a large free light proc\n");
555         }
556         assign(c,targetproc);
557         largeFree[targetproc->Id]=false;
558         nextCompute.id++;
559         c = (computeInfo *)
560                 largeComputes->next((Iterator *)&nextCompute);
561     }
562
563     //Step 6: Assign remaining small jobs one by one to least loaded proc
564     minHeap *procsLoad=new minHeap(P);
565     for(i=0;i<P;i++)
566     {
567         procsLoad->insert((InfoRecord *) &(processors[i]) );
568     }
569     nextCompute.id=0;
570     c=(computeInfo *)
571         smallComputes->iterator((Iterator*)&nextCompute);
572     while(c)
573     {
574         processorInfo *leastLoadedP=(processorInfo *)procsLoad->deleteMin();
575         assign(c,leastLoadedP);
576         procsLoad->insert((InfoRecord *)  leastLoadedP);
577         nextCompute.id++;
578         c = (computeInfo *)
579                 smallComputes->next((Iterator *)&nextCompute);
580     }
581
582     delete largeFreeLightProcs;
583     delete procsLoad;
584     delete [] a;
585     delete [] b;
586     delete [] largeFree;
587     delete largeComputes;
588     delete smallComputes;
589     return numMoves();
590
591
592 int RefinerApprox::numMoves()
593 {
594     int nmoves=0;
595     for(int i=0;i<numComputes;i++)
596     {
597         if(computes[i].processor!=computes[i].oldProcessor)
598             nmoves++;
599     }
600     return nmoves;
601 }
602
603 void RefinerApprox::Refine(int count, CentralLB::LDStats* stats, 
604                      int* cur_p, int* new_p, int percentMoves)
605 {
606     
607   if(_lb_debug) CkPrintf("\n\n");
608   if(_lb_debug) CkPrintf("[%d] RefinerApprox strategy\n",CkMyPe());
609   P = count;
610   numComputes = stats->n_objs;
611   computes = new computeInfo[numComputes];
612   processors = new processorInfo[count];
613
614   if(_lb_debug) CkPrintf("Total Number of computes : %d\n",numComputes);
615
616   create(count, stats, cur_p);
617   if(_lb_debug) printStats(0);
618
619   int i;
620   for (i=0; i<numComputes; i++)
621   {
622       if(computes[i].oldProcessor!=-1)
623       //if(false)
624       {
625             assign((computeInfo *) &(computes[i]),
626                    (processorInfo *) &(processors[computes[i].oldProcessor]));
627       }
628       else
629       {
630             assign((computeInfo *) &(computes[i]),
631                    (processorInfo *) &(processors[0]));
632       }
633   }
634   
635   if(_lb_debug) CkPrintf("Total Migratable computes : %d\n\n",numComputes);
636   if(_lb_debug) CkPrintf("Total  processors : %d\n",P);
637   if(_lb_debug) CkPrintf("Total  available processors : %d\n",numAvail);
638
639   removeComputes();
640
641   computeAverage();
642
643   if(_lb_debug) CkPrintf("Avearge load : %lf\n",averageLoad);
644   if(_lb_debug) printStats(0);
645
646  
647   int numAllowedMoves=(int)(percentMoves*numComputes/100.0);
648   if(numAllowedMoves<0)
649     numAllowedMoves=0;
650   if(numAllowedMoves>numComputes)
651     numAllowedMoves=numComputes;
652
653   if(_lb_args.debug())
654   {
655     CkPrintf("Percent of allowed moves = %d\n",percentMoves);
656     CkPrintf("Number of allowed moves = %d\n",numAllowedMoves);
657   }
658   //multirefine(numComputes);
659   multirefine(numAllowedMoves);
660
661   int nmoves = 0;
662
663   //Initialize new_p[i] to cur_p[i]
664   //so that non-migratable computes which
665   //are ignored in the calcuation get their
666   //new_p asssigned same as cur_p
667   for(i=0;i<stats->n_objs;i++)
668   {
669       new_p[i]=cur_p[i];
670   }
671
672
673   for (int pe=0; pe < P; pe++) 
674   {
675       Iterator nextCompute;
676       nextCompute.id = 0;
677       computeInfo *c = (computeInfo *)
678           processors[pe].computeSet->iterator((Iterator *)&nextCompute);
679     
680       while(c) 
681       {
682           new_p[c->Id] = c->processor;
683           if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
684
685           nextCompute.id++;
686           c = (computeInfo *) processors[pe].computeSet->
687                      next((Iterator *)&nextCompute);
688       }
689   }
690   if (_lb_debug) CkPrintf("RefinerApprox: moving %d objects. \n", nmoves);
691   delete [] computes;
692   delete [] processors;
693 }
694
695
696 void  RefinerApprox::printStats(int newStats)
697 {
698         
699     CkPrintf("%Proc#\tLoad\tObjLoad\tBgdLoad\n");
700     for(int i=0;i<P;i++)
701     {
702         CkPrintf("%d\t\t%lf\t%lf\t%lf\n",i,processors[i].load,processors[i].computeLoad,processors[i].backgroundLoad);
703     }
704     
705 }
706