add phase change notification
authorXiang Ni <xiangni2@illinois.edu>
Sun, 13 Jan 2013 19:41:39 +0000 (13:41 -0600)
committerXiang Ni <xiangni2@illinois.edu>
Sun, 13 Jan 2013 19:41:39 +0000 (13:41 -0600)
src/ck-core/ckmemcheckpoint.C

index 160c6f3af016d414a89aaaee4ecbc5afd19ff581..08bea975f2d2c488f8dc5fba9936403a5663613e 100644 (file)
@@ -74,6 +74,7 @@ void noopck(const char*, ...)
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 CpvDeclare(int, use_checksum);
+CpvDeclare(int, resilience);
 CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
@@ -125,6 +126,8 @@ CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
 //store the checkpoint of the buddy to compare
 //do not need the whole msg, can be the checksum
 CpvDeclare(CkCheckPTMessage*, buddyBuf);
+CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
+CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
 //pointer of the checkpoint going to be written
 CpvDeclare(int, curPointer);
 CpvDeclare(int, recvdRemote);
@@ -133,6 +136,8 @@ CpvDeclare(int, localChkpDone);
 CpvDeclare(int, remoteChkpDone);
 CpvDeclare(int, remoteStarted);
 CpvDeclare(int, localStarted);
+CpvDeclare(int, remoteReady);
+CpvDeclare(int, localReady);
 CpvDeclare(int, recvdArrayChkp);
 CpvDeclare(int, recvdProcChkp);
 CpvDeclare(int, localChecksum);
@@ -145,6 +150,7 @@ static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
 static int askPhaseHandlerIdx;
 static int recvPhaseHandlerIdx;
 static int askProcDataHandlerIdx;
+static int askRecoverDataHandlerIdx;
 static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
@@ -432,6 +438,8 @@ void CkMemCheckPT::getIter(){
   localMaxIter = maxIter+1;
   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
   int elemCount = CkCountChkpSyncElements();
+  if(CkMyPe()==0)
+    startTime = CmiWallTimer();
   if(elemCount == 0){
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
   }
@@ -460,7 +468,7 @@ void CkMemCheckPT::reachChkpIter(){
 }
 
 void CkMemCheckPT::startChkp(){
-  CkPrintf("start checkpoint\n");
+  CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
   CkStartMemCheckpoint(cpCallback);
 }
 
@@ -776,29 +784,41 @@ void CkMemCheckPT::startCheckpoint(){
   }
   else{
     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
-      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
-      {        
-        int pointer = CpvAccess(curPointer);
-        //send the proc data
-        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
-        procMsg->pointer = pointer;
-        envelope * env = (envelope *)(UsrToEnv(procMsg));
+      //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
+      if(CpvAccess(remoteReady)==1){
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        {      
+          int pointer = CpvAccess(curPointer);
+          //send the proc data
+          CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          procMsg->pointer = pointer;
+          envelope * env = (envelope *)(UsrToEnv(procMsg));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+            CkPrintf("[%d] sendProcdata\n",CkMyPe());
+          }
+        }
+        //send the array checkpoint data
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        msg->pointer = CpvAccess(curPointer);
+        envelope * env = (envelope *)(UsrToEnv(msg));
         CkPackMessage(&env);
-        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+          CkPrintf("[%d] sendArraydata\n",CkMyPe());
+      }else{
         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
-          CkPrintf("[%d] sendProcdata\n",CkMyPe());
+          int pointer = CpvAccess(curPointer);
+          CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          (CpvAccess(recoverArrayBuf))->pointer = pointer;
         }
+        CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        (CpvAccess(recoverProcBuf))->pointer = CpvAccess(curPointer);
+        CpvAccess(localReady) = 1;
       }
-      //send the array checkpoint data
-      CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
-      msg->pointer = CpvAccess(curPointer);
-      envelope * env = (envelope *)(UsrToEnv(msg));
-      CkPackMessage(&env);
-      CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
-      CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
-        CkPrintf("[%d] sendArraydata\n",CkMyPe());
       //can continue work, no need to wait for my replica
       int _ret = 1;
       notifyReplica = 1;
@@ -848,6 +868,8 @@ void CkMemCheckPT::doneRComparison(int ret){
 
 void CkMemCheckPT::doneBothComparison(){
   CpvAccess(recvdRemote) = 0;
+  CpvAccess(remoteReady) = 0;
+  CpvAccess(localReady) = 0;
   CpvAccess(recvdLocal) = 0;
   CpvAccess(localChkpDone) = 0;
   CpvAccess(remoteChkpDone) = 0;
@@ -1861,6 +1883,35 @@ void CkMemCheckPT::RollBack(){
       CpvAccess(_curRestartPhase)--;
       CkMemCheckPT::inRestarting = 1;
       CmiFree(msg);
+      //notify the buddy in the replica now i can receive the checkpoint msg
+      char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+      CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
+      CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
+      //timer start
+      if(CpvAccess( _crashedNode )==CkMyPe()){
+        CkMemCheckPT::startTime = restartT = CmiWallTimer();
+        CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      }
+    }
+    
+    static void askRecoverDataHandler(char * msg){
+      CpvAccess(remoteReady)=1;
+      if(CpvAccess(localReady)==1){
+        if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+        {      
+          envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          CmiPrintf("[%d] sendProcdata\n",CmiMyPe());
+        }
+        //send the array checkpoint data
+        envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        
+      }
     }
     // called on crashed processor
     static void recoverProcDataHandler(char *msg)
@@ -2061,10 +2112,21 @@ void CkMemCheckPT::RollBack(){
         arg_where = CkCheckPoint_inDISK;
       }
       CpvInitialize(int, use_checksum);
+      CpvInitialize(int, resilience);
       CpvAccess(use_checksum)=0;
+      CpvAccess(resilience)=0;
       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
         CpvAccess(use_checksum)=1;
       }
+      if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
+        CpvAccess(resilience)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
+        CpvAccess(resilience)=2;
+      }
+      if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
+        CpvAccess(resilience)=3;
+      }
       // initiliazing _crashedNode variable
       CpvInitialize(int, _crashedNode);
       CpvInitialize(int, _remoteCrashedNode);
@@ -2208,6 +2270,7 @@ void CkMemCheckPT::RollBack(){
 #if CMK_MEM_CHECKPOINT
         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
+        askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
@@ -2234,12 +2297,16 @@ void CkMemCheckPT::RollBack(){
         CpvInitialize(CkCheckPTMessage **, chkpBuf);
         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
         CpvInitialize(CkCheckPTMessage *, buddyBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
         CpvInitialize(int,curPointer);
         CpvInitialize(int,recvdLocal);
         CpvInitialize(int,localChkpDone);
         CpvInitialize(int,remoteChkpDone);
         CpvInitialize(int,remoteStarted);
         CpvInitialize(int,localStarted);
+        CpvInitialize(int,localReady);
+        CpvInitialize(int,remoteReady);
         CpvInitialize(int,recvdRemote);
         CpvInitialize(int,recvdProcChkp);
         CpvInitialize(int,localChecksum);
@@ -2248,6 +2315,8 @@ void CkMemCheckPT::RollBack(){
 
         CpvAccess(procChkptBuf) = NULL;
         CpvAccess(buddyBuf) = NULL;
+        CpvAccess(recoverProcBuf) = NULL;
+        CpvAccess(recoverArrayBuf) = NULL;
         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
         CpvAccess(chkpBuf)[0] = NULL;
@@ -2261,6 +2330,8 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(remoteChkpDone) = 0;
         CpvAccess(remoteStarted) = 0;
         CpvAccess(localStarted) = 0;
+        CpvAccess(localReady) = 0;
+        CpvAccess(remoteReady) = 0;
         CpvAccess(recvdRemote) = 0;
         CpvAccess(recvdProcChkp) = 0;
         CpvAccess(localChecksum) = 0;