4f0f21e78946d8053d0a219752e9de1114126b41
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     if(myid == -1)
29         CkPrintf("Warning myid = -1\n");
30
31     return myid;
32 }
33
34
35 //Assumes a sorted input. Returns the next processor greater a given
36 //current pe
37 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
38     
39     nextpe = pelist[0];
40     
41     int count= 0;
42     for(count = 0; count < npes; count++)
43         if(pelist[count] > mype)
44             break;
45     
46     if(count < npes) 
47         nextpe = pelist[count];
48     
49     return;
50 }
51
52
53 inline int getMidPe(int *pelist, int npes, int src_pe) {
54     
55     int my_id = 0;
56     int mid_pe = -1;
57     
58     my_id = getMyId(pelist, npes, src_pe);    
59     
60     CkAssert(my_id >= 0 && my_id < npes);
61
62     if(my_id < npes/2)
63         mid_pe = pelist[npes/2 + my_id];        
64     else
65         mid_pe = pelist[my_id % (npes/2)];
66     
67     if(mid_pe == -1)
68         CkPrintf("Warning midpe = -1\n");
69
70     return mid_pe;
71 }
72
73 //Unlike ring the source here sends two or more messages while all
74 //elements along the ring only send one.
75
76 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
77 (int nelements, CkArrayIndexMax *elements){
78
79     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
80
81     obj->npes = 0;
82     obj->pelist = 0;
83
84     int *pelist;
85     int npes;
86     sinfo.getRemotePelist(nelements, elements, npes, pelist);
87     
88     sinfo.getLocalIndices(nelements, elements, obj->indices);
89
90     if(npes == 0)
91         return obj;
92
93     if(npes == 1) {        
94         obj->npes = 1;        
95         obj->pelist = pelist;
96
97         return obj;
98     }
99
100     if(npes == 2) {        
101         obj->npes = 2;        
102         obj->pelist = pelist;
103         
104         return obj;
105     }
106     
107     pelist[npes ++] = CkMyPe();
108     qsort(pelist, npes, sizeof(int), intCompare);
109
110     char dump[2560];
111     sprintf(dump, "Section on %d : ", CkMyPe());
112     for(int count = 0; count < npes; count ++) {
113         sprintf(dump, "%s, %d", dump, pelist[count]);
114     }
115     
116     CkPrintf("%s\n\n", dump);
117
118     int myid = getMyId(pelist, npes, CkMyPe());    
119
120     CkAssert(myid >= 0 && myid < npes);
121
122     int nextpe = -1;
123     
124     if(myid < npes / 2) 
125         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
126     else 
127         getNextPe(pelist+npes/2, npes - npes/2, CkMyPe(), nextpe);
128     
129     int mid_pe = -1;
130     mid_pe = getMidPe(pelist, npes, CkMyPe());
131     
132     delete [] pelist;
133
134     if(nextpe != CkMyPe()) {
135         obj->pelist = new int[2];
136         obj->npes = 2;
137         
138         obj->pelist[0] = nextpe;
139         obj->pelist[1] = mid_pe;
140     }
141     else {
142         CkPrintf("Warning Should not be here !!!!!!!!!\n");
143         obj->pelist = new int[1];
144         obj->npes = 1;
145         
146         obj->pelist[0] = mid_pe;
147     }
148     
149     CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
150     
151     return obj;
152 }
153
154
155 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
156 (int nelements, CkArrayIndexMax *elements, int src_pe){
157
158     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
159
160     int *pelist;
161     int npes;
162     sinfo.getRemotePelist(nelements, elements, npes, pelist);
163     
164     obj->pelist = 0;
165     obj->npes = 0;   
166     sinfo.getLocalIndices(nelements, elements, obj->indices);
167
168     pelist[npes ++] = CkMyPe();
169
170     if(npes <= 3)
171         return obj;
172     
173     qsort(pelist, npes, sizeof(int), intCompare);
174     
175     int myid = getMyId(pelist, npes, CkMyPe());
176     
177     CkAssert(myid >= 0 && myid < npes);
178
179     int src_id = getMyId(pelist, npes, src_pe);
180     
181     if(src_id == -1) { //Src isnt the receipient of the multicast
182         pelist[npes ++] = src_pe;
183         qsort(pelist, npes, sizeof(int), intCompare);
184         src_id = getMyId(pelist, npes, src_pe);
185         myid = getMyId(pelist, npes, CkMyPe());
186         
187         CkAssert(src_id >= 0 && src_id < npes);
188     }
189
190     int nextpe = -1;
191     int new_src_pe = src_pe;
192     int mid_pe = getMidPe(pelist, npes, src_pe);
193     
194     if(myid < npes / 2) {
195         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
196
197         //If source is in the other half I have to change to the
198         //middle guy
199         if(src_id >= npes/2)
200             new_src_pe = mid_pe;
201     }
202     else {
203         getNextPe(pelist + (npes/2), npes - npes/2, CkMyPe(), nextpe);
204
205         //If source is in the other half I have to change to the
206         //middle guy
207         if(src_id < npes/2)
208             new_src_pe = mid_pe;
209     }
210
211     bool end_flag = isEndOfRing(nextpe, new_src_pe);
212
213     if(!end_flag) {
214         obj->pelist = new int[1];
215         obj->npes = 1;
216         obj->pelist[0] = nextpe;
217     }
218     
219     CkPrintf("%d: Src = %d Next = %d end = %d Midpe = %d\n", CkMyPe(), src_pe, 
220              nextpe, end_flag, mid_pe);    
221     
222     delete [] pelist;
223     return obj;
224 }
225
226 //We need to end the ring,
227 //    if next_pe is the same as the source_pe, or
228 //    if next_pe is the first processor in the ring, greater than srouce_pe.
229 //Both these comparisons are done in a 'cyclic' way with wraparounds.
230
231 int MultiRingMulticast::isEndOfRing(int next_pe, int src_pe){
232     
233     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
234     
235     if((next_pe != CkMyPe()) && (next_pe != src_pe)) 
236         return 0;
237     
238     return 1;
239 }
240