ckmulticast: code docs and pretty indent for recvRedMsg()
authorRamprasad Venkataraman <ramv@illinois.edu>
Fri, 29 Oct 2010 20:42:33 +0000 (15:42 -0500)
committerRamprasad Venkataraman <ramv@illinois.edu>
Fri, 19 Nov 2010 15:42:53 +0000 (09:42 -0600)
src/libs/ck-libs/multicast/ckmulticast.C
src/libs/ck-libs/multicast/ckmulticast.h

index 879c33f24dfee433ec272aebb54d77439b3a8631..f19f296c4c4291d615de8b4c805e26e697c2a53f 100644 (file)
@@ -1188,127 +1188,132 @@ void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
   }
 }
 
   }
 }
 
+
+//
 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
 {
 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
 {
-  int i;
-  CkSectionInfo id = msg->sid;
-  mCastEntry *entry = (mCastEntry *)id.get_val();
-  CmiAssert(entry!=NULL);
-//CmiPrintf("[%d] recvRedMsg: entry: %p\n", CkMyPe(), entry);
-
-  CProxy_CkMulticastMgr  mCastGrp(thisgroup);
+    int i;
+    /// Grab the section info embedded in the redn msg
+    CkSectionInfo id = msg->sid;
+    /// ... and get at the ptr which shows me which cookie to use
+    mCastEntry *entry = (mCastEntry *)id.get_val();
+    CmiAssert(entry!=NULL);
 
 
-  int updateReduceNo = 0;
-
-  // update entry if obsolete
-  if (entry->isObsolete()) {
-      // send up to root
-    DEBUGF(("[%d] entry obsolete-send to root %d\n", CkMyPe(), entry->rootSid.pe));
-    if (!entry->hasParent()) { //rootSid.pe == CkMyPe()
-      // I am root, set to the new cookie if there is
-      mCastEntry *newentry = entry->newc;
-      while (newentry && newentry->newc) newentry=newentry->newc;
-      if (newentry) entry = newentry;
-      CmiAssert(entry!=NULL);
-    }
-    if (!entry->hasParent() && !entry->isObsolete()) {
-       // root find the latest cookie that is not obsolete
-      msg->sourceFlag = 0;      // indicate it is not on old spanning tree
-      updateReduceNo = 1;        // reduce from old tree, new entry need update.
-    }
-    else {
-      CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
-      // entry is obsolete, send to root directly
-      msg->sid = entry->rootSid;
+    CProxy_CkMulticastMgr  mCastGrp(thisgroup);
 
 
-      msg->sourceFlag = 0;
-      mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
-      return;
-    }
-  }
+    int updateReduceNo = 0;
 
 
-  reductionInfo &redInfo = entry->red;
+    //-------------------------------------------------------------------------
+    /// If this cookie is obsolete
+    if (entry->isObsolete()) {
+        // Send up to root
+        DEBUGF(("[%d] ckmulticast: section cookie obsolete. Will send to root %d\n", CkMyPe(), entry->rootSid.pe));
 
 
-  DEBUGF(("[%d] msg %p red:%d, entry:%p redno:%d\n", CkMyPe(), msg, msg->redNo, entry, entry->red.redNo));
-  // old message come, ignore
-  if (msg->redNo < redInfo.redNo) {
-    CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
-    CmiAbort("Could never happen! \n");
-  }
-  if (entry->notReady() || msg->redNo > redInfo.redNo) {
-    DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
-    redInfo.futureMsgs.push_back(msg);
-    return;
-  }
+        /// If I am the root, traverse the linked list of cookies to get the latest
+        if (!entry->hasParent()) {
+            mCastEntry *newentry = entry->newc;
+            while (newentry && newentry->newc) newentry=newentry->newc;
+            if (newentry) entry = newentry;
+            CmiAssert(entry!=NULL);
+        }
 
 
-  DEBUGF(("[%d] recvRedMsg rebuilt:%d red:%d\n", CkMyPe(), msg->rebuilt, redInfo.redNo));
+        ///
+        if (!entry->hasParent() && !entry->isObsolete()) {
+            /// Indicate it is not on old spanning tree
+            msg->sourceFlag = 0;
+            /// Flag the redn as coming from an old tree and that the new entry cookie needs an update.
+            updateReduceNo  = 1;
+        }
+        /// If I am not the root or this latest cookie is also obsolete
+        else {
+            // Ensure that you're here with reason
+            CmiAssert(entry->rootSid.get_pe() != CkMyPe() || entry->rootSid.get_val() != entry);
+            // Edit the msg so that the recipient knows where to find its cookie
+            msg->sid = entry->rootSid;
+            msg->sourceFlag = 0;
+            // Send the msg directly to the root of the redn tree
+            mCastGrp[entry->rootSid.get_pe()].recvRedMsg(msg);
+            return;
+        }
+    }
 
 
-  const int index = msg->fragNo;
+    /// Grab the locally stored redn info
+    reductionInfo &redInfo = entry->red;
 
 
-  // buffer this msg
-  if (msg->sourceFlag == 1) {
-    // new reduction message from ArrayElement
-    redInfo.lcount [index] ++;
-  }
+    //-------------------------------------------------------------------------
+    /// If you've received a msg from a previous redn, something has gone horribly wrong somewhere!
+    if (msg->redNo < redInfo.redNo) {
+        CmiPrintf("[%d] msg redNo:%d, msg:%p, entry:%p redno:%d\n", CkMyPe(), msg->redNo, msg, entry, redInfo.redNo);
+        CmiAbort("Could never happen! \n");
+    }
 
 
-  if (msg->sourceFlag == 2) {
-    redInfo.ccount [index] ++;
-  }
+    //-------------------------------------------------------------------------
+    /// If the current tree is not yet ready or if you've received a msg for a future redn, buffer the msg
+    if (entry->notReady() || msg->redNo > redInfo.redNo) {
+        DEBUGF(("[%d] Future redmsgs, buffered! msg:%p entry:%p ready:%d msg red:%d sys redno:%d\n", CkMyPe(), msg, entry, entry->notReady(), msg->redNo, redInfo.redNo));
+        redInfo.futureMsgs.push_back(msg);
+        return;
+    }
 
 
-  redInfo.gcount [index] += msg->gcount;
+    //-------------------------------------------------------------------------
+    const int index = msg->fragNo;
+    // New contribution from an ArrayElement
+    if (msg->sourceFlag == 1) {
+        redInfo.lcount [index] ++;
+    }
+    // Redn from a child
+    if (msg->sourceFlag == 2) {
+        redInfo.ccount [index] ++;
+    }
+    // Total elems that have contributed the indexth fragment
+    redInfo.gcount [index] += msg->gcount;
 
 
-  // buffer the msg
-  // first check if message is of proper size
-  if ((0 != redInfo.msgs[index].length()) && 
-      (msg->dataSize != (redInfo.msgs [index][0]->dataSize))) {
+    // Check if message is of proper size
+    if ((0 != redInfo.msgs[index].length()) && (msg->dataSize != (redInfo.msgs [index][0]->dataSize)))
     CmiAbort("Reduction data are not of same length!");
     CmiAbort("Reduction data are not of same length!");
-  }
 
 
-  redInfo.msgs [index].push_back(msg);
-
-  const int numFragsRcvd = redInfo.msgs [index].length();
-
-  DEBUGF(("[%d] index:%d lcount:%d-%d, ccount:%d-%d, gcount:%d-%d root:%d\n", CkMyPe(),index, entry->red.lcount[index],entry->localElem.length(), entry->red.ccount[index], entry->children.length(), entry->red.gcount[index], entry->allElem.length(), !entry->hasParent()));
-
-  int currentTreeUp = 0;
-  if (redInfo.lcount [index] == entry->localElem.length() &&
-      redInfo.ccount [index] == entry->children.length())
-      currentTreeUp = 1;
+    //-------------------------------------------------------------------------
+    // Buffer the msg
+    redInfo.msgs [index].push_back(msg);
+
+    //-------------------------------------------------------------------------
+    /// Flag if this fragment can be reduced (if all local elements and children have contributed this fragment)
+    int currentTreeUp = 0;
+    if (redInfo.lcount [index] == entry->localElem.length() && redInfo.ccount [index] == entry->children.length())
+        currentTreeUp = 1;
+
+    /// Flag (only at the redn root) if all array elements contributed all their fragments
+    int mixTreeUp = 0;
+    if (!entry->hasParent()) {
+        mixTreeUp = 1;
+        for (int i=0; i<msg->nFrags; i++)
+            if (entry->allElem.length() != redInfo.gcount [i])
+                mixTreeUp = 0;
+    }
 
 
-  int mixTreeUp = 0;
-  const int numElems = entry->allElem.length();
-  
-  if (!entry->hasParent()) {
-    mixTreeUp = 1;
-    for (int i=0; i<msg->nFrags; i++) {
-      if (entry->allElem.length() != redInfo.gcount [i]) {
-        mixTreeUp = 0;
-      }
+    //-------------------------------------------------------------------------
+    /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements
+    if (currentTreeUp || mixTreeUp)
+    {
+        const int nFrags = msg->nFrags;
+        /// Reduce this fragment
+        reduceFragment (index, id, entry, redInfo, updateReduceNo, currentTreeUp);
+        /// Reset counters if you have processed all fragments
+        if (redInfo.npProcessed == nFrags) {
+            for (i=0; i<nFrags; i++) {
+                redInfo.lcount [i] = 0;
+                redInfo.ccount [i] = 0;
+                redInfo.gcount [i] = 0;
+            }
+            redInfo.npProcessed = 0;
+            /// Now that, the current redn is done, release any pending msgs from future redns
+            releaseFutureReduceMsgs(entry);
+        }
     }
     }
-  }
+}
 
 
-  if (currentTreeUp || mixTreeUp)
-  {
-    const int nFrags = msg->nFrags;  
-    
-    // msg from children contain only one fragment
-    reduceFragment (index, id, entry, redInfo, updateReduceNo, 
-                    currentTreeUp);
 
 
-    if (redInfo.npProcessed == nFrags) {
-      // reset counters
-      for (i=0; i<nFrags; i++) {
-        redInfo.lcount [i] = 0;
-        redInfo.ccount [i] = 0;
-        redInfo.gcount [i] = 0;
-      }
-      redInfo.npProcessed = 0;
 
 
-      // release future msgs
-      releaseFutureReduceMsgs(entry);
-    }
-  }
-}
 
 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
 {
 
 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
 {
index 5b5dbe3f3a5f2768ee4efea4815069ebe0d0aee7..1facc35613ee7514783db4b430053818ee89d3b4 100644 (file)
@@ -109,7 +109,7 @@ class CkMulticastMgr: public CkDelegateMgr
         /// entry
         void recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer);
         // ------------------------- Reductions ------------------------
         /// entry
         void recvPacket(CkSectionInfo &_cookie, int n, char *data, int seqno, int count, int totalsize, int fromBuffer);
         // ------------------------- Reductions ------------------------
-        /// entry
+        /// entry Accept a redn msg from a child in the spanning tree
         void recvRedMsg(CkReductionMsg *msg);
         /// entry
         void updateRedNo(mCastEntryPtr, int red);
         void recvRedMsg(CkReductionMsg *msg);
         /// entry
         void updateRedNo(mCastEntryPtr, int red);