Adding support for migration.
authorEsteban Meneses <emenese2@illinois.edu>
Fri, 21 Sep 2012 19:32:44 +0000 (14:32 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Fri, 21 Sep 2012 19:32:44 +0000 (14:32 -0500)
src/ck-core/ckmessagelogging.C
src/ck-core/ckmessagelogging.h

index 168707eee59fb89665068a2527a49fe104af59f8..05d5e580567387c97a2b6392afe4723f4d6fdb61 100644 (file)
@@ -665,7 +665,7 @@ void sendLocalMsg(envelope *env, int _infoIdx){
        DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
 
        // setting flag to free allocated memory space for this message
-       env->flags = env->flags | CK_FREE_MSG_MLOG;
+       //env->flags = env->flags | CK_FREE_MSG_MLOG;
 
        // getting the receiver local object
        Chare *recverObj = (Chare *)env->recver.getObject();
@@ -1175,6 +1175,12 @@ void _verifyAckHandler(VerifyAckMsg *verifyReply){
        }
 }
 
+/**
+ * Resets sequences numbers.
+ */
+void resetRSSN(void *data, ChareMlogData *mlogData){
+       mlogData->resetRSSN();
+}
 
 /**
  * Sends the checkpoint to its buddy. 
@@ -1220,6 +1226,7 @@ void sendCheckpointData(){
        }
        
 
+
        memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
        buf = &buf[storedChkpt->bufSize];
 
@@ -1289,6 +1296,7 @@ void _recvCheckpointHandler(char *_restartData){
        
 }
 
+
 /**
  * @brief Initializes variables and flags for restarting procedure.
  */
@@ -1922,6 +1930,9 @@ void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
        DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
        DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
 
+       // resetting sequences
+       forAllCharesDo(resetRSSN,NULL);
+
        // resuming LB function pointer
        (*resumeLbFnPtr)(centralLb);
 
@@ -2171,6 +2182,18 @@ int ChareMlogData::checkAndStoreSsn(const CkObjID &sender, MCount ssn){
        return rssn->checkAndStore(ssn);
 }
 
+/**
+ * Resets all sequences.
+ */
+void ChareMlogData::resetRSSN(){
+       CkHashtableIterator *iter = receivedSsnTable.iterator();
+       while(iter->hasNext()){
+               RSSN **row = (RSSN **)iter->next();
+               (*row)->reset();
+       }
+       delete iter;
+}
+
 /**
  * Pup method for the metadata.
  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
index 1d57cd1fb6a109bebbd2b10c12f51678a2de402d..88a0fa33ccd0c9b93cf5b92a5d7f577dc16ffa3e 100644 (file)
@@ -32,6 +32,7 @@ extern char objString[100];
 #define SYNCHRONIZED_CHECKPOINT 1
 
 #define DEBUG(x) // x
+#define DEBUG_NOW(x)  x
 
 class MlogEntry;
 
@@ -57,6 +58,13 @@ public:
                bzero(data,sizeof(MCount)*currentSize);
        }
 
+       // Resets the sequences
+       void reset(){
+               start = 0;
+               end = 0;
+               bzero(data,sizeof(MCount)*currentSize);
+       }
+
        // Checks if a particular SSN is already in the data; if not, stores it         
        // return value: 0 (sucess, value stored), 1 (value already there)
        int checkAndStore(MCount ssn){
@@ -71,7 +79,7 @@ public:
 
                // checking if ssn was already received
                if(ssn <= data[start]){
-                       DEBUG(CkPrintf("[%d] Repeated ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
+                       DEBUG_NOW(CkPrintf("[%d] Repeated ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
                        return 1;
                }
 
@@ -91,7 +99,7 @@ public:
                        delete[] old;
                }
 
-               DEBUG(CkPrintf("[%d] Ahead ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
+               DEBUG_NOW(CkPrintf("[%d] Ahead ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
 
                // adding ssn into data
                num = end - start;
@@ -126,6 +134,12 @@ public:
                for(int i=0;i<currentSize;i++){
                        p | data[i];
                }
+               // HACK for migration
+               if(p.isUnpacking()){
+                       start = 0;
+                       end = 0;
+                       bzero(data,sizeof(MCount)*currentSize);
+               }
        }
 
 };
@@ -180,6 +194,7 @@ public:
        void addLogEntry(MlogEntry *entry);
        virtual void pup(PUP::er &p);
        CkQ<MlogEntry *> *getMlog(){ return &mlog;};
+       void resetRSSN();
 };
 
 /**