Added doxygen documentation.
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1
2 /********************************************************
3         Section multicast strategy suite. DirectMulticast and its
4         derivatives, multicast messages to a section of array elements
5         created on the fly. The section is invoked by calling a
6         section proxy. These strategies can also multicast to a subset
7         of processors for groups.
8
9         These strategies are non-bracketed. When the first request is
10         made a route is dynamically built on the section. The route
11         information is stored in
12
13  - Sameer Kumar
14
15 **********************************************/
16
17
18 #include "DirectMulticastStrategy.h"
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 /*
23 void *DMHandler(void *msg){
24     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
25     DirectMulticastStrategy *nm_mgr;    
26     
27     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
28     int instid = bmsg->_cookie.sInfo.cInfo.instId;
29     
30     nm_mgr = (DirectMulticastStrategy *) 
31         CProxy_ComlibManager(CkpvAccess(cmgrID)).
32         ckLocalBranch()->getStrategy(instid);
33     
34     nm_mgr->handleMulticastMessage(msg);
35     return NULL;
36 }
37 */
38
39 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid, 
40                                                  int isPersistent)
41     :  CharmStrategy() {
42
43     ainfo.setDestinationArray(aid);
44     setType(ARRAY_STRATEGY);
45
46     this->isPersistent = isPersistent;
47 }
48
49 //Destroy all old built routes
50 DirectMulticastStrategy::~DirectMulticastStrategy() {
51     
52     ComlibPrintf("Calling Distructor\n");
53
54     if(getLearner() != NULL)
55         delete getLearner();
56         
57     CkHashtableIterator *ht_iterator = sec_ht.iterator();
58     ht_iterator->seekStart();
59     while(ht_iterator->hasNext()){
60         void **data;
61         data = (void **)ht_iterator->next();        
62         ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
63         if(obj != NULL)
64             delete obj;
65     }
66 }
67
68 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
69     
70     ComlibPrintf("[%d] Comlib Direct Section Multicast: insertMessage \n", 
71                  CkMyPe());   
72
73     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
74         CkSectionID *sid = cmsg->sec_id;
75         int cur_sec_id = ComlibSectionInfo::getSectionID(*sid);
76         
77         if(cur_sec_id > 0) {        
78             sinfo.processOldSectionMessage(cmsg);            
79             
80             ComlibSectionHashKey 
81                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
82             ComlibSectionHashObject *obj = sec_ht.get(key);
83
84             if(obj == NULL)
85                 CkAbort("Cannot Find Section\n");
86
87             envelope *env = UsrToEnv(cmsg->getCharmMessage());
88             localMulticast(env, obj);
89             remoteMulticast(env, obj);
90         }
91         else {            
92             //New sec id, so send it along with the message
93             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
94             insertSectionID(sid);
95
96             ComlibSectionHashKey 
97                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
98             
99             ComlibSectionHashObject *obj = sec_ht.get(key);
100
101             if(obj == NULL)
102                 CkAbort("Cannot Find Section\n");
103             
104             char *msg = cmsg->getCharmMessage();
105             localMulticast(UsrToEnv(msg), obj);
106             CkFreeMsg(msg);
107             
108             if (newmsg != NULL) remoteMulticast(UsrToEnv(newmsg), obj);
109         }        
110     }
111     else 
112         CkAbort("Section multicast cannot be used without a section proxy");
113
114     delete cmsg;       
115 }
116
117 void DirectMulticastStrategy::insertSectionID(CkSectionID *sid) {
118     
119     ComlibSectionHashKey 
120         key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
121
122     ComlibSectionHashObject *obj = NULL;    
123     obj = sec_ht.get(key);
124     
125     if(obj != NULL)
126         delete obj;
127     
128     obj = createObjectOnSrcPe(sid->_nElems, sid->_elems);
129     sec_ht.put(key) = obj;
130 }
131
132
133 ComlibSectionHashObject *
134 DirectMulticastStrategy::createObjectOnSrcPe(int nindices, CkArrayIndexMax *idxlist) {
135
136     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
137     
138     sinfo.getRemotePelist(nindices, idxlist, obj->npes, obj->pelist);
139     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
140     
141     return obj;
142 }
143
144
145 ComlibSectionHashObject *
146 DirectMulticastStrategy::createObjectOnIntermediatePe(int nindices,
147                                                       CkArrayIndexMax *idxlist,
148                                                       int npes,
149                                                       ComlibMulticastIndexCount *counts,
150                                                       int srcpe) {
151
152     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
153         
154     obj->pelist = 0;
155     obj->npes = 0;
156
157     obj->indices.resize(0);
158     for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
159     //sinfo.getLocalIndices(nindices, idxlist, obj->indices);
160
161     return obj;
162 }
163
164
165 void DirectMulticastStrategy::doneInserting(){
166     //Do nothing! Its a bracketed strategy
167 }
168
169 extern void CmiReference(void *);
170
171 //Send the multicast message the local array elements. The message is 
172 //copied and sent if elements exist. 
173 void DirectMulticastStrategy::localMulticast(envelope *env, 
174                                              ComlibSectionHashObject *obj) {
175     int nIndices = obj->indices.size();
176     
177     //If the library is set to persistent. 
178     //The message is stored in the library. The applications should 
179     //use the message as a readonly and it exists till the next one 
180     //comes along
181     
182     if(obj->msg != NULL) {
183         CmiFree(obj->msg);
184         obj->msg = NULL;
185     } 
186     
187     if(nIndices > 0) {
188         void *msg = EnvToUsr(env);
189         void *msg1 = msg;
190         
191         msg1 = CkCopyMsg(&msg);
192         
193         if(isPersistent) {
194             CmiReference(UsrToEnv(msg1));
195             obj->msg = (void *)UsrToEnv(msg1);
196         }
197         
198         ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
199     }    
200 }
201
202
203 //Calls default multicast scheme to send the messages. It could 
204 //also call a converse lower level strategy to do the muiticast.
205 //For example pipelined multicast
206 void DirectMulticastStrategy::remoteMulticast(envelope *env, 
207                                               ComlibSectionHashObject *obj) {
208     
209     int npes = obj->npes;
210     int *pelist = obj->pelist;
211     
212     if(npes == 0) {
213         CmiFree(env);
214         return;    
215     }
216     
217     //CmiSetHandler(env, handlerId);
218     CmiSetHandler(env, CkpvAccess(strategy_handlerid));
219
220     ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
221
222     //Collect Multicast Statistics
223     RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
224     
225     CkPackMessage(&env);
226     //Sending a remote multicast
227     CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
228     //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
229 }
230
231 void DirectMulticastStrategy::pup(PUP::er &p){
232
233     CharmStrategy::pup(p);
234     p | isPersistent; 
235 }
236
237 void DirectMulticastStrategy::beginProcessing(int numElements){
238     
239     //handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
240     
241     CkArrayID dest;
242     int nidx;
243     CkArrayIndexMax *idx_list;
244
245     ainfo.getDestinationArray(dest, idx_list, nidx);
246     sinfo = ComlibSectionInfo(dest, myInstanceID);
247
248     ComlibLearner *learner = new ComlibLearner();
249     //setLearner(learner);
250 }
251
252 void DirectMulticastStrategy::handleMessage(void *msg){
253     envelope *env = (envelope *)msg;
254     RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
255
256     //Section multicast base message
257     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
258     
259     int status = cbmsg->_cookie.sInfo.cInfo.status;
260     ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
261     
262     if(status == COMLIB_MULTICAST_NEW_SECTION)
263         handleNewMulticastMessage(env);
264     else {
265         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
266         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
267                                  cbmsg->_cookie.sInfo.cInfo.id);    
268         
269         ComlibSectionHashObject *obj;
270         obj = sec_ht.get(key);
271         
272         if(obj == NULL)
273             CkAbort("Destination indices is NULL\n");
274         
275         localMulticast(env, obj);
276         remoteMulticast(env, obj);
277     }
278 }
279
280 #include <string>
281
282 void DirectMulticastStrategy::handleNewMulticastMessage(envelope *env) {
283     
284     ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
285
286     CkUnpackMessage(&env);    
287
288     int localElems;
289     envelope *newenv;
290     CkArrayIndexMax *local_idx_list;    
291     
292     sinfo.unpack(env, localElems, local_idx_list, newenv);
293
294     ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
295     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
296                              cbmsg->_cookie.sInfo.cInfo.id);
297     
298     ComlibSectionHashObject *old_obj = NULL;
299     
300     old_obj = sec_ht.get(key);
301     if(old_obj != NULL) {
302         delete old_obj;
303     }
304
305     /*
306     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
307     for(int count = 0; count < idx_list.size(); count++)
308         idx_list_array[count] = idx_list[count];
309     */
310
311     ComlibSectionHashObject *new_obj = createObjectOnIntermediatePe(localElems, local_idx_list, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.pe);
312
313     /*
314     char buf[100];
315     std::string tmp = "";
316     for (int i=0; i<idx_list.size(); i++) {
317       sprintf(buf, " (%d-%d-%d-%d)",((short*)&idx_list_array[i])[2],((short*)&idx_list_array[i])[3],((short*)&idx_list_array[i])[4],((short*)&idx_list_array[i])[5]);
318       tmp+=buf;
319     }
320     ComlibPrintf("[%d] LOCAL MULTICAST: %s\n",key.hash(),tmp.data());
321     tmp=""+new_obj->npes;
322     for (int i=0; i<new_obj->npes; i++) {
323       sprintf(buf, " %d",new_obj->pelist[i]);
324       tmp+=buf;
325     }
326     ComlibPrintf("[%d] REMOTE MULTICAST: %s\n",key.hash(),tmp.data());
327     */
328     //delete [] idx_list_array;
329     
330     sec_ht.put(key) = new_obj;
331
332     remoteMulticast(env, new_obj);
333     localMulticast(newenv, new_obj); //local multicast always copies
334     CmiFree(newenv);                
335 }