Adding support for collectives and parallel recovery.
[charm.git] / src / ck-core / ckreduction.C
index 55406d222b6718806f6bbc4ccd68133bafef05c9..e32561922cae962a73cc32abe03d1e201e865062 100644 (file)
@@ -386,11 +386,6 @@ void CkReductionMgr::contributorArriving(contributorInfo *ci)
 // Each contributor must contribute exactly once to the each reduction.
 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
 {
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
-    Chare *oldObj =CpvAccess(_currentObj);
-    CpvAccess(_currentObj) = this;
-#endif
-
 #if CMK_BIGSIM_CHARM
   _TRACE_BG_TLINE_END(&(m->log));
 #endif
@@ -400,56 +395,34 @@ void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
   m->sourceFlag=-1;//A single contribution
   m->gcount=0;
 
-#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
-    if(lcount == 0){
-        m->sourceProcessorCount = 1;
-    }else{
-        m->sourceProcessorCount = lcount;
-    }
-    m->fromPE = CmiMyPe();
-    Chare *oldObj =CpvAccess(_currentObj);
-    char currentObjName[100];
-    DEBR(("[%d] contribute called with currentObj %s redNo %d lcount %d\n",CkMyPe(),oldObj->mlogData->objID.toString(currentObjName),m->redNo,lcount));
-    thisProxy[0].contributeViaMessage(m);
-#else
-  addContribution(m);
-#endif
+#if defined(_FAULT_CAUSAL_)
+
+       // if object is an immigrant recovery object, we send the contribution to the source PE
+       if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
+               //DELETE CkPrintf("[%d] Sending contribution via message %d\n",CkMyPe(),m->redNo);
+       thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
+               return;
+       }
+
+    Chare *oldObj = CpvAccess(_currentObj);
+    CpvAccess(_currentObj) = this;
+
+       // adding contribution
+       //DELETE if(CkMyPe() == 4) CkPrintf("[%d] Adding local contribution %d\n",CkMyPe(),m->redNo);
+       addContribution(m);
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
     CpvAccess(_currentObj) = oldObj;
+#else
+  addContribution(m);
 #endif
 }
 
-#if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
+#if defined(_FAULT_CAUSAL_)
 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
-    CmiAssert(CmiMyPe() == 0);
-    DEBR(("[%d] contributeViaMessage fromPE %d\n",CmiMyPe(),m->fromPE));
-    if(redNo == 0){
-        if(perProcessorCounts[m->fromPE] == -1){
-            processorCount++;
-            perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
-            DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
-        }else{
-            if(perProcessorCounts[m->fromPE] < m->sourceProcessorCount){
-                DEBR(("[%d] Group %d processorCount %d fromPE %d sourceProcessorCount %d\n",CmiMyPe(),thisgroup.idx,processorCount,m->fromPE,m->sourceProcessorCount));
-                perProcessorCounts[m->fromPE] = m->sourceProcessorCount;
-            }
-        }
-        if(processorCount == CmiNumPes()){
-            totalCount = 0;
-            for(int i=0;i<CmiNumPes();i++){
-                CmiAssert(perProcessorCounts[i] != -1);
-                totalCount += perProcessorCounts[i];
-            }
-            DEBR(("[%d] Group %d totalCount %d\n",CmiMyPe(),thisgroup.idx,totalCount));
-        }
-        if(m->sourceProcessorCount == 0){
-            if(processorCount == CmiNumPes()){
-                finishReduction();
-            }
-            return;
-        }
-    }
+
+       //DELETE CkPrintf("[%d] Receiving contribution via message %d %d\n",CkMyPe(),m->redNo,redNo);
+       
+       // adding contribution
     addContribution(m);
 }
 #else
@@ -681,40 +654,24 @@ void CkReductionMgr::finishReduction(void)
        return;
   }
   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
-#if (!defined(_FAULT_MLOG_) || !_MLOG_REDUCE_P2P_)
-
+#if (defined(_FAULT_CAUSAL_))
+       //DELETE CkPrintf("[%d] finishReduction %d %d %d %d\n",CkMyPe(),nContrib,(lcount+adj(redNo).lcount),nRemote,treeKids()); 
+       //CODING
+       if (nContrib<(lcount+adj(redNo).lcount)-CpvAccess(_numImmigrantRecObjs)){
+        DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
+               return;//Need more local messages
+       }
+#else
   if (nContrib<(lcount+adj(redNo).lcount)){
          DEBR((AA"Need more local messages %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
         return;//Need more local messages
   }
+#endif
+
 #if GROUP_LEVEL_REDUCTION
   if (nRemote<treeKids()) return;//Need more remote messages
 #endif
  
-#else
-
-    if(CkMyPe() != 0){
-        if(redNo != 0){
-            return;
-        }else{
-            CmiAssert(lcount == 0);
-            CkReductionMsg *dummy = reduceMessages();
-            dummy->fromPE = CmiMyPe();
-            dummy->sourceProcessorCount = 0;
-            thisProxy[0].contributeViaMessage(dummy);
-            return;
-        }
-    }
-    if (CkMyPe() == 0 && nContrib<numberReductionMessages()){
-        DEBR((AA"Need more messages %d %d\n"AB,nContrib,numberReductionMessages()));
-         return;//Need more local messages
-  }else{
-        DEBR(("[%d] Group %d nContrib %d numberReductionMessages() %d totalCount %d\n",CmiMyPe(),thisgroup.idx,nContrib,numberReductionMessages(),totalCount));
-    }
-
-
-#endif
-
   DEBR((AA"Reducing data... %d %d\n"AB,nContrib,(lcount+adj(redNo).lcount)));
   CkReductionMsg *result=reduceMessages();
   result->redNo=redNo;
@@ -831,6 +788,7 @@ void CkReductionMgr::finishReduction(void)
   }
  
 #endif
+
 }
 
 //Sent up the reduction tree with reduced data