Fixing an error with parallel recovery.
authorEsteban Meneses <emenese2@illinois.edu>
Tue, 16 Oct 2012 21:25:54 +0000 (16:25 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Tue, 16 Oct 2012 21:25:54 +0000 (16:25 -0500)
src/ck-core/ckcausalmlog.C

index 5e9a04703053a416d552325679020af9fcb7d9c7..c47fe3e18fa093093918936e1eb331e43d33e220 100644 (file)
@@ -53,6 +53,7 @@ inline bool isLocal(int destPE);
 inline bool isTeamLocal(int destPE);
 void printLog(TProcessedLog *log);
 
+int _recoveryFlag=0;
 int _restartFlag=0;
 int _numRestartResponses=0;
 
@@ -1055,8 +1056,6 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
 
        // getting the receiver object
        CkObjID recver = env->recver;
-       Chare *obj = (Chare *)recver.getObject();
-       *objPointer = obj;
 
        // checking for determinants bypass in message logging
        if(env->flags & CK_BYPASS_DET_MLOG){
@@ -1071,23 +1070,26 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
        }
 
        // checking if receiver is NULL
+       Chare *obj = (Chare *)recver.getObject();
+       *objPointer = obj;
        if(obj == NULL){
-               return 1;
-
-/*             int possiblePE = recver.guessPE();
-               if(possiblePE != CkMyPe()){
-                       int totalSize = env->getTotalsize();
-                       CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
-                       DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
-               }else{
-                       // this is the case where a message is received and the object has not been initialized
-                       // we delayed the delivery of the message
-                       CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
-                       DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
+               if(_recoveryFlag){
+                       int possiblePE = recver.guessPE();
+                       if(possiblePE != CkMyPe()){
+                               int totalSize = env->getTotalsize();
+                               CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
+                               DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
+                       }else{
+                               // this is the case where a message is received and the object has not been initialized
+                               // we delayed the delivery of the message
+                               CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
+                               DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
+                       }
+                       return 0;
+               } else {
+                       return 1;
                }
-               return 0;*/
-
-       }
+       } 
 
        // checking if message comes from an old incarnation (if so, message must be discarded)
        if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
@@ -1251,6 +1253,7 @@ void startMlogCheckpoint(void *_dummy, double curWallTime){
 
        // increasing the checkpoint counter
        checkpointCount++;
+       _recoveryFlag = 0;
        
 #if DEBUG_CHECKPOINT
        if(CmiMyPe() == 0){
@@ -1702,6 +1705,7 @@ void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
 
        // setting the restart flag
        _restartFlag = 1;
+       _recoveryFlag = 1;
        _numRestartResponses = 0;
 
        // if we are using team-based message logging, all members of the group have to be restarted