adds RectMulticastStrategy. Depends on BGML native layer on BG/L.
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1
2 /********************************************************
3         Section multicast strategy suite. DirectMulticast and its
4         derivatives, multicast messages to a section of array elements
5         created on the fly. The section is invoked by calling a
6         section proxy. These strategies can also multicast to a subset
7         of processors for groups.
8
        These strategies are non-bracketed. When the first request is
        made a route is dynamically built on the section. The route
        information is stored in the section hash table (sec_ht), keyed
        by the source processor and the section id.
12
13  - Sameer Kumar
14
15 **********************************************/
16
17
18 #include "DirectMulticastStrategy.h"
19
20 CkpvExtern(CkGroupID, cmgrID);
21
22 /*
23 void *DMHandler(void *msg){
24     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
25     DirectMulticastStrategy *nm_mgr;    
26     
27     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
28     int instid = bmsg->_cookie.sInfo.cInfo.instId;
29     
30     nm_mgr = (DirectMulticastStrategy *) 
31         CProxy_ComlibManager(CkpvAccess(cmgrID)).
32         ckLocalBranch()->getStrategy(instid);
33     
34     nm_mgr->handleMulticastMessage(msg);
35     return NULL;
36 }
37 */
38
39 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid, 
40                                                  int isPersistent)
41     :  CharmStrategy() {
42
43     ainfo.setDestinationArray(aid);
44     setType(ARRAY_STRATEGY);
45
46     this->isPersistent = isPersistent;
47 }
48
49 //Destroy all old built routes
50 DirectMulticastStrategy::~DirectMulticastStrategy() {
51     
52     ComlibPrintf("Calling Distructor\n");
53
54     if(getLearner() != NULL)
55         delete getLearner();
56         
57     CkHashtableIterator *ht_iterator = sec_ht.iterator();
58     ht_iterator->seekStart();
59     while(ht_iterator->hasNext()){
60         void **data;
61         data = (void **)ht_iterator->next();        
62         ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
63         if(obj != NULL)
64             delete obj;
65     }
66 }
67
68 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
69     
70     ComlibPrintf("[%d] Comlib Direct Section Multicast: insertMessage \n", 
71                  CkMyPe());   
72
73     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
74         CkSectionID *sid = cmsg->sec_id;
75         int cur_sec_id = ComlibSectionInfo::getSectionID(*sid);
76         
77         if(cur_sec_id > 0) {        
78             sinfo.processOldSectionMessage(cmsg);            
79             
80             ComlibSectionHashKey 
81                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
82             ComlibSectionHashObject *obj = sec_ht.get(key);
83
84             if(obj == NULL)
85                 CkAbort("Cannot Find Section\n");
86
87             envelope *env = UsrToEnv(cmsg->getCharmMessage());
88             localMulticast(env, obj);
89             remoteMulticast(env, obj);
90         }
91         else {
92             //New sec id, so send it along with the message
93             void *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting());
94             insertSectionID(sid);
95
96             ComlibSectionHashKey 
97                 key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
98             
99             ComlibSectionHashObject *obj = sec_ht.get(key);
100
101             if(obj == NULL)
102                 CkAbort("Cannot Find Section\n");
103
104             /*
105             CkPrintf("%u: Src = %d dest:", key.hash(), CkMyPe());
106             for (int i=0; i<obj->npes; ++i)
107               CkPrintf(" %d",obj->pelist[i]);
108             CkPrintf(", map:");
109             ComlibMulticastMsg *lll = (ComlibMulticastMsg*)newmsg;
110             envelope *ppp = UsrToEnv(newmsg);
111             CkUnpackMessage(&ppp);
112             int ttt=0;
113             for (int i=0; i<lll->nPes; ++i) {
114               CkPrintf(" %d (",lll->indicesCount[i].pe);
115               for (int j=0; j<lll->indicesCount[i].count; ++j) {
116                 CkPrintf(" %d",((int*)&(lll->indices[ttt]))[1]);
117                 ttt++;
118               }
119               CkPrintf(" )");
120             }
121             CkPackMessage(&ppp);
122             CkPrintf("\n");
123             */
124
125             char *msg = cmsg->getCharmMessage();
126             localMulticast(UsrToEnv(msg), obj);
127             CkFreeMsg(msg);
128             
129             if (newmsg != NULL) remoteMulticast(UsrToEnv(newmsg), obj);
130         }        
131     }
132     else 
133         CkAbort("Section multicast cannot be used without a section proxy");
134
135     delete cmsg;       
136 }
137
138 void DirectMulticastStrategy::insertSectionID(CkSectionID *sid) {
139     
140     ComlibSectionHashKey 
141         key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
142
143     ComlibSectionHashObject *obj = NULL;    
144     obj = sec_ht.get(key);
145     
146     if(obj != NULL)
147         delete obj;
148     
149     obj = createObjectOnSrcPe(sid->_nElems, sid->_elems);
150     sec_ht.put(key) = obj;
151
152 }
153
154
155 ComlibSectionHashObject *
156 DirectMulticastStrategy::createObjectOnSrcPe(int nindices, CkArrayIndexMax *idxlist) {
157
158     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
159     
160     sinfo.getRemotePelist(nindices, idxlist, obj->npes, obj->pelist);
161     sinfo.getLocalIndices(nindices, idxlist, obj->indices);
162     
163     return obj;
164 }
165
166
167 ComlibSectionHashObject *
168 DirectMulticastStrategy::createObjectOnIntermediatePe(int nindices,
169                                                       CkArrayIndexMax *idxlist,
170                                                       int npes,
171                                                       ComlibMulticastIndexCount *counts,
172                                                       int srcpe) {
173
174     ComlibSectionHashObject *obj = new ComlibSectionHashObject();
175         
176     obj->pelist = 0;
177     obj->npes = 0;
178
179     obj->indices.resize(0);
180     for (int i=0; i<nindices; ++i) obj->indices.insertAtEnd(idxlist[i]);
181     //sinfo.getLocalIndices(nindices, idxlist, obj->indices);
182
183     return obj;
184 }
185
186
187 void DirectMulticastStrategy::doneInserting(){
188     //Do nothing! Its a bracketed strategy
189 }
190
191 extern void CmiReference(void *);
192
193 //Send the multicast message the local array elements. The message is 
194 //copied and sent if elements exist. 
195 void DirectMulticastStrategy::localMulticast(envelope *env, 
196                                              ComlibSectionHashObject *obj) {
197     int nIndices = obj->indices.size();
198     
199     //If the library is set to persistent. 
200     //The message is stored in the library. The applications should 
201     //use the message as a readonly and it exists till the next one 
202     //comes along
203     
204     if(obj->msg != NULL) {
205         CmiFree(obj->msg);
206         obj->msg = NULL;
207     } 
208     
209     if(nIndices > 0) {
210         void *msg = EnvToUsr(env);
211         void *msg1 = msg;
212         
213         msg1 = CkCopyMsg(&msg);
214         
215         if(isPersistent) {
216             CmiReference(UsrToEnv(msg1));
217             obj->msg = (void *)UsrToEnv(msg1);
218         }
219         
220         ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
221     }    
222 }
223
224
225 //Calls default multicast scheme to send the messages. It could 
226 //also call a converse lower level strategy to do the muiticast.
227 //For example pipelined multicast
228 void DirectMulticastStrategy::remoteMulticast(envelope *env, 
229                                               ComlibSectionHashObject *obj) {
230     
231     int npes = obj->npes;
232     int *pelist = obj->pelist;
233     
234     if(npes == 0) {
235         CmiFree(env);
236         return;    
237     }
238     
239     //CmiSetHandler(env, handlerId);
240     CmiSetHandler(env, CkpvAccess(strategy_handlerid));
241
242     ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
243
244     //Collect Multicast Statistics
245     RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
246     
247     CkPackMessage(&env);
248     //Sending a remote multicast
249     CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
250     //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
251 }
252
253 void DirectMulticastStrategy::pup(PUP::er &p){
254
255     CharmStrategy::pup(p);
256     p | isPersistent; 
257 }
258
259 void DirectMulticastStrategy::beginProcessing(int numElements){
260     
261     //handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
262     
263     CkArrayID dest;
264     int nidx;
265     CkArrayIndexMax *idx_list;
266
267     ainfo.getDestinationArray(dest, idx_list, nidx);
268     sinfo = ComlibSectionInfo(dest, myInstanceID);
269
270     ComlibLearner *learner = new ComlibLearner();
271     //setLearner(learner);
272 }
273
274 void DirectMulticastStrategy::handleMessage(void *msg){
275     envelope *env = (envelope *)msg;
276     RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
277
278     //Section multicast base message
279     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
280     
281     int status = cbmsg->_cookie.sInfo.cInfo.status;
282     ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
283     
284     if(status == COMLIB_MULTICAST_NEW_SECTION)
285         handleNewMulticastMessage(env);
286     else {
287         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
288         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
289                                  cbmsg->_cookie.sInfo.cInfo.id);    
290         
291         ComlibSectionHashObject *obj;
292         obj = sec_ht.get(key);
293         
294         if(obj == NULL)
295             CkAbort("Destination indices is NULL\n");
296         
297         localMulticast(env, obj);
298         remoteMulticast(env, obj);
299     }
300 }
301
302 #include <string>
303
304 void DirectMulticastStrategy::handleNewMulticastMessage(envelope *env) {
305     
306     ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
307
308     CkUnpackMessage(&env);    
309
310     int localElems;
311     envelope *newenv;
312     CkArrayIndexMax *local_idx_list;    
313     
314     sinfo.unpack(env, localElems, local_idx_list, newenv);
315
316     ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
317     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
318                              cbmsg->_cookie.sInfo.cInfo.id);
319     
320     ComlibSectionHashObject *old_obj = NULL;
321     
322     old_obj = sec_ht.get(key);
323     if(old_obj != NULL) {
324         delete old_obj;
325     }
326
327     /*
328     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
329     for(int count = 0; count < idx_list.size(); count++)
330         idx_list_array[count] = idx_list[count];
331     */
332
333     ComlibSectionHashObject *new_obj = createObjectOnIntermediatePe(localElems, local_idx_list, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.pe);
334
335     /*
336     char buf[100];
337     std::string tmp = "";
338     for (int i=0; i<idx_list.size(); i++) {
339       sprintf(buf, " (%d-%d-%d-%d)",((short*)&idx_list_array[i])[2],((short*)&idx_list_array[i])[3],((short*)&idx_list_array[i])[4],((short*)&idx_list_array[i])[5]);
340       tmp+=buf;
341     }
342     ComlibPrintf("[%d] LOCAL MULTICAST: %s\n",key.hash(),tmp.data());
343     tmp=""+new_obj->npes;
344     for (int i=0; i<new_obj->npes; i++) {
345       sprintf(buf, " %d",new_obj->pelist[i]);
346       tmp+=buf;
347     }
348     ComlibPrintf("[%d] REMOTE MULTICAST: %s\n",key.hash(),tmp.data());
349     */
350     //delete [] idx_list_array;
351     
352     sec_ht.put(key) = new_obj;
353
354     /*
355     CkPrintf("%u: Proc = %d (%d) forward:", key.hash(), CkMyPe(),cbmsg->nPes);
356     for (int i=0; i<new_obj->npes; ++i) CkPrintf(" %d",new_obj->pelist[i]);
357     CkPrintf(", deliver:");
358     for (int i=0; i<new_obj->indices.size(); ++i) CkPrintf(" %d",((int*)&new_obj->indices[i])[1]);
359     CkPrintf("\n");
360     */
361
362     remoteMulticast(env, new_obj);
363     localMulticast(newenv, new_obj); //local multicast always copies
364     CmiFree(newenv);                
365 }