These modifications to GridCommLB allow the balancer to automatically
authorGreg Koenig <koenig@uiuc.edu>
Sun, 5 Mar 2006 20:04:41 +0000 (20:04 +0000)
committerGreg Koenig <koenig@uiuc.edu>
Sun, 5 Mar 2006 20:04:41 +0000 (20:04 +0000)
detect cluster topology as well as balance secondarily on measured CPU
load.  The balancer also works with any number of clusters now.

src/ck-ldb/GridCommLB.C
src/ck-ldb/GridCommLB.h

index cf56b18483e799ff65365569df10840feeaf3b25..55173bd67efd5e43b004527b0cece2cffb9aa5f0 100644 (file)
@@ -1,19 +1,50 @@
 /**************************************************************************
 ** Greg Koenig (koenig@uiuc.edu)
 ** November 4, 2004
+**
+** This is GridCommLB.C
+**
+** GridCommLB is a load balancer for the Charm++ load balancing framework.
+** It is designed to work in a Grid computing environment consisting of
+** two or more clusters separated by wide-area communication links.
+** Communication between objects within a cluster is assumed to be light
+** weight (measured in microseconds) while communication between objects
+** on different clusters is assumed to be heavy weight (measured in
+** milliseconds).
+**
+** The load balancer examines all communications in a computation and
+** attempts to spread the objects that communicate to objects on remote
+** clusters evenly across the PEs in the local cluster.  No objects are
+** ever migrated across cluster boundaries, they are simply distributed
+** as evenly as possible across the PEs in the cluster in which they were
+** originally placed.  The idea is that by spreading objects that
+** communicate over the wide-area evenly, a relatively small number of
+** WAN objects will be mixed with a relatively large number of LAN
+** objects, allowing the message-driven characteristics of Charm++ the
+** greatest possibility of overlapping the high-cost WAN communication
+** with locally-driven work.
+**
+** The load balancer secondarily balances on scaled processor load
+** (i.e., a processor that is 2x the speed of another processor in
+** the local cluster will get 2x the work) as well as the number of
+** LAN objects.
+**
+** This load balancer can severely disrupt the object-to-PE mapping by
+** causing large numbers of objects to migrate with each load balancing
+** invocation.  This may be undesirable in some cases.  (For example, if
+** the vmi-linux "eager protocol" is used, eager channels may be pinned
+** between two PEs, and migrating objects that communicate heavily with
+** each other onto other PEs could actually slow the computationif they
+** no longer communicate with each other over an eager channel.)
 */
-#include <stdio.h>
-
-#include "charm++.h"
-#include "cklists.h"
 
 #include "GridCommLB.decl.h"
 
 #include "GridCommLB.h"
 #include "manager.h"
 
+CreateLBFunc_Def (GridCommLB, "Grid communication load balancer (evenly distribute objects across each cluster)");
 
-CreateLBFunc_Def (GridCommLB, "Load balancer for Grid computing environments");
 
 
 /**************************************************************************
@@ -21,20 +52,21 @@ CreateLBFunc_Def (GridCommLB, "Load balancer for Grid computing environments");
 */
 GridCommLB::GridCommLB (const CkLBOptions &opt) : CentralLB (opt)
 {
-    lbname = (char *) "GridCommLB";
+  lbname = (char *) "GridCommLB";
 
-    if (CkMyPe () == 0) {
-      CkPrintf ("[%d] GridCommLB created\n", CkMyPe ());
-    }
+  if (CkMyPe() == 0) {
+    CkPrintf ("[%d] GridCommLB created.\n", CkMyPe());
+  }
 
-    manager_init ();
+  manager_init ();
 }
 
 
+
 /**************************************************************************
 **
 */
-GridCommLB::GridCommLB (CkMigrateMessage *m) : CentralLB (m)
+GridCommLB::GridCommLB (CkMigrateMessage *msg) : CentralLB (msg)
 {
   lbname = (char *) "GridCommLB";
 
@@ -42,31 +74,51 @@ GridCommLB::GridCommLB (CkMigrateMessage *m) : CentralLB (m)
 }
 
 
+
 /**************************************************************************
-**
+** The Charm++ load balancing framework invokes this method to determine
+** whether load balancing can be performed at a specified time.
 */
 CmiBool GridCommLB::QueryBalanceNow (int step)
 {
-  // CkPrintf ("[%d] Balancing on step %d\n", CkMyPe (), step);
+  if (_lb_args.debug() >= 1) {
+    CkPrintf ("[%d] GridCommLB is balancing on step %d.\n", CkMyPe(), step);
+  }
+
   return (CmiTrue);
 }
 
 
+
 /**************************************************************************
+** The vmi-linux machine layer incorporates the idea that PEs are located
+** within identifiable clusters.  This information can be supplied by the
+** user or can be probed automatically by the machine layer.  The exposed
+** API call CmiGetCluster() returns the integer cluster number for a
+** specified PE or -1 if the information is unknown.
 **
+** For machine layers other than vmi-linux, simply return the constant 0.
+** GridCommLB will assume a single-cluster computation and will balance
+** on the scaled processor load and number of LAN messages.
 */
 int GridCommLB::Get_Cluster (int pe)
 {
-  if (pe < (Num_PEs / 2)) {
-    return (0);
-  } else {
-    return (1);
-  }
+#if CONVERSE_VERSION_VMI
+  return (CmiGetCluster (pe));
+#else
+  return (0);
+#endif
 }
 
 
+
 /**************************************************************************
+** This method locates the maximum WAN object in terms of number of
+** messages that traverse a wide-area connection.  The search is
+** constrained to objects within the specified cluster that have not yet
+** been mapped (balanced) to a PE.
 **
+** The method returns -1 if no matching object is found.
 */
 int GridCommLB::Find_Maximum_WAN_Object (int cluster)
 {
@@ -75,7 +127,7 @@ int GridCommLB::Find_Maximum_WAN_Object (int cluster)
   int max_wan_msgs;
 
 
-  max_index    = -1;
+  max_index = -1;
   max_wan_msgs = -1;
 
   for (i = 0; i < Num_Objects; i++) {
@@ -93,32 +145,45 @@ int GridCommLB::Find_Maximum_WAN_Object (int cluster)
 }
 
 
+
 /**************************************************************************
+** This method locates the minimum WAN PE in terms of number of objects
+** that communicate with objects across a wide-area connection.  The search
+** is constrained to PEs within the specified cluster.
+**
+** In the event of a "tie" (i.e., the number of WAN objects on a candidate
+** PE is equal to the minimum number of WAN objects discovered so far) the
+** tie is broken by considering the scaled CPU loads on the PEs.  The PE
+** with the smaller scaled load is the better candidate.  In the event of
+** a secondary tie, the secondary tie is broken by considering the number
+** of LAN objects on the two PEs.
 **
+** The method returns -1 if no matching PE is found.
 */
 int GridCommLB::Find_Minimum_WAN_PE (int cluster)
 {
   int i;
   int min_index;
   int min_wan_objs;
-  int min_lan_objs;
 
 
   min_index = -1;
-  min_wan_objs = 999999999;
-  min_lan_objs = 999999999;
+  min_wan_objs = INT_MAX;
 
   for (i = 0; i < Num_PEs; i++) {
     if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
       if ((&PE_Data[i])->num_wan_objs < min_wan_objs) {
        min_index = i;
        min_wan_objs = (&PE_Data[i])->num_wan_objs;
-       min_lan_objs = (&PE_Data[i])->num_lan_objs;
-      } else if ((&PE_Data[i])->num_wan_objs == min_wan_objs) {
-       if ((&PE_Data[i])->num_lan_objs < min_lan_objs) {
-         min_index = i;
-         min_lan_objs = (&PE_Data[i])->num_lan_objs;
-       }
+      } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
+                ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
+       min_index = i;
+       min_wan_objs = (&PE_Data[i])->num_wan_objs;
+      } else if (((&PE_Data[i])->num_wan_objs == min_wan_objs) &&
+                ((&PE_Data[i])->scaled_load == (&PE_Data[min_index])->scaled_load) &&
+                ((&PE_Data[i])->num_lan_objs < (&PE_Data[min_index])->num_lan_objs)) {
+       min_index = i;
+       min_wan_objs = (&PE_Data[i])->num_wan_objs;
       }
     }
   }
@@ -127,35 +192,45 @@ int GridCommLB::Find_Minimum_WAN_PE (int cluster)
 }
 
 
+
 /**************************************************************************
-**
+** This method assigns target_object to target_pe.  The data structure
+** entry for target_pe is updated appropriately with measurements from
+** target_object.  This updated information is considered when placing
+** successive objects onto PEs.
 */
 void GridCommLB::Assign_Object_To_PE (int target_object, int target_pe)
 {
   (&Object_Data[target_object])->to_pe = target_pe;
 
-  (&PE_Data[target_pe])->num_objs++;
+  (&PE_Data[target_pe])->num_objs += 1;
 
   if ((&Object_Data[target_object])->num_lan_msgs > 0) {
-    (&PE_Data[target_pe])->num_lan_objs++;
-    (&PE_Data[target_pe])->num_lan_msgs +=
-        (&Object_Data[target_object])->num_lan_msgs;
+    (&PE_Data[target_pe])->num_lan_objs += 1;
+    (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
   }
 
   if ((&Object_Data[target_object])->num_wan_msgs > 0) {
-    (&PE_Data[target_pe])->num_wan_objs++;
-    (&PE_Data[target_pe])->num_wan_msgs +=
-        (&Object_Data[target_object])->num_wan_msgs;
+    (&PE_Data[target_pe])->num_wan_objs += 1;
+    (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
   }
+
+  (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
 }
 
 
+
 /**************************************************************************
-**
+** The Charm++ load balancing framework invokes this method to cause the
+** load balancer to migrate objects to "better" PEs.
 */
 void GridCommLB::work (CentralLB::LDStats *stats, int count)
 {
   int i;
+  CmiBool available;
+  CmiBool all_pes_mapped;
+  int max_cluster;
+  int min_speed;
   int send_object;
   int send_pe;
   int send_cluster;
@@ -167,38 +242,121 @@ void GridCommLB::work (CentralLB::LDStats *stats, int count)
   LDCommData *com_data;
 
 
+  if (_lb_args.debug() >= 1) {
+    CkPrintf ("[%d] GridCommLB is working...\n", CkMyPe());
+  }
+
+  // Since this load balancer looks at communications data, it must initialize the CommHash.
   stats->makeCommHash ();
 
+  // Initialize object variables for the number of PEs and number of objects.
   Num_PEs = count;
   Num_Objects = stats->n_objs;
 
+  if (_lb_args.debug() >= 1) {
+    CkPrintf ("[%d] GridCommLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
+  }
+
+  // Instantiate and initialize the PE_Data[] data structure.
+  //
+  // While doing this...
+  //    - ensure that there is at least one available PE
+  //    - ensure that all PEs are mapped to a cluster
+  //    - determine the maximum cluster number (gives the number of clusters)
+  //    - determine the minimum speed PE (used to compute relative PE speeds)
   PE_Data = new PE_Data_T[Num_PEs];
-  Object_Data = new Object_Data_T[Num_Objects];
+
+  available = CmiFalse;
+  all_pes_mapped = CmiTrue;
+  max_cluster = -1;
+  min_speed = INT_MAX;
 
   for (i = 0; i < Num_PEs; i++) {
-    (&PE_Data[i])->available    = stats->procs[i].available;
-    (&PE_Data[i])->cluster      = Get_Cluster (i);
-    (&PE_Data[i])->num_objs     = 0;
-    (&PE_Data[i])->num_lan_objs = 0;
-    (&PE_Data[i])->num_lan_msgs = 0;
-    (&PE_Data[i])->num_wan_objs = 0;
-    (&PE_Data[i])->num_wan_msgs = 0;
+    (&PE_Data[i])->available      = stats->procs[i].available;
+    (&PE_Data[i])->cluster        = Get_Cluster (i);
+    (&PE_Data[i])->num_objs       = 0;
+    (&PE_Data[i])->num_lan_objs   = 0;
+    (&PE_Data[i])->num_lan_msgs   = 0;
+    (&PE_Data[i])->num_wan_objs   = 0;
+    (&PE_Data[i])->num_wan_msgs   = 0;
+    (&PE_Data[i])->relative_speed = 0.0;
+    (&PE_Data[i])->scaled_load    = 0.0;
+
+    available |= (&PE_Data[i])->available;
+
+    all_pes_mapped &= ((&PE_Data[i])->cluster >= 0);
+
+    if ((&PE_Data[i])->cluster > max_cluster) {
+      max_cluster = (&PE_Data[i])->cluster;
+    }
+
+    if (stats->procs[i].pe_speed < min_speed) {
+      min_speed = stats->procs[i].pe_speed;
+    }
+  }
+
+  // If at least one available PE does not exist, return from load balancing.
+  if (!available) {
+    if (_lb_args.debug() >= 1) {
+      CkPrintf ("[%d] GridCommLB finds no available PEs -- no balancing done.\n", CkMyPe());
+    }
+
+    delete [] PE_Data;
+
+    return;
+  }
+
+  // If not all PEs are mapped to a cluster, return from load balancing.
+  if (!all_pes_mapped) {
+    if (_lb_args.debug() >= 1) {
+      CkPrintf ("[%d] GridCommLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
+    }
+
+    delete [] PE_Data;
+
+    return;
+  }
+
+  // The number of clusters is equal to the maximum cluster number plus one.
+  Num_Clusters = max_cluster + 1;
+
+  if (_lb_args.debug() >= 1) {
+    CkPrintf ("[%d] GridCommLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
+  }
+
+  // Compute the relative PE speeds.
+  // Also add background CPU time to each PE's scaled load.
+  for (i = 0; i < Num_PEs; i++) {
+    (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
+
+    (&PE_Data[i])->scaled_load += stats->procs[i].bg_cputime;
   }
 
+  // Initialize the Object_Data[] data structure.
+  Object_Data = new Object_Data_T[Num_Objects];
+
   for (i = 0; i < Num_Objects; i++) {
     (&Object_Data[i])->migratable   = (&stats->objData[i])->migratable;
     (&Object_Data[i])->cluster      = Get_Cluster (stats->from_proc[i]);
     (&Object_Data[i])->from_pe      = stats->from_proc[i];
     (&Object_Data[i])->num_lan_msgs = 0;
     (&Object_Data[i])->num_wan_msgs = 0;
+    (&Object_Data[i])->load         = (&stats->objData[i])->wallTime;
 
     if ((&Object_Data[i])->migratable) {
       (&Object_Data[i])->to_pe = -1;
     } else {
       (&Object_Data[i])->to_pe = (&Object_Data[i])->from_pe;
+
+      (&PE_Data[(&Object_Data[i])->to_pe])->scaled_load += (&Object_Data[i])->load;
+
+      if (_lb_args.debug() >= 2) {
+       CkPrintf ("[%d] GridCommLB identifies object %d as non-migratable.\n", CkMyPe(), i);
+      }
     }
   }
 
+  // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
   for (i = 0; i < stats->n_comm; i++) {
     com_data = &(stats->commData[i]);
     if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
@@ -219,35 +377,36 @@ void GridCommLB::work (CentralLB::LDStats *stats, int count)
     }
   }
 
-  // Map objects to PEs in cluster 0.
-  while (1) {
-    target_object = Find_Maximum_WAN_Object (0);
-    target_pe     = Find_Minimum_WAN_PE (0);
-
-    if ((target_object == -1) || (target_pe == -1)) {
-      break;
-    }
-
-    Assign_Object_To_PE (target_object, target_pe);
-  }
+  // Map objects to PEs in each cluster.
+  for (i = 0; i < Num_Clusters; i++) {
+    while (1) {
+      target_object = Find_Maximum_WAN_Object (i);
+      target_pe = Find_Minimum_WAN_PE (i);
 
-  // Map objects to PEs in cluster 1.
-  while (1) {
-    target_object = Find_Maximum_WAN_Object (1);
-    target_pe     = Find_Minimum_WAN_PE (1);
+      if ((target_object == -1) || (target_pe == -1)) {
+       break;
+      }
 
-    if ((target_object == -1) || (target_pe == -1)) {
-      break;
+      Assign_Object_To_PE (target_object, target_pe);
     }
-
-    Assign_Object_To_PE (target_object, target_pe);
   }
 
   // Make the assignment of objects to PEs in the load balancer framework.
   for (i = 0; i < Num_Objects; i++) {
     stats->to_proc[i] = (&Object_Data[i])->to_pe;
+
+    if (_lb_args.debug() >= 3) {
+      CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
+    } else if (_lb_args.debug() >= 2) {
+      if (stats->to_proc[i] != stats->from_proc[i]) {
+       CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
+      }
+    }
   }
-}
 
+  // Free memory.
+  delete [] Object_Data;
+  delete [] PE_Data;
+}
 
 #include "GridCommLB.def.h"
index 30c4d0650fc3db0105ec174b8de3eac7d16648ce..857315ad5ccc8474dd244c9dbf11acfa907b22d5 100644 (file)
@@ -1,8 +1,18 @@
 #ifndef _GRIDCOMMLB_H_
 #define _GRIDCOMMLB_H_
 
+#include <limits.h>
+#include <stdio.h>
+
+#include "charm++.h"
+#include "cklists.h"
+
 #include "CentralLB.h"
 
+#if CONVERSE_VERSION_VMI
+extern "C" int CmiGetCluster (int process);
+#endif
+
 void CreateGridCommLB ();
 
 class PE_Data_T
@@ -15,6 +25,8 @@ class PE_Data_T
     int num_lan_msgs;
     int num_wan_objs;
     int num_wan_msgs;
+    double relative_speed;
+    double scaled_load;
 };
 
 class Object_Data_T
@@ -26,13 +38,14 @@ class Object_Data_T
     int to_pe;
     int num_lan_msgs;
     int num_wan_msgs;
+    double load;
 };
 
 class GridCommLB : public CentralLB
 {
   public:
     GridCommLB (const CkLBOptions &);
-    GridCommLB (CkMigrateMessage *m);
+    GridCommLB (CkMigrateMessage *msg);
 
     void work (CentralLB::LDStats *stats, int count);
 
@@ -47,6 +60,7 @@ class GridCommLB : public CentralLB
 
     int Num_PEs;
     int Num_Objects;
+    int Num_Clusters;
     PE_Data_T *PE_Data;
     Object_Data_T *Object_Data;
 };