New version of communication library with learning capabilities. Learning and dynamic...
[charm.git] / src / ck-com / RingMulticastStrategy.C
1 #include "RingMulticastStrategy.h"
2
3 //Group Constructor
4 RingMulticastStrategy::RingMulticastStrategy(int ndest, int *pelist) 
5     : DirectMulticastStrategy(ndest, pelist) {
6     commonRingInit();
7 }
8
9 //Array Constructor
10 RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid)
11     : DirectMulticastStrategy(dest_aid){
12     commonRingInit();    
13 }
14
15 //Array Constructor
16 RingMulticastStrategy::RingMulticastStrategy(CkArrayID src, CkArrayID dest)
17     : DirectMulticastStrategy(src, dest){
18     commonRingInit();    
19 }
20
21 void RingMulticastStrategy::commonRingInit(){
22     //Sort destpelist
23 }
24
25
26 void RingMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
27     if(messageBuf == NULL) {
28         CkPrintf("ERROR MESSAGE BUF IS NULL\n");
29         return;
30     }
31     
32     ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
33                  CkMyPe());   
34     
35     if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) { 
36         int cur_sec_id = ComlibSectionInfo::getSectionID(*cmsg->sec_id);
37
38         if(cur_sec_id > 0) {        
39             sinfo.processOldSectionMessage(cmsg);
40         }
41         else {
42             CkSectionID *sid = cmsg->sec_id;
43
44             //New sec id, so send it along with the message
45             void *newmsg = sinfo.getNewMulticastMessage(cmsg);
46             CkFreeMsg(cmsg->getCharmMessage());
47             delete cmsg;
48             
49             initSectionID(sid);
50             cmsg = new CharmMessageHolder((char *)newmsg, 
51                                           IS_SECTION_MULTICAST); 
52             cmsg->sec_id = sid;
53         }        
54     }
55     
56     messageBuf->enq(cmsg);
57     if(!isBracketed())
58         doneInserting();
59 }
60
61 extern int _charmHandlerIdx;
62 void RingMulticastStrategy::doneInserting(){
63     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
64     
65     if(messageBuf->length() == 0) {
66         return;
67     }
68
69     while(!messageBuf->isEmpty()) {
70         CharmMessageHolder *cmsg = messageBuf->deq();
71         char *msg = cmsg->getCharmMessage();
72         register envelope* env = UsrToEnv(msg);
73
74         ComlibPrintf("[%d] Calling Ring %d %d %d\n", CkMyPe(),
75                      env->getTotalsize(), ndestpes, cmsg->dest_proc);
76                 
77         if(cmsg->dest_proc == IS_SECTION_MULTICAST ||
78            cmsg->dest_proc == IS_BROADCAST) {      
79             
80             CmiSetHandler(env, handlerId);
81             
82             int dest_pe = -1;
83             RingMulticastHashObject *robj;
84             
85             if(cmsg->sec_id == NULL)
86                 dest_pe = nextPE;
87             else {
88                 robj = getHashObject(CkMyPe(), 
89                                      cmsg->sec_id->_cookie.sInfo.cInfo.id);
90                 
91                 ComlibPrintf("Gotten has obect %d\n",  robj);                
92                 CkAssert(robj != NULL);                
93                 dest_pe = robj->nextPE;
94             }
95             
96             ComlibPrintf("[%d] Sending Message to %d\n", CkMyPe(), dest_pe);
97
98             if(dest_pe != -1)
99                 CmiSyncSend(dest_pe, env->getTotalsize(), (char *)env); 
100             
101             if(getType() == ARRAY_STRATEGY) {
102                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
103             }
104             else {
105                 CmiSetHandler(env, _charmHandlerIdx);
106                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
107             }
108         }
109         else {
110             CmiSyncSendAndFree(cmsg->dest_proc, UsrToEnv(msg)->getTotalsize(), 
111                                (char *)UsrToEnv(msg));
112         }        
113         
114         delete cmsg; 
115     }
116 }
117
118 void RingMulticastStrategy::pup(PUP::er &p){
119
120     DirectMulticastStrategy::pup(p);
121 }
122
123 void RingMulticastStrategy::beginProcessing(int  nelements){
124
125     DirectMulticastStrategy::beginProcessing(nelements);
126
127     nextPE = -1;
128     if(ndestpes == 1)
129         return;
130
131     for(int count = 0; count < ndestpes; count++)
132         if(destpelist[count] > CkMyPe()) {
133             nextPE = destpelist[count];
134             break;
135         }
136     if(nextPE == -1)
137         nextPE = destpelist[0];
138 }
139
140 void RingMulticastStrategy::handleMulticastMessage(void *msg){
141     register envelope *env = (envelope *)msg;
142        
143     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
144     int src_pe = cbmsg->_cookie.pe;
145     if(getType() == GROUP_STRATEGY){               
146
147         if(!isEndOfRing(nextPE, src_pe)) {
148             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
149             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env);        
150         }
151         CmiSetHandler(env, _charmHandlerIdx);
152         CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
153         
154         return;
155     }
156
157     int status = cbmsg->_cookie.sInfo.cInfo.status;
158     ComlibPrintf("[%d] In handle multicast message %d\n", CkMyPe(), status);
159
160     if(status == COMLIB_MULTICAST_ALL) {                        
161         if(src_pe != CkMyPe() && !isEndOfRing(nextPE, src_pe)) {
162             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
163             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env); 
164         }
165
166         ainfo.localBroadcast(env);
167     }   
168     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
169         CkUnpackMessage(&env);
170         ComlibPrintf("[%d] Received message for new section src=%d\n", 
171                      CkMyPe(), cbmsg->_cookie.pe);
172
173         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
174         
175         RingMulticastHashObject *robj = 
176             createHashObject(ccmsg->nIndices, ccmsg->indices);
177         
178         envelope *usrenv = (envelope *) ccmsg->usrMsg;
179         
180         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
181         memcpy(newenv, usrenv, usrenv->getTotalsize());
182
183         ComlibArrayInfo::localMulticast(&robj->indices, newenv);
184
185         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
186                                  cbmsg->_cookie.sInfo.cInfo.id);
187
188         RingMulticastHashObject *old_robj = 
189             (RingMulticastHashObject*)sec_ht.get(key);
190         if(old_robj != NULL)
191             delete old_robj;
192         
193         sec_ht.put(key) = robj;
194
195         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
196             ComlibPrintf("[%d] Forwarding Message of %d to %d\n", CkMyPe(), 
197                          cbmsg->_cookie.pe, robj->nextPE);
198             CkPackMessage(&env);
199             CmiSyncSendAndFree(robj->nextPE, env->getTotalsize(), 
200                                (char *)env);
201         }
202         else
203             CmiFree(env);       
204     }
205     else {
206         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
207         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
208                                  cbmsg->_cookie.sInfo.cInfo.id);    
209         RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.
210             get(key);
211         
212         if(robj == NULL)
213             CkAbort("Destination indices is NULL\n");
214         
215         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
216             CmiSyncSend(robj->nextPE, env->getTotalsize(), (char *)env);
217             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), 
218                          robj->nextPE);
219         }
220         
221         ComlibArrayInfo::localMulticast(&robj->indices, env);
222     }
223 }
224
225 void RingMulticastStrategy::initSectionID(CkSectionID *sid){
226
227     ComlibPrintf("Ring Init section ID\n");
228     sid->pelist = NULL;
229     sid->npes = 0;
230
231     RingMulticastHashObject *robj = 
232         createHashObject(sid->_nElems, sid->_elems);
233     
234     ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
235     sec_ht.put(key) = robj;
236 }
237
238 RingMulticastHashObject *RingMulticastStrategy::createHashObject
239 (int nelements, CkArrayIndexMax *elements){
240
241     RingMulticastHashObject *robj = new RingMulticastHashObject;
242
243     int next_pe = CkNumPes();
244     int acount = 0;
245     int min_dest = CkNumPes();
246     for(acount = 0; acount < nelements; acount++){
247         //elements[acount].print();
248         
249         CkArrayID dest;
250         int nidx;
251         CkArrayIndexMax *idx_list;        
252         ainfo.getDestinationArray(dest, idx_list, nidx);
253
254         int p = ComlibGetLastKnown(dest, elements[acount]);
255         //CkArrayID::CkLocalBranch(dest)->lastKnown(elements[acount]);
256         
257         if(p < min_dest)
258             min_dest = p;
259         
260         if(p > CkMyPe() && next_pe > p) 
261             next_pe = p;       
262
263         if (p == CkMyPe())
264             robj->indices.insertAtEnd(elements[acount]);
265     }
266     
267     //Recycle the destination pelist and start from the begining
268     if(next_pe == CkNumPes() && min_dest != CkMyPe())        
269         next_pe = min_dest;
270     
271     if(next_pe == CkNumPes())
272         next_pe = -1;
273
274     robj->nextPE = next_pe;
275
276     return robj;
277 }
278
279
280 RingMulticastHashObject *RingMulticastStrategy::getHashObject(int pe, int id){
281     
282     ComlibSectionHashKey key(pe, id);
283     RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.get(key);
284     return robj;
285 }
286
287 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
288
289     if(next_pe < 0)
290         return 1;
291
292     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
293
294     if(next_pe > CkMyPe()){
295         if(src_pe <= next_pe && src_pe > CkMyPe())
296             return 1;
297
298         return 0;
299     }
300     
301     //next_pe < CkMyPe()
302
303     if(src_pe > CkMyPe() || src_pe <= next_pe)
304         return 1;
305     
306     return 0;
307 }