New version of communication library with learning capabilities. Learning and dynamic...
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1 #include "DirectMulticastStrategy.h"
2 #include "AAMLearner.h"
3
4 CkpvExtern(CkGroupID, cmgrID);
5
6 void *DMHandler(void *msg){
7     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
8     DirectMulticastStrategy *nm_mgr;    
9     
10     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
11     int instid = bmsg->_cookie.sInfo.cInfo.instId;
12     
13     nm_mgr = (DirectMulticastStrategy *) 
14         CProxy_ComlibManager(CkpvAccess(cmgrID)).
15         ckLocalBranch()->getStrategy(instid);
16
17     envelope *env = (envelope *) msg;
18     RECORD_RECV_STATS(instid, env->getTotalsize(), env->getSrcPe());
19     nm_mgr->handleMulticastMessage(msg);
20     return NULL;
21 }
22
23 //Group Constructor
24 DirectMulticastStrategy::DirectMulticastStrategy(int ndest, int *pelist)
25     : CharmStrategy() {
26  
27     setType(GROUP_STRATEGY);
28     
29     ndestpes = ndest;
30     destpelist = pelist;
31
32     commonInit();
33 }
34
35 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
36     :  CharmStrategy() {
37
38     //ainfo.setSourceArray(aid);
39     ainfo.setDestinationArray(aid);
40     setType(ARRAY_STRATEGY);
41     ndestpes = 0;
42     destpelist = 0;
43     commonInit();
44 }
45
46 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID said, CkArrayID daid)
47     :  CharmStrategy() {
48
49     ainfo.setSourceArray(said);
50     ainfo.setDestinationArray(daid);
51     setType(ARRAY_STRATEGY);
52     ndestpes = 0;
53     destpelist = 0;
54     commonInit();
55 }
56
57 void DirectMulticastStrategy::commonInit(){
58
59     if(ndestpes == 0) {
60         ndestpes = CkNumPes();
61         destpelist = new int[CkNumPes()];
62         for(int count = 0; count < CkNumPes(); count ++)
63             destpelist[count] = count;        
64     }
65 }
66
67 DirectMulticastStrategy::~DirectMulticastStrategy() {
68     if(ndestpes > 0)
69         delete [] destpelist;
70
71     if(getLearner() != NULL)
72         delete getLearner();
73         
74     CkHashtableIterator *ht_iterator = sec_ht.iterator();
75     ht_iterator->seekStart();
76     while(ht_iterator->hasNext()){
77         void **data;
78         data = (void **)ht_iterator->next();        
79         CkVec<CkArrayIndexMax> *a_vec = (CkVec<CkArrayIndexMax> *) (* data);
80         if(a_vec != NULL)
81             delete a_vec;
82     }
83 }
84
85 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
86     if(messageBuf == NULL) {
87         CkPrintf("ERROR MESSAGE BUF IS NULL\n");
88         return;
89     }
90
91     ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
92                  CkMyPe());   
93    
94     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
95         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
96
97         if(cur_sec_id > 0) {        
98             sinfo.processOldSectionMessage(cmsg);
99         }
100         else {
101             CkSectionID *sid = cmsg->sec_id;
102
103             //New sec id, so send it along with the message
104             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
105             CkFreeMsg(cmsg->getCharmMessage());
106             delete cmsg;
107             
108             sinfo.initSectionID(sid);
109
110             cmsg = new CharmMessageHolder((char *)newmsg, 
111                                           IS_SECTION_MULTICAST); 
112             cmsg->sec_id = sid;
113         }        
114     }
115    
116     messageBuf->enq(cmsg);
117     if(!isBracketed())
118         doneInserting();
119 }
120
121 void DirectMulticastStrategy::doneInserting(){
122     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
123     
124     if(messageBuf->length() == 0) {
125         return;
126     }
127
128     while(!messageBuf->isEmpty()) {
129         CharmMessageHolder *cmsg = messageBuf->deq();
130         char *msg = cmsg->getCharmMessage();
131                 
132         if(cmsg->dest_proc == IS_SECTION_MULTICAST || 
133            cmsg->dest_proc == IS_BROADCAST) {      
134
135             if(getType() == ARRAY_STRATEGY)
136                 CmiSetHandler(UsrToEnv(msg), handlerId);
137             
138             int *cur_map = destpelist;
139             int cur_npes = ndestpes;
140             if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
141                 cur_map = cmsg->sec_id->pelist;
142                 cur_npes = cmsg->sec_id->npes;
143             }
144             
145             //Collect Multicast Statistics
146             RECORD_SENDM_STATS(getInstance(), 
147                                ((envelope *)cmsg->getMessage())->getTotalsize(), 
148                                cur_map, cur_npes);
149
150
151             ComlibPrintf("[%d] Calling Direct Multicast %d %d %d\n", CkMyPe(),
152                          UsrToEnv(msg)->getTotalsize(), cur_npes, 
153                          cmsg->dest_proc);
154
155             /*
156               for(int i=0; i < cur_npes; i++)
157               CkPrintf("[%d] Sending to %d %d\n", CkMyPe(), 
158               cur_map[i], cur_npes);
159             */
160
161             CmiSyncListSendAndFree(cur_npes, cur_map, 
162                                    UsrToEnv(msg)->getTotalsize(), 
163                                    (char*)(UsrToEnv(msg)));            
164         }
165         else {
166             //CkPrintf("SHOULD NOT BE HERE\n");
167             CmiSyncSendAndFree(cmsg->dest_proc, 
168                                UsrToEnv(msg)->getTotalsize(), 
169                                (char *)UsrToEnv(msg));
170         }        
171         
172         delete cmsg; 
173     }
174 }
175
176 void DirectMulticastStrategy::pup(PUP::er &p){
177
178     CharmStrategy::pup(p);
179
180     p | ndestpes;
181     if(p.isUnpacking() && ndestpes > 0)
182         destpelist = new int[ndestpes];
183     
184     p(destpelist, ndestpes);        
185     
186     if(p.isUnpacking()) {
187         CkArrayID src;
188         int nidx;
189         CkArrayIndexMax *idx_list;     
190         ainfo.getSourceArray(src, idx_list, nidx);
191         
192         if(!src.isZero()) {
193             AAMLearner *l = new AAMLearner();
194             setLearner(l);
195         }
196     }
197 }
198
199 void DirectMulticastStrategy::beginProcessing(int numElements){
200     
201     messageBuf = new CkQ<CharmMessageHolder *>;    
202     handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
203     
204     CkArrayID dest;
205     int nidx;
206     CkArrayIndexMax *idx_list;
207
208     ainfo.getDestinationArray(dest, idx_list, nidx);
209     sinfo = ComlibSectionInfo(dest, myInstanceID);
210 }
211
212 void DirectMulticastStrategy::handleMulticastMessage(void *msg){
213     register envelope *env = (envelope *)msg;
214     
215     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
216
217     int status = cbmsg->_cookie.sInfo.cInfo.status;
218     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
219     
220     CkVec<CkArrayIndexMax> *dest_indices; 
221     if(status == COMLIB_MULTICAST_ALL) {        
222         ainfo.localBroadcast(env);
223     }   
224     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
225         CkUnpackMessage(&env);
226         envelope *newenv;
227         sinfo.unpack(env, dest_indices, newenv);
228         ComlibArrayInfo::localMulticast(dest_indices, newenv);
229
230         CkVec<CkArrayIndexMax> *old_dest_indices;
231         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
232                                  cbmsg->_cookie.sInfo.cInfo.id);
233
234         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
235         if(old_dest_indices != NULL)
236             delete old_dest_indices;
237         
238         sec_ht.put(key) = dest_indices;
239         CmiFree(env);                
240     }
241     else {
242         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
243         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
244                                  cbmsg->_cookie.sInfo.cInfo.id);    
245         dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
246         
247         if(dest_indices == NULL)
248             CkAbort("Destination indices is NULL\n");
249         
250         ComlibArrayInfo::localMulticast(dest_indices, env);
251     }
252 }