I Rearranged the load balancer, so WSLB->NeighborLB, NeighborLB->NborBaseLB,
authorRobert Brunner <rbrunner@uiuc.edu>
Wed, 12 Jan 2000 18:34:10 +0000 (18:34 +0000)
committerRobert Brunner <rbrunner@uiuc.edu>
Wed, 12 Jan 2000 18:34:10 +0000 (18:34 +0000)
and WSLB is a new, no-base class load balancer that currently functions
the same as NeighborLB

src/ck-ldb/NborBaseLB.C [new file with mode: 0644]
src/ck-ldb/NborBaseLB.ci [new file with mode: 0644]
src/ck-ldb/NborBaseLB.h [new file with mode: 0644]
src/ck-ldb/NeighborLB.C
src/ck-ldb/NeighborLB.ci
src/ck-ldb/NeighborLB.h
src/ck-ldb/WSLB.C
src/ck-ldb/WSLB.ci
src/ck-ldb/WSLB.h
src/scripts/Makefile

diff --git a/src/ck-ldb/NborBaseLB.C b/src/ck-ldb/NborBaseLB.C
new file mode 100644 (file)
index 0000000..d05662d
--- /dev/null
@@ -0,0 +1,370 @@
+#include <unistd.h>
+#include <charm++.h>
+#include <LBDatabase.h>
+#include "NborBaseLB.h"
+#include "NborBaseLB.def.h"
+
+CkGroupID nborBaselb;
+
+#if CMK_LBDB_ON
+
+void CreateNborBaseLB()
+{
+  nborBaselb = CProxy_NborBaseLB::ckNew();
+}
+
+void NborBaseLB::staticMigrated(void* data, LDObjHandle h)
+{
+  NborBaseLB *me = static_cast<NborBaseLB*>(data);
+
+  me->Migrated(h);
+}
+
+void NborBaseLB::staticAtSync(void* data)
+{
+  NborBaseLB *me = static_cast<NborBaseLB*>(data);
+
+  me->AtSync();
+}
+
+NborBaseLB::NborBaseLB()
+{
+  mystep = 0;
+  theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
+  theLbdb->
+    AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
+                           static_cast<void*>(this));
+  theLbdb->
+    NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
+                  static_cast<void*>(this));
+
+
+  // I had to move neighbor initialization outside the constructor
+  // in order to get the virtual functions of any derived classes
+  // so I'll just set them to illegal values here.
+  neighbor_pes = 0;
+  stats_msg_count = 0;
+  statsMsgsList = 0;
+  statsDataList = 0;
+  migrates_completed = 0;
+  migrates_expected = -1;
+  mig_msgs_received = 0;
+  mig_msgs = 0;
+
+  myStats.proc_speed = theLbdb->ProcessorSpeed();
+//  char hostname[80];
+//  gethostname(hostname,79);
+//  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
+  myStats.obj_data_sz = 0;
+  myStats.comm_data_sz = 0;
+  receive_stats_ready = 0;
+
+  theLbdb->CollectStatsOn();
+}
+
+NborBaseLB::~NborBaseLB()
+{
+  CkPrintf("Going away\n");
+}
+
+void NborBaseLB::FindNeighbors()
+{
+  if (neighbor_pes == 0) { // Neighbors never initialized, so init them
+                           // and other things that depend on the number
+                           // of neighbors
+    statsMsgsList = new NLBStatsMsg*[num_neighbors()];
+    for(int i=0; i < num_neighbors(); i++)
+      statsMsgsList[i] = 0;
+    statsDataList = new LDStats[num_neighbors()];
+
+    neighbor_pes = new int[num_neighbors()];
+    neighbors(neighbor_pes);
+    mig_msgs_expected = num_neighbors();
+    mig_msgs = new NLBMigrateMsg*[num_neighbors()];
+  }
+
+}
+
+void NborBaseLB::AtSync()
+{
+  //  CkPrintf("[%d] NborBaseLB At Sync step %d!!!!\n",CkMyPe(),mystep);
+
+  if (CkMyPe() == 0) {
+    start_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d starting at %f\n",
+            step(),start_lb_time);
+  }
+
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
+    MigrationDone();
+    return;
+  }
+
+  NLBStatsMsg* msg = AssembleStats();
+
+  int i;
+  for(i=1; i < num_neighbors(); i++) {
+    NLBStatsMsg* m2 = (NLBStatsMsg*) CkCopyMsg((void**)&msg);
+    CProxy_NborBaseLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
+  }
+  if (0 < num_neighbors()) {
+    CProxy_NborBaseLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
+  } else delete msg;
+
+  // Tell our own node that we are ready
+  ReceiveStats((NLBStatsMsg*)0);
+}
+
+NLBStatsMsg* NborBaseLB::AssembleStats()
+{
+  // Get stats
+  theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
+  theLbdb->IdleTime(&myStats.idletime);
+  theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
+  myStats.obj_data_sz = theLbdb->GetObjDataSz();
+  myStats.objData = new LDObjData[myStats.obj_data_sz];
+  theLbdb->GetObjData(myStats.objData);
+
+  myStats.comm_data_sz = theLbdb->GetCommDataSz();
+  myStats.commData = new LDCommData[myStats.comm_data_sz];
+  theLbdb->GetCommData(myStats.commData);
+
+  myStats.obj_walltime = myStats.obj_cputime = 0;
+  for(int i=0; i < myStats.obj_data_sz; i++) {
+    myStats.obj_walltime += myStats.objData[i].wallTime;
+    myStats.obj_cputime += myStats.objData[i].cpuTime;
+  }    
+
+  NLBStatsMsg* msg = new NLBStatsMsg;
+
+  msg->from_pe = CkMyPe();
+  msg->serial = rand();
+  msg->proc_speed = myStats.proc_speed;
+  msg->total_walltime = myStats.total_walltime;
+  msg->total_cputime = myStats.total_cputime;
+  msg->idletime = myStats.idletime;
+  msg->bg_walltime = myStats.bg_walltime;
+  msg->bg_cputime = myStats.bg_cputime;
+  msg->obj_walltime = myStats.obj_walltime;
+  msg->obj_cputime = myStats.obj_cputime;
+
+  //  CkPrintf(
+  //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
+  //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
+  //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
+  //    msg->obj_walltime,msg->obj_cputime);
+
+  //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
+  //      CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
+  return msg;
+}
+
+void NborBaseLB::Migrated(LDObjHandle h)
+{
+  migrates_completed++;
+  //  CkPrintf("[%d] An object migrated! %d %d\n",
+  //      CkMyPe(),migrates_completed,migrates_expected);
+  if (migrates_completed == migrates_expected) {
+    MigrationDone();
+  }
+}
+
+void NborBaseLB::ReceiveStats(NLBStatsMsg *m)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (m == 0) { // This is from our own node
+    receive_stats_ready = 1;
+  } else {
+    const int pe = m->from_pe;
+    //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
+    //            pe,stats_msg_count,m->n_objs,m->serial,m);
+    int peslot = -1;
+    for(int i=0; i < num_neighbors(); i++) {
+      if (pe == neighbor_pes[i]) {
+       peslot = i;
+       break;
+      }
+    }
+    if (peslot == -1 || statsMsgsList[peslot] != 0) {
+      CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
+              pe);
+    } else {
+      statsMsgsList[peslot] = m;
+      statsDataList[peslot].from_pe = m->from_pe;
+      statsDataList[peslot].total_walltime = m->total_walltime;
+      statsDataList[peslot].total_cputime = m->total_cputime;
+      statsDataList[peslot].idletime = m->idletime;
+      statsDataList[peslot].bg_walltime = m->bg_walltime;
+      statsDataList[peslot].bg_cputime = m->bg_cputime;
+      statsDataList[peslot].proc_speed = m->proc_speed;
+      statsDataList[peslot].obj_walltime = m->obj_walltime;
+      statsDataList[peslot].obj_cputime = m->obj_cputime;
+      stats_msg_count++;
+    }
+  }
+
+  const int clients = num_neighbors();
+  if (stats_msg_count == clients && receive_stats_ready) {
+    double strat_start_time = CmiWallTimer();
+    receive_stats_ready = 0;
+    NLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
+
+    int i;
+
+    // Migrate messages from me to elsewhere
+    for(i=0; i < migrateMsg->n_moves; i++) {
+      MigrateInfo& move = migrateMsg->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe == me && move.to_pe != me) {
+       theLbdb->Migrate(move.obj,move.to_pe);
+      } else if (move.from_pe != me) {
+       CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
+                me,move.from_pe,move.to_pe);
+      }
+    }
+    
+    // Now, send migrate messages to neighbors
+    for(i=1; i < num_neighbors(); i++) {
+      NLBMigrateMsg* m2 = (NLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
+      CProxy_NborBaseLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
+    }
+    if (0 < num_neighbors())
+      CProxy_NborBaseLB(thisgroup).ReceiveMigration(migrateMsg,
+                                                   neighbor_pes[0]);
+    else delete migrateMsg;
+    
+    // Zero out data structures for next cycle
+    for(i=0; i < clients; i++) {
+      delete statsMsgsList[i];
+      statsMsgsList[i]=0;
+    }
+    stats_msg_count=0;
+
+    theLbdb->ClearLoads();
+    if (CkMyPe() == 0) {
+      double strat_end_time = CmiWallTimer();
+      CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
+    }
+  }
+  
+}
+
+void NborBaseLB::ReceiveMigration(NLBMigrateMsg *msg)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (mig_msgs_received == 0) migrates_expected = 0;
+
+  mig_msgs[mig_msgs_received] = msg;
+  mig_msgs_received++;
+  //  CkPrintf("[%d] Received migration msg %d of %d\n",
+  //      CkMyPe(),mig_msgs_received,mig_msgs_expected);
+
+  if (mig_msgs_received > mig_msgs_expected) {
+    CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
+            CkMyPe());
+  }
+
+  if (mig_msgs_received != mig_msgs_expected) {
+    return;
+  }
+
+  //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
+  for(int neigh=0; neigh < mig_msgs_received;neigh++) {
+    NLBMigrateMsg* m = mig_msgs[neigh];
+    for(int i=0; i < m->n_moves; i++) {
+      MigrateInfo& move = m->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe != me && move.to_pe == me) {
+       migrates_expected++;
+      }
+    }
+    delete m;
+    mig_msgs[neigh]=0;
+  }
+  //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
+  //      CkMyPe(),migrates_expected);
+  mig_msgs_received = 0;
+  if (migrates_expected == 0 || migrates_expected == migrates_completed)
+    MigrationDone();
+}
+
+
+void NborBaseLB::MigrationDone()
+{
+  if (CkMyPe() == 0) {
+    double end_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d finished at %f duration %f\n",
+            step(),end_lb_time,end_lb_time - start_lb_time);
+  }
+  migrates_completed = 0;
+  migrates_expected = -1;
+  // Increment to next step
+  mystep++;
+  CProxy_NborBaseLB(thisgroup).ResumeClients(CkMyPe());
+}
+
+void NborBaseLB::ResumeClients()
+{
+  theLbdb->ResumeClients();
+}
+
+NLBMigrateMsg* NborBaseLB::Strategy(LDStats* stats,int count)
+{
+  for(int j=0; j < count; j++) {
+    CkPrintf(
+    "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
+    CkMyPe(),stats[j].from_pe,stats[j].proc_speed,
+    stats[j].total_walltime,stats[j].total_cputime,
+    stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
+    stats[j].obj_walltime,stats[j].obj_cputime);
+  }
+
+  delete [] myStats.objData;
+  myStats.obj_data_sz = 0;
+  delete [] myStats.commData;
+  myStats.comm_data_sz = 0;
+
+  int sizes=0;
+  NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
+  msg->n_moves = 0;
+
+  return msg;
+}
+
+void* NLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
+{
+  int totalsize = size + array[0] * sizeof(NborBaseLB::MigrateInfo);
+
+  NLBMigrateMsg* ret =
+    static_cast<NLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
+
+  ret->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
+    (reinterpret_cast<char*>(ret)+ size);
+
+  return static_cast<void*>(ret);
+}
+
+void* NLBMigrateMsg::pack(NLBMigrateMsg* m)
+{
+  m->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
+    (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
+
+  return static_cast<void*>(m);
+}
+
+NLBMigrateMsg* NLBMigrateMsg::unpack(void *m)
+{
+  NLBMigrateMsg* ret_val = static_cast<NLBMigrateMsg*>(m);
+
+  ret_val->moves = reinterpret_cast<NborBaseLB::MigrateInfo*>
+    (reinterpret_cast<char*>(&ret_val->moves) 
+     + reinterpret_cast<size_t>(ret_val->moves));
+
+  return ret_val;
+}
+
+#endif
diff --git a/src/ck-ldb/NborBaseLB.ci b/src/ck-ldb/NborBaseLB.ci
new file mode 100644 (file)
index 0000000..26a8d47
--- /dev/null
@@ -0,0 +1,17 @@
+module NborBaseLB {
+
+extern module LBDatabase;
+
+readonly CkGroupID nborBaselb;
+
+message NLBStatsMsg;
+message[varsize] NLBMigrateMsg;
+
+group NborBaseLB {
+  entry void NborBaseLB(void);  
+  entry void ReceiveStats(NLBStatsMsg*);
+  entry void ResumeClients(void);
+  entry void ReceiveMigration(NLBMigrateMsg*); 
+};
+
+};
diff --git a/src/ck-ldb/NborBaseLB.h b/src/ck-ldb/NborBaseLB.h
new file mode 100644 (file)
index 0000000..649fd48
--- /dev/null
@@ -0,0 +1,124 @@
+#ifndef NBORBASELB_H
+#define NBORBASELB_H
+
+#include <LBDatabase.h>
+#include "NborBaseLB.decl.h"
+
+
+void CreateNborBaseLB();
+
+class NLBStatsMsg;
+class NLBMigrateMsg;
+
+class NborBaseLB : public Group
+{
+public:
+  NborBaseLB();
+  ~NborBaseLB();
+  static void staticAtSync(void*);
+  void AtSync(void); // Everything is at the PE barrier
+
+  void ReceiveStats(NLBStatsMsg *);            // Receive stats on PE 0
+  void ResumeClients();
+  void ReceiveMigration(NLBMigrateMsg *);      // Receive migration data
+
+  // Migrated-element callback
+  static void staticMigrated(void* me, LDObjHandle h);
+  void Migrated(LDObjHandle h);
+
+  void MigrationDone(void);  // Call when migration is complete
+  int step() { return mystep; };
+
+  struct MigrateInfo {  // Used in NLBMigrateMsg
+    LDObjHandle obj;
+    int from_pe;
+    int to_pe;
+  };
+
+  struct LDStats {  // Passed to Strategy
+    int from_pe;
+    double total_walltime;
+    double total_cputime;
+    double idletime;
+    double bg_walltime;
+    double bg_cputime;
+    double obj_walltime;
+    double obj_cputime;
+    int proc_speed;
+  };
+
+protected:
+  virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
+  virtual NLBMigrateMsg* Strategy(LDStats* stats,int count);
+
+  virtual int num_neighbors() {
+    if (CmiNumPes() > 2) return 2;
+    else return (CmiNumPes()-1);
+  };
+
+  virtual void neighbors(int* _n) {
+    _n[0] = (CmiMyPe() + CmiNumPes() -1) % CmiNumPes();
+    _n[1] = (CmiMyPe() + 1) % CmiNumPes();
+  };
+
+  LBDatabase* theLbdb;
+  struct {
+    int proc_speed;
+    double total_walltime;
+    double total_cputime;
+    double idletime;
+    double bg_walltime;
+    double bg_cputime;
+    int obj_data_sz;
+    LDObjData* objData;
+    int comm_data_sz;
+    LDCommData* commData;
+    double obj_walltime;
+    double obj_cputime;
+  } myStats;
+
+private:
+  void FindNeighbors();
+  NLBStatsMsg* AssembleStats();
+
+  int mystep;
+  int stats_msg_count;
+  NLBStatsMsg** statsMsgsList;
+  LDStats* statsDataList;
+  int migrates_completed;
+  int migrates_expected;
+  NLBMigrateMsg** mig_msgs;
+  int mig_msgs_received;
+  int mig_msgs_expected;
+  int* neighbor_pes;
+  int receive_stats_ready;
+  double start_lb_time;
+};
+
+class NLBStatsMsg : public CMessage_NLBStatsMsg {
+public:
+  int from_pe;
+  int serial;
+  int proc_speed;
+  double total_walltime;
+  double total_cputime;
+  double idletime;
+  double bg_walltime;
+  double bg_cputime;
+  double obj_walltime;
+  double obj_cputime;
+}; 
+
+class NLBMigrateMsg : public CMessage_NLBMigrateMsg {
+public:
+  int n_moves;
+  NborBaseLB::MigrateInfo* moves;
+
+  // Other methods & data members 
+  
+  static void* alloc(int msgnum, size_t size, int* array, int priobits); 
+  static void* pack(NLBMigrateMsg* in); 
+  static NLBMigrateMsg* unpack(void* in); 
+}; 
+
+#endif /* NBORBASELB_H */
index d20256336ddebb85ffda8a6e3d0f3d2e7705211b..373c75b0c60bc3ca71d0a382ae195c95b1ca7068 100644 (file)
-#include <unistd.h>
 #include <charm++.h>
-#include <LBDatabase.h>
-#include "NeighborLB.h"
-#include "NeighborLB.def.h"
-
-CkGroupID neighborlb;
 
 #if CMK_LBDB_ON
 
-void CreateNeighborLB()
-{
-  neighborlb = CProxy_NeighborLB::ckNew();
-}
+#include "CkLists.h"
 
-void NeighborLB::staticMigrated(void* data, LDObjHandle h)
-{
-  NeighborLB *me = static_cast<NeighborLB*>(data);
-
-  me->Migrated(h);
-}
+#include "heap.h"
+#include "NeighborLB.h"
+#include "NeighborLB.def.h"
 
-void NeighborLB::staticAtSync(void* data)
+void CreateNeighborLB()
 {
-  NeighborLB *me = static_cast<NeighborLB*>(data);
-
-  me->AtSync();
+  nborBaselb = CProxy_NeighborLB::ckNew();
 }
 
 NeighborLB::NeighborLB()
 {
-  mystep = 0;
-  theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
-  theLbdb->
-    AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
-                           static_cast<void*>(this));
-  theLbdb->
-    NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
-                  static_cast<void*>(this));
-
-
-  // I had to move neighbor initialization outside the constructor
-  // in order to get the virtual functions of any derived classes
-  // so I'll just set them to illegal values here.
-  neighbor_pes = 0;
-  stats_msg_count = 0;
-  statsMsgsList = 0;
-  statsDataList = 0;
-  migrates_completed = 0;
-  migrates_expected = -1;
-  mig_msgs_received = 0;
-  mig_msgs = 0;
-
-  myStats.proc_speed = theLbdb->ProcessorSpeed();
-//  char hostname[80];
-//  gethostname(hostname,79);
-//  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
-  myStats.obj_data_sz = 0;
-  myStats.comm_data_sz = 0;
-  receive_stats_ready = 0;
-
-  theLbdb->CollectStatsOn();
-}
-
-NeighborLB::~NeighborLB()
-{
-  CkPrintf("Going away\n");
-}
-
-void NeighborLB::FindNeighbors()
-{
-  if (neighbor_pes == 0) { // Neighbors never initialized, so init them
-                           // and other things that depend on the number
-                           // of neighbors
-    statsMsgsList = new NLBStatsMsg*[num_neighbors()];
-    for(int i=0; i < num_neighbors(); i++)
-      statsMsgsList[i] = 0;
-    statsDataList = new LDStats[num_neighbors()];
-
-    neighbor_pes = new int[num_neighbors()];
-    neighbors(neighbor_pes);
-    mig_msgs_expected = num_neighbors();
-    mig_msgs = new NLBMigrateMsg*[num_neighbors()];
-  }
-
+  if (CkMyPe() == 0)
+    CkPrintf("[%d] NeighborLB created\n",CkMyPe());
 }
 
-void NeighborLB::AtSync()
+NLBMigrateMsg* NeighborLB::Strategy(NborBaseLB::LDStats* stats, int count)
 {
-  //  CkPrintf("[%d] NeighborLB At Sync step %d!!!!\n",CkMyPe(),mystep);
-
-  if (CkMyPe() == 0) {
-    start_lb_time = CmiWallTimer();
-    CkPrintf("Load balancing step %d starting at %f\n",
-            step(),start_lb_time);
-  }
-
-  if (neighbor_pes == 0) FindNeighbors();
-
-  if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
-    MigrationDone();
-    return;
-  }
-
-  NLBStatsMsg* msg = AssembleStats();
-
+  //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
+  // Compute the average load to see if we are overloaded relative
+  // to our neighbors
+  double myload = myStats.total_walltime - myStats.idletime;
+  double avgload = myload;
   int i;
-  for(i=1; i < num_neighbors(); i++) {
-    NLBStatsMsg* m2 = (NLBStatsMsg*) CkCopyMsg((void**)&msg);
-    CProxy_NeighborLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
-  }
-  if (0 < num_neighbors()) {
-    CProxy_NeighborLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
-  } else delete msg;
-
-  // Tell our own node that we are ready
-  ReceiveStats((NLBStatsMsg*)0);
-}
-
-NLBStatsMsg* NeighborLB::AssembleStats()
-{
-  // Get stats
-  theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
-  theLbdb->IdleTime(&myStats.idletime);
-  theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
-  myStats.obj_data_sz = theLbdb->GetObjDataSz();
-  myStats.objData = new LDObjData[myStats.obj_data_sz];
-  theLbdb->GetObjData(myStats.objData);
-
-  myStats.comm_data_sz = theLbdb->GetCommDataSz();
-  myStats.commData = new LDCommData[myStats.comm_data_sz];
-  theLbdb->GetCommData(myStats.commData);
+  for(i=0; i < count; i++) {
+    // Scale times we need appropriately for relative proc speeds
+    const double scale =  ((double)myStats.proc_speed) 
+      / stats[i].proc_speed;
 
-  myStats.obj_walltime = myStats.obj_cputime = 0;
-  for(int i=0; i < myStats.obj_data_sz; i++) {
-    myStats.obj_walltime += myStats.objData[i].wallTime;
-    myStats.obj_cputime += myStats.objData[i].cpuTime;
-  }    
+    stats[i].total_walltime *= scale;
+    stats[i].idletime *= scale;
 
-  NLBStatsMsg* msg = new NLBStatsMsg;
-
-  msg->from_pe = CkMyPe();
-  msg->serial = rand();
-  msg->proc_speed = myStats.proc_speed;
-  msg->total_walltime = myStats.total_walltime;
-  msg->total_cputime = myStats.total_cputime;
-  msg->idletime = myStats.idletime;
-  msg->bg_walltime = myStats.bg_walltime;
-  msg->bg_cputime = myStats.bg_cputime;
-  msg->obj_walltime = myStats.obj_walltime;
-  msg->obj_cputime = myStats.obj_cputime;
-
-  //  CkPrintf(
-  //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
-  //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
-  //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
-  //    msg->obj_walltime,msg->obj_cputime);
-
-  //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
-  //      CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
-  return msg;
-}
-
-void NeighborLB::Migrated(LDObjHandle h)
-{
-  migrates_completed++;
-  //  CkPrintf("[%d] An object migrated! %d %d\n",
-  //      CkMyPe(),migrates_completed,migrates_expected);
-  if (migrates_completed == migrates_expected) {
-    MigrationDone();
+    avgload += (stats[i].total_walltime - stats[i].idletime);
   }
-}
-
-void NeighborLB::ReceiveStats(NLBStatsMsg *m)
-{
-  if (neighbor_pes == 0) FindNeighbors();
-
-  if (m == 0) { // This is from our own node
-    receive_stats_ready = 1;
-  } else {
-    const int pe = m->from_pe;
-    //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
-    //            pe,stats_msg_count,m->n_objs,m->serial,m);
-    int peslot = -1;
-    for(int i=0; i < num_neighbors(); i++) {
-      if (pe == neighbor_pes[i]) {
-       peslot = i;
-       break;
-      }
+  avgload /= (count+1);
+
+  CkVector migrateInfo;
+
+  if (myload > avgload) {
+    //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
+    //              CkMyPe(),myload,avgload);
+
+    // First, build heaps of other processors and my objects
+    // Then assign objects to other processors until either
+    //   - The smallest remaining object would put me below average, or
+    //   - I only have 1 object left, or
+    //   - The smallest remaining object would put someone else 
+    //     above average
+
+    // Build heaps
+    minHeap procs(count);
+    for(i=0; i < count; i++) {
+      InfoRecord* item = new InfoRecord;
+      item->load = stats[i].total_walltime - stats[i].idletime;
+      item->Id =  stats[i].from_pe;
+      procs.insert(item);
     }
-    if (peslot == -1 || statsMsgsList[peslot] != 0) {
-      CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
-              pe);
-    } else {
-      statsMsgsList[peslot] = m;
-      statsDataList[peslot].from_pe = m->from_pe;
-      statsDataList[peslot].total_walltime = m->total_walltime;
-      statsDataList[peslot].total_cputime = m->total_cputime;
-      statsDataList[peslot].idletime = m->idletime;
-      statsDataList[peslot].bg_walltime = m->bg_walltime;
-      statsDataList[peslot].bg_cputime = m->bg_cputime;
-      statsDataList[peslot].proc_speed = m->proc_speed;
-      statsDataList[peslot].obj_walltime = m->obj_walltime;
-      statsDataList[peslot].obj_cputime = m->obj_cputime;
-      stats_msg_count++;
+      
+    maxHeap objs(myStats.obj_data_sz);
+    for(i=0; i < myStats.obj_data_sz; i++) {
+      InfoRecord* item = new InfoRecord;
+      item->load = myStats.objData[i].wallTime;
+      item->Id = i;
+      objs.insert(item);
     }
-  }
 
-  const int clients = num_neighbors();
-  if (stats_msg_count == clients && receive_stats_ready) {
-    double strat_start_time = CmiWallTimer();
-    receive_stats_ready = 0;
-    NLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
+    int objs_here = myStats.obj_data_sz;
+    do {
+      if (objs_here <= 1) break;  // For now, always leave 1 object
 
-    int i;
+      InfoRecord* p;
+      InfoRecord* obj;
 
-    // Migrate messages from me to elsewhere
-    for(i=0; i < migrateMsg->n_moves; i++) {
-      MigrateInfo& move = migrateMsg->moves[i];
-      const int me = CkMyPe();
-      if (move.from_pe == me && move.to_pe != me) {
-       theLbdb->Migrate(move.obj,move.to_pe);
-      } else if (move.from_pe != me) {
-       CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
-                me,move.from_pe,move.to_pe);
+      // Get the lightest-loaded processor
+      p = procs.deleteMin();
+      if (p == 0) {
+       //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
+       break;
       }
-    }
-    
-    // Now, send migrate messages to neighbors
-    for(i=1; i < num_neighbors(); i++) {
-      NLBMigrateMsg* m2 = (NLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
-      CProxy_NeighborLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
-    }
-    if (0 < num_neighbors())
-      CProxy_NeighborLB(thisgroup).ReceiveMigration(migrateMsg,
-                                                   neighbor_pes[0]);
-    else delete migrateMsg;
-    
-    // Zero out data structures for next cycle
-    for(i=0; i < clients; i++) {
-      delete statsMsgsList[i];
-      statsMsgsList[i]=0;
-    }
-    stats_msg_count=0;
-
-    theLbdb->ClearLoads();
-    if (CkMyPe() == 0) {
-      double strat_end_time = CmiWallTimer();
-      CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
-    }
-  }
-  
-}
-
-void NeighborLB::ReceiveMigration(NLBMigrateMsg *msg)
-{
-  if (neighbor_pes == 0) FindNeighbors();
-
-  if (mig_msgs_received == 0) migrates_expected = 0;
 
-  mig_msgs[mig_msgs_received] = msg;
-  mig_msgs_received++;
-  //  CkPrintf("[%d] Received migration msg %d of %d\n",
-  //      CkMyPe(),mig_msgs_received,mig_msgs_expected);
-
-  if (mig_msgs_received > mig_msgs_expected) {
-    CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
-            CkMyPe());
-  }
-
-  if (mig_msgs_received != mig_msgs_expected) {
-    return;
-  }
-
-  //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
-  for(int neigh=0; neigh < mig_msgs_received;neigh++) {
-    NLBMigrateMsg* m = mig_msgs[neigh];
-    for(int i=0; i < m->n_moves; i++) {
-      MigrateInfo& move = m->moves[i];
-      const int me = CkMyPe();
-      if (move.from_pe != me && move.to_pe == me) {
-       migrates_expected++;
+      // Get the biggest object
+      CmiBool objfound = CmiFalse;
+      do {
+       obj = objs.deleteMax();
+       if (obj == 0) break;
+
+       double new_p_load = p->load + obj->load;
+       double my_new_load = myload - obj->load;
+       if (new_p_load < my_new_load) {
+//     if (new_p_load < avgload) {
+         objfound = CmiTrue;
+       } else {
+         // This object is too big, so throw it away
+//       CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
+//                CkMyPe(),obj->load,p->Id,p->load,avgload);
+         delete obj;
+       }
+      } while (!objfound);
+
+      if (!objfound) {
+       //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
+       break;
       }
-    }
-    delete m;
-    mig_msgs[neigh]=0;
-  }
-  //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
-  //      CkMyPe(),migrates_expected);
-  mig_msgs_received = 0;
-  if (migrates_expected == 0 || migrates_expected == migrates_completed)
-    MigrationDone();
-}
 
-
-void NeighborLB::MigrationDone()
-{
-  if (CkMyPe() == 0) {
-    double end_lb_time = CmiWallTimer();
-    CkPrintf("Load balancing step %d finished at %f duration %f\n",
-            step(),end_lb_time,end_lb_time - start_lb_time);
-  }
-  migrates_completed = 0;
-  migrates_expected = -1;
-  // Increment to next step
-  mystep++;
-  CProxy_NeighborLB(thisgroup).ResumeClients(CkMyPe());
-}
-
-void NeighborLB::ResumeClients()
-{
-  theLbdb->ResumeClients();
-}
-
-NLBMigrateMsg* NeighborLB::Strategy(LDStats* stats,int count)
-{
-  for(int j=0; j < count; j++) {
-    CkPrintf(
-    "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
-    CkMyPe(),stats[j].from_pe,stats[j].proc_speed,
-    stats[j].total_walltime,stats[j].total_cputime,
-    stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
-    stats[j].obj_walltime,stats[j].obj_cputime);
+      const int me = CkMyPe();
+      // Apparently we can give this object to this processor
+      //      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
+      //              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
+
+      MigrateInfo* migrateMe = new MigrateInfo;
+      migrateMe->obj = myStats.objData[obj->Id].handle;
+      migrateMe->from_pe = me;
+      migrateMe->to_pe = p->Id;
+      migrateInfo.push_back((void*)migrateMe);
+
+      objs_here--;
+      
+      // We may want to assign more to this processor, so lets
+      // update it and put it back in the heap
+      p->load += obj->load;
+      myload -= obj->load;
+      procs.insert(p);
+      
+      // This object is assigned, so we delete it from the heap
+      delete obj;
+
+    } while(myload > avgload);
+
+    // Now empty out the heaps
+    while (InfoRecord* p=procs.deleteMin())
+      delete p;
+    while (InfoRecord* obj=objs.deleteMax())
+      delete obj;
+  }  
+
+  // Now build the message to actually perform the migrations
+  int migrate_count=migrateInfo.size();
+  //  if (migrate_count > 0) {
+  //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
+  //  }
+  NLBMigrateMsg* msg = new(&migrate_count,1) NLBMigrateMsg;
+  msg->n_moves = migrate_count;
+  for(i=0; i < migrate_count; i++) {
+    MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
+    msg->moves[i] = *item;
+    delete item;
+    migrateInfo[i] = 0;
   }
 
-  delete [] myStats.objData;
-  myStats.obj_data_sz = 0;
-  delete [] myStats.commData;
-  myStats.comm_data_sz = 0;
-
-  int sizes=0;
-  NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
-  msg->n_moves = 0;
-
   return msg;
-}
-
-void* NLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
-{
-  int totalsize = size + array[0] * sizeof(NeighborLB::MigrateInfo);
-
-  NLBMigrateMsg* ret =
-    static_cast<NLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
-
-  ret->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
-    (reinterpret_cast<char*>(ret)+ size);
-
-  return static_cast<void*>(ret);
-}
-
-void* NLBMigrateMsg::pack(NLBMigrateMsg* m)
-{
-  m->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
-    (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
-
-  return static_cast<void*>(m);
-}
-
-NLBMigrateMsg* NLBMigrateMsg::unpack(void *m)
-{
-  NLBMigrateMsg* ret_val = static_cast<NLBMigrateMsg*>(m);
-
-  ret_val->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
-    (reinterpret_cast<char*>(&ret_val->moves) 
-     + reinterpret_cast<size_t>(ret_val->moves));
-
-  return ret_val;
-}
+};
 
 #endif
index bddc8761af3d5067817306c24f27e0879102cdd4..430d51954c691253c6074ef337e98eb5fdb28328 100644 (file)
@@ -1,17 +1,9 @@
 module NeighborLB {
 
-extern module LBDatabase;
+extern module NborBaseLB;
 
-readonly CkGroupID neighborlb;
-
-message NLBStatsMsg;
-message[varsize] NLBMigrateMsg;
-
-group NeighborLB {
+group NeighborLB : NborBaseLB {
   entry void NeighborLB(void);  
-  entry void ReceiveStats(NLBStatsMsg*);
-  entry void ResumeClients(void);
-  entry void ReceiveMigration(NLBMigrateMsg*); 
 };
 
 };
index 045c31a2b16528f3c0ef41343498084eceea7d22..0214ce9b5901a65f8301c6b1fceaa124e42b28ca 100644 (file)
-#ifndef NEIGHBORLB_H
-#define NEIGHBORLB_H
+#ifndef _NEIGHBORLB_H_
+#define _NEIGHBORLB_H_
 
-#include <LBDatabase.h>
-#include "NeighborLB.decl.h"
+#include <math.h>
 
+#include "NborBaseLB.h"
+#include "NeighborLB.decl.h"
 
 void CreateNeighborLB();
 
-class NLBStatsMsg;
-class NLBMigrateMsg;
-
-class NeighborLB : public Group
-{
+class NeighborLB : public NborBaseLB {
 public:
   NeighborLB();
-  ~NeighborLB();
-  static void staticAtSync(void*);
-  void AtSync(void); // Everything is at the PE barrier
-
-  void ReceiveStats(NLBStatsMsg *);            // Receive stats on PE 0
-  void ResumeClients();
-  void ReceiveMigration(NLBMigrateMsg *);      // Receive migration data
-
-  // Migrated-element callback
-  static void staticMigrated(void* me, LDObjHandle h);
-  void Migrated(LDObjHandle h);
-
-  void MigrationDone(void);  // Call when migration is complete
-  int step() { return mystep; };
-
-  struct MigrateInfo {  // Used in NLBMigrateMsg
-    LDObjHandle obj;
-    int from_pe;
-    int to_pe;
-  };
-
-  struct LDStats {  // Passed to Strategy
-    int from_pe;
-    double total_walltime;
-    double total_cputime;
-    double idletime;
-    double bg_walltime;
-    double bg_cputime;
-    double obj_walltime;
-    double obj_cputime;
-    int proc_speed;
-  };
-
-protected:
-  virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
-  virtual NLBMigrateMsg* Strategy(LDStats* stats,int count);
-
+private:
+  CmiBool QueryBalanceNow(int step) { return CmiTrue; };
   virtual int num_neighbors() {
-    if (CmiNumPes() > 2) return 2;
-    else return (CmiNumPes()-1);
+    return (CkNumPes() > 5) ? 4 : (CkNumPes()-1);
   };
-
   virtual void neighbors(int* _n) {
-    _n[0] = (CmiMyPe() + CmiNumPes() -1) % CmiNumPes();
-    _n[1] = (CmiMyPe() + 1) % CmiNumPes();
+    const int me = CkMyPe();
+    const int npe = CkNumPes();
+    if (npe > 1)
+      _n[0] = (me + npe - 1) % npe;
+    if (npe > 2)
+      _n[1] = (me + 1) % npe;
+
+    int bigstep = (npe - 1) / 3 + 1;
+    if (bigstep == 1) bigstep++;
+
+    if (npe > 3)
+      _n[2] = (me + bigstep) % npe;
+    if (npe > 4)
+      _n[3] = (me + npe - bigstep) % npe;
   };
 
-  LBDatabase* theLbdb;
-  struct {
-    int proc_speed;
-    double total_walltime;
-    double total_cputime;
-    double idletime;
-    double bg_walltime;
-    double bg_cputime;
-    int obj_data_sz;
-    LDObjData* objData;
-    int comm_data_sz;
-    LDCommData* commData;
-    double obj_walltime;
-    double obj_cputime;
-  } myStats;
-
-private:
-  void FindNeighbors();
-  NLBStatsMsg* AssembleStats();
-
-  int mystep;
-  int stats_msg_count;
-  NLBStatsMsg** statsMsgsList;
-  LDStats* statsDataList;
-  int migrates_completed;
-  int migrates_expected;
-  NLBMigrateMsg** mig_msgs;
-  int mig_msgs_received;
-  int mig_msgs_expected;
-  int* neighbor_pes;
-  int receive_stats_ready;
-  double start_lb_time;
+  NLBMigrateMsg* Strategy(NborBaseLB::LDStats* stats, int count);
 };
 
-class NLBStatsMsg : public CMessage_NLBStatsMsg {
-public:
-  int from_pe;
-  int serial;
-  int proc_speed;
-  double total_walltime;
-  double total_cputime;
-  double idletime;
-  double bg_walltime;
-  double bg_cputime;
-  double obj_walltime;
-  double obj_cputime;
-}; 
-
-class NLBMigrateMsg : public CMessage_NLBMigrateMsg {
-public:
-  int n_moves;
-  NeighborLB::MigrateInfo* moves;
-
-  // Other methods & data members 
-  
-  static void* alloc(int msgnum, size_t size, int* array, int priobits); 
-  static void* pack(NLBMigrateMsg* in); 
-  static NLBMigrateMsg* unpack(void* in); 
-}; 
-
-#endif /* NEIGHBORLB_H */
+#endif /* _NeighborLB_H_ */
index fed1508325d500185cbe26d80e3e305e4dd47cc4..0ec9ad25843fb75f99505447dab7e496fc6c37d0 100644 (file)
+#include <unistd.h>
 #include <charm++.h>
-
-#if CMK_LBDB_ON
-
-#include "CkLists.h"
-
+#include <LBDatabase.h>
+#include <CkLists.h>
 #include "heap.h"
 #include "WSLB.h"
 #include "WSLB.def.h"
 
+CkGroupID wslb;
+
+#if CMK_LBDB_ON
+
 void CreateWSLB()
 {
-  neighborlb = CProxy_WSLB::ckNew();
+  wslb = CProxy_WSLB::ckNew();
+}
+
+void WSLB::staticMigrated(void* data, LDObjHandle h)
+{
+  WSLB *me = static_cast<WSLB*>(data);
+
+  me->Migrated(h);
+}
+
+void WSLB::staticAtSync(void* data)
+{
+  WSLB *me = static_cast<WSLB*>(data);
+
+  me->AtSync();
 }
 
 WSLB::WSLB()
 {
-  if (CkMyPe() == 0)
-    CkPrintf("[%d] WSLB created\n",CkMyPe());
+  mystep = 0;
+  theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
+  theLbdb->
+    AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
+                           static_cast<void*>(this));
+  theLbdb->
+    NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
+                  static_cast<void*>(this));
+
+
+  // I had to move neighbor initialization outside the constructor
+  // in order to get the virtual functions of any derived classes
+  // so I'll just set them to illegal values here.
+  neighbor_pes = 0;
+  stats_msg_count = 0;
+  statsMsgsList = 0;
+  statsDataList = 0;
+  migrates_completed = 0;
+  migrates_expected = -1;
+  mig_msgs_received = 0;
+  mig_msgs = 0;
+
+  myStats.proc_speed = theLbdb->ProcessorSpeed();
+//  char hostname[80];
+//  gethostname(hostname,79);
+//  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
+  myStats.obj_data_sz = 0;
+  myStats.comm_data_sz = 0;
+  receive_stats_ready = 0;
+
+  theLbdb->CollectStatsOn();
+}
+
+WSLB::~WSLB()
+{
+  CkPrintf("Going away\n");
+}
+
+void WSLB::FindNeighbors()
+{
+  if (neighbor_pes == 0) { // Neighbors never initialized, so init them
+                           // and other things that depend on the number
+                           // of neighbors
+    statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
+    for(int i=0; i < num_neighbors(); i++)
+      statsMsgsList[i] = 0;
+    statsDataList = new LDStats[num_neighbors()];
+
+    neighbor_pes = new int[num_neighbors()];
+    neighbors(neighbor_pes);
+    mig_msgs_expected = num_neighbors();
+    mig_msgs = new WSLBMigrateMsg*[num_neighbors()];
+  }
+
+}
+
+void WSLB::AtSync()
+{
+  //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
+
+  if (CkMyPe() == 0) {
+    start_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d starting at %f\n",
+            step(),start_lb_time);
+  }
+
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
+    MigrationDone();
+    return;
+  }
+
+  WSLBStatsMsg* msg = AssembleStats();
+
+  int i;
+  for(i=1; i < num_neighbors(); i++) {
+    WSLBStatsMsg* m2 = (WSLBStatsMsg*) CkCopyMsg((void**)&msg);
+    CProxy_WSLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
+  }
+  if (0 < num_neighbors()) {
+    CProxy_WSLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
+  } else delete msg;
+
+  // Tell our own node that we are ready
+  ReceiveStats((WSLBStatsMsg*)0);
+}
+
+WSLBStatsMsg* WSLB::AssembleStats()
+{
+  // Get stats
+  theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
+  theLbdb->IdleTime(&myStats.idletime);
+  theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
+  myStats.obj_data_sz = theLbdb->GetObjDataSz();
+  myStats.objData = new LDObjData[myStats.obj_data_sz];
+  theLbdb->GetObjData(myStats.objData);
+
+  myStats.comm_data_sz = theLbdb->GetCommDataSz();
+  myStats.commData = new LDCommData[myStats.comm_data_sz];
+  theLbdb->GetCommData(myStats.commData);
+
+  myStats.obj_walltime = myStats.obj_cputime = 0;
+  for(int i=0; i < myStats.obj_data_sz; i++) {
+    myStats.obj_walltime += myStats.objData[i].wallTime;
+    myStats.obj_cputime += myStats.objData[i].cpuTime;
+  }    
+
+  WSLBStatsMsg* msg = new WSLBStatsMsg;
+
+  msg->from_pe = CkMyPe();
+  msg->serial = rand();
+  msg->proc_speed = myStats.proc_speed;
+  msg->total_walltime = myStats.total_walltime;
+  msg->total_cputime = myStats.total_cputime;
+  msg->idletime = myStats.idletime;
+  msg->bg_walltime = myStats.bg_walltime;
+  msg->bg_cputime = myStats.bg_cputime;
+  msg->obj_walltime = myStats.obj_walltime;
+  msg->obj_cputime = myStats.obj_cputime;
+
+  //  CkPrintf(
+  //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
+  //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
+  //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
+  //    msg->obj_walltime,msg->obj_cputime);
+
+  //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
+  //      CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
+  return msg;
+}
+
+void WSLB::Migrated(LDObjHandle h)
+{
+  migrates_completed++;
+  //  CkPrintf("[%d] An object migrated! %d %d\n",
+  //      CkMyPe(),migrates_completed,migrates_expected);
+  if (migrates_completed == migrates_expected) {
+    MigrationDone();
+  }
+}
+
+void WSLB::ReceiveStats(WSLBStatsMsg *m)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (m == 0) { // This is from our own node
+    receive_stats_ready = 1;
+  } else {
+    const int pe = m->from_pe;
+    //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
+    //            pe,stats_msg_count,m->n_objs,m->serial,m);
+    int peslot = -1;
+    for(int i=0; i < num_neighbors(); i++) {
+      if (pe == neighbor_pes[i]) {
+       peslot = i;
+       break;
+      }
+    }
+    if (peslot == -1 || statsMsgsList[peslot] != 0) {
+      CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
+              pe);
+    } else {
+      statsMsgsList[peslot] = m;
+      statsDataList[peslot].from_pe = m->from_pe;
+      statsDataList[peslot].total_walltime = m->total_walltime;
+      statsDataList[peslot].total_cputime = m->total_cputime;
+      statsDataList[peslot].idletime = m->idletime;
+      statsDataList[peslot].bg_walltime = m->bg_walltime;
+      statsDataList[peslot].bg_cputime = m->bg_cputime;
+      statsDataList[peslot].proc_speed = m->proc_speed;
+      statsDataList[peslot].obj_walltime = m->obj_walltime;
+      statsDataList[peslot].obj_cputime = m->obj_cputime;
+      stats_msg_count++;
+    }
+  }
+
+  const int clients = num_neighbors();
+  if (stats_msg_count == clients && receive_stats_ready) {
+    double strat_start_time = CmiWallTimer();
+    receive_stats_ready = 0;
+    WSLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
+
+    int i;
+
+    // Migrate messages from me to elsewhere
+    for(i=0; i < migrateMsg->n_moves; i++) {
+      MigrateInfo& move = migrateMsg->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe == me && move.to_pe != me) {
+       theLbdb->Migrate(move.obj,move.to_pe);
+      } else if (move.from_pe != me) {
+       CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
+                me,move.from_pe,move.to_pe);
+      }
+    }
+    
+    // Now, send migrate messages to neighbors
+    for(i=1; i < num_neighbors(); i++) {
+      WSLBMigrateMsg* m2 = (WSLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
+      CProxy_WSLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
+    }
+    if (0 < num_neighbors())
+      CProxy_WSLB(thisgroup).ReceiveMigration(migrateMsg,
+                                                   neighbor_pes[0]);
+    else delete migrateMsg;
+    
+    // Zero out data structures for next cycle
+    for(i=0; i < clients; i++) {
+      delete statsMsgsList[i];
+      statsMsgsList[i]=0;
+    }
+    stats_msg_count=0;
+
+    theLbdb->ClearLoads();
+    if (CkMyPe() == 0) {
+      double strat_end_time = CmiWallTimer();
+      CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
+    }
+  }
+  
+}
+
+void WSLB::ReceiveMigration(WSLBMigrateMsg *msg)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (mig_msgs_received == 0) migrates_expected = 0;
+
+  mig_msgs[mig_msgs_received] = msg;
+  mig_msgs_received++;
+  //  CkPrintf("[%d] Received migration msg %d of %d\n",
+  //      CkMyPe(),mig_msgs_received,mig_msgs_expected);
+
+  if (mig_msgs_received > mig_msgs_expected) {
+    CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
+            CkMyPe());
+  }
+
+  if (mig_msgs_received != mig_msgs_expected) {
+    return;
+  }
+
+  //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
+  for(int neigh=0; neigh < mig_msgs_received;neigh++) {
+    WSLBMigrateMsg* m = mig_msgs[neigh];
+    for(int i=0; i < m->n_moves; i++) {
+      MigrateInfo& move = m->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe != me && move.to_pe == me) {
+       migrates_expected++;
+      }
+    }
+    delete m;
+    mig_msgs[neigh]=0;
+  }
+  //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
+  //      CkMyPe(),migrates_expected);
+  mig_msgs_received = 0;
+  if (migrates_expected == 0 || migrates_expected == migrates_completed)
+    MigrationDone();
+}
+
+
+void WSLB::MigrationDone()
+{
+  if (CkMyPe() == 0) {
+    double end_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d finished at %f duration %f\n",
+            step(),end_lb_time,end_lb_time - start_lb_time);
+  }
+  migrates_completed = 0;
+  migrates_expected = -1;
+  // Increment to next step
+  mystep++;
+  CProxy_WSLB(thisgroup).ResumeClients(CkMyPe());
 }
 
-NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
+void WSLB::ResumeClients()
+{
+  theLbdb->ResumeClients();
+}
+
+WSLBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
 {
   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
   // Compute the average load to see if we are overloaded relative
@@ -42,8 +337,8 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   CkVector migrateInfo;
 
   if (myload > avgload) {
-    //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
-    //      CkMyPe(),myload,avgload);
+    //CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
+    //              CkMyPe(),myload,avgload);
 
     // First, build heaps of other processors and my objects
     // Then assign objects to other processors until either
@@ -110,7 +405,7 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
       const int me = CkMyPe();
       // Apparently we can give this object to this processor
       //      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
-      //              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
+      //              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
 
       MigrateInfo* migrateMe = new MigrateInfo;
       migrateMe->obj = myStats.objData[obj->Id].handle;
@@ -143,7 +438,7 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   //  if (migrate_count > 0) {
   //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
   //  }
-  NLBMigrateMsg* msg = new(&migrate_count,1) NLBMigrateMsg;
+  WSLBMigrateMsg* msg = new(&migrate_count,1) WSLBMigrateMsg;
   msg->n_moves = migrate_count;
   for(i=0; i < migrate_count; i++) {
     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
@@ -155,4 +450,36 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   return msg;
 };
 
+void* WSLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
+{
+  int totalsize = size + array[0] * sizeof(WSLB::MigrateInfo);
+
+  WSLBMigrateMsg* ret =
+    static_cast<WSLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
+
+  ret->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(ret)+ size);
+
+  return static_cast<void*>(ret);
+}
+
+void* WSLBMigrateMsg::pack(WSLBMigrateMsg* m)
+{
+  m->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
+
+  return static_cast<void*>(m);
+}
+
+WSLBMigrateMsg* WSLBMigrateMsg::unpack(void *m)
+{
+  WSLBMigrateMsg* ret_val = static_cast<WSLBMigrateMsg*>(m);
+
+  ret_val->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(&ret_val->moves) 
+     + reinterpret_cast<size_t>(ret_val->moves));
+
+  return ret_val;
+}
+
 #endif
index b5b3e0c5654992d9a0351283d541bae27d17c612..bef5be267a503cc80ea85fd964a9ec7ed7326e55 100644 (file)
@@ -1,9 +1,17 @@
 module WSLB {
 
-extern module NeighborLB;
+extern module LBDatabase;
 
-group WSLB : NeighborLB {
+readonly CkGroupID wslb;
+
+message WSLBStatsMsg;
+message[varsize] WSLBMigrateMsg;
+
+group WSLB {
   entry void WSLB(void);  
+  entry void ReceiveStats(WSLBStatsMsg*);
+  entry void ResumeClients(void);
+  entry void ReceiveMigration(WSLBMigrateMsg*);        
 };
 
 };
index da2254ef4a831d389fc3e17d5e579a25aecc23aa..f0f5ce24b6372c17bb705737fab59f01aa109f9c 100644 (file)
@@ -1,18 +1,54 @@
-#ifndef _WSLB_H_
-#define _WSLB_H_
+#ifndef NEIGHBORLB_H
+#define NEIGHBORLB_H
 
-#include <math.h>
-
-#include "NeighborLB.h"
+#include <LBDatabase.h>
 #include "WSLB.decl.h"
 
 void CreateWSLB();
 
-class WSLB : public NeighborLB {
+class WSLBStatsMsg;
+class WSLBMigrateMsg;
+
+class WSLB : public Group
+{
 public:
   WSLB();
-private:
-  CmiBool QueryBalanceNow(int step) { return CmiTrue; };
+  ~WSLB();
+  static void staticAtSync(void*);
+  void AtSync(void); // Everything is at the PE barrier
+
+  void ReceiveStats(WSLBStatsMsg *);           // Receive stats on PE 0
+  void ResumeClients();
+  void ReceiveMigration(WSLBMigrateMsg *);     // Receive migration data
+
+  // Migrated-element callback
+  static void staticMigrated(void* me, LDObjHandle h);
+  void Migrated(LDObjHandle h);
+
+  void MigrationDone(void);  // Call when migration is complete
+  int step() { return mystep; };
+
+  struct MigrateInfo {  // Used in WSLBMigrateMsg
+    LDObjHandle obj;
+    int from_pe;
+    int to_pe;
+  };
+
+  struct LDStats {  // Passed to Strategy
+    int from_pe;
+    double total_walltime;
+    double total_cputime;
+    double idletime;
+    double bg_walltime;
+    double bg_cputime;
+    double obj_walltime;
+    double obj_cputime;
+    int proc_speed;
+  };
+
+protected:
+  virtual CmiBool QueryBalanceNow(int) { return CmiTrue; };  
+  virtual WSLBMigrateMsg* Strategy(LDStats* stats,int count);
   virtual int num_neighbors() {
     return (CkNumPes() > 5) ? 4 : (CkNumPes()-1);
   };
@@ -33,7 +69,64 @@ private:
       _n[3] = (me + npe - bigstep) % npe;
   };
 
-  NLBMigrateMsg* Strategy(NeighborLB::LDStats* stats, int count);
+  LBDatabase* theLbdb;
+  struct {
+    int proc_speed;
+    double total_walltime;
+    double total_cputime;
+    double idletime;
+    double bg_walltime;
+    double bg_cputime;
+    int obj_data_sz;
+    LDObjData* objData;
+    int comm_data_sz;
+    LDCommData* commData;
+    double obj_walltime;
+    double obj_cputime;
+  } myStats;
+
+private:
+  void FindNeighbors();
+  WSLBStatsMsg* AssembleStats();
+
+  int mystep;
+  int stats_msg_count;
+  WSLBStatsMsg** statsMsgsList;
+  LDStats* statsDataList;
+  int migrates_completed;
+  int migrates_expected;
+  WSLBMigrateMsg** mig_msgs;
+  int mig_msgs_received;
+  int mig_msgs_expected;
+  int* neighbor_pes;
+  int receive_stats_ready;
+  double start_lb_time;
 };
 
-#endif /* _WSLB_H_ */
+class WSLBStatsMsg : public CMessage_WSLBStatsMsg {
+public:
+  int from_pe;
+  int serial;
+  int proc_speed;
+  double total_walltime;
+  double total_cputime;
+  double idletime;
+  double bg_walltime;
+  double bg_cputime;
+  double obj_walltime;
+  double obj_cputime;
+}; 
+
+class WSLBMigrateMsg : public CMessage_WSLBMigrateMsg {
+public:
+  int n_moves;
+  WSLB::MigrateInfo* moves;
+
+  // Other methods & data members 
+  
+  static void* alloc(int msgnum, size_t size, int* array, int priobits); 
+  static void* pack(WSLBMigrateMsg* in); 
+  static WSLBMigrateMsg* unpack(void* in); 
+}; 
+
+#endif /* NEIGHBORLB_H */
index c20a42f893b6524c6f13092434c3938e0b0184ed..51ae326b81c89b96247e086c064a7529f826bbca 100644 (file)
@@ -67,7 +67,8 @@ ALLHEADERS=charm++.h ckstream.h charm.h cpthreads.h converse.h \
        Refiner.h ObjGraph.h CentralLB.h RandCentLB.h \
        RecBisectBfLB.h graph.h fifoInt.h bitvecset.h \
        RefineLB.h MetisLB.h  Comm1LB.h \
-       HeapCentLB.h Set.h elements.h heap.h conv-ccs.h NeighborLB.h \
+       HeapCentLB.h Set.h elements.h heap.h conv-ccs.h \
+       NborBaseLB.h NeighborLB.h \
        WSLB.h CkLists.h GreedyRefLB.h RandRefLB.h CommLB.h CommLBHeap.h
 
 ALLINTERFACES=idl.ci
@@ -142,6 +143,7 @@ CVHEADERS=converse.h conv-mach.h conv-mach.sh \
           ../include/MetisLB.decl.h MetisLB.def.h \
           ../include/RefineLB.decl.h RefineLB.def.h \
           ../include/HeapCentLB.decl.h HeapCentLB.def.h \
+          ../include/NborBaseLB.decl.h NborBaseLB.def.h \
           ../include/NeighborLB.decl.h NeighborLB.def.h \
           ../include/WSLB.decl.h WSLB.def.h \
           ../include/GreedyRefLB.decl.h GreedyRefLB.def.h \
@@ -259,7 +261,8 @@ CKHEADERS=ck.h ckstream.h envelope.h init.h qd.h charm.h charm++.h trace.h \
          Refiner.h ObjGraph.h heap.h elements.h CommLBHeap.h\
          CentralLB.h RandCentLB.h RecBisectBfLB.h \
          RefineLB.h HeapCentLB.h CommLB.h Comm1LB.h\
-         MetisLB.h NeighborLB.h WSLB.h GreedyRefLB.h RandRefLB.h CkLists.h \
+         MetisLB.h NborBaseLB.h \
+         NeighborLB.h WSLB.h GreedyRefLB.h RandRefLB.h CkLists.h \
          $(CVHEADERS)
 
 CK_LIBS_CORE=libck.a
@@ -268,10 +271,11 @@ LIBCK_CORE=init.o register.o qd.o ck.o main.o msgalloc.o ckfutures.o \
            ckarray.o tempo.o waitqd.o LBDatabase.o lbdb.o \
           LBDBManager.o LBComm.o LBObj.o LBMachineUtil.o Refiner.o \
           ObjGraph.o \
-          CentralLB.o RandCentLB.o RecBisectBfLB.o graph.o bitvecset.o fifoInt.o \
+          CentralLB.o RandCentLB.o \
+          RecBisectBfLB.o graph.o bitvecset.o fifoInt.o \
           MetisLB.o RefineLB.o Set.o CommLB.o Comm1LB.o\
-           HeapCentLB.o heap.o NeighborLB.o WSLB.o GreedyRefLB.o \
-          RandRefLB.o CommLBHeap.o
+           HeapCentLB.o heap.o NborBaseLB.o NeighborLB.o WSLB.o \
+          GreedyRefLB.o RandRefLB.o CommLBHeap.o
 
 charmlibs: converse $(CK_LIBS_CORE)
        cd libs; make charmlibs OPTS='$(OPTS)'
@@ -321,6 +325,9 @@ libtrace-summary.a: $(LIBTRACE_SUMM)
 ../include/HeapCentLB.decl.h : HeapCentLB.decl.h
        /bin/cp HeapCentLB.decl.h ../include
 
+../include/NborBaseLB.decl.h : NborBaseLB.decl.h
+       /bin/cp NborBaseLB.decl.h ../include
+
 ../include/NeighborLB.decl.h : NeighborLB.decl.h
        /bin/cp NeighborLB.decl.h ../include
 
@@ -381,6 +388,9 @@ Comm1LB.decl.h Comm1LB.def.h : Comm1LB.ci charmxi
 HeapCentLB.decl.h HeapCentLB.def.h : HeapCentLB.ci charmxi
        $(CHARMC) HeapCentLB.ci
 
+NborBaseLB.decl.h NborBaseLB.def.h : NborBaseLB.ci charmxi
+       $(CHARMC) NborBaseLB.ci
+
 NeighborLB.decl.h NeighborLB.def.h : NeighborLB.ci charmxi
        $(CHARMC) NeighborLB.ci
 
@@ -480,6 +490,9 @@ heap.o: heap.C $(CKHEADERS) elements.h heap.h
 CommLBHeap.o: CommLBHeap.C $(CKHEADERS) CommLBHeap.h
        $(CHARMC) -o CommLBHeap.o CommLBHeap.C
 
+NborBaseLB.o: NborBaseLB.C $(CKHEADERS)
+       $(CHARMC) -o NborBaseLB.o NborBaseLB.C
+
 NeighborLB.o: NeighborLB.C $(CKHEADERS)
        $(CHARMC) -o NeighborLB.o NeighborLB.C