61b6aa260b87006b23e3ecbbf2bea09a69cef74b
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 /*********************************************************
3             The EachToManyMulticast Strategy optimizes all-to-all
4             communication. It combines messages and sends them along
5             virtual topologies 2d mesh, 3d mesh and hypercube.
6
7             For large messages send them directly.
8
9             This is the object level strategy. For processor level
10             optimizations routers are called.
11
12   - Sameer Kumar.
13
14 **********************************************************/
15
16
17 #include "EachToManyMulticastStrategy.h"
18 #include "string.h"
19 #include "routerstrategy.h"
20
21 #include "AAPLearner.h"
22 #include "AAMLearner.h"
23
24 //EachToManyMulticastStrategy CODE
25 CkpvExtern(int, RecvdummyHandle);
26 CkpvExtern(CkGroupID, cmgrID);
27
28 void *itrDoneHandler(void *msg){
29
30     EachToManyMulticastStrategy *nm_mgr;
31     
32     DummyMsg *dmsg = (DummyMsg *)msg;
33     comID id = dmsg->id;
34     int instid = id.instanceID;
35     
36     CmiFree(msg);
37     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
38
39     StrategyTableEntry *sentry = 
40         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
41         ->getStrategyTableEntry(instid);
42     int nexpected = sentry->numElements;
43     
44     if(nexpected == 0) {             
45         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
46         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
47         nm_mgr->doneInserting();
48         
49         if (!nm_mgr->getOnFinish().isInvalid()) nm_mgr->getOnFinish().send(0);
50     
51     }
52
53     return NULL;
54 }
55
56 void *E2MHandler(void *msg){
57     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
58     EachToManyMulticastStrategy *nm_mgr;    
59     
60     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
61     int instid = conv_header->stratid;
62
63     envelope *env = (envelope *)msg;
64     
65     nm_mgr = (EachToManyMulticastStrategy *) 
66         CProxy_ComlibManager(CkpvAccess(cmgrID)).
67         ckLocalBranch()->getStrategy(instid);
68     
69     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
70     
71     nm_mgr->localMulticast(msg);
72     return NULL;
73 }
74
75 //Group Constructor
76 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
77                                                          int n_srcpes, 
78                                                          int *src_pelist,
79                                                          int n_destpes, 
80                                                          int *dest_pelist) 
81     : routerID(substrategy), CharmStrategy() {
82     
83     setType(GROUP_STRATEGY);
84
85     CkGroupID gid;
86     gid.setZero();
87     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
88     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
89
90     //Written in this funny way to be symmetric with the array case.
91     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
92     ginfo.getCombinedPeList(pelist, npes);
93
94     commonInit();
95 }
96
97 //Array Constructor
98 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
99                                                          CkArrayID src, 
100                                                          CkArrayID dest, 
101                                                          int nsrc, 
102                                                          CkArrayIndexMax 
103                                                          *srcelements, 
104                                                          int ndest, 
105                                                          CkArrayIndexMax 
106                                                          *destelements)
107     :routerID(substrategy), CharmStrategy() {
108
109     setType(ARRAY_STRATEGY);
110     ainfo.setSourceArray(src, srcelements, nsrc);
111     ainfo.setDestinationArray(dest, destelements, ndest);
112
113     ainfo.getDestinationPeList(destpelist, ndestpes);
114     ainfo.getCombinedPeList(pelist, npes);
115     
116     /*
117       char dump[1000];
118       char sdump[100];
119       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
120       for(int count = 0; count < npes; count ++){
121       sprintf(sdump, "%d, ", pelist[count]);
122       strcat(dump, sdump);           
123       }    
124       ComlibPrintf("%s\n", dump);
125     */
126
127     commonInit();
128 }
129
130 extern char *router;
131 //Common initialization for both group and array constructors
132 void EachToManyMulticastStrategy::commonInit() {
133
134     setBracketed();
135     setForwardOnMigration(1);
136
137     if(CkMyPe() == 0 && router != NULL){
138         if(strcmp(router, "USE_MESH") == 0)
139             routerID = USE_MESH;
140         else if(strcmp(router, "USE_GRID") == 0)
141             routerID = USE_GRID;
142         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
143             routerID = USE_HYPERCUBE;
144         else  if(strcmp(router, "USE_DIRECT") == 0)
145             routerID = USE_DIRECT;        
146
147         //Just for the first step. After learning the learned
148         //strategies will be chosen
149         router = NULL;
150     }
151     
152     ComlibPrintf("Creating Strategy %d\n", routerID);
153
154     rstrat = NULL;
155 }
156
157 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
158 }
159
160
161 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
162
163     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
164                  CkMyPe());   
165
166     envelope *env = UsrToEnv(cmsg->getCharmMessage());
167
168     if(cmsg->dest_proc == IS_BROADCAST) {
169         //All to all multicast
170         
171         cmsg->npes = ndestpes;
172         cmsg->pelist = destpelist;
173         
174         //Use Multicast Learner (Foobar will not work for combinations
175         //of personalized and multicast messages
176         
177         CmiSetHandler(env, handlerId);
178
179         //Collect Multicast Statistics
180         RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
181                            cmsg->pelist, cmsg->npes);
182     }
183     else {
184         //All to all personalized
185
186         //Collect Statistics
187         RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
188                           cmsg->dest_proc);
189     }        
190
191     rstrat->insertMessage(cmsg);
192 }
193
194 void EachToManyMulticastStrategy::doneInserting(){
195
196     StrategyTableEntry *sentry = 
197         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
198         ->getStrategyTableEntry(getInstance());
199     int nexpected = sentry->numElements;
200     
201     if(routerID == USE_DIRECT && nexpected == 0)
202         return;
203     
204     if(MyPe < 0)
205         return;
206
207     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
208     rstrat->doneInserting();
209 }
210
211 void EachToManyMulticastStrategy::pup(PUP::er &p){
212
213     int count = 0;
214     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
215                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
216
217     CharmStrategy::pup(p);
218
219     p | routerID; 
220     p | npes; p | ndestpes;     
221     
222     if(p.isUnpacking() && npes > 0) {
223         pelist = new int[npes];    
224     }
225
226     if(npes > 0)
227         p(pelist, npes);
228
229     if(p.isUnpacking() && ndestpes > 0) {
230         destpelist = new int[ndestpes];    
231     }    
232
233     if(ndestpes > 0)
234         p(destpelist, ndestpes);
235
236     if(p.isUnpacking()){
237         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
238         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
239         
240         if(npes > 0) {
241             rstrat = new RouterStrategy(routerID, handler, npes, pelist);
242             setConverseStrategy(rstrat);
243             MyPe = rstrat->getProcMap()[CkMyPe()];
244         }
245         else MyPe = -1;
246     }
247     
248     ComlibPrintf("[%d] End of pup\n", CkMyPe());
249 }
250
251 void EachToManyMulticastStrategy::beginProcessing(int numElements){
252     
253     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
254     /*
255     char dump[1000];
256     char sdump[100];
257     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
258     for(int count = 0; count < npes; count ++){
259         sprintf(sdump, "%d, ", pelist[count]);
260         strcat(dump, sdump);           
261     }    
262     ComlibPrintf("%s\n", dump);
263     */
264
265     int expectedDeposits = 0;
266
267     rstrat->setInstance(getInstance());
268
269     if(ainfo.isSourceArray()) 
270         expectedDeposits = numElements;
271     
272     if(getType() == GROUP_STRATEGY) {
273         
274         CkGroupID gid;
275         int *srcpelist;
276         int nsrcpes;
277         
278         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
279         
280         for(int count = 0; count < nsrcpes; count ++)
281             if(srcpelist[count] == CkMyPe()){
282                 expectedDeposits = 1;
283                 break;
284             }
285         
286         StrategyTableEntry *sentry = 
287             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
288             ->getStrategyTableEntry(myInstanceID);
289         sentry->numElements = expectedDeposits;
290     }
291     
292     if(!mflag) 
293         setLearner(new AAPLearner());    
294     else 
295         setLearner(new AAMLearner());                
296     
297     if(expectedDeposits > 0)
298         return;
299     
300     if(expectedDeposits == 0 && MyPe >= 0)
301         ConvComlibScheduleDoneInserting(myInstanceID);
302 }
303
304 void EachToManyMulticastStrategy::finalizeProcessing() {
305     if(npes > 0)
306         delete [] pelist;
307     
308     if(ndestpes > 0)
309         delete [] destpelist;
310
311     if(rstrat)
312         delete rstrat;
313
314     if(getLearner() != NULL)
315         delete getLearner();
316 }
317
318 void EachToManyMulticastStrategy::localMulticast(void *msg){
319     register envelope *env = (envelope *)msg;
320     CkUnpackMessage(&env);
321     
322     ainfo.localBroadcast(env);
323 }
324