Adding some user events to trace the multicasts.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file 
5 */
6
7 #include "EachToManyMulticastStrategy.h"
8 #include "string.h"
9 //#include "routerstrategy.h"
10
11 #include "AAPLearner.h"
12 #include "AAMLearner.h"
13
14 //EachToManyMulticastStrategy CODE
15 //CkpvExtern(int, RecvdummyHandle);
16 //CkpvExtern(CkGroupID, cmgrID);
17
18 /*
19 void *itrDoneHandler(void *msg){
20
21     EachToManyMulticastStrategy *nm_mgr;
22
23     DummyMsg *dmsg = (DummyMsg *)msg;
24     comID id = dmsg->id;
25     int instid = id.instanceID;
26
27     CmiFree(msg);
28     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
29
30     StrategyTableEntry *sentry = 
31         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
32         ->getStrategyTableEntry(instid);
33     int nexpected = sentry->numElements;
34
35     if(nexpected == 0) {             
36         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
37         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
38         nm_mgr->doneInserting();
39
40         if (!nm_mgr->getOnFinish().isInvalid()) nm_mgr->getOnFinish().send(0);
41
42     }
43
44     return NULL;
45 }
46  */
47 /*
48 void *E2MHandler(void *msg){
49     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
50     EachToManyMulticastStrategy *nm_mgr;    
51
52     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
53     int instid = conv_header->stratid;
54
55     envelope *env = (envelope *)msg;
56
57     nm_mgr = (EachToManyMulticastStrategy *) 
58         CProxy_ComlibManager(CkpvAccess(cmgrID)).
59         ckLocalBranch()->getStrategy(instid);
60
61     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
62
63     nm_mgr->localMulticast(msg);
64     return NULL;
65 }
66  */
67
68 //Group Constructor
69 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy,
70                 CkGroupID src,
71                 CkGroupID dest,
72                 int n_srcpes, 
73                 int *src_pelist,
74                 int n_destpes, 
75                 int *dest_pelist) 
76 : RouterStrategy(substrategy), CharmStrategy() {
77         ComlibPrintf("[%d] EachToManyMulticast group constructor\n",CkMyPe());
78
79         setType(GROUP_STRATEGY);
80
81         //CkGroupID gid;
82         //gid.setZero();
83         ginfo.setSourceGroup(src, src_pelist, n_srcpes);    
84         ginfo.setDestinationGroup(dest, dest_pelist, n_destpes);
85
86         //Written in this funny way to be symmetric with the array case.
87         //ginfo.getDestinationGroup(gid, destpelist, ndestpes);
88         //ginfo.getCombinedPeList(pelist, npes);
89
90         commonInit(ginfo.getCombinedCountList());
91 }
92
93 //Array Constructor
94 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
95                 CkArrayID src, 
96                 CkArrayID dest, 
97                 int nsrc, 
98                 CkArrayIndexMax *srcelements, 
99                 int ndest, 
100                 CkArrayIndexMax *destelements)
101 : RouterStrategy(substrategy), CharmStrategy() {
102         ComlibPrintf("[%d] EachToManyMulticast array constructor. nsrc=%d ndest=%d\n",CkMyPe(), nsrc, ndest);
103
104         setType(ARRAY_STRATEGY);
105         ainfo.setSourceArray(src, srcelements, nsrc);
106         ainfo.setDestinationArray(dest, destelements, ndest);
107
108         int *count = ainfo.getCombinedCountList();
109         //ainfo.getSourcePeList(nsrcPe, srcPe);
110         //ainfo.getDestinationPeList(ndestPe, destPe);
111
112         /*
113       char dump[1000];
114       char sdump[100];
115       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
116       for(int count = 0; count < npes; count ++){
117       sprintf(sdump, "%d, ", pelist[count]);
118       strcat(dump, sdump);           
119       }    
120       ComlibPrintf("%s\n", dump);
121          */
122
123         commonInit(count);
124 }
125
126 extern char *routerName;
127 //Common initialization for both group and array constructors
128 void EachToManyMulticastStrategy::commonInit(int *count) {
129
130         setBracketed();
131         //setForwardOnMigration(1);
132
133         if(CkMyPe() == 0 && routerName != NULL){
134                 if(strcmp(routerName, "USE_MESH") == 0)
135                         routerIDsaved = USE_MESH;
136                 else if(strcmp(routerName, "USE_GRID") == 0)
137                         routerIDsaved = USE_GRID;
138                 else  if(strcmp(routerName, "USE_HYPERCUBE") == 0)
139                         routerIDsaved = USE_HYPERCUBE;
140                 else  if(strcmp(routerName, "USE_DIRECT") == 0)
141                         routerIDsaved = USE_DIRECT;        
142                 else  if(strcmp(routerName, "USE_PREFIX") == 0)
143                         routerIDsaved = USE_PREFIX;        
144
145                 //Just for the first step. After learning the learned
146                 //strategies will be chosen
147                 //router = NULL;
148         }
149
150         //ComlibPrintf("Creating Strategy %d\n", routerID);
151
152         // finish the creation of the RouterStrategy superclass
153         bracketedUpdatePeKnowledge(count);
154         delete [] count;
155         //rstrat = NULL;
156 }
157
158 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
159 }
160
161
162 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
163
164   cmsg -> checkme();
165
166         ComlibPrintf("[%d] EachToManyMulticast: insertMessage\n", CkMyPe());
167
168         envelope *env = UsrToEnv(cmsg->getCharmMessage());
169
170         if(cmsg->dest_proc == IS_BROADCAST) {
171                 //All to all multicast
172
173                 // not necessary to set cmsg since the superclass will take care
174                 //cmsg->npes = ndestPes;
175                 //cmsg->pelist = destPes;
176
177                 //Use Multicast Learner (Foobar will not work for combinations
178                 //of personalized and multicast messages
179
180                 // this handler will ensure that handleMessage is called if direcly
181                 // sent, or deliver is called if normal routing path is taken
182                 CmiSetHandler(env, CkpvAccess(comlib_handler));
183
184                 //Collect Multicast Statistics
185                 RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
186                                 cmsg->pelist, cmsg->npes);
187         }
188         else {
189                 //All to all personalized
190
191                 //Collect Statistics
192                 RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
193                                 cmsg->dest_proc);
194         }
195
196         RouterStrategy::insertMessage(cmsg);
197 }
198
199 /*
200 void EachToManyMulticastStrategy::doneInserting(){
201
202     StrategyTableEntry *sentry = 
203         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
204         ->getStrategyTableEntry(getInstance());
205     int nexpected = sentry->numElements;
206
207     if(routerID == USE_DIRECT && nexpected == 0)
208         return;
209
210     if(MyPe < 0)
211         return;
212
213     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
214     rstrat->doneInserting();
215 }
216  */
217
218 void EachToManyMulticastStrategy::pup(PUP::er &p){
219
220         ComlibPrintf("[%d] EachToManyMulticastStrategy::pup called for %s\n", CkMyPe(), 
221                         p.isPacking()?"packing":(p.isUnpacking()?"unpacking":"sizing"));
222
223         RouterStrategy::pup(p);
224         CharmStrategy::pup(p);
225
226 }
227
228 /* NOT NEEDED
229 void EachToManyMulticastStrategy::finalizeCreation() {
230   ainfo.purge();
231 }
232  */
233
234 // void EachToManyMulticastStrategy::beginProcessing(int numElements){
235
236 //     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
237 //     /*
238 //     char dump[1000];
239 //     char sdump[100];
240 //     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
241 //     for(int count = 0; count < npes; count ++){
242 //         sprintf(sdump, "%d, ", pelist[count]);
243 //         strcat(dump, sdump);           
244 //     }    
245 //     ComlibPrintf("%s\n", dump);
246 //     */
247
248 //     int expectedDeposits = 0;
249
250 //     rstrat->setInstance(getInstance());
251
252 //     if(ainfo.isSourceArray()) 
253 //         expectedDeposits = numElements;
254
255 //     if(getType() == GROUP_STRATEGY) {
256
257 //         CkGroupID gid;
258 //         int *srcpelist;
259 //         int nsrcpes;
260
261 //         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
262
263 //         for(int count = 0; count < nsrcpes; count ++)
264 //             if(srcpelist[count] == CkMyPe()){
265 //                 expectedDeposits = 1;
266 //                 break;
267 //             }
268
269 //         StrategyTableEntry *sentry = 
270 //             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
271 //             ->getStrategyTableEntry(myInstanceID);
272 //         sentry->numElements = expectedDeposits;
273 //     }
274 //     /*
275 //     if(useLearner) 
276 //         setLearner(new AAPLearner());    
277 //     else 
278 //         setLearner(new AAMLearner());                
279 //     */
280
281 //     if(expectedDeposits > 0)
282 //         return;
283
284 //     if(expectedDeposits == 0 && MyPe >= 0)
285 //         ConvComlibScheduleDoneInserting(myInstanceID);
286 // }
287
288 /*
289 void EachToManyMulticastStrategy::finalizeProcessing() {
290     if(npes > 0)
291         delete [] pelist;
292
293     if(ndestpes > 0)
294         delete [] destpelist;
295
296     if(rstrat)
297         delete rstrat;
298
299     if(useLearner && getLearner() != NULL)
300         delete getLearner();
301 }
302  */
303
304 void EachToManyMulticastStrategy::deliver(char *msg, int size) {
305         ComlibPrintf("[%d] EachToManyMulticastStrategy::deliver for %s\n",
306                         CkMyPe(), isAllToAll()?"multicast":"personalized");
307         
308         envelope *env = (envelope *)msg;
309         RECORD_RECV_STATS(myHandle, env->getTotalsize(), env->getSrcPe());
310
311         if (isAllToAll()){
312                 ComlibPrintf("Delivering via localMulticast()\n");
313                 localMulticast(msg);
314         }
315         else { 
316                 if (getType() == GROUP_STRATEGY) {
317                         ComlibPrintf("Delivering via personalized CkSendMsgBranchInline\n");
318                         CkUnpackMessage(&env);
319                         CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), env->getGroupNum());
320                 }
321                 else if (getType() == ARRAY_STRATEGY) {
322                         //        CkPrintf("[%d] Delivering via ComlibArrayInfo::deliver()\n", CkMyPe());
323                         //      ComlibArrayInfo::deliver(env);
324
325                         // call ainfo's localBroadcast(env);
326                         ComlibPrintf("[%d] Delivering via ComlibArrayInfo::localBroadcast()\n", CkMyPe());
327                         ainfo.localBroadcast(env);
328
329                 }
330         }
331 }
332
333 void EachToManyMulticastStrategy::localMulticast(void *msg){
334         register envelope *env = (envelope *)msg;
335         CkUnpackMessage(&env);
336         CkPrintf("localMulticast calls ainfo.localBroadcast()\n");
337         ainfo.localBroadcast(env);
338 }
339
340 void EachToManyMulticastStrategy::notifyDone() {
341         if (!getOnFinish().isInvalid()) getOnFinish().send(0);
342         RouterStrategy::notifyDone();
343 }
344
345 /*@}*/