fixing multicast
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 //EachToManyMulticastStrategy CODE
7 CkpvExtern(int, RecvdummyHandle);
8 CkpvExtern(CkGroupID, cmgrID);
9
10 void *itrDoneHandler(void *msg){
11
12     EachToManyMulticastStrategy *nm_mgr;
13     
14     DummyMsg *dmsg = (DummyMsg *)msg;
15     comID id = dmsg->id;
16     int instid = id.instanceID;
17
18     CmiFree(msg);
19     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
20
21     StrategyTableEntry *sentry = 
22         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
23         ->getStrategyTableEntry(instid);
24     int nexpected = sentry->numElements;
25     
26     if(nexpected == 0) {             
27         //CkPrintf("[%d] Calling Dummy Done Inserting\n", CkMyPe());
28         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
29         nm_mgr->doneInserting();
30     }
31     
32     return NULL;
33 }
34
35 void *E2MHandler(void *msg){
36     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
37     EachToManyMulticastStrategy *nm_mgr;    
38     
39     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
40     int instid = bmsg->_cookie.sInfo.cInfo.instId;
41     
42     nm_mgr = (EachToManyMulticastStrategy *) 
43         CProxy_ComlibManager(CkpvAccess(cmgrID)).
44         ckLocalBranch()->getStrategy(instid);
45     
46     nm_mgr->localMulticast(msg);
47     return NULL;
48 }
49
50 //Group Constructor
51 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
52                                                          int n_srcpes, 
53                                                          int *src_pelist,
54                                                          int n_destpes, 
55                                                          int *dest_pelist) 
56     : routerID(substrategy), CharmStrategy() {
57     
58     setType(GROUP_STRATEGY);
59
60     int count = 0;
61
62     if(n_srcpes == 0) {
63         n_srcpes = CkNumPes();
64         src_pelist = new int[n_srcpes];
65         for(count =0; count < n_srcpes; count ++)
66             src_pelist[count] = count;
67     }
68     
69     CkGroupID gid;
70     gid.setZero();
71     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
72
73     if(n_destpes == 0) {
74         ndestpes = CkNumPes();
75         destpelist = new int[ndestpes];
76         for(count =0; count < ndestpes; count ++)
77             destpelist[count] = count;
78     }
79     else {
80         ndestpes = n_destpes;
81         destpelist = dest_pelist;
82     }
83
84     if(n_srcpes == 0){
85         pelist = src_pelist;
86         npes = n_srcpes;
87
88         commonInit();
89         return;
90     }
91
92     if(n_destpes == 0) {
93         pelist = destpelist;
94         npes = ndestpes;
95         
96         commonInit();
97         return;
98     }
99     
100     //source and destination lists are both subsets
101     pelist = new int[CkNumPes()];
102     npes = n_srcpes;
103     memcpy(pelist, src_pelist, n_srcpes * sizeof(int));
104     
105     for(int dcount = 0; dcount < ndestpes; dcount++) {
106         int p = destpelist[dcount];
107         
108         for(count = 0; count < npes; count ++)
109             if(pelist[count] == p)
110                 break;
111         
112         if(count == npes)
113             pelist[npes++] = p;
114     }    
115
116     commonInit();
117 }
118
119 //Array Constructor
120 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
121                                                          CkArrayID src, 
122                                                          CkArrayID dest, 
123                                                          int nsrc, 
124                                                          CkArrayIndexMax 
125                                                          *srcelements, 
126                                                          int ndest, 
127                                                          CkArrayIndexMax 
128                                                          *destelements)
129     :routerID(substrategy), CharmStrategy() {
130
131     setType(ARRAY_STRATEGY);
132     ainfo.setSourceArray(src, srcelements, nsrc);
133     ainfo.setDestinationArray(dest, destelements, ndest);
134
135     ainfo.getDestinationPeList(destpelist, ndestpes);
136     ainfo.getCombinedPeList(pelist, npes);
137     
138     //    for(int count = 0; count < npes; count ++){
139     //CkPrintf("%d, ", pelist[count]);
140     //}    
141     //CkPrintf("\n");
142     
143     commonInit();
144 }
145
146 extern char *router;
147 //Common initialization for both group and array constructors
148 void EachToManyMulticastStrategy::commonInit() {
149
150     setBracketed();
151     setForwardOnMigration(1);
152
153     if(CkMyPe() == 0 && router != NULL){
154         if(strcmp(router, "USE_MESH") == 0)
155             routerID = USE_MESH;
156         else if(strcmp(router, "USE_GRID") == 0)
157             routerID = USE_GRID;
158         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
159             routerID = USE_HYPERCUBE;
160         else  if(strcmp(router, "USE_DIRECT") == 0)
161             routerID = USE_DIRECT;        
162     }
163     
164     ComlibPrintf("Creating Strategy %d\n", routerID);
165
166     rstrat = NULL;
167 }
168
169
170 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
171
172     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
173                  CkMyPe());   
174
175     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
176         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
177
178         if(cur_sec_id > 0) {        
179             sinfo.processOldSectionMessage(cmsg);
180         }
181         else {
182             //New sec id, so send it along with the message
183             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
184             CkFreeMsg(cmsg->getCharmMessage());
185             CkSectionID *sid = cmsg->sec_id;
186             delete cmsg;
187             
188             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
189             cmsg->sec_id = sid;
190             initSectionID(cmsg->sec_id);
191         }        
192
193         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
194             cmsg->pelist = cmsg->sec_id->pelist;
195             cmsg->npes = cmsg->sec_id->npes;
196         }        
197     }
198
199     //For section multicasts and broadcasts
200     if(cmsg->dest_proc == IS_MULTICAST)
201         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
202     
203     rstrat->insertMessage(cmsg);
204 }
205
206 void EachToManyMulticastStrategy::doneInserting(){
207     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
208     
209     rstrat->doneInserting();
210 }
211
212 void EachToManyMulticastStrategy::pup(PUP::er &p){
213
214     int count = 0;
215     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
216                  ((p.isPacking()==0)?("UnPacking"):("Packing")));
217
218     CharmStrategy::pup(p);
219
220     p | routerID; 
221     p | npes; p | ndestpes;     
222     
223     if(p.isUnpacking() && npes > 0) {
224         pelist = new int[npes];    
225     }
226
227     if(npes > 0)
228         p(pelist, npes);
229
230     if(p.isUnpacking() && ndestpes > 0) {
231         destpelist = new int[ndestpes];    
232     }    
233
234     if(ndestpes > 0)
235         p(destpelist, ndestpes);
236
237     if(p.isUnpacking()){
238         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
239         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);
240         
241         
242         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
243         setConverseStrategy(rstrat);
244         MyPe = rstrat->getProcMap()[CkMyPe()];
245     }
246     
247     ComlibPrintf("[%d] End of pup\n", CkMyPe());
248 }
249
250 void EachToManyMulticastStrategy::beginProcessing(int numElements){
251
252     ComlibPrintf("Begin processing %d\n", numElements);
253
254     int expectedDeposits = 0;
255     MaxSectionID = 0;
256
257     rstrat->setInstance(getInstance());
258
259     if(ainfo.isSourceArray()) 
260         expectedDeposits = numElements;
261
262     if(getType() == GROUP_STRATEGY) {
263         
264         CkGroupID gid;
265         int *srcpelist;
266         int nsrcpes;
267         
268         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
269         
270         for(int count = 0; count < nsrcpes; count ++)
271             if(srcpelist[count] == CkMyPe()){
272                 expectedDeposits = 1;
273                 break;
274             }
275         
276         StrategyTableEntry *sentry = 
277             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
278             ->getStrategyTableEntry(myInstanceID);
279         sentry->numElements = expectedDeposits;
280     }
281     
282     CkArrayID dest;
283     int nidx;
284     CkArrayIndexMax *idx_list;
285     
286     ainfo.getDestinationArray(dest, idx_list, nidx);
287     sinfo = ComlibSectionInfo(dest, myInstanceID);
288     
289     if(expectedDeposits > 0)
290         return;
291     
292     if(expectedDeposits == 0 && MyPe >= 0)
293         //doneInserting();
294         ConvComlibScheduleDoneInserting(myInstanceID);
295 }
296
297 void EachToManyMulticastStrategy::localMulticast(void *msg){
298     register envelope *env = (envelope *)msg;
299     CkUnpackMessage(&env);
300     
301     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
302
303     int status = cbmsg->_cookie.sInfo.cInfo.status;
304     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
305         
306     if(status == COMLIB_MULTICAST_ALL) {        
307         ainfo.localBroadcast(env);
308         return;
309     }   
310
311     CkVec<CkArrayIndexMax> *dest_indices;    
312     if(status == COMLIB_MULTICAST_NEW_SECTION){        
313         envelope *newenv;
314         sinfo.unpack(env, dest_indices, newenv);        
315         ComlibArrayInfo::localMulticast(dest_indices, newenv);
316
317         CkVec<CkArrayIndexMax> *old_dest_indices;
318         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
319                                  cbmsg->_cookie.sInfo.cInfo.id);
320
321         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
322         if(old_dest_indices != NULL)
323             delete old_dest_indices;
324         
325         sec_ht.put(key) = dest_indices;
326         CmiFree(env);
327         return;       
328     }
329
330     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
331     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
332                              cbmsg->_cookie.sInfo.cInfo.id);    
333     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
334
335     if(dest_indices == NULL)
336         CkAbort("Destination indices is NULL\n");
337
338     ComlibArrayInfo::localMulticast(dest_indices, env);
339 }
340
341 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
342
343     sinfo.initSectionID(sid);    
344
345     //Convert real processor numbers to virtual processors in the all
346     //to all multicast group
347     for(int count = 0; count < sid->npes; count ++) {
348         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]];        
349         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
350     }
351 }