New comlib strategies for CPMD. Propagates faster than a ring and is largely contenti...
[charm.git] / src / ck-com / MultiRingMulticast.C
1
2 #include "MultiRingMulticast.h"
3
4 //Array Constructor
5 MultiRingMulticast::MultiRingMulticast(CkArrayID dest_aid, int flag)
6     : DirectMulticastStrategy(dest_aid, flag){
7 }
8
9
10 void MultiRingMulticast::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void MultiRingMulticast::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20
21 inline int getMyId(int *pelist, int npes, int mype) {
22     int myid = -1;
23
24     for(int count = 0; count < npes; count ++)
25         if(mype == pelist[count])
26             myid = count;
27
28     return myid;
29 }
30
31
32 //Assumes a sorted input. Returns the next processor greater a given
33 //current pe
34 inline void getNextPe(int *pelist, int npes, int mype, int &nextpe) {
35     
36     nextpe = pelist[0];
37     
38     int count= 0;
39     for(count = 0; count < npes; count++)
40         if(pelist[count] > mype)
41             break;
42     
43     if(count < npes) 
44         nextpe = pelist[count];
45     
46     return;
47 }
48
49
50 inline int getMidPe(int *pelist, int npes, int src_pe) {
51     
52     int my_id = 0;
53     int mid_pe = -1;
54     
55     my_id = getMyId(pelist, npes, src_pe);    
56     
57     if(my_id < npes/2)
58         mid_pe = pelist[npes/2 + my_id];        
59     else
60         mid_pe = pelist[my_id % (npes/2)];
61     
62     return mid_pe;
63 }
64
65 //Unlike ring the source here sends two or more messages while all
66 //elements along the ring only send one.
67
68 ComlibSectionHashObject *MultiRingMulticast::createObjectOnSrcPe
69 (int nelements, CkArrayIndexMax *elements){
70
71     ComlibSectionHashObject *obj = new ComlibSectionHashObject;
72
73     obj->npes = 0;
74     obj->pelist = 0;
75
76     sinfo.getLocalIndices(nelements, elements, obj->indices);
77
78     int *pelist;
79     int npes;
80     sinfo.getRemotePelist(nelements, elements, npes, pelist);
81     
82     if(npes == 0)
83         return obj;
84
85     if(npes == 1) {        
86         obj->npes = 1;        
87         obj->pelist = pelist;
88
89         return obj;
90     }
91
92     if(npes == 2) {        
93         obj->npes = 2;        
94         obj->pelist = pelist;
95         
96         return obj;
97     }
98     
99     pelist[npes ++] = CkMyPe();
100     qsort(pelist, npes, sizeof(int), intCompare);
101
102     int myid = getMyId(pelist, npes, CkMyPe());    
103     int nextpe = -1;
104     
105     if(myid < npes / 2) 
106         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
107     else 
108         getNextPe(pelist+npes/2, npes - npes/2, CkMyPe(), nextpe);
109     
110     int mid_pe = -1;
111     mid_pe = getMidPe(pelist, npes, CkMyPe());
112     
113     delete [] pelist;
114
115     if(nextpe != CkMyPe()) {
116         obj->pelist = new int[2];
117         obj->npes = 2;
118         
119         obj->pelist[0] = nextpe;
120         obj->pelist[1] = mid_pe;
121     }
122     else {
123         CkPrintf("Warning Should not be here !!!!!!!!!\n");
124         obj->pelist = new int[1];
125         obj->npes = 1;
126         
127         obj->pelist[0] = mid_pe;
128     }
129     
130     CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), nextpe, mid_pe);    
131     
132     return obj;
133 }
134
135
136 ComlibSectionHashObject *MultiRingMulticast::createObjectOnIntermediatePe
137 (int nelements, CkArrayIndexMax *elements, int src_pe){
138
139     ComlibSectionHashObject *obj = new ComlibSectionHashObject;
140
141     sinfo.getLocalIndices(nelements, elements, obj->indices);
142
143     int *pelist;
144     int npes;
145     sinfo.getRemotePelist(nelements, elements, npes, pelist);
146     
147     obj->pelist = 0;
148     obj->npes = 0;   
149
150     pelist[npes ++] = CkMyPe();
151
152     if(npes <= 3)
153         return obj;
154     
155     qsort(pelist, npes, sizeof(int), intCompare);
156     
157     int myid = getMyId(pelist, npes, CkMyPe());
158     
159     int nextpe = -1;
160
161     int new_src_pe = src_pe;
162     int mid_pe = getMidPe(pelist, npes, src_pe);
163     int src_id = getMyId(pelist, npes, src_pe);
164     
165     if(myid < npes / 2) {
166         getNextPe(pelist, npes/2, CkMyPe(), nextpe);
167
168         //If source is in the other half I have to change to the
169         //middle guy
170         if(src_id >= npes/2)
171             new_src_pe = mid_pe;
172     }
173     else {
174         getNextPe(pelist + (npes/2), npes - npes/2, CkMyPe(), nextpe);
175
176         //If source is in the other half I have to change to the
177         //middle guy
178         if(src_id < npes/2)
179             new_src_pe = mid_pe;
180     }
181
182     bool end_flag = isEndOfRing(nextpe, new_src_pe);
183
184     if((nextpe != CkMyPe()) && ((CkMyPe() == mid_pe) || !end_flag)) {
185         obj->pelist = new int[1];
186         obj->npes = 1;
187         obj->pelist[0] = nextpe;
188     }
189     
190     CkPrintf("%d: Src = %d Next = %d end = %d\n", CkMyPe(), src_pe, 
191              nextpe, end_flag);    
192     
193     
194     delete [] pelist;
195
196     return obj;
197 }
198
199 //We need to end the ring,
200 //    if next_pe is the same as the source_pe, or
201 //    if next_pe is the first processor in the ring, greater than srouce_pe.
202 //Both these comparisons are done in a 'cyclic' way with wraparounds.
203
204 int MultiRingMulticast::isEndOfRing(int next_pe, int src_pe){
205     
206     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
207     
208     if(next_pe > CkMyPe()){
209         if(src_pe <= next_pe && src_pe > CkMyPe())
210             return 1;
211         
212         return 0;
213     }
214     
215     if(src_pe > CkMyPe() || src_pe <= next_pe)
216         return 1;
217     
218     return 0;
219 }
220