Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / RingMulticastStrategy.C
1 #include "RingMulticastStrategy.h"
2
3 //Group Constructor
4 RingMulticastStrategy::RingMulticastStrategy(int ndest, int *pelist) 
5     : DirectMulticastStrategy(ndest, pelist) {
6     commonRingInit();
7 }
8
9 //Array Constructor
10 RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid)
11     : DirectMulticastStrategy(dest_aid){
12     commonRingInit();    
13 }
14
15 void RingMulticastStrategy::commonRingInit(){
16     //Sort destpelist
17 }
18
19 extern int _charmHandlerIdx;
20 void RingMulticastStrategy::doneInserting(){
21     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
22     
23     if(messageBuf->length() == 0) {
24         return;
25     }
26
27     while(!messageBuf->isEmpty()) {
28         CharmMessageHolder *cmsg = messageBuf->deq();
29         char *msg = cmsg->getCharmMessage();
30         register envelope* env = UsrToEnv(msg);
31
32         ComlibPrintf("[%d] Calling Ring %d %d %d\n", CkMyPe(),
33                      env->getTotalsize(), ndestpes, cmsg->dest_proc);
34                 
35         if(cmsg->dest_proc == IS_MULTICAST) {      
36             CmiSetHandler(env, handlerId);
37             
38             int dest_pe = -1;
39             RingMulticastHashObject *robj;
40
41             if(cmsg->sec_id == NULL)
42                 dest_pe = nextPE;
43             else {
44                 robj = getHashObject(CkMyPe(), 
45                                      cmsg->sec_id->_cookie.sInfo.cInfo.id);
46                                 
47                 ComlibPrintf("Gotten has obect %d\n",  robj);
48
49                 CkAssert(robj != NULL);
50
51                 dest_pe = robj->nextPE;
52             }
53             
54             ComlibPrintf("[%d] Sending Message to %d\n", CkMyPe(), dest_pe);
55
56             if(dest_pe != -1)
57                 CmiSyncSend(dest_pe, env->getTotalsize(), (char *)env); 
58             
59             if(isDestinationArray) {
60                 /*
61                 if(robj != NULL)
62                     localMulticast(&robj->indices, env);
63                 else
64                     localMulticast(&localDestIndices, env);
65                 */
66                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
67             }
68             else {
69                 CmiSetHandler(env, _charmHandlerIdx);
70                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
71             }
72         }
73         else {
74             CmiSyncSendAndFree(cmsg->dest_proc, UsrToEnv(msg)->getTotalsize(), 
75                                (char *)UsrToEnv(msg));
76         }        
77         
78         delete cmsg; 
79     }
80 }
81
82 void RingMulticastStrategy::pup(PUP::er &p){
83
84     DirectMulticastStrategy::pup(p);
85 }
86
87 void RingMulticastStrategy::beginProcessing(int  nelements){
88
89     DirectMulticastStrategy::beginProcessing(nelements);
90
91     nextPE = -1;
92     if(ndestpes == 1)
93         return;
94
95     for(int count = 0; count < ndestpes; count++)
96         if(destpelist[count] > CkMyPe()) {
97             nextPE = destpelist[count];
98             break;
99         }
100     if(nextPE == -1)
101         nextPE = destpelist[0];
102 }
103
104 void RingMulticastStrategy::handleMulticastMessage(void *msg){
105     register envelope *env = (envelope *)msg;
106        
107     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
108     int src_pe = cbmsg->_cookie.pe;
109     if(isDestinationGroup){               
110
111         if(!isEndOfRing(nextPE, src_pe)) {
112             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
113             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env);        
114         }
115         CmiSetHandler(env, _charmHandlerIdx);
116         CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
117         
118         return;
119     }
120
121     int status = cbmsg->_cookie.sInfo.cInfo.status;
122     ComlibPrintf("[%d] In handle multicast message %d\n", CkMyPe(), status);
123
124     if(status == COMLIB_MULTICAST_ALL) {                        
125         if(src_pe != CkMyPe() && !isEndOfRing(nextPE, src_pe)) {
126             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
127             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env); 
128         }
129
130         //Multicast to all destination elements on current processor        
131         ComlibPrintf("[%d] Local multicast sending all %d\n", CkMyPe(), 
132                      localDestIndices.size());
133         
134         localMulticast(&localDestIndices, env);
135     }   
136     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
137         CkUnpackMessage(&env);
138         ComlibPrintf("[%d] Received message for new section src=%d\n", 
139                      CkMyPe(), cbmsg->_cookie.pe);
140
141         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
142         
143         RingMulticastHashObject *robj = 
144             createHashObject(ccmsg->nIndices, ccmsg->indices);
145         
146         envelope *usrenv = (envelope *) ccmsg->usrMsg;
147         
148         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
149         memcpy(newenv, usrenv, usrenv->getTotalsize());
150
151         localMulticast(&robj->indices, newenv);
152
153         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
154                                  cbmsg->_cookie.sInfo.cInfo.id);
155
156         RingMulticastHashObject *old_robj = 
157             (RingMulticastHashObject*)sec_ht.get(key);
158         if(old_robj != NULL)
159             delete old_robj;
160         
161         sec_ht.put(key) = robj;
162
163         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
164             ComlibPrintf("[%d] Forwarding Message of %d to %d\n", CkMyPe(), 
165                          cbmsg->_cookie.pe, robj->nextPE);
166             CkPackMessage(&env);
167             CmiSyncSendAndFree(robj->nextPE, env->getTotalsize(), 
168                                (char *)env);
169         }
170         else
171             CmiFree(env);       
172     }
173     else {
174         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
175         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
176                                  cbmsg->_cookie.sInfo.cInfo.id);    
177         RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.
178             get(key);
179         
180         if(robj == NULL)
181             CkAbort("Destination indices is NULL\n");
182         
183         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
184             CmiSyncSend(robj->nextPE, env->getTotalsize(), (char *)env);
185             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), 
186                          robj->nextPE);
187         }
188         
189         localMulticast(&robj->indices, env);
190     }
191 }
192
193 void RingMulticastStrategy::initSectionID(CkSectionID *sid){
194
195     CkPrintf("Ring Init section ID\n");
196     sid->pelist = NULL;
197     sid->npes = 0;
198
199     RingMulticastHashObject *robj = 
200         createHashObject(sid->_nElems, sid->_elems);
201     
202     ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
203     sec_ht.put(key) = robj;
204 }
205
206 RingMulticastHashObject *RingMulticastStrategy::createHashObject
207 (int nelements, CkArrayIndexMax *elements){
208
209     RingMulticastHashObject *robj = new RingMulticastHashObject;
210
211     int next_pe = CkNumPes();
212     int acount = 0;
213     int min_dest = CkNumPes();
214     for(acount = 0; acount < nelements; acount++){
215         //elements[acount].print();
216         int p = CkArrayID::CkLocalBranch(destArrayID)->
217             lastKnown(elements[acount]);
218         
219         if(p < min_dest)
220             min_dest = p;
221         
222         if(p > CkMyPe() && next_pe > p) 
223             next_pe = p;       
224
225         if (p == CkMyPe())
226             robj->indices.insertAtEnd(elements[acount]);
227     }
228     
229     //Recycle the destination pelist and start from the begining
230     if(next_pe == CkNumPes() && min_dest != CkMyPe())        
231         next_pe = min_dest;
232     
233     if(next_pe == CkNumPes())
234         next_pe = -1;
235
236     robj->nextPE = next_pe;
237
238     return robj;
239 }
240
241
242 RingMulticastHashObject *RingMulticastStrategy::getHashObject(int pe, int id){
243     
244     ComlibSectionHashKey key(pe, id);
245     RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.get(key);
246     return robj;
247 }
248
249 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
250
251     if(next_pe < 0)
252         return 1;
253
254     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
255
256     if(next_pe > CkMyPe()){
257         if(src_pe <= next_pe && src_pe > CkMyPe())
258             return 1;
259
260         return 0;
261     }
262     
263     //next_pe < CkMyPe()
264
265     if(src_pe > CkMyPe() || src_pe <= next_pe)
266         return 1;
267     
268     return 0;
269 }