Temporarily turning off learning
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 #include "AAPLearner.h"
7 #include "AAMLearner.h"
8
9 //EachToManyMulticastStrategy CODE
10 CkpvExtern(int, RecvdummyHandle);
11 CkpvExtern(CkGroupID, cmgrID);
12
13 void *itrDoneHandler(void *msg){
14
15     EachToManyMulticastStrategy *nm_mgr;
16     
17     DummyMsg *dmsg = (DummyMsg *)msg;
18     comID id = dmsg->id;
19     int instid = id.instanceID;
20
21     CmiFree(msg);
22     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
23
24     StrategyTableEntry *sentry = 
25         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
26         ->getStrategyTableEntry(instid);
27     int nexpected = sentry->numElements;
28     
29     if(nexpected == 0) {             
30         ComlibPrintf("[%d] Calling Dummy Done Inserting, %d, %d\n", CkMyPe(), instid, nexpected);
31         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
32         nm_mgr->doneInserting();
33     }
34     
35     return NULL;
36 }
37
38 void *E2MHandler(void *msg){
39     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
40     EachToManyMulticastStrategy *nm_mgr;    
41
42     envelope *env = (envelope *)msg;
43     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr(env);
44     int instid = bmsg->_cookie.sInfo.cInfo.instId;
45     
46     nm_mgr = (EachToManyMulticastStrategy *) 
47         CProxy_ComlibManager(CkpvAccess(cmgrID)).
48         ckLocalBranch()->getStrategy(instid);
49     
50     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
51     
52     nm_mgr->localMulticast(msg);
53     return NULL;
54 }
55
56 //Group Constructor
57 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
58                                                          int n_srcpes, 
59                                                          int *src_pelist,
60                                                          int n_destpes, 
61                                                          int *dest_pelist) 
62     : routerID(substrategy), CharmStrategy() {
63     
64     setType(GROUP_STRATEGY);
65
66     CkGroupID gid;
67     gid.setZero();
68     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
69     ginfo.setDestinationGroup(gid, dest_pelist, n_destpes);
70
71     //Written in this funny way to be symettric with the array case.
72     ginfo.getDestinationGroup(gid, destpelist, ndestpes);
73     ginfo.getCombinedPeList(pelist, npes);
74
75     commonInit();
76 }
77
78 //Array Constructor
79 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
80                                                          CkArrayID src, 
81                                                          CkArrayID dest, 
82                                                          int nsrc, 
83                                                          CkArrayIndexMax 
84                                                          *srcelements, 
85                                                          int ndest, 
86                                                          CkArrayIndexMax 
87                                                          *destelements)
88     :routerID(substrategy), CharmStrategy() {
89
90     setType(ARRAY_STRATEGY);
91     ainfo.setSourceArray(src, srcelements, nsrc);
92     ainfo.setDestinationArray(dest, destelements, ndest);
93
94     ainfo.getDestinationPeList(destpelist, ndestpes);
95     ainfo.getCombinedPeList(pelist, npes);
96     
97     /*
98       char dump[1000];
99       char sdump[100];
100       sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
101       for(int count = 0; count < npes; count ++){
102       sprintf(sdump, "%d, ", pelist[count]);
103       strcat(dump, sdump);           
104       }    
105       ComlibPrintf("%s\n", dump);
106     */
107
108     commonInit();
109 }
110
111 extern char *router;
112 //Common initialization for both group and array constructors
113 void EachToManyMulticastStrategy::commonInit() {
114
115     setBracketed();
116     setForwardOnMigration(1);
117
118     mflag = CmiFalse;
119
120     if(CkMyPe() == 0 && router != NULL){
121         if(strcmp(router, "USE_MESH") == 0)
122             routerID = USE_MESH;
123         else if(strcmp(router, "USE_GRID") == 0)
124             routerID = USE_GRID;
125         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
126             routerID = USE_HYPERCUBE;
127         else  if(strcmp(router, "USE_DIRECT") == 0)
128             routerID = USE_DIRECT;        
129     }
130     
131     ComlibPrintf("Creating Strategy %d\n", routerID);
132
133     rstrat = NULL;
134 }
135
136 EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
137     CkHashtableIterator *ht_iterator = sec_ht.iterator();
138     ht_iterator->seekStart();
139     while(ht_iterator->hasNext()){
140         void **data;
141         data = (void **)ht_iterator->next();        
142         CkVec<CkArrayIndexMax> *a_vec = (CkVec<CkArrayIndexMax> *) (* data);
143         if(a_vec != NULL)
144             delete a_vec;
145     }
146 }
147
148
149 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
150
151     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
152                  CkMyPe());   
153
154     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
155         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
156
157         if(cur_sec_id > 0) {        
158             sinfo.processOldSectionMessage(cmsg);
159         }
160         else {
161             //New sec id, so send it along with the message
162             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
163             CkFreeMsg(cmsg->getCharmMessage());
164             CkSectionID *sid = cmsg->sec_id;
165             delete cmsg;
166             
167             cmsg = new CharmMessageHolder((char *)newmsg,
168                                           IS_SECTION_MULTICAST); 
169             cmsg->sec_id = sid;
170             initSectionID(cmsg->sec_id);
171         }        
172
173         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
174             cmsg->pelist = cmsg->sec_id->pelist;
175             cmsg->npes = cmsg->sec_id->npes;
176         }        
177     }
178
179     if(cmsg->dest_proc == IS_BROADCAST) {
180         cmsg->npes = ndestpes;
181         cmsg->pelist = destpelist;
182     }
183
184     //For section multicasts and broadcasts
185     if(cmsg->dest_proc == IS_SECTION_MULTICAST 
186        || cmsg->dest_proc == IS_BROADCAST ) {
187         
188         //Use Multicast Learner (Foobar will not work for combinations
189         //of personalized and multicast messages
190         
191         if(!mflag) {
192             ComlibLearner *l = getLearner();
193             if(l != NULL) {
194                 delete l;
195                 l = NULL;
196             }
197             
198             AAMLearner *alearner = new AAMLearner();
199             //setLearner(alearner);
200             mflag = CmiTrue;
201         }
202
203         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
204
205         //Collect Multicast Statistics
206         RECORD_SENDM_STATS(getInstance(), 
207                            ((envelope *)cmsg->getMessage())->getTotalsize(), 
208                            cmsg->pelist, cmsg->npes);
209     }
210     else {
211         //Collect Statistics
212         RECORD_SEND_STATS(getInstance(), 
213                           ((envelope *)cmsg->getMessage())->getTotalsize(), 
214                           cmsg->dest_proc);
215     }
216     
217     rstrat->insertMessage(cmsg);
218 }
219
220 void EachToManyMulticastStrategy::doneInserting(){
221
222     StrategyTableEntry *sentry = 
223         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
224         ->getStrategyTableEntry(getInstance());
225     int nexpected = sentry->numElements;
226
227     if(routerID == USE_DIRECT && nexpected == 0)
228         return;
229
230     //ComlibPrintf("%d: DoneInserting \n", CkMyPe());    
231     rstrat->doneInserting();
232 }
233
234 void EachToManyMulticastStrategy::pup(PUP::er &p){
235
236     int count = 0;
237     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
238                  ((!p.isUnpacking() == 0)?("UnPacking"):("Packing")));
239
240     CharmStrategy::pup(p);
241
242     p | routerID; 
243     p | npes; p | ndestpes;     
244     p | mflag;
245     
246     if(p.isUnpacking() && npes > 0) {
247         pelist = new int[npes];    
248     }
249
250     if(npes > 0)
251         p(pelist, npes);
252
253     if(p.isUnpacking() && ndestpes > 0) {
254         destpelist = new int[ndestpes];    
255     }    
256
257     if(ndestpes > 0)
258         p(destpelist, ndestpes);
259
260     if(p.isUnpacking()){
261         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
262         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);        
263         
264         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
265         setConverseStrategy(rstrat);
266         MyPe = rstrat->getProcMap()[CkMyPe()];
267     }
268     
269     ComlibPrintf("[%d] End of pup\n", CkMyPe());
270 }
271
272 void EachToManyMulticastStrategy::beginProcessing(int numElements){
273     
274     ComlibPrintf("[%d] Begin processing %d\n", CkMyPe(), numElements);
275     
276     char dump[1000];
277     char sdump[100];
278     sprintf(dump, "%d: Each To MANY PELIST :\n", CkMyPe());
279     for(int count = 0; count < npes; count ++){
280         sprintf(sdump, "%d, ", pelist[count]);
281         strcat(dump, sdump);           
282     }    
283     ComlibPrintf("%s\n", dump);
284
285     int expectedDeposits = 0;
286
287     rstrat->setInstance(getInstance());
288
289     if(ainfo.isSourceArray()) 
290         expectedDeposits = numElements;
291
292     if(getType() == GROUP_STRATEGY) {
293         
294         CkGroupID gid;
295         int *srcpelist;
296         int nsrcpes;
297         
298         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
299         
300         for(int count = 0; count < nsrcpes; count ++)
301             if(srcpelist[count] == CkMyPe()){
302                 expectedDeposits = 1;
303                 break;
304             }
305         
306         StrategyTableEntry *sentry = 
307             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
308             ->getStrategyTableEntry(myInstanceID);
309         sentry->numElements = expectedDeposits;
310     }
311     
312     CkArrayID dest;
313     int nidx;
314     CkArrayIndexMax *idx_list;
315     
316     ainfo.getDestinationArray(dest, idx_list, nidx);
317     sinfo = ComlibSectionInfo(dest, myInstanceID);
318     
319     AAPLearner *alearner = new AAPLearner();
320     //setLearner(alearner);
321
322     if(expectedDeposits > 0)
323         return;
324     
325     if(expectedDeposits == 0 && MyPe >= 0)
326         //doneInserting();
327         ConvComlibScheduleDoneInserting(myInstanceID);
328 }
329
330 void EachToManyMulticastStrategy::finalizeProcessing() {
331     if(npes > 0)
332         delete [] pelist;
333     
334     if(ndestpes > 0)
335         delete [] destpelist;
336
337     if(rstrat)
338         delete rstrat;
339
340     if(getLearner() != NULL)
341         delete getLearner();
342 }
343
344
345 void EachToManyMulticastStrategy::localMulticast(void *msg){
346     register envelope *env = (envelope *)msg;
347     CkUnpackMessage(&env);
348     
349     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
350
351     int status = cbmsg->_cookie.sInfo.cInfo.status;
352     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
353         
354     if(status == COMLIB_MULTICAST_ALL) {        
355         ainfo.localBroadcast(env);
356         return;
357     }   
358
359     CkVec<CkArrayIndexMax> *dest_indices;    
360     if(status == COMLIB_MULTICAST_NEW_SECTION){        
361         envelope *newenv;
362         sinfo.unpack(env, dest_indices, newenv);        
363         ComlibArrayInfo::localMulticast(dest_indices, newenv);
364
365         CkVec<CkArrayIndexMax> *old_dest_indices;
366         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
367                                  cbmsg->_cookie.sInfo.cInfo.id);
368
369         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
370         if(old_dest_indices != NULL)
371             delete old_dest_indices;
372         
373         sec_ht.put(key) = dest_indices;
374         CmiFree(env);
375         return;       
376     }
377
378     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
379     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
380                              cbmsg->_cookie.sInfo.cInfo.id);    
381     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
382
383     if(dest_indices == NULL)
384         CkAbort("Destination indices is NULL\n");
385
386     ComlibArrayInfo::localMulticast(dest_indices, env);
387 }
388
389 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
390
391     sinfo.initSectionID(sid);    
392
393     //Convert real processor numbers to virtual processors in the all
394     //to all multicast group
395     for(int count = 0; count < sid->npes; count ++) {
396         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]]; 
397         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
398     }
399 }