New version with migration and forwarding always working. The test program also works.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 //EachToManyMulticastStrategy CODE
7 CkpvExtern(int, RecvdummyHandle);
8 CkpvExtern(CkGroupID, cmgrID);
9
10 void *itrDoneHandler(void *msg){
11
12     EachToManyMulticastStrategy *nm_mgr;
13     
14     DummyMsg *dmsg = (DummyMsg *)msg;
15     comID id = dmsg->id;
16     int instid = id.instanceID;
17
18     CmiFree(msg);
19     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
20
21     StrategyTableEntry *sentry = 
22         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
23         ->getStrategyTableEntry(instid);
24     int nexpected = sentry->numElements;
25     
26     if(nexpected == 0) {             
27         //CkPrintf("[%d] Calling Dummy Done Inserting\n", CkMyPe());
28         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
29         nm_mgr->doneInserting();
30     }
31     
32     return NULL;
33 }
34
35 void *E2MHandler(void *msg){
36     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
37     EachToManyMulticastStrategy *nm_mgr;    
38     
39     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
40     int instid = bmsg->_cookie.sInfo.cInfo.instId;
41     
42     nm_mgr = (EachToManyMulticastStrategy *) 
43         CProxy_ComlibManager(CkpvAccess(cmgrID)).
44         ckLocalBranch()->getStrategy(instid);
45     
46     nm_mgr->localMulticast(msg);
47     return NULL;
48 }
49
50 //Group Constructor
51 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
52                                                          int n_srcpes, 
53                                                          int *src_pelist,
54                                                          int n_destpes, 
55                                                          int *dest_pelist) 
56     : routerID(substrategy), CharmStrategy() {
57     
58     setType(GROUP_STRATEGY);
59
60     int count = 0;
61
62     if(n_srcpes == 0) {
63         n_srcpes = CkNumPes();
64         src_pelist = new int[n_srcpes];
65         for(count =0; count < n_srcpes; count ++)
66             src_pelist[count] = count;
67     }
68     
69     CkGroupID gid;
70     gid.setZero();
71     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
72
73     if(n_destpes == 0) {
74         ndestpes = CkNumPes();
75         destpelist = new int[ndestpes];
76         for(count =0; count < ndestpes; count ++)
77             destpelist[count] = count;
78     }
79     else {
80         ndestpes = n_destpes;
81         destpelist = dest_pelist;
82     }
83
84     if(n_srcpes == 0){
85         pelist = src_pelist;
86         npes = n_srcpes;
87
88         commonInit();
89         return;
90     }
91
92     if(n_destpes == 0) {
93         pelist = destpelist;
94         npes = ndestpes;
95         
96         commonInit();
97         return;
98     }
99     
100     //source and destination lists are both subsets
101     pelist = new int[CkNumPes()];
102     npes = n_srcpes;
103     memcpy(pelist, src_pelist, n_srcpes * sizeof(int));
104     
105     for(int dcount = 0; dcount < ndestpes; dcount++) {
106         int p = destpelist[dcount];
107         
108         for(count = 0; count < npes; count ++)
109             if(pelist[count] == p)
110                 break;
111         
112         if(count == npes)
113             pelist[npes++] = p;
114     }    
115
116     commonInit();
117 }
118
119 //Array Constructor
120 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
121                                                          CkArrayID src, 
122                                                          CkArrayID dest, 
123                                                          int nsrc, 
124                                                          CkArrayIndexMax 
125                                                          *srcelements, 
126                                                          int ndest, 
127                                                          CkArrayIndexMax 
128                                                          *destelements)
129     :routerID(substrategy), CharmStrategy() {
130
131     setType(ARRAY_STRATEGY);
132     ainfo.setSourceArray(src, srcelements, nsrc);
133     ainfo.setDestinationArray(dest, destelements, ndest);
134
135     ainfo.getDestinationPeList(destpelist, ndestpes);
136     ainfo.getCombinedPeList(pelist, npes);
137     
138     //    for(int count = 0; count < npes; count ++){
139     //CkPrintf("%d, ", pelist[count]);
140     //}    
141     //CkPrintf("\n");
142     
143     commonInit();
144 }
145
146 extern char *router;
147 //Common initialization for both group and array constructors
148 void EachToManyMulticastStrategy::commonInit() {
149
150     setBracketed();
151     setForwardOnMigration(1);
152
153     if(CkMyPe() == 0 && router != NULL){
154         if(strcmp(router, "USE_MESH") == 0)
155             routerID = USE_MESH;
156         else if(strcmp(router, "USE_GRID") == 0)
157             routerID = USE_GRID;
158         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
159             routerID = USE_HYPERCUBE;
160         else  if(strcmp(router, "USE_DIRECT") == 0)
161             routerID = USE_DIRECT;        
162     }
163     
164     ComlibPrintf("Creating Strategy %d\n", routerID);
165
166     rstrat = NULL;
167 }
168
169
170 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
171
172     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
173                  CkMyPe());   
174
175     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
176         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
177
178         if(cur_sec_id > 0) {        
179             sinfo.processOldSectionMessage(cmsg);
180         }
181         else {
182             //New sec id, so send it along with the message
183             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
184             CkFreeMsg(cmsg->getCharmMessage());
185             CkSectionID *sid = cmsg->sec_id;
186             delete cmsg;
187             
188             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
189             cmsg->sec_id = sid;
190             initSectionID(cmsg->sec_id);
191         }        
192
193         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
194             cmsg->pelist = cmsg->sec_id->pelist;
195             cmsg->npes = cmsg->sec_id->npes;
196         }
197         
198         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
199     }
200     
201     rstrat->insertMessage(cmsg);
202 }
203
204 void EachToManyMulticastStrategy::doneInserting(){
205     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
206     
207     rstrat->doneInserting();
208 }
209
210 void EachToManyMulticastStrategy::pup(PUP::er &p){
211
212     int count = 0;
213     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
214                  ((p.isPacking()==0)?("UnPacking"):("Packing")));
215
216     CharmStrategy::pup(p);
217
218     p | routerID; 
219     p | npes; p | ndestpes;     
220     
221     if(p.isUnpacking() && npes > 0) {
222         pelist = new int[npes];    
223     }
224
225     if(npes > 0)
226         p(pelist, npes);
227
228     if(p.isUnpacking() && ndestpes > 0) {
229         destpelist = new int[ndestpes];    
230     }    
231
232     if(ndestpes > 0)
233         p(destpelist, ndestpes);
234
235     if(p.isUnpacking()){
236         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
237         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);
238         
239         
240         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
241         setConverseStrategy(rstrat);
242         MyPe = rstrat->getProcMap()[CkMyPe()];
243     }
244     
245     ComlibPrintf("[%d] End of pup\n", CkMyPe());
246 }
247
248 void EachToManyMulticastStrategy::beginProcessing(int numElements){
249
250     ComlibPrintf("Begin processing %d\n", numElements);
251
252     int expectedDeposits = 0;
253     MaxSectionID = 0;
254
255     rstrat->setInstance(getInstance());
256
257     if(ainfo.isSourceArray()) 
258         expectedDeposits = numElements;
259
260     if(getType() == GROUP_STRATEGY) {
261         
262         CkGroupID gid;
263         int *srcpelist;
264         int nsrcpes;
265         
266         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
267         
268         for(int count = 0; count < nsrcpes; count ++)
269             if(srcpelist[count] == CkMyPe()){
270                 expectedDeposits = 1;
271                 break;
272             }
273         
274         StrategyTableEntry *sentry = 
275             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
276             ->getStrategyTableEntry(myInstanceID);
277         sentry->numElements = expectedDeposits;
278     }
279     
280     CkArrayID dest;
281     int nidx;
282     CkArrayIndexMax *idx_list;
283     
284     ainfo.getDestinationArray(dest, idx_list, nidx);
285     sinfo = ComlibSectionInfo(dest, myInstanceID);
286     
287     if(expectedDeposits > 0)
288         return;
289     
290     if(expectedDeposits == 0 && MyPe >= 0)
291         //doneInserting();
292         ConvComlibScheduleDoneInserting(myInstanceID);
293 }
294
295 void EachToManyMulticastStrategy::localMulticast(void *msg){
296     register envelope *env = (envelope *)msg;
297     CkUnpackMessage(&env);
298     
299     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
300
301     int status = cbmsg->_cookie.sInfo.cInfo.status;
302     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
303         
304     if(status == COMLIB_MULTICAST_ALL) {        
305         ainfo.localBroadcast(env);
306         return;
307     }   
308
309     CkVec<CkArrayIndexMax> *dest_indices;    
310     if(status == COMLIB_MULTICAST_NEW_SECTION){        
311         envelope *newenv;
312         sinfo.unpack(env, dest_indices, newenv);        
313         ComlibArrayInfo::localMulticast(dest_indices, newenv);
314
315         CkVec<CkArrayIndexMax> *old_dest_indices;
316         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
317                                  cbmsg->_cookie.sInfo.cInfo.id);
318
319         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
320         if(old_dest_indices != NULL)
321             delete old_dest_indices;
322         
323         sec_ht.put(key) = dest_indices;
324         CmiFree(env);
325         return;       
326     }
327
328     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
329     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
330                              cbmsg->_cookie.sInfo.cInfo.id);    
331     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
332
333     if(dest_indices == NULL)
334         CkAbort("Destination indices is NULL\n");
335
336     ComlibArrayInfo::localMulticast(dest_indices, env);
337 }
338
339 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
340
341     sinfo.initSectionID(sid);    
342
343     //Convert real processor numbers to virtual processors in the all
344     //to all multicast group
345     for(int count = 0; count < sid->npes; count ++) {
346         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]];        
347         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
348     }
349 }