2d190e3a20fa4a958c0e8dac5b8cb906fd330f2a
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 //EachToManyMulticastStrategy CODE
7 CkpvExtern(int, RecvdummyHandle);
8 CkpvExtern(CkGroupID, cmgrID);
9
10 void *itrDoneHandler(void *msg){
11
12     EachToManyMulticastStrategy *nm_mgr;
13     
14     DummyMsg *dmsg = (DummyMsg *)msg;
15     comID id = dmsg->id;
16     int instid = id.instanceID;
17
18     CmiFree(msg);
19     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
20
21     StrategyTableEntry *sentry = 
22         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
23         ->getStrategyTableEntry(instid);
24     int nexpected = sentry->numElements;
25     
26     if(nexpected == 0) {             
27         CkPrintf("[%d] Calling Dummy Done Inserting\n", CkMyPe());
28         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
29         nm_mgr->doneInserting();
30     }
31     
32     return NULL;
33 }
34
35 void *E2MHandler(void *msg){
36     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
37     EachToManyMulticastStrategy *nm_mgr;    
38     
39     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
40     int instid = bmsg->_cookie.sInfo.cInfo.instId;
41     
42     nm_mgr = (EachToManyMulticastStrategy *) 
43         CProxy_ComlibManager(CkpvAccess(cmgrID)).
44         ckLocalBranch()->getStrategy(instid);
45     
46     nm_mgr->localMulticast(msg);
47     return NULL;
48 }
49
50 //Group Constructor
51 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
52                                                          int n_srcpes, 
53                                                          int *src_pelist,
54                                                          int n_destpes, 
55                                                          int *dest_pelist) 
56     : routerID(substrategy), CharmStrategy() {
57     
58     setType(GROUP_STRATEGY);
59
60     int count = 0;
61
62     if(n_srcpes == 0) {
63         n_srcpes = CkNumPes();
64         src_pelist = new int[n_srcpes];
65         for(count =0; count < n_srcpes; count ++)
66             src_pelist[count] = count;
67     }
68     
69     CkGroupID gid;
70     gid.setZero();
71     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
72
73     if(n_destpes == 0) {
74         ndestpes = CkNumPes();
75         destpelist = new int[ndestpes];
76         for(count =0; count < ndestpes; count ++)
77             destpelist[count] = count;
78     }
79     else {
80         ndestpes = n_destpes;
81         destpelist = dest_pelist;
82     }
83
84     if(n_srcpes == 0){
85         pelist = src_pelist;
86         npes = n_srcpes;
87
88         commonInit();
89         return;
90     }
91
92     if(n_destpes == 0) {
93         pelist = destpelist;
94         npes = ndestpes;
95         
96         commonInit();
97         return;
98     }
99     
100     //source and destination lists are both subsets
101     pelist = new int[CkNumPes()];
102     npes = n_srcpes;
103     memcpy(pelist, src_pelist, n_srcpes * sizeof(int));
104     
105     for(int dcount = 0; dcount < ndestpes; dcount++) {
106         int p = destpelist[dcount];
107         
108         for(count = 0; count < npes; count ++)
109             if(pelist[count] == p)
110                 break;
111         
112         if(count == npes)
113             pelist[npes++] = p;
114     }    
115
116     commonInit();
117 }
118
119 //Array Constructor
120 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
121                                                          CkArrayID src, 
122                                                          CkArrayID dest, 
123                                                          int nsrc, 
124                                                          CkArrayIndexMax 
125                                                          *srcelements, 
126                                                          int ndest, 
127                                                          CkArrayIndexMax 
128                                                          *destelements)
129     :routerID(substrategy), CharmStrategy() {
130
131     setType(ARRAY_STRATEGY);
132     ainfo.setSourceArray(src, srcelements, nsrc);
133     ainfo.setDestinationArray(dest, destelements, ndest);
134
135     ainfo.getDestinationPeList(destpelist, ndestpes);
136     ainfo.getCombinedPeList(pelist, npes);
137     
138     //    for(int count = 0; count < npes; count ++){
139     //CkPrintf("%d, ", pelist[count]);
140     //}    
141     //CkPrintf("\n");
142
143     commonInit();
144 }
145
146 extern char *router;
147 //Common initialization for both group and array constructors
148 void EachToManyMulticastStrategy::commonInit() {
149
150     setBracketed();
151     setForwardOnMigration(1);
152
153     if(CkMyPe() == 0 && router != NULL){
154         if(strcmp(router, "USE_MESH") == 0)
155             routerID = USE_MESH;
156         else if(strcmp(router, "USE_GRID") == 0)
157             routerID = USE_GRID;
158         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
159             routerID = USE_HYPERCUBE;
160         else  if(strcmp(router, "USE_DIRECT") == 0)
161             routerID = USE_DIRECT;        
162     }
163     
164     ComlibPrintf("Creating Strategy %d\n", routerID);
165
166     rstrat = NULL;
167 }
168
169
170 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
171
172     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
173                  CkMyPe());   
174
175     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
176         int cur_sec_id = cmsg->sec_id->_cookie.sInfo.cInfo.id;
177
178         if(cur_sec_id > 0) {        
179             //Old section id, send the id with the message
180             CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)cmsg->getCharmMessage();
181             cbmsg->_cookie.sInfo.cInfo.id = cur_sec_id;
182             cbmsg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_OLD_SECTION;
183         }
184         else {
185             //New sec id, so send it along with the message
186             void *newmsg = (void *)getNewMulticastMessage(cmsg);
187             CkFreeMsg(cmsg->getCharmMessage());
188             CkSectionID *sid = cmsg->sec_id;
189             delete cmsg;
190             
191             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
192             cmsg->sec_id = sid;
193             initSectionID(cmsg->sec_id);
194         }        
195
196         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
197             cmsg->pelist = cmsg->sec_id->pelist;
198             cmsg->npes = cmsg->sec_id->npes;
199         }
200         
201         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
202     }
203     
204     rstrat->insertMessage(cmsg);
205 }
206
207 void EachToManyMulticastStrategy::doneInserting(){
208     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
209     
210     rstrat->doneInserting();
211 }
212
213 void EachToManyMulticastStrategy::pup(PUP::er &p){
214
215     int count = 0;
216     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
217                  ((p.isPacking()==0)?("UnPacking"):("Packing")));
218
219     CharmStrategy::pup(p);
220
221     p | routerID; 
222     p | npes; p | ndestpes;     
223     
224     if(p.isUnpacking() && npes > 0) {
225         pelist = new int[npes];    
226     }
227
228     if(npes > 0)
229         p(pelist, npes);
230
231     if(p.isUnpacking() && ndestpes > 0) {
232         destpelist = new int[ndestpes];    
233     }    
234
235     if(ndestpes > 0)
236         p(destpelist, ndestpes);
237
238     if(p.isUnpacking()){
239         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
240         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);
241         
242         
243         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
244         setConverseStrategy(rstrat);
245         MyPe = rstrat->getProcMap()[CkMyPe()];
246     }
247     
248     ComlibPrintf("[%d] End of pup\n", CkMyPe());
249 }
250
251 void EachToManyMulticastStrategy::beginProcessing(int numElements){
252
253     int expectedDeposits = 0;
254     MaxSectionID = 0;
255
256     rstrat->setInstance(getInstance());
257
258     if(ainfo.isSourceArray()) 
259         expectedDeposits = numElements;
260
261     if(getType() == GROUP_STRATEGY) {
262         
263         CkGroupID gid;
264         int *srcpelist;
265         int nsrcpes;
266         
267         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
268         
269         for(int count = 0; count < nsrcpes; count ++)
270             if(srcpelist[count] == CkMyPe()){
271                 expectedDeposits = 1;
272                 break;
273             }
274         
275         StrategyTableEntry *sentry = 
276             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
277             ->getStrategyTableEntry(myInstanceID);
278         sentry->numElements = expectedDeposits;
279     }
280     
281     if(expectedDeposits > 0)
282         return;
283     
284     if(expectedDeposits == 0 && MyPe >= 0)
285         doneInserting();
286 }
287
288 void EachToManyMulticastStrategy::localMulticast(void *msg){
289     register envelope *env = (envelope *)msg;
290     CkUnpackMessage(&env);
291     
292     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
293
294     int status = cbmsg->_cookie.sInfo.cInfo.status;
295     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
296         
297     if(status == COMLIB_MULTICAST_ALL) {        
298         ainfo.localMulticast(env);
299         return;
300     }   
301
302     CkVec<CkArrayIndexMax> *dest_indices;    
303     if(status == COMLIB_MULTICAST_NEW_SECTION){        
304
305         dest_indices = new CkVec<CkArrayIndexMax>;
306
307         //CkPrintf("[%d] Received message for new section\n", CkMyPe());
308
309         CkArrayID destArrayID;
310         int nDestElements;
311         CkArrayIndexMax *destelements;
312         ainfo.getSourceArray(destArrayID, destelements, nDestElements);
313
314         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
315         for(int count = 0; count < ccmsg->nIndices; count++){
316             CkArrayIndexMax idx = ccmsg->indices[count];
317             //idx.print();
318             int dest_proc =CkArrayID::CkLocalBranch(destArrayID)
319                 ->lastKnown(idx);
320             
321             if(dest_proc == CkMyPe())
322                 dest_indices->insertAtEnd(idx);                        
323         }            
324
325         envelope *usrenv = (envelope *) ccmsg->usrMsg;
326         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
327         memcpy(newenv, ccmsg->usrMsg, usrenv->getTotalsize());
328
329         ainfo.localMulticast(dest_indices, newenv);
330
331         CkVec<CkArrayIndexMax> *old_dest_indices;
332         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
333                                  cbmsg->_cookie.sInfo.cInfo.id);
334
335         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
336         if(old_dest_indices != NULL)
337             delete old_dest_indices;
338         
339         sec_ht.put(key) = dest_indices;
340         CmiFree(env);
341         return;       
342     }
343
344     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
345     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
346                              cbmsg->_cookie.sInfo.cInfo.id);    
347     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
348
349     if(dest_indices == NULL)
350         CkAbort("Destination indices is NULL\n");
351
352     ainfo.localMulticast(dest_indices, env);
353 }
354
355 ComlibMulticastMsg * EachToManyMulticastStrategy::getNewMulticastMessage
356 (CharmMessageHolder *cmsg){
357     
358     if(cmsg->sec_id == NULL || cmsg->sec_id->_nElems == 0)
359         return NULL;
360
361     void *m = cmsg->getCharmMessage();
362     envelope *env = UsrToEnv(m);
363     
364     if(cmsg->sec_id->_cookie.sInfo.cInfo.id == 0) {  //New Section ID;
365         CkPackMessage(&env);
366         int sizes[2];
367         sizes[0] = cmsg->sec_id->_nElems;
368         sizes[1] = env->getTotalsize();                
369
370         cmsg->sec_id->_cookie.sInfo.cInfo.id = MaxSectionID ++;
371
372         ComlibPrintf("Creating new comlib multicast message %d, %d\n", sizes[0], sizes[1]);
373
374         ComlibMulticastMsg *msg = new(sizes, 0) ComlibMulticastMsg;
375         msg->nIndices = cmsg->sec_id->_nElems;
376         msg->_cookie.sInfo.cInfo.instId = myInstanceID;
377         msg->_cookie.type = COMLIB_MULTICAST_MESSAGE;
378         msg->_cookie.sInfo.cInfo.id = MaxSectionID - 1;
379         msg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_NEW_SECTION;
380         msg->_cookie.pe = CkMyPe();
381
382         memcpy(msg->indices, cmsg->sec_id->_elems, 
383                sizes[0] * sizeof(CkArrayIndexMax));
384         memcpy(msg->usrMsg, env, sizes[1] * sizeof(char));         
385         envelope *newenv = UsrToEnv(msg);
386         
387         newenv->getsetArrayMgr() = env->getsetArrayMgr();
388         newenv->getsetArraySrcPe() = env->getsetArraySrcPe();
389         newenv->getsetArrayEp() = env->getsetArrayEp();
390         newenv->getsetArrayHops() = env->getsetArrayHops();
391         newenv->getsetArrayIndex() = env->getsetArrayIndex();
392         // for trace projections
393         newenv->setEvent(env->getEvent());
394         newenv->setSrcPe(env->getSrcPe());
395
396         CkPackMessage(&newenv);        
397         return (ComlibMulticastMsg *)EnvToUsr(newenv);
398     }   
399
400     return NULL;
401 }
402
403
404 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
405
406     ainfo.initSectionID(sid);    
407
408     //Convert real processor numbers to virtual processors in the all
409     //to all multicast group
410     for(int count = 0; count < sid->npes; count ++) {
411         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]];        
412         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
413     }
414 }