c637ffb4ccf484f87054bf7da406c349869a2ca5
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1
2 /********************************************************
3         Section multicast strategy suite. DirectMulticast and its
4         derivatives, multicast messages to a section of array elements
5         created on the fly. The section is invoked by calling a
6         section proxy. These strategies can also multicast to a subset
7         of processors for groups.
8
9         These strategies are non-bracketed. When the first request is
10         made a route is dynamically built on the section. The route
11         information is stored in
12
13  - Sameer Kumar
14
15 **********************************************/
16
17
18 #include "DirectMulticastStrategy.h"
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 /*
23 void *DMHandler(void *msg){
24     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
25     DirectMulticastStrategy *nm_mgr;    
26     
27     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
28     int instid = bmsg->_cookie.sInfo.cInfo.instId;
29     
30     nm_mgr = (DirectMulticastStrategy *) 
31         CProxy_ComlibManager(CkpvAccess(cmgrID)).
32         ckLocalBranch()->getStrategy(instid);
33     
34     nm_mgr->handleMulticastMessage(msg);
35     return NULL;
36 }
37 */
38
39 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid, 
40                                                  int isPersistent)
41     :  CharmStrategy() {
42
43     ainfo.setDestinationArray(aid);
44     setType(ARRAY_STRATEGY);
45
46     this->isPersistent = isPersistent;
47 }
48
49 //Destroy all old built routes
50 DirectMulticastStrategy::~DirectMulticastStrategy() {
51     
52     ComlibPrintf("Calling Distructor\n");
53
54     if(getLearner() != NULL)
55         delete getLearner();
56         
57     CkHashtableIterator *ht_iterator = sec_ht.iterator();
58     ht_iterator->seekStart();
59     while(ht_iterator->hasNext()){
60         void **data;
61         data = (void **)ht_iterator->next();        
62         ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
63         if(obj != NULL)
64             delete obj;
65     }
66 }
67
68 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
69     
70     ComlibPrintf("[%d] Comlib Direct Section Multicast: insertMessage \n", 
71                  CkMyPe());   
72
73     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
74         CkSectionID *sid = cmsg->sec_id;
75         int cur_sec_id = ComlibSectionInfo::getSectionID(*sid);
76         
77         if(cur_sec_id > 0) {        
78             sinfo.processOldSectionMessage(cmsg);            
79             
80             ComlibSectionHashKey 
81                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
82             ComlibSectionHashObject *obj = sec_ht.get(key);
83
84             if(obj == NULL)
85                 CkAbort("Cannot Find Section\n");
86
87             envelope *env = UsrToEnv(cmsg->getCharmMessage());
88             localMulticast(env, obj);
89             remoteMulticast(env, obj);
90         }
91         else {            
92             //New sec id, so send it along with the message
93             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
94             insertSectionID(sid);
95
96             ComlibSectionHashKey 
97                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
98             
99             ComlibSectionHashObject *obj = sec_ht.get(key);
100
101             if(obj == NULL)
102                 CkAbort("Cannot Find Section\n");
103             
104             char *msg = cmsg->getCharmMessage();
105             localMulticast(UsrToEnv(msg), obj);
106             CkFreeMsg(msg);
107             
108             remoteMulticast(UsrToEnv(newmsg), obj);
109         }        
110     }
111     else 
112         CkAbort("Section multicast cannot be used without a section proxy");
113
114     delete cmsg;       
115 }
116
117 void DirectMulticastStrategy::insertSectionID(CkSectionID *sid) {
118     
119     ComlibSectionHashKey 
120         key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
121
122     ComlibSectionHashObject *obj = NULL;    
123     obj = sec_ht.get(key);
124     
125     if(obj != NULL)
126         delete obj;
127     
128     obj = createObjectOnSrcPe(sid->_nElems, sid->_elems);
129     sec_ht.put(key) = obj;
130 }
131
132
133 ComlibSectionHashObject *DirectMulticastStrategy::createObjectOnSrcPe
134 (int nindices, CkArrayIndexMax *idxlist) {
135
136     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
137     
138     sinfo.getRemotePelist(nindices, idxlist, obj->npes, obj->pelist);
139     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
140     
141     return obj;
142 }
143
144
145 ComlibSectionHashObject *DirectMulticastStrategy::
146 createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, 
147                              int srcpe){
148
149     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
150         
151     obj->pelist = 0;
152     obj->npes = 0;
153     
154     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
155
156     return obj;
157 }
158
159
160 void DirectMulticastStrategy::doneInserting(){
161     //Do nothing! Its a bracketed strategy
162 }
163
164 extern void CmiReference(void *);
165
166 //Send the multicast message the local array elements. The message is 
167 //copied and sent if elements exist. 
168 void DirectMulticastStrategy::localMulticast(envelope *env, 
169                                              ComlibSectionHashObject *obj) {
170     int nIndices = obj->indices.size();
171     
172     //If the library is set to persistent. 
173     //The message is stored in the library. The applications should 
174     //use the message as a readonly and it exists till the next one 
175     //comes along
176     
177     if(obj->msg != NULL) {
178         CmiFree(obj->msg);
179         obj->msg = NULL;
180     } 
181     
182     if(nIndices > 0) {
183         void *msg = EnvToUsr(env);
184         void *msg1 = msg;
185         
186         msg1 = CkCopyMsg(&msg);
187         
188         if(isPersistent) {
189             CmiReference(UsrToEnv(msg1));
190             obj->msg = (void *)UsrToEnv(msg1);
191         }
192         
193         ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
194     }    
195 }
196
197
198 //Calls default multicast scheme to send the messages. It could 
199 //also call a converse lower level strategy to do the muiticast.
200 //For example pipelined multicast
201 void DirectMulticastStrategy::remoteMulticast(envelope *env, 
202                                               ComlibSectionHashObject *obj) {
203     
204     int npes = obj->npes;
205     int *pelist = obj->pelist;
206     
207     if(npes == 0) {
208         CmiFree(env);
209         return;    
210     }
211     
212     //CmiSetHandler(env, handlerId);
213     CmiSetHandler(env, CkpvAccess(strategy_handlerid));
214
215     ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
216
217     //Collect Multicast Statistics
218     RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
219     
220     CkPackMessage(&env);
221     //Sending a remote multicast
222     CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
223 }
224
225 void DirectMulticastStrategy::pup(PUP::er &p){
226
227     CharmStrategy::pup(p);
228     p | isPersistent; 
229 }
230
231 void DirectMulticastStrategy::beginProcessing(int numElements){
232     
233     //handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
234     
235     CkArrayID dest;
236     int nidx;
237     CkArrayIndexMax *idx_list;
238
239     ainfo.getDestinationArray(dest, idx_list, nidx);
240     sinfo = ComlibSectionInfo(dest, myInstanceID);
241
242     ComlibLearner *learner = new ComlibLearner();
243     setLearner(learner);
244 }
245
246 void DirectMulticastStrategy::handleMessage(void *msg){
247     envelope *env = (envelope *)msg;
248     RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
249
250     //Section multicast base message
251     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
252     
253     int status = cbmsg->_cookie.sInfo.cInfo.status;
254     ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
255     
256     if(status == COMLIB_MULTICAST_NEW_SECTION)
257         handleNewMulticastMessage(env);
258     else {
259         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
260         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
261                                  cbmsg->_cookie.sInfo.cInfo.id);    
262         
263         ComlibSectionHashObject *obj;
264         obj = sec_ht.get(key);
265         
266         if(obj == NULL)
267             CkAbort("Destination indices is NULL\n");
268         
269         localMulticast(env, obj);
270         remoteMulticast(env, obj);
271     }
272 }
273
274
275 void DirectMulticastStrategy::handleNewMulticastMessage(envelope *env) {
276     
277     ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
278
279     CkUnpackMessage(&env);    
280     
281     envelope *newenv;
282     CkVec<CkArrayIndexMax> idx_list;    
283     
284     sinfo.unpack(env, idx_list, newenv);
285
286     ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
287     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
288                              cbmsg->_cookie.sInfo.cInfo.id);
289     
290     ComlibSectionHashObject *old_obj = NULL;
291     
292     old_obj = sec_ht.get(key);
293     if(old_obj != NULL)
294         delete old_obj;
295
296     
297     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
298     for(int count = 0; count < idx_list.size(); count++)
299         idx_list_array[count] = idx_list[count];
300
301     ComlibSectionHashObject *new_obj = createObjectOnIntermediatePe
302         (idx_list.size(), idx_list_array, cbmsg->_cookie.pe);
303
304     delete idx_list_array;
305     
306     sec_ht.put(key) = new_obj;
307
308     remoteMulticast(env, new_obj);
309     localMulticast(newenv, new_obj); //local multicast always copies
310     CmiFree(newenv);                
311 }