Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / EachToManyMulticastStrategy.C
1
2 #include "EachToManyMulticastStrategy.h"
3 #include "string.h"
4 #include "routerstrategy.h"
5
6 //EachToManyMulticastStrategy CODE
7 CkpvExtern(int, RecvdummyHandle);
8 CkpvExtern(CkGroupID, cmgrID);
9
10 void *itrDoneHandler(void *msg){
11
12     EachToManyMulticastStrategy *nm_mgr;
13     
14     DummyMsg *dmsg = (DummyMsg *)msg;
15     comID id = dmsg->id;
16     int instid = id.instanceID;
17
18     CmiFree(msg);
19     ComlibPrintf("[%d] Iteration finished %d\n", CkMyPe(), instid);
20
21     StrategyTableEntry *sentry = 
22         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
23         ->getStrategyTableEntry(instid);
24     int nexpected = sentry->numElements;
25     
26     if(nexpected == 0) {             
27         CkPrintf("[%d] Calling Dummy Done Inserting\n", CkMyPe());
28         nm_mgr = (EachToManyMulticastStrategy *)sentry->strategy;    
29         nm_mgr->doneInserting();
30     }
31     
32     return NULL;
33 }
34
35 void *E2MHandler(void *msg){
36     //CkPrintf("[%d]:In EachtoMany CallbackHandler\n", CkMyPe());
37     EachToManyMulticastStrategy *nm_mgr;    
38     
39     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
40     int instid = bmsg->_cookie.sInfo.cInfo.instId;
41     
42     nm_mgr = (EachToManyMulticastStrategy *) 
43         CProxy_ComlibManager(CkpvAccess(cmgrID)).
44         ckLocalBranch()->getStrategy(instid);
45     
46     nm_mgr->localMulticast(msg);
47     return NULL;
48 }
49
50 //Group Constructor
51 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
52                                                          int n_srcpes, 
53                                                          int *src_pelist,
54                                                          int n_destpes, 
55                                                          int *dest_pelist) 
56     : routerID(substrategy), CharmStrategy() {
57     
58     setType(GROUP_STRATEGY);
59
60     int count = 0;
61
62     if(n_srcpes == 0) {
63         n_srcpes = CkNumPes();
64         src_pelist = new int[n_srcpes];
65         for(count =0; count < n_srcpes; count ++)
66             src_pelist[count] = count;
67     }
68     
69     CkGroupID gid;
70     gid.setZero();
71     ginfo.setSourceGroup(gid, src_pelist, n_srcpes);    
72
73     if(n_destpes == 0) {
74         ndestpes = CkNumPes();
75         destpelist = new int[ndestpes];
76         for(count =0; count < ndestpes; count ++)
77             destpelist[count] = count;
78     }
79     else {
80         ndestpes = n_destpes;
81         destpelist = dest_pelist;
82     }
83
84     if(n_srcpes == 0){
85         pelist = src_pelist;
86         npes = n_srcpes;
87
88         commonInit();
89         return;
90     }
91
92     if(n_destpes == 0) {
93         pelist = destpelist;
94         npes = ndestpes;
95         
96         commonInit();
97         return;
98     }
99     
100     //source and destination lists are both subsets
101     pelist = new int[CkNumPes()];
102     npes = n_srcpes;
103     memcpy(pelist, src_pelist, n_srcpes * sizeof(int));
104     
105     for(int dcount = 0; dcount < ndestpes; dcount++) {
106         int p = destpelist[dcount];
107         
108         for(count = 0; count < npes; count ++)
109             if(pelist[count] == p)
110                 break;
111         
112         if(count == npes)
113             pelist[npes++] = p;
114     }    
115
116     commonInit();
117 }
118
119 //Array Constructor
120 EachToManyMulticastStrategy::EachToManyMulticastStrategy(int substrategy, 
121                                                          CkArrayID src, 
122                                                          CkArrayID dest, 
123                                                          int nsrc, 
124                                                          CkArrayIndexMax 
125                                                          *srcelements, 
126                                                          int ndest, 
127                                                          CkArrayIndexMax 
128                                                          *destelements)
129     :routerID(substrategy), CharmStrategy() {
130
131     setType(ARRAY_STRATEGY);
132     ainfo.setSourceArray(src, srcelements, nsrc);
133     ainfo.setDestinationArray(dest, destelements, ndest);
134
135     ainfo.getDestinationPeList(destpelist, ndestpes);
136     ainfo.getCombinedPeList(pelist, npes);
137     
138     //    for(int count = 0; count < npes; count ++){
139     //CkPrintf("%d, ", pelist[count]);
140     //}    
141     //CkPrintf("\n");
142
143     commonInit();
144 }
145
146 extern char *router;
147 //Common initialization for both group and array constructors
148 void EachToManyMulticastStrategy::commonInit() {
149
150     setBracketed();
151
152     if(CkMyPe() == 0 && router != NULL){
153         if(strcmp(router, "USE_MESH") == 0)
154             routerID = USE_MESH;
155         else if(strcmp(router, "USE_GRID") == 0)
156             routerID = USE_GRID;
157         else  if(strcmp(router, "USE_HYPERCUBE") == 0)
158             routerID = USE_HYPERCUBE;
159         else  if(strcmp(router, "USE_DIRECT") == 0)
160             routerID = USE_DIRECT;        
161     }
162     
163     ComlibPrintf("Creating Strategy %d\n", routerID);
164
165     rstrat = NULL;
166 }
167
168
169 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
170
171     ComlibPrintf("[%d] EachToManyMulticast: insertMessage \n", 
172                  CkMyPe());   
173
174     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
175         int cur_sec_id = cmsg->sec_id->_cookie.sInfo.cInfo.id;
176
177         if(cur_sec_id > 0) {        
178             //Old section id, send the id with the message
179             CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)cmsg->getCharmMessage();
180             cbmsg->_cookie.sInfo.cInfo.id = cur_sec_id;
181             cbmsg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_OLD_SECTION;
182         }
183         else {
184             //New sec id, so send it along with the message
185             void *newmsg = (void *)getNewMulticastMessage(cmsg);
186             CkFreeMsg(cmsg->getCharmMessage());
187             CkSectionID *sid = cmsg->sec_id;
188             delete cmsg;
189             
190             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
191             cmsg->sec_id = sid;
192             initSectionID(cmsg->sec_id);
193         }        
194
195         if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
196             cmsg->pelist = cmsg->sec_id->pelist;
197             cmsg->npes = cmsg->sec_id->npes;
198         }
199         
200         CmiSetHandler(UsrToEnv(cmsg->getCharmMessage()), handlerId);
201     }
202     
203     rstrat->insertMessage(cmsg);
204 }
205
206 void EachToManyMulticastStrategy::doneInserting(){
207     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
208     
209     rstrat->doneInserting();
210 }
211
212 void EachToManyMulticastStrategy::pup(PUP::er &p){
213
214     int count = 0;
215     ComlibPrintf("[%d] Each To many::pup %s\n", CkMyPe(), 
216                  ((p.isPacking()==0)?("UnPacking"):("Packing")));
217
218     CharmStrategy::pup(p);
219
220     p | routerID; 
221     p | npes; p | ndestpes;     
222     
223     if(p.isUnpacking()) {
224         pelist = new int[npes];    
225     }
226     p(pelist, npes);
227
228     if(p.isUnpacking()) {
229         destpelist = new int[ndestpes];    
230     }    
231
232     p(destpelist, ndestpes);
233
234     if(p.isUnpacking()){
235         handlerId = CkRegisterHandler((CmiHandler)E2MHandler);
236         int handler = CkRegisterHandler((CmiHandler)itrDoneHandler);
237         
238         rstrat = new RouterStrategy(routerID, handler, npes, pelist);
239         setConverseStrategy(rstrat);
240         MyPe = rstrat->getProcMap()[CkMyPe()];
241     }
242     
243     ComlibPrintf("[%d] End of pup\n", CkMyPe());
244 }
245
246 void EachToManyMulticastStrategy::beginProcessing(int numElements){
247
248     int expectedDeposits = 0;
249     MaxSectionID = 0;
250
251     if(ainfo.isSourceArray()) 
252         expectedDeposits = numElements;
253
254     if(getType() == GROUP_STRATEGY) {
255         
256         CkGroupID gid;
257         int *srcpelist;
258         int nsrcpes;
259         
260         ginfo.getSourceGroup(gid, srcpelist, nsrcpes);
261         
262         for(int count = 0; count < nsrcpes; count ++)
263             if(srcpelist[count] == CkMyPe()){
264                 expectedDeposits = 1;
265                 break;
266             }
267         
268         StrategyTableEntry *sentry = 
269             CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()
270             ->getStrategyTableEntry(myInstanceID);
271         sentry->numElements = expectedDeposits;
272     }
273     
274     if(expectedDeposits > 0)
275         return;
276     
277     if(expectedDeposits == 0 && MyPe >= 0)
278         doneInserting();
279 }
280
281 void EachToManyMulticastStrategy::localMulticast(void *msg){
282     register envelope *env = (envelope *)msg;
283     CkUnpackMessage(&env);
284     
285     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
286
287     int status = cbmsg->_cookie.sInfo.cInfo.status;
288     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
289         
290     if(status == COMLIB_MULTICAST_ALL) {        
291         ainfo.localMulticast(env);
292         return;
293     }   
294
295     CkVec<CkArrayIndexMax> *dest_indices;    
296     if(status == COMLIB_MULTICAST_NEW_SECTION){        
297
298         dest_indices = new CkVec<CkArrayIndexMax>;
299
300         //CkPrintf("[%d] Received message for new section\n", CkMyPe());
301
302         CkArrayID destArrayID;
303         int nDestElements;
304         CkArrayIndexMax *destelements;
305         ainfo.getSourceArray(destArrayID, destelements, nDestElements);
306
307         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
308         for(int count = 0; count < ccmsg->nIndices; count++){
309             CkArrayIndexMax idx = ccmsg->indices[count];
310             //idx.print();
311             int dest_proc =CkArrayID::CkLocalBranch(destArrayID)
312                 ->lastKnown(idx);
313             
314             if(dest_proc == CkMyPe())
315                 dest_indices->insertAtEnd(idx);                        
316         }            
317
318         envelope *usrenv = (envelope *) ccmsg->usrMsg;
319         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
320         memcpy(newenv, ccmsg->usrMsg, usrenv->getTotalsize());
321
322         ainfo.localMulticast(dest_indices, newenv);
323
324         CkVec<CkArrayIndexMax> *old_dest_indices;
325         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
326                                  cbmsg->_cookie.sInfo.cInfo.id);
327
328         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
329         if(old_dest_indices != NULL)
330             delete old_dest_indices;
331         
332         sec_ht.put(key) = dest_indices;
333         CmiFree(env);
334         return;       
335     }
336
337     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
338     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
339                              cbmsg->_cookie.sInfo.cInfo.id);    
340     dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
341
342     if(dest_indices == NULL)
343         CkAbort("Destination indices is NULL\n");
344
345     ainfo.localMulticast(dest_indices, env);
346 }
347
348 ComlibMulticastMsg * EachToManyMulticastStrategy::getNewMulticastMessage
349 (CharmMessageHolder *cmsg){
350     
351     if(cmsg->sec_id == NULL || cmsg->sec_id->_nElems == 0)
352         return NULL;
353
354     void *m = cmsg->getCharmMessage();
355     envelope *env = UsrToEnv(m);
356     
357     if(cmsg->sec_id->_cookie.sInfo.cInfo.id == 0) {  //New Section ID;
358         CkPackMessage(&env);
359         int sizes[2];
360         sizes[0] = cmsg->sec_id->_nElems;
361         sizes[1] = env->getTotalsize();                
362
363         cmsg->sec_id->_cookie.sInfo.cInfo.id = MaxSectionID ++;
364
365         ComlibPrintf("Creating new comlib multicast message %d, %d\n", sizes[0], sizes[1]);
366
367         ComlibMulticastMsg *msg = new(sizes, 0) ComlibMulticastMsg;
368         msg->nIndices = cmsg->sec_id->_nElems;
369         msg->_cookie.sInfo.cInfo.instId = myInstanceID;
370         msg->_cookie.type = COMLIB_MULTICAST_MESSAGE;
371         msg->_cookie.sInfo.cInfo.id = MaxSectionID - 1;
372         msg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_NEW_SECTION;
373         msg->_cookie.pe = CkMyPe();
374
375         memcpy(msg->indices, cmsg->sec_id->_elems, 
376                sizes[0] * sizeof(CkArrayIndexMax));
377         memcpy(msg->usrMsg, env, sizes[1] * sizeof(char));         
378         envelope *newenv = UsrToEnv(msg);
379         
380         newenv->getsetArrayMgr() = env->getsetArrayMgr();
381         newenv->getsetArraySrcPe() = env->getsetArraySrcPe();
382         newenv->getsetArrayEp() = env->getsetArrayEp();
383         newenv->getsetArrayHops() = env->getsetArrayHops();
384         newenv->getsetArrayIndex() = env->getsetArrayIndex();
385         // for trace projections
386         newenv->setEvent(env->getEvent());
387         newenv->setSrcPe(env->getSrcPe());
388
389         CkPackMessage(&newenv);        
390         return (ComlibMulticastMsg *)EnvToUsr(newenv);
391     }   
392
393     return NULL;
394 }
395
396
397 void EachToManyMulticastStrategy::initSectionID(CkSectionID *sid){
398
399     ainfo.initSectionID(sid);    
400
401     //Convert real processor numbers to virtual processors in the all
402     //to all multicast group
403     for(int count = 0; count < sid->npes; count ++) {
404         sid->pelist[count] = rstrat->getProcMap()[sid->pelist[count]];        
405         if(sid->pelist[count] == -1) CkAbort("Invalid Section\n");
406     }
407 }