ckmulticast: more code docs.
authorRamprasad Venkataraman <ramv@illinois.edu>
Thu, 11 Nov 2010 17:23:09 +0000 (11:23 -0600)
committerRamprasad Venkataraman <ramv@illinois.edu>
Fri, 19 Nov 2010 15:42:53 +0000 (09:42 -0600)
src/libs/ck-libs/multicast/ckmulticast.C
src/libs/ck-libs/multicast/ckmulticast.h

index f19f296c4c4291d615de8b4c805e26e697c2a53f..e217feaceb236e6a1790f864b2c123de687e69ad 100644 (file)
@@ -898,8 +898,8 @@ void CkMulticastMgr::recvMsg(multicastGrpMsg *msg)
   }
 }
 
-// user function
-// to retrieve section info from a multicast msg
+
+
 void CkGetSectionInfo(CkSectionInfo &id, void *msg)
 {
   CkMcastBaseMsg *m = (CkMcastBaseMsg *)msg;
@@ -1086,109 +1086,115 @@ CkReductionMsg* CkMulticastMgr::combineFrags (CkSectionInfo& id,
   return msg;
 }
 
-void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
-                                     mCastEntry* entry, reductionInfo& redInfo,
-                                     int& updateReduceNo, int currentTreeUp){
-
-  CProxy_CkMulticastMgr  mCastGrp(thisgroup);
-  reductionMsgs& rmsgs = redInfo.msgs[index];
-  int dataSize         = rmsgs[0]->dataSize;
-  CkReduction::reducerType reducer = rmsgs[0]->reducer;
-  int i;
-  int oldRedNo = redInfo.redNo;
-  int nFrags   = rmsgs[0]->nFrags;
-  int fragNo   = rmsgs[0]->fragNo;
-  int userFlag   = rmsgs[0]->userFlag;
-                                                                                
-  // reduce msgs
-  CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
-  CkAssert(NULL != f);
-
-  // check valid callback in msg and check if migration happened
-  CkCallback msg_cb;
-  int rebuilt = 0;
-  for (i=0; i<rmsgs.length(); i++) {
-    if (rmsgs[i]->rebuilt) rebuilt = 1;
-    if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
-  }
 
-  CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec()); 
-  newmsg->redNo  = redInfo.redNo;
-  newmsg->nFrags = nFrags;
-  newmsg->fragNo = fragNo;
-  newmsg->userFlag = userFlag;
-  newmsg->reducer = reducer;
 
-  // increment num-frags processed
-  redInfo.npProcessed ++;
+void CkMulticastMgr::reduceFragment (int index, CkSectionInfo& id,
+                                     mCastEntry* entry, reductionInfo& redInfo,
+                                     int& updateReduceNo, int currentTreeUp) {
 
-  // check if migration and free messages
-  for (i=0; i<rmsgs.length(); i++) {
-    if (rmsgs[i]!=newmsg) delete rmsgs[i];
-  }
-  rmsgs.length() = 0;
+    CProxy_CkMulticastMgr  mCastGrp(thisgroup);
+    reductionMsgs& rmsgs = redInfo.msgs[index];
+    int dataSize         = rmsgs[0]->dataSize;
+    int i;
+    int oldRedNo = redInfo.redNo;
+    int nFrags   = rmsgs[0]->nFrags;
+    int fragNo   = rmsgs[0]->fragNo;
+    int userFlag = rmsgs[0]->userFlag;
+
+    // Figure out (from one of the msg fragments) which reducer function to use
+    CkReduction::reducerType reducer = rmsgs[0]->reducer;
+    CkReduction::reducerFn f= CkReduction::reducerTable[reducer];
+    CkAssert(NULL != f);
+
+    // Check if migration occurred in any of the subtrees, and pick one valid callback
+    CkCallback msg_cb;
+    int rebuilt = 0;
+    for (i=0; i<rmsgs.length(); i++) {
+        if (rmsgs[i]->rebuilt) rebuilt = 1;
+        if (!rmsgs[i]->callback.isInvalid()) msg_cb = rmsgs[i]->callback;
+    }
 
-  if (redInfo.npProcessed == nFrags) {
-    entry->incReduceNo();
-    DEBUGF(("Advanced entry:%p redNo: %d\n", entry, entry->red.redNo));
-  }
-  if (updateReduceNo) mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
-                                                                                
-  if (entry->hasParent()) {
-    // send up to parent
-    newmsg->sid        = entry->parentGrp;
-    newmsg->sourceFlag = 2;
-    newmsg->redNo      = oldRedNo;
-    newmsg->gcount     = redInfo.gcount [index];
-    newmsg->rebuilt    = rebuilt;
-    newmsg->callback   = msg_cb;
-    DEBUGF(("send to parent %p: %d\n", entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
-    mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
-  } else { // root
-    newmsg->sid = id;
-    // buffer message
-    rmsgs.push_back (newmsg);
-
-    //if (entry->allElem.length() == redInfo.gcount) {
-    if (redInfo.npProcessed == nFrags) {
-
-      newmsg = combineFrags (id, entry, redInfo);
-      CkSetRefNum(newmsg, userFlag);
-
-      if (!msg_cb.isInvalid()) {
-        msg_cb.send(newmsg);
-      }
-      else if (redInfo.storedCallback != NULL) {
-        redInfo.storedCallback->send(newmsg);
-      }
-      else if (redInfo.storedClient != NULL) {
-        redInfo.storedClient(id, redInfo.storedClientParam, dataSize,
-           newmsg->data);
-        delete newmsg;
-      }
-      else
-        CmiAbort("Did you forget to register a reduction client?");
-                                                                                
-      DEBUGF(("Reduction client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
-      if (currentTreeUp) {
-        if (entry->oldc) {
-            // free old tree on same processor;
-          mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
-          entry->oldc = NULL;
-        }
-        if (entry->hasOldtree()) {
-            // free old tree on old processor
-          int oldpe = entry->oldtree.pe;
-          mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
-          entry->oldtree.clear();
+    // Perform the actual reduction
+    CkReductionMsg *newmsg = (*f)(rmsgs.length(), rmsgs.getVec());
+    newmsg->redNo  = redInfo.redNo;
+    newmsg->nFrags = nFrags;
+    newmsg->fragNo = fragNo;
+    newmsg->userFlag = userFlag;
+    newmsg->reducer = reducer;
+
+    // Increment the number of fragments processed
+    redInfo.npProcessed ++;
+
+    // Delete all the fragments which are no longer needed
+    for (i=0; i<rmsgs.length(); i++)
+        if (rmsgs[i]!=newmsg) delete rmsgs[i];
+    rmsgs.length() = 0;
+
+    // If all the fragments for the current reduction have been processed
+    if (redInfo.npProcessed == nFrags)
+        entry->incReduceNo();
+
+    // If migration happened, and my sub-tree reconstructed itself,
+    // share the current reduction number with myself and all my children
+    if (updateReduceNo)
+        mCastGrp[CkMyPe()].updateRedNo(entry, redInfo.redNo);
+
+    // If I am not the tree root
+    if (entry->hasParent()) {
+        // send up to parent
+        newmsg->sid        = entry->parentGrp;
+        newmsg->sourceFlag = 2;
+        newmsg->redNo      = oldRedNo; ///< @todo: redundant, duplicate assignment?
+        newmsg->gcount     = redInfo.gcount [index];
+        newmsg->rebuilt    = rebuilt;
+        newmsg->callback   = msg_cb;
+        DEBUGF(("[%d] ckmulticast: send %p to parent %d\n", CkMyPe(), entry->parentGrp.get_val(), entry->parentGrp.get_pe()));
+        mCastGrp[entry->parentGrp.get_pe()].recvRedMsg(newmsg);
+    } else {
+        newmsg->sid = id;
+        // Buffer the reduced fragment
+        rmsgs.push_back (newmsg);
+        // If all the fragments have been reduced
+        if (redInfo.npProcessed == nFrags) {
+            // Combine the fragments
+            newmsg = combineFrags (id, entry, redInfo);
+            // Set the reference number based on the user flag at the contribute call
+            CkSetRefNum(newmsg, userFlag);
+            // Trigger the appropriate reduction client
+            if ( !msg_cb.isInvalid() )
+                msg_cb.send(newmsg);
+            else if (redInfo.storedCallback != NULL)
+                redInfo.storedCallback->send(newmsg);
+            else if (redInfo.storedClient != NULL) {
+                redInfo.storedClient(id, redInfo.storedClientParam, dataSize, newmsg->data);
+                delete newmsg;
+            }
+            else
+                CmiAbort("Did you forget to register a reduction client?");
+
+            DEBUGF(("ckmulticast: redn client called - currentTreeUp: %d entry:%p oldc: %p\n", currentTreeUp, entry, entry->oldc));
+            //
+            if (currentTreeUp) {
+                if (entry->oldc) {
+                    // free old tree on same processor;
+                    mCastGrp[CkMyPe()].freeup(CkSectionInfo(id.get_pe(), entry->oldc, 0, entry->getAid()));
+                    entry->oldc = NULL;
+                }
+                if (entry->hasOldtree()) {
+                    // free old tree on old processor
+                    int oldpe = entry->oldtree.pe;
+                    mCastGrp[oldpe].freeup(CkSectionInfo(oldpe, entry->oldtree.entry, 0, entry->getAid()));
+                    entry->oldtree.clear();
+                }
+            }
+            // Indicate if a tree rebuild is required
+            if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
         }
-      }
-      if (rebuilt && !entry->needRebuild) entry->needRebuild = 1;
     }
-  }
 }
 
 
+
 //
 void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
 {
@@ -1221,7 +1227,7 @@ void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
         if (!entry->hasParent() && !entry->isObsolete()) {
             /// Indicate it is not on old spanning tree
             msg->sourceFlag = 0;
-            /// Flag the redn as coming from an old tree and that the new entry cookie needs an update.
+            /// Flag the redn as coming from an old tree and that the new entry cookie needs to know the new redn num.
             updateReduceNo  = 1;
         }
         /// If I am not the root or this latest cookie is also obsolete
@@ -1314,7 +1320,6 @@ void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg)
 
 
 
-
 void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
 {
   CProxy_CkMulticastMgr  mCastGrp(thisgroup);
@@ -1326,6 +1331,8 @@ void CkMulticastMgr::releaseFutureReduceMsgs(mCastEntryPtr entry)
   entry->red.futureMsgs.length() = 0;
 }
 
+
+
 // these messages have to be sent to root
 void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
 {
@@ -1354,6 +1361,8 @@ void CkMulticastMgr::releaseBufferedReduceMsgs(mCastEntryPtr entry)
   entry->red.futureMsgs.length() = 0;
 }
 
+
+
 void CkMulticastMgr::updateRedNo(mCastEntryPtr entry, int red)
 {
   DEBUGF(("[%d] updateRedNo entry:%p to %d\n", CkMyPe(), entry, red));
index 1facc35613ee7514783db4b430053818ee89d3b4..0dbee89caad10deaefedf78c000c6087788fd505 100644 (file)
@@ -43,6 +43,9 @@ public:
 
 typedef void (*redClientFn)(CkSectionInfo sid, void *param,int dataSize,void *data);
 
+/// Retrieve section info from a multicast msg. Part of API
+extern void CkGetSectionInfo(CkSectionInfo &id, void *msg);
+
 
 
 /**
@@ -111,7 +114,7 @@ class CkMulticastMgr: public CkDelegateMgr
         // ------------------------- Reductions ------------------------
         /// entry Accept a redn msg from a child in the spanning tree
         void recvRedMsg(CkReductionMsg *msg);
-        /// entry
+        /// entry Update the current completed redn num to input value
         void updateRedNo(mCastEntryPtr, int red);
         /// Configure a client to accept the reduction result
         void setReductionClient(CProxySection_ArrayElement &, redClientFn fn,void *param=NULL);
@@ -134,7 +137,6 @@ class CkMulticastMgr: public CkDelegateMgr
         void SimpleSend(int ep,void *m, CkArrayID a, CkSectionID &sid, int opts);
         /// Retire and rebuild the spanning tree when one of the intermediate vertices migrates
         void rebuild(CkSectionInfo &);
-        // typedef CkMcastReductionMsg *(*reducerFn)(int nMsg,CkMcastReductionMsg **msgs);
 
     private:
         /// Fill the SectionInfo cookie in the SectionID obj with relevant info
@@ -148,16 +150,14 @@ class CkMulticastMgr: public CkDelegateMgr
         enum {MAXREDUCERS=256};
         // static CkReduction::reducerFn reducerTable[MAXREDUCERS];
         void releaseBufferedReduceMsgs(mCastEntryPtr entry);
+        /// Release buffered redn msgs from later reductions which arrived early (out of order)
         void releaseFutureReduceMsgs(mCastEntryPtr entry);
+        ///
         inline CkReductionMsg *buildContributeMsg(int dataSize,void *data,CkReduction::reducerType type, CkSectionInfo &id, CkCallback &cb, int userFlag=-1);
-        void reduceFragment (int index, CkSectionInfo& id, mCastEntry* entry, reductionInfo& redInfo,
-                         int& updateReduceNo, int currentTreeUp);
+        /// Reduce one fragment of a reduction msg and handle appropriately (transmit up the tree, buffer, combine etc)
+        void reduceFragment (int index, CkSectionInfo& id, mCastEntry* entry, reductionInfo& redInfo, int& updateReduceNo, int currentTreeUp);
+        /// At the tree root: Combine all msg fragments for final delivery to the client
         CkReductionMsg* combineFrags (CkSectionInfo& id, mCastEntry* entry, reductionInfo& redInfo);
 };
 
-
-
-
-extern void CkGetSectionInfo(CkSectionInfo &id, void *msg);
-
 #endif