many greate changes. Restructured the checkpoint table to have separate class of...
authorGengbin Zheng <gzheng@illinois.edu>
Wed, 24 Mar 2004 07:01:59 +0000 (07:01 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Wed, 24 Mar 2004 07:01:59 +0000 (07:01 +0000)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index f051066f95e34f1c1bd439d085e2bbb049b888b3..ba6c8f9a056ed72857e972a554d9d4e0843f41be 100644 (file)
@@ -6,7 +6,7 @@ In memory synchronous checkpointing and restart
 written by Gengbin Zheng, gzheng@uiuc.edu
            Lixia Shi,     lixiashi@uiuc.edu
 
-added 12/18/03
+added 12/18/03:
 
 To ensure fault tolerance while allowing migration, it uses double
 checkpointing scheme for each array element.
@@ -23,6 +23,11 @@ Restart phase contains two steps:
 2. CkMemCheckPT gets control and recover array elements and reset all
    states to be consistent.
 
+added 3/14/04:
+
+1. also support for double in-disk checkpoint/restart
+   use macro CK_USE_MEM and CK_USE_DISK to switch.
+
 TODO:
  checkpoint scheme can be reimplemented based on per processor scheme;
  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
@@ -36,6 +41,10 @@ TODO:
 
 #define DEBUGF     // CkPrintf
 
+// choose to use in-memory or in-disk checkpointing
+#define CK_USE_MEM                             1
+#define CK_USE_DISK                            0       
+
 int CkMemCheckPT::inRestarting = 0;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
@@ -58,7 +67,7 @@ inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
 void ArrayElement::init_checkpt() {
        if (_memChkptOn == 0) return;
        // CmiPrintf("[%d] ArrayElement::init_checkpt %d\n", CkMyPe(), info.fromMigration);
-       // only masteriinit checkpoint
+       // only master init checkpoint
         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
 
 //        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
@@ -103,35 +112,92 @@ void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 #endif
 }
 
-// called by checkpoint mgr to restore an array element
-void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
+// class for in memory checkpointing
+class CkMemCheckPTInfo: public CkCheckPTInfo
 {
-#if CMK_MEM_CHECKPOINT
-  //DEBUGF("[%d] inmem_restore restore", CmiMyPe());  m->index.print();
-//  CmiPrintf("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
-  PUP::fromMem p(m->packData);
-  CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
-  mgr->resume(m->index, p);
-
-  // find a list of array elements bound together
-  ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
-  CkLocRec_local *rec = elt->myRec;
-  CkVec<CkMigratable *> list;
-  mgr->migratableList(rec, list);
-  CmiAssert(list.length() > 0);
-  for (int l=0; l<list.length(); l++) {
-    elt = (ArrayElement *)list[l];
-    elt->budPEs[0] = m->bud1;
-    elt->budPEs[1] = m->bud2;
-    //    reset, may not needed now
-    // for now.
-    for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
-      contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
-      if (c) c->redNo = 0;
+  CkArrayCheckPTMessage *ckBuffer;
+public:
+  CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
+                   CkCheckPTInfo(a, loc, idx, pno)
+  {
+    ckBuffer = NULL;
+  }
+  ~CkMemCheckPTInfo() 
+  {
+    if (ckBuffer) delete ckBuffer; 
+  }
+  inline void updateBuffer(CkArrayCheckPTMessage *data) 
+  {
+    if (ckBuffer) delete ckBuffer;
+    ckBuffer = data;
+  }    
+  inline CkArrayCheckPTMessage * getCopy()
+  {
+    if (ckBuffer == NULL) {
+      CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
+      CmiAbort("Abort!");
     }
+    return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
+  }     
+  inline void updateBuddy(int b1, int b2) {
+     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
   }
-#endif
-}
+  inline int getSize() { 
+     CmiAssert(ckBuffer);
+     return ckBuffer->len; 
+  }
+};
+
+// class for in-disk checkpointing
+class CkDiskCheckPTInfo: public CkCheckPTInfo 
+{
+  char fname[64];
+  int bud1, bud2;
+  int len;                     // checkpoint size
+public:
+  CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
+  {
+    sprintf(fname, "/tmp/ckpt%d.%d", getpid(), myidx);
+    bud1 = bud2 = -1;
+    len = 0;
+  }
+  ~CkDiskCheckPTInfo() 
+  {
+    remove(fname);
+  }
+  inline void updateBuffer(CkArrayCheckPTMessage *data) 
+  {
+    double t = CmiWallTimer();
+    FILE *f = fopen(fname,"wb");
+    PUP::toDisk p(f);
+    CkPupMessage(p, (void **)&data);
+    // delay sync to the end because otherwise the messages are delayed
+//    fsync(fileno(f));
+    fclose(f);
+    bud1 = data->bud1;
+    bud2 = data->bud2;
+    len = data->len;
+    delete data;
+    //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
+  }
+  inline CkArrayCheckPTMessage * getCopy()     // get a copy of checkpoint
+  {
+    CkArrayCheckPTMessage *data;
+    FILE *f = fopen(fname,"rb");
+    PUP::fromDisk p(f);
+    CkPupMessage(p, (void **)&data);
+    fclose(f);
+    data->bud1 = bud1;                         // update the buddies
+    data->bud2 = bud2;
+    return data;
+  }
+  inline void updateBuddy(int b1, int b2) {
+     bud1 = b1; bud2 = b2;
+  }
+  inline int getSize() { 
+     return len; 
+  }
+};
 
 CkMemCheckPT::CkMemCheckPT()
 {
@@ -145,6 +211,10 @@ CkMemCheckPT::CkMemCheckPT()
 
 CkMemCheckPT::~CkMemCheckPT()
 {
+  int len = ckTable.length();
+  for (int i=0; i<len; i++) {
+    delete ckTable[i];
+  }
 }
 
 void CkMemCheckPT::pup(PUP::er& p) 
@@ -160,6 +230,36 @@ void CkMemCheckPT::pup(PUP::er& p)
   }
 }
 
+// called by checkpoint mgr to restore an array element
+void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
+{
+#if CMK_MEM_CHECKPOINT
+  //DEBUGF("[%d] inmem_restore restore", CmiMyPe());  m->index.print();
+//  CmiPrintf("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
+  PUP::fromMem p(m->packData);
+  CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
+  mgr->resume(m->index, p);
+
+  // find a list of array elements bound together
+  ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
+  CkLocRec_local *rec = elt->myRec;
+  CkVec<CkMigratable *> list;
+  mgr->migratableList(rec, list);
+  CmiAssert(list.length() > 0);
+  for (int l=0; l<list.length(); l++) {
+    elt = (ArrayElement *)list[l];
+    elt->budPEs[0] = m->bud1;
+    elt->budPEs[1] = m->bud2;
+    //    reset, may not needed now
+    // for now.
+    for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
+      contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
+      if (c) c->redNo = 0;
+    }
+  }
+#endif
+}
+
 // return 1 is pe was a crashed processor
 int CkMemCheckPT::isFailed(int pe)
 {
@@ -186,7 +286,7 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax ind
   // error check, no duplicate
   int idx, len = ckTable.size();
   for (idx=0; idx<len; idx++) {
-    CkMemCheckPTInfo *entry = ckTable[idx];
+    CkCheckPTInfo *entry = ckTable[idx];
     if (index == entry->index) {
       if (aid == entry->aid) {
         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry. \n", CkMyPe());
@@ -198,7 +298,11 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax ind
       }
     }
   }
+#if CK_USE_MEM
   CkMemCheckPTInfo *newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
+#else
+  CkDiskCheckPTInfo *newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
+#endif
   ckTable.push_back(newEntry);
 }
 
@@ -208,13 +312,15 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
 {
   cpCallback = cb;
   cpStarter = starter;
-  CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
-  if (CkMyPe() == cpStarter) startTime = CmiWallTimer();
+  if (CkMyPe() == cpStarter) {
+    startTime = CmiWallTimer();
+    CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
+  }
 
 //  if (iFailed()) return;
   int len = ckTable.length();
   for (int i=0; i<len; i++) {
-    CkMemCheckPTInfo *entry = ckTable[i];
+    CkCheckPTInfo *entry = ckTable[i];
       // always let the bigger number processor send request
     if (CkMyPe() < entry->pNo) continue;
       // call inmem_checkpoint to the array element, ask it to send
@@ -230,7 +336,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
 }
 
 // don't handle array elements
-static void _handleProcData(PUP::er &p)
+static inline void _handleProcData(PUP::er &p)
 {
     // save readonlys, and callback BTW
     CkPupROData(p);
@@ -255,8 +361,7 @@ void CkMemCheckPT::sendProcData()
     size = p.size();
   }
   int packSize = size;
-  CkProcCheckPTMessage *msg =
-                 new (packSize, 0) CkProcCheckPTMessage;
+  CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
   {
     PUP::toMem p(msg->packData);
@@ -282,22 +387,37 @@ void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
   int len = ckTable.length();
   int idx;
   for (idx=0; idx<len; idx++) {
-    CkMemCheckPTInfo *entry = ckTable[idx];
+    CkCheckPTInfo *entry = ckTable[idx];
     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
   }
   CkAssert(idx < len);
+  int isChkpting = msg->cp_flag;
   ckTable[idx]->updateBuffer(msg);
     // all my array elements have returned their inmem data
     // inform starter processor that I am done.
-  if (msg->cp_flag) {
+  if (isChkpting) {
     recvCount ++;
     if (recvCount == ckTable.length()) {
+#if CK_USE_MEM
       thisProxy[cpStarter].cpFinish();
+#else
+      // another barrier for finalize the writing using fsync
+      CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+      contribute(0,NULL,CkReduction::sum_int,localcb);
+#endif
       recvCount = 0;
     } 
   }
 }
 
+// only used in disk
+void CkMemCheckPT::syncFiles(CkReductionMsg *m)
+{
+  delete m;
+  system("sync");
+  thisProxy[cpStarter].cpFinish();
+}
+
 // only is called on cpStarter when checkpoint is done
 void CkMemCheckPT::cpFinish()
 {
@@ -318,9 +438,9 @@ void CkMemCheckPT::report()
   int objsize = 0;
   int len = ckTable.length();
   for (int i=0; i<len; i++) {
-    CkMemCheckPTInfo *entry = ckTable[i];
-    CmiAssert(entry && entry->ckBuffer);
-    objsize += entry->ckBuffer->len;
+    CkCheckPTInfo *entry = ckTable[i];
+    CmiAssert(entry);
+    objsize += entry->getSize();
   }
   CmiAssert(CpvAccess(procChkptBuf));
   CkPrintf("[%d] Checkpointed Object size: %d Processor data: %d\n", CkMyPe(), objsize, CpvAccess(procChkptBuf)->len);
@@ -501,15 +621,14 @@ void CkMemCheckPT::recoverBuddies()
 
   // recover buddies
   for (idx=0; idx<len; idx++) {
-    CkMemCheckPTInfo *entry = ckTable[idx];
+    CkCheckPTInfo *entry = ckTable[idx];
     if (entry->pNo == thisFailedPe) {
       int budPe = CkMyPe();
 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
       while (budPe == CkMyPe() || isFailed(budPe)) budPe = (budPe+1)%CkNumPes();
       entry->pNo = budPe;
       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
-      CmiAssert(entry->ckBuffer);
-      CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&entry->ckBuffer);
+      CkArrayCheckPTMessage *msg = entry->getCopy();
       msg->cp_flag = 0;            // not checkpointing
       thisProxy[budPe].recvData(msg);
     }
@@ -533,15 +652,13 @@ void CkMemCheckPT::recoverArrayElements()
   int len = ckTable.length();
   for (int idx=0; idx<len; idx++)
   {
-    CkMemCheckPTInfo *entry = ckTable[idx];
+    CkCheckPTInfo *entry = ckTable[idx];
     // the bigger one will do 
 //    if (CkMyPe() < entry->pNo) continue;
     if (!isMaster(entry->pNo)) continue;
 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
-    if (entry->ckBuffer == NULL) CmiAbort("recoverArrayElements: element does not have checkpoint data.");
-    entry->ckBuffer->bud1 = CkMyPe(); entry->ckBuffer->bud2 = entry->pNo;
-    CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-    CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&entry->ckBuffer);
+    entry->updateBuddy(CkMyPe(), entry->pNo);
+    CkArrayCheckPTMessage *msg = entry->getCopy();
     // gzheng
     //checkptMgr[CkMyPe()].inmem_restore(msg);
     inmem_restore(msg);
index 0fa2d5287598976328f532af901aec0d25087a50..9ff6bbaef33e8a2ca03b0dcae7f94c85eca23166 100644 (file)
@@ -19,6 +19,7 @@ module CkMemCheckpoint {
         entry void doItNow(int spe, CkCallback &);  //checkpointing
        entry void recvData(CkArrayCheckPTMessage *);
        entry void recvProcData(CkProcCheckPTMessage *);
+       entry void syncFiles(CkReductionMsg *);
        entry void cpFinish();
        entry void report();
        // restart
index 705fb2fc65478a21b8175f39276437126ecb439f..079f59c3b7136ad8be145056cf790ff9f04f6759 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef _CKCHECKPT_
-#define _CKCHECKPT_
+#ifndef _CK_MEM_CHECKPT_
+#define _CK_MEM_CHECKPT_
 
 #include "CkMemCheckpoint.decl.h"
 
@@ -31,30 +31,30 @@ public:
        char *packData;
 };
 
-// table entry
-class CkMemCheckPTInfo {
+// table entry base class
+class CkCheckPTInfo {
    friend class CkMemCheckPT;
-private:
+protected:
    CkArrayID aid;
    CkGroupID locMgr;
    CkArrayIndexMax index;
    int pNo;   //another buddy
-   CkArrayCheckPTMessage* ckBuffer; 
 public:
-   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int no):
-            aid(a), locMgr(loc), index(idx), pNo(no), ckBuffer(NULL)  {}
-   ~CkMemCheckPTInfo() { if (ckBuffer) delete ckBuffer; }
-   void updateBuffer(CkArrayCheckPTMessage *data) { 
-       if (ckBuffer) delete ckBuffer;
-       ckBuffer = data;
-   }
+   CkCheckPTInfo();
+   CkCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno):
+                  aid(a), locMgr(loc), index(idx), pNo(pno)   {}
+   virtual ~CkCheckPTInfo() {}
+   virtual void updateBuffer(CkArrayCheckPTMessage *data) = 0;
+   virtual CkArrayCheckPTMessage * getCopy() = 0;
+   virtual void updateBuddy(int b1, int b2) = 0;
+   virtual int getSize() = 0;
 };
 
 class CkMemCheckPT: public CBase_CkMemCheckPT {
 public:
   CkMemCheckPT();
   CkMemCheckPT(CkMigrateMessage *m):CBase_CkMemCheckPT(m) { }
-  ~CkMemCheckPT();
+  virtual ~CkMemCheckPT();
   void pup(PUP::er& p);
   void doItNow(int sp, CkCallback &);
   void restart(int failedPe);
@@ -63,6 +63,7 @@ public:
   void recvData(CkArrayCheckPTMessage *);
   void recvProcData(CkProcCheckPTMessage *);
   void cpFinish();
+  void syncFiles(CkReductionMsg *);
   void report();
   void recoverBuddies();
   void recoverArrayElements();
@@ -78,7 +79,7 @@ public:
   static double startTime;
   static char*  stage;
 private:
-  CkVec<CkMemCheckPTInfo *> ckTable;
+  CkVec<CkCheckPTInfo *> ckTable;
 
   int recvCount, peCount;
   int cpStarter;