doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / GreedyCommLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 /*
7 Status:
8   * support processor avail bitvector
9   * support nonmigratable attrib
10   * support background load
11
12   rewritten by Gengbin Zheng to use the new load balancer database and hash table;
13   modified to recognize the nonmigratable attrib of an object 
14   by Gengbin Zheng, 7/28/2003
15 */
16
17 #include "elements.h"
18 #include "ckheap.h"
19 #include "GreedyCommLB.h"
20 #include "manager.h"
21
22 CreateLBFunc_Def(GreedyCommLB, "Greedy algorithm which takes communication graph into account")
23
24 void GreedyCommLB::init()
25 {
26     lbname = (char*)"GreedyCommLB";
27     alpha = _lb_args.alpha();
28     beeta = _lb_args.beeta();
29     manager_init();
30 }
31
32 GreedyCommLB::GreedyCommLB(const CkLBOptions &opt): CentralLB(opt)
33 {
34     init();
35     if (CkMyPe() == 0)
36         CkPrintf("[%d] GreedyCommLB created\n",CkMyPe());
37 }
38
39 GreedyCommLB::GreedyCommLB(CkMigrateMessage *m):CentralLB(m) {
40     init();
41 }
42
43 CmiBool GreedyCommLB::QueryBalanceNow(int _step)
44 {
45     //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
46     return CmiTrue;
47 }
48
49 // assign id to processor pe, load including both computation and communication
50 void GreedyCommLB::alloc(int pe,int id,double load){
51     //  CkPrintf("alloc %d ,%d\n",pe,id);
52     assigned_array[id] = 1;
53     processors[pe].load += load;
54 }
55
56 // communication cost when obj id is put on proc pe
57 double GreedyCommLB::compute_com(LDStats* stats, int id, int pe){
58     int j,com_data=0,com_msg=0;
59     double total_time;
60     graph * ptr;
61     
62     ptr = object_graph[id].next;
63     
64     for(j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
65         int destObj = ptr->id;
66         if(assigned_array[destObj] == 0)  // this obj has not been assigned
67             continue;
68         if (stats->to_proc[destObj] == pe)    // this obj is assigned to same pe
69             continue;
70         com_data += ptr->data;
71         com_msg += ptr->nmsg;
72     }
73     
74     total_time = alpha*com_msg + beeta*com_data;
75     return total_time;
76 }
77
78 // update all communicating processors after assigning obj id on proc pe
79 void GreedyCommLB::update(LDStats* stats, int id, int pe){
80     graph * ptr = object_graph[id].next;
81     
82     for(int j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
83         int destObj = ptr->id;
84         if(assigned_array[destObj] == 0)  // this obj has not been assigned
85             continue;
86         int destPe = stats->to_proc[destObj];
87         if (destPe == pe)                     // this obj is assigned to same pe
88             continue;
89         int com_data = ptr->data;
90         int com_msg = ptr->nmsg;
91         double com_time = alpha*com_msg + beeta*com_data;
92         processors[destPe].load += com_time;
93     }
94 }
95
96 // add comm between obj x and y
97 // two direction
98 void GreedyCommLB::add_graph(int x, int y, int data, int nmsg){
99     graph * ptr, *temp;
100     
101     ptr = &(object_graph[x]);  
102     
103     temp = new graph;
104     
105     temp->id = y;
106     temp->data = data;
107     temp->nmsg = nmsg;
108     temp->next = ptr->next;
109     
110     ptr->next = temp;
111     
112     ptr = &(object_graph[y]);  
113
114     temp = new graph;
115     
116     temp->id = x;
117     temp->data = data;
118     temp->nmsg = nmsg;
119     temp->next = ptr->next;
120     
121     ptr->next = temp;
122 }
123   
124 static void init_data(int *assign, graph * object_graph, int l, int b){
125     for(int obj=0;obj < b;obj++)
126         assign[obj] = 0;
127
128     for(int j=0;j<b;j++){
129         object_graph[j].data = 0;
130         object_graph[j].nmsg = 0;
131         object_graph[j].next = NULL;
132     }
133 }
134
135 void GreedyCommLB::work(LDStats* stats)
136 {
137     int pe,obj,com;
138     ObjectRecord *x;
139     int i;
140     
141     if (_lb_args.debug()) CkPrintf("In GreedyCommLB strategy\n",CkMyPe());
142     npe = stats->nprocs();
143     nobj = stats->n_objs;
144
145     // nmigobj is calculated as the number of migratable objects
146     // ObjectHeap maxh is of size nmigobj
147     nmigobj = stats->n_migrateobjs;
148
149     stats->makeCommHash();
150
151     assigned_array = new int[nobj];
152
153     object_graph = new graph[nobj];
154
155     init_data(assigned_array,object_graph,npe,nobj);
156
157 #define MAXDOUBLE   1e10;
158
159     // processor heap
160     processors = new processorInfo[npe];
161     for (int p=0; p<npe; p++) {
162       processors[p].Id = p;
163       processors[p].backgroundLoad = stats->procs[p].bg_walltime;
164       processors[p].computeLoad = 0;
165       processors[p].pe_speed = stats->procs[p].pe_speed;
166       if (!stats->procs[p].available) {
167         processors[p].load = MAXDOUBLE;
168       }
169       else {
170         processors[p].load = 0;
171         if (!_lb_args.ignoreBgLoad())
172           processors[p].load = processors[p].backgroundLoad;
173       }
174     }
175
176
177     // assign communication graph
178     for(com =0; com< stats->n_comm;com++) {
179          int xcoord=0,ycoord=0;
180          LDCommData &commData = stats->commData[com];
181          if((!commData.from_proc())&&(commData.recv_type()==LD_OBJ_MSG))
182          {
183                 xcoord = stats->getHash(commData.sender);
184                 ycoord = stats->getHash(commData.receiver.get_destObj());
185                 if((xcoord == -1)||(ycoord == -1))
186                     if (_lb_args.ignoreBgLoad() || stats->complete_flag==0) continue;
187                     else CkAbort("Error in search\n");
188                 add_graph(xcoord,ycoord,commData.bytes, commData.messages);
189          }
190          else if (commData.recv_type()==LD_OBJLIST_MSG) {
191                 int nobjs;
192                 LDObjKey *objs = commData.receiver.get_destObjs(nobjs);
193                 xcoord = stats->getHash(commData.sender);
194                 for (int i=0; i<nobjs; i++) {
195                   ycoord = stats->getHash(objs[i]);
196                   if((xcoord == -1)||(ycoord == -1))
197                     if (_lb_args.migObjOnly()) continue;
198                     else CkAbort("Error in search\n");
199 //printf("Multicast: %d => %d %d %d\n", xcoord, ycoord, commData.bytes, commData.messages);
200                   add_graph(xcoord,ycoord,commData.bytes, commData.messages);
201                 }
202          }
203     }
204
205     // only build heap with migratable objects, 
206     // mapping nonmigratable objects to the same processors
207     ObjectHeap maxh(nmigobj+1);
208     for(obj=0; obj < stats->n_objs; obj++) {
209       LDObjData &objData = stats->objData[obj];
210       int onpe = stats->from_proc[obj];
211       if (!objData.migratable) {
212         if (!stats->procs[onpe].available) {
213           CmiAbort("Load balancer is not be able to move a nonmigratable object out of an unavailable processor.\n");
214         }
215         alloc(onpe, obj, objData.wallTime);
216         update(stats, obj, onpe);            // update communication cost on other pes
217       }
218       else {
219         x = new ObjectRecord;
220         x->id = obj;
221         x->pos = obj;
222         x->val = objData.wallTime;
223         x->pe = onpe;
224         maxh.insert(x);
225       }
226     }
227
228     minHeap *lightProcessors = new minHeap(npe);
229     for (i=0; i<npe; i++)
230       if (stats->procs[i].available)
231         lightProcessors->insert((InfoRecord *) &(processors[i]));
232
233     int id,maxid,minpe=0;
234     double temp,total_time,min_temp;
235     // for(pe=0;pe < count;pe++)
236     //  CkPrintf("avail for %d = %d\n",pe,stats[pe].available);
237
238     double *pe_comm = new double[npe];
239     for (int i=0; i<npe; i++) pe_comm[i] = 0.0;
240
241     for(id = 0;id<nmigobj;id++){
242         x  = maxh.deleteMax();
243
244         maxid = x->id;
245
246         processorInfo *donor = (processorInfo *) lightProcessors->deleteMin();
247         CmiAssert(donor);
248         int first_avail_pe = donor->Id;
249         temp = compute_com(stats, maxid, first_avail_pe);
250         min_temp = temp;
251         //total_time = temp + alloc_array[first_avail_pe][nobj];
252         total_time = temp + donor->load; 
253         minpe = first_avail_pe;
254         
255         // search all procs for best
256         // optimization: only search processors that it communicates
257         // and the minimum of all others
258         CkVec<int> commPes;
259         graph * ptr = object_graph[maxid].next;
260     
261         // find out all processors that this obj communicates
262         double commload = 0.0;                  // total comm load
263         for(int com=0;(com<2*nobj)&&(ptr != NULL);com++,ptr=ptr->next){
264           int destObj = ptr->id;
265           if(assigned_array[destObj] == 0)  // this obj has not been assigned
266             continue;
267           int destPe = stats->to_proc[destObj];
268           if(stats->procs[destPe].available == 0) continue;
269           
270           double cload = alpha*ptr->nmsg + beeta*ptr->data;
271           pe_comm[destPe] += cload;
272           commload += cload;
273
274           int exist = 0;
275           for (int pp=0; pp<commPes.size(); pp++)
276             if (destPe == commPes[pp]) { exist=1; break; }    // duplicated
277           if (!exist) commPes.push_back(destPe);
278         }
279
280         int k;
281         for(k = 0; k < commPes.size(); k++){
282             pe = commPes[k];
283             processorInfo *commpe = (processorInfo *) &processors[pe];
284             
285             temp = commload - pe_comm[pe];
286             
287             //CkPrintf("check id = %d, processor = %d,com = %lf, pro = %lf, comp=%lf\n", maxid,pe,temp,alloc_array[pe][nobj],total_time);
288             if(total_time > (temp + commpe->load)){
289                 minpe = pe;
290                 total_time = temp + commpe->load;
291                 min_temp = temp;
292             }
293         }
294         /* CkPrintf("check id = %d, processor = %d, obj = %lf com = %lf, pro = %lf, comp=%lf\n", maxid,minpe,x->load,min_temp,alloc_array[minpe][nobj],total_time); */
295         
296         //    CkPrintf("before 2nd alloc\n");
297         stats->assign(maxid, minpe);
298         
299         alloc(minpe, maxid, x->val + min_temp);
300
301         // now that maxid assigned to minpe, update other pes load
302         update(stats, maxid, minpe);
303
304         // update heap
305         lightProcessors->insert(donor);
306         for(k = 0; k < commPes.size(); k++) {
307             pe = commPes[k];
308             processorInfo *commpe = (processorInfo *) &processors[pe];
309             lightProcessors->update(commpe);
310             pe_comm[pe] = 0.0;                  // clear
311         }
312
313         delete x;
314     }
315     
316     // free up memory
317     delete [] pe_comm;
318
319     delete [] processors;
320     delete [] assigned_array;
321
322     delete lightProcessors;
323
324     for(int oindex= 0; oindex < nobj; oindex++){
325       graph * ptr = &object_graph[oindex];
326       ptr = ptr->next;
327       
328       while(ptr != NULL){
329         graph *cur = ptr;
330         ptr = ptr->next;
331         delete cur;
332       }
333     }
334     delete [] object_graph;
335
336 }
337
338 #include "GreedyCommLB.def.h"
339
340 /*@}*/
341