Added checkpointing to POSE. Also added a couple tests to the Ring Makefile in charm...
authorRyan Mokos <mokos@illinois.edu>
Fri, 26 Mar 2010 23:21:52 +0000 (18:21 -0500)
committerRyan Mokos <mokos@illinois.edu>
Fri, 26 Mar 2010 23:21:52 +0000 (18:21 -0500)
20 files changed:
examples/pose/Ring/Makefile
src/libs/ck-libs/pose/evq.C
src/libs/ck-libs/pose/gvt.C
src/libs/ck-libs/pose/gvt.ci
src/libs/ck-libs/pose/gvt.h
src/libs/ck-libs/pose/memory_temporal.ci
src/libs/ck-libs/pose/memory_temporal.h
src/libs/ck-libs/pose/mempool.ci
src/libs/ck-libs/pose/mempool.h
src/libs/ck-libs/pose/pose.C
src/libs/ck-libs/pose/pose.ci
src/libs/ck-libs/pose/pose.h
src/libs/ck-libs/pose/pose_config.h
src/libs/ck-libs/pose/seq.C
src/libs/ck-libs/pose/sim.C
src/libs/ck-libs/pose/sim.ci
src/libs/ck-libs/pose/sim.h
src/libs/ck-libs/pose/srtable.h
src/libs/ck-libs/pose/stats.ci
src/libs/ck-libs/pose/stats.h

index fe298fbeec9839b70b659a4adaf906cc5163d19a..de4dc79f3af1ce6cc1951d2378d3537d15ad730d 100644 (file)
@@ -9,7 +9,13 @@
 #
 # ***********************************************************************
 
-OPTS=-O#-DCMK_OPTIMIZE=1 -DCMK_MEMCHECKS_OFF=1
+# For parallel simulation
+OPTS=-O #-DCMK_OPTIMIZE=1 -DCMK_MEMCHECKS_OFF=1
+ETRANS_OPTS=
+
+# For sequential simulation
+#OPTS=-O -DSEQUENTIAL_POSE=1 #-DCMK_OPTIMIZE=1 -DCMK_MEMCHECKS_OFF=1
+#ETRANS_OPTS=-s
 
 CHARMBASE=../../..
 CHARMBIN=$(CHARMBASE)/bin
@@ -114,8 +120,15 @@ Worker.def.h Worker.decl.h : Worker_sim.ci
        $(CHARMC) $(INCLUDES) $(LIBS) Worker_sim.ci
 
 Worker_sim.C Worker_sim.h Worker_sim.ci: Worker.C Worker.h Worker.ci 
-       $(CHARMBIN)/etrans.pl Worker
+       $(CHARMBIN)/etrans.pl $(ETRANS_OPTS) Worker
 
 test: $(PGM)
        ./pgm 20 200 MEDIUM -gm 10
 
+partest: $(PGM)
+       ./charmrun +p4 ./pgm 20 200 MEDIUM -gm 1
+       ./charmrun +p4 ./pgm +restart __pose_chkpt_files/
+
+seqtest: $(PGM).seq
+       ./pgm.seq 20 800 MEDIUM -gm 1
+       ./pgm.seq +restart __pose_chkpt_files/
index 976b66b5bd71a4c3d68b8223b94749d58df1736c..d54ce365b4dea0227e899a6e6edc38a2b7091ecf 100644 (file)
@@ -7,6 +7,10 @@ eventQueue::eventQueue()
   if(pose_config.dop){
     sprintf(filename, "dop%d.log", CkMyPe());
     fp = fopen(filename, "a");
+    if (fp == NULL) {
+      CkPrintf("ERROR: unable to open DOP file %s for append\n");
+      CkAbort("Error opening file");
+    }
     lastLoggedVT = 0;
   }
   Event *e;
index 5192a7132ce437d3edcdb78128aaddb9c09d043a..bda52e312651293c91e4da1b47192702585acbd3 100644 (file)
@@ -71,12 +71,40 @@ PVT::PVT()
     else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
   }
   //  CkPrintf("PE %d reports to %d, receives %d reports, reduces and sends to %d, and reports directly to GVT if %d = 1!\n", CkMyPe(), reportTo, reportsExpected, reportReduceTo, reportEnd);
+  parCheckpointInProgress = 0;
+  parLastCheckpointGVT = 0LL;
 #ifndef CMK_OPTIMIZE
   if(pose_config.stats)
     localStats->TimerStop();
 #endif
 }
 
+/// PUP routine
+void PVT::pup(PUP::er &p) {
+  p|optPVT; p|conPVT; p|estGVT; p|repPVT;
+  p|simdone; p|iterMin; p|waitForFirst;
+  p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
+  p|gvtTurn; p|specEventCount; p|eventCount;
+  p|startPhaseActive; p|parCheckpointInProgress; p|parLastCheckpointGVT;
+  p|optGVT; p|conGVT; p|rdone;
+
+  if (p.isUnpacking()) {
+#ifndef CMK_OPTIMIZE
+    localStats = (localStat *)CkLocalBranch(theLocalStats);
+#endif
+#ifdef MEM_TEMPORAL
+    localTimePool = (TimePool *)CkLocalBranch(TempMemID);
+#endif
+    SendsAndRecvs = new SRtable();
+  }
+
+  SendsAndRecvs->pup(p);
+
+  if (SRs != NULL) {
+    CkAbort("ERROR: PVT member *SRs is unexpectedly not NULL\n");
+  }
+}
+
 void PVT::startPhaseExp(prioBcMsg *m) {
   startPhase(m);
 }
@@ -205,6 +233,58 @@ void PVT::setGVT(GVTMsg *m)
 #ifdef MEM_TEMPORAL
   localTimePool->set_min_time(estGVT);
 #endif
+
+  // Parallel checkpointing: setGVT was broken into two functions, and
+  // beginCheckpoint was added.  Only initiate the checkpointing
+  // procedure on PE 0, after commits have occurred.  This should
+  // minimize the amount of data written to disk.  In order to ensure
+  // a stable state, we wait for quiescence to be reached before
+  // beginning the checkpoint.  Inconsistent results were obtained
+  // (possibly from messages still in transit) without this step.
+  // Once quiescence is reached, PE 0 begins the checkpoint, and then
+  // resumes the simulation in resumeAfterCheckpoint.  This Callback
+  // function is also the first POSE function to be called when
+  // restarting from a checkpoint.
+
+  // Currently, checkpoints are initiated approximately every
+  // POSE_CHECKPOINT_INTERVAL GVT ticks (defined in pose_config.h).
+  // Support for a time-based interval could easily be added.
+
+  if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && 
+      (POSE_CHECKPOINT_INTERVAL > 0) && (estGVT >= (parLastCheckpointGVT + POSE_CHECKPOINT_INTERVAL))) {
+    // wait for quiescence to occur before checkpointing
+    eventMsg *dummyMsg = new eventMsg();
+    CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
+    CkStartQD(cb);
+  } else {
+    // skip checkpointing
+    eventMsg *dummyMsg = new eventMsg();
+    p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
+  }
+}
+
+/// ENTRY: begin checkpoint now that quiescence has been reached
+void PVT::beginCheckpoint(eventMsg *m) {
+  CkFreeMsg(m);
+  if (!parCheckpointInProgress) {  // ensure this only happens once
+    parCheckpointInProgress = 1;
+    CkPrintf("POSE: quiescence detected\n");
+    CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld\n", CkMyPe(), estGVT);
+    eventMsg *dummyMsg = new eventMsg();
+    CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
+    CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
+  }
+}
+
+/// ENTRY: resume after checkpointing, restarting, or if checkpointing doesn't occur
+void PVT::resumeAfterCheckpoint(eventMsg *m) {
+  if (parCheckpointInProgress) {
+    CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld\n", CkMyPe(), estGVT);
+    parCheckpointInProgress = 0;
+    parLastCheckpointGVT = estGVT;
+  }
+  CkFreeMsg(m);
+  CProxy_PVT p(ThePVT);
   startPhaseActive = 0;
   prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
   startMsg->bc = 1;
index 2315590bde43df1e0438bc34adce7a9f37709fdb..6734bb511cc4ea5212083a24f1f83e5c6d33f7e9 100644 (file)
@@ -12,16 +12,18 @@ module gvt {
   message prioBcMsg;
 
   /// PVT chare group for computing local processor virtual time 
-  group PVT {
+  group [migratable] PVT {
     entry PVT(void);
     entry void startPhase(prioBcMsg *);
     entry void startPhaseExp(prioBcMsg *);     // not SMP safe to make this expedited
+    entry void beginCheckpoint(eventMsg *);
+    entry void resumeAfterCheckpoint(eventMsg *);
     entry [expedited] void setGVT(GVTMsg *);
     entry [expedited] void reportReduce(UpdateMsg *);
   };
 
   /// GVT chare group for estimating GVT
-  group GVT {
+  group [migratable] GVT {
     entry GVT(void);
     entry [expedited] void runGVT(UpdateMsg *);
     entry [expedited] void computeGVT(UpdateMsg *);
index 1b90cee0ed438358c7da50de5e5a104a177207ab..1f928bdbb66b032248215e7e906bbd1f1dedd858 100644 (file)
@@ -95,9 +95,15 @@ class PVT : public Group {
   int specEventCount, eventCount;
   /// startPhase active flag
   int startPhaseActive;
+  /// indicates if checkpointing is in progress
+  int parCheckpointInProgress;
+  /// GVT at which the last checkpoint was performed
+  POSE_TimeType parLastCheckpointGVT;
   /* things which used to be member function statics */
+  /// optimistic and coservative GVTs
   POSE_TimeType optGVT, conGVT;
   int rdone;
+  /// used in PVT report reduction
   SRentry *SRs;
 #ifdef MEM_TEMPORAL
   TimePool *localTimePool;
@@ -106,7 +112,12 @@ class PVT : public Group {
  public:
   /// Basic Constructor
   PVT(void);
-  PVT(CkMigrateMessage *) { };
+  /// Migration Constructor
+  PVT(CkMigrateMessage *msg) : Group(msg) { };
+  /// PUP routine
+  void pup(PUP::er &p);
+  /// Destructor
+  ~PVT() { }
   /// ENTRY: runs the PVT calculation and reports to GVT
   void startPhase(prioBcMsg *m);             
   /// ENTRY: runs the expedited PVT calculation and reports to GVT
@@ -115,6 +126,10 @@ class PVT : public Group {
   /** Receives the new GVT estimate and termination flag; wakes up objects
       for fossil collection and forward execution with new GVT estimate. */
   void setGVT(GVTMsg *m);            
+  /// ENTRY: begin checkpoint now that quiescence has been reached
+  void beginCheckpoint(eventMsg *m);
+  /// ENTRY: resume after checkpointing, restarting, or if checkpointing doesn't occur
+  void resumeAfterCheckpoint(eventMsg *m);
   /// Returns GVT estimate
   POSE_TimeType getGVT() { return estGVT; }    
 
@@ -165,14 +180,33 @@ private:
   /// Number of PVT reports expected (1 or 2)
   int reportsExpected;
   /* things which used to be member function static */
-  POSE_TimeType optGVT,  conGVT;
+  /// optimistic and coservative GVTs
+  POSE_TimeType optGVT, conGVT;
   int done;
+  /// used to calculate GVT from PVT reports
   SRentry *SRs;
   int startOffset;
 public:
   /// Basic Constructor
   GVT(void);
-  GVT(CkMigrateMessage *) { };
+  /// Migration Constructor
+  GVT(CkMigrateMessage *msg) : Group(msg) { };
+  /// PUP routine
+  void pup(PUP::er &p) {
+    p|estGVT; p|inactive; p|inactiveTime; p|nextLBstart;
+    p|lastEarliest; p|lastSends; p|lastRecvs; p|reportsExpected;
+    p|optGVT; p|conGVT; p|done; p|startOffset;
+
+    if (p.isUnpacking()) {
+#ifndef CMK_OPTIMIZE
+      localStats = (localStat *)CkLocalBranch(theLocalStats);
+#endif
+    }
+
+    if (SRs != NULL) {
+      CkAbort("ERROR: GVT member *SRs is unexpectedly not NULL\n");
+    }
+  }
   //Use this for Ccd calls
   //static void _runGVT(UpdateMsg *);
   /// ENTRY: Run the GVT
index 79094eebe6ab30e0e16f266254bc884e4a656ed8..0bf789c01f10e018d324ac0922e7d3296ce84027 100644 (file)
@@ -3,7 +3,7 @@
 module memory_temporal {
   readonly CkGroupID TempMemID;
 
-  group TimePool {
+  group [migratable] TimePool {
     entry TimePool();
   };
 };
index 2ac3e8a52b8f20e60aed1f1bf0244ca23657727e..c4233fe3e20454adfdfc14c8fa0d88b6b437bec7 100644 (file)
@@ -231,8 +231,9 @@ class TimePool : public Group {
  public:
   TimePool() : min_time(POSE_UnsetTS), last_in_use(NULL), first_in_use(NULL), 
     not_in_use(NULL), not_in_use_sz(0) {}
-  TimePool(CkMigrateMessage *) {}
+  TimePool(CkMigrateMessage *msg) : Group(msg) {}
   ~TimePool();
+  void pup(PUP::er &p) {}
   // Return memory from a time range
   char *tmp_alloc(POSE_TimeType timestamp, int sz_in_bytes);
   // "Free" up memory from a time range
index 4c87e4522116e05f1022b1b3e7c5652b169ffcd2..e168cc179c06deaf16e1a1a91cf326e80960a3a2 100644 (file)
@@ -4,7 +4,7 @@
 module mempool {
   readonly CkGroupID MemPoolID;
 
-  group MemoryPool {
+  group [migratable] MemoryPool {
     entry MemoryPool();
   };
 };
index 006afc4d84cd82a2eae126fb47a36e1643dfb393..ab9318d3fc12f758f5fdd1dbaa4a4ddb3f1740fa 100644 (file)
@@ -27,11 +27,11 @@ public:
   /// Basic initialization
   MemoryPool() :memPools(NULL),lastLook(NULL){
 #ifdef VERBOSE_DEBUG
-  CkPrintf("[%d] constructing MemoryPool\n",CkMyPe());
+    CkPrintf("[%d] constructing MemoryPool\n",CkMyPe());
 #endif
-
- }
-  MemoryPool(CkMigrateMessage *) { };
+  }
 MemoryPool(CkMigrateMessage *msg) : Group(msg) { }
+  void pup(PUP::er &p) { }
   /// returns number of blocks of size sz in pool
   int CheckPool(int sz); 
   /// returns a block from pool with size sz
index 32e9457ad8496060b2177261c3e9adc9872899bc..935161494d01915b858d5667355afa49b393a63b 100644 (file)
@@ -16,8 +16,13 @@ POSE_TimeType POSE_endtime;
 POSE_TimeType POSE_GlobalClock;
 POSE_TimeType POSE_GlobalTS;
 POSE_Config pose_config;
+#ifdef POSE_COMM_ON
 ComlibInstanceHandle POSE_commlib_insthndl;
+#endif
 int _POSE_SEQUENTIAL;
+int seqCheckpointInProgress;
+POSE_TimeType seqLastCheckpointGVT;
+CkQ<int> POSE_Skipped_Events;
 
 const eventID& GetEventID() {
   //CpvStaticDeclare(eventID, theEventID);  // initializes to [0.pe]
@@ -117,6 +122,8 @@ void POSE_init(int IDflag, int ET) // can specify both
   CkStartQD(fnIdx, &POSE_Coordinator_ID);
   POSE_GlobalClock = 0;
   POSE_GlobalTS = 0;
+  seqCheckpointInProgress = 0;
+  seqLastCheckpointGVT = 0;
 #else
   /*  CkPrintf("WARNING: Charm Quiescence termination enabled!\n");
   int fnIdx = CkIndex_pose::stop();
@@ -211,13 +218,18 @@ void pose::registerCallBack(callBack *cbm)
 void pose::stop(void) 
 { 
 #ifdef SEQUENTIAL_POSE
+  // don't stop if quiescence was reached for a checkpoint operation
+  if (seqCheckpointInProgress) {
+    POSE_Objects[0].SeqBeginCheckpoint();
+  } else {
 #if USE_LONG_TIMESTAMPS
-  CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
+    CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
 #else
-  CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
+    CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
 #endif
-  // Call sequential termination here, when done it calls prepExit
-  POSE_Objects.Terminate();
+    // Call sequential termination here, when done it calls prepExit
+    POSE_Objects.Terminate();
+  }
 #endif
   // prepExit();
 }
index a6a6928ff66b62df6497104347ddba6a21e1e26a..2119fd9913fc41be0b43250d61da067a4624b511 100644 (file)
@@ -3,11 +3,11 @@ module pose {
   readonly double busyWait;
   readonly double sim_timer;
   readonly int POSE_inactDetect;
-  readonly ComlibInstanceHandle POSE_commlib_insthndl;
+//  readonly ComlibInstanceHandle POSE_commlib_insthndl;
   readonly POSE_TimeType POSE_endtime;  
   readonly POSE_Config pose_config;
   message callBack;
-  chare pose {
+  chare [migratable] pose {
     entry pose();
     entry void registerCallBack(callBack *);
     entry void stop();
index a7edd1c6d952136f11666d8bd939f68c16995013..44321fa1a411debb754504e5331b4e0008d145bd 100644 (file)
@@ -105,8 +105,18 @@ extern int POSE_inactDetect;
 extern POSE_TimeType POSE_GlobalClock;
 extern POSE_TimeType POSE_GlobalTS;
 
+/// Checkpointing (for sequential simulation)
+extern int seqCheckpointInProgress;
+extern POSE_TimeType seqLastCheckpointGVT;
+// Global queue for storing POSE object array indices that are
+// skipped during quiescence detection just before checkpointing
+// (used in sequential mode only)
+extern CkQ<int> POSE_Skipped_Events;
+
 /// For getting access to the commlib strategy
+#ifdef POSE_COMM_ON
 extern ComlibInstanceHandle POSE_commlib_insthndl;
+#endif
 
 extern POSE_Config pose_config;
 
@@ -149,7 +159,13 @@ class pose : public Chare {
 #endif
 
  }
+  /// Migration constructor
   pose(CkMigrateMessage *) { }
+  /// PUP routine
+  void pup(PUP::er &p) {
+    p|cb;
+    p|callBackSet;
+  }
   /// Register the callback with POSE
   void registerCallBack(callBack *);
   /// Stop the simulation
index 65a457973ba349039685ac2b96258fb0796d3040..06d1ac4333bf7da360ca68a9d06b1ed8bf44a28b 100644 (file)
 /// Uncomment to turn on POSE load balancer
 //#define LB_ON 1
 
+#ifdef POSE_COMM_ON
 #include <StreamingStrategy.h>
 #include <MeshStreamingStrategy.h>
 #include <PrioStreaming.h>
+#endif
+
 #define COMM_TIMEOUT 1
 #define COMM_MAXMSG 20
 
 #define LB_THRESHOLD 4000   // 20 heavy objects
 #define LB_DIFF 2000       // min diff between min and max load PEs
 
+/// Checkpointing constants
+#define POSE_CHECKPOINT_INTERVAL 10000  // GVT ticks between checkpoints; 0 = no checkpointing
+#define POSE_CHECKPOINT_DIRECTORY "__pose_chkpt_files" // directory where checkpoint files are stored
+
 // MISC
 #define MAX_POOL_SIZE 40    // maximum size of a memory pool
 #define MAX_RECYCLABLE 1000 // maximum size of a recyclable block
index d6038dd97987d44ced3b3161e89840213fec0cae..e76cf0a6ad4c9dff12574ad90af5563297693611 100644 (file)
@@ -2,19 +2,40 @@
 // Module for sequential simulation strategy class
 #include "pose.h"
 
-void seq::Step()
-{
-  Event *ev;
-  // Prepare to execute an event
-  ev = eq->currentPtr;
-  currentEvent = ev;
-  if (ev->timestamp < POSE_GlobalTS)
-    CkPrintf("WARNING: SEQUENTIAL POSE BUG! Event timestamp %d is less than a previous one! This is due to stupid Charm++ Scheduler implementation and needs to be fixed.\n", ev->timestamp);
-  POSE_GlobalTS = ev->timestamp;
-  parent->ResolveFn(ev->fnIdx, ev->msg);  // execute it
-  if (userObj->OVT() > POSE_GlobalClock)
-    POSE_GlobalClock = userObj->OVT();
-  ev->done = 1;
-  eq->ShiftEvent();                       // move on to next event
-  eq->CommitAll(parent);
+void seq::Step() {
+
+  // execute event if checkpointing is not in progress or if the
+  // timestamp is before the GVT at which a checkpoint is occurring
+  if ((eq->currentPtr->timestamp < seqLastCheckpointGVT) || (!seqCheckpointInProgress)) {
+
+    Event *ev;
+    // Prepare to execute an event
+    ev = eq->currentPtr;
+    currentEvent = ev;
+    if (ev->timestamp < POSE_GlobalTS)
+      CkPrintf("WARNING: SEQUENTIAL POSE BUG! Event timestamp %d is less than a previous one! This is due to stupid Charm++ Scheduler implementation and needs to be fixed.\n", ev->timestamp);
+    POSE_GlobalTS = ev->timestamp;
+    parent->ResolveFn(ev->fnIdx, ev->msg);  // execute it
+    if (userObj->OVT() > POSE_GlobalClock)
+      POSE_GlobalClock = userObj->OVT();
+    ev->done = 1;
+    eq->ShiftEvent();                       // move on to next event
+    eq->CommitAll(parent);
+
+    // checkpoint if appropriate
+    if ((userObj->myHandle == 0) && (seqCheckpointInProgress == 0) && (POSE_CHECKPOINT_INTERVAL > 0) && 
+       (POSE_GlobalClock >= (seqLastCheckpointGVT + POSE_CHECKPOINT_INTERVAL))) {
+      // start quiescence detection on the sim chare
+      seqCheckpointInProgress = 1;
+      seqLastCheckpointGVT = POSE_GlobalClock;
+    }
+
+  } else {
+
+    // if in the process of checkpointing, store sim handle so Step()
+    // can be called for this event on restart
+    POSE_Skipped_Events.enq(userObj->myHandle);
+
+  }
+
 }
index 842d921553fb9a7d3ab1557faa67843b1a1da819..5743ea672d492678bd0743ed66c4df1f5ec74bf5 100644 (file)
@@ -41,11 +41,53 @@ sim::sim()
 /// Destructor
 sim::~sim() 
 {
+  active = -1;
+#ifndef SEQUENTIAL_POSE
+  localPVT->objRemove(myPVTidx);
+#endif
+  if(pose_config.lb_on)
+    localLBG->objRemove(myLBidx);
+
   delete(eq);
   delete(myStrat);
   delete(objID);
 }
 
+/// Pack/unpack/sizing operator
+void sim::pup(PUP::er &p) {
+  ArrayElement1D::pup(p); // call parent class pup method
+  // pup simple types
+  p(active); p(myPVTidx); p(myLBidx); p(sync); p(DOs); p(UNDOs);
+  // pup event queue
+  if (p.isUnpacking())
+    eq = new eventQueue();
+  eq->pup(p);
+  // pup cancellations
+  cancels.pup(p);
+  if (p.isUnpacking()) { // reactivate migrated object
+#ifndef CMK_OPTIMIZE
+    localStats = (localStat *)CkLocalBranch(theLocalStats);
+#endif
+#ifndef SEQUENTIAL_POSE
+    localPVT = (PVT *)CkLocalBranch(ThePVT);
+    myPVTidx = localPVT->objRegister(thisIndex, localPVT->getGVT(), sync, this);
+    if(pose_config.lb_on){
+      localLBG = TheLBG.ckLocalBranch();
+      myLBidx = localLBG->objRegister(thisIndex, sync, this);
+    }
+#endif
+    active = 0;
+  }
+  // pup checkpoint info for sequential mode using sim 0 only
+#ifdef SEQUENTIAL_POSE
+  if (thisIndex == 0) {
+    p|seqCheckpointInProgress;
+    p|seqLastCheckpointGVT;
+    p|POSE_Skipped_Events;
+  }
+#endif
+}
+
 /// Start a forward execution step on myStrat
 void sim::Step()
 {
@@ -221,6 +263,70 @@ void sim::Cancel(cancelMsg *m)
 #endif
 }
 
+// Sequential checkpointing: Two functions, SeqBeginCheckpoint and
+// SeqResumeAfterCheckpoint, were added to the sim class to
+// handle this.  Only initiate the checkpointing procedure on sim
+// 0, after commits have occurred.  This should minimize the
+// amount of data written to disk.  In order to ensure a stable
+// state, we wait for quiescence to be reached before beginning
+// the checkpoint.  Once this happens, sim 0 checkpoints and then
+// resumes the simulation in SeqResumeAfterCheckpoint.  This
+// Callback function is also the first POSE function to be called
+// when restarting from a checkpoint.
+
+// While waiting for quiescence to be reached, all events with
+// timestamps less than the checkpoint GVT are allowed to
+// execute.  All others are skipped, instead storing their sim handles (indices
+// into the POSE_Objects array) in POSE_Skipped_Events.  Even
+// though execution is skipped, the events still remain in their
+// event queues.  When resuming the simulation, both after
+// checkpointing and after a restart, sim::Step() (which calls
+// seq::Step()) is called on each poser listed in
+// POSE_Skipped_Events to execute the skipped events.
+
+// Currently, checkpoints are initiated approximately every
+// POSE_CHECKPOINT_INTERVAL GVT ticks (defined in pose_config.h).
+// Support for a time-based interval could easily be added.
+
+/// In sequential mode, begin checkpoint after reaching quiescence
+void sim::SeqBeginCheckpoint() {
+  // Ensure this only happens on sim 0
+  CkAssert(thisIndex == 0);
+  // Ensure we're checkpointing
+  CkAssert(seqCheckpointInProgress);
+  CkPrintf("POSE: quiescence detected\n");
+  CkPrintf("POSE: beginning checkpoint on sim %d at GVT=%lld\n", thisIndex, seqLastCheckpointGVT);
+  CkCallback cb(CkIndex_sim::SeqResumeAfterCheckpoint(), CkArrayIndex1D(thisIndex), thisProxy);
+  CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
+}
+
+/// In sequential mode, resume after checkpointing or restarting
+void sim::SeqResumeAfterCheckpoint() {
+  // Ensure this only happens on sim 0
+  CkAssert(thisIndex == 0);
+  // Ensure this function is only called once after a checkpoint
+  CkAssert(seqCheckpointInProgress);
+  seqCheckpointInProgress = 0;
+  POSE_GlobalClock = seqLastCheckpointGVT;
+  CkPrintf("POSE: checkpoint/restart complete on sim %d at GVT=%lld\n", thisIndex, POSE_GlobalClock);
+  // restart simulation
+  while (POSE_Skipped_Events.length() > 0) {
+    // These Step iterations MUST be executed now, before any messages
+    // are delivered, or else the event queues will break.  To do this
+    // efficiently, since we're in sequential mode, call Step() as a
+    // local function.
+    int index = POSE_Skipped_Events.deq();
+    sim *localSim = POSE_Objects[index].ckLocal();
+    if (localSim == NULL) {
+      CkPrintf("ERROR: could not obtain pointer to local sim object %d after checkpoint/restart\n", index);
+      CkAbort("Pointer to local sim is NULL...this shouldn't happen in sequential mode\n");
+    } else {
+      localSim->Step();
+    }
+  }
+  CkStartQD(CkIndex_pose::stop(), &POSE_Coordinator_ID);
+}
+
 /// Dump all data fields
 void sim::dump()
 {
index e50b56ac5694a1a2f23f396fc41ccc271e8cddd5..8ac0fb773c56769235599ca61f28e7b2e1cb66ca 100644 (file)
@@ -23,5 +23,7 @@ module sim {
     entry void Migrate(destMsg *);     
     entry void Terminate(void);        
     entry void Cancel(cancelMsg *);
+    entry void SeqBeginCheckpoint();
+    entry void SeqResumeAfterCheckpoint();
   };
 };
index a1ba4b81b88b69fd6a74cfe5d492db5fea697faa..1bc82e9954166f52ac74663e337ccdc9dfc088ae 100644 (file)
@@ -236,39 +236,7 @@ class sim : public CBase_sim {
   /// Destructor
   virtual ~sim();
   /// Pack/unpack/sizing operator
-  virtual void pup(PUP::er &p) {
-    ArrayElement1D::pup(p); // call parent class pup method
-    // pup simple types
-    p(active); p(myPVTidx); p(myLBidx); p(sync); p(DOs); p(UNDOs);
-    // pup event queue
-    if (p.isUnpacking())
-      eq = new eventQueue();
-    eq->pup(p);
-    // pup cancellations
-    cancels.pup(p);
-    if (p.isUnpacking()) { // reactivate migrated object
-#ifndef CMK_OPTIMIZE
-      localStats = (localStat *)CkLocalBranch(theLocalStats);
-#endif
-#ifndef SEQUENTIAL_POSE
-      localPVT = (PVT *)CkLocalBranch(ThePVT);
-      myPVTidx = localPVT->objRegister(thisIndex, localPVT->getGVT(), sync, this);
-      if(pose_config.lb_on){
-       localLBG = TheLBG.ckLocalBranch();
-       myLBidx = localLBG->objRegister(thisIndex, sync, this);
-      }
-#endif
-      active = 0;
-    }
-    else if (p.isPacking()) { // deactivate migrating object
-      active = -1;
-#ifndef SEQUENTIAL_POSE
-      localPVT->objRemove(myPVTidx);
-#endif
-      if(pose_config.lb_on)
-       localLBG->objRemove(myLBidx);
-    }
-  }
+  virtual void pup(PUP::er &p);
   /// Start a forward execution step on myStrat
   void Step();                 
   /// Start a prioritized forward execution step on myStrat
@@ -286,8 +254,11 @@ class sim : public CBase_sim {
   /// Migrate this poser to processor indicated in m
   void Migrate(destMsg *m) { migrateMe(m->destPE); }
   /// Terminate this poser, when everyone is terminated we exit 
-
   void Terminate() { objID->terminus(); int i=1;contribute(sizeof(int),&i,CkReduction::sum_int,CkCallback(POSE_prepExit,NULL)); }
+  /// In sequential mode, begin checkpoint after reaching quiescence
+  void SeqBeginCheckpoint();
+  /// In sequential mode, resume after checkpointing or restarting
+  void SeqResumeAfterCheckpoint();
   /// Return this poser's unique index on PVT branch
   int PVTindex() { return myPVTidx; }
   /// Test active flag
index 80ec8ad8b4acf768239b53ac5bba50a236d28050..0eac5798e75699fcefee3848585621d60514f58a 100644 (file)
@@ -24,8 +24,8 @@ class SRentry {
   /// Basic constructor
   /** Initializes all data members */
   SRentry() :timestamp(POSE_UnsetTS), sends(0), recvs(0), next(NULL) 
-{ 
-  }
+    {
+    }
   /// Initializing constructor 1
   /** Initializes timestamp & next w/parameters, sends & recvs to 0 */
   SRentry(POSE_TimeType ts, SRentry *p) :timestamp(ts), sends(0), recvs(0), next(p) 
@@ -46,6 +46,31 @@ class SRentry {
       if (sr == SEND) { sends = 1; recvs = 0; }
       else { sends = 0; recvs = 1; }
     }
+  /// PUP routine
+  /** This traverses the whole list of SRentrys using the next
+      pointer, PUPing each one */
+  void pup(PUP::er &p) {
+    p|timestamp; p|sends; p|recvs;
+    int nullFlag;
+    if (next == NULL) {
+      nullFlag = 1;
+    } else {
+      nullFlag = 0;
+    }
+    p|nullFlag;
+    if (p.isUnpacking()) {
+      if (nullFlag) {
+       next == NULL;
+      } else {
+       next = new SRentry();
+       next->pup(p);
+      }
+    } else {
+      if (!nullFlag) {
+       next->pup(p);
+      }
+    }
+  }
   /// Assignment operator
   SRentry& operator=(const SRentry& e) {
     timestamp = e.timestamp;
@@ -115,6 +140,79 @@ class SRtable {
   SRtable();
   /// Destructor
   ~SRtable() { FreeTable(); }
+  /// PUP routine
+  void pup(PUP::er &p) {
+    p|offset; p|b; p|size_b; p|numOverflow;
+    PUParray(p, sends, MAX_B);
+    PUParray(p, recvs, MAX_B);
+    p|ofSends; p|ofRecvs;
+    PUParray(p, numEntries, MAX_B);
+
+    // pup buckets
+    int nullFlag;
+    SRentry *tmp;
+    for (int i = 0; i < MAX_B; i++) {
+      if (buckets[i] == NULL) {
+       nullFlag = 1;
+      } else {
+       nullFlag = 0;
+      }
+      p|nullFlag;
+      if (p.isUnpacking()) {  // unpacking
+       if (nullFlag) {
+         buckets[i] = end_bucket[i] = NULL;
+       } else {
+         buckets[i] = new SRentry();
+         buckets[i]->pup(p);
+       }
+      } else {  // packing
+       if (!nullFlag) {
+         buckets[i]->pup(p);
+       }
+      }
+    }
+
+    // pup overflow bucket
+    if (overflow == NULL) {
+      nullFlag = 1;
+    } else {
+      nullFlag = 0;
+    }
+    p|nullFlag;
+    if (p.isUnpacking()) {  // unpacking
+      if (nullFlag) {
+       overflow = end_overflow = NULL;
+      } else {
+       overflow = new SRentry();
+       tmp = overflow;
+       do {
+         tmp->pup(p);
+         // At this point in unpacking, tmp->next is the old
+         // pointer to the next SRentry in the list, which of
+         // course is no longer valid.  However, if it's not NULL,
+         // that does indicate the existence of another SRentry in
+         // the list, so create a new one and unpack it in the next
+         // do-while iteration.
+         if (tmp->next) {
+           tmp->next = new SRentry();
+         } else {
+           // tmp currently points to the last SRentry in the list,
+           // so point end_overflow to it, too
+           end_overflow = tmp;
+         }
+         tmp = tmp->next;
+       } while (tmp);
+      }
+    } else {  // packing
+      if (!nullFlag) {
+       tmp = overflow;
+       while (tmp) {
+         tmp->pup(p);
+         tmp = tmp->next;
+       }
+      }
+    }
+  }
   /// Initialize table to a minimum size
   void Initialize();
   /// Insert send/recv record sr at timestamp ts
index 21b3b2725d27e520acac71b52a1a9253fd7de62e..d4fe19aa6dce2185c5747e726790185b175f9179 100644 (file)
@@ -4,12 +4,12 @@ module stats {
   readonly CkChareID theGlobalStats;
   message localStatSummary; 
 
-  group localStat {
+  group [migratable] localStat {
     entry localStat(void);
     entry void SendStats(void);
   };
 
-  chare globalStat {
+  chare [migratable] globalStat {
     entry globalStat(void);
     entry void localStatReport(localStatSummary *);
   };
index 434ed348a3adcb04de47bbca20387a69336fd948..02e83253d0fce992b4d2840741d8e56308f71217 100644 (file)
@@ -64,7 +64,8 @@ public:
     CkPrintf("[%d] constructing localStat\n",CkMyPe());
 #endif
   }
-  localStat(CkMigrateMessage *) { };
+  /// Migration constructor
+  localStat(CkMigrateMessage *msg) : Group(msg) { };
   /// Start the specified timer
   void TimerStart(int timer);  
   /// Stop the currently active timer
@@ -99,6 +100,7 @@ public:
     if (grt > maxGRT) maxGRT = grt;
   }
 };
+PUPbytes(localStat);
 
 /// Entity to gather stats from each PE and prepare final report
 class globalStat : public Chare {
@@ -113,11 +115,13 @@ private:
 public:
   /// Basic Constructor
   globalStat(void);
-  globalStat(CkMigrateMessage *) { };
+  /// Migration constructor
+  globalStat(CkMigrateMessage *msg) { };
   /// Receive, calculate and print statistics
   void localStatReport(localStatSummary *m); 
   void DOPcalc(int gvt, double grt);
 };
+PUPbytes(globalStat);
 
 
 // All timer functions are inlined below