Making the learner a dynamic switch rather than a compile time switch.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 /*********************************************************
            The EachToManyMulticast strategy optimizes all-to-all
            communication. It combines messages and sends them along
            virtual topologies: 2D mesh, 3D mesh and hypercube.

            Large messages are sent directly.

            This is the object-level strategy. Processor-level
            optimizations are delegated to the routers.
11
12   - Sameer Kumar.
13
14 **********************************************************/
15
16
17 #include "EachToManyMulticastStrategy.h"
18 #include "string.h"
19 #include "routerstrategy.h"
20
21 #include "AAPLearner.h"
22 #include "AAMLearner.h"
23
24 //EachToManyMulticastStrategy CODE
25 CkpvExtern(int, RecvdummyHandle);
26 CkpvExtern(CkGroupID, cmgrID);
27
28 void *itrDoneHandler(void *msg){
29
30     EachToManyMulticastStrategy *nm_mgr;
31     
32     DummyMsg *dmsg = (DummyMsg *)msg;
33     comID id = dmsg->id;
34     int instid = id.instanceID;
35     
36     CmiFree(msg);
37     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
38
39     StrategyTableEntry *sentry = 
40         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
41         ->getStrategyTableEntry(instid);
42     int nexpected = sentry->numElements;
43     
44     if(nexpected == 0) {             
45         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", 
46                      CkMyPe(), instid, nexpected);
47         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
48         nm_mgr->doneInserting();
49         
50         if (!nm_mgr->getOnFinish().isInvalid()) nm_mgr->getOnFinish().send(0);
51     
52     }
53
54     return NULL;
55 }
56
57 void *E2MHandler(void *msg){
58     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
59     EachToManyMulticastStrategy *nm_mgr;    
60     
61     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
62     int instid = conv_header->stratid;
63
64     envelope *env = (envelope *)msg;
65     
66     nm_mgr = (EachToManyMulticastStrategy *) 
67         CProxy_ComlibManager(CkpvAccess(cmgrID)).
68         ckLocalBranch()->getStrategy(instid);
69     
70     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
71     
72     nm_mgr->localMulticast(msg);
73     return NULL;
74 }
75
76 //Group Constructor
77 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
78                                                          int n_srcpes, 
79                                                          int *src_pelist,
80                                                          int n_destpes, 
81                                                          int *dest_pelist) 
82     : routerID(substrategy), CharmStrategy() {
83     
84     setType(GROUP_STRATEGY);
85
86     CkGroupID gid;
87     gid.setZero();
88     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
89     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
90
91     //Written in this funny way to be symmetric with the array case.
92     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
93     ginfo.getCombinedPeList(pelist, npes);
94
95     commonInit();
96 }
97
98 //Array Constructor
99 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
100                                                          CkArrayID src, 
101                                                          CkArrayID dest, 
102                                                          int nsrc, 
103                                                          CkArrayIndexMax 
104                                                          *srcelements, 
105                                                          int ndest, 
106                                                          CkArrayIndexMax 
107                                                          *destelements)
108     :routerID(substrategy), CharmStrategy() {
109
110     setType(ARRAY_STRATEGY);
111     ainfo.setSourceArray(src, srcelements, nsrc);
112     ainfo.setDestinationArray(dest, destelements, ndest);
113
114     ainfo.getDestinationPeList(destpelist, ndestpes);
115     ainfo.getCombinedPeList(pelist, npes);
116     
117     /*
118       char dump[1000];
119       char sdump[100];
120       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
121       for(int count = 0; count < npes; count ++){
122       sprintf(sdump, "%d, ", pelist[count]);
123       strcat(dump, sdump);           
124       }    
125       ComlibPrintf("%s\n", dump);
126     */
127
128     commonInit();
129 }
130
131 extern char *router;
132 //Common initialization for both group and array constructors
133 void EachToManyMulticastStrategy::commonInit() {
134
135     setBracketed();
136     setForwardOnMigration(1);
137
138     if(CkMyPe() == 0 && router != NULL){
139         if(strcmp(router, "USE_MESH") == 0)
140             routerID = USE_MESH;
141         else if(strcmp(router, "USE_GRID") == 0)
142             routerID = USE_GRID;
143         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
144             routerID = USE_HYPERCUBE;
145         else  if(strcmp(router, "USE_DIRECT") == 0)
146             routerID = USE_DIRECT;        
147         else  if(strcmp(router, "USE_PREFIX") == 0)
148             routerID = USE_PREFIX;        
149
150         //Just for the first step. After learning the learned
151         //strategies will be chosen
152         router = NULL;
153     }
154     
155     ComlibPrintf("Creating Strategy %d\n", routerID);
156
157     useLearner = 0;
158     rstrat = NULL;
159 }
160
161 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
162 }
163
164
165 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
166
167     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
168                  CkMyPe());   
169
170     envelope *env = UsrToEnv(cmsg->getCharmMessage());
171
172     if(cmsg->dest_proc == IS_BROADCAST) {
173         //All to all multicast
174         
175         cmsg->npes = ndestpes;
176         cmsg->pelist = destpelist;
177         
178         //Use Multicast Learner (Foobar will not work for combinations
179         //of personalized and multicast messages
180         
181         CmiSetHandler(env, handlerId);
182
183         //Collect Multicast Statistics
184         RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
185                            cmsg->pelist, cmsg->npes);
186     }
187     else {
188         //All to all personalized
189
190         //Collect Statistics
191         RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
192                           cmsg->dest_proc);
193     }        
194
195     rstrat->insertMessage(cmsg);
196 }
197
198 void EachToManyMulticastStrategy::doneInserting(){
199
200     StrategyTableEntry *sentry = 
201         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
202         ->getStrategyTableEntry(getInstance());
203     int nexpected = sentry->numElements;
204     
205     if(routerID == USE_DIRECT && nexpected == 0)
206         return;
207     
208     if(MyPe < 0)
209         return;
210
211     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
212     rstrat->doneInserting();
213 }
214
215 void EachToManyMulticastStrategy::pup(PUP::er &p){
216
217     int count = 0;
218     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
219                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
220
221     CharmStrategy::pup(p);
222
223     p | routerID; 
224     p | npes; p | ndestpes;     
225     p | useLearner;
226
227     if(p.isUnpacking() && npes > 0) {
228         pelist = new int[npes];    
229     }
230
231     if(npes > 0)
232         p(pelist, npes);
233
234     if(p.isUnpacking() && ndestpes > 0) {
235         destpelist = new int[ndestpes];    
236     }    
237
238     if(ndestpes > 0)
239         p(destpelist, ndestpes);
240
241     if(p.isUnpacking()){
242         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
243         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
244         
245         if(npes > 0) {
246             rstrat = new RouterStrategy(routerID, handler, npes, pelist);
247             setConverseStrategy(rstrat);
248             MyPe = rstrat->getProcMap()[CkMyPe()];
249         }
250         else MyPe = -1;
251     }
252     
253     ComlibPrintf("[%d] End of pup\n", CkMyPe());
254 }
255
256 void EachToManyMulticastStrategy::beginProcessing(int numElements){
257     
258     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
259     /*
260     char dump[1000];
261     char sdump[100];
262     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
263     for(int count = 0; count < npes; count ++){
264         sprintf(sdump, "%d, ", pelist[count]);
265         strcat(dump, sdump);           
266     }    
267     ComlibPrintf("%s\n", dump);
268     */
269
270     int expectedDeposits = 0;
271
272     rstrat->setInstance(getInstance());
273
274     if(ainfo.isSourceArray()) 
275         expectedDeposits = numElements;
276     
277     if(getType() == GROUP_STRATEGY) {
278         
279         CkGroupID gid;
280         int *srcpelist;
281         int nsrcpes;
282         
283         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
284         
285         for(int count = 0; count < nsrcpes; count ++)
286             if(srcpelist[count] == CkMyPe()){
287                 expectedDeposits = 1;
288                 break;
289             }
290         
291         StrategyTableEntry *sentry = 
292             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
293             ->getStrategyTableEntry(myInstanceID);
294         sentry->numElements = expectedDeposits;
295     }
296     
297     if(useLearner) {
298         if(!mflag) 
299             setLearner(new AAPLearner());    
300         else 
301             setLearner(new AAMLearner());                
302     }
303     
304     if(expectedDeposits > 0)
305         return;
306     
307     if(expectedDeposits == 0 && MyPe >= 0)
308         ConvComlibScheduleDoneInserting(myInstanceID);
309 }
310
311 void EachToManyMulticastStrategy::finalizeProcessing() {
312     if(npes > 0)
313         delete [] pelist;
314     
315     if(ndestpes > 0)
316         delete [] destpelist;
317
318     if(rstrat)
319         delete rstrat;
320
321     if(useLearner && getLearner() != NULL)
322         delete getLearner();
323 }
324
325 void EachToManyMulticastStrategy::localMulticast(void *msg){
326     register envelope *env = (envelope *)msg;
327     CkUnpackMessage(&env);
328     
329     ainfo.localBroadcast(env);
330 }
331