I Rearranged the load balancer, so WSLB->NeighborLB, NeighborLB->NborBaseLB,
[charm.git] / src / ck-ldb / WSLB.C
index fed1508325d500185cbe26d80e3e305e4dd47cc4..0ec9ad25843fb75f99505447dab7e496fc6c37d0 100644 (file)
+#include <unistd.h>
 #include <charm++.h>
-
-#if CMK_LBDB_ON
-
-#include "CkLists.h"
-
+#include <LBDatabase.h>
+#include <CkLists.h>
 #include "heap.h"
 #include "WSLB.h"
 #include "WSLB.def.h"
 
+CkGroupID wslb;
+
+#if CMK_LBDB_ON
+
 void CreateWSLB()
 {
-  neighborlb = CProxy_WSLB::ckNew();
+  wslb = CProxy_WSLB::ckNew();
+}
+
+void WSLB::staticMigrated(void* data, LDObjHandle h)
+{
+  WSLB *me = static_cast<WSLB*>(data);
+
+  me->Migrated(h);
+}
+
+void WSLB::staticAtSync(void* data)
+{
+  WSLB *me = static_cast<WSLB*>(data);
+
+  me->AtSync();
 }
 
 WSLB::WSLB()
 {
-  if (CkMyPe() == 0)
-    CkPrintf("[%d] WSLB created\n",CkMyPe());
+  mystep = 0;
+  theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
+  theLbdb->
+    AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
+                           static_cast<void*>(this));
+  theLbdb->
+    NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
+                  static_cast<void*>(this));
+
+
+  // I had to move neighbor initialization outside the constructor
+  // in order to get the virtual functions of any derived classes
+  // so I'll just set them to illegal values here.
+  neighbor_pes = 0;
+  stats_msg_count = 0;
+  statsMsgsList = 0;
+  statsDataList = 0;
+  migrates_completed = 0;
+  migrates_expected = -1;
+  mig_msgs_received = 0;
+  mig_msgs = 0;
+
+  myStats.proc_speed = theLbdb->ProcessorSpeed();
+//  char hostname[80];
+//  gethostname(hostname,79);
+//  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
+  myStats.obj_data_sz = 0;
+  myStats.comm_data_sz = 0;
+  receive_stats_ready = 0;
+
+  theLbdb->CollectStatsOn();
+}
+
+WSLB::~WSLB()
+{
+  CkPrintf("Going away\n");
+}
+
+void WSLB::FindNeighbors()
+{
+  if (neighbor_pes == 0) { // Neighbors never initialized, so init them
+                           // and other things that depend on the number
+                           // of neighbors
+    statsMsgsList = new WSLBStatsMsg*[num_neighbors()];
+    for(int i=0; i < num_neighbors(); i++)
+      statsMsgsList[i] = 0;
+    statsDataList = new LDStats[num_neighbors()];
+
+    neighbor_pes = new int[num_neighbors()];
+    neighbors(neighbor_pes);
+    mig_msgs_expected = num_neighbors();
+    mig_msgs = new WSLBMigrateMsg*[num_neighbors()];
+  }
+
+}
+
+void WSLB::AtSync()
+{
+  //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
+
+  if (CkMyPe() == 0) {
+    start_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d starting at %f\n",
+            step(),start_lb_time);
+  }
+
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
+    MigrationDone();
+    return;
+  }
+
+  WSLBStatsMsg* msg = AssembleStats();
+
+  int i;
+  for(i=1; i < num_neighbors(); i++) {
+    WSLBStatsMsg* m2 = (WSLBStatsMsg*) CkCopyMsg((void**)&msg);
+    CProxy_WSLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
+  }
+  if (0 < num_neighbors()) {
+    CProxy_WSLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
+  } else delete msg;
+
+  // Tell our own node that we are ready
+  ReceiveStats((WSLBStatsMsg*)0);
+}
+
+WSLBStatsMsg* WSLB::AssembleStats()
+{
+  // Get stats
+  theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
+  theLbdb->IdleTime(&myStats.idletime);
+  theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
+  myStats.obj_data_sz = theLbdb->GetObjDataSz();
+  myStats.objData = new LDObjData[myStats.obj_data_sz];
+  theLbdb->GetObjData(myStats.objData);
+
+  myStats.comm_data_sz = theLbdb->GetCommDataSz();
+  myStats.commData = new LDCommData[myStats.comm_data_sz];
+  theLbdb->GetCommData(myStats.commData);
+
+  myStats.obj_walltime = myStats.obj_cputime = 0;
+  for(int i=0; i < myStats.obj_data_sz; i++) {
+    myStats.obj_walltime += myStats.objData[i].wallTime;
+    myStats.obj_cputime += myStats.objData[i].cpuTime;
+  }    
+
+  WSLBStatsMsg* msg = new WSLBStatsMsg;
+
+  msg->from_pe = CkMyPe();
+  msg->serial = rand();
+  msg->proc_speed = myStats.proc_speed;
+  msg->total_walltime = myStats.total_walltime;
+  msg->total_cputime = myStats.total_cputime;
+  msg->idletime = myStats.idletime;
+  msg->bg_walltime = myStats.bg_walltime;
+  msg->bg_cputime = myStats.bg_cputime;
+  msg->obj_walltime = myStats.obj_walltime;
+  msg->obj_cputime = myStats.obj_cputime;
+
+  //  CkPrintf(
+  //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
+  //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
+  //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
+  //    msg->obj_walltime,msg->obj_cputime);
+
+  //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
+  //      CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
+  return msg;
+}
+
+void WSLB::Migrated(LDObjHandle h)
+{
+  migrates_completed++;
+  //  CkPrintf("[%d] An object migrated! %d %d\n",
+  //      CkMyPe(),migrates_completed,migrates_expected);
+  if (migrates_completed == migrates_expected) {
+    MigrationDone();
+  }
+}
+
+void WSLB::ReceiveStats(WSLBStatsMsg *m)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (m == 0) { // This is from our own node
+    receive_stats_ready = 1;
+  } else {
+    const int pe = m->from_pe;
+    //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
+    //            pe,stats_msg_count,m->n_objs,m->serial,m);
+    int peslot = -1;
+    for(int i=0; i < num_neighbors(); i++) {
+      if (pe == neighbor_pes[i]) {
+       peslot = i;
+       break;
+      }
+    }
+    if (peslot == -1 || statsMsgsList[peslot] != 0) {
+      CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
+              pe);
+    } else {
+      statsMsgsList[peslot] = m;
+      statsDataList[peslot].from_pe = m->from_pe;
+      statsDataList[peslot].total_walltime = m->total_walltime;
+      statsDataList[peslot].total_cputime = m->total_cputime;
+      statsDataList[peslot].idletime = m->idletime;
+      statsDataList[peslot].bg_walltime = m->bg_walltime;
+      statsDataList[peslot].bg_cputime = m->bg_cputime;
+      statsDataList[peslot].proc_speed = m->proc_speed;
+      statsDataList[peslot].obj_walltime = m->obj_walltime;
+      statsDataList[peslot].obj_cputime = m->obj_cputime;
+      stats_msg_count++;
+    }
+  }
+
+  const int clients = num_neighbors();
+  if (stats_msg_count == clients && receive_stats_ready) {
+    double strat_start_time = CmiWallTimer();
+    receive_stats_ready = 0;
+    WSLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
+
+    int i;
+
+    // Migrate messages from me to elsewhere
+    for(i=0; i < migrateMsg->n_moves; i++) {
+      MigrateInfo& move = migrateMsg->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe == me && move.to_pe != me) {
+       theLbdb->Migrate(move.obj,move.to_pe);
+      } else if (move.from_pe != me) {
+       CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
+                me,move.from_pe,move.to_pe);
+      }
+    }
+    
+    // Now, send migrate messages to neighbors
+    for(i=1; i < num_neighbors(); i++) {
+      WSLBMigrateMsg* m2 = (WSLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
+      CProxy_WSLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
+    }
+    if (0 < num_neighbors())
+      CProxy_WSLB(thisgroup).ReceiveMigration(migrateMsg,
+                                                   neighbor_pes[0]);
+    else delete migrateMsg;
+    
+    // Zero out data structures for next cycle
+    for(i=0; i < clients; i++) {
+      delete statsMsgsList[i];
+      statsMsgsList[i]=0;
+    }
+    stats_msg_count=0;
+
+    theLbdb->ClearLoads();
+    if (CkMyPe() == 0) {
+      double strat_end_time = CmiWallTimer();
+      CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
+    }
+  }
+  
+}
+
+void WSLB::ReceiveMigration(WSLBMigrateMsg *msg)
+{
+  if (neighbor_pes == 0) FindNeighbors();
+
+  if (mig_msgs_received == 0) migrates_expected = 0;
+
+  mig_msgs[mig_msgs_received] = msg;
+  mig_msgs_received++;
+  //  CkPrintf("[%d] Received migration msg %d of %d\n",
+  //      CkMyPe(),mig_msgs_received,mig_msgs_expected);
+
+  if (mig_msgs_received > mig_msgs_expected) {
+    CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
+            CkMyPe());
+  }
+
+  if (mig_msgs_received != mig_msgs_expected) {
+    return;
+  }
+
+  //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
+  for(int neigh=0; neigh < mig_msgs_received;neigh++) {
+    WSLBMigrateMsg* m = mig_msgs[neigh];
+    for(int i=0; i < m->n_moves; i++) {
+      MigrateInfo& move = m->moves[i];
+      const int me = CkMyPe();
+      if (move.from_pe != me && move.to_pe == me) {
+       migrates_expected++;
+      }
+    }
+    delete m;
+    mig_msgs[neigh]=0;
+  }
+  //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
+  //      CkMyPe(),migrates_expected);
+  mig_msgs_received = 0;
+  if (migrates_expected == 0 || migrates_expected == migrates_completed)
+    MigrationDone();
+}
+
+
+void WSLB::MigrationDone()
+{
+  if (CkMyPe() == 0) {
+    double end_lb_time = CmiWallTimer();
+    CkPrintf("Load balancing step %d finished at %f duration %f\n",
+            step(),end_lb_time,end_lb_time - start_lb_time);
+  }
+  migrates_completed = 0;
+  migrates_expected = -1;
+  // Increment to next step
+  mystep++;
+  CProxy_WSLB(thisgroup).ResumeClients(CkMyPe());
 }
 
-NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
+void WSLB::ResumeClients()
+{
+  theLbdb->ResumeClients();
+}
+
+WSLBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
 {
   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
   // Compute the average load to see if we are overloaded relative
@@ -42,8 +337,8 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   CkVector migrateInfo;
 
   if (myload > avgload) {
-    //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
-    //      CkMyPe(),myload,avgload);
+    //CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
+    //              CkMyPe(),myload,avgload);
 
     // First, build heaps of other processors and my objects
     // Then assign objects to other processors until either
@@ -110,7 +405,7 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
       const int me = CkMyPe();
       // Apparently we can give this object to this processor
       //      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
-      //              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
+      //              CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
 
       MigrateInfo* migrateMe = new MigrateInfo;
       migrateMe->obj = myStats.objData[obj->Id].handle;
@@ -143,7 +438,7 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   //  if (migrate_count > 0) {
   //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
   //  }
-  NLBMigrateMsg* msg = new(&migrate_count,1) NLBMigrateMsg;
+  WSLBMigrateMsg* msg = new(&migrate_count,1) WSLBMigrateMsg;
   msg->n_moves = migrate_count;
   for(i=0; i < migrate_count; i++) {
     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
@@ -155,4 +450,36 @@ NLBMigrateMsg* WSLB::Strategy(NeighborLB::LDStats* stats, int count)
   return msg;
 };
 
+void* WSLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
+{
+  int totalsize = size + array[0] * sizeof(WSLB::MigrateInfo);
+
+  WSLBMigrateMsg* ret =
+    static_cast<WSLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
+
+  ret->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(ret)+ size);
+
+  return static_cast<void*>(ret);
+}
+
+void* WSLBMigrateMsg::pack(WSLBMigrateMsg* m)
+{
+  m->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
+
+  return static_cast<void*>(m);
+}
+
+WSLBMigrateMsg* WSLBMigrateMsg::unpack(void *m)
+{
+  WSLBMigrateMsg* ret_val = static_cast<WSLBMigrateMsg*>(m);
+
+  ret_val->moves = reinterpret_cast<WSLB::MigrateInfo*>
+    (reinterpret_cast<char*>(&ret_val->moves) 
+     + reinterpret_cast<size_t>(ret_val->moves));
+
+  return ret_val;
+}
+
 #endif