a5e391b2198a38c03dc903686d1e52289b462241
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     //if(myid == -1)
29     //  CkPrintf("Warning myid = -1\n");
30
31     return myid;
32 }
33
34
35 //Assumes a sorted input. Returns the next processor greater a given
36 //current pe
37 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
38     
39     nextpe = pelist[0];
40     
41     int count= 0;
42     for(count = 0; count < npes; count++)
43         if(pelist[count] > mype)
44             break;
45     
46     if(count < npes) 
47         nextpe = pelist[count];
48     
49     return;
50 }
51
52
53 inline int getMidPe(int *pelist, int npes, int src_pe) {
54     
55     int my_id = 0;
56     int mid_pe = -1;
57     
58     my_id = getMyId(pelist, npes, src_pe);    
59     
60     CkAssert(my_id >= 0 && my_id < npes);
61
62     if(my_id < npes/2)
63         mid_pe = pelist[npes/2 + my_id];        
64     else
65         mid_pe = pelist[my_id % (npes/2)];
66     
67     //if(mid_pe == -1)
68     //  CkPrintf("Warning midpe = -1\n");
69
70     return mid_pe;
71 }
72
73 //Unlike ring the source here sends two or more messages while all
74 //elements along the ring only send one.
75
76 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
77 (int nelements, CkArrayIndexMax *elements){
78
79     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
80
81     obj->npes = 0;
82     obj->pelist = 0;
83
84     int *pelist;
85     int npes;
86     sinfo.getRemotePelist(nelements, elements, npes, pelist);
87     
88     sinfo.getLocalIndices(nelements, elements, obj->indices);
89
90     if(npes == 0)
91         return obj;
92
93     if(npes == 1) {        
94         obj->npes = 1;        
95         obj->pelist = pelist;
96
97         return obj;
98     }
99
100     if(npes == 2) {        
101         obj->npes = 2;        
102         obj->pelist = pelist;
103         
104         return obj;
105     }
106     
107     pelist[npes ++] = CkMyPe();
108     qsort(pelist, npes, sizeof(int), intCompare);
109
110     /*
111       char dump[2560];
112       sprintf(dump, "Section on %d : ", CkMyPe());
113       for(int count = 0; count < npes; count ++) {
114       sprintf(dump, "%s, %d", dump, pelist[count]);
115       }
116     
117       CkPrintf("%s\n\n", dump);
118     */
119     
120     int myid = getMyId(pelist, npes, CkMyPe());    
121
122     CkAssert(myid >= 0 && myid < npes);
123
124     int nextpe = -1;
125     
126     if(myid < npes / 2) 
127         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
128     else 
129         getNextPe(pelist+npes/2, npes - npes/2, CkMyPe(), nextpe);
130     
131     int mid_pe = -1;
132     mid_pe = getMidPe(pelist, npes, CkMyPe());
133     
134     delete [] pelist;
135
136     if(nextpe != CkMyPe()) {
137         obj->pelist = new int[2];
138         obj->npes = 2;
139         
140         obj->pelist[0] = nextpe;
141         obj->pelist[1] = mid_pe;
142     }
143     else {
144         CkPrintf("Warning Should not be here !!!!!!!!!\n");
145         obj->pelist = new int[1];
146         obj->npes = 1;
147         
148         obj->pelist[0] = mid_pe;
149     }
150     
151     //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
152     
153     return obj;
154 }
155
156
157 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
158 (int nelements, CkArrayIndexMax *elements, int src_pe){
159
160     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
161
162     int *pelist;
163     int npes;
164     sinfo.getRemotePelist(nelements, elements, npes, pelist);
165     
166     obj->pelist = 0;
167     obj->npes = 0;   
168     sinfo.getLocalIndices(nelements, elements, obj->indices);
169
170     pelist[npes ++] = CkMyPe();
171
172     if(npes <= 3)
173         return obj;
174     
175     qsort(pelist, npes, sizeof(int), intCompare);
176     
177     int myid = getMyId(pelist, npes, CkMyPe());
178     
179     CkAssert(myid >= 0 && myid < npes);
180
181     int src_id = getMyId(pelist, npes, src_pe);
182     
183     if(src_id == -1) { //Src isnt the receipient of the multicast
184         pelist[npes ++] = src_pe;
185         qsort(pelist, npes, sizeof(int), intCompare);
186         src_id = getMyId(pelist, npes, src_pe);
187         myid = getMyId(pelist, npes, CkMyPe());
188         
189         CkAssert(src_id >= 0 && src_id < npes);
190     }
191
192     int nextpe = -1;
193     int new_src_pe = src_pe;
194     int mid_pe = getMidPe(pelist, npes, src_pe);
195     
196     if(myid < npes / 2) {
197         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
198
199         //If source is in the other half I have to change to the
200         //middle guy
201         if(src_id >= npes/2)
202             new_src_pe = mid_pe;
203     }
204     else {
205         getNextPe(pelist + (npes/2), npes - npes/2, CkMyPe(), nextpe);
206
207         //If source is in the other half I have to change to the
208         //middle guy
209         if(src_id < npes/2)
210             new_src_pe = mid_pe;
211     }
212
213     bool end_flag = isEndOfRing(nextpe, new_src_pe);
214
215     if(!end_flag) {
216         obj->pelist = new int[1];
217         obj->npes = 1;
218         obj->pelist[0] = nextpe;
219     }
220     
221     //CkPrintf("%d: Src = %d Next = %d end = %d Midpe = %d\n", CkMyPe(), src_pe, 
222     //       nextpe, end_flag, mid_pe);    
223     
224     delete [] pelist;
225     return obj;
226 }
227
228 //We need to end the ring,
229 //    if next_pe is the same as the source_pe, or
230 //    if next_pe is the first processor in the ring, greater than srouce_pe.
231 //Both these comparisons are done in a 'cyclic' way with wraparounds.
232
233 int MultiRingMulticast::isEndOfRing(int next_pe, int src_pe){
234     
235     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
236     
237     if((next_pe != CkMyPe()) && (next_pe != src_pe)) 
238         return 0;
239     
240     return 1;
241 }
242