Bug fixes for NeighborLB and heap, and a new load balancer for workstations,
authorRobert Brunner <rbrunner@uiuc.edu>
Tue, 23 Nov 1999 01:14:50 +0000 (01:14 +0000)
committerRobert Brunner <rbrunner@uiuc.edu>
Tue, 23 Nov 1999 01:14:50 +0000 (01:14 +0000)
WSLB

src/ck-ldb/NeighborLB.C
src/ck-ldb/NeighborLB.h
src/ck-ldb/WSLB.C [new file with mode: 0644]
src/ck-ldb/WSLB.ci [new file with mode: 0644]
src/ck-ldb/WSLB.h [new file with mode: 0644]
src/ck-ldb/heap.C
src/ck-ldb/heap.h

index 73203caf5197f14813e0157d40415f1f4ddb2561..74afbfeafe037c84fa36237754f0e33f75ec45a9 100644 (file)
@@ -1,3 +1,4 @@
+#include <unistd.h>
 #include <charm++.h>
 #include <LBDatabase.h>
 #include "NeighborLB.h"
@@ -37,24 +38,25 @@ NeighborLB::NeighborLB()
     NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
                   static_cast<void*>(this));
 
-  stats_msg_count = 0;
-  statsMsgsList = new NLBStatsMsg*[num_neighbors()];
-  for(int i=0; i < CkNumPes(); i++)
-    statsMsgsList[i] = 0;
-  statsDataList = new LDStats[num_neighbors()];
-
-  neighbor_pes = new int[num_neighbors()];
-  neighbors(neighbor_pes);
 
+  // I had to move neighbor initialization outside the constructor
+  // in order to get the virtual functions of any derived classes
+  // so I'll just set them to illegal values here.
+  neighbor_pes = 0;
+  stats_msg_count = 0;
+  statsMsgsList = 0;
+  statsDataList = 0;
   migrates_completed = 0;
   migrates_expected = -1;
   mig_msgs_received = 0;
-  mig_msgs_expected = num_neighbors();
-  mig_msgs = new NLBMigrateMsg*[num_neighbors()];
-
-  proc_speed = theLbdb->ProcessorSpeed();
-  obj_data_sz = 0;
-  comm_data_sz = 0;
+  mig_msgs = 0;
+
+  myStats.proc_speed = theLbdb->ProcessorSpeed();
+  char hostname[80];
+  gethostname(hostname,79);
+  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
+  myStats.obj_data_sz = 0;
+  myStats.comm_data_sz = 0;
   receive_stats_ready = 0;
 
   theLbdb->CollectStatsOn();
@@ -65,9 +67,29 @@ NeighborLB::~NeighborLB()
   CkPrintf("Going away\n");
 }
 
+void NeighborLB::FindNeighbors()
+{
+  if (neighbor_pes == 0) { // Neighbors never initialized, so init them
+                           // and other things that depend on the number
+                           // of neighbors
+    statsMsgsList = new NLBStatsMsg*[num_neighbors()];
+    for(int i=0; i < num_neighbors(); i++)
+      statsMsgsList[i] = 0;
+    statsDataList = new LDStats[num_neighbors()];
+
+    neighbor_pes = new int[num_neighbors()];
+    neighbors(neighbor_pes);
+    mig_msgs_expected = num_neighbors();
+    mig_msgs = new NLBMigrateMsg*[num_neighbors()];
+  }
+
+}
+
 void NeighborLB::AtSync()
 {
-  CkPrintf("[%d] NeighborLB At Sync step %d!!!!\n",CkMyPe(),mystep);
+  //  CkPrintf("[%d] NeighborLB At Sync step %d!!!!\n",CkMyPe(),mystep);
+
+  if (neighbor_pes == 0) FindNeighbors();
 
   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
     MigrationDone();
@@ -85,45 +107,48 @@ void NeighborLB::AtSync()
     CProxy_NeighborLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
   } else delete msg;
 
-  //  delete msg;
   // Tell our own node that we are ready
   ReceiveStats((NLBStatsMsg*)0);
-  CkPrintf("[%d] done with AtSync\n",CkMyPe());
 }
 
 NLBStatsMsg* NeighborLB::AssembleStats()
 {
-  // Send stats
-  obj_data_sz = theLbdb->GetObjDataSz();
-  myObjData = new LDObjData[obj_data_sz];
-  theLbdb->GetObjData(myObjData);
-
-  comm_data_sz = theLbdb->GetCommDataSz();
-  myCommData = new LDCommData[comm_data_sz];
-  theLbdb->GetCommData(myCommData);
-  
+  // Get stats
+  theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
+  theLbdb->IdleTime(&myStats.idletime);
+  theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
+  myStats.obj_data_sz = theLbdb->GetObjDataSz();
+  myStats.objData = new LDObjData[myStats.obj_data_sz];
+  theLbdb->GetObjData(myStats.objData);
+
+  myStats.comm_data_sz = theLbdb->GetCommDataSz();
+  myStats.commData = new LDCommData[myStats.comm_data_sz];
+  theLbdb->GetCommData(myStats.commData);
+
+  myStats.obj_walltime = myStats.obj_cputime = 0;
+  for(int i=0; i < myStats.obj_data_sz; i++) {
+    myStats.obj_walltime += myStats.objData[i].wallTime;
+    myStats.obj_cputime += myStats.objData[i].cpuTime;
+  }    
+
   NLBStatsMsg* msg = new NLBStatsMsg;
 
   msg->from_pe = CkMyPe();
   msg->serial = rand();
-
-  theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
-  theLbdb->IdleTime(&msg->idletime);
-  theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
-  msg->proc_speed = proc_speed;
-
-  msg->obj_walltime = msg->obj_cputime = 0;
-
-  for(int i=0; i < obj_data_sz; i++) {
-    msg->obj_walltime += myObjData[i].wallTime;
-    msg->obj_cputime += myObjData[i].cpuTime;
-  }    
-
-  CkPrintf(
-    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
-    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
-    msg->idletime,msg->bg_walltime,msg->bg_cputime,
-    msg->obj_walltime,msg->obj_cputime);
+  msg->proc_speed = myStats.proc_speed;
+  msg->total_walltime = myStats.total_walltime;
+  msg->total_cputime = myStats.total_cputime;
+  msg->idletime = myStats.idletime;
+  msg->bg_walltime = myStats.bg_walltime;
+  msg->bg_cputime = myStats.bg_cputime;
+  msg->obj_walltime += myStats.obj_walltime;
+  msg->obj_cputime += myStats.obj_cputime;
+
+  //  CkPrintf(
+  //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
+  //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
+  //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
+  //    msg->obj_walltime,msg->obj_cputime);
 
   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
   //      CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
@@ -142,6 +167,8 @@ void NeighborLB::Migrated(LDObjHandle h)
 
 void NeighborLB::ReceiveStats(NLBStatsMsg *m)
 {
+  if (neighbor_pes == 0) FindNeighbors();
+
   if (m == 0) { // This is from our own node
     receive_stats_ready = 1;
   } else {
@@ -185,7 +212,6 @@ void NeighborLB::ReceiveStats(NLBStatsMsg *m)
       MigrateInfo& move = migrateMsg->moves[i];
       const int me = CkMyPe();
       if (move.from_pe == me && move.to_pe != me) {
-       CkPrintf("[%d] migrating object to %d\n",move.from_pe,move.to_pe);
        theLbdb->Migrate(move.obj,move.to_pe);
       } else if (move.from_pe != me) {
        CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
@@ -209,28 +235,33 @@ void NeighborLB::ReceiveStats(NLBStatsMsg *m)
       statsMsgsList[i]=0;
     }
     stats_msg_count=0;
+
+    theLbdb->ClearLoads();
   }
   
-  theLbdb->ClearLoads();
 }
 
 void NeighborLB::ReceiveMigration(NLBMigrateMsg *msg)
 {
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (mig_msgs_received == 0) migrates_expected = 0;
+
   mig_msgs[mig_msgs_received] = msg;
   mig_msgs_received++;
   //  CkPrintf("[%d] Received migration msg %d of %d\n",
   //      CkMyPe(),mig_msgs_received,mig_msgs_expected);
 
-  if (mig_msgs_received < mig_msgs_expected)
-    return;
-  else if (mig_msgs_received < mig_msgs_expected) {
+  if (mig_msgs_received > mig_msgs_expected) {
     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
             CkMyPe());
+  }
+
+  if (mig_msgs_received != mig_msgs_expected) {
     return;
   }
 
   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
-  migrates_expected = 0;
   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
     NLBMigrateMsg* m = mig_msgs[neigh];
     for(int i=0; i < m->n_moves; i++) {
@@ -257,7 +288,6 @@ void NeighborLB::MigrationDone()
   migrates_expected = -1;
   // Increment to next step
   mystep++;
-  //  CkPrintf("[%d] Resuming clients\n",CkMyPe());
   CProxy_NeighborLB(thisgroup).ResumeClients(CkMyPe());
 }
 
@@ -277,10 +307,10 @@ NLBMigrateMsg* NeighborLB::Strategy(LDStats* stats,int count)
     stats[j].obj_walltime,stats[j].obj_cputime);
   }
 
-  delete [] myObjData;
-  obj_data_sz = 0;
-  delete [] myCommData;
-  comm_data_sz = 0;
+  delete [] myStats.objData;
+  myStats.obj_data_sz = 0;
+  delete [] myStats.commData;
+  myStats.comm_data_sz = 0;
 
   int sizes=0;
   NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
index 428160ffdc3c819302ebab3befc233c48b0c97af..a510b7784acfa4d50a709a6cc54986ecb756e27d 100644 (file)
@@ -62,8 +62,23 @@ protected:
   };
 
   LBDatabase* theLbdb;
+  struct {
+    int proc_speed;
+    double total_walltime;
+    double total_cputime;
+    double idletime;
+    double bg_walltime;
+    double bg_cputime;
+    int obj_data_sz;
+    LDObjData* objData;
+    int comm_data_sz;
+    LDCommData* commData;
+    double obj_walltime;
+    double obj_cputime;
+  } myStats;
 
 private:
+  void FindNeighbors();
   NLBStatsMsg* AssembleStats();
 
   int mystep;
@@ -76,11 +91,6 @@ private:
   int mig_msgs_received;
   int mig_msgs_expected;
   int* neighbor_pes;
-  int proc_speed;
-  LDObjData* myObjData;
-  int obj_data_sz;
-  LDCommData* myCommData;
-  int comm_data_sz;
   int receive_stats_ready;
 };
 
diff --git a/src/ck-ldb/WSLB.C b/src/ck-ldb/WSLB.C
new file mode 100644 (file)
index 0000000..5eb5b16
--- /dev/null
@@ -0,0 +1,167 @@
+#include <charm++.h>
+
+#if CMK_LBDB_ON
+
+#if CMK_STL_USE_DOT_H
+#include <deque.h>
+#include <queue.h>
+#else
+#include <deque>
+#include <queue>
+#endif
+
+#include "heap.h"
+#include "WSLB.h"
+#include "WSLB.def.h"
+
+#if CMK_STL_USE_DOT_H
+template class deque<NeighborLB::MigrateInfo>;
+#else
+template class std::deque<NeighborLB::MigrateInfo>;
+#endif
+
+void CreateWSLB()
+{
+  loadbalancer = CProxy_WSLB::ckNew();
+}
+
+WSLB::WSLB()
+{
+  CkPrintf("[%d] WSLB created\n",CkMyPe());
+}
+
+NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
+{
+  //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
+  // Compute the average load to see if we are overloaded relative
+  // to our neighbors
+  double myload = myStats.total_walltime - myStats.idletime;
+  double avgload = myload;
+  int i;
+  for(i=0; i < count; i++) {
+    // Scale times we need appropriately for relative proc speeds
+    const double scale =  ((double)myStats.proc_speed) 
+      / stats[i].proc_speed;
+
+    stats[i].total_walltime *= scale;
+    stats[i].idletime *= scale;
+
+    avgload += (stats[i].total_walltime - stats[i].idletime);
+  }
+  avgload /= (count+1);
+
+#if CMK_STL_USE_DOT_H
+  queue<MigrateInfo> migrateInfo;
+#else
+  std::queue<MigrateInfo> migrateInfo;
+#endif
+
+  if (myload < avgload)
+    CkPrintf("[%d] Underload My load is %f, average load is %f\n",
+            CkMyPe(),myload,avgload);
+  else {
+    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
+            CkMyPe(),myload,avgload);
+
+    // First, build heaps of other processors and my objects
+    // Then assign objects to other processors until either
+    //   - The smallest remaining object would put me below average, or
+    //   - I only have 1 object left, or
+    //   - The smallest remaining object would put someone else 
+    //     above average
+
+    // Build heaps
+    minHeap procs(count);
+    for(i=0; i < count; i++) {
+      InfoRecord* item = new InfoRecord;
+      item->load = stats[i].total_walltime - stats[i].idletime;
+      item->Id =  stats[i].from_pe;
+      procs.insert(item);
+    }
+      
+    maxHeap objs(myStats.obj_data_sz);
+    for(i=0; i < myStats.obj_data_sz; i++) {
+      InfoRecord* item = new InfoRecord;
+      item->load = myStats.objData[i].wallTime;
+      item->Id = i;
+      objs.insert(item);
+    }
+
+    int objs_here = myStats.obj_data_sz;
+    do {
+      if (objs_here <= 1) break;  // For now, always leave 1 object
+
+      InfoRecord* p;
+      InfoRecord* obj;
+
+      // Get the lightest-loaded processor
+      p = procs.deleteMin();
+      if (p == 0) {
+       CkPrintf("[%d] No destination PE found!\n",CkMyPe());
+       break;
+      }
+
+      // Get the biggest object
+      CmiBool objfound = CmiFalse;
+      do {
+       obj = objs.deleteMax();
+       if (obj == 0) break;
+
+       double new_p_load = p->load + obj->load;
+       if (new_p_load < avgload) {
+         objfound = CmiTrue;
+       } else {
+         // This object is too big, so throw it away
+         delete obj;
+       }
+      } while (!objfound);
+
+      if (!objfound) {
+       //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
+       break;
+      }
+
+      const int me = CkMyPe();
+      // Apparently we can give this object to this processor
+      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
+              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
+
+      MigrateInfo migrateMe;
+      migrateMe.obj = myStats.objData[obj->Id].handle;
+      migrateMe.from_pe = me;
+      migrateMe.to_pe = p->Id;
+      migrateInfo.push(migrateMe);
+
+      objs_here--;
+      
+      // We may want to assign more to this processor, so lets
+      // update it and put it back in the heap
+      p->load += obj->load;
+      myload -= obj->load;
+      procs.insert(p);
+      
+      // This object is assigned, so we delete it from the heap
+      delete obj;
+
+    } while(myload > avgload);
+
+    // Now empty out the heaps
+    while (InfoRecord* p=procs.deleteMin())
+      delete p;
+    while (InfoRecord* obj=objs.deleteMax())
+      delete obj;
+  }  
+
+  // Now build the message to actually perform the migrations
+  int migrate_count=migrateInfo.size();
+  NLBMigrateMsg* msg = new(&migrate_count,1) NLBMigrateMsg;
+  msg->n_moves = migrate_count;
+  for(int i=0; i < migrate_count; i++) {
+    msg->moves[i] = migrateInfo.front();
+    migrateInfo.pop();
+  }
+
+  return msg;
+};
+
+#endif
diff --git a/src/ck-ldb/WSLB.ci b/src/ck-ldb/WSLB.ci
new file mode 100644 (file)
index 0000000..b5b3e0c
--- /dev/null
@@ -0,0 +1,9 @@
+module WSLB {
+
+extern module NeighborLB;
+
+group WSLB : NeighborLB {
+  entry void WSLB(void);  
+};
+
+};
diff --git a/src/ck-ldb/WSLB.h b/src/ck-ldb/WSLB.h
new file mode 100644 (file)
index 0000000..dba7d7a
--- /dev/null
@@ -0,0 +1,34 @@
+#ifndef _WSLB_H_
+#define _WSLB_H_
+
+#include "NeighborLB.h"
+#include "WSLB.decl.h"
+
+void CreateWSLB();
+
+class WSLB : public NeighborLB {
+public:
+  WSLB();
+private:
+  CmiBool QueryBalanceNow(int step) { return CmiTrue; };
+  virtual int num_neighbors() {
+    return (CkNumPes() > 5) ? 4 : (CkNumPes()-1);
+  };
+  virtual void neighbors(int* _n) {
+    CkPrintf("[%d] Saving neighbors\n",CkMyPe());
+    const int me = CkMyPe();
+    const int npe = CkNumPes();
+    if (npe > 1)
+      _n[0] = (me + npe - 1) % npe;
+    if (npe > 2)
+      _n[1] = (me + 1) % npe;
+    if (npe > 3)
+      _n[2] = (me + 2) % npe;
+    if (npe > 4)
+      _n[3] = (me + npe - 2) % npe;
+  };
+
+  NLBMigrateMsg* Strategy(NeighborLB::LDStats* stats, int count);
+};
+
+#endif /* _WSLB_H_ */
index 53292951f7085f5b264c85d3bde3a58e5b724ef4..44f21802a7925b5ad0e0c3acfc1cb0d930cbf51e 100644 (file)
@@ -18,6 +18,11 @@ minHeap::minHeap(int nsize)
   count = 0;
 }
 
+minHeap::~minHeap()
+{
+  delete [] h;
+}
+
 int minHeap::numElements()
 {
   return count;
@@ -25,15 +30,17 @@ int minHeap::numElements()
 
 int minHeap::insert(InfoRecord *x)
 {
-  h[count].info = x;
-  h[count].deleted = 0;
-
-  int current = count;
-  count++;
-
-  if (count >= size) {
+  int current;
+
+  if (count < size) {
+    h[count].info = x;
+    h[count].deleted = 0;
+    current = count;
+    count++;
+  } else {
     cout << "minHeap overflow. \n" ; 
-    return -1;}
+    return -1;
+  }
 
   int parent = (current - 1)/2;
   while (current != 0)
@@ -110,6 +117,11 @@ maxHeap::maxHeap(int nsize)
   count = 0;
 }
 
+maxHeap::~maxHeap()
+{
+  delete [] h;
+}
+
 int maxHeap::numElements()
 {
   return count;
@@ -117,14 +129,17 @@ int maxHeap::numElements()
 
 int maxHeap::insert(InfoRecord *x)
 {
-  h[count].info = x;
-  h[count].deleted  = 0;
-  int current = count;
-  count++;
-
-  if (count >= size) {
+  int current;
+
+  if (count < size) {
+    h[count].info = x;
+    h[count].deleted  = 0;
+    current = count;
+    count++;
+  } else {
     cout << "maxHeap overflow. \n" ; 
-    return -1;}
+    return -1;
+  }
 
   int parent = (current - 1)/2;
   while (current != 0)
index b21c884f86502df9ac2dc9e7248556ed0af51927..12313bc8e5ffec830e006968a71ad029343df5b7 100644 (file)
@@ -23,15 +23,15 @@ private:
   heapRecord *h;
   int count;
   int size;
-  void swap(int i, int j) 
-    {
-      heapRecord temp = h[i];
-      h[i] = h[j];
-      h[j] = temp;
-    }
+  void swap(int i, int j) {
+    heapRecord temp = h[i];
+    h[i] = h[j];
+    h[j] = temp;
+  }
   
 public:
   minHeap(int size);
+  ~minHeap();
   int numElements();
   int insert(InfoRecord *);
   InfoRecord *deleteMin();
@@ -46,15 +46,15 @@ private:
   int count;
   int size;
 
-  void swap(int i, int j) 
-    {
-      heapRecord temp = h[i];
-      h[i] = h[j];
-      h[j] = temp;
-    }
+  void swap(int i, int j) {
+    heapRecord temp = h[i];
+    h[i] = h[j];
+    h[j] = temp;
+  }
   
 public:  
   maxHeap(int size);
+  ~maxHeap();
   int numElements();
   int insert(InfoRecord *);
   InfoRecord *deleteMax();