Disabling learning in each to many multicast.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 /*********************************************************
3             The EachToManyMulticast Strategy optimizes all-to-all
4             communication. It combines messages and sends them along
5             virtual topologies 2d mesh, 3d mesh and hypercube.
6
7             For large messages send them directly.
8
9             This is the object level strategy. For processor level
10             optimizations routers are called.
11
12   - Sameer Kumar.
13
14 **********************************************************/
15
16
17 #include "EachToManyMulticastStrategy.h"
18 #include "string.h"
19 #include "routerstrategy.h"
20
21 #include "AAPLearner.h"
22 #include "AAMLearner.h"
23
24 //EachToManyMulticastStrategy CODE
25 CkpvExtern(int, RecvdummyHandle);
26 CkpvExtern(CkGroupID, cmgrID);
27
28 void *itrDoneHandler(void *msg){
29
30     EachToManyMulticastStrategy *nm_mgr;
31     
32     DummyMsg *dmsg = (DummyMsg *)msg;
33     comID id = dmsg->id;
34     int instid = id.instanceID;
35     
36     CmiFree(msg);
37     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
38
39     StrategyTableEntry *sentry = 
40         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
41         ->getStrategyTableEntry(instid);
42     int nexpected = sentry->numElements;
43     
44     if(nexpected == 0) {             
45         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
46         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
47         nm_mgr->doneInserting();
48         
49         if (!nm_mgr->getOnFinish().isInvalid()) nm_mgr->getOnFinish().send(0);
50     
51     }
52
53     return NULL;
54 }
55
56 void *E2MHandler(void *msg){
57     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
58     EachToManyMulticastStrategy *nm_mgr;    
59     
60     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
61     int instid = conv_header->stratid;
62
63     envelope *env = (envelope *)msg;
64     
65     nm_mgr = (EachToManyMulticastStrategy *) 
66         CProxy_ComlibManager(CkpvAccess(cmgrID)).
67         ckLocalBranch()->getStrategy(instid);
68     
69     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
70     
71     nm_mgr->localMulticast(msg);
72     return NULL;
73 }
74
75 //Group Constructor
76 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
77                                                          int n_srcpes, 
78                                                          int *src_pelist,
79                                                          int n_destpes, 
80                                                          int *dest_pelist) 
81     : routerID(substrategy), CharmStrategy() {
82     
83     setType(GROUP_STRATEGY);
84
85     CkGroupID gid;
86     gid.setZero();
87     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
88     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
89
90     //Written in this funny way to be symmetric with the array case.
91     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
92     ginfo.getCombinedPeList(pelist, npes);
93
94     commonInit();
95 }
96
97 //Array Constructor
98 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
99                                                          CkArrayID src, 
100                                                          CkArrayID dest, 
101                                                          int nsrc, 
102                                                          CkArrayIndexMax 
103                                                          *srcelements, 
104                                                          int ndest, 
105                                                          CkArrayIndexMax 
106                                                          *destelements)
107     :routerID(substrategy), CharmStrategy() {
108
109     setType(ARRAY_STRATEGY);
110     ainfo.setSourceArray(src, srcelements, nsrc);
111     ainfo.setDestinationArray(dest, destelements, ndest);
112
113     ainfo.getDestinationPeList(destpelist, ndestpes);
114     ainfo.getCombinedPeList(pelist, npes);
115     
116     /*
117       char dump[1000];
118       char sdump[100];
119       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
120       for(int count = 0; count < npes; count ++){
121       sprintf(sdump, "%d, ", pelist[count]);
122       strcat(dump, sdump);           
123       }    
124       ComlibPrintf("%s\n", dump);
125     */
126
127     commonInit();
128 }
129
130 extern char *router;
131 //Common initialization for both group and array constructors
132 void EachToManyMulticastStrategy::commonInit() {
133
134     setBracketed();
135     setForwardOnMigration(1);
136
137     if(CkMyPe() == 0 && router != NULL){
138         if(strcmp(router, "USE_MESH") == 0)
139             routerID = USE_MESH;
140         else if(strcmp(router, "USE_GRID") == 0)
141             routerID = USE_GRID;
142         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
143             routerID = USE_HYPERCUBE;
144         else  if(strcmp(router, "USE_DIRECT") == 0)
145             routerID = USE_DIRECT;        
146         else  if(strcmp(router, "USE_PREFIX") == 0)
147             routerID = USE_PREFIX;        
148
149         //Just for the first step. After learning the learned
150         //strategies will be chosen
151         router = NULL;
152     }
153     
154     ComlibPrintf("Creating Strategy %d\n", routerID);
155
156     rstrat = NULL;
157 }
158
159 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
160 }
161
162
163 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
164
165     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
166                  CkMyPe());   
167
168     envelope *env = UsrToEnv(cmsg->getCharmMessage());
169
170     if(cmsg->dest_proc == IS_BROADCAST) {
171         //All to all multicast
172         
173         cmsg->npes = ndestpes;
174         cmsg->pelist = destpelist;
175         
176         //Use Multicast Learner (Foobar will not work for combinations
177         //of personalized and multicast messages
178         
179         CmiSetHandler(env, handlerId);
180
181         //Collect Multicast Statistics
182         RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
183                            cmsg->pelist, cmsg->npes);
184     }
185     else {
186         //All to all personalized
187
188         //Collect Statistics
189         RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
190                           cmsg->dest_proc);
191     }        
192
193     rstrat->insertMessage(cmsg);
194 }
195
196 void EachToManyMulticastStrategy::doneInserting(){
197
198     StrategyTableEntry *sentry = 
199         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
200         ->getStrategyTableEntry(getInstance());
201     int nexpected = sentry->numElements;
202     
203     if(routerID == USE_DIRECT && nexpected == 0)
204         return;
205     
206     if(MyPe < 0)
207         return;
208
209     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
210     rstrat->doneInserting();
211 }
212
213 void EachToManyMulticastStrategy::pup(PUP::er &p){
214
215     int count = 0;
216     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
217                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
218
219     CharmStrategy::pup(p);
220
221     p | routerID; 
222     p | npes; p | ndestpes;     
223     
224     if(p.isUnpacking() && npes > 0) {
225         pelist = new int[npes];    
226     }
227
228     if(npes > 0)
229         p(pelist, npes);
230
231     if(p.isUnpacking() && ndestpes > 0) {
232         destpelist = new int[ndestpes];    
233     }    
234
235     if(ndestpes > 0)
236         p(destpelist, ndestpes);
237
238     if(p.isUnpacking()){
239         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
240         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
241         
242         if(npes > 0) {
243             rstrat = new RouterStrategy(routerID, handler, npes, pelist);
244             setConverseStrategy(rstrat);
245             MyPe = rstrat->getProcMap()[CkMyPe()];
246         }
247         else MyPe = -1;
248     }
249     
250     ComlibPrintf("[%d] End of pup\n", CkMyPe());
251 }
252
253 void EachToManyMulticastStrategy::beginProcessing(int numElements){
254     
255     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
256     /*
257     char dump[1000];
258     char sdump[100];
259     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
260     for(int count = 0; count < npes; count ++){
261         sprintf(sdump, "%d, ", pelist[count]);
262         strcat(dump, sdump);           
263     }    
264     ComlibPrintf("%s\n", dump);
265     */
266
267     int expectedDeposits = 0;
268
269     rstrat->setInstance(getInstance());
270
271     if(ainfo.isSourceArray()) 
272         expectedDeposits = numElements;
273     
274     if(getType() == GROUP_STRATEGY) {
275         
276         CkGroupID gid;
277         int *srcpelist;
278         int nsrcpes;
279         
280         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
281         
282         for(int count = 0; count < nsrcpes; count ++)
283             if(srcpelist[count] == CkMyPe()){
284                 expectedDeposits = 1;
285                 break;
286             }
287         
288         StrategyTableEntry *sentry = 
289             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
290             ->getStrategyTableEntry(myInstanceID);
291         sentry->numElements = expectedDeposits;
292     }
293     /*
294     if(!mflag) 
295         setLearner(new AAPLearner());    
296     else 
297         setLearner(new AAMLearner());                
298     */
299
300     if(expectedDeposits > 0)
301         return;
302     
303     if(expectedDeposits == 0 && MyPe >= 0)
304         ConvComlibScheduleDoneInserting(myInstanceID);
305 }
306
307 void EachToManyMulticastStrategy::finalizeProcessing() {
308     if(npes > 0)
309         delete [] pelist;
310     
311     if(ndestpes > 0)
312         delete [] destpelist;
313
314     if(rstrat)
315         delete rstrat;
316
317     if(getLearner() != NULL)
318         delete getLearner();
319 }
320
321 void EachToManyMulticastStrategy::localMulticast(void *msg){
322     register envelope *env = (envelope *)msg;
323     CkUnpackMessage(&env);
324     
325     ainfo.localBroadcast(env);
326 }
327