These modifications to GridCommLB allow the balancer to automatically
[charm.git] / src / ck-ldb / GridCommLB.C
1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
3 ** November 4, 2004
4 **
5 ** This is GridCommLB.C
6 **
7 ** GridCommLB is a load balancer for the Charm++ load balancing framework.
8 ** It is designed to work in a Grid computing environment consisting of
9 ** two or more clusters separated by wide-area communication links.
10 ** Communication between objects within a cluster is assumed to be light
11 ** weight (measured in microseconds) while communication between objects
12 ** on different clusters is assumed to be heavy weight (measured in
13 ** milliseconds).
14 **
15 ** The load balancer examines all communications in a computation and
16 ** attempts to spread the objects that communicate to objects on remote
17 ** clusters evenly across the PEs in the local cluster.  No objects are
18 ** ever migrated across cluster boundaries, they are simply distributed
19 ** as evenly as possible across the PEs in the cluster in which they were
20 ** originally placed.  The idea is that by spreading objects that
21 ** communicate over the wide-area evenly, a relatively small number of
22 ** WAN objects will be mixed with a relatively large number of LAN
23 ** objects, allowing the message-driven characteristics of Charm++ the
24 ** greatest possibility of overlapping the high-cost WAN communication
25 ** with locally-driven work.
26 **
27 ** The load balancer secondarily balances on scaled processor load
28 ** (i.e., a processor that is 2x the speed of another processor in
29 ** the local cluster will get 2x the work) as well as the number of
30 ** LAN objects.
31 **
32 ** This load balancer can severely disrupt the object-to-PE mapping by
33 ** causing large numbers of objects to migrate with each load balancing
34 ** invocation.  This may be undesirable in some cases.  (For example, if
35 ** the vmi-linux "eager protocol" is used, eager channels may be pinned
36 ** between two PEs, and migrating objects that communicate heavily with
37 ** each other onto other PEs could actually slow the computationif they
38 ** no longer communicate with each other over an eager channel.)
39 */
40
41 #include "GridCommLB.decl.h"
42
43 #include "GridCommLB.h"
44 #include "manager.h"
45
46 CreateLBFunc_Def (GridCommLB, "Grid communication load balancer (evenly distribute objects across each cluster)");
47
48
49
50 /**************************************************************************
51 **
52 */
53 GridCommLB::GridCommLB (const CkLBOptions &opt) : CentralLB (opt)
54 {
55   lbname = (char *) "GridCommLB";
56
57   if (CkMyPe() == 0) {
58     CkPrintf ("[%d] GridCommLB created.\n", CkMyPe());
59   }
60
61   manager_init ();
62 }
63
64
65
66 /**************************************************************************
67 **
68 */
69 GridCommLB::GridCommLB (CkMigrateMessage *msg) : CentralLB (msg)
70 {
71   lbname = (char *) "GridCommLB";
72
73   manager_init ();
74 }
75
76
77
78 /**************************************************************************
79 ** The Charm++ load balancing framework invokes this method to determine
80 ** whether load balancing can be performed at a specified time.
81 */
82 CmiBool GridCommLB::QueryBalanceNow (int step)
83 {
84   if (_lb_args.debug() >= 1) {
85     CkPrintf ("[%d] GridCommLB is balancing on step %d.\n", CkMyPe(), step);
86   }
87
88   return (CmiTrue);
89 }
90
91
92
93 /**************************************************************************
94 ** The vmi-linux machine layer incorporates the idea that PEs are located
95 ** within identifiable clusters.  This information can be supplied by the
96 ** user or can be probed automatically by the machine layer.  The exposed
97 ** API call CmiGetCluster() returns the integer cluster number for a
98 ** specified PE or -1 if the information is unknown.
99 **
100 ** For machine layers other than vmi-linux, simply return the constant 0.
101 ** GridCommLB will assume a single-cluster computation and will balance
102 ** on the scaled processor load and number of LAN messages.
103 */
104 int GridCommLB::Get_Cluster (int pe)
105 {
106 #if CONVERSE_VERSION_VMI
107   return (CmiGetCluster (pe));
108 #else
109   return (0);
110 #endif
111 }
112
113
114
115 /**************************************************************************
116 ** This method locates the maximum WAN object in terms of number of
117 ** messages that traverse a wide-area connection.  The search is
118 ** constrained to objects within the specified cluster that have not yet
119 ** been mapped (balanced) to a PE.
120 **
121 ** The method returns -1 if no matching object is found.
122 */
123 int GridCommLB::Find_Maximum_WAN_Object (int cluster)
124 {
125   int i;
126   int max_index;
127   int max_wan_msgs;
128
129
130   max_index = -1;
131   max_wan_msgs = -1;
132
133   for (i = 0; i < Num_Objects; i++) {
134     if ((&Object_Data[i])->cluster == cluster) {
135       if ((&Object_Data[i])->to_pe == -1) {
136         if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
137           max_index = i;
138           max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
139         }
140       }
141     }
142   }
143
144   return (max_index);
145 }
146
147
148
149 /**************************************************************************
150 ** This method locates the minimum WAN PE in terms of number of objects
151 ** that communicate with objects across a wide-area connection.  The search
152 ** is constrained to PEs within the specified cluster.
153 **
154 ** In the event of a "tie" (i.e., the number of WAN objects on a candidate
155 ** PE is equal to the minimum number of WAN objects discovered so far) the
156 ** tie is broken by considering the scaled CPU loads on the PEs.  The PE
157 ** with the smaller scaled load is the better candidate.  In the event of
158 ** a secondary tie, the secondary tie is broken by considering the number
159 ** of LAN objects on the two PEs.
160 **
161 ** The method returns -1 if no matching PE is found.
162 */
163 int GridCommLB::Find_Minimum_WAN_PE (int cluster)
164 {
165   int i;
166   int min_index;
167   int min_wan_objs;
168
169
170   min_index = -1;
171   min_wan_objs = INT_MAX;
172
173   for (i = 0; i < Num_PEs; i++) {
174     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
175       if ((&PE_Data[i])->num_wan_objs < min_wan_objs) {
176         min_index = i;
177         min_wan_objs = (&PE_Data[i])->num_wan_objs;
178       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
179                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
180         min_index = i;
181         min_wan_objs = (&PE_Data[i])->num_wan_objs;
182       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
183                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
184                  ((&PE_Data[i])->num_lan_objs < (&PE_Data[min_index])->num_lan_objs)) {
185         min_index = i;
186         min_wan_objs = (&PE_Data[i])->num_wan_objs;
187       }
188     }
189   }
190
191   return (min_index);
192 }
193
194
195
196 /**************************************************************************
197 ** This method assigns target_object to target_pe.  The data structure
198 ** entry for target_pe is updated appropriately with measurements from
199 ** target_object.  This updated information is considered when placing
200 ** successive objects onto PEs.
201 */
202 void GridCommLB::Assign_Object_To_PE (int target_object, int target_pe)
203 {
204   (&Object_Data[target_object])->to_pe = target_pe;
205
206   (&PE_Data[target_pe])->num_objs += 1;
207
208   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
209     (&PE_Data[target_pe])->num_lan_objs += 1;
210     (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
211   }
212
213   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
214     (&PE_Data[target_pe])->num_wan_objs += 1;
215     (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
216   }
217
218   (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
219 }
220
221
222
223 /**************************************************************************
224 ** The Charm++ load balancing framework invokes this method to cause the
225 ** load balancer to migrate objects to "better" PEs.
226 */
227 void GridCommLB::work (CentralLB::LDStats *stats, int count)
228 {
229   int i;
230   CmiBool available;
231   CmiBool all_pes_mapped;
232   int max_cluster;
233   int min_speed;
234   int send_object;
235   int send_pe;
236   int send_cluster;
237   int recv_object;
238   int recv_pe;
239   int recv_cluster;
240   int target_object;
241   int target_pe;
242   LDCommData *com_data;
243
244
245   if (_lb_args.debug() >= 1) {
246     CkPrintf ("[%d] GridCommLB is working...\n", CkMyPe());
247   }
248
249   // Since this load balancer looks at communications data, it must initialize the CommHash.
250   stats->makeCommHash ();
251
252   // Initialize object variables for the number of PEs and number of objects.
253   Num_PEs = count;
254   Num_Objects = stats->n_objs;
255
256   if (_lb_args.debug() >= 1) {
257     CkPrintf ("[%d] GridCommLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
258   }
259
260   // Instantiate and initialize the PE_Data[] data structure.
261   //
262   // While doing this...
263   //    - ensure that there is at least one available PE
264   //    - ensure that all PEs are mapped to a cluster
265   //    - determine the maximum cluster number (gives the number of clusters)
266   //    - determine the minimum speed PE (used to compute relative PE speeds)
267   PE_Data = new PE_Data_T[Num_PEs];
268
269   available = CmiFalse;
270   all_pes_mapped = CmiTrue;
271   max_cluster = -1;
272   min_speed = INT_MAX;
273
274   for (i = 0; i < Num_PEs; i++) {
275     (&PE_Data[i])->available      = stats->procs[i].available;
276     (&PE_Data[i])->cluster        = Get_Cluster (i);
277     (&PE_Data[i])->num_objs       = 0;
278     (&PE_Data[i])->num_lan_objs   = 0;
279     (&PE_Data[i])->num_lan_msgs   = 0;
280     (&PE_Data[i])->num_wan_objs   = 0;
281     (&PE_Data[i])->num_wan_msgs   = 0;
282     (&PE_Data[i])->relative_speed = 0.0;
283     (&PE_Data[i])->scaled_load    = 0.0;
284
285     available |= (&PE_Data[i])->available;
286
287     all_pes_mapped &= ((&PE_Data[i])->cluster >= 0);
288
289     if ((&PE_Data[i])->cluster > max_cluster) {
290       max_cluster = (&PE_Data[i])->cluster;
291     }
292
293     if (stats->procs[i].pe_speed < min_speed) {
294       min_speed = stats->procs[i].pe_speed;
295     }
296   }
297
298   // If at least one available PE does not exist, return from load balancing.
299   if (!available) {
300     if (_lb_args.debug() >= 1) {
301       CkPrintf ("[%d] GridCommLB finds no available PEs -- no balancing done.\n", CkMyPe());
302     }
303
304     delete [] PE_Data;
305
306     return;
307   }
308
309   // If not all PEs are mapped to a cluster, return from load balancing.
310   if (!all_pes_mapped) {
311     if (_lb_args.debug() >= 1) {
312       CkPrintf ("[%d] GridCommLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
313     }
314
315     delete [] PE_Data;
316
317     return;
318   }
319
320   // The number of clusters is equal to the maximum cluster number plus one.
321   Num_Clusters = max_cluster + 1;
322
323   if (_lb_args.debug() >= 1) {
324     CkPrintf ("[%d] GridCommLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
325   }
326
327   // Compute the relative PE speeds.
328   // Also add background CPU time to each PE's scaled load.
329   for (i = 0; i < Num_PEs; i++) {
330     (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
331
332     (&PE_Data[i])->scaled_load += stats->procs[i].bg_cputime;
333   }
334
335   // Initialize the Object_Data[] data structure.
336   Object_Data = new Object_Data_T[Num_Objects];
337
338   for (i = 0; i < Num_Objects; i++) {
339     (&Object_Data[i])->migratable   = (&stats->objData[i])->migratable;
340     (&Object_Data[i])->cluster      = Get_Cluster (stats->from_proc[i]);
341     (&Object_Data[i])->from_pe      = stats->from_proc[i];
342     (&Object_Data[i])->num_lan_msgs = 0;
343     (&Object_Data[i])->num_wan_msgs = 0;
344     (&Object_Data[i])->load         = (&stats->objData[i])->wallTime;
345
346     if ((&Object_Data[i])->migratable) {
347       (&Object_Data[i])->to_pe = -1;
348     } else {
349       (&Object_Data[i])->to_pe = (&Object_Data[i])->from_pe;
350
351       (&PE_Data[(&Object_Data[i])->to_pe])->scaled_load += (&Object_Data[i])->load;
352
353       if (_lb_args.debug() >= 2) {
354         CkPrintf ("[%d] GridCommLB identifies object %d as non-migratable.\n", CkMyPe(), i);
355       }
356     }
357   }
358
359   // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
360   for (i = 0; i < stats->n_comm; i++) {
361     com_data = &(stats->commData[i]);
362     if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
363       send_object = stats->getHash (com_data->sender);
364       recv_object = stats->getHash (com_data->receiver.get_destObj());
365
366       send_pe = (&Object_Data[send_object])->from_pe;
367       recv_pe = (&Object_Data[recv_object])->from_pe;
368
369       send_cluster = Get_Cluster (send_pe);
370       recv_cluster = Get_Cluster (recv_pe);
371
372       if (send_cluster == recv_cluster) {
373         (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
374       } else {
375         (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
376       }
377     }
378   }
379
380   // Map objects to PEs in each cluster.
381   for (i = 0; i < Num_Clusters; i++) {
382     while (1) {
383       target_object = Find_Maximum_WAN_Object (i);
384       target_pe = Find_Minimum_WAN_PE (i);
385
386       if ((target_object == -1) || (target_pe == -1)) {
387         break;
388       }
389
390       Assign_Object_To_PE (target_object, target_pe);
391     }
392   }
393
394   // Make the assignment of objects to PEs in the load balancer framework.
395   for (i = 0; i < Num_Objects; i++) {
396     stats->to_proc[i] = (&Object_Data[i])->to_pe;
397
398     if (_lb_args.debug() >= 3) {
399       CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
400     } else if (_lb_args.debug() >= 2) {
401       if (stats->to_proc[i] != stats->from_proc[i]) {
402         CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
403       }
404     }
405   }
406
407   // Free memory.
408   delete [] Object_Data;
409   delete [] PE_Data;
410 }
411
412 #include "GridCommLB.def.h"