Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-com / MulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6    MulticastStrategy and its
7    derivatives, multicast messages to a section of array elements
8    created on the fly. The section is invoked by calling a
9    section proxy. These strategies can also multicast to a subset
10    of processors for groups.
11    
12    These strategies are non-bracketed. When the first request is
13    made a route is dynamically built on the section. The route
14    information is stored in
15
16  - Sameer Kumar
17  - Heavily revised by Filippo Gioachin 2/2006
18
19 */
20
21
22 #include "MulticastStrategy.h"
23
24 CkpvExtern(CkGroupID, cmgrID);
25
26
27 MulticastStrategy::MulticastStrategy()
28   : Strategy(), CharmStrategy() {
29
30   ComlibPrintf("MulticastStrategy constructor\n");
31   //ainfo.setDestinationArray(aid);
32   setType(ARRAY_STRATEGY);
33 }
34
35 //Destroy all old built routes
36 MulticastStrategy::~MulticastStrategy() {
37     
38   ComlibPrintf("MulticastStrategy destructor\n");
39
40   if(getLearner() != NULL)
41     delete getLearner();
42         
43   CkHashtableIterator *ht_iterator = sec_ht.iterator();
44   ht_iterator->seekStart();
45   while(ht_iterator->hasNext()){
46     void **data;
47     data = (void **)ht_iterator->next();        
48     ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
49     if(obj != NULL)
50       delete obj;
51   }
52 }
53
54 #if 0
55 void rewritePEs(CharmMessageHolder *cmsg){
56   ComlibPrintf("[%d] rewritePEs insertMessage \n",CkMyPe());
57     
58   CkAssert(cmsg->dest_proc == IS_SECTION_MULTICAST);
59     
60   void *m = cmsg->getCharmMessage();
61   envelope *env = UsrToEnv(m);
62     
63   ComlibMulticastMsg *msg = (ComlibMulticastMsg *)m;
64     
65 }
66 #endif
67
68 void MulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
69     
70   ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
71   // ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
72   
73   
74   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
75   
76   
77   if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
78     ComlibPrintf("[%d] Comlib Section Multicast: looking up cur_sec_id\n",CkMyPe());
79         
80     CkSectionID *sid = cmsg->sec_id;
81
82     // This is a sanity check if we only use a tiny chare array
83     // if(sid->_nElems > 4 || sid->_nElems<0){
84     //       CkPrintf("[%d] Warning!!!!!!!!!!! Section ID in message seems to be screwed up. cmg=%p sid=%p sid->_nElems=%d\n", CkMyPe(), cmsg, sid, (int)sid->_nElems);
85     //       CkAbort("");
86     //     }
87     int cur_sec_id = sid->getSectionID();
88
89     if(cur_sec_id > 0) {
90       sinfo.processOldSectionMessage(cmsg);
91       ComlibPrintf("Array section id was %d, but now is %d\n", cur_sec_id, sid->getSectionID());
92       CkAssert(cur_sec_id == sid->getSectionID());
93
94       ComlibPrintf("[%d] Comlib Section Multicast: insertMessage: cookiePE=%d\n",CkMyPe(),sid->_cookie.get_pe());
95       ComlibSectionHashKey key(CkMyPe(), cur_sec_id);
96       ComlibSectionHashObject *obj = sec_ht.get(key);
97
98       if(obj == NULL) {
99         //CkAbort("Cannot Find Section\n");
100         /* The object can be NULL for various reasons:
101          * 1) the user reassociated the section proxy with a different
102          *    multicast strategy, in which case the new one has no idea about
103          *    the previous usage of the proxy, but the proxy has the cur_sec_id
104          *    set by the previous strategy
105          * 2) the proxy migrated to another processor, in which case the
106          *    cur_sec_id is non null, but the CkMyPe changed, so the hashed
107          *    object could not be found (Filippo: I'm not sure if the id will
108          *    be reset upon migration, so if this case if possible)
109          */
110       }
111
112       /* In the following if, the check (CkMyPe == sid->_cookie.pe) helps identifying situations
113        * where the proxy has migrated from one processor to another. In this situation, the
114        * destination processor might find an "obj", created by somebody else. This "obj"
115        * is accepted only if the current processor is equal to the processor in which the
116        * cookie ID was defined. */
117       if (obj != NULL && CkMyPe() == sid->_cookie.get_pe() && !obj->isOld) {
118         envelope *env = UsrToEnv(cmsg->getCharmMessage());
119         localMulticast(env, obj, (CkMcastBaseMsg*)cmsg->getCharmMessage());
120         remoteMulticast(env, obj);
121
122         delete cmsg;
123         return;
124       }
125     }
126
127
128     // reaching here means the message was not sent as old, either because
129     // it is the first for this section or the existing section is old.
130     ComlibPrintf("[%d] MulticastStrategy, creating a new multicast path\n", CkMyPe());
131         
132     //New sec id, so send it along with the message
133     ComlibMulticastMsg *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting(), getInstance());
134
135
136     ComlibSectionHashObject *obj = NULL;
137             
138     //    CkAssert(newmsg!=NULL); // Previously the following code was just not called in this case
139     
140     if(newmsg !=NULL){
141       // Add the section to the hashtable, so we can use it in the future
142       ComlibPrintf("[%d] calling insertSectionID\n", CkMyPe());
143       ComlibSectionHashObject *obj_inserted = insertSectionID(sid, newmsg->nPes, newmsg->indicesCount);
144       
145       envelope *newenv = UsrToEnv(newmsg);
146       CkPackMessage(&newenv);
147     
148       ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);        
149     
150       obj = sec_ht.get(key);
151       ComlibPrintf("[%d] looking up key sid->_cookie.sInfo.cInfo.id=%d. Found obj=%p\n", CkMyPe(), (int)sid->_cookie.info.sInfo.cInfo.id, obj);
152       CkAssert(obj_inserted == obj);
153     
154
155     
156       if(obj == NULL){
157         CkPrintf("[%d] WARNING: Cannot Find ComlibRectSectionHashObject object in hash table sec_ht!\n", CkMyPe());
158         CkAbort("Cannot Find object. sec_ht.get(key)==NULL");
159         // If the number of array elements is fewer than the number of PEs, this happens frequently
160       } else {
161       
162         char *msg = cmsg->getCharmMessage();
163         localMulticast(UsrToEnv(msg), obj, (CkMcastBaseMsg*)msg);
164         CkFreeMsg(msg);
165       
166         if (newmsg != NULL) { 
167           remoteMulticast(UsrToEnv(newmsg), obj);
168         }
169       
170       }
171     }
172
173   }
174   else 
175     CkAbort("Section multicast cannot be used without a section proxy");
176
177   delete cmsg;       
178 }
179
180 ComlibSectionHashObject * MulticastStrategy::insertSectionID(CkSectionID *sid, int npes, ComlibMulticastIndexCount* pelist) {
181
182   ComlibPrintf("[%d] MulticastStrategy:insertSectionID\n",CkMyPe());
183   ComlibPrintf("[%d] MulticastStrategy:insertSectionID  sid->_cookie.sInfo.cInfo.id=%d \n",CkMyPe(),  (int)sid->_cookie.info.sInfo.cInfo.id);
184
185   //    double StartTime = CmiWallTimer();
186
187   ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
188
189   ComlibSectionHashObject *obj = NULL;    
190   obj = sec_ht.get(key);
191     
192   if(obj != NULL) {
193     ComlibPrintf("MulticastStrategy:insertSectionID: Deleting old object on proc %d for id %d\n",
194                  CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
195     delete obj;
196   }
197
198   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in insertSectionID\n", CkMyPe());
199   obj = new ComlibSectionHashObject();
200   CkArrayID aid(sid->_cookie.get_aid());
201   sinfo.getLocalIndices(sid->_nElems, sid->_elems, aid, obj->indices);
202     
203   createObjectOnSrcPe(obj, npes, pelist);
204   sec_ht.put(key) = obj;
205   ComlibPrintf("[%d] Inserting object %p into sec_ht\n", CkMyPe(), obj);
206   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
207
208   return obj;
209
210   //    traceUserBracketEvent( 2204, StartTime, CmiWallTimer()); 
211 }
212
213
214 extern void CmiReference(void *);
215
216 //Send the multicast message the local array elements. The message is 
217 //copied and sent if elements exist. 
218 void MulticastStrategy::localMulticast(envelope *env, 
219                                        ComlibSectionHashObject *obj,
220                                        CkMcastBaseMsg *base) {
221         
222   //    double StartTime = CmiWallTimer();
223         
224   int nIndices = obj->indices.size();
225
226   if(obj->msg != NULL) {
227     CmiFree(obj->msg);
228     obj->msg = NULL;
229   }
230
231   ComlibPrintf("[%d] localMulticast nIndices=%d\n", CkMyPe(), nIndices);
232         
233   if(nIndices > 0) {
234     void *msg = EnvToUsr(env);
235     void *msg1 = msg;
236
237     msg1 = CkCopyMsg(&msg);
238
239     CmiReference(UsrToEnv(msg1));
240     obj->msg = (void *)UsrToEnv(msg1);
241
242     int reply = ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
243     if (reply > 0) {
244       // some of the objects were not local, get the update!
245       CkMcastBaseMsg *errorMsg = sinfo.getNewDeliveryErrorMsg(base);
246       envelope *errorEnv = UsrToEnv(errorMsg);
247       CmiSetHandler(errorEnv, CkpvAccess(comlib_handler));
248       ((CmiMsgHeaderExt *) errorEnv)->stratid = getInstance();
249       CmiSyncSendAndFree(env->getSrcPe(), errorEnv->getTotalsize(), (char*)errorEnv);
250     }
251   }
252         
253   //    traceUserBracketEvent( 2200, StartTime, CmiWallTimer()); 
254         
255 }
256
257
258 //Calls default multicast scheme to send the messages. It could 
259 //also call a converse lower level strategy to do the muiticast.
260 //For example pipelined multicast
261 void MulticastStrategy::remoteMulticast(envelope *env, 
262                                         ComlibSectionHashObject *obj) {
263     
264   //    double StartTime = CmiWallTimer();
265         
266   int npes = obj->npes;
267   int *pelist = obj->pelist;
268     
269   if(npes == 0) {
270     CmiFree(env);
271     return;    
272   }
273     
274   //CmiSetHandler(env, handlerId);
275   CmiSetHandler(env, CkpvAccess(comlib_handler));
276
277   ((CmiMsgHeaderExt *) env)->stratid = getInstance();
278
279   //Collect Multicast Statistics
280   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
281     
282   CkPackMessage(&env);
283   //Sending a remote multicast
284     
285   ComlibPrintf("[%d] remoteMulticast Sending to %d PEs: \n", CkMyPe(), npes);
286   for(int i=0;i<npes;i++){
287     ComlibPrintf("[%d]    %d\n", CkMyPe(), pelist[i]);
288   }
289     
290   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
291   //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
292
293   //    traceUserBracketEvent( 2201, StartTime, CmiWallTimer()); 
294
295 }
296
297 void MulticastStrategy::pup(PUP::er &p){
298   Strategy::pup(p);
299   CharmStrategy::pup(p);
300 }
301
302
303 void MulticastStrategy::handleMessage(void *msg){
304
305         
306   //    double StartTime = CmiWallTimer();
307         
308   envelope *env = (envelope *)msg;
309   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
310
311   //Section multicast base message
312   CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
313   if (cbmsg->magic != _SECTION_MAGIC) CkAbort("MulticastStrategy received bad message! Did you forget to inherit from CkMcastBaseMsg?\n");
314     
315   int status = cbmsg->_cookie.info.sInfo.cInfo.status;
316   ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
317     
318   if(status == COMLIB_MULTICAST_NEW_SECTION)
319     handleNewMulticastMessage(env);
320   else if (status == COMLIB_MULTICAST_SECTION_ERROR) {
321     // some objects were not on the correct processor, mark the section as
322     // old. next time we try to use it, a new one will be generated with the
323     // updated inforamtion in the location manager (since the wrong delivery
324     // updated it indirectly.
325     ComlibSectionHashKey key(cbmsg->_cookie.get_pe(), 
326                              cbmsg->_cookie.info.sInfo.cInfo.id);    
327         
328     ComlibSectionHashObject *obj;
329     obj = sec_ht.get(key);
330
331     if(obj == NULL)
332       CkAbort("Destination indices is NULL\n");
333
334     // mark the section as old
335     obj->isOld = 1;
336   } else if (status == COMLIB_MULTICAST_OLD_SECTION) {
337     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
338     ComlibSectionHashKey key(cbmsg->_cookie.get_pe(), 
339                              cbmsg->_cookie.info.sInfo.cInfo.id);    
340         
341     ComlibSectionHashObject *obj;
342     obj = sec_ht.get(key);
343         
344     if(obj == NULL)
345       CkAbort("Destination indices is NULL\n");
346         
347     localMulticast(env, obj, cbmsg);
348     remoteMulticast(env, obj);
349   } else {
350     CkAbort("Multicast message status is zero\n");
351   }
352
353   //    traceUserBracketEvent( 2202, StartTime, CmiWallTimer()); 
354
355 }
356
357
358 void MulticastStrategy::handleNewMulticastMessage(envelope *env) {
359     
360   //    double StartTime = CmiWallTimer();
361
362   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
363   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
364
365   CkUnpackMessage(&env);
366
367   int localElems;
368   envelope *newenv;
369   CkArrayIndex *local_idx_list;    
370     
371   // Extract the list of elements to be delivered locally
372   sinfo.unpack(env, localElems, local_idx_list, newenv);
373
374   ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
375   ComlibSectionHashKey key(cbmsg->_cookie.get_pe(), 
376                            cbmsg->_cookie.info.sInfo.cInfo.id);
377     
378   ComlibSectionHashObject *old_obj = NULL;
379     
380   old_obj = sec_ht.get(key);
381   if(old_obj != NULL) {
382     delete old_obj;
383   }
384
385   /*
386     CkArrayIndex *idx_list_array = new CkArrayIndex[idx_list.size()];
387     for(int count = 0; count < idx_list.size(); count++)
388     idx_list_array[count] = idx_list[count];
389   */
390
391   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in handleNewMulticastMessage\n", CkMyPe());
392   ComlibSectionHashObject *new_obj = new ComlibSectionHashObject();
393   new_obj->indices.resize(0);
394   for (int i=0; i<localElems; ++i) new_obj->indices.insertAtEnd(local_idx_list[i]);
395     
396   createObjectOnIntermediatePe(new_obj, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.get_pe());
397
398   ComlibPrintf("[%d] Inserting object into sec_ht\n", CkMyPe());
399   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
400
401   sec_ht.put(key) = new_obj;
402
403     
404   /* local multicast must come before remote multicast because the second can delete
405    * the passed env parameter, and cbmsg is part of env!
406    */
407   //    traceUserBracketEvent( 2203, StartTime, CmiWallTimer()); 
408
409   localMulticast(newenv, new_obj, cbmsg); //local multicast always copies
410   remoteMulticast(env, new_obj);
411   CmiFree(newenv);   
412     
413 }
414
415 /*@}*/