commented out a few useless routines
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20 /*
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     //if(myid == -1)
29     //  CkPrintf("Warning myid = -1\n");
30
31     return myid;
32 }
33
34
35 //Assumes a sorted input. Returns the next processor greater a given
36 //current pe
37 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
38     
39     nextpe = pelist[0];
40     
41     int count= 0;
42     for(count = 0; count < npes; count++)
43         if(pelist[count] > mype)
44             break;
45     
46     if(count < npes) 
47         nextpe = pelist[count];
48     
49     return;
50 }
51
52
53 inline int getMidPe(int *pelist, int npes, int src_pe) {
54     
55     int my_id = 0;
56     int mid_pe = -1;
57     
58     my_id = getMyId(pelist, npes, src_pe);    
59     
60     CkAssert(my_id >= 0 && my_id < npes);
61
62     if(my_id < npes/2)
63         mid_pe = pelist[npes/2 + my_id];        
64     else
65         mid_pe = pelist[my_id % (npes/2)];
66     
67     //if(mid_pe == -1)
68     //  CkPrintf("Warning midpe = -1\n");
69
70     return mid_pe;
71 }
72 */
73
74 //Unlike ring the source here sends two or more messages while all
75 //elements along the ring only send one.
76
77 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
78 (int nelements, CkArrayIndexMax *elements){
79
80     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
81
82     obj->npes = 0;
83     obj->pelist = 0;
84
85     int *pelist;
86     int npes;
87     sinfo.getRemotePelist(nelements, elements, npes, pelist);
88     
89     sinfo.getLocalIndices(nelements, elements, obj->indices);
90
91     if(npes == 0)
92         return obj;
93
94     if(npes < 4) {
95         obj->npes = npes;
96         obj->pelist = pelist;
97         //CkPrintf("MultiRingMulticast::createObjectOnSrcPe, less than 4 procs\n");
98
99         return obj;
100     }
101     
102     //CkPrintf("MultiRingMulticast::createObjectOnSrcPe, more than 3 procs\n");
103     //pelist[npes ++] = CkMyPe();
104     qsort(pelist, npes, sizeof(int), intCompare);
105
106     /*
107       char dump[2560];
108       sprintf(dump, "Section on %d : ", CkMyPe());
109       for(int count = 0; count < npes; count ++) {
110       sprintf(dump, "%s, %d", dump, pelist[count]);
111       }
112     
113       CkPrintf("%s\n\n", dump);
114     */
115     
116     int myid = -1; // getMyId(pelist, npes, CkMyPe());    
117     for (int i=0; i<npes; ++i) {
118       if (pelist[i] == CkMyPe()) {
119         myid = i;
120         break;
121       }
122     }
123
124     //CkAssert(myid >= 0 && myid < npes);
125
126     int breaking = npes/2; /* 0 : breaking-1    is the first ring
127                               breaking : npes-1 is the second ring
128                            */
129
130     int nextpe = myid + 1;
131     // wrap nextpe around the ring
132     if(myid < breaking) {
133       if (nextpe >= breaking) nextpe = 0;
134     } else {
135       if (nextpe >= npes) nextpe = breaking;
136     }
137     
138     int midpe;
139     if (myid < breaking) {
140       midpe = myid + breaking;
141       if (midpe >= npes || midpe < breaking) midpe = breaking;
142     } else {
143       midpe = myid - breaking;
144     }
145     //mid_pe = getMidPe(pelist, npes, CkMyPe());
146     
147     if(nextpe != CkMyPe()) {
148         obj->pelist = new int[2];
149         obj->npes = 2;
150         
151         obj->pelist[0] = pelist[nextpe];
152         obj->pelist[1] = pelist[midpe];
153     }
154     else {
155         CkAbort("Warning Should not be here !!!!!!!!!\n");
156         //obj->pelist = new int[1];
157         //obj->npes = 1;
158         
159         //obj->pelist[0] = midpe;
160     }
161     
162     delete [] pelist;
163
164     //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
165     
166     return obj;
167 }
168
169
170 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, int npes, ComlibMulticastIndexCount *counts, int srcpe) {
171
172     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
173
174     //CkPrintf("MultiRingMulticast: creating object on intermediate Pe %d-%d (%d-%d)\n", CkMyPe(),srcpe, npes,nindices);
175     //int *pelist;
176     //int npes;
177     //sinfo.getRemotePelist(nelements, elements, npes, pelist);
178     
179     obj->pelist = 0;
180     obj->npes = 0;
181     for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
182     //sinfo.getLocalIndices(nelements, elements, obj->indices);
183
184     //pelist[npes ++] = CkMyPe();
185
186     if(npes <= 4)
187         return obj;
188     
189     qsort(counts, npes, sizeof(ComlibMulticastIndexCount), indexCountCompare);
190     
191     int myid = -1;
192     for (int i=0; i<npes; ++i) {
193       if (counts[i].pe == CkMyPe()) {
194         myid = i;
195         break;
196       }
197     }
198     //getMyId(pelist, npes, CkMyPe());
199     
200     CkAssert(myid >= 0 && myid < npes);
201
202     int breaking = npes/2;
203     int srcid = 0; // = getMyId(pelist, npes, src_pe);
204     for (int i=0; i<npes; ++i) {
205       if (counts[i].pe == srcpe) {
206         srcid = i;
207         break;
208       }
209     }
210
211     if (srcid < breaking ^ myid < breaking) {
212       // if we are in the two different halves, correct srcid
213       if (srcid < breaking) {
214         srcid += breaking;
215         if (srcid >= npes || srcid < breaking) srcid = breaking;
216       } else {
217         srcid -= breaking;
218       }
219     }
220     // now srcid is the starting point of this half ring, which could be the
221     // original sender itself (0 if the sender is not part of the recipients),
222     // or the counterpart in the other ring
223
224     int nextid = myid + 1;
225     // wrap nextpe around the ring
226     if(myid < breaking) {
227       if (nextid >= breaking) nextid = 0;
228     }
229     else {
230       if (nextid >= npes) nextid = breaking;
231     }
232
233     if (nextid != srcid) {
234       obj->pelist = new int[1];
235       obj->npes = 1;
236       obj->pelist[0] = counts[nextid].pe;
237     }
238
239     return obj;
240 }