fixed a bug in MultiRing where the sorting of the data was causing problems.
[charm.git] / src / ck-com / ComlibSectionInfo.C
1 #include <alloca.h>
2
3 #include "ComlibManager.h"
4 #include "ComlibSectionInfo.h"
5
6 ComlibMulticastMsg * ComlibSectionInfo::getNewMulticastMessage(CharmMessageHolder *cmsg, int needSort){
7     
8     if(cmsg->sec_id == NULL || cmsg->sec_id->_nElems == 0)
9         return NULL;
10
11     void *m = cmsg->getCharmMessage();
12     envelope *env = UsrToEnv(m);
13         
14     if(cmsg->sec_id->_cookie.sInfo.cInfo.id != 0) 
15         CmiAbort("In correct section\n");
16
17     initSectionID(cmsg->sec_id);   
18
19     CkPackMessage(&env);
20     int nRemotePes, nRemoteIndices;
21     ComlibMulticastIndexCount *indicesCount;
22     int *belongingList;
23     getPeCount(cmsg->sec_id->_nElems, cmsg->sec_id->_elems, nRemotePes, nRemoteIndices, indicesCount, belongingList);
24     if (nRemotePes == 0) return NULL;
25
26     int sizes[3];
27     sizes[0] = nRemotePes;
28     sizes[1] = nRemoteIndices; // only those remote ///cmsg->sec_id->_nElems;
29     sizes[2] = env->getTotalsize();
30     
31     ComlibPrintf("Creating new comlib multicast message %d, %d %d\n", sizes[0], sizes[1], sizes[2]);
32     
33     ComlibMulticastMsg *msg = new(sizes, 0) ComlibMulticastMsg;
34     msg->nPes = nRemotePes;
35     msg->_cookie.sInfo.cInfo.instId = instanceID;
36     msg->_cookie.sInfo.cInfo.id = MaxSectionID - 1;
37     msg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_NEW_SECTION;
38     msg->_cookie.type = COMLIB_MULTICAST_MESSAGE;
39     msg->_cookie.pe = CkMyPe();
40
41     // fill in the three pointers of the ComlibMulticastMsg
42     memcpy(msg->indicesCount, indicesCount, sizes[0] * sizeof(ComlibMulticastIndexCount));
43     //memcpy(msg->indices, cmsg->sec_id->_elems, sizes[1] * sizeof(CkArrayIndexMax));
44
45     CkArrayIndexMax **indicesPe = (CkArrayIndexMax**)alloca(nRemotePes * sizeof(CkArrayIndexMax*));
46
47     if (needSort) {
48       // if we are sorting the array, then we need to fix the problem that belongingList
49       // refers to the original ordering! This is done by mapping indicesPe in a way coherent
50       // with the original ordering.
51       int previous, i, j;
52       qsort(msg->indicesCount, sizes[0], sizeof(ComlibMulticastIndexCount), indexCountCompare);
53
54       for (j=0; j<nRemotePes; ++j) if (indicesCount[j].pe == msg->indicesCount[0].pe) break;
55       indicesPe[j] = msg->indices;
56       previous = j;
57       for (i=1; i<nRemotePes; ++i) {
58         for (j=0; j<nRemotePes; ++j) if (indicesCount[j].pe == msg->indicesCount[i].pe) break;
59         indicesPe[j] = indicesPe[previous] + indicesCount[previous].count;
60         previous = j;
61       }
62     } else {
63       indicesPe[0] = msg->indices;
64       for (int i=1; i<nRemotePes; ++i) indicesPe[i] = indicesPe[i-1] + indicesCount[i-1].count;
65     }
66
67     for (int i=0; i<cmsg->sec_id->_nElems; ++i) {
68       if (belongingList[i] >= 0) {
69         *indicesPe[belongingList[i]] = cmsg->sec_id->_elems[i];
70         indicesPe[belongingList[i]]++;
71       }
72     }
73     memcpy(msg->usrMsg, env, sizes[2] * sizeof(char));
74     envelope *newenv = UsrToEnv(msg);
75     delete [] indicesCount;
76     delete [] belongingList;
77
78     newenv->getsetArrayMgr() = env->getsetArrayMgr();
79     newenv->getsetArraySrcPe() = env->getsetArraySrcPe();
80     newenv->getsetArrayEp() = env->getsetArrayEp();
81     newenv->getsetArrayHops() = env->getsetArrayHops();
82     newenv->getsetArrayIndex() = env->getsetArrayIndex();
83
84     // for trace projections
85     newenv->setEvent(env->getEvent());
86     newenv->setSrcPe(env->getSrcPe());
87     
88     CkPackMessage(&newenv);        
89     return (ComlibMulticastMsg *)EnvToUsr(newenv);
90 }
91
92 void ComlibSectionInfo::unpack(envelope *cb_env,
93                                int &nLocalElems,
94                                CkArrayIndexMax *&dest_indices, 
95                                envelope *&env) {
96         
97     ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)EnvToUsr(cb_env);
98     int i;
99
100     dest_indices = ccmsg->indices;
101     for (i=0; i<ccmsg->nPes; ++i) {
102       if (ccmsg->indicesCount[i].pe == CkMyPe()) break;
103       dest_indices += ccmsg->indicesCount[i].count;
104     }
105
106     CkAssert(i < ccmsg->nPes);
107     nLocalElems = ccmsg->indicesCount[i].count;
108
109     /*
110     CkPrintf("Unpacking: %d local elements:",nLocalElems);
111     for (int j=0; j<nLocalElems; ++j) CkPrintf(" %d",((int*)&dest_indices[j])[1]);
112     CkPrintf("\n");
113     */
114     /*
115     for(int count = 0; count < ccmsg->nIndices; count++){
116         CkArrayIndexMax idx = ccmsg->indices[count];
117         
118         //This will work because. lastknown always knows if I have the
119         //element of not
120         int dest_proc = ComlibGetLastKnown(destArrayID, idx);
121         //CkArrayID::CkLocalBranch(destArrayID)->lastKnown(idx);
122         
123         //        if(dest_proc == CkMyPe())
124         dest_indices.insertAtEnd(idx);                        
125     }
126     */
127
128     envelope *usrenv = (envelope *) ccmsg->usrMsg;
129     env = (envelope *)CmiAlloc(usrenv->getTotalsize());
130     memcpy(env, ccmsg->usrMsg, usrenv->getTotalsize());
131 }
132
133
134 void ComlibSectionInfo::processOldSectionMessage(CharmMessageHolder *cmsg) {
135
136     ComlibPrintf("Process Old Section Message \n");
137
138     int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
139
140     //Old section id, send the id with the message
141     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)cmsg->getCharmMessage();
142     cbmsg->_cookie.sInfo.cInfo.id = cur_sec_id;
143     cbmsg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_OLD_SECTION;
144 }
145
146 void ComlibSectionInfo::getPeList(int _nElems, 
147                                   CkArrayIndexMax *_elems, 
148                                   int &npes, int *&pelist){
149     
150     int length = CkNumPes();
151     if(length > _nElems)    //There will not be more processors than
152                             //number of elements. This is wastage of
153                             //memory as there may be fewer
154                             //processors. Fix later.
155         length = _nElems;
156     
157     pelist = new int[length];
158     npes = 0;
159     
160     int count = 0, acount = 0;
161     
162     for(acount = 0; acount < _nElems; acount++){
163         
164         int p = ComlibGetLastKnown(destArrayID, _elems[acount]);
165         
166         if(p == -1) CkAbort("Invalid Section\n");        
167         for(count = 0; count < npes; count ++)
168             if(pelist[count] == p)
169                 break;
170         
171         if(count == npes) {
172             pelist[npes ++] = p;
173         }
174     }   
175
176     if(npes == 0) {
177         delete [] pelist;
178         pelist = NULL;
179     }
180 }
181
182
183 void ComlibSectionInfo::getPeCount(int nindices, CkArrayIndexMax *idxlist, 
184                       int &npes, int &nidx,
185                       ComlibMulticastIndexCount *&counts, int *&belongs) {
186   int count = 0;
187   int i;
188     
189   int length = CkNumPes();
190
191   if(length > nindices) length = nindices;
192     
193   counts = new ComlibMulticastIndexCount[length];
194   belongs = new int[nindices];
195   npes = 0;
196   nidx = 0;
197
198   for(i=0; i<nindices; ++i){
199     int p = ComlibGetLastKnown(destArrayID, idxlist[i]);
200     
201     if(p == -1) CkAbort("Invalid Section\n");        
202
203     //Collect processors
204     for(count = 0; count < npes; count ++)
205       if(counts[count].pe == p)
206         break;
207     
208     if(count == npes) {
209       counts[npes].pe = p;
210       counts[npes].count = 0;
211       ++npes;
212     }
213
214     if(p == CkMyPe()) {
215       belongs[i] = -1;
216       continue;
217     }
218
219     ++nidx;
220     counts[count].count++;
221     belongs[i] = count;
222   }
223   //CkPrintf("section has %d procs\n",npes);
224
225   if(npes == 0) {
226     delete [] counts;
227     delete [] belongs;
228     counts = NULL;
229     belongs = NULL;
230   }
231 }
232
233
234 void ComlibSectionInfo::getRemotePelist(int nindices, 
235                                         CkArrayIndexMax *idxlist, 
236                                         int &npes, int *&pelist) {
237
238     int count = 0, acount = 0;
239     
240     int length = CkNumPes();
241
242     // HACK FOR DEBUGGING
243     /*pelist = new int[length-1];
244     npes = length-1;
245     for (acount=0; acount<length; acount++) {
246       if (acount == CkMyPe()) continue;
247       pelist[count]=acount;
248       count++;
249     }
250     return;*/
251     // END HACK
252
253     if(length > nindices)
254         length = nindices;
255     
256     pelist = new int[length+1];
257     npes = 0;
258
259     for(acount = 0; acount < nindices; acount++){
260         
261         int p = ComlibGetLastKnown(destArrayID, idxlist[acount]);
262         if(p == CkMyPe())
263             continue;
264         
265         if(p == -1) CkAbort("Invalid Section\n");        
266         
267         //Collect remote processors
268         for(count = 0; count < npes; count ++)
269             if(pelist[count] == p)
270                 break;
271         
272         if(count == npes) {
273             pelist[npes ++] = p;
274         }
275     }
276     
277     if(npes == 0) {
278         delete [] pelist;
279         pelist = NULL;
280     }
281 }
282
283
284 void ComlibSectionInfo::getLocalIndices(int nindices, 
285                                         CkArrayIndexMax *idxlist, 
286                                         CkVec<CkArrayIndexMax> &idx_vec){    
287     int count = 0, acount = 0;
288     idx_vec.resize(0);
289     
290     for(acount = 0; acount < nindices; acount++){
291         int p = ComlibGetLastKnown(destArrayID, idxlist[acount]);
292         if(p == CkMyPe()) 
293             idx_vec.insertAtEnd(idxlist[acount]);
294     }
295 }
296
297
298 void ComlibSectionInfo::localMulticast(envelope *env){
299     ComlibArrayInfo::localMulticast(&localDestIndexVec, env);
300 }