9a8d37b0e0091169eedd14e34f637d21c2523c83
[charm.git] / src / ck-ldb / RefineLB.C
1 #include <charm++.h>
2
3 #if CMK_LBDB_ON
4
5 #if CMK_STL_USE_DOT_H
6 #include <deque.h>
7 #include <queue.h>
8 #else
9 #include <deque>
10 #include <queue>
11 #endif
12
13 #include "RefineLB.h"
14 #include "RefineLB.def.h"
15
16 #if CMK_STL_USE_DOT_H
17 template class deque<CentralLB::MigrateInfo>;
18 #else
19 template class std::deque<CentralLB::MigrateInfo>;
20 #endif
21
22 void CreateRefineLB()
23 {
24   CkPrintf("[%d] creating RefineLB %d\n",CkMyPe(),loadbalancer);
25   loadbalancer = CProxy_RefineLB::ckNew();
26   CkPrintf("[%d] created RefineLB %d\n",CkMyPe(),loadbalancer);
27 }
28
29 RefineLB::RefineLB()
30 {
31   CkPrintf("[%d] RefineLB created\n",CkMyPe());
32 }
33
34 CmiBool RefineLB::QueryBalanceNow(int _step)
35 {
36   CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
37   return CmiTrue;
38 }
39
40 void RefineLB::create(CentralLB::LDStats* stats, int count)
41 {
42   int i,j;
43
44   P = count;
45
46   numComputes = 0;
47   for(j=0; j < P; j++) numComputes+= stats[j].n_objs;
48   computes = new computeInfo[numComputes];
49
50   processors = new processorInfo[count];
51
52   int index = 0;
53   for(j=0; j < count; j++) {
54     processors[j].Id = j;
55     processors[j].backgroundLoad = 0;
56     processors[j].load = processors[j].backgroundLoad;
57     processors[j].computeLoad = 0;
58     processors[j].computeSet = new Set();
59
60     LDObjData *odata = stats[j].objData;
61     const int osz = stats[j].n_objs;  
62     for(i=0; i < osz; i++) {
63 //      computes[index].omID = odata[i].omID;
64 //      computes[index].id = odata[i].id;
65       computes[index].id = odata[i].id;
66       computes[index].handle = odata[i].handle;
67       computes[index].load = odata[i].cpuTime;
68       computes[index].processor = -1;
69       computes[index].oldProcessor = j;
70       index ++;
71     }
72   }
73
74 //  for (i=0; i < numComputes; i++)
75 //      processors[computes[i].oldProcessor].computeLoad += computes[i].load;
76 }
77
78 void RefineLB::assign(computeInfo *c, int processor)
79 {
80    assign(c, &(processors[processor]));
81 }
82
83 void RefineLB::assign(computeInfo *c, processorInfo *p)
84 {
85    c->processor = p->Id;
86    p->computeSet->insert((InfoRecord *) c);
87    p->computeLoad += c->load;
88    p->load = p->computeLoad + p->backgroundLoad;
89 }
90
91 void  RefineLB::deAssign(computeInfo *c, processorInfo *p)
92 {
93    c->processor = -1;
94    p->computeSet->remove(c);
95    p->computeLoad -= c->load;
96    p->load = p->computeLoad + p->backgroundLoad;
97 }
98
99 void RefineLB::computeAverage()
100 {
101    int i;
102    double total = 0;
103    for (i=0; i<numComputes; i++)
104       total += computes[i].load;
105
106    for (i=0; i<P; i++)
107       total += processors[i].backgroundLoad;
108
109    averageLoad = total/P;
110 }
111
112 double RefineLB::computeMax()
113 {
114    int i;
115    double max = processors[0].load;
116    for (i=1; i<P; i++)
117    {
118       if (processors[i].load > max)
119          max = processors[i].load;
120    }
121    return max;
122 }
123
124 int RefineLB::refine()
125 {
126    int finish = 1;
127    maxHeap *heavyProcessors = new maxHeap(P);
128
129    Set *lightProcessors = new Set();
130    int i;
131    for (i=0; i<P; i++)
132    {
133       if (processors[i].load > overLoad*averageLoad)
134       {
135 //CkPrintf("Processor %d is HEAVY: load:%f averageLoad:%f!\n", i, processors[i].load, averageLoad);
136          heavyProcessors->insert((InfoRecord *) &(processors[i]));
137       }
138       else if (processors[i].load < averageLoad)
139       {
140 //CkPrintf("Processor %d is LIGHT: load:%f averageLoad:%f!\n", i, processors[i].load, averageLoad);
141               lightProcessors->insert((InfoRecord *) &(processors[i]));
142       }
143    }
144    int done = 0;
145
146    while (!done)
147    {
148       double bestSize;
149       computeInfo *bestCompute;
150       processorInfo *bestP;
151     
152       processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
153       if (!donor) break;
154
155       //find the best pair (c,receiver)
156       Iterator nextProcessor;
157       processorInfo *p = (processorInfo *) 
158              lightProcessors->iterator((Iterator *) &nextProcessor);
159       bestSize = 0;
160       bestP = 0;
161       bestCompute = 0;
162
163       while (p)
164       {
165          Iterator nextCompute;
166          nextCompute.id = 0;
167          computeInfo *c = (computeInfo *) 
168             donor->computeSet->iterator((Iterator *)&nextCompute);
169          // iout << iINFO << "Considering Procsessor : " << p->Id << "\n" << endi;
170          while (c)
171          {
172 //CkPrintf("c->load: %f p->load:%f overLoad*averageLoad:%f \n", c->load, p->load, overLoad*averageLoad);
173             if ( c->load + p->load < overLoad*averageLoad) 
174             {
175                // iout << iINFO << "Considering Compute : " << c->Id << " with load " 
176                //      << c->load << "\n" << endi;
177                if(c->load > bestSize) 
178                {
179                   bestSize = c->load;
180                   bestCompute = c;
181                   bestP = p;
182                }
183             }
184             nextCompute.id++;
185             c = (computeInfo *) donor->computeSet->next((Iterator *)&nextCompute);
186          }
187          p = (processorInfo *) 
188          lightProcessors->next((Iterator *) &nextProcessor);
189       }
190
191       if (bestCompute)
192       {
193 //CkPrintf("Assign: [%d] with load: %f from %d to %d \n", bestCompute->id.id[0], bestCompute->load, donor->Id, bestP->Id);
194         deAssign(bestCompute, donor);      
195         assign(bestCompute, bestP);
196       }
197       else {
198         finish = 0;
199         break;
200       }
201
202       if (bestP->load > averageLoad)
203          lightProcessors->remove(bestP);
204     
205       if (donor->load > overLoad*averageLoad)
206          heavyProcessors->insert((InfoRecord *) donor);
207       else if (donor->load < averageLoad)
208          lightProcessors->insert((InfoRecord *) donor);
209    }  
210    return finish;
211 }
212
213 CLBMigrateMsg* RefineLB::Strategy(CentralLB::LDStats* stats, int count)
214 {
215   CkPrintf("[%d] RefineLB strategy\n",CkMyPe());
216
217   create(stats, count);
218
219   int i;
220   for (i=0; i<numComputes; i++)
221     assign((computeInfo *) &(computes[i]),
222            (processorInfo *) &(processors[computes[i].oldProcessor]));
223
224   computeAverage();
225   overLoad = 1.02;
226
227   refine();
228
229 #if CMK_STL_USE_DOT_H
230   queue<MigrateInfo> migrateInfo;
231 #else
232   std::queue<MigrateInfo> migrateInfo;
233 #endif
234
235   for (int pe=0; pe < P; pe++) {
236     Iterator nextCompute;
237     nextCompute.id = 0;
238     computeInfo *c = (computeInfo *)
239          processors[pe].computeSet->iterator((Iterator *)&nextCompute);
240     while(c)
241     {
242       if (c->oldProcessor != c->processor)
243       {
244 CkPrintf("Migrate: from %d to %d\n", c->oldProcessor, c->processor);
245         MigrateInfo migrateMe;
246         migrateMe.obj = c->handle;
247         migrateMe.from_pe = c->oldProcessor;
248         migrateMe.to_pe = c->processor;
249         migrateInfo.push(migrateMe);
250       }
251
252         nextCompute.id++;
253         c = (computeInfo *) processors[pe].computeSet->next((Iterator *)&nextCompute);
254     }
255   }
256
257 /*
258   for(int pe=0; pe < count; pe++) {
259     CkPrintf("[%d] PE %d : %d Objects : %d Communication\n",
260              CkMyPe(),pe,stats[pe].n_objs,stats[pe].n_comm);
261     for(int obj=0; obj < stats[pe].n_objs; obj++) {
262       const int dest = static_cast<int>(drand48()*(CmiNumPes()-1) + 0.5);
263       if (dest != pe) {
264         CkPrintf("[%d] Obj %d migrating from %d to %d\n",
265                  CkMyPe(),obj,pe,dest);
266         MigrateInfo migrateMe;
267         migrateMe.obj = stats[pe].objData[obj].handle;
268         migrateMe.from_pe = pe;
269         migrateMe.to_pe = dest;
270         migrateInfo.push(migrateMe);
271       }
272     }
273   }
274 */
275
276   int migrate_count=migrateInfo.size();
277   CLBMigrateMsg* msg = new(&migrate_count,1) CLBMigrateMsg;
278   msg->n_moves = migrate_count;
279   for(i=0; i < migrate_count; i++) {
280     msg->moves[i] = migrateInfo.front();
281     migrateInfo.pop();
282   }
283
284   return msg;
285 };
286
287 #endif