Changing CharmStrategy and DirectMulticastStrategy to use the strategy_handler
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1
2 /********************************************************
3         Section multicast strategy suite. DirectMulticast and its
4         derivatives, multicast messages to a section of array elements
5         created on the fly. The section is invoked by calling a
6         section proxy. These strategies can also multicast to a subset
7         of processors for groups.
8
        These strategies are non-bracketed. When the first request is
        made, a route is dynamically built on the section. The route
        information is stored in the section hash table (sec_ht).
12
13  - Sameer Kumar
14
15 **********************************************/
16
17
18 #include "DirectMulticastStrategy.h"
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 /*
23 void *DMHandler(void *msg){
24     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
25     DirectMulticastStrategy *nm_mgr;    
26     
27     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
28     int instid = bmsg->_cookie.sInfo.cInfo.instId;
29     
30     nm_mgr = (DirectMulticastStrategy *) 
31         CProxy_ComlibManager(CkpvAccess(cmgrID)).
32         ckLocalBranch()->getStrategy(instid);
33     
34     nm_mgr->handleMulticastMessage(msg);
35     return NULL;
36 }
37 */
38
39 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
40     :  CharmStrategy() {
41
42     ainfo.setDestinationArray(aid);
43     setType(ARRAY_STRATEGY);
44 }
45
46 //Destroy all old built routes
47 DirectMulticastStrategy::~DirectMulticastStrategy() {
48     
49     ComlibPrintf("Calling Distructor\n");
50
51     if(getLearner() != NULL)
52         delete getLearner();
53         
54     CkHashtableIterator *ht_iterator = sec_ht.iterator();
55     ht_iterator->seekStart();
56     while(ht_iterator->hasNext()){
57         void **data;
58         data = (void **)ht_iterator->next();        
59         ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
60         if(obj != NULL)
61             delete obj;
62     }
63 }
64
65 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
66     
67     ComlibPrintf("[%d] Comlib Direct Section Multicast: insertMessage \n", 
68                  CkMyPe());   
69
70     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
71         CkSectionID *sid = cmsg->sec_id;
72         int cur_sec_id = ComlibSectionInfo::getSectionID(*sid);
73         
74         if(cur_sec_id > 0) {        
75             sinfo.processOldSectionMessage(cmsg);            
76             
77             ComlibSectionHashKey 
78                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
79             ComlibSectionHashObject *obj = sec_ht.get(key);
80
81             if(obj == NULL)
82                 CkAbort("Cannot Find Section\n");
83
84             envelope *env = UsrToEnv(cmsg->getCharmMessage());
85             localMulticast(env, obj);
86             remoteMulticast(env, obj);
87         }
88         else {            
89             //New sec id, so send it along with the message
90             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
91             insertSectionID(sid);
92
93             ComlibSectionHashKey 
94                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
95             
96             ComlibSectionHashObject *obj = sec_ht.get(key);
97
98             if(obj == NULL)
99                 CkAbort("Cannot Find Section\n");
100             
101             char *msg = cmsg->getCharmMessage();
102             localMulticast(UsrToEnv(msg), obj);
103             CkFreeMsg(msg);
104             
105             remoteMulticast(UsrToEnv(newmsg), obj);
106         }        
107     }
108     else 
109         CkAbort("Section multicast cannot be used without a section proxy");
110
111     delete cmsg;       
112 }
113
114 void DirectMulticastStrategy::insertSectionID(CkSectionID *sid) {
115     
116     ComlibSectionHashKey 
117         key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
118
119     ComlibSectionHashObject *obj = NULL;    
120     obj = sec_ht.get(key);
121     
122     if(obj != NULL)
123         delete obj;
124     
125     obj = createObjectOnSrcPe(sid->_nElems, sid->_elems);
126     sec_ht.put(key) = obj;
127 }
128
129
130 ComlibSectionHashObject *DirectMulticastStrategy::createObjectOnSrcPe
131 (int nindices, CkArrayIndexMax *idxlist) {
132
133     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
134     
135     sinfo.getRemotePelist(nindices, idxlist, obj->npes, obj->pelist);
136     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
137     
138     return obj;
139 }
140
141
142 ComlibSectionHashObject *DirectMulticastStrategy::
143 createObjectOnIntermediatePe(int nindices, CkArrayIndexMax *idxlist, 
144                              int srcpe){
145
146     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
147         
148     obj->pelist = 0;
149     obj->npes = 0;
150     
151     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
152
153     return obj;
154 }
155
156
157 void DirectMulticastStrategy::doneInserting(){
158     //Do nothing! Its a bracketed strategy
159 }
160
161 //Send the multicast message the local array elements. The message is 
162 //copied and sent if elements exist. 
163 void DirectMulticastStrategy::localMulticast(envelope *env, 
164                                              ComlibSectionHashObject *obj) {
165     int nIndices = obj->indices.size();
166     
167     if(nIndices > 0) {
168         void *msg = EnvToUsr(env);
169         void *msg1 = msg;
170         
171         msg1 = CkCopyMsg(&msg);
172         ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
173     }    
174 }
175
176
177 //Calls default multicast scheme to send the messages. It could 
178 //also call a converse lower level strategy to do the muiticast.
179 //For example pipelined multicast
180 void DirectMulticastStrategy::remoteMulticast(envelope *env, 
181                                               ComlibSectionHashObject *obj) {
182     
183     int npes = obj->npes;
184     int *pelist = obj->pelist;
185     
186     if(npes == 0) {
187         CmiFree(env);
188         return;    
189     }
190     
191     //CmiSetHandler(env, handlerId);
192     CmiSetHandler(env, CkpvAccess(strategy_handlerid));
193
194     ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
195
196     //Collect Multicast Statistics
197     RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
198     
199     CkPackMessage(&env);
200     //Sending a remote multicast
201     CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
202 }
203
204 void DirectMulticastStrategy::pup(PUP::er &p){
205
206     CharmStrategy::pup(p);
207 }
208
209 void DirectMulticastStrategy::beginProcessing(int numElements){
210     
211     //handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
212     
213     CkArrayID dest;
214     int nidx;
215     CkArrayIndexMax *idx_list;
216
217     ainfo.getDestinationArray(dest, idx_list, nidx);
218     sinfo = ComlibSectionInfo(dest, myInstanceID);
219
220     ComlibLearner *learner = new ComlibLearner();
221     setLearner(learner);
222 }
223
224 void DirectMulticastStrategy::handleMessage(void *msg){
225     envelope *env = (envelope *)msg;
226     RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
227
228     //Section multicast base message
229     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
230     
231     int status = cbmsg->_cookie.sInfo.cInfo.status;
232     ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
233     
234     if(status == COMLIB_MULTICAST_NEW_SECTION)
235         handleNewMulticastMessage(env);
236     else {
237         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
238         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
239                                  cbmsg->_cookie.sInfo.cInfo.id);    
240         
241         ComlibSectionHashObject *obj;
242         obj = sec_ht.get(key);
243         
244         if(obj == NULL)
245             CkAbort("Destination indices is NULL\n");
246         
247         localMulticast(env, obj);
248         remoteMulticast(env, obj);
249     }
250 }
251
252
253 void DirectMulticastStrategy::handleNewMulticastMessage(envelope *env) {
254     
255     ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
256
257     CkUnpackMessage(&env);    
258     
259     envelope *newenv;
260     CkVec<CkArrayIndexMax> idx_list;    
261     
262     sinfo.unpack(env, idx_list, newenv);
263
264     ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
265     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
266                              cbmsg->_cookie.sInfo.cInfo.id);
267     
268     ComlibSectionHashObject *old_obj = NULL;
269     
270     old_obj = sec_ht.get(key);
271     if(old_obj != NULL)
272         delete old_obj;
273
274     
275     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
276     for(int count = 0; count < idx_list.size(); count++)
277         idx_list_array[count] = idx_list[count];
278
279     ComlibSectionHashObject *new_obj = createObjectOnIntermediatePe
280         (idx_list.size(), idx_list_array, cbmsg->_cookie.pe);
281
282     delete idx_list_array;
283     
284     sec_ht.put(key) = new_obj;
285
286     remoteMulticast(env, new_obj);
287     
288     if(new_obj->indices.size() > 0)
289         ComlibArrayInfo::localMulticast(&(new_obj->indices), newenv);    
290     else        
291         CmiFree(newenv);                
292 }