Adding fix for AMPI broadcast strategy.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 #include "AAPLearner.h"
7 #include "AAMLearner.h"
8
9 //EachToManyMulticastStrategy CODE
10 CkpvExtern(int, RecvdummyHandle);
11 CkpvExtern(CkGroupID, cmgrID);
12
13 void *itrDoneHandler(void *msg){
14
15     EachToManyMulticastStrategy *nm_mgr;
16     
17     DummyMsg *dmsg = (DummyMsg *)msg;
18     comID id = dmsg->id;
19     int instid = id.instanceID;
20
21     CmiFree(msg);
22     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
23
24     StrategyTableEntry *sentry = 
25         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
26         ->getStrategyTableEntry(instid);
27     int nexpected = sentry->numElements;
28     
29     if(nexpected == 0) {             
30         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
31         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
32         nm_mgr->doneInserting();
33     }
34     
35     return NULL;
36 }
37
38 void *E2MHandler(void *msg){
39     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
40     EachToManyMulticastStrategy *nm_mgr;    
41
42     envelope *env = (envelope *)msg;
43     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr(env);
44     int instid = bmsg->_cookie.sInfo.cInfo.instId;
45     
46     nm_mgr = (EachToManyMulticastStrategy *) 
47         CProxy_ComlibManager(CkpvAccess(cmgrID)).
48         ckLocalBranch()->getStrategy(instid);
49     
50     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
51     
52     nm_mgr->localMulticast(msg);
53     return NULL;
54 }
55
56 //Group Constructor
57 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
58                                                          int n_srcpes, 
59                                                          int *src_pelist,
60                                                          int n_destpes, 
61                                                          int *dest_pelist) 
62     : routerID(substrategy), CharmStrategy() {
63     
64     setType(GROUP_STRATEGY);
65
66     CkGroupID gid;
67     gid.setZero();
68     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
69     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
70
71     //Written in this funny way to be symettric with the array case.
72     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
73     ginfo.getCombinedPeList(pelist, npes);
74
75     commonInit();
76 }
77
78 //Array Constructor
79 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
80                                                          CkArrayID src, 
81                                                          CkArrayID dest, 
82                                                          int nsrc, 
83                                                          CkArrayIndexMax 
84                                                          *srcelements, 
85                                                          int ndest, 
86                                                          CkArrayIndexMax 
87                                                          *destelements)
88     :routerID(substrategy), CharmStrategy() {
89
90     setType(ARRAY_STRATEGY);
91     ainfo.setSourceArray(src, srcelements, nsrc);
92     ainfo.setDestinationArray(dest, destelements, ndest);
93
94     ainfo.getDestinationPeList(destpelist, ndestpes);
95     ainfo.getCombinedPeList(pelist, npes);
96     
97     /*
98       char dump[1000];
99       char sdump[100];
100       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
101       for(int count = 0; count < npes; count ++){
102       sprintf(sdump, "%d, ", pelist[count]);
103       strcat(dump, sdump);           
104       }    
105       ComlibPrintf("%s\n", dump);
106     */
107
108     commonInit();
109 }
110
111 extern char *router;
112 //Common initialization for both group and array constructors
113 void EachToManyMulticastStrategy::commonInit() {
114
115     setBracketed();
116     setForwardOnMigration(1);
117
118     mflag = CmiFalse;
119
120     if(CkMyPe() == 0 && router != NULL){
121         if(strcmp(router, "USE_MESH") == 0)
122             routerID = USE_MESH;
123         else if(strcmp(router, "USE_GRID") == 0)
124             routerID = USE_GRID;
125         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
126             routerID = USE_HYPERCUBE;
127         else  if(strcmp(router, "USE_DIRECT") == 0)
128             routerID = USE_DIRECT;        
129     }
130     
131     ComlibPrintf("Creating Strategy %d\n", routerID);
132
133     rstrat = NULL;
134 }
135
136 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
137     CkHashtableIterator *ht_iterator = sec_ht.iterator();
138     ht_iterator->seekStart();
139     while(ht_iterator->hasNext()){
140         void **data;
141         data = (void **)ht_iterator->next();        
142         CkVec<CkArrayIndexMax> *a_vec = (CkVec<CkArrayIndexMax> *) (* data);
143         if(a_vec != NULL)
144             delete a_vec;
145     }
146 }
147
148
149 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
150
151     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
152                  CkMyPe());   
153
154     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
155         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
156
157         if(cur_sec_id > 0) {        
158             sinfo.processOldSectionMessage(cmsg);
159         }
160         else {
161             //New sec id, so send it along with the message
162             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
163             CkFreeMsg(cmsg->getCharmMessage());
164             CkSectionID *sid = cmsg->sec_id;
165             delete cmsg;
166             
167             cmsg = new CharmMessageHolder((char *)newmsg,
168                                           IS_SECTION_MULTICAST); 
169             cmsg->sec_id = sid;
170             initSectionID(cmsg->sec_id);
171         }        
172
173         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
174             cmsg->pelist = cmsg->sec_id->pelist;
175             cmsg->npes = cmsg->sec_id->npes;
176         }        
177     }
178
179     if(cmsg->dest_proc == IS_BROADCAST) {
180         cmsg->npes = ndestpes;
181         cmsg->pelist = destpelist;
182
183         //Added write now as a move from ComlibManager::ArrayBroadcast
184         void *m = cmsg->getCharmMessage();
185         CkSectionInfo minfo;
186         minfo.type = COMLIB_MULTICAST_MESSAGE;
187         minfo.sInfo.cInfo.instId = getInstance();
188         minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
189         minfo.sInfo.cInfo.id = 0; 
190         minfo.pe = CkMyPe();
191         ((CkMcastBaseMsg *)m)->_cookie = minfo;       
192     }
193
194     //For section multicasts and broadcasts
195     if(cmsg->dest_proc == IS_SECTION_MULTICAST 
196        || cmsg->dest_proc == IS_BROADCAST ) {
197         
198         //Use Multicast Learner (Foobar will not work for combinations
199         //of personalized and multicast messages
200         
201         if(!mflag) {
202             ComlibLearner *l = getLearner();
203             if(l != NULL) {
204                 delete l;
205                 l = NULL;
206             }
207             
208             AAMLearner *alearner = new AAMLearner();
209             //setLearner(alearner);
210             mflag = CmiTrue;
211         }
212
213         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
214
215         //Collect Multicast Statistics
216         RECORD_SENDM_STATS(getInstance(), 
217                            ((envelope *)cmsg->getMessage())->getTotalsize(), 
218                            cmsg->pelist, cmsg->npes);
219     }
220     else {
221         //Collect Statistics
222         RECORD_SEND_STATS(getInstance(), 
223                           ((envelope *)cmsg->getMessage())->getTotalsize(), 
224                           cmsg->dest_proc);
225     }
226     
227     rstrat->insertMessage(cmsg);
228 }
229
230 void EachToManyMulticastStrategy::doneInserting(){
231
232     StrategyTableEntry *sentry = 
233         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
234         ->getStrategyTableEntry(getInstance());
235     int nexpected = sentry->numElements;
236
237     if(routerID == USE_DIRECT && nexpected == 0)
238         return;
239
240     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
241     rstrat->doneInserting();
242 }
243
244 void EachToManyMulticastStrategy::pup(PUP::er &p){
245
246     int count = 0;
247     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
248                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
249
250     CharmStrategy::pup(p);
251
252     p | routerID; 
253     p | npes; p | ndestpes;     
254     p | mflag;
255     
256     if(p.isUnpacking() && npes > 0) {
257         pelist = new int[npes];    
258     }
259
260     if(npes > 0)
261         p(pelist, npes);
262
263     if(p.isUnpacking() && ndestpes > 0) {
264         destpelist = new int[ndestpes];    
265     }    
266
267     if(ndestpes > 0)
268         p(destpelist, ndestpes);
269
270     if(p.isUnpacking()){
271         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
272         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
273         
274         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
275         setConverseStrategy(rstrat);
276         MyPe = rstrat->getProcMap()[CkMyPe()];
277     }
278     
279     ComlibPrintf("[%d] End of pup\n", CkMyPe());
280 }
281
282 void EachToManyMulticastStrategy::beginProcessing(int numElements){
283     
284     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
285     
286     char dump[1000];
287     char sdump[100];
288     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
289     for(int count = 0; count < npes; count ++){
290         sprintf(sdump, "%d, ", pelist[count]);
291         strcat(dump, sdump);           
292     }    
293     ComlibPrintf("%s\n", dump);
294
295     int expectedDeposits = 0;
296
297     rstrat->setInstance(getInstance());
298
299     if(ainfo.isSourceArray()) 
300         expectedDeposits = numElements;
301
302     if(getType() == GROUP_STRATEGY) {
303         
304         CkGroupID gid;
305         int *srcpelist;
306         int nsrcpes;
307         
308         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
309         
310         for(int count = 0; count < nsrcpes; count ++)
311             if(srcpelist[count] == CkMyPe()){
312                 expectedDeposits = 1;
313                 break;
314             }
315         
316         StrategyTableEntry *sentry = 
317             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
318             ->getStrategyTableEntry(myInstanceID);
319         sentry->numElements = expectedDeposits;
320     }
321     
322     CkArrayID dest;
323     int nidx;
324     CkArrayIndexMax *idx_list;
325     
326     ainfo.getDestinationArray(dest, idx_list, nidx);
327     sinfo = ComlibSectionInfo(dest, myInstanceID);
328     
329     AAPLearner *alearner = new AAPLearner();
330     //setLearner(alearner);
331
332     if(expectedDeposits > 0)
333         return;
334     
335     if(expectedDeposits == 0 && MyPe >= 0)
336         //doneInserting();
337         ConvComlibScheduleDoneInserting(myInstanceID);
338 }
339
340 void EachToManyMulticastStrategy::finalizeProcessing() {
341     if(npes > 0)
342         delete [] pelist;
343     
344     if(ndestpes > 0)
345         delete [] destpelist;
346
347     if(rstrat)
348         delete rstrat;
349
350     if(getLearner() != NULL)
351         delete getLearner();
352 }
353
354
355 void EachToManyMulticastStrategy::localMulticast(void *msg){
356     register envelope *env = (envelope *)msg;
357     CkUnpackMessage(&env);
358     
359     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
360
361     int status = cbmsg->_cookie.sInfo.cInfo.status;
362     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
363         
364     if(status == COMLIB_MULTICAST_ALL) {        
365         ainfo.localBroadcast(env);
366         return;
367     }   
368
369     CkVec<CkArrayIndexMax> *dest_indices;    
370     if(status == COMLIB_MULTICAST_NEW_SECTION){        
371         envelope *newenv;
372         sinfo.unpack(env, dest_indices, newenv);        
373         ComlibArrayInfo::localMulticast(dest_indices, newenv);
374
375         CkVec<CkArrayIndexMax> *old_dest_indices;
376         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
377                                  cbmsg->_cookie.sInfo.cInfo.id);
378
379         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
380         if(old_dest_indices != NULL)
381             delete old_dest_indices;
382         
383         sec_ht.put(key) = dest_indices;
384         CmiFree(env);
385         return;       
386     }
387
388     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
389     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
390                              cbmsg->_cookie.sInfo.cInfo.id);    
391     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
392
393     if(dest_indices == NULL)
394         CkAbort("Destination indices is NULL\n");
395
396     ComlibArrayInfo::localMulticast(dest_indices, env);
397 }
398
399 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
400
401     sinfo.initSectionID(sid);    
402
403     //Convert real processor numbers to virtual processors in the all
404     //to all multicast group
405     for(int count = 0; count < sid->npes; count ++) {
406         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]]; 
407         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
408     }
409 }