latest stable version, with a lot of bug fixes.
authorLixia Shi <lixiashi@uiuc.edu>
Sun, 29 Feb 2004 04:13:11 +0000 (04:13 +0000)
committerLixia Shi <lixiashi@uiuc.edu>
Sun, 29 Feb 2004 04:13:11 +0000 (04:13 +0000)
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index 15959c73b39c2d77fd6edc9d9564cd9879b67269..21677331354b7973252ed4d97f4fddd336f58118 100644 (file)
@@ -34,7 +34,13 @@ TODO:
 #include "register.h"
 #include "conv-ccs.h"
 
-#define DEBUGF      CkPrintf
+#define DEBUGF     // CkPrintf
+
+int CkMemCheckPT::inRestarting = 0;
+double CkMemCheckPT::startTime;
+char *CkMemCheckPT::stage;
+
+int _memChkptOn = 1;
 
 CkGroupID ckCheckPTGroupID;            // readonly
 
@@ -50,9 +56,15 @@ inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
 // choose and register with 2 buggies for checkpoiting 
 #if CMK_MEM_CHECKPOINT
 void ArrayElement::init_checkpt() {
+       if (_memChkptOn == 0) return;
        // CmiPrintf("[%d] ArrayElement::init_checkpt %d\n", CkMyPe(), info.fromMigration);
-        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
+       // only masteriinit checkpoint
+        if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
+
+//        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
+        budPEs[0] = CkMyPe();
         budPEs[1] = (CkMyPe()+1)%CkNumPes();
+       CmiAssert(budPEs[0] != budPEs[1]);
         // inform checkPTMgr
         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
@@ -96,30 +108,37 @@ void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m)
 {
 #if CMK_MEM_CHECKPOINT
   //DEBUGF("[%d] inmem_restore restore", CmiMyPe());  m->index.print();
+//  CmiPrintf("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
   PUP::fromMem p(m->packData);
   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
   mgr->resume(m->index, p);
-  //   reset, may not needed now
+
+  // find a list of array elements bound together
   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
-  CmiAssert(elt);
-  elt->budPEs[0] = m->bud1;
-  elt->budPEs[1] = m->bud2;
-  // for now.
-  for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
-    contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
-    if (c) c->redNo = 0;
+  CkLocRec_local *rec = elt->myRec;
+  CkVec<CkMigratable *> list;
+  mgr->elementList(rec, list);
+  CmiAssert(list.length() > 0);
+  for (int l=0; l<list.length(); l++) {
+    elt = (ArrayElement *)list[l];
+    elt->budPEs[0] = m->bud1;
+    elt->budPEs[1] = m->bud2;
+    //    reset, may not needed now
+    // for now.
+    for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
+      contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
+      if (c) c->redNo = 0;
+    }
   }
-/*
-  contributorInfo *c=(contributorInfo *)&elt->listenerData[elt->thisArray->reducer->ckGetOffset()];
-  if (c) c->redNo = 0;
-*/
 #endif
 }
 
 CkMemCheckPT::CkMemCheckPT()
 {
-  if (CkNumPes() <= 3 && CkMyPe() == 0) 
-    CkPrintf("CkMemCheckPT disabled!\n");
+  if (CkNumPes() <= 2) {
+    if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
+    _memChkptOn = 0;
+  }
   inRestarting = 0;
   recvCount = peCount = 0;
 }
@@ -156,6 +175,11 @@ void CkMemCheckPT::failed(int pe)
   failedPes.push_back(pe);
 }
 
+int CkMemCheckPT::totalFailed()
+{
+  return failedPes.length();
+}
+
 // create an checkpoint entry for array element of aid with index.
 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
 {
@@ -163,11 +187,16 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax ind
   int idx, len = ckTable.size();
   for (idx=0; idx<len; idx++) {
     CkMemCheckPTInfo *entry = ckTable[idx];
-    if (aid == entry->aid && index == entry->index) break;
-  }
-  if (idx<len) {
-    CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry. \n", CkMyPe());
-    CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
+    if (index == entry->index) {
+      if (aid == entry->aid) {
+        CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry. \n", CkMyPe());
+        CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
+      }
+      if (loc == entry->locMgr) {
+       // bindTo array elements
+        return;
+      }
+    }
   }
   CkMemCheckPTInfo *newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
   ckTable.push_back(newEntry);
@@ -179,7 +208,8 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
 {
   cpCallback = cb;
   cpStarter = starter;
-  CkPrintf("[%d] Start checkpointing ... \n", CkMyPe());
+  CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
+  if (CkMyPe() == cpStarter) startTime = CmiWallTimer();
 
 //  if (iFailed()) return;
   int len = ckTable.length();
@@ -206,15 +236,13 @@ static void _handleProcData(PUP::er &p)
     CkPupROData(p);
 
     // save mainchares into MainChares.dat
-    if(CkMyPe()==0) {
-       CkPupMainChareData(p);
-    }
+    if(CkMyPe()==0) CkPupMainChareData(p);
        
     // save groups into Groups.dat
     CkPupGroupData(p);
 
     // save nodegroups into NodeGroups.dat
-    CkPupNodeGroupData(p);
+    if(CkMyRank()==0) CkPupNodeGroupData(p);
 }
 
 void CkMemCheckPT::sendProcData()
@@ -236,6 +264,7 @@ void CkMemCheckPT::sendProcData()
   }
   msg->pe = CkMyPe();
   msg->len = size;
+  msg->reportPe = cpStarter;    // in case other processor is not in checkpoint mode
   thisProxy[ChkptOnPe()].recvProcData(msg);
 }
 
@@ -243,8 +272,8 @@ void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
 {
   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
   CpvAccess(procChkptBuf) = msg;
-  cpStarter = 0;    // fix me
-  thisProxy[cpStarter].cpFinish();
+//CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
+  thisProxy[msg->reportPe].cpFinish();
 }
 
 // ArrayElement call this function to give us the checkpointed data
@@ -254,7 +283,7 @@ void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
   int idx;
   for (idx=0; idx<len; idx++) {
     CkMemCheckPTInfo *entry = ckTable[idx];
-    if (msg->aid == entry->aid && msg->index == entry->index) break;
+    if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
   }
   CkAssert(idx < len);
   ckTable[idx]->updateBuffer(msg);
@@ -269,23 +298,51 @@ void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
   }
 }
 
-// only is called on cpStarter
+// only is called on cpStarter when checkpoint is done
 void CkMemCheckPT::cpFinish()
 {
-  CmiAssert(CkMyPe() == 0);
+  CmiAssert(CkMyPe() == cpStarter);
   peCount++;
     // now all processors have finished, activate callback
+//CmiPrintf("peCount:%d\n",peCount);
   if (peCount == 2*(CkNumPes())) {
-    CmiPrintf("[%d] Checkpoint finished, sending callback ... \n", CkMyPe());
+    CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
     cpCallback.send();
     peCount = 0;
+    thisProxy.report();
   }
 }
 
+void CkMemCheckPT::report()
+{
+  int objsize = 0;
+  int len = ckTable.length();
+  for (int i=0; i<len; i++) {
+    CkMemCheckPTInfo *entry = ckTable[i];
+    CmiAssert(entry && entry->ckBuffer);
+    objsize += entry->ckBuffer->len;
+  }
+  CmiAssert(CpvAccess(procChkptBuf));
+  CkPrintf("[%d] Checkpointed Object size: %d Processor data: %d\n", CkMyPe(), objsize, CpvAccess(procChkptBuf)->len);
+}
+
 /*****************************************************************************
                        RESTART Procedure
 *****************************************************************************/
 
+inline int CkMemCheckPT::isMaster(int pe)
+{
+  int mype = CkMyPe(); 
+//CkPrintf("ismaster: %d %d\n", pe, mype);
+  for (int i=1; i<CkNumPes(); i++) {
+    int me = (pe+i)%CkNumPes();
+    if (isFailed(me)) continue;
+    if (me == mype) return 1;
+    else return 0;
+  }
+  return 0;
+}
+
 // loop over all CkLocMgr and do "code"
 #define  CKLOCMGR_LOOP(code)   {       \
   int numGroups = CkpvAccess(_groupIDTable)->size();   \
@@ -298,6 +355,21 @@ void CkMemCheckPT::cpFinish()
   }    \
  }
 
+#if 0
+// helper class to pup all elements that belong to same ckLocMgr
+class ElementDestoryer : public CkLocIterator {
+private:
+        CkLocMgr *locMgr;
+public:
+        ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
+        void addLocation(CkLocation &loc) {
+               CkArrayIndexMax idx=loc.getIndex();
+               CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
+               loc.destroy();
+        }
+};
+#endif
+
 // restore the bitmap vector for LB
 void CkMemCheckPT::resetLB(int diepe)
 {
@@ -314,8 +386,8 @@ void CkMemCheckPT::resetLB(int diepe)
 
   // if I am the crashed pe, rebuild my failedPEs array
   if (CkMyPe() == diepe)
-  for (i=0; i<CkNumPes(); i++) 
-    if (bitmap[i]==0) failed(bitmap[i]);
+    for (i=0; i<CkNumPes(); i++) 
+      if (bitmap[i]==0) failed(i);
 
   delete [] bitmap;
 #endif
@@ -329,10 +401,16 @@ void CkMemCheckPT::resetLB(int diepe)
 void CkMemCheckPT::restart(int diePe)
 {
 #if CMK_MEM_CHECKPOINT
+  double curTime = CmiWallTimer();
+  if (CkMyPe() == diePe)
+    CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
+  stage = "resetLB";
+  startTime = curTime;
+  CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
+
   failed(diePe);       // add into the list of failed pes
   thisFailedPe = diePe;
 
-  CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
   // clean array chkpt table
   if (CkMyPe() == diePe) ckTable.length() = 0;
 
@@ -343,22 +421,30 @@ void CkMemCheckPT::restart(int diePe)
 
   CKLOCMGR_LOOP(mgr->startInserting(););
 
-  thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
   // afterwards, the QD detection should work again
+  //if (CkMyPe() == 0)
+  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
+  thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
 #endif
 }
 
 void CkMemCheckPT::removeArrayElements()
 {
   int len = ckTable.length();
-  CkPrintf("[%d] CkMemCheckPT ----- removeArrayElements len:%d.\n",CkMyPe(),len);
+  double curTime = CmiWallTimer();
+  CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
+  stage = "removeArrayElements";
+  startTime = curTime;
 
-  if (cpCallback.isInvalid()) CkAbort("Don't set restart callback\n");;
+  if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
 
   // get rid of all buffering and remote recs
-  CKLOCMGR_LOOP(mgr->flushStates(););
+  CKLOCMGR_LOOP(mgr->flushCkRecs(););
 
+//  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
+
+#if 0
   // first phase: destroy all existing array elements
   for (int idx=0; idx<len; idx++) {
     CkMemCheckPTInfo *entry = ckTable[idx];
@@ -369,17 +455,19 @@ void CkMemCheckPT::removeArrayElements()
     CkSendMsgArray(CkIndex_ArrayElement::ckDestroy(),msg,entry->aid,entry->index);
     //CkCallback cb(CkIndex_ArrayElement::ckDestroy(), entry->index, entry->aid);
     //cb.send(msg);
-//CkPrintf("[%d] Destory: ", CkMyPe()); entry->index.print();
+CkPrintf("[%d] Destory: ", CkMyPe()); entry->index.print();
   }
+#endif
 
-  if (CkMyPe() == 0)
-  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+  //if (CkMyPe() == 0)
+  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+  thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
 }
 
 // flush state in reduction manager
 void CkMemCheckPT::resetReductionMgr()
 {
-  CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
+  //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
   int numGroups = CkpvAccess(_groupIDTable)->size();
   for(int i=0;i<numGroups;i++) {
     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
@@ -388,9 +476,11 @@ void CkMemCheckPT::resetReductionMgr()
     obj->ckJustMigrated();
   }
   // reset again
-  //CkResetQD();
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+  //CpvAccess(_qd)->flushStates();
+
+  //if (CkMyPe() == 0)
+  //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+  thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
 }
 
 // recover the lost buddies
@@ -400,7 +490,10 @@ void CkMemCheckPT::recoverBuddies()
   int len = ckTable.length();
   // ready to flush reduction manager
   // cannot be CkMemCheckPT::restart because destory will modify states
-  CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies  len: %d\n",CkMyPe(),len);
+  double curTime = CmiWallTimer();
+  CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
+  stage = "recoverBuddies";
+  startTime = curTime;
 
   //if (iFailed()) return;   ??????
 
@@ -409,7 +502,8 @@ void CkMemCheckPT::recoverBuddies()
     CkMemCheckPTInfo *entry = ckTable[idx];
     if (entry->pNo == thisFailedPe) {
       int budPe = CkMyPe();
-      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
+//      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
+      while (budPe == CkMyPe() || isFailed(budPe)) budPe = (budPe+1)%CkNumPes();
       entry->pNo = budPe;
       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
       CmiAssert(entry->ckBuffer);
@@ -426,23 +520,32 @@ void CkMemCheckPT::recoverBuddies()
 // restore 
 void CkMemCheckPT::recoverArrayElements()
 {
-  CkPrintf("[%d] CkMemCheckPT ----- recoverArrayElements\n",CkMyPe());
+  double curTime = CmiWallTimer();
+  CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, curTime-startTime);
+  stage = "recoverArrayElements";
+  startTime = curTime;
   //if (iFailed()) return;
 
   // recover all array elements
+  int count = 0;
   int len = ckTable.length();
   for (int idx=0; idx<len; idx++)
   {
     CkMemCheckPTInfo *entry = ckTable[idx];
     // the bigger one will do 
-    if (CkMyPe() < entry->pNo) continue;
-//CkPrintf("[%d] restore idx:%d  ", CkMyPe(), idx); entry->index.print();
+//    if (CkMyPe() < entry->pNo) continue;
+    if (!isMaster(entry->pNo)) continue;
+//CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
     if (entry->ckBuffer == NULL) CmiAbort("recoverArrayElements: element does not have checkpoint data.");
     entry->ckBuffer->bud1 = CkMyPe(); entry->ckBuffer->bud2 = entry->pNo;
     CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
     CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&entry->ckBuffer);
-    checkptMgr[CkMyPe()].inmem_restore(msg);
+    // gzheng
+    //checkptMgr[CkMyPe()].inmem_restore(msg);
+    inmem_restore(msg);
+    count ++;
   }
+  //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
 
   if (CkMyPe() == 0)
     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
@@ -465,8 +568,12 @@ void CkMemCheckPT::finishUp()
 
   if (CkMyPe() == 0)
   {
-       CkPrintf("Restart finished, sending out callback ...\n");
+       CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
        CkStartQD(cpCallback);
+       if (CkNumPes()-totalFailed() <=2) {
+         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
+         _memChkptOn = 0;
+       }
   } 
 }
 
@@ -476,7 +583,7 @@ void CkMemCheckPT::quiescence(CkCallback &cb)
   static int pe_count = 0;
   pe_count ++;
   CmiAssert(CkMyPe() == 0);
-  CkPrintf("quiescence %d\n", pe_count);
+//  CkPrintf("quiescence %d\n", pe_count);
   if (pe_count == CkNumPes()) {
     pe_count = 0;
     cb.send();
@@ -488,6 +595,11 @@ void CkMemCheckPT::quiescence(CkCallback &cb)
 void CkStartMemCheckpoint(CkCallback &cb)
 {
 #if CMK_MEM_CHECKPOINT
+  if (_memChkptOn == 0) {
+    CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
+    cb.send();
+    return;
+  }
     // store user callback and user data
   CkMemCheckPT::cpCallback = cb;
 
@@ -508,7 +620,7 @@ CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
   checkptMgr.restart(diePe);
 }
 
-static int _diePE;
+static int _diePE = -1;
 
 // callback function used locally by ccs handler
 static void CkRestartCheckPointCallback(void *ignore, void *msg)
@@ -520,22 +632,45 @@ CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
 static int askProcDataHandlerIdx;
 static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
+static int restartBeginHandlerIdx;
+
+static void restartBeginHandler(char *msg)
+{
+#if CMK_MEM_CHECKPOINT
+  static int count = 0;
+  CmiFree(msg);
+  CmiAssert(CkMyPe() == _diePE);
+  count ++;
+  if (count == CkNumPes()) {
+    CkRestartCheckPointCallback(NULL, NULL);
+    count = 0;
+  }
+#endif
+}
 
 static void restartBcastHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
   // advance phase counter
-  cur_restart_phase ++;
+  CkMemCheckPT::inRestarting = 1;
   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
+  // gzheng
+  if (CkMyPe() != _diePE) cur_restart_phase ++;
 
   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
 
   // reset QD counters
-  CpvAccess(_qd)->flushStates();
+  if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
 
+/*  gzheng
   if (CkMyPe()==_diePE)
       CkRestartCheckPointCallback(NULL, NULL);
+*/
   CmiFree(msg);
+
+  char restartmsg[CmiMsgHeaderSizeBytes];
+  CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+  CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
 #endif
 }
 
@@ -546,16 +681,24 @@ static void recoverProcDataHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
    int i;
-   CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
    envelope *env = (envelope *)msg;
    CkUnpackMessage(&env);
    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
    cur_restart_phase = procMsg->cur_restart_phase;
+   CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
+   cur_restart_phase ++;
+   CpvAccess(_qd)->flushStates();
+
    // restore readonly, mainchare, group, nodegroup
+//   int temp = cur_restart_phase;
+//   cur_restart_phase = -1;
    PUP::fromMem p(procMsg->packData);
    _handleProcData(p);
 
    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+   // gzheng
+   CKLOCMGR_LOOP(mgr->startInserting(););
+//   cur_restart_phase = temp;
 
    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
@@ -590,14 +733,17 @@ static void askProcDataHandler(char *msg)
 void CkMemRestart(const char *dummy)
 {
 #if CMK_MEM_CHECKPOINT
+   _diePE = CkMyPe();
    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
+   CkMemCheckPT::startTime = CmiWallTimer();
+   CkMemCheckPT::inRestarting = 1;
    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
-   cur_restart_phase = 999999;             // big enough to get it processed
+   cur_restart_phase = 9999;             // big enough to get it processed
    CmiSetHandler(msg, askProcDataHandlerIdx);
    int pe = ChkptOnPe();
    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
-   cur_restart_phase=-1;
+   cur_restart_phase=0;    // allow all message to come in
 #else
    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'ft' option");
 #endif
@@ -607,7 +753,14 @@ void CkMemRestart(const char *dummy)
 // return true if it is in restarting
 int CkInRestarting()
 {
-  return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
+#if CMK_MEM_CHECKPOINT
+  // gzheng
+  if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
+  //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
+  return CkMemCheckPT::inRestarting;
+#else
+  return 0;
+#endif
 }
 
 /*****************************************************************************
@@ -631,14 +784,15 @@ void CkRegisterRestartHandler( )
   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
+  restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
 
   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
   CpvAccess(procChkptBuf) = NULL;
 
 #if 1
   // for debugging
-  CkPrintf("[%d] PID %d\n", CkMyPe(), getpid());
-  sleep(6);
+  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
+//  sleep(4);
 #endif
 #endif
 }
index 4ba2e81ba80d72046c1bbac1d57a0fd4b257a5c6..0fa2d5287598976328f532af901aec0d25087a50 100644 (file)
@@ -20,6 +20,7 @@ module CkMemCheckpoint {
        entry void recvData(CkArrayCheckPTMessage *);
        entry void recvProcData(CkProcCheckPTMessage *);
        entry void cpFinish();
+       entry void report();
        // restart
         entry void restart(int);
        entry void resetReductionMgr();
index dc47b85115ed4a5c51c03bbdb3b1842086646742..705fb2fc65478a21b8175f39276437126ecb439f 100644 (file)
@@ -24,6 +24,7 @@ public:
 class CkProcCheckPTMessage: public CMessage_CkProcCheckPTMessage {
 public:
        int pe;
+       int reportPe;           // chkpt starter
        int failedpe;
        int cur_restart_phase;
        int len;
@@ -62,6 +63,7 @@ public:
   void recvData(CkArrayCheckPTMessage *);
   void recvProcData(CkProcCheckPTMessage *);
   void cpFinish();
+  void report();
   void recoverBuddies();
   void recoverArrayElements();
   void quiescence(CkCallback &);
@@ -72,7 +74,9 @@ public:
 public:
   static CkCallback  cpCallback;
 
-  int inRestarting;
+  static int inRestarting;
+  static double startTime;
+  static char*  stage;
 private:
   CkVec<CkMemCheckPTInfo *> ckTable;
 
@@ -83,7 +87,9 @@ private:
 private:
   inline int iFailed() { return isFailed(CkMyPe()); }
   int isFailed(int pe);
+  int totalFailed();
   void failed(int pe);
+  inline int isMaster(int pe);
 
   void sendProcData();
 };