These changes fix things that I have discovered about Grid load balancing.
[charm.git] / src / ck-ldb / GridCommLB.C
1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
3 ** November 4, 2004
4 **
5 ** This is GridCommLB.C
6 **
7 ** GridCommLB is a load balancer for the Charm++ load balancing framework.
8 ** It is designed to work in a Grid computing environment consisting of
9 ** two or more clusters separated by wide-area communication links.
10 ** Communication between objects within a cluster is assumed to be light
11 ** weight (measured in microseconds) while communication between objects
12 ** on different clusters is assumed to be heavy weight (measured in
13 ** milliseconds).
14 **
15 ** The load balancer examines all communications in a computation and
16 ** attempts to spread the objects that communicate to objects on remote
17 ** clusters evenly across the PEs in the local cluster.  No objects are
18 ** ever migrated across cluster boundaries, they are simply distributed
19 ** as evenly as possible across the PEs in the cluster in which they were
20 ** originally placed.  The idea is that by spreading objects that
21 ** communicate over the wide-area evenly, a relatively small number of
22 ** WAN objects will be mixed with a relatively large number of LAN
23 ** objects, allowing the message-driven characteristics of Charm++ the
24 ** greatest possibility of overlapping the high-cost WAN communication
25 ** with locally-driven work.
26 **
27 ** The load balancer secondarily balances on scaled processor load
28 ** (i.e., a processor that is 2x the speed of another processor in
29 ** the local cluster will get 2x the work) as well as the number of
30 ** LAN objects.
31 **
32 ** This load balancer can severely disrupt the object-to-PE mapping by
33 ** causing large numbers of objects to migrate with each load balancing
34 ** invocation.  This may be undesirable in some cases.  (For example, if
35 ** the vmi-linux "eager protocol" is used, eager channels may be pinned
36 ** between two PEs, and migrating objects that communicate heavily with
37 ** each other onto other PEs could actually slow the computationif they
38 ** no longer communicate with each other over an eager channel.)
39 */
40
41 #include "GridCommLB.decl.h"
42
43 #include "GridCommLB.h"
44 #include "manager.h"
45
46 CreateLBFunc_Def (GridCommLB, "Grid communication load balancer (evenly distribute objects across each cluster)");
47
48
49
50 /**************************************************************************
51 **
52 */
53 GridCommLB::GridCommLB (const CkLBOptions &opt) : CentralLB (opt)
54 {
55   lbname = (char *) "GridCommLB";
56
57   if (CkMyPe() == 0) {
58     CkPrintf ("[%d] GridCommLB created.\n", CkMyPe());
59   }
60
61   manager_init ();
62 }
63
64
65
66 /**************************************************************************
67 **
68 */
69 GridCommLB::GridCommLB (CkMigrateMessage *msg) : CentralLB (msg)
70 {
71   lbname = (char *) "GridCommLB";
72
73   manager_init ();
74 }
75
76
77
78 /**************************************************************************
79 ** The Charm++ load balancing framework invokes this method to determine
80 ** whether load balancing can be performed at a specified time.
81 */
82 CmiBool GridCommLB::QueryBalanceNow (int step)
83 {
84   if (_lb_args.debug() > 2) {
85     CkPrintf ("[%d] GridCommLB is balancing on step %d.\n", CkMyPe(), step);
86   }
87
88   return (CmiTrue);
89 }
90
91
92
93 /**************************************************************************
94 ** The vmi-linux machine layer incorporates the idea that PEs are located
95 ** within identifiable clusters.  This information can be supplied by the
96 ** user or can be probed automatically by the machine layer.  The exposed
97 ** API call CmiGetCluster() returns the integer cluster number for a
98 ** specified PE or -1 if the information is unknown.
99 **
100 ** For machine layers other than vmi-linux, simply return the constant 0.
101 ** GridCommLB will assume a single-cluster computation and will balance
102 ** on the scaled processor load and number of LAN messages.
103 */
104 int GridCommLB::Get_Cluster (int pe)
105 {
106 #if CONVERSE_VERSION_VMI
107   return (CmiGetCluster (pe));
108 #else
109   return (0);
110 #endif
111 }
112
113
114
115 /**************************************************************************
116 ** Instantiate and initialize the PE_Data[] data structure.
117 **
118 ** While doing this...
119 **    - ensure that there is at least one available PE
120 **    - ensure that all PEs are mapped to a cluster
121 **    - determine the maximum cluster number (gives the number of clusters)
122 **    - determine the minimum speed PE (used to compute relative PE speeds)
123 */
124 void GridCommLB::Initialize_PE_Data (CentralLB::LDStats *stats)
125 {
126   int min_speed;
127   int i;
128
129
130   PE_Data = new PE_Data_T[Num_PEs];
131
132   min_speed = MAXINT;
133   for (i = 0; i < Num_PEs; i++) {
134     (&PE_Data[i])->available      = stats->procs[i].available;
135     (&PE_Data[i])->cluster        = Get_Cluster (i);
136     (&PE_Data[i])->num_objs       = 0;
137     (&PE_Data[i])->num_lan_objs   = 0;
138     (&PE_Data[i])->num_lan_msgs   = 0;
139     (&PE_Data[i])->num_wan_objs   = 0;
140     (&PE_Data[i])->num_wan_msgs   = 0;
141     (&PE_Data[i])->relative_speed = 0.0;
142     (&PE_Data[i])->scaled_load    = 0.0;
143
144     if (stats->procs[i].pe_speed < min_speed) {
145       min_speed = stats->procs[i].pe_speed;
146     }
147   }
148
149   // Compute the relative PE speeds.
150   // Also add background CPU time to each PE's scaled load.
151   for (i = 0; i < Num_PEs; i++) {
152     (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
153     (&PE_Data[i])->scaled_load += stats->procs[i].bg_cputime;
154   }
155 }
156
157
158
159 /**************************************************************************
160 **
161 */
162 int GridMetisLB::Available_PE_Count ()
163 {
164   int available_pe_count;
165   int i;
166
167
168   available_pe_count = 0;
169   for (i = 0; i < Num_PEs; i++) {
170     if ((&PE_Data[i])->available) {
171       available_pe_count += 1;
172     }
173   }
174   return (available_pe_count);
175 }
176
177
178
179 /**************************************************************************
180 **
181 */
182 int GridMetisLB::Compute_Number_Of_Clusters ()
183 {
184   int max_cluster;
185   int i;
186
187
188   max_cluster = 0;
189   for (i = 0; i < Num_PEs; i++) {
190     if ((&PE_Data[i])->cluster < 0) {
191       return (-1);
192     }
193
194     if ((&PE_Data[i])->cluster > max_cluster) {
195       max_cluster = (&PE_Data[i])->cluster;
196     }
197   }
198   return (max_cluster + 1);
199 }
200
201
202
203 /**************************************************************************
204 **
205 */
206 void GridCommLB::Initialize_Object_Data (CentralLB::LDStats *stats)
207 {
208   int i;
209
210
211   Object_Data = new Object_Data_T[Num_Objects];
212
213   for (i = 0; i < Num_Objects; i++) {
214     (&Object_Data[i])->migratable   = (&stats->objData[i])->migratable;
215     (&Object_Data[i])->cluster      = Get_Cluster (stats->from_proc[i]);
216     (&Object_Data[i])->from_pe      = stats->from_proc[i];
217     (&Object_Data[i])->to_pe        = -1;
218     (&Object_Data[i])->num_lan_msgs = 0;
219     (&Object_Data[i])->num_wan_msgs = 0;
220     (&Object_Data[i])->load         = (&stats->objData[i])->wallTime;
221   }
222 }
223
224
225
226 /**************************************************************************
227 **
228 */
229 void GridCommLB::Examine_InterObject_Messages (CentralLB::LDStats *stats)
230 {
231   int i;
232   LDCommData *com_data;
233   int send_object;
234   int send_pe;
235   int send_cluster;
236   int recv_object;
237   int recv_pe;
238   int recv_cluster;
239   LDObjKey *recv_objects;
240   int num_objects;
241
242
243   for (i = 0; i < stats->n_comm; i++) {
244     com_data = &(stats->commData[i]);
245     if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
246       send_object = stats->getHash (com_data->sender);
247       recv_object = stats->getHash (com_data->receiver.get_destObj());
248
249       if ((send_object < 0) || (send_object > Num_Objects) || (recv_object < 0) || (recv_object > Num_Objects)) {
250         continue;
251       }
252
253       send_pe = (&Object_Data[send_object])->from_pe;
254       recv_pe = (&Object_Data[recv_object])->from_pe;
255
256       send_cluster = Get_Cluster (send_pe);
257       recv_cluster = Get_Cluster (recv_pe);
258
259       if (send_cluster == recv_cluster) {
260         (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
261       } else {
262         (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
263       }
264     } else if (com_data->receiver.get_type() == LD_OBJLIST_MSG) {
265       send_object = stats->getHash (com_data->sender);
266
267       if ((send_object < 0) || (send_object > Num_Objects)) {
268         continue;
269       }
270
271       send_pe = (&Object_Data[send_object])->from_pe;
272       send_cluster = Get_Cluster (send_pe);
273
274       recv_objects = com_data->receiver.get_destObjs (num_objects);   // (num_objects is passed by reference)
275
276       for (j = 0; j < num_objects; j++) {
277         recv_object = stats->getHash (recv_objects[j]);
278
279         if ((recv_object < 0) || (recv_object > Num_Objects)) {
280           continue;
281         }
282
283         recv_pe = (&Object_Data[recv_object])->from_pe;
284         recv_cluster = Get_Cluster (recv_pe);
285
286         if (send_cluster == recv_cluster) {
287           (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
288         } else {
289           (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
290         }
291       }
292     }
293   }
294 }
295
296
297
298 /**************************************************************************
299 **
300 */
301 void GridCommLB::Map_NonMigratable_Objects_To_PEs ()
302 {
303   int i;
304
305
306   for (i = 0; i < Num_Objects; i++) {
307     if (!((&Object_Data[i])->migratable)) {
308       if (_lb_args.debug() > 1) {
309         CkPrintf ("[%d] GridCommLB identifies object %d as non-migratable.\n", CkMyPe(), i);
310       }
311
312       Assign_Object_To_PE (i, (&Object_Data[i])->from_pe);
313     }
314   }
315 }
316
317
318
319 /**************************************************************************
320 **
321 */
322 void GridCommLB::Map_Migratable_Objects_To_PEs (int cluster)
323 {
324   int target_object;
325   int target_pe;
326
327
328   while (1) {
329     target_object = Find_Maximum_WAN_Object (cluster);
330     target_pe = Find_Minimum_WAN_PE (cluster);
331
332     if ((target_object == -1) || (target_pe == -1)) {
333       break;
334     }
335
336     Assign_Object_To_PE (target_object, target_pe);
337   }
338 }
339
340
341
342 /**************************************************************************
343 ** This method locates the maximum WAN object in terms of number of
344 ** messages that traverse a wide-area connection.  The search is
345 ** constrained to objects within the specified cluster that have not yet
346 ** been mapped (balanced) to a PE.
347 **
348 ** The method returns -1 if no matching object is found.
349 */
350 int GridCommLB::Find_Maximum_WAN_Object (int cluster)
351 {
352   int i;
353   int max_index;
354   int max_wan_msgs;
355
356
357   max_index = -1;
358   max_wan_msgs = -1;
359
360   for (i = 0; i < Num_Objects; i++) {
361     if ((&Object_Data[i])->cluster == cluster) {
362       if ((&Object_Data[i])->to_pe == -1) {
363         if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
364           max_index = i;
365           max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
366         }
367       }
368     }
369   }
370
371   return (max_index);
372 }
373
374
375
376 /**************************************************************************
377 ** This method locates the minimum WAN PE in terms of number of objects
378 ** that communicate with objects across a wide-area connection.  The search
379 ** is constrained to PEs within the specified cluster.
380 **
381 ** In the event of a "tie" (i.e., the number of WAN objects on a candidate
382 ** PE is equal to the minimum number of WAN objects discovered so far) the
383 ** tie is broken by considering the scaled CPU loads on the PEs.  The PE
384 ** with the smaller scaled load is the better candidate.  In the event of
385 ** a secondary tie, the secondary tie is broken by considering the number
386 ** of LAN objects on the two PEs.
387 **
388 ** The method returns -1 if no matching PE is found.
389 */
390 int GridCommLB::Find_Minimum_WAN_PE (int cluster)
391 {
392   int i;
393   int min_index;
394   int min_wan_msgs;
395
396
397   min_index = -1;
398   min_wan_msgs = MAXINT;
399
400   for (i = 0; i < Num_PEs; i++) {
401     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
402       if ((&PE_Data[i])->num_wan_msgs < min_wan_msgs) {
403         min_index = i;
404         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
405       } else if (((&PE_Data[i])->num_wan_msgs == min_wan_msgs) &&
406                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
407         min_index = i;
408         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
409       } else if (((&PE_Data[i])->num_wan_msgs == min_wan_msgs) &&
410                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
411                  ((&PE_Data[i])->num_objs < (&PE_Data[min_index])->num_objs)) {
412         min_index = i;
413         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
414       }
415     }
416   }
417
418   return (min_index);
419
420 /*
421   int i;
422   int min_index;
423   int min_wan_objs;
424
425
426   min_index = -1;
427   min_wan_objs = MAXINT;
428
429   for (i = 0; i < Num_PEs; i++) {
430     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
431       if ((&PE_Data[i])->num_wan_objs < min_wan_objs) {
432         min_index = i;
433         min_wan_objs = (&PE_Data[i])->num_wan_objs;
434       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
435                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
436         min_index = i;
437         min_wan_objs = (&PE_Data[i])->num_wan_objs;
438       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
439                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
440                  ((&PE_Data[i])->num_lan_objs < (&PE_Data[min_index])->num_lan_objs)) {
441         min_index = i;
442         min_wan_objs = (&PE_Data[i])->num_wan_objs;
443       }
444     }
445   }
446
447   return (min_index);
448 */
449 }
450
451
452
453 /**************************************************************************
454 ** This method assigns target_object to target_pe.  The data structure
455 ** entry for target_pe is updated appropriately with measurements from
456 ** target_object.  This updated information is considered when placing
457 ** successive objects onto PEs.
458 */
459 void GridCommLB::Assign_Object_To_PE (int target_object, int target_pe)
460 {
461   (&Object_Data[target_object])->to_pe = target_pe;
462
463   (&PE_Data[target_pe])->num_objs += 1;
464
465   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
466     (&PE_Data[target_pe])->num_lan_objs += 1;
467     (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
468   }
469
470   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
471     (&PE_Data[target_pe])->num_wan_objs += 1;
472     (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
473   }
474
475   (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
476 }
477
478
479
480 /**************************************************************************
481 ** The Charm++ load balancing framework invokes this method to cause the
482 ** load balancer to migrate objects to "better" PEs.
483 */
484 void GridCommLB::work (CentralLB::LDStats *stats, int count)
485 {
486   int i;
487   // CmiBool available;
488   // CmiBool all_pes_mapped;
489   // int max_cluster;
490   // int min_speed;
491   // int send_object;
492   // int send_pe;
493   // int send_cluster;
494   // int recv_object;
495   // int recv_pe;
496   // int recv_cluster;
497   // int target_object;
498   // int target_pe;
499   // LDCommData *com_data;
500
501
502   if (_lb_args.debug() > 0) {
503     CkPrintf ("[%d] GridCommLB is working.\n", CkMyPe());
504   }
505
506   // Since this load balancer looks at communications data, it must initialize the CommHash.
507   stats->makeCommHash ();
508
509   // Initialize object variables for the number of PEs and number of objects.
510   Num_PEs = count;
511   Num_Objects = stats->n_objs;
512
513   if (_lb_args.debug() > 0) {
514     CkPrintf ("[%d] GridCommLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
515   }
516
517   // Initialize the PE_Data[] data structure.
518   Initialize_PE_Data (stats);
519
520   // If at least one available PE does not exist, return from load balancing.
521   if (Available_PE_Count() < 1) {
522     if (_lb_args.debug() > 0) {
523       CkPrintf ("[%d] GridCommLB finds no available PEs -- no balancing done.\n", CkMyPe());
524     }
525
526     delete [] PE_Data;
527
528     return;
529   }
530
531   // Determine the number of clusters.
532   // If any PE is not mapped to a cluster, return from load balancing.
533   Num_Clusters = Compute_Number_Of_Clusters ();
534   if (Num_Clusters < 1) {
535     if (_lb_args.debug() > 0) {
536       CkPrintf ("[%d] GridCommLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
537     }
538
539     delete [] PE_Data;
540
541     return;
542   }
543
544   if (_lb_args.debug() > 0) {
545     CkPrintf ("[%d] GridCommLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
546   }
547
548   // Initialize the Object_Data[] data structure.
549   Initialize_Object_Data (stats);
550
551   // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
552   Examine_InterObject_Messages (stats);
553
554   // Map non-migratable objects to PEs.
555   Map_NonMigratable_Objects_To_PEs ();
556
557   // Map migratable objects to PEs in each cluster.
558   for (i = 0; i < Num_Clusters; i++) {
559     Map_Migratable_Objects_To_PEs (i);
560   }
561
562   // Make the assignment of objects to PEs in the load balancer framework.
563   for (i = 0; i < Num_Objects; i++) {
564     stats->to_proc[i] = (&Object_Data[i])->to_pe;
565
566     if (_lb_args.debug() > 2) {
567       CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
568     } else if (_lb_args.debug() > 1) {
569       if (stats->to_proc[i] != stats->from_proc[i]) {
570         CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
571       }
572     }
573   }
574
575   // Free memory.
576   delete [] Object_Data;
577   delete [] PE_Data;
578 }
579
580 #include "GridCommLB.def.h"