doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / NeighborCommLB.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 #include "elements.h"
7 #include "ckheap.h"
8 #include "NeighborCommLB.h"
9 #include "topology.h"
10
11 #define PER_MESSAGE_SEND_OVERHEAD   35e-6
12 #define PER_BYTE_SEND_OVERHEAD      8.5e-9
13 #define PER_MESSAGE_RECV_OVERHEAD   0.0
14 #define PER_BYTE_RECV_OVERHEAD      0.0
15
16 CreateLBFunc_Def(NeighborCommLB, "The neighborhood load balancer with communication")
17
18 NeighborCommLB::NeighborCommLB(const CkLBOptions &opt):NborBaseLB(opt)
19 {
20   lbname = "NeighborCommLB";
21   if (CkMyPe() == 0)
22     CkPrintf("[%d] NeighborCommLB created\n",CkMyPe());
23 }
24
25 LBMigrateMsg* NeighborCommLB::Strategy(NborBaseLB::LDStats* stats, int n_nbrs)
26 {
27 bool _lb_debug=0;
28 bool _lb_debug1=0;
29 bool _lb_debug2=0;
30 #if CMK_LBDB_ON
31   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
32   // Compute the average load to see if we are overloaded relative
33   // to our neighbors
34   double myload = myStats.total_walltime - myStats.idletime;
35   double avgload = myload;
36   int i;
37   if (_lb_debug) 
38     CkPrintf("[%d] Neighbor Count = %d\n", CkMyPe(), n_nbrs);
39   
40   for(i=0; i < n_nbrs; i++) {
41     // Scale times we need appropriately for relative proc speeds
42     const double scale =  ((double)myStats.pe_speed) 
43       / stats[i].pe_speed;
44
45     stats[i].total_walltime *= scale;
46     stats[i].idletime *= scale;
47
48     avgload += (stats[i].total_walltime - stats[i].idletime);
49   }
50   avgload /= (n_nbrs + 1);
51
52   CkVec<MigrateInfo*> migrateInfo;
53
54   if (_lb_debug) 
55     CkPrintf("[%d] My load is %lf\n", CkMyPe(),myload);
56   if (myload > avgload) {
57     if (_lb_debug1) 
58       CkPrintf("[%d] OVERLOAD My load is %lf average load is %lf\n", CkMyPe(), myload, avgload);
59
60     // First of all, explore the topology and get dimension
61     LBTopology* topo;
62     {
63       LBtopoFn topofn;
64       topofn = LBTopoLookup(_lbtopo);
65       if (topofn == NULL) {
66         char str[1024];
67         CmiPrintf("NeighborCommLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
68         printoutTopo();
69         sprintf(str, "NeighborCommLB> Fatal error: Unknown topology: %s", _lbtopo);
70         CmiAbort(str);
71       }
72       topo = topofn(CkNumPes());
73     }
74     int dimension = topo->get_dimension();
75     if (_lb_debug2) 
76       CkPrintf("[%d] Topology dimension = %d\n", CkMyPe(), dimension);
77     if (dimension == -1) {
78       char str[1024];
79       CmiPrintf("NeighborCommLB> Fatal error: Unsupported topology: %s. Only some of the following are supported:\n", _lbtopo);
80       printoutTopo();
81       sprintf(str, "NeighborCommLB> Fatal error: Unsupported topology: %s", _lbtopo);
82       CmiAbort(str);
83     }
84
85     // Position of this processor
86     int *myProc = new int[dimension];
87     topo->get_processor_coordinates(myStats.from_pe, myProc);
88     if (_lb_debug2) {
89       char temp[1000];
90       char* now=temp;
91       sprintf(now, "[%d] Coordinates = [", CkMyPe());
92       now += strlen(now);
93       for(i=0;i<dimension;i++) {
94         sprintf(now, "%d ", myProc[i]); 
95         now +=strlen(now);
96       }
97       sprintf(now, "]\n");
98       now += strlen(now);
99       CkPrintf(temp);
100     }
101
102     // Then calculate the communication center of each object
103     // The communication center is relative to myProc
104     double **commcenter = new double*[myStats.n_objs];
105     double *commamount = new double[myStats.n_objs];
106     if(_lb_debug1) {
107       CkPrintf("[%d] Number of Objs = %d \n", CkMyPe(), myStats.n_objs);
108     }
109     {
110       memset(commamount, 0, sizeof(double)*myStats.n_objs);
111       for(i=0; i<myStats.n_objs;i++) {
112         commcenter[i] = new double[dimension];
113         memset(commcenter[i], 0, sizeof(double)*dimension);
114       }
115
116       //coordinates of procs
117       int *destProc = new int[dimension];
118       int *diff = new int[dimension];
119       
120       //for each comm entry
121       for(i=0; i<myStats.n_comm;i++) {
122         int j;
123         //for each object //TODO use hashtable to accelerate
124         for(j=0; j<myStats.n_objs;j++) 
125           if((myStats.objData[j].handle.omhandle.id == myStats.commData[i].sender.omId)
126               && (myStats.objData[j].handle.id == myStats.commData[i].sender.objId)) {
127             double comm=
128               PER_MESSAGE_SEND_OVERHEAD * myStats.commData[i].messages 
129               + PER_BYTE_SEND_OVERHEAD * myStats.commData[i].bytes;
130             commamount[j] += comm;
131             int dest_pe = myStats.commData[i].receiver.lastKnown();
132             
133             if(dest_pe==-1) continue;
134             
135               topo->get_processor_coordinates(dest_pe, destProc);
136             topo->coordinate_difference(myProc, destProc, diff);
137             int k;
138             for(k=0;k<dimension;k++) {
139               commcenter[j][k] += diff[k] * comm;
140             }
141           }
142       }
143       for(i=0; i<myStats.n_objs;i++) if (commamount[i]>0) {
144         int k;
145         double ratio = 1.0 /commamount[i];
146         for(k=0;k<dimension;k++)
147           commcenter[i][k] *= ratio;
148       } else { //if no communication, set commcenter to myself
149         int k;
150         for(k=0;k<dimension;k++)
151           commcenter[i][k] = 0;
152       }
153       
154       delete [] destProc;
155       delete [] diff;
156     }
157     
158     if(_lb_debug2) {
159       for(i=0;i<myStats.n_objs;i++) {
160         char temp[1000];
161         char* now=temp;
162         sprintf(now, "[%d] Objs [%d] Load = %lf Comm Amount = %lf  ", 
163           CkMyPe(), i, myStats.objData[i].wallTime, commamount[i] );
164         now += strlen(now);
165         sprintf(now, "Comm Center = [");
166         now += strlen(now);
167         int j;
168         for(j=0;j<dimension;j++) {
169           sprintf(now, "%lf ", commcenter[i][j]); 
170           now += strlen(now);
171         }
172         sprintf(now, "]\n");
173         now += strlen(now);
174         CkPrintf(temp);
175       }
176     }
177     
178     // First, build heaps of my objects
179     // Then assign objects to the least loaded other processors until either
180     //   - The smallest remaining object would put me below average, or
181     //   - I only have 1 object left, or
182     //   - The smallest remaining object would put someone else 
183     //     above average
184     // Note: Object can only move towards its communication center!
185
186     // My neighbors: 
187     typedef struct _procInfo{
188       int id;
189       double load;
190       int* difference;
191     } procInfo;
192
193     if(_lb_debug2) {
194       CkPrintf("[%d] Querying neighborhood topology...\n", CkMyPe() );
195     }
196
197     procInfo* neighbors = new procInfo[n_nbrs];
198     {
199       int *destProc = new int[dimension];
200       for(i=0; i < n_nbrs; i++) {
201         neighbors[i].id = stats[i].from_pe;
202         neighbors[i].load = stats[i].total_walltime - stats[i].idletime;
203         neighbors[i].difference = new int[dimension];
204         topo->get_processor_coordinates(neighbors[i].id, destProc);
205         topo->coordinate_difference(myProc, destProc, neighbors[i].difference);
206       }
207       delete[] destProc;
208     }
209     
210     if(_lb_debug2) {
211       CkPrintf("[%d] Building obj heap...\n", CkMyPe() );
212     }
213     // My objects: build heaps
214     maxHeap objs(myStats.n_objs);
215     double totalObjLoad=0.0;
216     for(i=0; i < myStats.n_objs; i++) {
217       InfoRecord* item = new InfoRecord;
218       item->load = myStats.objData[i].wallTime;
219       totalObjLoad += item->load;
220       item->Id = i;
221       objs.insert(item);
222     }
223
224     if(_lb_debug2) {
225       CkPrintf("[%d] Beginning distributing objects...\n", CkMyPe() );
226     }
227
228     // for each object
229     while(objs.numElements()>0) {
230       InfoRecord* obj;
231       obj = objs.deleteMax();
232       int bestDest = -1;
233       for(i = 0; i < n_nbrs; i++)
234         if(neighbors[i].load +obj->load < myload - obj->load && (bestDest==-1 || neighbors[i].load < neighbors[bestDest].load)) {
235           double dotsum=0;
236           int j;
237           for(j=0; j<dimension; j++) dotsum += (commcenter[obj->Id][j] * neighbors[i].difference[j]);
238           if(myload - avgload < totalObjLoad || dotsum>0.5 || (dotsum>0 && objs.numElements()==0) || commamount[obj->Id]==0) {
239             bestDest = i;
240           }
241         }
242       // Best place for the object
243       if(bestDest != -1) {
244         if(_lb_debug1) {
245           CkPrintf("[%d] Obj[%d] will move to Proc[%d]\n", CkMyPe(), obj->Id, neighbors[bestDest].id);
246         }
247         //Migrate it
248         MigrateInfo* migrateMe = new MigrateInfo;
249         migrateMe->obj = myStats.objData[obj->Id].handle;
250         migrateMe->from_pe = myStats.from_pe;
251         migrateMe->to_pe = neighbors[bestDest].id;
252         migrateInfo.insertAtEnd(migrateMe);
253         //Modify loads
254         myload -= obj->load;
255         neighbors[bestDest].load += obj->load;
256       }
257       totalObjLoad -= obj->load;
258       delete obj;
259     }
260
261     if(_lb_debug2) {
262       CkPrintf("[%d] Clearing Up...\n", CkMyPe());
263     }
264
265     for(i=0; i<n_nbrs; i++) {
266       delete[] neighbors[i].difference;
267     }
268     delete[] neighbors;
269     
270     delete[] myProc;
271
272     for(i=0;i<myStats.n_objs;i++) {
273       delete[] commcenter[i];
274     }
275     delete[] commcenter;
276     delete[] commamount;        
277   }  
278
279   if(_lb_debug2) {
280     CkPrintf("[%d] Generating result...\n", CkMyPe());
281   }
282
283   // Now build the message to actually perform the migrations
284   int migrate_count=migrateInfo.length();
285   //  if (migrate_count > 0) {
286   //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
287   //  }
288   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
289   msg->n_moves = migrate_count;
290   for(i=0; i < migrate_count; i++) {
291     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
292     msg->moves[i] = *item;
293     delete item;
294     migrateInfo[i] = 0;
295   }
296
297   return msg;
298 #else
299   return NULL;
300 #endif
301 }
302
303 #include "NeighborCommLB.def.h"
304
305 /*@}*/