add using namespace topo fix the gnu 3.2 on sun machine
[charm.git] / src / ck-com / OneTimeMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "OneTimeMulticastStrategy.h"
10 #include "TopoManager.h"
11 #include <string>
12 #include <set>
13 #include <vector>
14 #include <list>
15 #include <map>
16
17 //#define DEBUG 1
18
19 using std::list;
20 using std::set;
21 using std::vector;
22 using std::map;
23
24 /// @note: There is some bug that is preventing us from using CmiSyncListSend. 
25 #define SYNCLISTSENDANDFREE 1
26
27
28 CkpvExtern(CkGroupID, cmgrID);
29
30 OneTimeMulticastStrategy::OneTimeMulticastStrategy()
31   : Strategy(), CharmStrategy() {
32   //  ComlibPrintf("OneTimeMulticastStrategy constructor\n");
33   setType(ARRAY_STRATEGY);
34 }
35
36 OneTimeMulticastStrategy::~OneTimeMulticastStrategy() {
37 }
38
39 void OneTimeMulticastStrategy::pup(PUP::er &p){
40   Strategy::pup(p);
41   CharmStrategy::pup(p);
42 }
43
44
45 /** Called when the user invokes the entry method on the delegated proxy. */
46 void OneTimeMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
47 #if DEBUG
48   CkPrintf("[%d] OneTimeMulticastStrategy::insertMessage\n", CkMyPe());
49   fflush(stdout);
50 #endif 
51
52   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
53     CkAbort("OneTimeMulticastStrategy can only be used with an array section proxy");
54   }
55     
56
57   envelope *env = UsrToEnv(cmsg->getCharmMessage());
58   int npes = 1;
59   int pes[1] = {0};
60   _TRACE_CREATION_MULTICAST(env, npes, pes);
61
62 #if DEBUG
63   CkPrintf("[%d] after TRACE_CREATION_MULTICAST menv->event=%d\n", CkMyPe(), (int)env->getEvent());  
64 #endif
65   
66   // Create a multicast message containing all information about remote destination objects 
67   ComlibMulticastMsg * multMsg = sinfo.getNewMulticastMessage(cmsg, 0, getInstance());
68
69
70 #if DEBUG
71     CkPrintf("[%d] after TRACE_CREATION_MULTICAST multMsg->event=%d\n", CkMyPe(), (int)( UsrToEnv(multMsg)->getEvent() ) );  
72 #endif
73
74   // The remote multicast method will send the message to the remote PEs, as specified in multMsg
75   remoteMulticast(multMsg, true);
76
77   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
78   localMulticast(cmsg);
79
80   delete cmsg;    
81 }
82
83
84
85 /** Deliver the message to the local elements. */
86 void OneTimeMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
87   double start = CmiWallTimer();
88   CkSectionID *sec_id = cmsg->sec_id;
89   CkVec< CkArrayIndexMax > localIndices;
90   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
91   deliverToIndices(cmsg->getCharmMessage(), localIndices );
92   traceUserBracketEvent(10000, start, CmiWallTimer());
93 }
94
95
96
97
98
99 /** 
100     Forward multicast message to our successor processors in the spanning tree. 
101     Uses CmiSyncListSend for delivery to this strategy's OneTimeMulticastStrategy::handleMessage method.
102 */
103 void OneTimeMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE) {
104   double start = CmiWallTimer();
105
106   envelope *env = UsrToEnv(multMsg);
107     
108   
109   /// The index into the PE list in the message
110   int myIndex = -10000; 
111   const int totalDestPEs = multMsg->nPes;
112   const int myPe = CkMyPe();
113   
114   // Find my index in the list of all destination PEs
115   if(rootPE){
116     CkAssert(CkMyPe() == multMsg->_cookie.pe);
117     myIndex = -1;
118   } else {
119     for (int i=0; i<totalDestPEs; ++i) {
120       if(multMsg->indicesCount[i].pe == myPe){
121         myIndex = i;
122         break;
123       }
124     }
125   }
126   
127   if(myIndex == -10000)
128     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
129   
130   int npes;
131   int *pelist = NULL;
132
133   if(totalDestPEs > 0) {
134     //CkPrintf("totalDestPEs = %d\n", totalDestPEs);
135     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, multMsg->_cookie.pe, pelist, npes );
136   } else {
137     npes = 0;
138   }
139
140   if(npes == 0) {
141 #if DEBUG
142     CkPrintf("[%d] OneTimeMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
143 #endif
144     traceUserBracketEvent(10001, start, CmiWallTimer());
145     CmiFree(env);
146     return;
147   }
148   
149   //Collect Multicast Statistics
150   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
151   
152
153   CmiSetHandler(env, CkpvAccess(comlib_handler));
154   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
155   CkPackMessage(&env);
156
157   double middle = CmiWallTimer();
158
159   
160   // CkPrintf("[%d] before CmiSyncListSendAndFree env->event=%d\n", CkMyPe(), (int)env->getEvent());
161
162 #if SYNCLISTSENDANDFREE
163   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
164 #else
165
166   CkAssert(npes > 0);
167   CmiSyncListSend(npes, pelist, env->getTotalsize(), (char*)env);
168   
169   delete[] pelist;
170 #endif
171
172   double end = CmiWallTimer();
173   traceUserBracketEvent(10001, start, middle);
174   traceUserBracketEvent(10002, middle, end);
175   
176 }
177
178
179
180 /** 
181     Receive an incoming multicast message(sent from OneTimeMulticastStrategy::remoteMulticast).
182     Deliver the message to all local elements 
183 */
184 void OneTimeMulticastStrategy::handleMessage(void *msg){
185 #if DEBUG
186   //  CkPrintf("[%d] OneTimeMulticastStrategy::handleMessage\n", CkMyPe());
187 #endif
188   envelope *env = (envelope *)msg;
189   CkUnpackMessage(&env);
190   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
191   
192   // Don't use msg after this point. Instead use the unpacked env
193   
194   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe()); // DOESN'T DO ANYTHING IN NEW COMLIB
195   
196   // Deliver to objects marked as local in the message
197   int localElems;
198   envelope *newenv;
199   CkArrayIndexMax *local_idx_list;  
200   sinfo.unpack(env, localElems, local_idx_list, newenv);
201   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
202
203   //  CkPrintf("[%d] in OneTimeMulticastStrategy::handleMessage before  deliverToIndices newenv->event=%d\n", CkMyPe(), (int)newenv->getEvent());
204
205
206 #if SYNCLISTSENDANDFREE
207
208   // Deliver locally
209   deliverToIndices(newmsg, localElems, local_idx_list );
210   
211   // Forward on to other processors if necessary
212   remoteMulticast(multMsg, false);
213  
214 #else
215
216   // Forward on to other processors if necessary
217   remoteMulticast(multMsg, false);
218
219   // Deliver locally
220   deliverToIndices(newmsg, localElems, local_idx_list );
221   
222   // Finally delete the reference counted message because remoteMulticast does not do this.
223   CmiFree(multMsg);
224
225 #endif
226   
227 }
228
229
230
231
232 void OneTimeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes) {
233   if(myIndex==-1){
234     // We are at a root node of the spanning tree. 
235     // We will forward the message to all other PEs in the destination list.
236     npes = totalDestPEs;
237     
238     pelist = new int[npes];
239     for (int i=0; i<npes; ++i) {
240       pelist[i] = destPEs[i].pe;
241     }
242   } else {
243     // We are at a leaf node of the spanning tree. 
244     npes = 0;
245   }
246   
247 }
248
249
250 void OneTimeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes) {
251   const int myPe = CkMyPe();
252
253   if(myIndex == totalDestPEs-1){
254     // Final PE won't send to anyone
255     npes = 0;
256     return;
257   } else {
258     // All non-final PEs will send to next PE in list
259     npes = 1;
260     pelist = new int[1];
261     pelist[0] = destPEs[myIndex+1].pe;
262   }
263
264 }
265
266
267 void OneTimeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes){
268   const int myPe = CkMyPe();
269   
270   // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
271   
272   int sendLogicalIndexStart = degree*(myIndex+1) + 1;       // inclusive
273   int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
274   
275   if(sendLogicalIndexEnd-1 >= totalDestPEs){
276     sendLogicalIndexEnd = totalDestPEs;
277   }
278
279   int numSend = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
280   if(numSend <= 0){
281     npes = 0;
282     return;
283   }
284  
285 #if DEBUG
286   if(numSend > 0)
287     CkPrintf("Tree logical index %d sending to logical %d to %d (totalDestPEs excluding root=%d)  numSend=%d\n",
288              myIndex+1, sendLogicalIndexStart, sendLogicalIndexEnd, totalDestPEs, numSend);
289 #endif
290
291   npes = numSend;
292   pelist = new int[npes];
293   
294   for(int i=0;i<numSend;i++){
295     CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
296     pelist[i] = destPEs[sendLogicalIndexStart-1+i].pe;
297 #if DEBUG
298     CkPrintf("Tree logical index %d sending to PE %d\n", myIndex+1, pelist[i]);
299 #endif
300     CkAssert(pelist[i] < CkNumPes());
301   }
302   
303 }
304
305
306 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
307 int getFirstPeOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
308   int num;
309   int *nodePeList;
310   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
311   
312   for(int i=0;i<num;i++){
313     // Scan destPEs for the pe
314     int p = nodePeList[i];
315     
316     for(int j=0;j<totalDestPEs;j++){
317       if(p == destPEs[j].pe){
318         // found the representative PE for the node that is in the destPEs list
319         return p;
320       }
321     }
322   }
323   
324   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
325   return -1;
326 }
327
328
329 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
330 int getNthPeOnPhysicalNodeFromList(int n, int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
331   int num;
332   int *nodePeList;
333   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
334   
335   int count = 0;
336   int lastFound = -1;
337   
338   // Foreach PE on this physical node
339   for(int i=0;i<num;i++){
340     int p = nodePeList[i];
341     
342     // Scan destPEs for the pe
343     for(int j=0;j<totalDestPEs;j++){
344       if(p == destPEs[j].pe){
345         lastFound = p;
346         if(count==n)
347           return p;
348         count++;
349       }
350     }
351   }
352   
353   if(lastFound != -1)
354     return lastFound;
355
356   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
357   return -1;
358 }
359
360
361 /** List all the PEs from the list that share the physical node */ 
362 vector<int> getPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){ 
363    
364   vector<int> result; 
365  
366   int num; 
367   int *nodePeList; 
368   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num); 
369   
370   for(int i=0;i<num;i++){ 
371     // Scan destPEs for the pe 
372     int p = nodePeList[i]; 
373     for(int j=0;j<totalDestPEs;j++){ 
374       if(p == destPEs[j].pe){ 
375         // found the representative PE for the node that is in the
376         // destPEs list 
377         result.push_back(p); 
378         break; 
379       } 
380     } 
381   } 
382   
383   return result; 
384 }
385
386
387
388 /** List all the other PEs from the list that share the physical node */
389 vector<int> getOtherPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
390   
391   vector<int> result;
392
393   int num;
394   int *nodePeList;
395   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
396   
397   for(int i=0;i<num;i++){
398     // Scan destPEs for the pe
399     int p = nodePeList[i];
400     if(p != pe){
401       for(int j=0;j<totalDestPEs;j++){
402         if(p == destPEs[j].pe){
403           // found the representative PE for the node that is in the destPEs list
404           result.push_back(p);
405           break;
406         }
407       }
408     }
409   }
410   
411   return result;
412 }
413
414
415 void OneTimeNodeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes){
416   const int myPe = CkMyPe();
417
418   set<int> nodePERepresentatives;
419   
420   // create a list of PEs, with one for each node to which the message must be sent
421   for(int i=0; i<totalDestPEs; i++){
422     int pe = destPEs[i].pe;
423     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
424     nodePERepresentatives.insert(representative);    
425   }
426   
427   // Create an ordered list of PEs to send to, based upon the rootPE
428   // This should distribute load more evenly than the sorted list previously used
429   set<int>::iterator splitter = nodePERepresentatives.upper_bound(rootPE);
430   vector<int> nodePERepresentativesOrdered;
431   for(set<int>::iterator iter = splitter; iter!=nodePERepresentatives.end(); ++iter){
432     nodePERepresentativesOrdered.push_back(*iter);
433   }
434   for(set<int>::iterator iter = nodePERepresentatives.begin(); iter!=splitter; ++iter){
435     nodePERepresentativesOrdered.push_back(*iter);
436   }
437
438   CkAssert(nodePERepresentativesOrdered.size() == nodePERepresentatives.size());
439     
440   int numRepresentativePEs = nodePERepresentatives.size();
441   
442   int repForMyPe=-1;
443   if(myIndex != -1)
444     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
445   
446 #if DEBUG
447   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
448   fflush(stdout);
449 #endif
450   
451   // If this PE is part of the multicast tree, then it should forward the message along
452   if(CkMyPe() == repForMyPe || myIndex == -1){
453     // I am an internal node in the multicast tree
454     
455     // flatten the nodePERepresentativesOrdered data structure
456     int *repPeList = new int[numRepresentativePEs];
457     int myRepIndex = -1;
458     int p=0;
459     for(vector<int>::iterator iter=nodePERepresentativesOrdered.begin(); iter != nodePERepresentativesOrdered.end(); iter++){
460       repPeList[p] = *iter;
461       if(*iter == repForMyPe)
462         myRepIndex = p;
463       p++;
464     }
465     CkAssert(myRepIndex >=0 || myIndex==-1);
466       
467     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
468     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
469     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
470     
471     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
472       sendLogicalIndexEnd = numRepresentativePEs;
473     }
474     
475     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
476     if(numSendTree < 0)
477       numSendTree = 0;
478     
479     vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
480     int numSendLocal;
481     if(myIndex == -1)
482       numSendLocal = 0;
483     else 
484       numSendLocal = otherLocalPes.size();
485     
486     
487
488 #if DEBUG
489     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
490     fflush(stdout);
491 #endif
492
493     int numSend = numSendTree + numSendLocal;
494     if(numSend <= 0){
495       npes = 0;
496       return;
497     }
498     
499     npes = numSend;
500     pelist = new int[npes];
501   
502     for(int i=0;i<numSendTree;i++){
503       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
504       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
505       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
506     }
507     
508     delete[] repPeList;
509     repPeList = NULL;
510
511     for(int i=0;i<numSendLocal;i++){
512       pelist[i+numSendTree] = otherLocalPes[i];
513       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
514     }
515     
516     
517 #if DEBUG
518     char buf[1024];
519     sprintf(buf, "PE %d is sending to Remote Node PEs: ", CkMyPe() );
520     for(int i=0;i<numSend;i++){
521       if(i==numSendTree)
522         sprintf(buf+strlen(buf), " and Local To Node PEs: ", pelist[i]);
523
524       sprintf(buf+strlen(buf), "%d ", pelist[i]);
525     }    
526     CkPrintf("%s\n", buf);
527     fflush(stdout);
528 #endif
529         
530   } else {
531     // We are a leaf PE
532     npes = 0;
533     return;
534   }
535
536   
537   
538 }
539
540
541 void OneTimeNodeTreeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes){
542   const int myPe = CkMyPe();
543
544   set<int> nodePERepresentatives;
545   
546   // create a list of PEs, with one for each node to which the message must be sent
547   for(int i=0; i<totalDestPEs; i++){
548     int pe = destPEs[i].pe;
549     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
550     nodePERepresentatives.insert(representative);    
551   }
552
553    // Create an ordered list of PEs to send to, based upon the rootPE
554   // This should distribute load more evenly than the sorted list previously used
555   set<int>::iterator splitter = nodePERepresentatives.upper_bound(rootPE);
556   vector<int> nodePERepresentativesOrdered;
557   for(set<int>::iterator iter = splitter; iter!=nodePERepresentatives.end(); ++iter){
558     nodePERepresentativesOrdered.push_back(*iter);
559   }
560   for(set<int>::iterator iter = nodePERepresentatives.begin(); iter!=splitter; ++iter){
561     nodePERepresentativesOrdered.push_back(*iter);
562   }
563
564   CkAssert(nodePERepresentativesOrdered.size() == nodePERepresentatives.size());
565   int numRepresentativePEs = nodePERepresentatives.size();
566   
567   int repForMyPe=-1;
568   if(myIndex != -1)
569     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
570   
571 #if DEBUG
572   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
573   fflush(stdout);
574 #endif
575   
576   // If this PE is part of the multicast tree, then it should forward the message along
577   if(CkMyPe() == repForMyPe || myIndex == -1){
578     // I am an internal node in the multicast tree
579     
580     // flatten the data structure for nodePERepresentatives
581     int *repPeList = new int[numRepresentativePEs];
582     int myRepIndex = -1;
583     int p=0;
584     for(vector<int>::iterator iter=nodePERepresentativesOrdered.begin(); iter != nodePERepresentativesOrdered.end(); iter++){
585       repPeList[p] = *iter;
586       if(*iter == repForMyPe)
587         myRepIndex = p;
588       p++;
589     }
590     CkAssert(myRepIndex >=0 || myIndex==-1);
591       
592     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
593     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
594     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
595     
596     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
597       sendLogicalIndexEnd = numRepresentativePEs;
598     }
599     
600     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
601     if(numSendTree < 0)
602       numSendTree = 0;
603
604
605     // Send in a ring to the PEs on this node
606     vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
607     int numSendLocal = 0;
608     if(myIndex == -1)
609       numSendLocal = 0;
610     else {
611       if(otherLocalPes.size() > 0)
612         numSendLocal = 1;
613       else
614         numSendLocal = 0;
615     }
616     
617
618 #if DEBUG
619     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
620     fflush(stdout);
621 #endif
622
623     int numSend = numSendTree + numSendLocal;
624     if(numSend <= 0){
625       npes = 0;
626       return;
627     }
628     
629     npes = numSend;
630     pelist = new int[npes];
631   
632     for(int i=0;i<numSendTree;i++){
633       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
634       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
635       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
636     }
637     
638     delete[] repPeList;
639     repPeList = NULL;
640
641     for(int i=0;i<numSendLocal;i++){
642       pelist[i+numSendTree] = otherLocalPes[i];
643       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
644     }
645     
646     
647 #if DEBUG
648     char buf[1024];
649     sprintf(buf, "PE %d is sending to Remote Node PEs: ", CkMyPe() );
650     for(int i=0;i<numSend;i++){
651       if(i==numSendTree)
652         sprintf(buf+strlen(buf), " and Local To Node PEs: ", pelist[i]);
653
654       sprintf(buf+strlen(buf), "%d ", pelist[i]);
655     }    
656     CkPrintf("%s\n", buf);
657     fflush(stdout);
658 #endif
659         
660   } else {
661     // We are a leaf PE, so forward in a ring to the PEs on this node
662     const vector<int> otherLocalPes = getPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
663     
664     npes = 0;
665     pelist = new int[1];
666     
667     //    CkPrintf("[%d] otherLocalPes.size=%d\n", CkMyPe(), otherLocalPes.size() ); 
668     const int numOthers = otherLocalPes.size() ;
669     
670     for(int i=0;i<numOthers;i++){
671       if(otherLocalPes[i] == CkMyPe()){
672         // found me in the PE list for this node
673         if(i+1<otherLocalPes.size()){
674           // If we have a successor in the ring
675           pelist[0] = otherLocalPes[i+1];
676           npes = 1;
677         }
678       }
679     }
680     
681     
682 #if DEBUG
683     if(npes==0)
684       CkPrintf("[%d] At end of ring\n", CkMyPe() );
685     else
686       CkPrintf("[%d] sending along ring to %d\n", CkMyPe(), pelist[0] );
687     
688     fflush(stdout);
689 #endif
690     
691     
692   }
693   
694   
695   
696 }
697
698 // If min == 1 then this function finds the min else the max value in the array
699 // This function returns the index of the array that it found to be the max or the min
700 int OneTimeDimensionOrderedMulticastStrategy::findMinMaxArray(int min, int len, int *array, bool* notincluded, int value) {
701   int k = value;
702   int a = -1;
703   for (int j = 0; j < len; j++) {
704     if (notincluded[j]) continue;
705     if (min && array[j] < k) {
706       k = array[j];
707       a = j;
708     } else if (!min && array[j] > k) {
709       k = array[j];
710       a = j;
711     }
712   }
713   return a;
714 }
715
716 void OneTimeDimensionOrderedMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPe, int * &pelist, int &npes) {
717   const int myPe = CkMyPe();
718
719   set<int> nodePEReps;
720   
721   // create a list of PEs, with one for each node to which the message must be sent
722   for(int i=0; i<totalDestPEs; i++){
723     int pe = destPEs[i].pe;
724     CkAssert(pe != rootPe);
725     if (myPe == 0)
726       CkPrintf("destPE = %d\n", pe);
727     int rep = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
728     nodePEReps.insert(rep);
729   }
730   
731   int numRepPEs = nodePEReps.size();
732   
733   int repForMyPe = -1;
734   if(myIndex != -1)
735     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
736   
737   map<int, set<int> > spanTree;
738
739   TopoManager tmgr;
740
741   int myX, myY, myZ, myT;
742   tmgr.rankToCoordinates(rootPe, myX, myY, myZ, myT);
743
744   map<int, int> peRef;
745   int *repPeRef = new int[numRepPEs+1];
746   int *repPeX = new int[numRepPEs+1];
747   int *repPeY = new int[numRepPEs+1];
748   int *repPeZ = new int[numRepPEs+1];
749
750   int i = 0, myRepIndex;
751
752   for (set<int>::iterator iter = nodePEReps.begin(); iter != nodePEReps.end(); ++iter) {
753       int pe = *iter;
754       repPeRef[i] = pe;
755       peRef[pe] = i;
756       int t; // ignore the 't' dimension (which PE on the node)
757       tmgr.rankToCoordinates(pe, repPeX[i], repPeY[i], repPeZ[i], t);
758       if (*iter == repForMyPe)
759           myRepIndex = i;
760       i++;
761   }
762
763   int t;
764   repPeRef[i] = rootPe;
765   peRef[rootPe] = i;
766   tmgr.rankToCoordinates(rootPe, repPeX[i], repPeY[i], repPeZ[i], t);
767
768   CkAssert(myRepIndex >= 0 || myIndex == -1);
769  
770   bool *destAdded = new bool[numRepPEs];
771
772   for (int i = 0; i < numRepPEs; i++)
773       destAdded[i] = false;
774
775   int mode = 0; // 0 = x-axis, 1 = y-axis, 2 = z-axis
776
777   spanTree[rootPe].insert(rootPe);
778
779   //CkPrintf("Starting to build the tree...\n");
780
781   while (spanTree.size() < numRepPEs+1) {
782       int k = 0;
783       for (int i = 0; i < numRepPEs; i++) {
784           if (destAdded[i])
785               k++;
786       }
787
788       //CkPrintf("size of destAdded = %d, numRepPEs = %d, spanTree.size() = %d\n", k, numRepPEs, spanTree.size());
789
790       for(map<int, set<int> >::iterator iter = spanTree.begin(); iter != spanTree.end(); ++iter) {
791           int pe = iter->first;
792           int iPe = peRef[pe];
793           if (mode % 4 == 0) {
794               // Move in the -x direction
795               int i1 = findMinMaxArray(1, numRepPEs, repPeX, destAdded, repPeX[iPe]);
796         
797               if (i1 != -1) {
798                   destAdded[i1] = true;
799                   spanTree[pe].insert(repPeRef[i1]);
800                   spanTree[repPeRef[i1]].insert(repPeRef[i1]);
801                   //CkPrintf("added to -x\n");
802               }
803
804               // Move in the +x direction
805               int i2 = findMinMaxArray(0, numRepPEs, repPeX, destAdded, repPeX[iPe]);
806                 
807               if (i2 != -1) {
808                   destAdded[i2] = true;
809                   spanTree[pe].insert(repPeRef[i2]);
810                   spanTree[repPeRef[i2]].insert(repPeRef[i2]);
811                   //CkPrintf("added to +x\n");
812               }
813           } else if (mode % 4 == 1) {
814               bool* notEqX = new bool[numRepPEs];
815               for (int i = 0; i < numRepPEs; i++) {
816                   notEqX[i] = destAdded[i];
817                   if (!destAdded[i] && repPeX[iPe] != repPeX[i])
818                       notEqX[i] = true;
819               }
820
821               // Move in the -y direction
822               int i1 = findMinMaxArray(1, numRepPEs, repPeY, notEqX, repPeY[iPe]);
823         
824               if (i1 != -1) {
825                   destAdded[i1] = true;
826                   spanTree[pe].insert(repPeRef[i1]);
827                   spanTree[repPeRef[i1]].insert(repPeRef[i1]);
828                   //CkPrintf("added to -y\n");
829               }
830
831               // Move in the +y direction
832               int i2 = findMinMaxArray(0, numRepPEs, repPeY, notEqX, repPeY[iPe]);
833                 
834               if (i2 != -1) {
835                   destAdded[i2] = true;
836                   spanTree[pe].insert(repPeRef[i2]);
837                   spanTree[repPeRef[i2]].insert(repPeRef[i2]);
838                   //CkPrintf("added to +y\n");
839               }
840
841               delete[] notEqX;
842           } else if (mode % 4 == 2) {
843               bool* notEqXY = new bool[numRepPEs];
844               for (int i = 0; i < numRepPEs; i++) {
845                   notEqXY[i] = destAdded[i];
846                   if (!destAdded[i] && (repPeX[iPe] != repPeX[i] || repPeY[iPe] != repPeY[i]))
847                       notEqXY[i] = true;
848               }
849
850               // Move in the -z direction
851               int i1 = findMinMaxArray(1, numRepPEs, repPeZ, notEqXY, repPeZ[iPe]);
852         
853               if (i1 != -1) {
854                   destAdded[i1] = true;
855                   spanTree[pe].insert(repPeRef[i1]);
856                   spanTree[repPeRef[i1]].insert(repPeRef[i1]);
857                   //CkPrintf("added to -z\n");
858               }
859
860               // Move in the +z direction
861               int i2 = findMinMaxArray(0, numRepPEs, repPeZ, notEqXY, repPeZ[iPe]);
862                 
863               if (i2 != -1) {
864                   destAdded[i2] = true;
865                   spanTree[pe].insert(repPeRef[i2]);
866                   spanTree[repPeRef[i2]].insert(repPeRef[i2]);
867                   //CkPrintf("added to +z\n");
868               }
869
870               delete[] notEqXY;
871           }
872       }
873       mode++;
874   }
875
876   /*CkPrintf("Finished creating spanning tree\n");*/
877
878   static bool firstTime = true;
879
880   if (myPe == 0 && firstTime) {
881       firstTime = false;
882       for(map<int, set<int> >::iterator iter = spanTree.begin(); iter != spanTree.end(); ++iter) {
883           CkPrintf("Map %d to: ", iter->first);
884           for(set<int>::iterator iter2 = iter->second.begin(); iter2 != iter->second.end(); ++iter2) {
885               CkPrintf("%d, ", *iter2);
886           }
887           CkPrintf("\n");
888       }
889   }
890
891   // Send to local PEs
892   vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
893   int numSendLocal = otherLocalPes.size();
894
895   int numSend = spanTree[myPe].size() > 0 ? spanTree[myPe].size()-1 + numSendLocal : numSendLocal;
896     
897   if(numSend <= 0) {
898       npes = 0;
899       return;
900   }
901     
902   //CkPrintf("Sending to %d processors based on tree + local nodes\n", numSend);
903
904   npes = numSend;
905   pelist = new int[npes];
906   
907   i = 0;
908
909   for (set<int>::iterator iter = spanTree[myPe].begin(); iter != spanTree[myPe].end(); ++iter) {
910       if (*iter != myPe) {
911           pelist[i] = *iter;
912           i++;
913       }
914   }
915
916   for(int j = 0; j < numSendLocal; j++){
917       pelist[i] = otherLocalPes[j];
918       i++;
919   }
920 }
921
922 #include "spanningTreeStrategy.h"
923
924 using namespace topo;
925
926 void OneTimeTopoTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, const int rootPE, int * &pelist, int &npes)
927 {
928     /// Initialize
929     npes = 0; 
930     int myPE = (myIndex<0)? rootPE : destPEs[myIndex].pe;
931
932     /// Create a container of SpanningTreeVertex-es from the input list of PEs (include the root PE too)
933     std::vector<topo::SpanningTreeVertex> tree;
934     tree.push_back( topo::SpanningTreeVertex(rootPE) );
935     for (int i=0; i< totalDestPEs; i++)
936         tree.push_back( topo::SpanningTreeVertex(destPEs[i].pe) );
937
938     /// Build the complete spanning tree
939     topo::buildSpanningTree(tree.begin(),tree.end(),degree);
940
941     /// Identify this PE in the tree and find immediate children
942     int peIdx = -1;
943     bool noMatchFound = true;
944     while ( (++peIdx < tree.size()) && noMatchFound)
945     {
946         if (myPE == tree[peIdx].id)
947         {
948             /// Add immediate children to pelist and set npes accordingly
949             npes   = tree[peIdx].childIndex.size();
950             pelist = new int[npes];
951             for (int i=0; i< npes; i++)
952                 pelist[i] = tree[ peIdx + tree[peIdx].childIndex[i] ].id; ///< child indices are relative distances from the parent in the container
953             noMatchFound = false;
954         }
955     }
956 }
957
958 /*@}*/