Adding support for collectives and parallel recovery.
authorEsteban Meneses <emenese2@illinois.edu>
Mon, 4 Jun 2012 22:17:49 +0000 (17:17 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Mon, 4 Jun 2012 22:17:49 +0000 (17:17 -0500)
src/ck-core/ckcausalmlog.C
src/ck-core/ckcausalmlog.h
src/ck-core/ckreduction.C

index 381b79f706b7d4259396fe3d98becb9e99df45cb..4aed8567663589afb0399d05c93fb209849cd7ed 100644 (file)
@@ -133,7 +133,8 @@ CpvDeclare(char **, _storeDetsPtrs);
 /***** *****/
 
 /***** VARIABLES FOR PARALLEL RECOVERY *****/
-CpvDeclare(CkVec<LocationID *> *, _emigrantRecObjs);
+CpvDeclare(int, _numEmigrantRecObjs);
+CpvDeclare(int, _numImmigrantRecObjs);
 CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
 /***** *****/
 
@@ -333,10 +334,13 @@ void _messageLoggingInit(){
        CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
 
        // Cpv variables for parallel recovery
-       CpvInitialize(CkVec<LocationID *> *, _emigrantRecObjs);
-       CpvAccess(_emigrantRecObjs) = new CkVec<LocationID *>;
-       CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
-       CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
+       CpvInitialize(int, _numEmigrantRecObjs);
+    CpvAccess(_numEmigrantRecObjs) = 0;
+    CpvInitialize(int, _numImmigrantRecObjs);
+    CpvAccess(_numImmigrantRecObjs) = 0;
+
+    CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
+    CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
 
        //Cpv variables for checkpoint
        CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
@@ -2903,20 +2907,17 @@ public:
                CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
                idx.print();
 
-               // inserting the emigrant object to the list of emigrant recovery objects
-               // every broadcast message will be sent to these objects
-               LocationID *location = new LocationID();
-               location->idx = idx;
-               location->gid = loc.getManager()->getGroupID();
-               location->PE = *targetPE;
-               CpvAccess(_emigrantRecObjs)->push_back(location);       
+               // incrementing number of emigrant objects
+               CpvAccess(_numEmigrantRecObjs)++;
                        
                //pack up this location and send it across
                PUP::sizer psizer;
                pupLocation(loc,psizer);
-               int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
+               int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
                char *msg = (char *)CmiAlloc(totalSize);
-               char *buf = &msg[CmiMsgHeaderSizeBytes];
+               DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
+               distributeMsg->PE = CkMyPe();
+               char *buf = &msg[sizeof(DistributeObjectMsg)];
                PUP::toMem pmem(buf);
                pmem.becomeDeleting();
                pupLocation(loc,pmem);
@@ -2955,7 +2956,9 @@ void distributeRestartedObjects(){
  */
 void _distributedLocationHandler(char *receivedMsg){
        printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
-       char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
+       DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
+       int sourcePE = distributeMsg->PE;
+       char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
        PUP::fromMem pmem(buf);
        CkGroupID gID;
        CkArrayIndexMax idx;
@@ -2974,6 +2977,7 @@ void _distributedLocationHandler(char *receivedMsg){
 
        // adding object to the list of immigrant recovery objects
        CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
+       CpvAccess(_numImmigrantRecObjs)++;
        
        CkVec<CkMigratable *> eltList;
        mgr->migratableList((CkLocRec_local *)rec,eltList);
@@ -2981,6 +2985,7 @@ void _distributedLocationHandler(char *receivedMsg){
                if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
                        CpvAccess(_currentObj) = eltList[i];
                        eltList[i]->mlogData->immigrantRecFlag = 1;
+                       eltList[i]->mlogData->immigrantSourcePE = sourcePE;
                        eltList[i]->ResumeFromSync();
                }
        }
index f3685c8c2f432d6248da3afd2d6eec25a7ac68ee..2af0f77ce8b8fd20db95d86cb1fd29224a1865fa 100644 (file)
@@ -10,6 +10,7 @@
 #endif
 
 CpvExtern(Chare *,_currentObj);
+CpvExtern(int, _numImmigrantRecObjs);
 
 //states of a ticket sent as a reply to a request
 #define NEW_TICKET 1
@@ -208,6 +209,7 @@ public:
        int toResumeOrNot;
        int resumeCount;
        int immigrantRecFlag;
+       int immigrantSourcePE;
 
 private:
 
@@ -282,16 +284,6 @@ public:
        virtual void pup(PUP::er &p);
 };
 
-/**
- * @brief Class that represents the location of an array element.
- */
-class LocationID{
-public:
-       CkArrayIndexMax idx;
-       CkGroupID gid;
-       int PE;
-};
-
 /**
  * @brief
  */
@@ -365,6 +357,12 @@ typedef struct{
        int dataSize;
 } CheckPointDataMsg;
 
+typedef struct{
+    char header[CmiMsgHeaderSizeBytes];
+    int PE;
+} DistributeObjectMsg;
+
+
 /*typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        int PE;
index 55406d222b6718806f6bbc4ccd68133bafef05c9..e32561922cae962a73cc32abe03d1e201e865062 100644 (file)
@@ -386,11 +386,6 @@ void CkReductionMgr::contributorArriving(contributorInfo *ci)
 // Each contributor must contribute exactly once to the each reduction.
 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 {
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
-    Chare *oldObj =CpvAccess(_currentObj);
-    CpvAccess(_currentObj) = this;
-#endif
-
 #if CMK_BIGSIM_CHARM
   _TRACE_BG_TLINE_END(&(m->log));
 #endif
@@ -400,56 +395,34 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
   m->sourceFlag=-1;//A single contribution
   m->gcount=0;
 
-#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
-    if(lcount == 0){
-        m->sourceProcessorCount = 1;
-    }else{
-        m->sourceProcessorCount = lcount;
-    }
-    m->fromPE = CmiMyPe();
-    Chare *oldObj =CpvAccess(_currentObj);
-    char currentObjName[100];
-    DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
-    thisProxy[0].contributeViaMessage(m);
-#else
-  addContribution(m);
-#endif
+#if defined(_FAULT_CAUSAL_)
+
+       // if object is an immigrant recovery object, we send the contribution to the source PE
+       if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
+               //DELETE CkPrintf("[%d] Sending contribution via message %d\n",CkMyPe(),m->redNo);
+       thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
+               return;
+       }
+
+    Chare *oldObj = CpvAccess(_currentObj);
+    CpvAccess(_currentObj) = this;
+
+       // adding contribution
+       //DELETE if(CkMyPe() == 4) CkPrintf("[%d] Adding local contribution %d\n",CkMyPe(),m->redNo);
+       addContribution(m);
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
     CpvAccess(_currentObj) = oldObj;
+#else
+  addContribution(m);
 #endif
 }
 
-#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
+#if defined(_FAULT_CAUSAL_)
 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
-    CmiAssert(CmiMyPe() == 0);
-    DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
-    if(redNo == 0){
-        if(perProcessorCounts[m->fromPE] == -1){
-            processorCount++;
-            perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
-            DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
-        }else{
-            if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
-                DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
-                perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
-            }
-        }
-        if(processorCount == CmiNumPes()){
-            totalCount = 0;
-            for(int i=0;i<CmiNumPes();i++){
-                CmiAssert(perProcessorCounts[i] != -1);
-                totalCount += perProcessorCounts[i];
-            }
-            DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
-        }
-        if(m->sourceProcessorCount == 0){
-            if(processorCount == CmiNumPes()){
-                finishReduction();
-            }
-            return;
-        }
-    }
+
+       //DELETE CkPrintf("[%d] Receiving contribution via message %d %d\n",CkMyPe(),m->redNo,redNo);
+       
+       // adding contribution
     addContribution(m);
 }
 #else
@@ -681,40 +654,24 @@ void CkReductionMgr::finishReduction(void)
        return;
   }
   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
-#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
-
+#if (defined(_FAULT_CAUSAL_))
+       //DELETE CkPrintf("[%d] finishReduction %d %d %d %d\n",CkMyPe(),nContrib,(lcount+adj(redNo).lcount),nRemote,treeKids()); 
+       //CODING
+       if (nContrib<(lcount+adj(redNo).lcount)-CpvAccess(_numImmigrantRecObjs)){
+        DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
+               return;//Need more local messages
+       }
+#else
   if (nContrib<(lcount+adj(redNo).lcount)){
          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
         return;//Need more local messages
   }
+#endif
+
 #if GROUP_LEVEL_REDUCTION
   if (nRemote<treeKids()) return;//Need more remote messages
 #endif
  
-#else
-
-    if(CkMyPe() != 0){
-        if(redNo != 0){
-            return;
-        }else{
-            CmiAssert(lcount == 0);
-            CkReductionMsg *dummy = reduceMessages();
-            dummy->fromPE = CmiMyPe();
-            dummy->sourceProcessorCount = 0;
-            thisProxy[0].contributeViaMessage(dummy);
-            return;
-        }
-    }
-    if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
-        DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
-         return;//Need more local messages
-  }else{
-        DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
-    }
-
-
-#endif
-
   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
   CkReductionMsg *result=reduceMessages();
   result->redNo=redNo;
@@ -831,6 +788,7 @@ void CkReductionMgr::finishReduction(void)
   }
  
 #endif
+
 }
 
 //Sent up the reduction tree with reduced data