Several new changes
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 /*********************************************************
3             The EachToManyMulticast Strategy optimizes all-to-all
4             communication. It combines messages and sends them along
5             virtual topologies 2d mesh, 3d mesh and hypercube.
6
7             For large messages send them directly.
8
9             This is the object level strategy. For processor level
10             optimizations routers are called.
11
12   - Sameer Kumar.
13
14 **********************************************************/
15
16
17 #include "EachToManyMulticastStrategy.h"
18 #include "string.h"
19 #include "routerstrategy.h"
20
21 #include "AAPLearner.h"
22 #include "AAMLearner.h"
23
24 //EachToManyMulticastStrategy CODE
25 CkpvExtern(int, RecvdummyHandle);
26 CkpvExtern(CkGroupID, cmgrID);
27
28 void *itrDoneHandler(void *msg){
29
30     EachToManyMulticastStrategy *nm_mgr;
31     
32     DummyMsg *dmsg = (DummyMsg *)msg;
33     comID id = dmsg->id;
34     int instid = id.instanceID;
35     
36     CmiFree(msg);
37     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
38
39     StrategyTableEntry *sentry = 
40         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
41         ->getStrategyTableEntry(instid);
42     int nexpected = sentry->numElements;
43     
44     if(nexpected == 0) {             
45         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
46         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
47         nm_mgr->doneInserting();
48     }
49     
50     return NULL;
51 }
52
53 void *E2MHandler(void *msg){
54     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
55     EachToManyMulticastStrategy *nm_mgr;    
56     
57     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
58     int instid = conv_header->stratid;
59
60     envelope *env = (envelope *)msg;
61     
62     nm_mgr = (EachToManyMulticastStrategy *) 
63         CProxy_ComlibManager(CkpvAccess(cmgrID)).
64         ckLocalBranch()->getStrategy(instid);
65     
66     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
67     
68     nm_mgr->localMulticast(msg);
69     return NULL;
70 }
71
72 //Group Constructor
73 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
74                                                          int n_srcpes, 
75                                                          int *src_pelist,
76                                                          int n_destpes, 
77                                                          int *dest_pelist) 
78     : routerID(substrategy), CharmStrategy() {
79     
80     setType(GROUP_STRATEGY);
81
82     CkGroupID gid;
83     gid.setZero();
84     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
85     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
86
87     //Written in this funny way to be symmetric with the array case.
88     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
89     ginfo.getCombinedPeList(pelist, npes);
90
91     commonInit();
92 }
93
94 //Array Constructor
95 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
96                                                          CkArrayID src, 
97                                                          CkArrayID dest, 
98                                                          int nsrc, 
99                                                          CkArrayIndexMax 
100                                                          *srcelements, 
101                                                          int ndest, 
102                                                          CkArrayIndexMax 
103                                                          *destelements)
104     :routerID(substrategy), CharmStrategy() {
105
106     setType(ARRAY_STRATEGY);
107     ainfo.setSourceArray(src, srcelements, nsrc);
108     ainfo.setDestinationArray(dest, destelements, ndest);
109
110     ainfo.getDestinationPeList(destpelist, ndestpes);
111     ainfo.getCombinedPeList(pelist, npes);
112     
113     /*
114       char dump[1000];
115       char sdump[100];
116       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
117       for(int count = 0; count < npes; count ++){
118       sprintf(sdump, "%d, ", pelist[count]);
119       strcat(dump, sdump);           
120       }    
121       ComlibPrintf("%s\n", dump);
122     */
123
124     commonInit();
125 }
126
127 extern char *router;
128 //Common initialization for both group and array constructors
129 void EachToManyMulticastStrategy::commonInit() {
130
131     setBracketed();
132     setForwardOnMigration(1);
133
134     if(CkMyPe() == 0 && router != NULL){
135         if(strcmp(router, "USE_MESH") == 0)
136             routerID = USE_MESH;
137         else if(strcmp(router, "USE_GRID") == 0)
138             routerID = USE_GRID;
139         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
140             routerID = USE_HYPERCUBE;
141         else  if(strcmp(router, "USE_DIRECT") == 0)
142             routerID = USE_DIRECT;        
143
144         //Just for the first step. After learning the learned
145         //strategies will be chosen
146         router = NULL;
147     }
148     
149     ComlibPrintf("Creating Strategy %d\n", routerID);
150
151     rstrat = NULL;
152 }
153
154 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
155 }
156
157
158 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
159
160     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
161                  CkMyPe());   
162
163     envelope *env = UsrToEnv(cmsg->getCharmMessage());
164
165     if(cmsg->dest_proc == IS_BROADCAST) {
166         //All to all multicast
167         
168         cmsg->npes = ndestpes;
169         cmsg->pelist = destpelist;
170         
171         //Use Multicast Learner (Foobar will not work for combinations
172         //of personalized and multicast messages
173         
174         CmiSetHandler(env, handlerId);
175
176         //Collect Multicast Statistics
177         RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
178                            cmsg->pelist, cmsg->npes);
179     }
180     else {
181         //All to all personalized
182
183         //Collect Statistics
184         RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
185                           cmsg->dest_proc);
186     }        
187
188     rstrat->insertMessage(cmsg);
189 }
190
191 void EachToManyMulticastStrategy::doneInserting(){
192
193     StrategyTableEntry *sentry = 
194         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
195         ->getStrategyTableEntry(getInstance());
196     int nexpected = sentry->numElements;
197     
198     if(routerID == USE_DIRECT && nexpected == 0)
199         return;
200     
201     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
202     rstrat->doneInserting();
203 }
204
205 void EachToManyMulticastStrategy::pup(PUP::er &p){
206
207     int count = 0;
208     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
209                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
210
211     CharmStrategy::pup(p);
212
213     p | routerID; 
214     p | npes; p | ndestpes;     
215     
216     if(p.isUnpacking() && npes > 0) {
217         pelist = new int[npes];    
218     }
219
220     if(npes > 0)
221         p(pelist, npes);
222
223     if(p.isUnpacking() && ndestpes > 0) {
224         destpelist = new int[ndestpes];    
225     }    
226
227     if(ndestpes > 0)
228         p(destpelist, ndestpes);
229
230     if(p.isUnpacking()){
231         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
232         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
233         
234         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
235         setConverseStrategy(rstrat);
236         MyPe = rstrat->getProcMap()[CkMyPe()];
237     }
238     
239     ComlibPrintf("[%d] End of pup\n", CkMyPe());
240 }
241
242 void EachToManyMulticastStrategy::beginProcessing(int numElements){
243     
244     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
245     /*
246     char dump[1000];
247     char sdump[100];
248     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
249     for(int count = 0; count < npes; count ++){
250         sprintf(sdump, "%d, ", pelist[count]);
251         strcat(dump, sdump);           
252     }    
253     ComlibPrintf("%s\n", dump);
254     */
255
256     int expectedDeposits = 0;
257
258     rstrat->setInstance(getInstance());
259
260     if(ainfo.isSourceArray()) 
261         expectedDeposits = numElements;
262     
263     if(getType() == GROUP_STRATEGY) {
264         
265         CkGroupID gid;
266         int *srcpelist;
267         int nsrcpes;
268         
269         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
270         
271         for(int count = 0; count < nsrcpes; count ++)
272             if(srcpelist[count] == CkMyPe()){
273                 expectedDeposits = 1;
274                 break;
275             }
276         
277         StrategyTableEntry *sentry = 
278             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
279             ->getStrategyTableEntry(myInstanceID);
280         sentry->numElements = expectedDeposits;
281     }
282     
283     if(!mflag) 
284         setLearner(new AAPLearner());    
285     else 
286         setLearner(new AAMLearner());                
287     
288     if(expectedDeposits > 0)
289         return;
290     
291     if(expectedDeposits == 0 && MyPe >= 0)
292         ConvComlibScheduleDoneInserting(myInstanceID);
293 }
294
295 void EachToManyMulticastStrategy::finalizeProcessing() {
296     if(npes > 0)
297         delete [] pelist;
298     
299     if(ndestpes > 0)
300         delete [] destpelist;
301
302     if(rstrat)
303         delete rstrat;
304
305     if(getLearner() != NULL)
306         delete getLearner();
307 }
308
309 void EachToManyMulticastStrategy::localMulticast(void *msg){
310     register envelope *env = (envelope *)msg;
311     CkUnpackMessage(&env);
312     
313     ainfo.localBroadcast(env);
314 }
315