Added doxygen documentation.
[charm.git] / src / ck-com / RingMulticastStrategy.C
1
2 #include "RingMulticastStrategy.h"
3
4 //Array Constructor
5 RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid, int flag)
6   : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void RingMulticastStrategy::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void RingMulticastStrategy::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20 ComlibSectionHashObject *RingMulticastStrategy::createObjectOnSrcPe
21 (int nelements, CkArrayIndexMax *elements){
22
23     ComlibSectionHashObject *robj = new ComlibSectionHashObject;
24
25     int next_pe = CkNumPes();
26     int acount = 0;
27     int min_dest = CkNumPes();
28     
29     //Equivalent to sorting the list of destination processors and
30     //sending the message to the next processor greater than MyPE.
31     //If MyPE is the largest processor send it to minpe
32     for(acount = 0; acount < nelements; acount++){
33         
34         CkArrayID dest;
35         int nidx;
36         CkArrayIndexMax *idx_list;        
37         ainfo.getDestinationArray(dest, idx_list, nidx);
38
39         int p = ComlibGetLastKnown(dest, elements[acount]);
40         
41         //Find the smallest destination
42         if(p < min_dest)
43             min_dest = p;
44         
45         //If there is a processor greater than me and less than next_pe
46         //then he is my next_pe
47         if(p > CkMyPe() && next_pe > p) 
48             next_pe = p;       
49
50         if (p == CkMyPe())
51             robj->indices.insertAtEnd(elements[acount]);
52     }
53     
54     //Recycle the destination pelist and start from the begining
55     if(next_pe == CkNumPes() && min_dest != CkMyPe())        
56         next_pe = min_dest;
57     
58     if(next_pe == CkNumPes())
59         next_pe = -1;
60
61     if(next_pe != -1) {
62         robj->pelist = new int[1];
63         robj->npes = 1;
64         robj->pelist[0] = next_pe;
65     }
66     else {
67         robj->pelist = NULL;
68         robj->npes = 0;
69     }        
70     
71     return robj;
72 }
73
74
75 ComlibSectionHashObject *
76 RingMulticastStrategy::createObjectOnIntermediatePe(int nindices,
77                                                     CkArrayIndexMax *idxlist,
78                                                     int npes,
79                                                     ComlibMulticastIndexCount *counts,
80                                                     int src_pe){
81
82   ComlibSectionHashObject *obj = new ComlibSectionHashObject;
83
84   obj->indices.resize(0);
85   for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
86   //obj = createObjectOnSrcPe(nelements, elements);
87
88   obj->pelist = new int[1];
89   obj->npes = 1;
90   obj->pelist[0] = CkMyPe(); // this is neutral for the if inside next loop
91
92   // find the next processor in the ring
93   for (int i=0; i<npes; ++i) {
94     if (obj->pelist[0] > CkMyPe()) { // we have already found a processor greater
95                                      // than us, find the smallest among them
96       if (counts[i].pe > CkMyPe() && counts[i].pe < obj->pelist[0])
97         obj->pelist[0] = counts[i].pe;
98     } else {  // we have not yet found a processor greater than us, stick with
99               // the smallest, or one greater than us
100       if (counts[i].pe < obj->pelist[0] || counts[i].pe > CkMyPe())
101         obj->pelist[0] = counts[i].pe;
102     }
103   }
104     
105   //here we check if have reached the end of the ring
106   if(obj->npes > 0 && isEndOfRing(*obj->pelist, src_pe)) {
107     delete obj->pelist;
108     obj->pelist = NULL;
109     obj->npes = 0;
110   }
111
112   return obj;
113 }
114
115 //We need to end the ring, 
116 //    if next_pe is the same as the source_pe, or
117 //    if next_pe is the first processor in the ring, greater than srouce_pe.
118 //Both these comparisons are done in a 'cyclic' way with wraparounds.
119
120 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
121     
122     if(next_pe < 0)
123         return 1;
124     
125     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
126     
127     if(next_pe > CkMyPe()){
128         if(src_pe <= next_pe && src_pe > CkMyPe())
129             return 1;
130         
131         return 0;
132     }
133     
134     if(src_pe > CkMyPe() || src_pe <= next_pe)
135         return 1;
136     
137     return 0;
138 }