remove unnecessary include <string> .
[charm.git] / src / ck-com / MulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6    MulticastStrategy and its
7    derivatives, multicast messages to a section of array elements
8    created on the fly. The section is invoked by calling a
9    section proxy. These strategies can also multicast to a subset
10    of processors for groups.
11    
12    These strategies are non-bracketed. When the first request is
13    made a route is dynamically built on the section. The route
14    information is stored in
15
16  - Sameer Kumar
17  - Heavily revised by Filippo Gioachin 2/2006
18
19 */
20
21
22 #include "MulticastStrategy.h"
23
24 CkpvExtern(CkGroupID, cmgrID);
25
26
27 MulticastStrategy::MulticastStrategy()
28   : Strategy(), CharmStrategy() {
29
30   ComlibPrintf("MulticastStrategy constructor\n");
31   //ainfo.setDestinationArray(aid);
32   setType(ARRAY_STRATEGY);
33 }
34
35 //Destroy all old built routes
36 MulticastStrategy::~MulticastStrategy() {
37     
38   ComlibPrintf("MulticastStrategy destructor\n");
39
40   if(getLearner() != NULL)
41     delete getLearner();
42         
43   CkHashtableIterator *ht_iterator = sec_ht.iterator();
44   ht_iterator->seekStart();
45   while(ht_iterator->hasNext()){
46     void **data;
47     data = (void **)ht_iterator->next();        
48     ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
49     if(obj != NULL)
50       delete obj;
51   }
52 }
53
54 #if 0
55 void rewritePEs(CharmMessageHolder *cmsg){
56   ComlibPrintf("[%d] rewritePEs insertMessage \n",CkMyPe());
57     
58   CkAssert(cmsg->dest_proc == IS_SECTION_MULTICAST);
59     
60   void *m = cmsg->getCharmMessage();
61   envelope *env = UsrToEnv(m);
62     
63   ComlibMulticastMsg *msg = (ComlibMulticastMsg *)m;
64     
65 }
66 #endif
67
68 void MulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
69     
70   ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
71   // ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
72   
73   
74   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
75   
76   
77   if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
78     ComlibPrintf("[%d] Comlib Section Multicast: looking up cur_sec_id\n",CkMyPe());
79         
80     CkSectionID *sid = cmsg->sec_id;
81
82     // This is a sanity check if we only use a tiny chare array
83     // if(sid->_nElems > 4 || sid->_nElems<0){
84     //       CkPrintf("[%d] Warning!!!!!!!!!!! Section ID in message seems to be screwed up. cmg=%p sid=%p sid->_nElems=%d\n", CkMyPe(), cmsg, sid, (int)sid->_nElems);
85     //       CkAbort("");
86     //     }
87     int cur_sec_id = sid->getSectionID();
88
89     if(cur_sec_id > 0) {
90       sinfo.processOldSectionMessage(cmsg);
91       ComlibPrintf("Array section id was %d, but now is %d\n", cur_sec_id, sid->getSectionID());
92       CkAssert(cur_sec_id == sid->getSectionID());
93
94       ComlibPrintf("[%d] Comlib Section Multicast: insertMessage: cookiePE=%d\n",CkMyPe(),sid->_cookie.pe);
95       ComlibSectionHashKey key(CkMyPe(), cur_sec_id);
96       ComlibSectionHashObject *obj = sec_ht.get(key);
97
98       if(obj == NULL) {
99         //CkAbort("Cannot Find Section\n");
100         /* The object can be NULL for various reasons:
101          * 1) the user reassociated the section proxy with a different
102          *    multicast strategy, in which case the new one has no idea about
103          *    the previous usage of the proxy, but the proxy has the cur_sec_id
104          *    set by the previous strategy
105          * 2) the proxy migrated to another processor, in which case the
106          *    cur_sec_id is non null, but the CkMyPe changed, so the hashed
107          *    object could not be found (Filippo: I'm not sure if the id will
108          *    be reset upon migration, so if this case if possible)
109          */
110       }
111
112       /* In the following if, the check (CkMyPe == sid->_cookie.pe) helps identifying situations
113        * where the proxy has migrated from one processor to another. In this situation, the
114        * destination processor might find an "obj", created by somebody else. This "obj"
115        * is accepted only if the current processor is equal to the processor in which the
116        * cookie ID was defined. */
117       if (obj != NULL && CkMyPe() == sid->_cookie.pe && !obj->isOld) {
118         envelope *env = UsrToEnv(cmsg->getCharmMessage());
119         localMulticast(env, obj, (CkMcastBaseMsg*)cmsg->getCharmMessage());
120         remoteMulticast(env, obj);
121
122         delete cmsg;
123         return;
124       }
125     }
126
127
128     // reaching here means the message was not sent as old, either because
129     // it is the first for this section or the existing section is old.
130     ComlibPrintf("[%d] MulticastStrategy, creating a new multicast path\n", CkMyPe());
131         
132     //New sec id, so send it along with the message
133     ComlibMulticastMsg *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting(), getInstance());
134
135
136     ComlibSectionHashObject *obj = NULL;
137             
138     //    CkAssert(newmsg!=NULL); // Previously the following code was just not called in this case
139     
140     if(newmsg !=NULL){
141       // Add the section to the hashtable, so we can use it in the future
142       ComlibPrintf("[%d] calling insertSectionID\n", CkMyPe());
143       ComlibSectionHashObject *obj_inserted = insertSectionID(sid, newmsg->nPes, newmsg->indicesCount);
144       
145       envelope *newenv = UsrToEnv(newmsg);
146       CkPackMessage(&newenv);
147     
148       ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
149     
150       obj = sec_ht.get(key);
151       ComlibPrintf("[%d] looking up key sid->_cookie.sInfo.cInfo.id=%d. Found obj=%p\n", CkMyPe(), (int)sid->_cookie.sInfo.cInfo.id, obj);
152       CkAssert(obj_inserted == obj);
153     
154
155     
156       if(obj == NULL){
157         CkPrintf("[%d] WARNING: Cannot Find ComlibRectSectionHashObject object in hash table sec_ht!\n", CkMyPe());
158         CkAbort("Cannot Find object. sec_ht.get(key)==NULL");
159         // If the number of array elements is fewer than the number of PEs, this happens frequently
160       } else {
161       
162         char *msg = cmsg->getCharmMessage();
163         localMulticast(UsrToEnv(msg), obj, (CkMcastBaseMsg*)msg);
164         CkFreeMsg(msg);
165       
166         if (newmsg != NULL) { 
167           remoteMulticast(UsrToEnv(newmsg), obj);
168         }
169       
170       }
171     }
172
173   }
174   else 
175     CkAbort("Section multicast cannot be used without a section proxy");
176
177   delete cmsg;       
178 }
179
180 ComlibSectionHashObject * MulticastStrategy::insertSectionID(CkSectionID *sid, int npes, ComlibMulticastIndexCount* pelist) {
181
182   ComlibPrintf("[%d] MulticastStrategy:insertSectionID\n",CkMyPe());
183   ComlibPrintf("[%d] MulticastStrategy:insertSectionID  sid->_cookie.sInfo.cInfo.id=%d \n",CkMyPe(),  (int)sid->_cookie.sInfo.cInfo.id);
184
185   //    double StartTime = CmiWallTimer();
186
187   ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
188
189   ComlibSectionHashObject *obj = NULL;    
190   obj = sec_ht.get(key);
191     
192   if(obj != NULL) {
193     ComlibPrintf("MulticastStrategy:insertSectionID: Deleting old object on proc %d for id %d\n",
194                  CkMyPe(), sid->_cookie.sInfo.cInfo.id);
195     delete obj;
196   }
197
198   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in insertSectionID\n", CkMyPe());
199   obj = new ComlibSectionHashObject();
200   sinfo.getLocalIndices(sid->_nElems, sid->_elems, sid->_cookie.aid, obj->indices);
201     
202   createObjectOnSrcPe(obj, npes, pelist);
203   sec_ht.put(key) = obj;
204   ComlibPrintf("[%d] Inserting object %p into sec_ht\n", CkMyPe(), obj);
205   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
206
207   return obj;
208
209   //    traceUserBracketEvent( 2204, StartTime, CmiWallTimer()); 
210 }
211
212
213 extern void CmiReference(void *);
214
215 //Send the multicast message the local array elements. The message is 
216 //copied and sent if elements exist. 
217 void MulticastStrategy::localMulticast(envelope *env, 
218                                        ComlibSectionHashObject *obj,
219                                        CkMcastBaseMsg *base) {
220         
221   //    double StartTime = CmiWallTimer();
222         
223   int nIndices = obj->indices.size();
224
225   if(obj->msg != NULL) {
226     CmiFree(obj->msg);
227     obj->msg = NULL;
228   }
229
230   ComlibPrintf("[%d] localMulticast nIndices=%d\n", CkMyPe(), nIndices);
231         
232   if(nIndices > 0) {
233     void *msg = EnvToUsr(env);
234     void *msg1 = msg;
235
236     msg1 = CkCopyMsg(&msg);
237
238     CmiReference(UsrToEnv(msg1));
239     obj->msg = (void *)UsrToEnv(msg1);
240
241     int reply = ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
242     if (reply > 0) {
243       // some of the objects were not local, get the update!
244       CkMcastBaseMsg *errorMsg = sinfo.getNewDeliveryErrorMsg(base);
245       envelope *errorEnv = UsrToEnv(errorMsg);
246       CmiSetHandler(errorEnv, CkpvAccess(comlib_handler));
247       ((CmiMsgHeaderBasic *) errorEnv)->stratid = getInstance();
248       CmiSyncSendAndFree(env->getSrcPe(), errorEnv->getTotalsize(), (char*)errorEnv);
249     }
250   }
251         
252   //    traceUserBracketEvent( 2200, StartTime, CmiWallTimer()); 
253         
254 }
255
256
257 //Calls default multicast scheme to send the messages. It could 
258 //also call a converse lower level strategy to do the muiticast.
259 //For example pipelined multicast
260 void MulticastStrategy::remoteMulticast(envelope *env, 
261                                         ComlibSectionHashObject *obj) {
262     
263   //    double StartTime = CmiWallTimer();
264         
265   int npes = obj->npes;
266   int *pelist = obj->pelist;
267     
268   if(npes == 0) {
269     CmiFree(env);
270     return;    
271   }
272     
273   //CmiSetHandler(env, handlerId);
274   CmiSetHandler(env, CkpvAccess(comlib_handler));
275
276   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
277
278   //Collect Multicast Statistics
279   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
280     
281   CkPackMessage(&env);
282   //Sending a remote multicast
283     
284   ComlibPrintf("[%d] remoteMulticast Sending to %d PEs: \n", CkMyPe(), npes);
285   for(int i=0;i<npes;i++){
286     ComlibPrintf("[%d]    %d\n", CkMyPe(), pelist[i]);
287   }
288     
289   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
290   //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
291
292   //    traceUserBracketEvent( 2201, StartTime, CmiWallTimer()); 
293
294 }
295
296 void MulticastStrategy::pup(PUP::er &p){
297   Strategy::pup(p);
298   CharmStrategy::pup(p);
299 }
300
301
302 void MulticastStrategy::handleMessage(void *msg){
303
304         
305   //    double StartTime = CmiWallTimer();
306         
307   envelope *env = (envelope *)msg;
308   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
309
310   //Section multicast base message
311   CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
312   if (cbmsg->magic != _SECTION_MAGIC) CkAbort("MulticastStrategy received bad message! Did you forget to inherit from CkMcastBaseMsg?\n");
313     
314   int status = cbmsg->_cookie.sInfo.cInfo.status;
315   ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
316     
317   if(status == COMLIB_MULTICAST_NEW_SECTION)
318     handleNewMulticastMessage(env);
319   else if (status == COMLIB_MULTICAST_SECTION_ERROR) {
320     // some objects were not on the correct processor, mark the section as
321     // old. next time we try to use it, a new one will be generated with the
322     // updated inforamtion in the location manager (since the wrong delivery
323     // updated it indirectly.
324     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
325                              cbmsg->_cookie.sInfo.cInfo.id);    
326         
327     ComlibSectionHashObject *obj;
328     obj = sec_ht.get(key);
329
330     if(obj == NULL)
331       CkAbort("Destination indices is NULL\n");
332
333     // mark the section as old
334     obj->isOld = 1;
335   } else if (status == COMLIB_MULTICAST_OLD_SECTION) {
336     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
337     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
338                              cbmsg->_cookie.sInfo.cInfo.id);    
339         
340     ComlibSectionHashObject *obj;
341     obj = sec_ht.get(key);
342         
343     if(obj == NULL)
344       CkAbort("Destination indices is NULL\n");
345         
346     localMulticast(env, obj, cbmsg);
347     remoteMulticast(env, obj);
348   } else {
349     CkAbort("Multicast message status is zero\n");
350   }
351
352   //    traceUserBracketEvent( 2202, StartTime, CmiWallTimer()); 
353
354 }
355
356
357 void MulticastStrategy::handleNewMulticastMessage(envelope *env) {
358     
359   //    double StartTime = CmiWallTimer();
360
361   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
362   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
363
364   CkUnpackMessage(&env);
365
366   int localElems;
367   envelope *newenv;
368   CkArrayIndexMax *local_idx_list;    
369     
370   // Extract the list of elements to be delivered locally
371   sinfo.unpack(env, localElems, local_idx_list, newenv);
372
373   ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
374   ComlibSectionHashKey key(cbmsg->_cookie.pe, 
375                            cbmsg->_cookie.sInfo.cInfo.id);
376     
377   ComlibSectionHashObject *old_obj = NULL;
378     
379   old_obj = sec_ht.get(key);
380   if(old_obj != NULL) {
381     delete old_obj;
382   }
383
384   /*
385     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
386     for(int count = 0; count < idx_list.size(); count++)
387     idx_list_array[count] = idx_list[count];
388   */
389
390   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in handleNewMulticastMessage\n", CkMyPe());
391   ComlibSectionHashObject *new_obj = new ComlibSectionHashObject();
392   new_obj->indices.resize(0);
393   for (int i=0; i<localElems; ++i) new_obj->indices.insertAtEnd(local_idx_list[i]);
394     
395   createObjectOnIntermediatePe(new_obj, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.pe);
396
397   ComlibPrintf("[%d] Inserting object into sec_ht\n", CkMyPe());
398   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
399
400   sec_ht.put(key) = new_obj;
401
402     
403   /* local multicast must come before remote multicast because the second can delete
404    * the passed env parameter, and cbmsg is part of env!
405    */
406   //    traceUserBracketEvent( 2203, StartTime, CmiWallTimer()); 
407
408   localMulticast(newenv, new_obj, cbmsg); //local multicast always copies
409   remoteMulticast(env, new_obj);
410   CmiFree(newenv);   
411     
412 }
413
414 /*@}*/