fix of the MultiRingMulticast strategy
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     //if(myid == -1)
29     //  CkPrintf("Warning myid = -1\n");
30
31     return myid;
32 }
33
34
35 //Assumes a sorted input. Returns the next processor greater a given
36 //current pe
37 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
38     
39     nextpe = pelist[0];
40     
41     int count= 0;
42     for(count = 0; count < npes; count++)
43         if(pelist[count] > mype)
44             break;
45     
46     if(count < npes) 
47         nextpe = pelist[count];
48     
49     return;
50 }
51
52
53 inline int getMidPe(int *pelist, int npes, int src_pe) {
54     
55     int my_id = 0;
56     int mid_pe = -1;
57     
58     my_id = getMyId(pelist, npes, src_pe);    
59     
60     CkAssert(my_id >= 0 && my_id < npes);
61
62     if(my_id < npes/2)
63         mid_pe = pelist[npes/2 + my_id];        
64     else
65         mid_pe = pelist[my_id % (npes/2)];
66     
67     //if(mid_pe == -1)
68     //  CkPrintf("Warning midpe = -1\n");
69
70     return mid_pe;
71 }
72
73 //Unlike ring the source here sends two or more messages while all
74 //elements along the ring only send one.
75
76 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
77 (int nelements, CkArrayIndexMax *elements){
78
79     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
80
81     obj->npes = 0;
82     obj->pelist = 0;
83
84     int *pelist;
85     int npes;
86     sinfo.getRemotePelist(nelements, elements, npes, pelist);
87     
88     sinfo.getLocalIndices(nelements, elements, obj->indices);
89
90     if(npes == 0)
91         return obj;
92
93     if(npes < 4) {
94         obj->npes = npes;
95         obj->pelist = pelist;
96         CkPrintf("MultiRingMulticast::createObjectOnSrcPe, less than 4 procs\n");
97
98         return obj;
99     }
100     
101     CkPrintf("MultiRingMulticast::createObjectOnSrcPe, more than 3 procs\n");
102     //pelist[npes ++] = CkMyPe();
103     qsort(pelist, npes, sizeof(int), intCompare);
104
105     /*
106       char dump[2560];
107       sprintf(dump, "Section on %d : ", CkMyPe());
108       for(int count = 0; count < npes; count ++) {
109       sprintf(dump, "%s, %d", dump, pelist[count]);
110       }
111     
112       CkPrintf("%s\n\n", dump);
113     */
114     
115     int myid = -1; // getMyId(pelist, npes, CkMyPe());    
116     for (int i=0; i<npes; ++i) {
117       if (pelist[i] == CkMyPe()) {
118         myid = i;
119         break;
120       }
121     }
122
123     //CkAssert(myid >= 0 && myid < npes);
124
125     int breaking = npes/2; /* 0 : breaking-1    is the first ring
126                               breaking : npes-1 is the second ring
127                            */
128
129     int nextpe = myid + 1;
130     // wrap nextpe around the ring
131     if(myid < breaking) {
132       if (nextpe >= breaking) nextpe = 0;
133     } else {
134       if (nextpe >= npes) nextpe = breaking;
135     }
136     
137     int midpe;
138     if (myid < breaking) {
139       midpe = myid + breaking;
140       if (midpe >= npes || midpe < breaking) midpe = breaking;
141     } else {
142       midpe = myid - breaking;
143     }
144     //mid_pe = getMidPe(pelist, npes, CkMyPe());
145     
146     if(nextpe != CkMyPe()) {
147         obj->pelist = new int[2];
148         obj->npes = 2;
149         
150         obj->pelist[0] = pelist[nextpe];
151         obj->pelist[1] = pelist[midpe];
152     }
153     else {
154         CkAbort("Warning Should not be here !!!!!!!!!\n");
155         //obj->pelist = new int[1];
156         //obj->npes = 1;
157         
158         //obj->pelist[0] = midpe;
159     }
160     
161     delete [] pelist;
162
163     //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
164     
165     return obj;
166 }
167
168
169 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, int npes, ComlibMulticastIndexCount *counts, int srcpe) {
170
171     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
172
173     CkPrintf("MultiRingMulticast: creating object on intermediate Pe %d-%d (%d-%d)\n", CkMyPe(),srcpe, npes,nindices);
174     //int *pelist;
175     //int npes;
176     //sinfo.getRemotePelist(nelements, elements, npes, pelist);
177     
178     obj->pelist = 0;
179     obj->npes = 0;
180     for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
181     //sinfo.getLocalIndices(nelements, elements, obj->indices);
182
183     //pelist[npes ++] = CkMyPe();
184
185     if(npes <= 4)
186         return obj;
187     
188     qsort(counts, npes, sizeof(ComlibMulticastIndexCount), indexCountCompare);
189     
190     int myid = -1;
191     for (int i=0; i<npes; ++i) {
192       if (counts[i].pe == CkMyPe()) {
193         myid = i;
194         break;
195       }
196     }
197     //getMyId(pelist, npes, CkMyPe());
198     
199     CkAssert(myid >= 0 && myid < npes);
200
201     int breaking = npes/2;
202     int srcid = 0; // = getMyId(pelist, npes, src_pe);
203     for (int i=0; i<npes; ++i) {
204       if (counts[i].pe == srcpe) {
205         srcid = i;
206         break;
207       }
208     }
209
210     if (srcid < breaking ^ myid < breaking) {
211       // if we are in the two different halves, correct srcid
212       if (srcid < breaking) {
213         srcid += breaking;
214         if (srcid >= npes || srcid < breaking) srcid = breaking;
215       } else {
216         srcid -= breaking;
217       }
218     }
219     // now srcid is the starting point of this half ring, which could be the
220     // original sender itself (0 if the sender is not part of the recipients),
221     // or the counterpart in the other ring
222
223     int nextid = myid + 1;
224     // wrap nextpe around the ring
225     if(myid < breaking) {
226       if (nextid >= breaking) nextid = 0;
227     }
228     else {
229       if (nextid >= npes) nextid = breaking;
230     }
231
232     if (nextid != srcid) {
233       obj->pelist = new int[1];
234       obj->npes = 1;
235       obj->pelist[0] = counts[nextid].pe;
236     }
237
238     return obj;
239 }