add strong resilience
author Xiang Ni <xiangni2@illinois.edu>
Sun, 13 Jan 2013 21:33:40 +0000 (15:33 -0600)
committer Xiang Ni <xiangni2@illinois.edu>
Sun, 13 Jan 2013 21:33:40 +0000 (15:33 -0600)
src/ck-core/ckmemcheckpoint.C

index 08bea975f2d2c488f8dc5fba9936403a5663613e..510fbd0c6a10ddc035c01720c76c568010676c74 100644 (file)
@@ -812,11 +812,11 @@ void CkMemCheckPT::startCheckpoint(){
       }else{
         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
           int pointer = CpvAccess(curPointer);
-          CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
-          (CpvAccess(recoverArrayBuf))->pointer = pointer;
+          CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          (CpvAccess(recoverProcBuf))->pointer = pointer;
         }
-        CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
-        (CpvAccess(recoverProcBuf))->pointer = CpvAccess(curPointer);
+        CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
         CpvAccess(localReady) = 1;
       }
       //can continue work, no need to wait for my replica
@@ -1505,7 +1505,7 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(recvdArrayChkp) = 0;
         CpvAccess(curPointer)^=1;
         //notify my replica, restart is done
-        if (CkMyPe() == 0){
+        if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
           CmiSetHandler(msg,replicaRecoverHandlerIdx);
           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
@@ -1632,7 +1632,7 @@ void CkMemCheckPT::RollBack(){
       static int count = 0;
       CmiAssert(CkMyPe() == _diePE);
       count ++;
-      if (count == CkNumPes()) {
+      if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
         printf("restart begin on %d\n",CkMyPe());
         CkRestartCheckPointCallback(NULL, NULL);
         count = 0;
@@ -1810,7 +1810,6 @@ void CkMemCheckPT::RollBack(){
     static void replicaDieBcastHandler(char *msg){
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       CpvAccess(_remoteCrashedNode) = diePe;
-      CkMemCheckPT::replicaAlive = 0;
       if(CkMyPe()==diePe){
         CmiPrintf("pe %d in replicad word die\n",diePe);
         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
@@ -1818,7 +1817,10 @@ void CkMemCheckPT::RollBack(){
       }
       find_spare_mpirank(diePe,CmiMyPartition()^1);
       //broadcast to my partition to get local max iter
-      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+      if(CpvAccess(resilience)!=1){
+        CkMemCheckPT::replicaAlive = 0;
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+      }
       CmiFree(msg);
     }
 
@@ -1884,33 +1886,62 @@ void CkMemCheckPT::RollBack(){
       CkMemCheckPT::inRestarting = 1;
       CmiFree(msg);
       //notify the buddy in the replica now i can receive the checkpoint msg
-      char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-      CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
-      CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
-      //timer start
-      if(CpvAccess( _crashedNode )==CkMyPe()){
-        CkMemCheckPT::startTime = restartT = CmiWallTimer();
-        CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
+        char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
+        //timer start
+        if(CpvAccess(_crashedNode)==CkMyPe())
+          CkMemCheckPT::startTime = restartT = CmiWallTimer();
+        if(CpvAccess(_crashedNode)==CmiMyPe())
+          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      }else{
+        CpvAccess(curPointer)^=1;
+        CkMemCheckPT::inRestarting = 1;
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
       }
     }
     
     static void askRecoverDataHandler(char * msg){
-      CpvAccess(remoteReady)=1;
-      if(CpvAccess(localReady)==1){
+      if(CpvAccess(resilience)!=1){
+        CpvAccess(remoteReady)=1;
         if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
-        {      
+          CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+        if(CpvAccess(localReady)==1){
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+          {    
+            envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
+            CkPackMessage(&env);
+            CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+            CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+            CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
+          }
+          //send the array checkpoint data
           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
           CkPackMessage(&env);
-          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-          CmiPrintf("[%d] sendProcdata\n",CmiMyPe());
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+            CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
         }
-        //send the array checkpoint data
-        envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+      }else{
+        int pointer = CpvAccess(curPointer)^1;
+        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+        procMsg->pointer = pointer;
+        envelope * env = (envelope *)(UsrToEnv(procMsg));
         CkPackMessage(&env);
-        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
         
+        CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        arrayMsg->pointer = pointer;
+        envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
+        CkPackMessage(&env1);
+        CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
+        CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
       }
     }
     // called on crashed processor