Make the tree construction to be charm SMP node-aware to ensure children are on
authorChao Mei <chaomei2@illinois.edu>
Mon, 8 Nov 2010 19:02:57 +0000 (13:02 -0600)
committerChao Mei <chaomei2@illinois.edu>
Mon, 8 Nov 2010 19:02:57 +0000 (13:02 -0600)
different charm SMP nodes.

src/util/treeStrategy_topoUnaware.h

index 4aac7a57cc44343f860bd06f3dce98f3d3034877..aecd7c10066c99518ec5c680ed50bcfc2556e882 100644 (file)
@@ -1,5 +1,7 @@
 #include "charm++.h"
 
+#include <map>
+
 #ifndef TREE_STRATEGY_TOPO_UNAWARE
 #define TREE_STRATEGY_TOPO_UNAWARE
 namespace topo {
@@ -62,22 +64,61 @@ namespace impl {
         /// Return data holds the parent vertex info [and child info, if there are any]
         SpanningTreeVertex *parent = new SpanningTreeVertex(*firstVtx);
 
-        /// Compute the number of vertices in each branch
-        const int numDescendants = std::distance(firstVtx,beyondLastVtx) - 1;
-        int numInSubTree = numDescendants / maxBranches;
-        int remainder    = numDescendants % maxBranches;
+       //key is the node id, and the mapped value is the local index
+       std::map<int, int> nodesMap;
+       #if CMK_SMP     
+               int localIndex = 1; //relative to the root
+               for(Iterator vtx=firstVtx+1; vtx!=beyondLastVtx; vtx++, localIndex++){
+                       int nid = CmiNodeOf(topo::getProcID(*vtx));
+                       if(nodesMap.find(nid) == nodesMap.end()){
+                               //encounter a new node id
+                               nodesMap[nid] = localIndex;
+                       }
+               }
+               int totalNodes = nodesMap.size();
+       #else
+               //in non-SMP case, just to fake there's no SMP nodes so the work flow of spanning tree 
+               //creation is correct
+               int totalNodes = 0;
+       #endif
+               
+               if(totalNodes <= 1){
+                       /// Compute the number of vertices in each branch
+                       const int numDescendants = std::distance(firstVtx,beyondLastVtx) - 1;
+                       int numInSubTree = numDescendants / maxBranches;
+                       int remainder    = numDescendants % maxBranches;
 
-        /// Push the appropriate relative distances (from the tree root) into the container of child indices
-        for (int i=0, indx=1; (i<maxBranches && indx<=numDescendants); i++, indx += numInSubTree)
-        {
-            parent->childIndex.push_back(indx);
-            /// Distribute any remainder vertices as evenly as possible amongst all the branches 
-            if (remainder-- > 0) indx++;
-        }
+                       /// Push the appropriate relative distances (from the tree root) into the container of child indices
+                       for (int i=0, indx=1; (i<maxBranches && indx<=numDescendants); i++, indx += numInSubTree)
+                       {
+                               parent->childIndex.push_back(indx);
+                               /// Distribute any remainder vertices as evenly as possible amongst all the branches 
+                               if (remainder-- > 0) indx++;
+                       }       
+               }else{
+                       int *nidLocalIdx = new int[totalNodes];
+                       std::map<int, int>::iterator it;                
+                       int lIdx;
+                       for(it=nodesMap.begin(), lIdx=0; it!=nodesMap.end(); it++, lIdx++)
+                               nidLocalIdx[lIdx] = (*it).second;
+                       
+                       /// Compute the number of vertices in each branch
+                       const int numDescendants = totalNodes;
+                       int numInSubTree = numDescendants / maxBranches;
+                       int remainder    = numDescendants % maxBranches;
 
+                       /// Push the appropriate relative distances (from the tree root) into the container of child indices
+                       for (int i=0, indx=0; (i<maxBranches && indx<numDescendants); i++, indx += numInSubTree)
+                       {
+                               parent->childIndex.push_back(nidLocalIdx[indx]);
+                               /// Distribute any remainder vertices as evenly as possible amongst all the branches 
+                               if (remainder-- > 0) indx++;
+                       }
+                       delete [] nidLocalIdx;
+               }
         /// Return the output structure
         return parent;
-    }
+    }  
 }
 
 } // end namespace topo