Fixed getPETopoTreeEdges forming tree of logical nodes instead of PEs
[charm.git] / src / util / spanningTree.C
1 /**
2  * Author: jjgalvez@illinois.edu (Juan Galvez)
3  * Uses recursive bisect/trisect functionality similar to what was in
4  * src/util/treeStrategy_3dTorus_minHops.h
5  */
6 #include "spanningTree.h"
7 #include "TopoManager.h"
8 #include <algorithm>
9 #include <limits.h>
10
11 #include <unordered_map>
12 typedef std::unordered_map<int,int> intMap;
13
14 #include <bitset>
15 #define DIM_SET_SIZE 32     // bitset size
16
17 #define _DEBUG_SPANNING_TREE_ 0
18
19 #if _DEBUG_SPANNING_TREE_
20 #include <sstream>
21 #endif
22
23 template <typename Iterator>
24 class ST_RecursivePartition<Iterator>::PhyNode {
25 public:
26   PhyNode(int id, int pe, TopoManager *tmgr) : id(id), pe(pe) {
27     if (tmgr->haveTopologyInfo()) tmgr->rankToCoordinates(pe, coords);
28   }
29   inline void addNode(int n) { nodes.push_back(n); }
30   inline int size() const { return nodes.size(); }
31   inline int getNode(int i) const {
32     CkAssert(i >= 0 && i < nodes.size());
33     return nodes[i];
34   }
35   /// distance to other phynode
36   inline int distance(const PhyNode &o, TopoManager *tmgr) const {
37     return tmgr->getHopsBetweenRanks(pe, o.pe);
38   }
39
40 #if _DEBUG_SPANNING_TREE_
41   void print() {
42     CkPrintf("phynode %d, pe=%d, coords=", id, pe);
43     for (int i=0; i < coords.size(); i++) CkPrintf("%d ", coords[i]);
44     CkPrintf(", nodes: ");
45     for (int i=0; i < nodes.size(); i++) CkPrintf("%d ", nodes[i]);
46     CkPrintf("\n");
47   }
48 #endif
49
50   int id;
51   int pe; /// a pe in physical node (doesn't matter which one it is)
52   std::vector<int> nodes;  /// (charm)nodes in this phynode
53   std::vector<int> coords; /// coordinates of this phynode
54 };
55
56 template <typename Iterator>
57 class ST_RecursivePartition<Iterator>::PhyNodeCompare {
58 public:
59   PhyNodeCompare(int dim): dim(dim) {}
60   inline bool operator()(const typename ST_RecursivePartition::PhyNode *a,
61                          const typename ST_RecursivePartition::PhyNode *b) const {
62     if (a->coords[dim] == b->coords[dim]) return (a->id < b->id);
63     else return (a->coords[dim] < b->coords[dim]);
64   }
65 private:
66   const int dim;
67 };
68
69 // ----------------- ST_RecursivePartition -----------------
70
71 template <typename Iterator>
72 ST_RecursivePartition<Iterator>::ST_RecursivePartition(bool nodeTree, bool preSorted)
73   : nodeTree(nodeTree), preSorted(preSorted)
74 {
75   tmgr = TopoManager::getTopoManager();
76   if (tmgr->haveTopologyInfo()) {
77     for (int i=0; i < tmgr->getNumDims(); i++) {
78       if (tmgr->getDimSize(i) > DIM_SET_SIZE)
79         CkAbort("ST_RecursivePartition:: Increase bitset size to match size of largest topology dimension");
80     }
81   }
82 #if _DEBUG_SPANNING_TREE_
83   if (CkMyNode() == 0) {
84     CkPrintf("TopoManager reports topoinfo=%d, %d dims, dim sizes: ", tmgr->haveTopologyInfo(), tmgr->getNumDims());
85     for (int i=0; i < tmgr->getNumDims(); i++) CkPrintf("%d ", tmgr->getDimSize(i));
86     CkPrintf("\n");
87   }
88 #endif
89 }
90
91 template <typename Iterator>
92 int ST_RecursivePartition<Iterator>::buildSpanningTree(Iterator start, Iterator end,
93                                                        unsigned int maxBranches)
94 {
95   children.clear();
96   const int numNodes = std::distance(start, end);
97   if (numNodes == 0) CkAbort("Error: requested spanning tree but no nodes\n");
98   else if (numNodes == 1) return 0;
99
100 #if _DEBUG_SPANNING_TREE_
101   CkPrintf("[%d] ST_RecursivePartition:: Root is %d, being requested %d children, Num nodes incl root is %d\n",
102            CkMyNode(), *start, maxBranches, numNodes);
103 #endif
104
105   // group nodes into phynodes
106   std::vector<typename ST_RecursivePartition<Iterator>::PhyNode> phynodes;
107   initPhyNodes(start, end, phynodes);
108   std::vector<typename ST_RecursivePartition<Iterator>::PhyNode*> pphynodes(phynodes.size());
109   for (int i=0; i < phynodes.size(); i++) pphynodes[i] = &phynodes[i];
110
111   // build the spanning tree of physical nodes
112   build(pphynodes, start, maxBranches);
113
114 #if _DEBUG_SPANNING_TREE_
115   // print this node and children
116   for (int i=0; i < children.size()-1; i++) {
117     std::ostringstream oss;
118     for (Iterator j=children[i]; j != children[i+1]; j++) {
119       if (j == children[i]) oss << "[" << CkMyNode() << "] subtree " << *j << ": ";
120       else oss << *j << " ";
121     }
122     CkPrintf("%s\n", oss.str().c_str());
123   }
124 #endif
125   return (children.size() - 1);
126 }
127
128 template <typename Iterator>
129 void ST_RecursivePartition<Iterator>::initPhyNodes(Iterator start, Iterator end,
130                                                    std::vector<PhyNode> &phynodes) const
131 {
132 #if _DEBUG_SPANNING_TREE_
133   int rootPhyNodeId;
134   if (nodeTree) rootPhyNodeId = CmiPhysicalNodeID(CmiNodeFirst(*start));
135   else rootPhyNodeId = CmiPhysicalNodeID(*start);   // contains pes
136   CkPrintf("[%d] Root phynode is %d\n", CkMyNode(), rootPhyNodeId);
137 #endif
138
139   const int numNodes = std::distance(start, end);
140   phynodes.reserve(std::min(CmiNumPhysicalNodes(), numNodes));
141   intMap phyNodeMap;
142   int last = -1;
143   for (Iterator i=start; i != end; i++) {
144     int n = *i;
145     int pe = n;
146     if (nodeTree) pe = CmiNodeFirst(n);
147     int phyNodeId = CmiPhysicalNodeID(pe);
148 #if _DEBUG_SPANNING_TREE_
149     if (phyNodeId == rootPhyNodeId) CkPrintf("[%d] Node %d is in root phynode\n", CkMyNode(), n);
150 #endif
151     PhyNode *phyNode; // phynode of node n
152     if (preSorted) {
153       if (phyNodeId != last) {
154         phynodes.push_back(PhyNode(phyNodeId,pe,tmgr));
155         last = phyNodeId;
156       }
157       phyNode = &(phynodes.back());
158     } else {
159       intMap::iterator it = phyNodeMap.find(phyNodeId);
160       if (it == phyNodeMap.end()) {
161         phynodes.push_back(PhyNode(phyNodeId,pe,tmgr));
162         phyNodeMap[phyNodeId] = int(phynodes.size()-1);
163         phyNode = &(phynodes.back());
164       } else {
165         phyNode = &(phynodes[it->second]);
166       }
167     }
168     phyNode->addNode(n);
169   }
170
171 #if _DEBUG_SPANNING_TREE_
172   CkPrintf("%d physical nodes:\n", int(phynodes.size()));
173   for (int i=0; i < phynodes.size(); i++) phynodes[i].print();
174 #endif
175 #if XE6_TOPOLOGY
176   translateCoordinates(phynodes);
177 #endif
178 }
179
180 template <typename Iterator>
181 void ST_RecursivePartition<Iterator>::withinPhyNodeTree(PhyNode &rootPhyNode, int bfactor, Iterator &pos)
182 {
183   if (rootPhyNode.size() == 1) return; // only one element in physical node
184
185   std::vector<int> nodes; // nodes in this phynode (root is ignored)
186   std::map<int, std::vector<int>> nodePes; // PEs in each node (used when building PE tree)
187   if (nodeTree) nodes.assign(rootPhyNode.nodes.begin()+1, rootPhyNode.nodes.end());
188   else {
189     // group PEs into nodes
190     for (int i=1; i < rootPhyNode.size(); i++) {
191       int pe = rootPhyNode.getNode(i);
192       nodePes[CkNodeOf(pe)].push_back(pe);
193     }
194     std::map<int, std::vector<int>>::iterator it;
195     for (it = nodePes.begin(); it != nodePes.end(); it++) nodes.push_back(it->first);
196   }
197
198   const int numNodes = nodes.size();
199   if (!nodeTree && (numNodes == 1)) {
200     // make all PEs in node direct children
201     std::vector<int> &pes = nodePes.begin()->second;
202     for (int i=0; i < pes.size(); i++) {
203       children.push_back(pos);
204       *pos = pes[i]; pos++;
205     }
206   } else {
207     int numChildren = std::min(bfactor, numNodes);
208     int partSize = numNodes / numChildren, parts=0;
209     for (std::vector<int>::iterator i=nodes.begin(); parts < numChildren; i += partSize, parts++) {
210       children.push_back(pos);
211       std::vector<int>::iterator end;
212       if (parts == numChildren-1) end = nodes.end();
213       else end = i + partSize;
214       for (std::vector<int>::iterator j=i; j != end; j++) {
215         int n = *j;
216         if (!nodeTree) {
217           std::vector<int> &pes = nodePes[n];
218           for (int k=0; k < pes.size(); k++) { *pos = pes[k]; pos++; }
219         } else {
220           *pos = n; pos++;
221         }
222       }
223     }
224   }
225 }
226
227 template <typename Iterator>
228 void ST_RecursivePartition<Iterator>::build(std::vector<PhyNode*> &phyNodes,
229                                             Iterator start,
230                                             unsigned int maxBranches)
231 {
232   typename ST_RecursivePartition<Iterator>::PhyNode *rootPhyNode = phyNodes[0];
233   children.reserve(rootPhyNode->size() + maxBranches); // reserve for max number of children
234
235   Iterator pos = start+1;
236   withinPhyNodeTree(*rootPhyNode, maxBranches, pos);
237
238   // TODO another option, don't know if better, is if
239   // I'm the root node of a phynode (and phynodes.size() > 1), only have one other node
240   // in my phynode as direct child, and have that child direct-send to every other
241   // node in the phynode. This would be an easy change.
242
243   if (phyNodes.size() == 1) {
244     children.push_back(pos);
245     return;
246   }
247
248   // this will partition the nodes in phyNodes, by reorganizing the list.
249   // phyNodeChildren will point to where each partition starts
250   std::vector<int> phyNodeChildren;
251   phyNodeChildren.reserve(maxBranches+1);
252   partition(phyNodes, 1, phyNodes.size(), maxBranches, phyNodeChildren);
253   phyNodeChildren.push_back(phyNodes.size());
254   if (tmgr->haveTopologyInfo())
255     // choose root phynode in each subtree (closest one to top-level root phynode), put at beginning
256     chooseSubtreeRoots(phyNodes, phyNodeChildren);
257
258   // store result as subtrees of nodes
259   for (int i=0; i < phyNodeChildren.size() - 1; i++) {
260     children.push_back(pos);
261     for (int j=phyNodeChildren[i]; j < phyNodeChildren[i+1]; j++) {  // for each phynode in subtree
262       for (int k=0; k < phyNodes[j]->size(); k++) {    // for each node in phynode
263         *pos = phyNodes[j]->getNode(k);
264         pos++;
265       }
266     }
267   }
268   children.push_back(pos);
269 }
270
271 /**
272  * phyNodes is list of phyNodes, grouped by subtrees (rootPhyNode in position 0)
273  * phyNodeChildren contains the indices (in phyNodes) of first node of each subtree
274  */
275 template <typename Iterator>
276 void ST_RecursivePartition<Iterator>::chooseSubtreeRoots(std::vector<PhyNode*> &phyNodes,
277                                                          std::vector<int> &children) const
278 {
279   for (int i=0; i < children.size() - 1; i++) { // for each subtree
280     int start = children[i];  // subtree start
281     int minDistance = INT_MAX;
282     int closestIdx = -1;
283     for (int j=start; j < children[i+1]; j++) { // for each phynode in subtree
284       int d = phyNodes[0]->distance(*phyNodes[j], tmgr);
285       if (d < minDistance) {
286         minDistance = d;
287         closestIdx = j;
288       }
289     }
290 #if _DEBUG_SPANNING_TREE_
291     if (CkMyNode() == 0) CkPrintf("Subtree %d, closest phynode to root is %d, distance=%d\n", i, phyNodes[closestIdx]->id, minDistance);
292 #endif
293     // make closest one the root
294     std::swap(phyNodes[start], phyNodes[closestIdx]);
295   }
296 }
297
298 /// recursive partitioning of phynodes into numPartitions
299 template <typename Iterator>
300 void ST_RecursivePartition<Iterator>::partition(std::vector<PhyNode*> &nodes,
301                                       int start, int end, int numPartitions,
302                                       std::vector<int> &children) const
303 {
304 #if _DEBUG_SPANNING_TREE_
305     CkPrintf("Partitioning into at most %d parts, phynodes [", numPartitions);
306     for (int i=start; i < end; i++) CkPrintf("%d ", nodes[i]->id);
307     CkPrintf("]\n");
308 #endif
309   int numNodes = end - start;
310   if ((numPartitions > 1) && (numNodes > 1)) {
311     // further partitioning is needed and there are nodes left to partition
312     if (numPartitions % 3 == 0) trisect(nodes, start, end, numPartitions, children);
313     else bisect(nodes, start, end, numPartitions, children);
314   } else if ((numPartitions >= 1) && (numNodes >= 1)) {
315     // just register the remaining node(s) as a sub-tree
316     children.push_back(start);
317   } else if (numNodes == 0) {
318     // there are no nodes left, do nothing
319   } else if ((numNodes >= 0) && (numPartitions == 0)) {
320     // if there are nodes remaining but no partitions to put them in
321     CkAbort("\nThere are nodes left but no remaining partitions to put them in.");
322   } else {
323     // fall through case. Should never get here unless something is wrong
324     CkAbort("\nPartitioning fell through to the default case (which it never should). Check the logic in this routine.");
325   }
326 }
327
328 template <typename Iterator>
329 void ST_RecursivePartition<Iterator>::bisect(std::vector<PhyNode*> &nodes,
330                                    int start, int end, int numPartitions,
331                                    std::vector<int> &children) const
332 {
333   const int numNodes = end - start;
334   int median = start + (numNodes / 2);
335   if (tmgr->haveTopologyInfo()) {
336     // Find the dimension along which to bisect the bounding box
337     int maxSpreadDim = maxSpreadDimension(nodes,start,end);
338     // Bisect the vertex list at the median element
339     typename std::vector<PhyNode*>::iterator itr = nodes.begin();
340     std::nth_element(itr+start, itr+median, itr+end, typename ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
341 #if _DEBUG_SPANNING_TREE_
342     CkPrintf("Bisecting, maxSpreadDim=%d\n", maxSpreadDim);
343 #endif
344   }
345   // Partition the two pieces further
346   int numLeft = numPartitions/2;
347   partition(nodes, start, median, numLeft, children);
348   partition(nodes, median, end, numPartitions - numLeft, children);
349 }
350
351 template <typename Iterator>
352 void ST_RecursivePartition<Iterator>::trisect(std::vector<PhyNode*> &nodes,
353                                    int start, int end, int numPartitions,
354                                    std::vector<int> &children) const
355 {
356   const int numNodes = end - start;
357   /// Pin the location of the 1/3 and 2/3 elements
358   int oneThird = start + (numNodes / 3);
359   int twoThird = oneThird + (numNodes / 3);
360   if (tmgr->haveTopologyInfo()) {
361     int maxSpreadDim = maxSpreadDimension(nodes,start,end);
362     typename std::vector<PhyNode*>::iterator itr = nodes.begin();
363     std::nth_element(itr+start,    itr+oneThird, itr+end, typename ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
364     std::nth_element(itr+oneThird, itr+twoThird, itr+end, typename ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
365 #if _DEBUG_SPANNING_TREE_
366     CkPrintf("Trisecting, maxSpreadDim=%d\n", maxSpreadDim);
367 #endif
368   }
369   /// Partition the three pieces further
370   int numLeft = numPartitions/3;
371   partition(nodes, start,    oneThird, numLeft, children);
372   partition(nodes, oneThird, twoThird, numLeft, children);
373   partition(nodes, twoThird, end,      numLeft, children);
374 }
375
376 template <typename Iterator>
377 int ST_RecursivePartition<Iterator>::maxSpreadDimension(std::vector<PhyNode*> &nodes,
378                                                         int start, int end) const
379 {
380   const int nDims = tmgr->getNumDims();
381   if (!tmgr->haveTopologyInfo() || (nDims <= 1)) return 0;
382
383   std::vector<std::bitset<DIM_SET_SIZE> > used(nDims);
384   for (int i=start; i < end; i++) {
385     PhyNode *n = nodes[i];
386     for (int j=0; j < nDims; j++) used[j].set(n->coords[j]);
387   }
388   int max_spread = -1;
389   int max_spread_dim = -1;
390   for (int i=0; i < nDims; i++) {
391     int c(used[i].count());
392     if (c > max_spread) {
393       max_spread = c;
394       max_spread_dim = i;
395     }
396   }
397   return max_spread_dim;
398 }
399
400 #if XE6_TOPOLOGY
401
402 inline static int modulo(int k, int n) { return ((k %= n) < 0) ? k+n : k; }
403
404 /**
405  * Translate coordinates of phynodes such that the max spread in each dimension
406  * is equal (or closer) to the number of coordinates actually used in that dimension.
407  * In other words, "moves" all phynodes by some translation offset so that their bounding box
408  * includes minimum number of coordinates, which implies that adjacent nodes won't go through torus edges.
409  * Works for any number of dimensions (N-d torus)
410  */
411 template <typename Iterator>
412 void ST_RecursivePartition<Iterator>::translateCoordinates(std::vector<PhyNode> &nodes) const
413 {
414   const int nDims = tmgr->getNumDims();
415   std::vector<std::bitset<DIM_SET_SIZE> > usedCoordinates(nDims);
416   std::vector<int> max_coord(nDims, -1);
417   std::vector<int> min_coord(nDims, INT_MAX);
418   std::vector<int> maxSpread(nDims);
419   std::vector<int> gapCenter(nDims, -1);
420   std::vector<int> dimSizes(nDims);
421   for (int i=0; i < nDims; i++) dimSizes[i] = tmgr->getDimSize(i);
422
423   for (int i=0; i < nodes.size(); i++) {
424     PhyNode &n = nodes[i];
425     for (int j=0; j < nDims; j++) {
426       int c = n.coords[j];
427       usedCoordinates[j].set(c);
428       if (c > max_coord[j]) max_coord[j] = c;
429       if (c < min_coord[j]) min_coord[j] = c;
430     }
431   }
432   for (int i=0; i < nDims; i++) {
433     maxSpread[i] = max_coord[i] - min_coord[i] + 1; // store max spread of each dimension
434     int sum = 0, nbUnusedCoords = 0;
435     for (int j=0; j < dimSizes[i]; j++) {
436       if (!usedCoordinates[i].test(j)) {  // j coordinate not used by any element
437         sum += j;
438         nbUnusedCoords++;
439       }
440     }
441     if (nbUnusedCoords > 0) gapCenter[i] = sum / nbUnusedCoords;
442   }
443
444 #if _DEBUG_SPANNING_TREE_
445   if (CkMyNode() == 0) {
446     CkPrintf("Used coordinates in each dimension:\n");
447     for (int i=0; i < nDims; i++) {
448       CkPrintf("%d: ", i);
449       for (int j=0; j < dimSizes[i]; j++) if (usedCoordinates[i].test(j)) CkPrintf("%d ", j);
450       CkPrintf(", %d\n", int(usedCoordinates[i].count()));
451     }
452     CkPrintf("Max,min coord in each dimension:\n");
453     for (int i=0; i < nDims; i++) CkPrintf("%d: %d %d\n", i, max_coord[i], min_coord[i]);
454     CkPrintf("Gap center for each dimension:\n");
455     for (int i=0; i < nDims; i++) CkPrintf("%d: %d\n", i, gapCenter[i]);
456     CkPrintf("Max spread for each dimension:\n");
457     for (int i=0; i < nDims; i++) CkPrintf("%d: %d\n", i, maxSpread[i]);
458   }
459 #endif
460
461   for (int i=0; i < nDims; i++) { // find best translation offset for each dimension
462     if (maxSpread[i] == int(usedCoordinates[i].count())) continue; // nothing to correct for this dimension
463
464 #if _DEBUG_SPANNING_TREE_
465     if (CkMyNode() == 0) CkPrintf("Going to attempt to correct coordinates on dimension %d\n", i);
466 #endif
467
468     // choose direction of unused coordinates to finish faster
469     int direction = 1;  // go "right"
470     if (gapCenter[i] < dimSizes[i]/2) direction = -1;   // go "left"
471 #if _DEBUG_SPANNING_TREE_
472     if (CkMyNode() == 0) CkPrintf("Chose direction %d\n", direction);
473 #endif
474
475     // we're going to attempt to minimize the max spread in dimension i
476     int bestMaxSpread = maxSpread[i];
477     int bestOffset=0;
478     for (int m=0; ; m++) {
479       // apply offset of 'm' in 'direction' to all nodes
480       int max_coord = -1;
481       int min_coord = INT_MAX;
482       for (int j=0; j < nodes.size(); j++) {
483         int &x = nodes[j].coords[i];
484         x += direction;
485         if (x >= dimSizes[i]) x = 0;
486         else if (x < 0) x = dimSizes[i] - 1;
487         if (x > max_coord) max_coord = x;
488         if (x < min_coord) min_coord = x;
489       }
490       // evaluate max spread with new offset
491       int maxSpread_new = max_coord - min_coord + 1;
492 #if _DEBUG_SPANNING_TREE_
493       if (CkMyNode() == 0) CkPrintf("%d %d\n", m, maxSpread_new);
494 #endif
495       if (maxSpread_new == int(usedCoordinates[i].count())) {
496 #if _DEBUG_SPANNING_TREE_
497         if (CkMyNode() == 0) CkPrintf("FIXED after %d movements\n", m+1);
498 #endif
499         break;
500       } else if (maxSpread_new < bestMaxSpread) {
501         bestMaxSpread = maxSpread_new;
502         bestOffset = m;
503       }
504       if (m == dimSizes[i] - 2) {
505         // did max number of possible movements (another movement would return us to original
506         // coordinates/offset), exit loop
507         if (maxSpread_new > bestMaxSpread) {
508           for (int j=0; j < nodes.size(); j++) {
509             // roll back to bestOffset
510             int &x = nodes[j].coords[i];
511             x += ((m - bestOffset)*-direction);
512             x = modulo(x, dimSizes[i]);
513           }
514         }
515 #if _DEBUG_SPANNING_TREE_
516         if ((CkMyNode() == 0) && (bestMaxSpread < maxSpread[i])) CkPrintf("Improved to %d max spread\n", bestMaxSpread);
517 #endif
518         break;   // we're done correcting in this dimension
519       }
520     }
521   }
522 }
523 #endif
524
525 template class ST_RecursivePartition<int*>;
526 template class ST_RecursivePartition<std::vector<int>::iterator>;
527
528 // ------------------------------------------------------------------
529
530 template <typename Iterator>
531 void getNeighborsTopoTree_R(Iterator start, Iterator end, int myElem, int prevLvlParent,
532                             bool nodeTree, unsigned int bfactor, CmiSpanningTreeInfo &t)
533 {
534   ST_RecursivePartition<Iterator> tb(nodeTree, prevLvlParent != -1);
535   int numSubtrees = tb.buildSpanningTree(start, end, std::min(bfactor, (unsigned int)std::distance(start,end)-1));
536   if (myElem == *start) {
537     // I am the root of this subtree so we're done (collect my children and return)
538     t.parent = prevLvlParent;
539     if (numSubtrees > 0) t.children = (int*)malloc(sizeof(int)*numSubtrees);
540     t.child_count = numSubtrees;
541     for (int i=0; i < numSubtrees; i++) t.children[i] = *tb.begin(i);
542     return;
543   }
544   // find in which subtree myElem is in and recursively continue on only that subtree
545   bool elemFound = false;
546   for (int i=0; i < numSubtrees; i++) {
547     Iterator subtreeStart = tb.begin(i), subtreeEnd = tb.end(i);
548     Iterator f = std::find(subtreeStart, subtreeEnd, myElem);
549     if (f != subtreeEnd) {
550       getNeighborsTopoTree_R(subtreeStart, subtreeEnd, myElem, *start, nodeTree, bfactor, t);
551       elemFound = true;
552       break;
553     }
554   }
555   if (!elemFound) CkAbort("Can't build tree. The element is not in the list of tree nodes");
556 }
557
558 void getNodeTopoTreeEdges(int node, int rootNode, int *nodes, int numnodes, unsigned int bfactor,
559                           int *parent, int *child_count, int **children)
560 {
561   CkAssert((node >= 0 && node < CkNumNodes()) && (rootNode >= 0 && rootNode < CkNumNodes()) && (bfactor > 0));
562
563   std::vector<int> node_v;
564   if (nodes == NULL) {
565     numnodes = CkNumNodes();
566     node_v.resize(numnodes);
567     node_v[0] = rootNode;
568     for (int i=0, j=1; i < numnodes; i++) {
569       if (i != rootNode) node_v[j++] = i;
570     }
571     nodes = node_v.data();
572   } else {
573     CkAssert(numnodes > 0);
574     if (rootNode != nodes[0]) CkAbort("getNodeTopoTreeEdges: root must be in first position of nodes");
575   }
576
577   CmiSpanningTreeInfo t;
578   getNeighborsTopoTree_R(nodes, nodes + numnodes, node, -1, true, bfactor, t);
579   *parent      = t.parent;
580   *child_count = t.child_count;
581   *children    = t.children;
582 }
583
584 void getPETopoTreeEdges(int pe, int rootPE, int *pes, int numpes, unsigned int bfactor,
585                         int *parent, int *child_count, int **children)
586 {
587   CkAssert((pe >= 0 && pe < CkNumPes()) && (rootPE >= 0 && rootPE < CkNumPes()) && (bfactor > 0));
588
589   std::vector<int> pe_v;
590   if (pes == NULL) {
591     numpes = CkNumPes();
592     pe_v.resize(numpes);
593     pe_v[0] = rootPE;
594     for (int i=0, j=1; i < numpes; i++) {
595       if (i != rootPE) pe_v[j++] = i;
596     }
597     pes = pe_v.data();
598   } else {
599     CkAssert(numpes > 0);
600     if (rootPE != pes[0]) CkAbort("getPETopoTreeEdges: root must be in first position of pes");
601   }
602
603   CmiSpanningTreeInfo t;
604   getNeighborsTopoTree_R(pes, pes + numpes, pe, -1, false, bfactor, t);
605   *parent      = t.parent;
606   *child_count = t.child_count;
607   *children    = t.children;
608 }
609
610 typedef std::unordered_map<int,CmiSpanningTreeInfo*> TreeInfoMap;
611
612 static TreeInfoMap trees;
613 CmiNodeLock _treeLock;
614
615 CmiSpanningTreeInfo *ST_RecursivePartition_getTreeInfo(int root) {
616   if (trees.size() == 0) {
617     _treeLock = CmiCreateLock();
618 #if CMK_ERROR_CHECKING
619     if (CkMyRank() != 0) CkAbort("First call to getTreeInfo has to be by rank 0");
620 #endif
621   }
622   CmiLock(_treeLock);
623   TreeInfoMap::iterator it = trees.find(root);
624   if (it != trees.end()) {
625     CmiSpanningTreeInfo *t = it->second;
626     CmiUnlock(_treeLock);
627     return t;
628   } else {
629     CmiSpanningTreeInfo *t = new CmiSpanningTreeInfo;
630     t->children = NULL;
631     trees[root] = t;
632     getNodeTopoTreeEdges(CkMyNode(), root, NULL, -1, 4, &t->parent, &t->child_count, &t->children);
633     CmiUnlock(_treeLock);
634     return t;
635   }
636 }
637
638 void get_topo_tree_nbs(int root, int *parent, int *child_count, int **children) {
639   CmiSpanningTreeInfo *t = ST_RecursivePartition_getTreeInfo(root);
640   *parent = t->parent;
641   *child_count = t->child_count;
642   *children = t->children;
643 }