New version with migration and forwarding always working. The test program also works.
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1 #include "DirectMulticastStrategy.h"
2
3 CkpvExtern(CkGroupID, cmgrID);
4
5 void *DMHandler(void *msg){
6     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
7     DirectMulticastStrategy *nm_mgr;    
8     
9     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
10     int instid = bmsg->_cookie.sInfo.cInfo.instId;
11     
12     nm_mgr = (DirectMulticastStrategy *) 
13         CProxy_ComlibManager(CkpvAccess(cmgrID)).
14         ckLocalBranch()->getStrategy(instid);
15     
16     nm_mgr->handleMulticastMessage(msg);
17     return NULL;
18 }
19
20 //Group Constructor
21 DirectMulticastStrategy::DirectMulticastStrategy(int ndest, int *pelist)
22     : CharmStrategy() {
23  
24     setType(GROUP_STRATEGY);
25     
26     ndestpes = ndest;
27     destpelist = pelist;
28
29     commonInit();
30 }
31
32 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
33     :  CharmStrategy() {
34
35     ainfo.setDestinationArray(aid);
36     setType(ARRAY_STRATEGY);
37     ndestpes = 0;
38     destpelist = 0;
39     commonInit();
40 }
41
42 void DirectMulticastStrategy::commonInit(){
43
44     if(ndestpes == 0) {
45         ndestpes = CkNumPes();
46         destpelist = new int[CkNumPes()];
47         for(int count = 0; count < CkNumPes(); count ++)
48             destpelist[count] = count;        
49     }
50 }
51
52 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
53     if(messageBuf == NULL) {
54         CkPrintf("ERROR MESSAGE BUF IS NULL\n");
55         return;
56     }
57
58     ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
59                  CkMyPe());   
60    
61     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
62         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
63
64         if(cur_sec_id > 0) {        
65             sinfo.processOldSectionMessage(cmsg);
66         }
67         else {
68             CkSectionID *sid = cmsg->sec_id;
69
70             //New sec id, so send it along with the message
71             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
72             CkFreeMsg(cmsg->getCharmMessage());
73             delete cmsg;
74             
75             sinfo.initSectionID(sid);
76
77             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
78             cmsg->sec_id = sid;
79         }        
80     }
81     
82     messageBuf->enq(cmsg);
83     if(!isBracketed())
84         doneInserting();
85 }
86
87 void DirectMulticastStrategy::doneInserting(){
88     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
89     
90     if(messageBuf->length() == 0) {
91         return;
92     }
93
94     while(!messageBuf->isEmpty()) {
95         CharmMessageHolder *cmsg = messageBuf->deq();
96         char *msg = cmsg->getCharmMessage();
97                 
98         if(cmsg->dest_proc == IS_MULTICAST) {      
99             if(getType() == ARRAY_STRATEGY)
100                 CmiSetHandler(UsrToEnv(msg), handlerId);
101             
102             int *cur_map = destpelist;
103             int cur_npes = ndestpes;
104             if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
105                 cur_map = cmsg->sec_id->pelist;
106                 cur_npes = cmsg->sec_id->npes;
107             }
108             
109             ComlibPrintf("[%d] Calling Direct Multicast %d %d %d\n", CkMyPe(),
110                          UsrToEnv(msg)->getTotalsize(), cur_npes, 
111                          cmsg->dest_proc);
112             /*
113             for(int i=0; i < cur_npes; i++)
114                 CkPrintf("[%d] Sending to %d %d\n", CkMyPe(), 
115                          cur_map[i], cur_npes);
116             */
117
118             CmiSyncListSendAndFree(cur_npes, cur_map, 
119                                    UsrToEnv(msg)->getTotalsize(), 
120                                    (char*)(UsrToEnv(msg)));            
121         }
122         else {
123             //CkPrintf("SHOULD NOT BE HERE\n");
124             CmiSyncSendAndFree(cmsg->dest_proc, 
125                                UsrToEnv(msg)->getTotalsize(), 
126                                (char *)UsrToEnv(msg));
127         }        
128         
129         delete cmsg; 
130     }
131 }
132
133 void DirectMulticastStrategy::pup(PUP::er &p){
134
135     CharmStrategy::pup(p);
136
137     p | ndestpes;
138     if(p.isUnpacking())
139         destpelist = new int[ndestpes];
140     
141     p(destpelist, ndestpes);        
142 }
143
144 void DirectMulticastStrategy::beginProcessing(int numElements){
145     
146     messageBuf = new CkQ<CharmMessageHolder *>;    
147     handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
148     
149     CkArrayID dest;
150     int nidx;
151     CkArrayIndexMax *idx_list;
152
153     ainfo.getDestinationArray(dest, idx_list, nidx);
154     sinfo = ComlibSectionInfo(dest, myInstanceID);
155 }
156
157 void DirectMulticastStrategy::handleMulticastMessage(void *msg){
158     register envelope *env = (envelope *)msg;
159     
160     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
161
162     int status = cbmsg->_cookie.sInfo.cInfo.status;
163     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
164     
165     CkVec<CkArrayIndexMax> *dest_indices; 
166     if(status == COMLIB_MULTICAST_ALL) {        
167         ainfo.localBroadcast(env);
168     }   
169     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
170         CkUnpackMessage(&env);
171         envelope *newenv;
172         sinfo.unpack(env, dest_indices, newenv);
173         ComlibArrayInfo::localMulticast(dest_indices, newenv);
174
175         CkVec<CkArrayIndexMax> *old_dest_indices;
176         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
177                                  cbmsg->_cookie.sInfo.cInfo.id);
178
179         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
180         if(old_dest_indices != NULL)
181             delete old_dest_indices;
182         
183         sec_ht.put(key) = dest_indices;
184         CmiFree(env);                
185     }
186     else {
187         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
188         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
189                                  cbmsg->_cookie.sInfo.cInfo.id);    
190         dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
191         
192         if(dest_indices == NULL)
193             CkAbort("Destination indices is NULL\n");
194         
195         ComlibArrayInfo::localMulticast(dest_indices, env);
196     }
197 }