added command line option +ftc_disk to allow user to switch to local-disk checkpointing.
authorGengbin Zheng <gzheng@illinois.edu>
Sat, 8 May 2004 21:09:29 +0000 (21:09 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Sat, 8 May 2004 21:09:29 +0000 (21:09 +0000)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index 8053bce591a1ab35a6467c0a244edbc9b9e2d887..babba2acf4222324132965d2c98b71451e07ee74 100644 (file)
@@ -8,32 +8,34 @@ written by Gengbin Zheng, gzheng@uiuc.edu
 
 added 12/18/03:
 
-To ensure fault tolerance while allowing migration, it uses double
-checkpointing scheme for each array element.
+To support fault tolerance while allowing migration, it uses double
+checkpointing scheme for each array element (not a infallible scheme).
 In this version, checkpointing is done based on array elements. 
 Each array element individully sends its checkpoint data to two buddies.
 
-In this implementation, assume at a time only one failure happens,
-and there is no failure during a checkpointing or restarting phase.
+In this implementation, assume only one failure happens at a time,
+or two failures on two processors which are not buddy to each other;
+also assume there is no failure during a checkpointing or restarting phase.
 
 Restart phase contains two steps:
-1. Converse level restart where only the newly created process for the failed
-   processor is working on restoring the system data (except array elements)
-   from its backup processor.
-2. CkMemCheckPT gets control and recover array elements and reset all
-   states to be consistent.
+1. Converse level restart: the newly created process for the failed
+   processor recover its system data (no array elements) from 
+   its backup processor.
+2. Charm++ level restart: CkMemCheckPT gets control and recover array 
+   elements and reset all states of system groups to be consistent.
 
 added 3/14/04:
 1. also support for double in-disk checkpoint/restart
-   use macro CK_USE_MEM and CK_USE_DISK to switch.
+   set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
 
 added 4/16/04:
 1. also support the case when there is a pool of extra processors.
    set CK_NO_PROC_POOL to 0.
 
 TODO:
- checkpoint scheme can be reimplemented based on per processor scheme;
+1. checkpoint scheme can be reimplemented based on per processor scheme;
  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
+2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
 
 */
 
@@ -44,24 +46,21 @@ TODO:
 
 #define DEBUGF     // CkPrintf
 
-// choose to use in-memory or in-disk checkpointing
-#define CK_USE_MEM                             1
-#define CK_USE_DISK                            0       
-
 // assume NO extra processors--1
 // assume extra processors--0
 #define CK_NO_PROC_POOL                                1
 
+// static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
+CkCallback CkMemCheckPT::cpCallback;
 
-int _memChkptOn = 1;
+int _memChkptOn = 1;                   // checkpoint is on or off
 
 CkGroupID ckCheckPTGroupID;            // readonly
 
-CkCallback CkMemCheckPT::cpCallback;    // static
-
+/// checkpoint buffer for processor system data
 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
 
 // compute the backup processor
@@ -69,7 +68,7 @@ CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
 
 // called in array element constructor
-// choose and register with 2 buggies for checkpoiting 
+// choose and register with 2 buddies for checkpoiting 
 #if CMK_MEM_CHECKPOINT
 void ArrayElement::init_checkpt() {
        if (_memChkptOn == 0) return;
@@ -77,7 +76,6 @@ void ArrayElement::init_checkpt() {
        // only master init checkpoint
         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
 
-//        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
         budPEs[0] = CkMyPe();
         budPEs[1] = (CkMyPe()+1)%CkNumPes();
        CmiAssert(budPEs[0] != budPEs[1]);
@@ -119,7 +117,7 @@ void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 #endif
 }
 
-// class for in memory checkpointing
+// checkpoint holder class - for memory checkpointing
 class CkMemCheckPTInfo: public CkCheckPTInfo
 {
   CkArrayCheckPTMessage *ckBuffer;
@@ -156,7 +154,7 @@ public:
   }
 };
 
-// class for in-disk checkpointing
+// checkpoint holder class - for in-disk checkpointing
 class CkDiskCheckPTInfo: public CkCheckPTInfo 
 {
   char *fname;
@@ -167,7 +165,7 @@ public:
   {
 #if CMK_USE_MKSTEMP
     fname = new char[64];
-    sprintf(fname, "/tmp/ckpt%d-XXXXXX", myidx);
+    sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
     mkstemp(fname);
 #else
     fname=tmpnam(NULL);
@@ -182,10 +180,14 @@ public:
   inline void updateBuffer(CkArrayCheckPTMessage *data) 
   {
     double t = CmiWallTimer();
+    // unpack it
+    envelope *env = UsrToEnv(data);
+    CkUnpackMessage(&env);
+    data = (CkArrayCheckPTMessage *)EnvToUsr(env);
     FILE *f = fopen(fname,"wb");
     PUP::toDisk p(f);
     CkPupMessage(p, (void **)&data);
-    // delay sync to the end because otherwise the messages are delayed
+    // delay sync to the end because otherwise the messages are blocked
 //    fsync(fileno(f));
     fclose(f);
     bud1 = data->bud1;
@@ -213,7 +215,7 @@ public:
   }
 };
 
-CkMemCheckPT::CkMemCheckPT()
+CkMemCheckPT::CkMemCheckPT(int w)
 {
 #if CK_NO_PROC_POOL
   if (CkNumPes() <= 2) {
@@ -225,6 +227,7 @@ CkMemCheckPT::CkMemCheckPT()
   }
   inRestarting = 0;
   recvCount = peCount = 0;
+  where = w;
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -243,6 +246,7 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|failedPes;
   p|ckCheckPTGroupID;          // recover global variable
   p|cpCallback;                        // store callback
+  p|where;                     // where to checkpoint
   if (p.isUnpacking()) {
     recvCount = peCount = 0;
   }
@@ -252,7 +256,8 @@ void CkMemCheckPT::pup(PUP::er& p)
 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
 {
 #if CMK_MEM_CHECKPOINT
-  DEBUGF("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
+  DEBUGF("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  
+  // m->index.print();
   PUP::fromMem p(m->packData);
   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
   CmiAssert(mgr);
@@ -317,11 +322,11 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax ind
       }
     }
   }
-#if CK_USE_MEM
-  CkMemCheckPTInfo *newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
-#else
-  CkDiskCheckPTInfo *newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
-#endif
+  CkCheckPTInfo *newEntry;
+  if (where == CkCheckPoint_inMEM)
+    newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
+  else
+    newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
   ckTable.push_back(newEntry);
 }
 
@@ -336,7 +341,6 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
   }
 
-//  if (iFailed()) return;
   int len = ckTable.length();
   for (int i=0; i<len; i++) {
     CkCheckPTInfo *entry = ckTable[i];
@@ -388,7 +392,7 @@ void CkMemCheckPT::sendProcData()
   }
   msg->pe = CkMyPe();
   msg->len = size;
-  msg->reportPe = cpStarter;    // in case other processor is not in checkpoint mode
+  msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
   thisProxy[ChkptOnPe()].recvProcData(msg);
 }
 
@@ -417,20 +421,22 @@ void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
       // inform starter processor that I am done.
     recvCount ++;
     if (recvCount == ckTable.length()) {
-#if CK_USE_MEM
-//CmiPrintf("[%d] CkMemCheckPT::recvData report to %d\n", CkMyPe(), cpStarter);
-      thisProxy[cpStarter].cpFinish();
-#else
-      // another barrier for finalize the writing using fsync
-      CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
-      contribute(0,NULL,CkReduction::sum_int,localcb);
-#endif
+      if (where == CkCheckPoint_inMEM) {
+        thisProxy[cpStarter].cpFinish();
+      }
+      else if (where == CkCheckPoint_inDISK) {
+        // another barrier for finalize the writing using fsync
+        CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+        contribute(0,NULL,CkReduction::sum_int,localcb);
+      }
+      else
+        CmiAbort("Unknown checkpoint scheme");
       recvCount = 0;
     } 
   }
 }
 
-// only used in disk
+// only used in disk checkpointing
 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
 {
   delete m;
@@ -526,6 +532,7 @@ void CkMemCheckPT::resetLB(int diepe)
   for (i=0; i<failedPes.length(); i++)
     bitmap[failedPes[i]] = 0; 
   bitmap[diepe] = 0;
+
 #if CK_NO_PROC_POOL
   set_avail_vector(bitmap);
 #endif
@@ -568,13 +575,11 @@ void CkMemCheckPT::restart(int diePe)
 
   CKLOCMGR_LOOP(mgr->startInserting(););
 
-  // afterwards, the QD detection should work again
-  //if (CkMyPe() == 0)
-  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
 #endif
 }
 
+// loally remove all array elements
 void CkMemCheckPT::removeArrayElements()
 {
 #if CMK_MEM_CHECKPOINT
@@ -593,23 +598,6 @@ void CkMemCheckPT::removeArrayElements()
 
 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
 
-#if 0
-  // first phase: destroy all existing array elements
-  for (int idx=0; idx<len; idx++) {
-    CkMemCheckPTInfo *entry = ckTable[idx];
-    // the bigger number PE do the destory
-    if (CkMyPe() < entry->pNo && entry->pNo != thisFailedPe) continue;
-    CkArrayMessage *msg = (CkArrayMessage *)CkAllocSysMsg();
-    msg->array_setIfNotThere(CkArray_IfNotThere_buffer);
-    CkSendMsgArray(CkIndex_ArrayElement::ckDestroy(),msg,entry->aid,entry->index);
-    //CkCallback cb(CkIndex_ArrayElement::ckDestroy(), entry->index, entry->aid);
-    //cb.send(msg);
-CkPrintf("[%d] Destory: ", CkMyPe()); entry->index.print();
-  }
-#endif
-
-  //if (CkMyPe() == 0)
-  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
 #endif
 }
@@ -628,8 +616,6 @@ void CkMemCheckPT::resetReductionMgr()
   // reset again
   //CpvAccess(_qd)->flushStates();
 
-  //if (CkMyPe() == 0)
-  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
 }
 
@@ -653,7 +639,8 @@ void CkMemCheckPT::recoverBuddies()
       // find a new buddy
       int budPe = CkMyPe();
 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
-      while (budPe == CkMyPe() || isFailed(budPe)) budPe = (budPe+1)%CkNumPes();
+      while (budPe == CkMyPe() || isFailed(budPe)) 
+          budPe = (budPe+1)%CkNumPes();
       entry->pNo = budPe;
 #else
       int budPe = thisFailedPe;
@@ -692,7 +679,8 @@ void CkMemCheckPT::recoverArrayElements()
     if (CkMyPe() == entry->pNo+1 || 
         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
 #endif
-CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
+//CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
+
     entry->updateBuddy(CkMyPe(), entry->pNo);
     CkArrayCheckPTMessage *msg = entry->getCopy();
     // gzheng
@@ -740,7 +728,7 @@ void CkMemCheckPT::quiescence(CkCallback &cb)
   }
 }
 
-// function called by user to start a check point
+// User callable function - to start a checkpoint
 // callback cb is used to pass control back
 void CkStartMemCheckpoint(CkCallback &cb)
 {
@@ -762,7 +750,7 @@ void CkStartMemCheckpoint(CkCallback &cb)
   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
   checkptMgr.doItNow(CkMyPe(), cb);
 #else
-  // when mem checkpoint is not enabled, invike cb immediately
+  // when mem checkpoint is disabled, invike cb immediately
   cb.send();
 #endif
 }
@@ -784,6 +772,7 @@ CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
   CkRestartCheckPoint(_diePE);
 }
 
+// Converse function handles
 static int askProcDataHandlerIdx;
 static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
@@ -853,7 +842,6 @@ static void recoverProcDataHandler(char *msg)
    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
    // gzheng
    CKLOCMGR_LOOP(mgr->startInserting(););
-//   cur_restart_phase = temp;
 
    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
@@ -900,7 +888,7 @@ void CkMemRestart(const char *dummy)
    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
    cur_restart_phase=0;    // allow all message to come in
 #else
-   CmiAbort("Fault tolerance is not support, rebuild charm++ with 'ft' option");
+   CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
 #endif
 }
 
@@ -926,7 +914,12 @@ class CkMemCheckPTInit: public Chare {
 public:
   CkMemCheckPTInit(CkArgMsg *m) {
 #if CMK_MEM_CHECKPOINT
-    ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew();
+    int  where = CkCheckPoint_inMEM;
+    if (CmiGetArgFlagDesc(m->argv, "+ftc_disk", "Double-disk Checkpointing")) {
+      where = CkCheckPoint_inDISK;
+      CkPrintf("Charm++> Double-disk Checkpointing. \n");
+    }
+    ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(where);
     CkPrintf("CkMemCheckPTInit main chare created!\n");
 #endif
   }
index 9ff6bbaef33e8a2ca03b0dcae7f94c85eca23166..504de02e237be4f5b08b882e44ac275317a23723 100644 (file)
@@ -13,7 +13,7 @@ module CkMemCheckpoint {
   message CkArrayCheckPTReqMessage;
 
   group [migratable] CkMemCheckPT {
-        entry CkMemCheckPT(void);
+        entry CkMemCheckPT(int w);
        entry void createEntry(CkArrayID, CkGroupID, CkArrayIndexMax, int);
        // checkpointing
         entry void doItNow(int spe, CkCallback &);  //checkpointing
index 079f59c3b7136ad8be145056cf790ff9f04f6759..2b7ad432cffb8ed5b34807d0663385210b111e4d 100644 (file)
@@ -50,10 +50,14 @@ public:
    virtual int getSize() = 0;
 };
 
+/// memory or disk checkpointing
+#define CkCheckPoint_inMEM   1
+#define CkCheckPoint_inDISK  2
+
 class CkMemCheckPT: public CBase_CkMemCheckPT {
 public:
-  CkMemCheckPT();
-  CkMemCheckPT(CkMigrateMessage *m):CBase_CkMemCheckPT(m) { }
+  CkMemCheckPT(int w);
+  CkMemCheckPT(CkMigrateMessage *m):CBase_CkMemCheckPT(m) {}
   virtual ~CkMemCheckPT();
   void pup(PUP::er& p);
   void doItNow(int sp, CkCallback &);
@@ -82,16 +86,20 @@ private:
   CkVec<CkCheckPTInfo *> ckTable;
 
   int recvCount, peCount;
+    /// the processor who initiate the checkpointing
   int cpStarter;
   CkVec<int> failedPes;
   int thisFailedPe;
+
+    /// to use memory or disk checkpointing
+  int    where;
 private:
-  inline int iFailed() { return isFailed(CkMyPe()); }
-  int isFailed(int pe);
-  int totalFailed();
-  void failed(int pe);
   inline int isMaster(int pe);
 
+  int  isFailed(int pe);
+  void failed(int pe);
+  int  totalFailed();
+
   void sendProcData();
 };