c17230961a69b69578329b77e04f3da5b006c3c2
[charm.git] / src / ck-ldb / GridCommRefineLB.C
1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
3 ** March 1, 2006
4 **
5 ** This is GridCommRefineLB.C
6 **
7 ** GridCommRefineLB is a load balancer for the Charm++ load balancing
8 ** framework.  It is designed to work in a Grid computing environment
9 ** consisting of two or more clusters separated by wide-area communication
10 ** links.  Communication between objects within a cluster is assumed to be
11 ** light weight (measured in microseconds) while communication between
12 ** objects on different clusters is assumed to be heavy weight (measured in
13 ** milliseconds).
14 **
15 ** The load balancer examines all communications in a computation and
16 ** attempts to spread the objects that communicate with objects on remote
17 ** clusters evenly across the PEs in the local cluster.  No objects are
18 ** ever migrated across cluster boundaries, they are simply distributed
19 ** as evenly as possible across the PEs in the cluster in which they were
20 ** originally placed.  The idea is that by spreading objects that
21 ** communicate over the wide-area evenly, a relatively small number of
22 ** WAN objects will be mixed with a relatively large number of LAN
23 ** objects, allowing the message-driven characteristics of Charm++ the
24 ** greatest possibility of overlapping the high-cost WAN communication
25 ** with locally-driven work.
26 **
27 ** The load balancer secondarily balances on scaled processor load
28 ** (i.e., a processor that is 2x the speed of another processor in
29 ** the local cluster will get 2x the work) as well as the number of
30 ** LAN objects.
31 **
32 ** This load balancer applies a "refinement" approach which attempts to
33 ** avoid disrupting the object-to-PE mapping by causing large numbers of
34 ** objects to migrate with each load balancing iĀ®nvocation.  This may be
35 ** undesirable in some cases.  (For example, if the vmi-linux "eager
36 ** protocol" is used, eager channels may be pinned between two PEs, and
37 ** migrating objects that communicate heavily with each other onto other
38 ** PEs could actually slow the computationif they no longer communicate
39 ** with each other over an eager channel.)  To prevent this, the balancer
40 ** determines the average number of objects per PE that communicate with
41 ** objects on remote clusters, and then migrates objects away from PEs
42 ** that exceed this average plus some tolerance (e.g., 110% of average).
43 ** This means that only the objects on the most overloaded PEs will be
44 ** migrated.
45 */
46
47 #include "GridCommRefineLB.decl.h"
48
49 #include "GridCommRefineLB.h"
50 #include "manager.h"
51
52 CreateLBFunc_Def (GridCommRefineLB, "Grid communication load balancer (refines object mapping within each cluster)");
53
54
55
56 /**************************************************************************
57 **
58 */
59 GridCommRefineLB::GridCommRefineLB (const CkLBOptions &opt) : CentralLB (opt)
60 {
61   char *value;
62
63
64   lbname = (char *) "GridCommRefineLB";
65
66   if (CkMyPe() == 0) {
67     CkPrintf ("[%d] GridCommRefineLB created.\n", CkMyPe());
68   }
69
70   if (value = getenv ("CK_LDB_GRIDCOMMREFINELB_TOLERANCE")) {
71     CK_LDB_GridCommRefineLB_Tolerance = atof (value);
72   } else {
73     CK_LDB_GridCommRefineLB_Tolerance = CK_LDB_GRIDCOMMREFINELB_TOLERANCE;
74   }
75
76   manager_init ();
77 }
78
79
80
81 /**************************************************************************
82 **
83 */
84 GridCommRefineLB::GridCommRefineLB (CkMigrateMessage *msg) : CentralLB (msg)
85 {
86   char *value;
87
88
89   lbname = (char *) "GridCommRefineLB";
90
91   if (value = getenv ("CK_LDB_GRIDCOMMREFINELB_TOLERANCE")) {
92     CK_LDB_GridCommRefineLB_Tolerance = atof (value);
93   } else {
94     CK_LDB_GridCommRefineLB_Tolerance = CK_LDB_GRIDCOMMREFINELB_TOLERANCE;
95   }
96
97   manager_init ();
98 }
99
100
101
102 /**************************************************************************
103 ** The Charm++ load balancing framework invokes this method to determine
104 ** whether load balancing can be performed at a specified time.
105 */
106 CmiBool GridCommRefineLB::QueryBalanceNow (int step)
107 {
108   if (_lb_args.debug() >= 3) {
109     CkPrintf ("[%d] GridCommRefineLB is balancing on step %d.\n", CkMyPe(), step);
110   }
111
112   return (CmiTrue);
113 }
114
115
116
117 /**************************************************************************
118 ** The vmi-linux machine layer incorporates the idea that PEs are located
119 ** within identifiable clusters.  This information can be supplied by the
120 ** user or can be probed automatically by the machine layer.  The exposed
121 ** API call CmiGetCluster() returns the integer cluster number for a
122 ** specified PE or -1 if the information is unknown.
123 **
124 ** For machine layers other than vmi-linux, simply return the constant 0.
125 ** GridCommRefineLB will assume a single-cluster computation and will
126 ** balance on the scaled processor load and number of LAN messages.
127 */
128 int GridCommRefineLB::Get_Cluster (int pe)
129 {
130 #if CONVERSE_VERSION_VMI
131   return (CmiGetCluster (pe));
132 #else
133   return (0);
134 #endif
135 }
136
137
138
139 /**************************************************************************
140 ** This method locates the maximum WAN object in terms of number of
141 ** messages that traverse a wide-area connection.  The search is
142 ** constrained to objects on the specified PE.
143 **
144 ** The method returns -1 if no matching object is found.
145 */
146 int GridCommRefineLB::Find_Maximum_WAN_Object (int pe)
147 {
148   int i;
149   int max_index;
150   int max_wan_msgs;
151
152
153   max_index = -1;
154   max_wan_msgs = -1;
155
156   for (i = 0; i < Num_Objects; i++) {
157     if ((&Object_Data[i])->from_pe == pe) {
158       if ((&Object_Data[i])->migratable) {
159         if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
160           max_index = i;
161           max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
162         }
163       }
164     }
165   }
166
167   return (max_index);
168 }
169
170
171
172 /**************************************************************************
173 ** This method locates the minimum WAN PE in terms of number of objects
174 ** that communicate with objects across a wide-area connection.  The search
175 ** is constrained to PEs within the specified cluster.
176 **
177 ** In the event of a "tie" (i.e., the number of WAN objects on a candidate
178 ** PE is equal to the minimum number of WAN objects discovered so far) the
179 ** tie is broken by considering the scaled CPU loads on the PEs.  The PE
180 ** with the smaller scaled load is the better candidate.  In the event of
181 ** a secondary tie, the secondary tie is broken by considering the number
182 ** of LAN objects on the two PEs.
183 **
184 ** The method returns -1 if no matching PE is found.
185 */
186 int GridCommRefineLB::Find_Minimum_WAN_PE (int cluster)
187 {
188   int i;
189   int min_index;
190   int min_wan_objs;
191
192
193   min_index = -1;
194   min_wan_objs = INT_MAX;
195
196   for (i = 0; i < Num_PEs; i++) {
197     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
198       if ((&PE_Data[i])->num_wan_objs < min_wan_objs) {
199         min_index = i;
200         min_wan_objs = (&PE_Data[i])->num_wan_objs;
201       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
202                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
203         min_index = i;
204         min_wan_objs = (&PE_Data[i])->num_wan_objs;
205       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
206                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
207                  ((&PE_Data[i])->num_lan_objs < (&PE_Data[min_index])->num_lan_objs)) {
208         min_index = i;
209         min_wan_objs = (&PE_Data[i])->num_wan_objs;
210       }
211     }
212   }
213
214   return (min_index);
215 }
216
217
218
219 /**************************************************************************
220 ** This method removes target_object from target_pe.  The data structure
221 ** entry for target_pe is updated appropriately with measurements from
222 ** target_object.
223 */
224 void GridCommRefineLB::Remove_Object_From_PE (int target_object, int target_pe)
225 {
226   (&Object_Data[target_object])->to_pe = -1;
227
228   (&PE_Data[target_pe])->num_objs -= 1;
229
230   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
231     (&PE_Data[target_pe])->num_lan_objs -= 1;
232     (&PE_Data[target_pe])->num_lan_msgs -= (&Object_Data[target_object])->num_lan_msgs;
233   }
234
235   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
236     (&PE_Data[target_pe])->num_wan_objs -= 1;
237     (&PE_Data[target_pe])->num_wan_msgs -= (&Object_Data[target_object])->num_wan_msgs;
238   }
239
240   (&PE_Data[target_pe])->scaled_load -= (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
241 }
242
243
244
245 /**************************************************************************
246 ** This method assigns target_object to target_pe.  The data structure
247 ** entry for target_pe is updated appropriately with measurements from
248 ** target_object.
249 */
250 void GridCommRefineLB::Assign_Object_To_PE (int target_object, int target_pe)
251 {
252   (&Object_Data[target_object])->to_pe = target_pe;
253
254   (&PE_Data[target_pe])->num_objs += 1;
255
256   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
257     (&PE_Data[target_pe])->num_lan_objs += 1;
258     (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
259   }
260
261   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
262     (&PE_Data[target_pe])->num_wan_objs += 1;
263     (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
264   }
265
266   (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
267 }
268
269
270
271 /**************************************************************************
272 ** The Charm++ load balancing framework invokes this method to cause the
273 ** load balancer to migrate objects to "better" PEs.
274 */
275 void GridCommRefineLB::work (CentralLB::LDStats *stats, int count)
276 {
277   int i;
278   int j;
279   CmiBool available;
280   CmiBool all_pes_mapped;
281   int max_cluster;
282   int min_speed;
283   int send_object;
284   int send_pe;
285   int send_cluster;
286   int recv_object;
287   int recv_pe;
288   int recv_cluster;
289   int num_cluster_pes;
290   int num_wan_objs;
291   int avg_wan_objs;
292   int target_object;
293   int target_pe;
294   LDCommData *com_data;
295
296
297   if (_lb_args.debug() >= 1) {
298     CkPrintf ("[%d] GridCommRefineLB is working.\n", CkMyPe());
299   }
300
301   // Since this load balancer looks at communications data, it must call stats->makeCommHash().
302   stats->makeCommHash ();
303
304   // Initialize object variables for the number of PEs and number of objects.
305   Num_PEs = count;
306   Num_Objects = stats->n_objs;
307
308   if (_lb_args.debug() >= 1) {
309     CkPrintf ("[%d] GridCommRefineLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
310   }
311
312   // Instantiate and initialize the PE_Data[] data structure.
313   //
314   // While doing this...
315   //    - ensure that there is at least one available PE
316   //    - ensure that all PEs are mapped to a cluster
317   //    - determine the maximum cluster number (gives the number of clusters)
318   //    - determine the minimum speed PE (used to compute relative PE speeds)
319   PE_Data = new PE_Data_T[Num_PEs];
320
321   available = CmiFalse;
322   all_pes_mapped = CmiTrue;
323   max_cluster = -1;
324   min_speed = INT_MAX;
325
326   for (i = 0; i < Num_PEs; i++) {
327     (&PE_Data[i])->available      = stats->procs[i].available;
328     (&PE_Data[i])->cluster        = Get_Cluster (i);
329     (&PE_Data[i])->num_objs       = 0;
330     (&PE_Data[i])->num_lan_objs   = 0;
331     (&PE_Data[i])->num_lan_msgs   = 0;
332     (&PE_Data[i])->num_wan_objs   = 0;
333     (&PE_Data[i])->num_wan_msgs   = 0;
334     (&PE_Data[i])->relative_speed = 0.0;
335     (&PE_Data[i])->scaled_load    = 0.0;
336
337     available |= (&PE_Data[i])->available;
338
339     all_pes_mapped &= ((&PE_Data[i])->cluster >= 0);
340
341     if ((&PE_Data[i])->cluster > max_cluster) {
342       max_cluster = (&PE_Data[i])->cluster;
343     }
344
345     if (stats->procs[i].pe_speed < min_speed) {
346       min_speed = stats->procs[i].pe_speed;
347     }
348   }
349
350   // If at least one available PE does not exist, return from load balancing.
351   if (!available) {
352     if (_lb_args.debug() >= 1) {
353       CkPrintf ("[%d] GridCommRefineLB finds no available PEs -- no balancing done.\n", CkMyPe());
354     }
355
356     delete [] PE_Data;
357
358     return;
359   }
360
361   // If not all PEs are mapped to a cluster, return from load balancing.
362   if (!all_pes_mapped) {
363     if (_lb_args.debug() >= 1) {
364       CkPrintf ("[%d] GridCommRefineLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
365     }
366
367     delete [] PE_Data;
368
369     return;
370   }
371
372   // The number of clusters is equal to the maximum cluster number plus one.
373   Num_Clusters = max_cluster + 1;
374
375   if (_lb_args.debug() >= 1) {
376     CkPrintf ("[%d] GridCommRefineLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
377   }
378
379   // Compute the relative PE speeds.
380   // Also add background CPU time to each PE's scaled load.
381   for (i = 0; i < Num_PEs; i++) {
382     (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
383
384     (&PE_Data[i])->scaled_load += stats->procs[i].bg_cputime;
385   }
386
387   // Initialize the Object_Data[] data structure.
388   Object_Data = new Object_Data_T[Num_Objects];
389
390   for (i = 0; i < Num_Objects; i++) {
391     (&Object_Data[i])->migratable   = (&stats->objData[i])->migratable;
392     (&Object_Data[i])->cluster      = Get_Cluster (stats->from_proc[i]);
393     (&Object_Data[i])->from_pe      = stats->from_proc[i];
394     (&Object_Data[i])->to_pe        = stats->from_proc[i];
395     (&Object_Data[i])->num_lan_msgs = 0;
396     (&Object_Data[i])->num_wan_msgs = 0;
397     (&Object_Data[i])->load         = (&stats->objData[i])->wallTime;
398
399     (&PE_Data[(&Object_Data[i])->from_pe])->num_objs += 1;
400     (&PE_Data[(&Object_Data[i])->from_pe])->scaled_load += (&Object_Data[i])->load;
401   }
402
403   // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
404   for (i = 0; i < stats->n_comm; i++) {
405     com_data = &(stats->commData[i]);
406     if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
407       send_object = stats->getHash (com_data->sender);
408       recv_object = stats->getHash (com_data->receiver.get_destObj());
409
410       send_pe = (&Object_Data[send_object])->from_pe;
411       recv_pe = (&Object_Data[recv_object])->from_pe;
412
413       send_cluster = Get_Cluster (send_pe);
414       recv_cluster = Get_Cluster (recv_pe);
415
416       if (send_cluster == recv_cluster) {
417         (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
418       } else {
419         (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
420       }
421     }
422   }
423
424   // Increment the number of LAN and WAN objects and messages for each PE.
425   for (i = 0; i < Num_Objects; i++) {
426     if ((&Object_Data[i])->num_lan_msgs > 0) {
427       (&PE_Data[(&Object_Data[i])->from_pe])->num_lan_objs += 1;
428       (&PE_Data[(&Object_Data[i])->from_pe])->num_lan_msgs += (&Object_Data[i])->num_lan_msgs;
429     }
430
431     if ((&Object_Data[i])->num_wan_msgs > 0) {
432       (&PE_Data[(&Object_Data[i])->from_pe])->num_wan_objs += 1;
433       (&PE_Data[(&Object_Data[i])->from_pe])->num_wan_msgs += (&Object_Data[i])->num_wan_msgs;
434     }
435   }
436
437   // Map objects to PEs in each cluster.
438   for (i = 0; i < Num_Clusters; i++) {
439     // Compute average number of objects per PE for this cluster.
440     num_cluster_pes = 0;
441     num_wan_objs = 0;
442     for (j = 0; j < Num_PEs; j++) {
443       if (i == (&PE_Data[j])->cluster) {
444         num_cluster_pes += 1;
445         num_wan_objs += (&PE_Data[j])->num_wan_objs;
446       }
447     }
448     avg_wan_objs = num_wan_objs / num_cluster_pes;
449
450     // Move objects away from PEs that exceed the average.
451     for (j = 0; j < Num_PEs; j++) {
452       if (i == (&PE_Data[j])->cluster) {
453         while ((&PE_Data[j])->num_wan_objs > (avg_wan_objs * CK_LDB_GridCommRefineLB_Tolerance)) {
454           target_object = Find_Maximum_WAN_Object (j);
455           target_pe = Find_Minimum_WAN_PE (i);
456
457           if ((target_object == -1) || (target_pe == -1)) {
458             break;
459           }
460
461           Remove_Object_From_PE (target_object, j);
462           Assign_Object_To_PE (target_object, target_pe);
463         }
464       }
465     }
466   }
467
468   // Make the assignment of objects to PEs in the load balancer framework.
469   for (i = 0; i < Num_Objects; i++) {
470     stats->to_proc[i] = (&Object_Data[i])->to_pe;
471
472     if (_lb_args.debug() >= 3) {
473       CkPrintf ("[%d] GridCommRefineLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
474     } else if (_lb_args.debug() >= 2) {
475       if (stats->to_proc[i] != stats->from_proc[i]) {
476         CkPrintf ("[%d] GridCommRefineLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
477       }
478     }
479   }
480
481   // Free memory.
482   delete [] Object_Data;
483   delete [] PE_Data;
484 }
485
486 #include "GridCommRefineLB.def.h"