fixed a bug in MultiRing where the sorting of the data was causing problems.
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20 /*
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     //if(myid == -1)
29     //  CkPrintf("Warning myid = -1\n");
30
31     return myid;
32 }
33
34
35 //Assumes a sorted input. Returns the next processor greater a given
36 //current pe
37 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
38     
39     nextpe = pelist[0];
40     
41     int count= 0;
42     for(count = 0; count < npes; count++)
43         if(pelist[count] > mype)
44             break;
45     
46     if(count < npes) 
47         nextpe = pelist[count];
48     
49     return;
50 }
51
52
53 inline int getMidPe(int *pelist, int npes, int src_pe) {
54     
55     int my_id = 0;
56     int mid_pe = -1;
57     
58     my_id = getMyId(pelist, npes, src_pe);    
59     
60     CkAssert(my_id >= 0 && my_id < npes);
61
62     if(my_id < npes/2)
63         mid_pe = pelist[npes/2 + my_id];        
64     else
65         mid_pe = pelist[my_id % (npes/2)];
66     
67     //if(mid_pe == -1)
68     //  CkPrintf("Warning midpe = -1\n");
69
70     return mid_pe;
71 }
72 */
73
74 //Unlike ring the source here sends two or more messages while all
75 //elements along the ring only send one.
76
77 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
78 (int nelements, CkArrayIndexMax *elements){
79
80     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
81
82     obj->npes = 0;
83     obj->pelist = 0;
84
85     int *pelist;
86     int npes;
87     sinfo.getPeList(nelements, elements, npes, pelist);
88     
89     sinfo.getLocalIndices(nelements, elements, obj->indices);
90
91     if(npes == 0)
92         return obj;
93
94     if(npes < 4) {
95         // direct sending, take out ourself from the list!
96         for (int i=0; i<npes; ++i) {
97           if (pelist[i] == CkMyPe()) {
98             pelist[i] = pelist[--npes];
99             break;
100           }
101         }
102         obj->npes = npes;
103         obj->pelist = pelist;
104         //CkPrintf("MultiRingMulticast::createObjectOnSrcPe, less than 4 procs\n");
105
106         return obj;
107     }
108     
109     //CkPrintf("MultiRingMulticast::createObjectOnSrcPe, more than 3 procs\n");
110     //pelist[npes ++] = CkMyPe();
111     qsort(pelist, npes, sizeof(int), intCompare);
112
113     /*
114       char dump[2560];
115       sprintf(dump, "Section on %d : ", CkMyPe());
116       for(int count = 0; count < npes; count ++) {
117       sprintf(dump, "%s, %d", dump, pelist[count]);
118       }
119     
120       CkPrintf("%s\n\n", dump);
121     */
122     
123     int myid = -1; // getMyId(pelist, npes, CkMyPe());    
124     for (int i=0; i<npes; ++i) {
125       if (pelist[i] == CkMyPe()) {
126         myid = i;
127         break;
128       }
129     }
130
131     //CkAssert(myid >= 0 && myid < npes);
132
133     int breaking = npes/2; /* 0 : breaking-1    is the first ring
134                               breaking : npes-1 is the second ring
135                            */
136
137     int next_id = myid + 1;
138     // wrap nextpe around the ring
139     if(myid < breaking) {
140       if (next_id >= breaking) next_id = 0;
141     } else {
142       if (next_id >= npes) next_id = breaking;
143     }
144     
145     int mid_id;
146     if (myid < breaking) {
147       mid_id = myid + breaking;
148       if (mid_id < breaking) mid_id = breaking;
149     } else {
150       mid_id = myid - breaking;
151       if (mid_id >= breaking) mid_id = 0;
152     }
153     //mid_pe = getMidPe(pelist, npes, CkMyPe());
154     
155     if(pelist[next_id] != CkMyPe()) {
156         obj->pelist = new int[2];
157         obj->npes = 2;
158         
159         obj->pelist[0] = pelist[next_id];
160         obj->pelist[1] = pelist[mid_id];
161     }
162     else {
163         CkAbort("Warning Should not be here !!!!!!!!!\n");
164         //obj->pelist = new int[1];
165         //obj->npes = 1;
166         
167         //obj->pelist[0] = midpe;
168     }
169     
170     delete [] pelist;
171
172     //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), pelist[next_id], pelist[mid_id]);
173     
174     return obj;
175 }
176
177
178 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, int npes, ComlibMulticastIndexCount *counts, int srcpe) {
179
180     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
181
182     //CkPrintf("MultiRingMulticast: creating object on intermediate Pe %d-%d (%d-%d)\n", CkMyPe(),srcpe, npes,nindices);
183     //int *pelist;
184     //int npes;
185     //sinfo.getRemotePelist(nelements, elements, npes, pelist);
186     
187     obj->pelist = 0;
188     obj->npes = 0;
189     for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
190     //sinfo.getLocalIndices(nelements, elements, obj->indices);
191
192     //pelist[npes ++] = CkMyPe();
193
194     if(npes < 4)
195         return obj;
196     
197     //qsort(counts, npes, sizeof(ComlibMulticastIndexCount), indexCountCompare);
198     
199     int myid = -1;
200     for (int i=0; i<npes; ++i) {
201       if (counts[i].pe == CkMyPe()) {
202         myid = i;
203         break;
204       }
205     }
206     //getMyId(pelist, npes, CkMyPe());
207     
208     CkAssert(myid >= 0 && myid < npes);
209
210     int breaking = npes/2;
211     int srcid = 0; // = getMyId(pelist, npes, src_pe);
212     for (int i=0; i<npes; ++i) {
213       if (counts[i].pe == srcpe) {
214         srcid = i;
215         break;
216       }
217     }
218
219     if (srcid < breaking ^ myid < breaking) {
220       // if we are in the two different halves, correct srcid
221       if (srcid < breaking) {
222         srcid += breaking;
223         if (srcid < breaking) srcid = breaking;
224       } else {
225         srcid -= breaking;
226         if (srcid >= breaking) srcid = 0;
227       }
228     }
229     // now srcid is the starting point of this half ring, which could be the
230     // original sender itself (0 if the sender is not part of the recipients),
231     // or the counterpart in the other ring
232
233     int nextid = myid + 1;
234     // wrap nextpe around the ring
235     if(myid < breaking) {
236       if (nextid >= breaking) nextid = 0;
237     }
238     else {
239       if (nextid >= npes) nextid = breaking;
240     }
241
242     if (nextid != srcid) {
243       obj->pelist = new int[1];
244       obj->npes = 1;
245       obj->pelist[0] = counts[nextid].pe;
246     }
247
248     return obj;
249 }