The new version of comlib! This version passed "make test" in charm/tests on order...
[charm.git] / src / ck-com / MulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6    MulticastStrategy and its
7    derivatives, multicast messages to a section of array elements
8    created on the fly. The section is invoked by calling a
9    section proxy. These strategies can also multicast to a subset
10    of processors for groups.
11    
12    These strategies are non-bracketed. When the first request is
13    made a route is dynamically built on the section. The route
14    information is stored in
15
16  - Sameer Kumar
17  - Heavily revised by Filippo Gioachin 2/2006
18
19 */
20
21
22 #include "MulticastStrategy.h"
23
24 CkpvExtern(CkGroupID, cmgrID);
25
26
27 MulticastStrategy::MulticastStrategy(int isPersistent)
28   : Strategy(), CharmStrategy() {
29
30   ComlibPrintf("MulticastStrategy constructor\n");
31   //ainfo.setDestinationArray(aid);
32   setType(ARRAY_STRATEGY);
33
34   this->isPersistent = isPersistent;
35 }
36
37 //Destroy all old built routes
38 MulticastStrategy::~MulticastStrategy() {
39     
40   ComlibPrintf("MulticastStrategy destructor\n");
41
42   if(getLearner() != NULL)
43     delete getLearner();
44         
45   CkHashtableIterator *ht_iterator = sec_ht.iterator();
46   ht_iterator->seekStart();
47   while(ht_iterator->hasNext()){
48     void **data;
49     data = (void **)ht_iterator->next();        
50     ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
51     if(obj != NULL)
52       delete obj;
53   }
54 }
55
56 #if 0
57 void rewritePEs(CharmMessageHolder *cmsg){
58   ComlibPrintf("[%d] rewritePEs insertMessage \n",CkMyPe());
59     
60   CkAssert(cmsg->dest_proc == IS_SECTION_MULTICAST);
61     
62   void *m = cmsg->getCharmMessage();
63   envelope *env = UsrToEnv(m);
64     
65   ComlibMulticastMsg *msg = (ComlibMulticastMsg *)m;
66     
67 }
68 #endif
69
70 void MulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
71     
72   ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
73   // ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n",  CkMyPe());   
74   
75   
76   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
77   
78   
79   if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
80     ComlibPrintf("[%d] Comlib Section Multicast: looking up cur_sec_id\n",CkMyPe());
81         
82     CkSectionID *sid = cmsg->sec_id;
83
84     // This is a sanity check if we only use a tiny chare array
85     // if(sid->_nElems > 4 || sid->_nElems<0){
86     //       CkPrintf("[%d] Warning!!!!!!!!!!! Section ID in message seems to be screwed up. cmg=%p sid=%p sid->_nElems=%d\n", CkMyPe(), cmsg, sid, (int)sid->_nElems);
87     //       CkAbort("");
88     //     }
89     int cur_sec_id = sid->getSectionID();
90
91     if(cur_sec_id > 0) {
92       sinfo.processOldSectionMessage(cmsg);
93       ComlibPrintf("Array section id was %d, but now is %d\n", cur_sec_id, sid->getSectionID());
94       CkAssert(cur_sec_id == sid->getSectionID());
95
96       ComlibPrintf("[%d] Comlib Section Multicast: insertMessage: cookiePE=%d\n",CkMyPe(),sid->_cookie.pe);
97       ComlibSectionHashKey key(CkMyPe(), cur_sec_id);
98       ComlibSectionHashObject *obj = sec_ht.get(key);
99
100       if(obj == NULL) {
101         //CkAbort("Cannot Find Section\n");
102         /* The object can be NULL for various reasons:
103          * 1) the user reassociated the section proxy with a different
104          *    multicast strategy, in which case the new one has no idea about
105          *    the previous usage of the proxy, but the proxy has the cur_sec_id
106          *    set by the previous strategy
107          * 2) the proxy migrated to another processor, in which case the
108          *    cur_sec_id is non null, but the CkMyPe changed, so the hashed
109          *    object could not be found (Filippo: I'm not sure if the id will
110          *    be reset upon migration, so if this case if possible)
111          */
112       }
113
114       /* In the following if, the check (CkMyPe == sid->_cookie.pe) helps identifying situations
115        * where the proxy has migrated from one processor to another. In this situation, the
116        * destination processor might find an "obj", created by somebody else. This "obj"
117        * is accepted only if the current processor is equal to the processor in which the
118        * cookie ID was defined. */
119       if (obj != NULL && CkMyPe() == sid->_cookie.pe && !obj->isOld) {
120         envelope *env = UsrToEnv(cmsg->getCharmMessage());
121         localMulticast(env, obj, (CkMcastBaseMsg*)cmsg->getCharmMessage());
122         remoteMulticast(env, obj);
123
124         delete cmsg;
125         return;
126       }
127     }
128
129
130     // reaching here means the message was not sent as old, either because
131     // it is the first for this section or the existing section is old.
132     ComlibPrintf("[%d] MulticastStrategy, creating a new multicast path\n", CkMyPe());
133         
134     //New sec id, so send it along with the message
135     ComlibMulticastMsg *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting(), getInstance());
136
137
138     ComlibSectionHashObject *obj = NULL;
139             
140     //    CkAssert(newmsg!=NULL); // Previously the following code was just not called in this case
141     
142     if(newmsg !=NULL){
143       // Add the section to the hashtable, so we can use it in the future
144       ComlibPrintf("[%d] calling insertSectionID\n", CkMyPe());
145       ComlibSectionHashObject *obj_inserted = insertSectionID(sid, newmsg->nPes, newmsg->indicesCount);
146       
147       envelope *newenv = UsrToEnv(newmsg);
148       CkPackMessage(&newenv);
149     
150       ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);        
151     
152       obj = sec_ht.get(key);
153       ComlibPrintf("[%d] looking up key sid->_cookie.sInfo.cInfo.id=%d. Found obj=%p\n", CkMyPe(), (int)sid->_cookie.sInfo.cInfo.id, obj);
154       CkAssert(obj_inserted == obj);
155     
156
157     
158       if(obj == NULL){
159         CkPrintf("[%d] WARNING: Cannot Find ComlibRectSectionHashObject object in hash table sec_ht!\n", CkMyPe());
160         CkAbort("Cannot Find object. sec_ht.get(key)==NULL");
161         // If the number of array elements is fewer than the number of PEs, this happens frequently
162       } else {
163       
164         char *msg = cmsg->getCharmMessage();
165         localMulticast(UsrToEnv(msg), obj, (CkMcastBaseMsg*)msg);
166         CkFreeMsg(msg);
167       
168         if (newmsg != NULL) { 
169           remoteMulticast(UsrToEnv(newmsg), obj);
170         }
171       
172       }
173     }
174
175   }
176   else 
177     CkAbort("Section multicast cannot be used without a section proxy");
178
179   delete cmsg;       
180 }
181
182 ComlibSectionHashObject * MulticastStrategy::insertSectionID(CkSectionID *sid, int npes, ComlibMulticastIndexCount* pelist) {
183
184   ComlibPrintf("[%d] MulticastStrategy:insertSectionID\n",CkMyPe());
185   ComlibPrintf("[%d] MulticastStrategy:insertSectionID  sid->_cookie.sInfo.cInfo.id=%d \n",CkMyPe(),  (int)sid->_cookie.sInfo.cInfo.id);
186
187   //    double StartTime = CmiWallTimer();
188
189   ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
190
191   ComlibSectionHashObject *obj = NULL;    
192   obj = sec_ht.get(key);
193     
194   if(obj != NULL) {
195     ComlibPrintf("MulticastStrategy:insertSectionID: Deleting old object on proc %d for id %d\n",
196                  CkMyPe(), sid->_cookie.sInfo.cInfo.id);
197     delete obj;
198   }
199
200   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in insertSectionID\n", CkMyPe());
201   obj = new ComlibSectionHashObject();
202   sinfo.getLocalIndices(sid->_nElems, sid->_elems, sid->_cookie.aid, obj->indices);
203     
204   createObjectOnSrcPe(obj, npes, pelist);
205   sec_ht.put(key) = obj;
206   ComlibPrintf("[%d] Inserting object %p into sec_ht\n", CkMyPe(), obj);
207   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
208
209   return obj;
210
211   //    traceUserBracketEvent( 2204, StartTime, CmiWallTimer()); 
212 }
213
214
215
216 void MulticastStrategy::doneInserting(){
217   //Do nothing! It is not a bracketed strategy
218 }
219
220 extern void CmiReference(void *);
221
222 //Send the multicast message the local array elements. The message is 
223 //copied and sent if elements exist. 
224 void MulticastStrategy::localMulticast(envelope *env, 
225                                        ComlibSectionHashObject *obj,
226                                        CkMcastBaseMsg *base) {
227         
228   //    double StartTime = CmiWallTimer();
229         
230   int nIndices = obj->indices.size();
231
232   //If the library is set to persistent. 
233   //The message is stored in the library. The applications should 
234   //use the message as a readonly and it exists till the next one 
235   //comes along
236
237   if(obj->msg != NULL) {
238     CmiFree(obj->msg);
239     obj->msg = NULL;
240   }
241
242   ComlibPrintf("[%d] localMulticast nIndices=%d\n", CkMyPe(), nIndices);
243         
244   if(nIndices > 0) {
245     void *msg = EnvToUsr(env);
246     void *msg1 = msg;
247
248     msg1 = CkCopyMsg(&msg);
249
250     if(isPersistent) {
251       CmiReference(UsrToEnv(msg1));
252       obj->msg = (void *)UsrToEnv(msg1);
253     }
254
255     int reply = ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
256     if (reply > 0) {
257       // some of the objects were not local, get the update!
258       CkMcastBaseMsg *errorMsg = sinfo.getNewDeliveryErrorMsg(base);
259       envelope *errorEnv = UsrToEnv(errorMsg);
260       CmiSetHandler(errorEnv, CkpvAccess(comlib_handler));
261       ((CmiMsgHeaderBasic *) errorEnv)->stratid = getInstance();
262       CmiSyncSendAndFree(env->getSrcPe(), errorEnv->getTotalsize(), (char*)errorEnv);
263     }
264   }
265         
266   //    traceUserBracketEvent( 2200, StartTime, CmiWallTimer()); 
267         
268 }
269
270
271 //Calls default multicast scheme to send the messages. It could 
272 //also call a converse lower level strategy to do the muiticast.
273 //For example pipelined multicast
274 void MulticastStrategy::remoteMulticast(envelope *env, 
275                                         ComlibSectionHashObject *obj) {
276     
277   //    double StartTime = CmiWallTimer();
278         
279   int npes = obj->npes;
280   int *pelist = obj->pelist;
281     
282   if(npes == 0) {
283     CmiFree(env);
284     return;    
285   }
286     
287   //CmiSetHandler(env, handlerId);
288   CmiSetHandler(env, CkpvAccess(comlib_handler));
289
290   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();
291
292   //Collect Multicast Statistics
293   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
294     
295   CkPackMessage(&env);
296   //Sending a remote multicast
297     
298   ComlibPrintf("[%d] remoteMulticast Sending to %d PEs: \n", CkMyPe(), npes);
299   for(int i=0;i<npes;i++){
300     ComlibPrintf("[%d]    %d\n", CkMyPe(), pelist[i]);
301   }
302     
303   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
304   //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
305
306   //    traceUserBracketEvent( 2201, StartTime, CmiWallTimer()); 
307
308 }
309
310 void MulticastStrategy::pup(PUP::er &p){
311   Strategy::pup(p);
312   CharmStrategy::pup(p);
313   p | isPersistent; 
314 }
315
316
317 void MulticastStrategy::handleMessage(void *msg){
318
319         
320   //    double StartTime = CmiWallTimer();
321         
322   envelope *env = (envelope *)msg;
323   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
324
325   //Section multicast base message
326   CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
327   if (cbmsg->magic != _SECTION_MAGIC) CkAbort("MulticastStrategy received bad message! Did you forget to inherit from CkMcastBaseMsg?\n");
328     
329   int status = cbmsg->_cookie.sInfo.cInfo.status;
330   ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
331     
332   if(status == COMLIB_MULTICAST_NEW_SECTION)
333     handleNewMulticastMessage(env);
334   else if (status == COMLIB_MULTICAST_SECTION_ERROR) {
335     // some objects were not on the correct processor, mark the section as
336     // old. next time we try to use it, a new one will be generated with the
337     // updated inforamtion in the location manager (since the wrong delivery
338     // updated it indirectly.
339     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
340                              cbmsg->_cookie.sInfo.cInfo.id);    
341         
342     ComlibSectionHashObject *obj;
343     obj = sec_ht.get(key);
344
345     if(obj == NULL)
346       CkAbort("Destination indices is NULL\n");
347
348     // mark the section as old
349     obj->isOld = 1;
350   } else if (status == COMLIB_MULTICAST_OLD_SECTION) {
351     //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
352     ComlibSectionHashKey key(cbmsg->_cookie.pe, 
353                              cbmsg->_cookie.sInfo.cInfo.id);    
354         
355     ComlibSectionHashObject *obj;
356     obj = sec_ht.get(key);
357         
358     if(obj == NULL)
359       CkAbort("Destination indices is NULL\n");
360         
361     localMulticast(env, obj, cbmsg);
362     remoteMulticast(env, obj);
363   } else {
364     CkAbort("Multicast message status is zero\n");
365   }
366
367   //    traceUserBracketEvent( 2202, StartTime, CmiWallTimer()); 
368
369 }
370
371 #include <string>
372
373 void MulticastStrategy::handleNewMulticastMessage(envelope *env) {
374     
375   //    double StartTime = CmiWallTimer();
376
377   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
378   ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
379
380   CkUnpackMessage(&env);
381
382   int localElems;
383   envelope *newenv;
384   CkArrayIndexMax *local_idx_list;    
385     
386   // Extract the list of elements to be delivered locally
387   sinfo.unpack(env, localElems, local_idx_list, newenv);
388
389   ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
390   ComlibSectionHashKey key(cbmsg->_cookie.pe, 
391                            cbmsg->_cookie.sInfo.cInfo.id);
392     
393   ComlibSectionHashObject *old_obj = NULL;
394     
395   old_obj = sec_ht.get(key);
396   if(old_obj != NULL) {
397     delete old_obj;
398   }
399
400   /*
401     CkArrayIndexMax *idx_list_array = new CkArrayIndexMax[idx_list.size()];
402     for(int count = 0; count < idx_list.size(); count++)
403     idx_list_array[count] = idx_list[count];
404   */
405
406   ComlibPrintf("[%d] Creating new ComlibSectionHashObject in handleNewMulticastMessage\n", CkMyPe());
407   ComlibSectionHashObject *new_obj = new ComlibSectionHashObject();
408   new_obj->indices.resize(0);
409   for (int i=0; i<localElems; ++i) new_obj->indices.insertAtEnd(local_idx_list[i]);
410     
411   createObjectOnIntermediatePe(new_obj, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.pe);
412
413   ComlibPrintf("[%d] Inserting object into sec_ht\n", CkMyPe());
414   ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
415
416   sec_ht.put(key) = new_obj;
417
418     
419   /* local multicast must come before remote multicast because the second can delete
420    * the passed env parameter, and cbmsg is part of env!
421    */
422   //    traceUserBracketEvent( 2203, StartTime, CmiWallTimer()); 
423
424   localMulticast(newenv, new_obj, cbmsg); //local multicast always copies
425   remoteMulticast(env, new_obj);
426   CmiFree(newenv);   
427     
428 }
429
430 /*@}*/