doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / GridCommRefineLB.C
1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
3 ** March 1, 2006
4 **
5 ** This is GridCommRefineLB.C
6 **
7 ** GridCommRefineLB is a load balancer for the Charm++ load balancing
8 ** framework.  It is designed to work in a Grid computing environment
9 ** consisting of two or more clusters separated by wide-area communication
10 ** links.  Communication between objects within a cluster is assumed to be
11 ** light weight (measured in microseconds) while communication between
12 ** objects on different clusters is assumed to be heavy weight (measured in
13 ** milliseconds).
14 **
15 ** The load balancer examines all communications in a computation and
16 ** attempts to spread the objects that communicate with objects on remote
17 ** clusters evenly across the PEs in the local cluster.  No objects are
18 ** ever migrated across cluster boundaries, they are simply distributed
19 ** as evenly as possible across the PEs in the cluster in which they were
20 ** originally placed.  The idea is that by spreading objects that
21 ** communicate over the wide-area evenly, a relatively small number of
22 ** WAN objects will be mixed with a relatively large number of LAN
23 ** objects, allowing the message-driven characteristics of Charm++ the
24 ** greatest possibility of overlapping the high-cost WAN communication
25 ** with locally-driven work.
26 **
27 ** The load balancer secondarily balances on scaled processor load
28 ** (i.e., a processor that is 2x the speed of another processor in
29 ** the local cluster will get 2x the work) as well as the number of
30 ** LAN objects.
31 **
32 ** This load balancer applies a "refinement" approach which attempts to
33 ** avoid disrupting the object-to-PE mapping by causing large numbers of
34 ** objects to migrate with each load balancing iĀ®nvocation.  This may be
35 ** undesirable in some cases.  (For example, if the vmi-linux "eager
36 ** protocol" is used, eager channels may be pinned between two PEs, and
37 ** migrating objects that communicate heavily with each other onto other
38 ** PEs could actually slow the computationif they no longer communicate
39 ** with each other over an eager channel.)  To prevent this, the balancer
40 ** determines the average number of objects per PE that communicate with
41 ** objects on remote clusters, and then migrates objects away from PEs
42 ** that exceed this average plus some tolerance (e.g., 110% of average).
43 ** This means that only the objects on the most overloaded PEs will be
44 ** migrated.
45 */
46
47 #include "GridCommRefineLB.decl.h"
48
49 #include "GridCommRefineLB.h"
50 #include "manager.h"
51
52 CreateLBFunc_Def (GridCommRefineLB, "Grid communication load balancer (refines object mapping within each cluster)")
53
54
55
56 /**************************************************************************
57 **
58 */
59 GridCommRefineLB::GridCommRefineLB (const CkLBOptions &opt) : CentralLB (opt)
60 {
61   char *value;
62
63
64   lbname = (char *) "GridCommRefineLB";
65
66   if (CkMyPe() == 0) {
67     CkPrintf ("[%d] GridCommRefineLB created.\n", CkMyPe());
68   }
69
70   if (value = getenv ("CK_LDB_GRIDCOMMREFINELB_TOLERANCE")) {
71     CK_LDB_GridCommRefineLB_Tolerance = atof (value);
72   } else {
73     CK_LDB_GridCommRefineLB_Tolerance = CK_LDB_GRIDCOMMREFINELB_TOLERANCE;
74   }
75
76   manager_init ();
77 }
78
79
80
81 /**************************************************************************
82 **
83 */
84 GridCommRefineLB::GridCommRefineLB (CkMigrateMessage *msg) : CentralLB (msg)
85 {
86   char *value;
87
88
89   lbname = (char *) "GridCommRefineLB";
90
91   if (value = getenv ("CK_LDB_GRIDCOMMREFINELB_TOLERANCE")) {
92     CK_LDB_GridCommRefineLB_Tolerance = atof (value);
93   } else {
94     CK_LDB_GridCommRefineLB_Tolerance = CK_LDB_GRIDCOMMREFINELB_TOLERANCE;
95   }
96
97   manager_init ();
98 }
99
100
101
102 /**************************************************************************
103 ** The Charm++ load balancing framework invokes this method to determine
104 ** whether load balancing can be performed at a specified time.
105 */
106 CmiBool GridCommRefineLB::QueryBalanceNow (int step)
107 {
108   if (_lb_args.debug() > 2) {
109     CkPrintf ("[%d] GridCommRefineLB is balancing on step %d.\n", CkMyPe(), step);
110   }
111
112   return (CmiTrue);
113 }
114
115
116
117 /**************************************************************************
118 ** The vmi-linux machine layer incorporates the idea that PEs are located
119 ** within identifiable clusters.  This information can be supplied by the
120 ** user or can be probed automatically by the machine layer.  The exposed
121 ** API call CmiGetCluster() returns the integer cluster number for a
122 ** specified PE or -1 if the information is unknown.
123 **
124 ** For machine layers other than vmi-linux, simply return the constant 0.
125 ** GridCommRefineLB will assume a single-cluster computation and will
126 ** balance on the scaled processor load and number of LAN messages.
127 */
128 int GridCommRefineLB::Get_Cluster (int pe)
129 {
130 #if CONVERSE_VERSION_VMI
131   return (CmiGetCluster (pe));
132 #else
133   return (0);
134 #endif
135 }
136
137
138
139 /**************************************************************************
140 ** Instantiate and initialize the PE_Data[] data structure.
141 **
142 ** While doing this...
143 **    - ensure that there is at least one available PE
144 **    - ensure that all PEs are mapped to a cluster
145 **    - determine the maximum cluster number (gives the number of clusters)
146 **    - determine the minimum speed PE (used to compute relative PE speeds)
147 */
148 void GridCommRefineLB::Initialize_PE_Data (CentralLB::LDStats *stats)
149 {
150   int min_speed;
151   int i;
152
153
154   PE_Data = new PE_Data_T[Num_PEs];
155
156   min_speed = MAXINT;
157   for (i = 0; i < Num_PEs; i++) {
158     (&PE_Data[i])->available      = stats->procs[i].available;
159     (&PE_Data[i])->cluster        = Get_Cluster (i);
160     (&PE_Data[i])->num_objs       = 0;
161     (&PE_Data[i])->num_lan_objs   = 0;
162     (&PE_Data[i])->num_lan_msgs   = 0;
163     (&PE_Data[i])->num_wan_objs   = 0;
164     (&PE_Data[i])->num_wan_msgs   = 0;
165     (&PE_Data[i])->relative_speed = 0.0;
166     (&PE_Data[i])->scaled_load    = 0.0;
167
168     if (stats->procs[i].pe_speed < min_speed) {
169       min_speed = stats->procs[i].pe_speed;
170     }
171   }
172
173   // Compute the relative PE speeds.
174   // Also add background CPU time to each PE's scaled load.
175   for (i = 0; i < Num_PEs; i++) {
176     (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
177     (&PE_Data[i])->scaled_load += stats->procs[i].bg_walltime;
178   }
179 }
180
181
182
183 /**************************************************************************
184 **
185 */
186 int GridCommRefineLB::Available_PE_Count ()
187 {
188   int available_pe_count;
189   int i;
190
191
192   available_pe_count = 0;
193   for (i = 0; i < Num_PEs; i++) {
194     if ((&PE_Data[i])->available) {
195       available_pe_count += 1;
196     }
197   }
198   return (available_pe_count);
199 }
200
201
202
203 /**************************************************************************
204 **
205 */
206 int GridCommRefineLB::Compute_Number_Of_Clusters ()
207 {
208   int max_cluster;
209   int i;
210
211
212   max_cluster = 0;
213   for (i = 0; i < Num_PEs; i++) {
214     if ((&PE_Data[i])->cluster < 0) {
215       return (-1);
216     }
217
218     if ((&PE_Data[i])->cluster > max_cluster) {
219       max_cluster = (&PE_Data[i])->cluster;
220     }
221   }
222   return (max_cluster + 1);
223 }
224
225
226
227 /**************************************************************************
228 **
229 */
230 void GridCommRefineLB::Initialize_Object_Data (CentralLB::LDStats *stats)
231 {
232   int i;
233
234
235   Object_Data = new Object_Data_T[Num_Objects];
236
237   for (i = 0; i < Num_Objects; i++) {
238     (&Object_Data[i])->migratable   = (&stats->objData[i])->migratable;
239     (&Object_Data[i])->cluster      = Get_Cluster (stats->from_proc[i]);
240     (&Object_Data[i])->from_pe      = stats->from_proc[i];
241     (&Object_Data[i])->to_pe        = stats->from_proc[i];
242     (&Object_Data[i])->num_lan_msgs = 0;
243     (&Object_Data[i])->num_wan_msgs = 0;
244     (&Object_Data[i])->load         = (&stats->objData[i])->wallTime;
245
246     //(&PE_Data[(&Object_Data[i])->from_pe])->num_objs += 1;
247     //(&PE_Data[(&Object_Data[i])->from_pe])->scaled_load += (&Object_Data[i])->load / (&PE_Data[(&Object_Data[i])->from_pe])->relative_speed;
248   }
249 }
250
251
252
253 /**************************************************************************
254 **
255 */
256 void GridCommRefineLB::Examine_InterObject_Messages (CentralLB::LDStats *stats)
257 {
258   int i;
259   int j;
260   LDCommData *com_data;
261   int send_object;
262   int send_pe;
263   int send_cluster;
264   int recv_object;
265   int recv_pe;
266   int recv_cluster;
267   LDObjKey *recv_objects;
268   int num_objects;
269
270
271   for (i = 0; i < stats->n_comm; i++) {
272     com_data = &(stats->commData[i]);
273     if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
274       send_object = stats->getHash (com_data->sender);
275       recv_object = stats->getHash (com_data->receiver.get_destObj());
276
277       if ((send_object < 0) || (send_object > Num_Objects) || (recv_object < 0) || (recv_object > Num_Objects)) {
278         continue;
279       }
280
281       send_pe = (&Object_Data[send_object])->from_pe;
282       recv_pe = (&Object_Data[recv_object])->from_pe;
283
284       send_cluster = Get_Cluster (send_pe);
285       recv_cluster = Get_Cluster (recv_pe);
286
287       if (send_cluster == recv_cluster) {
288         (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
289       } else {
290         (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
291       }
292     } else if (com_data->receiver.get_type() == LD_OBJLIST_MSG) {
293       send_object = stats->getHash (com_data->sender);
294
295       if ((send_object < 0) || (send_object > Num_Objects)) {
296         continue;
297       }
298
299       send_pe = (&Object_Data[send_object])->from_pe;
300       send_cluster = Get_Cluster (send_pe);
301
302       recv_objects = com_data->receiver.get_destObjs (num_objects);   // (num_objects is passed by reference)
303
304       for (j = 0; j < num_objects; j++) {
305         recv_object = stats->getHash (recv_objects[j]);
306
307         if ((recv_object < 0) || (recv_object > Num_Objects)) {
308           continue;
309         }
310
311         recv_pe = (&Object_Data[recv_object])->from_pe;
312         recv_cluster = Get_Cluster (recv_pe);
313
314         if (send_cluster == recv_cluster) {
315           (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
316         } else {
317           (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
318         }
319       }
320     }
321   }
322 }
323
324
325
326 /**************************************************************************
327 **
328 */
329 void GridCommRefineLB::Place_Objects_On_PEs ()
330 {
331   int i;
332
333
334   for (i = 0; i < Num_Objects; i++) {
335     Assign_Object_To_PE (i, (&Object_Data[i])->from_pe);
336   }
337 }
338
339
340
341 /**************************************************************************
342 **
343 */
344 void GridCommRefineLB::Remap_Objects_To_PEs (int cluster)
345 {
346   int num_cluster_pes;
347   int num_wan_msgs;
348   int avg_wan_msgs;
349   int target_object;
350   int target_pe;
351   int i;
352
353
354   // Compute average number of objects per PE for this cluster.
355   num_cluster_pes = 0;
356   num_wan_msgs = 0;
357   for (i = 0; i < Num_PEs; i++) {
358     if (cluster == (&PE_Data[i])->cluster) {
359       num_cluster_pes += 1;
360       num_wan_msgs += (&PE_Data[i])->num_wan_msgs;
361     }
362   }
363   avg_wan_msgs = num_wan_msgs / num_cluster_pes;
364
365   // Move objects away from PEs that exceed the average.
366   for (i = 0; i < Num_PEs; i++) {
367     if (cluster == (&PE_Data[i])->cluster) {
368       while ((&PE_Data[i])->num_wan_msgs > (avg_wan_msgs * CK_LDB_GridCommRefineLB_Tolerance)) {
369         target_object = Find_Maximum_WAN_Object (i);
370         target_pe = Find_Minimum_WAN_PE (cluster);
371
372         if ((target_object == -1) || (target_pe == -1)) {
373           break;
374         }
375
376         Remove_Object_From_PE (target_object, i);
377         Assign_Object_To_PE (target_object, target_pe);
378       }
379     }
380   }
381
382 /*
383   // Compute average number of objects per PE for this cluster.
384   num_cluster_pes = 0;
385   num_wan_objs = 0;
386   for (j = 0; j < Num_PEs; j++) {
387     if (cluster == (&PE_Data[j])->cluster) {
388       num_cluster_pes += 1;
389       num_wan_objs += (&PE_Data[j])->num_wan_objs;
390     }
391   }
392   avg_wan_objs = num_wan_objs / num_cluster_pes;
393
394   // Move objects away from PEs that exceed the average.
395   for (j = 0; j < Num_PEs; j++) {
396     if (cluster == (&PE_Data[j])->cluster) {
397       while ((&PE_Data[j])->num_wan_objs > (avg_wan_objs * CK_LDB_GridCommRefineLB_Tolerance)) {
398         target_object = Find_Maximum_WAN_Object (j);
399         target_pe = Find_Minimum_WAN_PE (i);
400
401         if ((target_object == -1) || (target_pe == -1)) {
402           break;
403         }
404
405         Remove_Object_From_PE (target_object, j);
406         Assign_Object_To_PE (target_object, target_pe);
407       }
408     }
409   }
410 */
411 }
412
413
414
415 /**************************************************************************
416 ** This method locates the maximum WAN object in terms of number of
417 ** messages that traverse a wide-area connection.  The search is
418 ** constrained to objects on the specified PE.
419 **
420 ** The method returns -1 if no matching object is found.
421 */
422 int GridCommRefineLB::Find_Maximum_WAN_Object (int pe)
423 {
424   int i;
425   int max_index;
426   int max_wan_msgs;
427
428
429   max_index = -1;
430   max_wan_msgs = -1;
431
432   for (i = 0; i < Num_Objects; i++) {
433     if ((&Object_Data[i])->from_pe == pe) {
434       if ((&Object_Data[i])->migratable) {
435         if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
436           max_index = i;
437           max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
438         }
439       }
440     }
441   }
442
443   return (max_index);
444 }
445
446
447
448 /**************************************************************************
449 ** This method locates the minimum WAN PE in terms of number of objects
450 ** that communicate with objects across a wide-area connection.  The search
451 ** is constrained to PEs within the specified cluster.
452 **
453 ** In the event of a "tie" (i.e., the number of WAN objects on a candidate
454 ** PE is equal to the minimum number of WAN objects discovered so far) the
455 ** tie is broken by considering the scaled CPU loads on the PEs.  The PE
456 ** with the smaller scaled load is the better candidate.  In the event of
457 ** a secondary tie, the secondary tie is broken by considering the number
458 ** of LAN objects on the two PEs.
459 **
460 ** The method returns -1 if no matching PE is found.
461 */
462 int GridCommRefineLB::Find_Minimum_WAN_PE (int cluster)
463 {
464   int i;
465   int min_index;
466   int min_wan_msgs;
467
468
469   min_index = -1;
470   min_wan_msgs = MAXINT;
471
472   for (i = 0; i < Num_PEs; i++) {
473     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
474       if ((&PE_Data[i])->num_wan_msgs < min_wan_msgs) {
475         min_index = i;
476         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
477       } else if (((&PE_Data[i])->num_wan_msgs == min_wan_msgs) &&
478                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
479         min_index = i;
480         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
481       } else if (((&PE_Data[i])->num_wan_msgs == min_wan_msgs) &&
482                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
483                  ((&PE_Data[i])->num_objs < (&PE_Data[min_index])->num_objs)) {
484         min_index = i;
485         min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
486       }
487     }
488   }
489
490   return (min_index);
491
492 /*
493   int i;
494   int min_index;
495   int min_wan_objs;
496
497
498   min_index = -1;
499   min_wan_objs = MAXINT;
500
501   for (i = 0; i < Num_PEs; i++) {
502     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
503       if ((&PE_Data[i])->num_wan_objs < min_wan_objs) {
504         min_index = i;
505         min_wan_objs = (&PE_Data[i])->num_wan_objs;
506       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
507                  ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
508         min_index = i;
509         min_wan_objs = (&PE_Data[i])->num_wan_objs;
510       } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
511                  ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
512                  ((&PE_Data[i])->num_lan_objs < (&PE_Data[min_index])->num_lan_objs)) {
513         min_index = i;
514         min_wan_objs = (&PE_Data[i])->num_wan_objs;
515       }
516     }
517   }
518
519   return (min_index);
520 */
521 }
522
523
524
525 /**************************************************************************
526 ** This method removes target_object from target_pe.  The data structure
527 ** entry for target_pe is updated appropriately with measurements from
528 ** target_object.
529 */
530 void GridCommRefineLB::Remove_Object_From_PE (int target_object, int target_pe)
531 {
532   (&Object_Data[target_object])->to_pe = -1;
533
534   (&PE_Data[target_pe])->num_objs -= 1;
535
536   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
537     (&PE_Data[target_pe])->num_lan_objs -= 1;
538     (&PE_Data[target_pe])->num_lan_msgs -= (&Object_Data[target_object])->num_lan_msgs;
539   }
540
541   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
542     (&PE_Data[target_pe])->num_wan_objs -= 1;
543     (&PE_Data[target_pe])->num_wan_msgs -= (&Object_Data[target_object])->num_wan_msgs;
544   }
545
546   (&PE_Data[target_pe])->scaled_load -= (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
547 }
548
549
550
551 /**************************************************************************
552 ** This method assigns target_object to target_pe.  The data structure
553 ** entry for target_pe is updated appropriately with measurements from
554 ** target_object.
555 */
556 void GridCommRefineLB::Assign_Object_To_PE (int target_object, int target_pe)
557 {
558   (&Object_Data[target_object])->to_pe = target_pe;
559
560   (&PE_Data[target_pe])->num_objs += 1;
561
562   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
563     (&PE_Data[target_pe])->num_lan_objs += 1;
564     (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
565   }
566
567   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
568     (&PE_Data[target_pe])->num_wan_objs += 1;
569     (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
570   }
571
572   (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
573 }
574
575
576
577 /**************************************************************************
578 ** The Charm++ load balancing framework invokes this method to cause the
579 ** load balancer to migrate objects to "better" PEs.
580 */
581 void GridCommRefineLB::work (LDStats *stats)
582 {
583   int i;
584   // int j;
585   // CmiBool available;
586   // CmiBool all_pes_mapped;
587   // int max_cluster;
588   // int min_speed;
589   // int send_object;
590   // int send_pe;
591   // int send_cluster;
592   // int recv_object;
593   // int recv_pe;
594   // int recv_cluster;
595   // LDCommData *com_data;
596
597
598   if (_lb_args.debug() > 0) {
599     CkPrintf ("[%d] GridCommRefineLB is working.\n", CkMyPe());
600   }
601
602   // Since this load balancer looks at communications data, it must call stats->makeCommHash().
603   stats->makeCommHash ();
604
605   // Initialize object variables for the number of PEs and number of objects.
606   Num_PEs = stats->nprocs();
607   Num_Objects = stats->n_objs;
608
609   if (_lb_args.debug() > 0) {
610     CkPrintf ("[%d] GridCommRefineLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
611   }
612
613   // Initialize the PE_Data[] data structure.
614   Initialize_PE_Data (stats);
615
616   // If at least one available PE does not exist, return from load balancing.
617   if (Available_PE_Count() < 1) {
618     if (_lb_args.debug() > 0) {
619       CkPrintf ("[%d] GridCommRefineLB finds no available PEs -- no balancing done.\n", CkMyPe());
620     }
621
622     delete [] PE_Data;
623
624     return;
625   }
626
627   // Determine the number of clusters.
628   // If any PE is not mapped to a cluster, return from load balancing.
629   Num_Clusters = Compute_Number_Of_Clusters ();
630   if (Num_Clusters < 1) {
631     if (_lb_args.debug() > 0) {
632       CkPrintf ("[%d] GridCommRefineLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
633     }
634
635     delete [] PE_Data;
636
637     return;
638   }
639
640   if (_lb_args.debug() > 0) {
641     CkPrintf ("[%d] GridCommRefineLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
642   }
643
644   // Initialize the Object_Data[] data structure.
645   Initialize_Object_Data (stats);
646
647   // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
648   Examine_InterObject_Messages (stats);
649
650   // Place objects on the PE they are currently assigned to.
651   Place_Objects_On_PEs ();
652
653   // Remap objects to PEs in each cluster.
654   for (i = 0; i < Num_Clusters; i++) {
655     Remap_Objects_To_PEs (i);
656   }
657
658   // Make the assignment of objects to PEs in the load balancer framework.
659   for (i = 0; i < Num_Objects; i++) {
660     stats->to_proc[i] = (&Object_Data[i])->to_pe;
661
662     if (_lb_args.debug() > 2) {
663       CkPrintf ("[%d] GridCommRefineLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
664     } else if (_lb_args.debug() > 1) {
665       if (stats->to_proc[i] != stats->from_proc[i]) {
666         CkPrintf ("[%d] GridCommRefineLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
667       }
668     }
669   }
670
671   // Free memory.
672   delete [] Object_Data;
673   delete [] PE_Data;
674 }
675
676 #include "GridCommRefineLB.def.h"