Several new changes
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1
2 /********************************************************
3         Section multicast strategy suite. DirectMulticast and its
4         derivatives, multicast messages to a section of array elements
5         created on the fly. The section is invoked by calling a
6         section proxy. These strategies can also multicast to a subset
7         of processors for groups.
8
9         These strategies are non-bracketed. When the first request is
10         made a route is dynamically built on the section. The route
11         information is stored in
12
13  - Sameer Kumar
14
15 **********************************************/
16
17
18 #include "DirectMulticastStrategy.h"
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 void *DMHandler(void *msg){
23     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
24     DirectMulticastStrategy *nm_mgr;    
25     
26     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
27     int instid = bmsg->_cookie.sInfo.cInfo.instId;
28     
29     nm_mgr = (DirectMulticastStrategy *) 
30         CProxy_ComlibManager(CkpvAccess(cmgrID)).
31         ckLocalBranch()->getStrategy(instid);
32     
33     envelope *env = (envelope *) msg;
34     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
35     nm_mgr->handleMulticastMessage(msg);
36     return NULL;
37 }
38
39 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
40     :  CharmStrategy() {
41
42     ainfo.setDestinationArray(aid);
43     setType(ARRAY_STRATEGY);
44 }
45
46 //Destroy all old built routes
47 DirectMulticastStrategy::~DirectMulticastStrategy() {
48     
49     ComlibPrintf("Calling Distructor\n");
50
51     if(getLearner() != NULL)
52         delete getLearner();
53         
54     CkHashtableIterator *ht_iterator = sec_ht.iterator();
55     ht_iterator->seekStart();
56     while(ht_iterator->hasNext()){
57         void **data;
58         data = (void **)ht_iterator->next();        
59         ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
60         if(obj != NULL)
61             delete obj;
62     }
63 }
64
65 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
66     
67     ComlibPrintf("[%d] Comlib Direct Section Multicast: insertMessage \n", 
68                  CkMyPe());   
69
70     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
71         CkSectionID *sid = cmsg->sec_id;
72         int cur_sec_id = ComlibSectionInfo::getSectionID(*sid);
73         
74         if(cur_sec_id > 0) {        
75             sinfo.processOldSectionMessage(cmsg);            
76             
77             ComlibSectionHashKey 
78                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
79             ComlibSectionHashObject *obj = sec_ht.get(key);
80
81             if(obj == NULL)
82                 CkAbort("Cannot Find Section\n");
83
84             envelope *env = UsrToEnv(cmsg->getCharmMessage());
85             localMulticast(env, obj);
86             remoteMulticast(env, obj);
87         }
88         else {            
89             //New sec id, so send it along with the message
90             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
91             insertSectionID(sid);
92
93             ComlibSectionHashKey 
94                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
95             
96             ComlibSectionHashObject *obj = sec_ht.get(key);
97
98             if(obj == NULL)
99                 CkAbort("Cannot Find Section\n");
100             
101             char *msg = cmsg->getCharmMessage();
102             localMulticast(UsrToEnv(msg), obj);
103             CkFreeMsg(msg);
104             
105             remoteMulticast(UsrToEnv(newmsg), obj);
106         }        
107     }
108     else 
109         CkAbort("Section multicast cannot be used without a section proxy");
110
111     delete cmsg;       
112 }
113
114 void DirectMulticastStrategy::insertSectionID(CkSectionID *sid) {
115     
116     ComlibSectionHashKey 
117         key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
118
119     ComlibSectionHashObject *obj = NULL;    
120     obj = sec_ht.get(key);
121     
122     if(obj != NULL)
123         delete obj;
124     
125     obj = createObjectOnSrcPe(sid->_nElems, sid->_elems);
126     sec_ht.put(key) = obj;
127 }
128
129
130 ComlibSectionHashObject *DirectMulticastStrategy::createObjectOnSrcPe
131 (int nindices, CkArrayIndexMax *idxlist) {
132
133     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
134     
135     sinfo.getRemotePelist(nindices, idxlist, obj->npes, obj->pelist);
136     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
137     
138     return obj;
139 }
140
141
142 ComlibSectionHashObject *DirectMulticastStrategy::
143 createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, 
144                              int srcpe){
145
146     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
147         
148     obj->pelist = 0;
149     obj->npes = 0;
150     
151     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
152
153     return obj;
154 }
155
156
157 void DirectMulticastStrategy::doneInserting(){
158     //Do nothing! Its a bracketed strategy
159 }
160
161 //Send the multicast message the local array elements. The message is 
162 //copied and sent if elements exist. 
163 void DirectMulticastStrategy::localMulticast(envelope *env, 
164                                              ComlibSectionHashObject *obj) {
165     int nIndices = obj->indices.size();
166     
167     if(nIndices > 0) {
168         void *msg = EnvToUsr(env);
169         void *msg1 = msg;
170         
171         msg1 = CkCopyMsg(&msg);
172         ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
173     }    
174 }
175
176
177 //Calls default multicast scheme to send the messages. It could 
178 //also call a converse lower level strategy to do the muiticast.
179 //For example pipelined multicast
180 void DirectMulticastStrategy::remoteMulticast(envelope *env, 
181                                               ComlibSectionHashObject *obj) {
182     
183     int npes = obj->npes;
184     int *pelist = obj->pelist;
185     
186     if(npes == 0) {
187         CmiFree(env);
188         return;    
189     }
190     
191     CmiSetHandler(env, handlerId);
192
193     //Collect Multicast Statistics
194     RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
195     
196     //Sending a remote multicast
197     CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
198 }
199
200 void DirectMulticastStrategy::pup(PUP::er &p){
201
202     CharmStrategy::pup(p);
203 }
204
205 void DirectMulticastStrategy::beginProcessing(int numElements){
206     
207     handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
208     
209     CkArrayID dest;
210     int nidx;
211     CkArrayIndexMax *idx_list;
212
213     ainfo.getDestinationArray(dest, idx_list, nidx);
214     sinfo = ComlibSectionInfo(dest, myInstanceID);
215
216     ComlibLearner *learner = new ComlibLearner();
217     setLearner(learner);
218 }
219
220 void DirectMulticastStrategy::handleMulticastMessage(void *msg){
221     register envelope *env = (envelope *)msg;
222
223     //Section multicast base message
224     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
225     
226     int status = cbmsg->_cookie.sInfo.cInfo.status;
227     ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
228     
229     if(status == COMLIB_MULTICAST_NEW_SECTION)
230         handleNewMulticastMessage(env);
231     else {
232         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
233         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
234                                  cbmsg->_cookie.sInfo.cInfo.id);    
235         
236         ComlibSectionHashObject *obj;
237         obj = sec_ht.get(key);
238         
239         if(obj == NULL)
240             CkAbort("Destination indices is NULL\n");
241         
242         localMulticast(env, obj);
243         remoteMulticast(env, obj);
244     }
245 }
246
247
248 void DirectMulticastStrategy::handleNewMulticastMessage(envelope *env) {
249
250     ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
251
252     CkUnpackMessage(&env);    
253     
254     envelope *newenv;
255     CkVec<CkArrayIndexMax> idx_list;    
256     
257     sinfo.unpack(env, idx_list, newenv);
258
259     ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
260     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
261                              cbmsg->_cookie.sInfo.cInfo.id);
262     
263     ComlibSectionHashObject *old_obj = NULL;
264     
265     old_obj = sec_ht.get(key);
266     if(old_obj != NULL)
267         delete old_obj;
268
269     
270     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
271     for(int count = 0; count < idx_list.size(); count++)
272         idx_list_array[count] = idx_list[count];
273
274     ComlibSectionHashObject *new_obj = createObjectOnIntermediatePe
275         (idx_list.size(), idx_list_array, cbmsg->_cookie.pe);
276
277     delete idx_list_array;
278     
279     sec_ht.put(key) = new_obj;
280
281     if(new_obj->npes > 0) {
282         CkPackMessage(&env);
283         CmiSyncListSendAndFree(new_obj->npes, new_obj->pelist, 
284                                env->getTotalsize(), (char*)env);  
285     }
286     else        
287         CmiFree(env);                
288     
289     if(new_obj->indices.size() > 0)
290         ComlibArrayInfo::localMulticast(&(new_obj->indices), newenv);    
291     else        
292         CmiFree(newenv);                
293 }