more optimization for recover
author Xiang Ni <xiangni2@illinois.edu>
Wed, 9 Jan 2013 21:16:24 +0000 (15:16 -0600)
committer Xiang Ni <xiangni2@illinois.edu>
Wed, 9 Jan 2013 21:16:24 +0000 (15:16 -0600)
src/ck-core/ckmemcheckpoint.C

index 204cbe3aed5520d86c9b1d574a6d2762e9172649..d4798bec50ae555f4ddd3032fe7683a95ff9f840 100644 (file)
@@ -708,18 +708,15 @@ void CkMemCheckPT::startCheckpoint(){
   msg->cp_flag = 1;
   int checksum;
   {
   msg->cp_flag = 1;
   int checksum;
   {
-//#if CMK_USE_CHECKSUM
     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
       PUP::checker p(msg->packData);
       pupAllElements(p);
       checksum = p.getChecksum();
     }else{  
 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
       PUP::checker p(msg->packData);
       pupAllElements(p);
       checksum = p.getChecksum();
     }else{  
 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
-//#else 
       PUP::toMem p(msg->packData);
       pupAllElements(p);
     }
       PUP::toMem p(msg->packData);
       pupAllElements(p);
     }
-//#endif
   }
   pointer = CpvAccess(curPointer);
   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
   }
   pointer = CpvAccess(curPointer);
   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
@@ -728,10 +725,6 @@ void CkMemCheckPT::startCheckpoint(){
     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   if(CkReplicaAlive()==1){
     CpvAccess(recvdLocal) = 1;
     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   if(CkReplicaAlive()==1){
     CpvAccess(recvdLocal) = 1;
-//#if CMK_USE_CHECKSUM
-//    CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
-//    CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
-//    delete tmpMsg;
     if(CpvAccess(use_checksum)){
       CpvAccess(localChecksum) = checksum;
       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
     if(CpvAccess(use_checksum)){
       CpvAccess(localChecksum) = checksum;
       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -739,29 +732,23 @@ void CkMemCheckPT::startCheckpoint(){
       CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
     }else{
       CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
     }else{
-//#else
       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
       CkPackMessage(&env);
       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
     }
       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
       CkPackMessage(&env);
       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
     }
-//#endif
   }
   if(CpvAccess(recvdRemote)==1){
     //compare the checkpoint 
     int size = CpvAccess(chkpBuf)[pointer]->len;
   }
   if(CpvAccess(recvdRemote)==1){
     //compare the checkpoint 
     int size = CpvAccess(chkpBuf)[pointer]->len;
-//    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
-//#if CMK_USE_CHECKSUM
     if(CpvAccess(use_checksum)){
       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
         thisProxy[CkMyPe()].doneComparison(true);
       }
       else{
     if(CpvAccess(use_checksum)){
       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
         thisProxy[CkMyPe()].doneComparison(true);
       }
       else{
-        //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
         thisProxy[CkMyPe()].doneComparison(false);
       }
     }else{
         thisProxy[CkMyPe()].doneComparison(false);
       }
     }else{
-//#else
       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
         thisProxy[CkMyPe()].doneComparison(true);
       }
       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
         thisProxy[CkMyPe()].doneComparison(true);
       }
@@ -770,7 +757,6 @@ void CkMemCheckPT::startCheckpoint(){
         thisProxy[CkMyPe()].doneComparison(false);
       }
     }
         thisProxy[CkMyPe()].doneComparison(false);
       }
     }
-//#endif
     if(CkMyPe()==0)
       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   }
     if(CkMyPe()==0)
       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   }
@@ -800,6 +786,10 @@ void CkMemCheckPT::startCheckpoint(){
       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
         CkPrintf("[%d] sendArraydata\n",CkMyPe());
       //can continue work, no need to wait for my replica
       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
         CkPrintf("[%d] sendArraydata\n",CkMyPe());
       //can continue work, no need to wait for my replica
+      int _ret = 1;
+      notifyReplica = 1;
+      CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+      contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
     }
   }
 #endif
     }
   }
 #endif
@@ -823,6 +813,8 @@ void CkMemCheckPT::doneRComparison(int ret){
     CpvAccess(localChkpDone) = 1;
     if(CpvAccess(remoteChkpDone) ==1){
       thisProxy.doneBothComparison();
     CpvAccess(localChkpDone) = 1;
     if(CpvAccess(remoteChkpDone) ==1){
       thisProxy.doneBothComparison();
+    }else{
+      CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
     }
     if(notifyReplica == 0){
       //notify the replica am done
     }
     if(notifyReplica == 0){
       //notify the replica am done
@@ -844,6 +836,8 @@ void CkMemCheckPT::doneBothComparison(){
   CpvAccess(recvdLocal) = 0;
   CpvAccess(localChkpDone) = 0;
   CpvAccess(remoteChkpDone) = 0;
   CpvAccess(recvdLocal) = 0;
   CpvAccess(localChkpDone) = 0;
   CpvAccess(remoteChkpDone) = 0;
+  CpvAccess(_remoteCrashedNode) = -1;
+  CkMemCheckPT::replicaAlive = 1;
   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
   CpvAccess(curPointer)^=1;
   inCheckpointing = 0;
   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
   CpvAccess(curPointer)^=1;
   inCheckpointing = 0;
@@ -1472,9 +1466,11 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(recvdArrayChkp) = 0;
         CpvAccess(curPointer)^=1;
         //notify my replica, restart is done
         CpvAccess(recvdArrayChkp) = 0;
         CpvAccess(curPointer)^=1;
         //notify my replica, restart is done
-        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-        CmiSetHandler(msg,replicaRecoverHandlerIdx);
-        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        if (CkMyPe() == 0){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg,replicaRecoverHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        }
       }
       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
       }
       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
@@ -1731,16 +1727,14 @@ void CkMemCheckPT::RollBack(){
     }
 
     static void replicaRecoverHandler(char *msg){
     }
 
     static void replicaRecoverHandler(char *msg){
-      CpvAccess(_remoteCrashedNode) = -1;
-      CkMemCheckPT::replicaAlive = 1;
       //fflush(stdout);
       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
       //fflush(stdout);
       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
-      bool ret = true;
       CpvAccess(remoteChkpDone) = 1;
       CpvAccess(remoteChkpDone) = 1;
-      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
       CmiFree(msg);
       CmiFree(msg);
-
     }
     }
+
     static void replicaChkpDoneHandler(char *msg){
       CpvAccess(remoteChkpDone) = 1;
       if(CpvAccess(localChkpDone) == 1)
     static void replicaChkpDoneHandler(char *msg){
       CpvAccess(remoteChkpDone) = 1;
       if(CpvAccess(localChkpDone) == 1)
@@ -1749,7 +1743,15 @@ void CkMemCheckPT::RollBack(){
     }
 
     static void replicaDieHandler(char * msg){
     }
 
     static void replicaDieHandler(char * msg){
-#if CMK_CONVERSE_MPI   
+#if CMK_CONVERSE_MPI
+      //broadcast to every one in my replica
+      CmiSetHandler(msg, replicaDieBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+#endif
+    }
+
+
+    static void replicaDieBcastHandler(char *msg){
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       CpvAccess(_remoteCrashedNode) = diePe;
       CkMemCheckPT::replicaAlive = 0;
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       CpvAccess(_remoteCrashedNode) = diePe;
       CkMemCheckPT::replicaAlive = 0;
@@ -1759,20 +1761,11 @@ void CkMemCheckPT::RollBack(){
         fflush(stdout);
       }
       find_spare_mpirank(diePe,CmiMyPartition()^1);
         fflush(stdout);
       }
       find_spare_mpirank(diePe,CmiMyPartition()^1);
-#endif
       //broadcast to my partition to get local max iter
       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
       CmiFree(msg);
     }
 
       //broadcast to my partition to get local max iter
       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
       CmiFree(msg);
     }
 
-
-    static void replicaDieBcastHandler(char *msg){
-      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-      CpvAccess(_remoteCrashedNode) = diePe;
-      CkMemCheckPT::replicaAlive = 0;
-      CmiFree(msg);
-    }
-
     static void recoverRemoteProcDataHandler(char *msg){
       envelope *env = (envelope *)msg;
       CkUnpackMessage(&env);
     static void recoverRemoteProcDataHandler(char *msg){
       envelope *env = (envelope *)msg;
       CkUnpackMessage(&env);
@@ -2145,14 +2138,12 @@ void CkMemCheckPT::RollBack(){
         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
         //send to everyone in the other world
         if(CmiNumPartition()!=1){
         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
         //send to everyone in the other world
         if(CmiNumPartition()!=1){
-          for(int i=0;i<CmiNumPes();i++){
+          //for(int i=0;i<CmiNumPes();i++){
             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-            //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
-            //fflush(stdout);
             CmiSetHandler(rMsg, replicaDieHandlerIdx);
             CmiSetHandler(rMsg, replicaDieHandlerIdx);
-            CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-          }
+            CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+          //}
         }
       }
         else 
         }
       }
         else