Adding support for bypassing message logging layer.
authorEsteban Meneses <emenese2@illinois.edu>
Tue, 3 Jul 2012 16:04:20 +0000 (11:04 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Tue, 3 Jul 2012 16:04:20 +0000 (11:04 -0500)
src/ck-core/ckcausalmlog.C
src/ck-core/ckreduction.C
src/ck-core/envelope.h

index 37ace0dc8c272824758d15ed134d722ec768ac0d..8278cf60c54f8077267a59e8cd5d94020b2011e8 100644 (file)
@@ -38,6 +38,7 @@
 #define DEBUG_CHECKPOINT 1
 #define DEBUG_NOW(x) x
 #define DEBUG_PE(x,y) // if(CkMyPe() == x) y
+#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
 #define DEBUG_RECOVERY(x) //x
 
 extern const char *idx2str(const CkArrayIndex &ind);
@@ -601,6 +602,7 @@ void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
        MCount ticketNumber = 0;
        int resend=0; //is it a resend
        char recverName[100];
+       char senderString[100];
        double _startTime=CkWallTimer();
        
        DEBUG_MEM(CmiMemoryCheck());
@@ -611,6 +613,15 @@ void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
                generalCldEnqueue(destPE,env,_infoIdx);
                return;
        }
+
+       // checking if this message should bypass determinants in message-logging
+       if(env->flags & CK_BYPASS_DET_MLOG){
+               env->sender = CpvAccess(_currentObj)->mlogData->objID;
+               env->recver = recver;
+               CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE);
+               generalCldEnqueue(destPE,env,_infoIdx);
+               return;
+       }
        
        // setting message logging data in the envelope
        env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
@@ -637,7 +648,6 @@ void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
                resend = 1;
        }
        
-       char senderString[100];
 //     if(env->SN != 1){
                DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
        //      CmiPrintStackTrace(0);
@@ -727,7 +737,7 @@ void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount
 #endif
                }else{
                        // the message has to be deleted after it has been sent
-                       entry->env->freeMsg = true;
+                       entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
                }
        }
 
@@ -1034,9 +1044,17 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
 
        // getting the receiver object
        CkObjID recver = env->recver;
+
+       // checking for determinants bypass in message logging
+       if(env->flags & CK_BYPASS_DET_MLOG){
+               DEBUG_NOW(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
+               return 1;       
+       }
+
+       // checking if receiver is fault aware
        if(!fault_aware(recver)){
-               return 1;
                CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
+               return 1;
        }
 
        Chare *obj = (Chare *)recver.getObject();
@@ -1066,7 +1084,7 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
        }
 
        DEBUG_MEM(CmiMemoryCheck());
-       DEBUG_PE(0,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
+       DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
 
        // getting a ticket for this message
        ticketSuccess = _getTicket(env,&flag);
@@ -1095,7 +1113,7 @@ int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **log
 //env->sender.updatePosition(env->getSrcPe());
        if(env->TN == obj->mlogData->tProcessed+1){
                //the message that needs to be processed now
-               DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
+               DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
                // once we find a message that we can process we put back all the messages in the out of order queue
                // back into the main scheduler queue. 
        DEBUG_MEM(CmiMemoryCheck());
index 316aae725e9b966e223e056c89ba528b07fab750..c1eb410a1cf4e3fa382b00596e181f24955ee67e 100644 (file)
@@ -399,6 +399,10 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 
        // if object is an immigrant recovery object, we send the contribution to the source PE
        if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
+               
+               // turning on the message-logging bypass flag
+               envelope *env = UsrToEnv(m);
+               env->flags = env->flags | CK_BYPASS_DET_MLOG;
        thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
                return;
        }
@@ -417,6 +421,11 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 
 #if defined(_FAULT_CAUSAL_)
 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
+       //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
+       
+       // turning off bypassing flag
+       envelope *env = UsrToEnv(m);
+       env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
 
        // adding contribution
     addContribution(m);
index c3de29f2f26fb96bd4373a9a2fc472c00935beaa..37248a4f26dc08430c49078a77d8a54ab5e94640 100644 (file)
 // silly ancient name: for backward compatability only.
 #define PW(x) CkPriobitsToInts(x) 
 
-
+#if defined(_FAULT_CAUSAL_)
+#define CK_FREE_MSG_MLOG       0x1
+#define CK_BYPASS_DET_MLOG     0x2
+#endif
 
 //#define USE_CRITICAL_PATH_HEADER_ARRAY
 
@@ -185,8 +188,7 @@ public:
     MCount SN;
     MCount TN;
     int incarnation;
-    MlogEntry *localMlogEntry;
-    bool freeMsg;
+       int flags;
 #endif
 private:
     u_type type;           ///< Depends on message type (attribs.mtype)
@@ -265,7 +267,6 @@ private:
       env->SN = 0;
       env->TN = 0;
          env->incarnation = -1;
-      env->localMlogEntry = NULL;
 #endif
 
       return env;