bug fix: handle failure detected during checkpoint
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 96e9eb0a505517541a8f0f9b582e9f6708da9668..2ab62cb990b2ce487e4aa40c490350f58fbad29f 100644 (file)
@@ -221,7 +221,6 @@ void ArrayElement::init_checkpt() {
   CmiAssert(budPEs[0] != budPEs[1]);
   // inform checkPTMgr
   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-  //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
 }
@@ -474,6 +473,9 @@ void CkMemCheckPT::reachChkpIter(){
 }
 
 void CkMemCheckPT::startChkp(){
+  if(CkInCheckpointing()){
+    return;
+  }
   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
   CkStartMemCheckpoint(cpCallback);
 }
@@ -580,6 +582,11 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 #endif
 }
 
+void CkMemCheckPT::chkpLocalStart()
+{
+  CpvAccess(localStarted) = 1;
+}
+
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
@@ -597,7 +604,6 @@ void CkMemCheckPT::doItNow(int starter)
   if(CmiNumPartition()==1)
 #endif   
   {
-
 #if !CMK_CHKP_ALL
     int len = ckTable.length();
     for (int i=0; i<len; i++) {
@@ -616,6 +622,7 @@ void CkMemCheckPT::doItNow(int starter)
 #else
     startArrayCheckpoint();
 #endif
+    // pack and send proc level data
     sendProcData();
   }
 #if CMK_CONVERSE_MPI
@@ -624,7 +631,6 @@ void CkMemCheckPT::doItNow(int starter)
     startCheckpoint();
   }
 #endif
-  // pack and send proc level data
 }
 
 class MemElementPacker : public CkLocIterator{
@@ -802,9 +808,8 @@ void CkMemCheckPT::startCheckpoint(){
           CkPackMessage(&env);
           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-          if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+          if(CkMyPe() == CpvAccess(_remoteCrashedNode))
             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
-          }
         }
         //send the array checkpoint data
         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
@@ -848,6 +853,7 @@ void CkMemCheckPT::doneComparison(bool ret){
 }
 
 void CkMemCheckPT::doneRComparison(int ret){
+  CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
   //   if(CpvAccess(curPointer) == 0){
   //if(ret==CkNumPes()){
     CpvAccess(localChkpDone) = 1;
@@ -1520,6 +1526,7 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(curPointer)^=1;
         //notify my replica, restart is done
         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
+          CpvAccess(remoteStarted) =0;
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
           CmiSetHandler(msg,replicaRecoverHandlerIdx);
           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
@@ -1596,6 +1603,8 @@ void CkMemCheckPT::RollBack(){
         cb.send();
         return;
       }
+      if(CkInCheckpointing())
+        return;
       // store user callback and user data
       CkMemCheckPT::cpCallback = cb;
 
@@ -1607,6 +1616,8 @@ void CkMemCheckPT::RollBack(){
         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
       }
       CpvAccess(localStarted) = 1;
+      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+      checkptMgr.chkpLocalStart();
       // broadcast to start check pointing
       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
@@ -1797,18 +1808,25 @@ void CkMemCheckPT::RollBack(){
 
     static void replicaRecoverHandler(char *msg){
       //fflush(stdout);
-      //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
+      CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
       CpvAccess(remoteChkpDone) = 1;
       if(CpvAccess(localChkpDone) == 1)
         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
+      else{  
+        CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
+        //CmiAbort("impossible state");
+      }
       CmiFree(msg);
     }
 
     static void replicaChkpDoneHandler(char *msg){
+      CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
       CpvAccess(remoteChkpDone) = 1;
       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
       if(CpvAccess(localChkpDone) == 1)
         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
+      else  
+        CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
       CmiFree(msg);
     }
 
@@ -1821,6 +1839,9 @@ void CkMemCheckPT::RollBack(){
     }
 
     static void replicaChkpStartHandler(char * msg){
+      if(CkMyPe()==0){
+        CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
+      }
       CpvAccess(remoteStarted) =1;
       if(CpvAccess(localStarted)==1){    
         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
@@ -1828,22 +1849,37 @@ void CkMemCheckPT::RollBack(){
       }
     }
 
-
     static void replicaDieBcastHandler(char *msg){
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       CpvAccess(_remoteCrashedNode) = diePe;
       if(CkMyPe()==diePe){
-        CmiPrintf("pe %d in replicad word die\n",diePe);
+        CmiPrintf("pe %d in replica world die\n",diePe);
         fflush(stdout);
       }
+      
       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
         find_spare_mpirank(diePe,CmiMyPartition()^1);
       }
-      //broadcast to my partition to get local max iter
+
+      //what if I am already ready to checkpoint (localStarted==1)?
       if(CpvAccess(resilience)!=1){
         CkMemCheckPT::replicaAlive = 0;
-        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+        if(CpvAccess(localStarted)==1){    
+          if(CkMyPe()==0){
+            CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
+            CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+            checkptMgr.doItNow(0);
+          }
+        }
+        //broadcast to my partition to get local max iter
+        else{
+          if(CkMyPe()==diePe){
+            CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
+          }
+          CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+        }
       }
+
       CmiFree(msg);
     }
 
@@ -1862,6 +1898,7 @@ void CkMemCheckPT::RollBack(){
         PUP::fromMem p(procMsg->packData);
         _handleProcData(p,CmiTrue);
         _initDone();
+        CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
       }
       else{
         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
@@ -1901,6 +1938,10 @@ void CkMemCheckPT::RollBack(){
       CpvAccess(chkpBuf)[pointer] = chkpMsg;
       CpvAccess(recvdArrayChkp) =1;
       CkMemCheckPT::inRestarting = 1;
+
+      if(CkMyPe()==CpvAccess(_crashedNode))
+        CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
+      
       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
         _resume_charm_message();
         _diePE = CpvAccess(_crashedNode);
@@ -1921,7 +1962,9 @@ void CkMemCheckPT::RollBack(){
 
     static void recvPhaseHandler(char * msg)
     {
+      //decrease the phase number so that the crashed replica can communicate with the healthy one
       CpvAccess(_curRestartPhase)--;
+      
       CkMemCheckPT::inRestarting = 1;
       CmiFree(msg);
       //notify the buddy in the replica now i can receive the checkpoint msg
@@ -1930,10 +1973,10 @@ void CkMemCheckPT::RollBack(){
         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
         //timer start
-        if(CpvAccess(_crashedNode)==CkMyPe())
+        if(CpvAccess(_crashedNode)==CkMyPe()){
           CkMemCheckPT::startTime = restartT = CmiWallTimer();
-        if(CpvAccess(_crashedNode)==CmiMyPe())
-          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
+        }
       }else{
         CpvAccess(curPointer)^=1;
         CkMemCheckPT::inRestarting = 1;
@@ -1944,7 +1987,7 @@ void CkMemCheckPT::RollBack(){
     
     static void askRecoverDataHandler(char * msg){
       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
-       CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+        CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
       if(CpvAccess(resilience)!=1){
         CpvAccess(remoteReady)=1;
         if(CpvAccess(localReady)==1){
@@ -1954,7 +1997,7 @@ void CkMemCheckPT::RollBack(){
             CkPackMessage(&env);
             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-            CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
+            CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
           }
           //send the array checkpoint data
           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
@@ -2049,7 +2092,7 @@ void CkMemCheckPT::RollBack(){
     // called on PE 0
     void qd_callback(void *m)
     {
-      CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
+      CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
       fflush(stdout);
       CkFreeMsg(m);
       if(CmiNumPartition()==1){
@@ -2185,7 +2228,7 @@ void CkMemCheckPT::RollBack(){
       CpvInitialize(int, use_checksum);
       CpvInitialize(int, resilience);
       CpvAccess(use_checksum)=0;
-      CpvAccess(resilience)=0;
+      CpvAccess(resilience)=0;//TODO should set a default resilience level
       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
         CpvAccess(use_checksum)=1;
       }
@@ -2305,12 +2348,10 @@ void CkMemCheckPT::RollBack(){
         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
         //send to everyone in the other world
         if(CmiNumPartition()!=1){
-          //for(int i=0;i<CmiNumPes();i++){
-            char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-            *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-            CmiSetHandler(rMsg, replicaDieHandlerIdx);
-            CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-          //}
+          char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+          CmiSetHandler(rMsg, replicaDieHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
         }
       }
         else