Need to delete the elements of hashtable before deleting it.
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
/*********************************************************
            The EachToManyMulticast strategy optimizes all-to-all
            communication. It combines messages and sends them along
            virtual topologies: 2D mesh, 3D mesh and hypercube.

            Large messages are sent directly.

            This is the object-level strategy. For processor-level
            optimizations the routers are called.

  - Sameer Kumar.

**********************************************************/
15
16
17 #include "EachToManyMulticastStrategy.h"
18 #include "string.h"
19 #include "routerstrategy.h"
20
21 #include "AAPLearner.h"
22 #include "AAMLearner.h"
23
24 //EachToManyMulticastStrategy CODE
25 CkpvExtern(int, RecvdummyHandle);
26 CkpvExtern(CkGroupID, cmgrID);
27
28 void *itrDoneHandler(void *msg){
29
30     EachToManyMulticastStrategy *nm_mgr;
31     
32     DummyMsg *dmsg = (DummyMsg *)msg;
33     comID id = dmsg->id;
34     int instid = id.instanceID;
35     
36     CmiFree(msg);
37     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
38
39     StrategyTableEntry *sentry = 
40         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
41         ->getStrategyTableEntry(instid);
42     int nexpected = sentry->numElements;
43     
44     if(nexpected == 0) {             
45         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
46         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
47         nm_mgr->doneInserting();
48     }
49     
50     return NULL;
51 }
52
53 void *E2MHandler(void *msg){
54     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
55     EachToManyMulticastStrategy *nm_mgr;    
56     
57     CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
58     int instid = conv_header->stratid;
59
60     envelope *env = (envelope *)msg;
61     
62     nm_mgr = (EachToManyMulticastStrategy *) 
63         CProxy_ComlibManager(CkpvAccess(cmgrID)).
64         ckLocalBranch()->getStrategy(instid);
65     
66     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
67     
68     nm_mgr->localMulticast(msg);
69     return NULL;
70 }
71
72 //Group Constructor
73 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
74                                                          int n_srcpes, 
75                                                          int *src_pelist,
76                                                          int n_destpes, 
77                                                          int *dest_pelist) 
78     : routerID(substrategy), CharmStrategy() {
79     
80     setType(GROUP_STRATEGY);
81
82     CkGroupID gid;
83     gid.setZero();
84     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
85     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
86
87     //Written in this funny way to be symmetric with the array case.
88     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
89     ginfo.getCombinedPeList(pelist, npes);
90
91     commonInit();
92 }
93
94 //Array Constructor
95 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
96                                                          CkArrayID src, 
97                                                          CkArrayID dest, 
98                                                          int nsrc, 
99                                                          CkArrayIndexMax 
100                                                          *srcelements, 
101                                                          int ndest, 
102                                                          CkArrayIndexMax 
103                                                          *destelements)
104     :routerID(substrategy), CharmStrategy() {
105
106     setType(ARRAY_STRATEGY);
107     ainfo.setSourceArray(src, srcelements, nsrc);
108     ainfo.setDestinationArray(dest, destelements, ndest);
109
110     ainfo.getDestinationPeList(destpelist, ndestpes);
111     ainfo.getCombinedPeList(pelist, npes);
112     
113     /*
114       char dump[1000];
115       char sdump[100];
116       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
117       for(int count = 0; count < npes; count ++){
118       sprintf(sdump, "%d, ", pelist[count]);
119       strcat(dump, sdump);           
120       }    
121       ComlibPrintf("%s\n", dump);
122     */
123
124     commonInit();
125 }
126
127 extern char *router;
128 //Common initialization for both group and array constructors
129 void EachToManyMulticastStrategy::commonInit() {
130
131     setBracketed();
132     setForwardOnMigration(1);
133
134     if(CkMyPe() == 0 && router != NULL){
135         if(strcmp(router, "USE_MESH") == 0)
136             routerID = USE_MESH;
137         else if(strcmp(router, "USE_GRID") == 0)
138             routerID = USE_GRID;
139         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
140             routerID = USE_HYPERCUBE;
141         else  if(strcmp(router, "USE_DIRECT") == 0)
142             routerID = USE_DIRECT;        
143
144         //Just for the first step. After learning the learned
145         //strategies will be chosen
146         router = NULL;
147     }
148     
149     ComlibPrintf("Creating Strategy %d\n", routerID);
150
151     rstrat = NULL;
152 }
153
154 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
155 }
156
157
158 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
159
160     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
161                  CkMyPe());   
162
163     envelope *env = UsrToEnv(cmsg->getCharmMessage());
164
165     if(cmsg->dest_proc == IS_BROADCAST) {
166         //All to all multicast
167         
168         cmsg->npes = ndestpes;
169         cmsg->pelist = destpelist;
170         
171         //Use Multicast Learner (Foobar will not work for combinations
172         //of personalized and multicast messages
173         
174         CmiSetHandler(env, handlerId);
175
176         //Collect Multicast Statistics
177         RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), 
178                            cmsg->pelist, cmsg->npes);
179     }
180     else {
181         //All to all personalized
182
183         //Collect Statistics
184         RECORD_SEND_STATS(getInstance(), env->getTotalsize(), 
185                           cmsg->dest_proc);
186     }        
187
188     rstrat->insertMessage(cmsg);
189 }
190
191 void EachToManyMulticastStrategy::doneInserting(){
192
193     StrategyTableEntry *sentry = 
194         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
195         ->getStrategyTableEntry(getInstance());
196     int nexpected = sentry->numElements;
197     
198     if(routerID == USE_DIRECT && nexpected == 0)
199         return;
200     
201     if(MyPe < 0)
202         return;
203
204     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
205     rstrat->doneInserting();
206 }
207
208 void EachToManyMulticastStrategy::pup(PUP::er &p){
209
210     int count = 0;
211     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
212                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
213
214     CharmStrategy::pup(p);
215
216     p | routerID; 
217     p | npes; p | ndestpes;     
218     
219     if(p.isUnpacking() && npes > 0) {
220         pelist = new int[npes];    
221     }
222
223     if(npes > 0)
224         p(pelist, npes);
225
226     if(p.isUnpacking() && ndestpes > 0) {
227         destpelist = new int[ndestpes];    
228     }    
229
230     if(ndestpes > 0)
231         p(destpelist, ndestpes);
232
233     if(p.isUnpacking()){
234         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
235         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
236         
237         if(npes > 0) {
238             rstrat = new RouterStrategy(routerID, handler, npes, pelist);
239             setConverseStrategy(rstrat);
240             MyPe = rstrat->getProcMap()[CkMyPe()];
241         }
242         else MyPe = -1;
243     }
244     
245     ComlibPrintf("[%d] End of pup\n", CkMyPe());
246 }
247
248 void EachToManyMulticastStrategy::beginProcessing(int numElements){
249     
250     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
251     /*
252     char dump[1000];
253     char sdump[100];
254     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
255     for(int count = 0; count < npes; count ++){
256         sprintf(sdump, "%d, ", pelist[count]);
257         strcat(dump, sdump);           
258     }    
259     ComlibPrintf("%s\n", dump);
260     */
261
262     int expectedDeposits = 0;
263
264     rstrat->setInstance(getInstance());
265
266     if(ainfo.isSourceArray()) 
267         expectedDeposits = numElements;
268     
269     if(getType() == GROUP_STRATEGY) {
270         
271         CkGroupID gid;
272         int *srcpelist;
273         int nsrcpes;
274         
275         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
276         
277         for(int count = 0; count < nsrcpes; count ++)
278             if(srcpelist[count] == CkMyPe()){
279                 expectedDeposits = 1;
280                 break;
281             }
282         
283         StrategyTableEntry *sentry = 
284             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
285             ->getStrategyTableEntry(myInstanceID);
286         sentry->numElements = expectedDeposits;
287     }
288     
289     if(!mflag) 
290         setLearner(new AAPLearner());    
291     else 
292         setLearner(new AAMLearner());                
293     
294     if(expectedDeposits > 0)
295         return;
296     
297     if(expectedDeposits == 0 && MyPe >= 0)
298         ConvComlibScheduleDoneInserting(myInstanceID);
299 }
300
301 void EachToManyMulticastStrategy::finalizeProcessing() {
302     if(npes > 0)
303         delete [] pelist;
304     
305     if(ndestpes > 0)
306         delete [] destpelist;
307
308     if(rstrat)
309         delete rstrat;
310
311     if(getLearner() != NULL)
312         delete getLearner();
313 }
314
315 void EachToManyMulticastStrategy::localMulticast(void *msg){
316     register envelope *env = (envelope *)msg;
317     CkUnpackMessage(&env);
318     
319     ainfo.localBroadcast(env);
320 }
321