important changes:
[charm.git] / src / ck-ldb / Comm1LB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkLdb
10 */
11 /*@{*/
12
13 #include <charm++.h>
14 #include <stdio.h>
15
16 #if CMK_LBDB_ON
17
18 #include "cklists.h"
19
20 #include "Comm1LB.h"
21
22 #define alpha 35e-6
23 #define beeta 8.5e-9
24
25 #define LOWER_FACTOR 0.33
26 #define UPPER_FACTOR 0.67
27 #define MAX_WEIGHT 5.0
28
29 void CreateComm1LB()
30 {
31   loadbalancer = CProxy_Comm1LB::ckNew();
32 }
33
34 static void lbinit(void) {
35 //        LBSetDefaultCreate(CreateComm1LB);        
36   LBRegisterBalancer("Comm1LB", CreateComm1LB, "another variation of CommLB");
37 }
38
39 #include "Comm1LB.def.h"
40
41 Comm1LB::Comm1LB()
42 {
43   if (CkMyPe() == 0)
44     CkPrintf("[%d] Comm1LB created\n",CkMyPe());
45   lbname = "Comm1LB";
46 }
47
48 CmiBool Comm1LB::QueryBalanceNow(int _step)
49 {
50   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
51   return CmiTrue;
52 }
53
54 int Comm1LB::search(const LDObjKey &objKey) {
55   const LDObjid &oid = objKey.objID();
56   const LDOMid &mid = objKey.omID();
57   int id, hash;
58   
59   hash = (oid.id[0] | oid.id[1]) % nobj;
60
61   for(id=0;id<nobj;id++){
62     if((translate[htable[(id+hash)%nobj]].objID() == oid)
63        &&(translate[htable[(id+hash)%nobj]].omID().id == mid.id))
64       return htable[(id + hash)%nobj];
65   }
66   //  CkPrintf("not found \n");
67   return -1;
68 }
69
70 void Comm1LB::alloc(int pe , int id, double load, int nmsg, int nbyte){
71   alloc_array[npe][id].load = 1.0;
72   alloc_array[pe][id].load = load;
73   alloc_array[pe][id].nmsg = nmsg;
74   alloc_array[pe][id].nbyte = nbyte;
75   alloc_array[pe][nobj].load += load;
76   alloc_array[pe][nobj].nmsg += nmsg;
77   alloc_array[pe][nobj].nbyte += nbyte;
78 }
79
80 double Comm1LB::compute_cost(int id, int pe, int n_alloc, int &com_msg, int &com_data){
81   int j;
82   double total_cost, com_cost, weight=0.0;
83   graph * ptr;
84   double bound1,bound2;
85
86   bound1 = LOWER_FACTOR * nobj;
87   bound2 = UPPER_FACTOR * nobj;
88
89   if(n_alloc <= (int)bound1)
90     weight = MAX_WEIGHT;
91   else if((n_alloc > (int)bound1)&&(n_alloc < (int)bound2))
92     weight = (bound2 - n_alloc)/(bound2 - bound1) * (MAX_WEIGHT - 1) + 1;
93   else if(n_alloc >= (int)bound2)
94     weight = 1.0;
95
96 //  weight = MAX_WEIGHT;
97   ptr = object_graph[id].next;
98
99   com_msg = 0;
100   com_data = 0;
101   for(j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
102     if(alloc_array[npe][ptr->id].load == 0.0)
103       continue;
104     if(alloc_array[pe][ptr->id].load > 0.0)
105       continue;
106     com_data += ptr->data;
107     com_msg += ptr->nmsg;
108   }
109   com_cost = weight * (alpha*(com_msg + alloc_array[pe][nobj].nmsg) + beeta*(com_data + alloc_array[pe][nobj].nbyte));
110 //  CkPrintf("%d, %d \n",com_data,com_msg);
111   total_cost = alloc_array[pe][nobj].load + com_cost;
112   return total_cost;
113 }
114
115 void Comm1LB::add_graph(int x, int y, int data, int nmsg){
116   graph * ptr, *temp;
117
118 //  CkPrintf("Add graph : %d,%d", data, nmsg);
119   ptr = &(object_graph[x]);  
120   for(;ptr->next != NULL; ptr = ptr->next);
121   
122   temp = new graph;
123   
124   temp->id = y;
125   temp->data = data;
126   temp->nmsg = nmsg;
127   temp->next = NULL;
128
129   ptr->next = temp;
130
131   ptr = &(object_graph[y]);  
132   for(;ptr->next != NULL; ptr = ptr->next);
133   
134   temp = new graph;
135   
136   temp->id = x;
137   temp->data = data;
138   temp->nmsg = nmsg;
139   temp->next = NULL;
140
141   ptr->next = temp;
142 }
143   
144 void Comm1LB::make_hash(){
145   int i, hash;
146   LDObjid oid;
147   
148   htable = new int[nobj];
149   for(i=0;i<nobj;i++)
150     htable[i] = -1;
151   
152   for(i=0;i<nobj;i++){
153     oid = translate[i].objID();
154     hash = ((oid.id[0])|(oid.id[1])) % nobj;
155     while(htable[hash] != -1)
156       hash = (hash+1)%nobj;
157     
158     htable[hash] = i;
159   }
160
161 }
162     
163 void init(alloc_struct **a, graph * object_graph, int l, int b){
164   int i,j;
165
166   for(i=0;i<l+1;i++)
167     for(j=0;j<b+1;j++){
168       a[i][j].load = 0.0;
169       a[i][j].nbyte = 0;
170       a[i][j].nmsg = 0;
171     }
172       
173   for(j=0;j<b;j++){
174     object_graph[j].data = 0;
175     object_graph[j].nmsg = 0;
176     object_graph[j].next = NULL;
177   }
178 }
179
180 LBMigrateMsg* Comm1LB::Strategy(CentralLB::LDStats* stats, int count)
181 {
182   int pe,obj,com;
183   double load_pe=0.0,mean_load =0.0;
184   ObjectRecord *x;
185
186   //  CkPrintf("[%d] Comm1LB strategy\n",CkMyPe());
187
188   CkVec<MigrateInfo*> migrateInfo;
189
190   alloc_array = new alloc_struct *[count+1];
191
192   nobj = stats->n_objs;
193   //  CkPrintf("OBJ: Before \n");
194
195   ObjectHeap maxh(nobj+1);
196   for(obj=0; obj < nobj; obj++) {
197       x = new ObjectRecord;
198       x->id = obj;
199       x->pos = obj;
200       x->load = stats->objData[obj].wallTime;
201       x->pe = stats->from_proc[obj];
202       maxh.insert(x);
203   }
204   for(pe=0; pe < count; pe++) {
205      mean_load += stats->procs[pe].total_walltime;
206   }
207   mean_load /= count;
208 /*
209   for(pe=0; pe < count; pe++) {
210     load_pe = 0.0;
211     for(obj=0; obj < stats[pe].n_objs; obj++) {
212       load_pe += stats->objData[obj].data.wallTime;
213       nobj++;
214       x = new ObjectRecord;
215       x->id = nobj -1;
216       x->pos = obj;
217       x->load = stats->objData[obj].data.wallTime;
218       x->pe = pe;
219       maxh.insert(x);
220     }
221     mean_load += load_pe/count;
222 //    CkPrintf("LOAD on %d = %5.3lf\n",pe,load_pe);
223   }
224 */
225
226   npe = count;
227   translate = new LDObjKey[nobj];
228   int objno=0;
229
230   for(obj=0; obj < stats->n_objs; obj++){ 
231       LDObjData &oData = stats->objData[obj];
232       translate[objno].omID() = oData.omID();
233       translate[objno].objID() = oData.objID();
234       objno++;
235   }
236
237   make_hash();
238
239   object_graph = new graph[nobj];
240   
241   for(pe=0;pe <= count;pe++)
242     alloc_array[pe] = new alloc_struct[nobj +1];
243
244   init(alloc_array,object_graph,npe,nobj);
245
246   int xcoord=0,ycoord=0;
247
248   for(com =0; com< stats->n_comm;com++) {
249       LDCommData &commData = stats->commData[com];
250       if((!commData.from_proc())&&(commData.recv_type()==LD_OBJ_MSG)){
251         xcoord = search(commData.sender); 
252         ycoord = search(commData.receiver.get_destObj());
253         if((xcoord == -1)||(ycoord == -1))
254           if (lb_ignoreBgLoad) continue;
255           else CkAbort("Error in search\n");
256         add_graph(xcoord,ycoord,commData.bytes, commData.messages);     
257       }
258   }
259   
260   int id,maxid,spe=0,minpe=0,mpos;
261   double temp_cost,min_cost;
262
263   pe = 0;
264   x  = maxh.deleteMax();
265   maxid = x->id;
266   spe = x->pe;
267   mpos = x->pos;
268   
269   alloc(pe,maxid,stats->objData[mpos].wallTime,0,0);
270   if(pe != spe){
271     //      CkPrintf("**Moving from %d to %d\n",spe,pe);
272     MigrateInfo* migrateMe = new MigrateInfo;
273     migrateMe->obj = stats->objData[mpos].handle;
274     migrateMe->from_pe = spe;
275     migrateMe->to_pe = pe;
276     migrateInfo.insertAtEnd(migrateMe);
277   }
278
279   int out_msg,out_byte,min_msg,min_byte;
280
281   for(id = 1;id<nobj;id++){
282     x  = maxh.deleteMax();   
283
284     maxid = x->id;
285     spe = x->pe;
286     mpos = x->pos;
287
288     for(pe =0; pe < count; pe++)
289       if((alloc_array[pe][nobj].load <= mean_load)||(id >= UPPER_FACTOR*nobj))
290         break;
291
292     temp_cost = compute_cost(maxid,pe,id,out_msg,out_byte);
293     min_cost = temp_cost;
294     minpe = pe;
295     min_msg = out_msg;
296     min_byte = out_byte;
297     pe++;
298     for(; pe < count;pe++){
299       if((alloc_array[pe][nobj].load > mean_load) && (id < UPPER_FACTOR*nobj))
300         continue;
301       temp_cost = compute_cost(maxid,pe,id,out_msg,out_byte);
302       if(min_cost > temp_cost){
303         minpe = pe;
304         min_cost = temp_cost;
305         min_msg = out_msg;
306         min_byte = out_byte;
307       }
308     }
309
310     alloc(minpe,maxid,x->load,min_msg,min_byte);
311
312     if(minpe != spe){
313       //      CkPrintf("**Moving from %d to %d\n",spe,minpe);
314       MigrateInfo *migrateMe = new MigrateInfo;
315       migrateMe->obj = stats->objData[mpos].handle;
316       migrateMe->from_pe = spe;
317       migrateMe->to_pe = minpe;
318       migrateInfo.insertAtEnd(migrateMe);
319     }
320   }
321
322   int migrate_count = migrateInfo.length();
323   LBMigrateMsg* msg = new(&migrate_count,1) LBMigrateMsg;
324   msg->n_moves = migrate_count;
325   for(int i=0; i < migrate_count; i++) {
326     MigrateInfo* item = (MigrateInfo*)migrateInfo[i];
327     msg->moves[i] = *item;
328     delete item;
329     migrateInfo[i] = 0;
330   }
331
332   return msg;
333 }
334
335
336 #endif
337
338 /*@}*/
339
340