New version with migration and forwarding always working. The test program also works.
[charm.git] / src / ck-com / RingMulticastStrategy.C
1 #include "RingMulticastStrategy.h"
2
3 //Group Constructor
4 RingMulticastStrategy::RingMulticastStrategy(int ndest, int *pelist) 
5     : DirectMulticastStrategy(ndest, pelist) {
6     commonRingInit();
7 }
8
9 //Array Constructor
10 RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid)
11     : DirectMulticastStrategy(dest_aid){
12     commonRingInit();    
13 }
14
15 void RingMulticastStrategy::commonRingInit(){
16     //Sort destpelist
17 }
18
19 extern int _charmHandlerIdx;
20 void RingMulticastStrategy::doneInserting(){
21     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
22     
23     if(messageBuf->length() == 0) {
24         return;
25     }
26
27     while(!messageBuf->isEmpty()) {
28         CharmMessageHolder *cmsg = messageBuf->deq();
29         char *msg = cmsg->getCharmMessage();
30         register envelope* env = UsrToEnv(msg);
31
32         ComlibPrintf("[%d] Calling Ring %d %d %d\n", CkMyPe(),
33                      env->getTotalsize(), ndestpes, cmsg->dest_proc);
34                 
35         if(cmsg->dest_proc == IS_MULTICAST) {      
36             CmiSetHandler(env, handlerId);
37             
38             int dest_pe = -1;
39             RingMulticastHashObject *robj;
40
41             if(cmsg->sec_id == NULL)
42                 dest_pe = nextPE;
43             else {
44                 robj = getHashObject(CkMyPe(), 
45                                      cmsg->sec_id->_cookie.sInfo.cInfo.id);
46                                 
47                 ComlibPrintf("Gotten has obect %d\n",  robj);
48
49                 CkAssert(robj != NULL);
50
51                 dest_pe = robj->nextPE;
52             }
53             
54             ComlibPrintf("[%d] Sending Message to %d\n", CkMyPe(), dest_pe);
55
56             if(dest_pe != -1)
57                 CmiSyncSend(dest_pe, env->getTotalsize(), (char *)env); 
58             
59             if(getType() == ARRAY_STRATEGY) {
60                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
61             }
62             else {
63                 CmiSetHandler(env, _charmHandlerIdx);
64                 CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
65             }
66         }
67         else {
68             CmiSyncSendAndFree(cmsg->dest_proc, UsrToEnv(msg)->getTotalsize(), 
69                                (char *)UsrToEnv(msg));
70         }        
71         
72         delete cmsg; 
73     }
74 }
75
76 void RingMulticastStrategy::pup(PUP::er &p){
77
78     DirectMulticastStrategy::pup(p);
79 }
80
81 void RingMulticastStrategy::beginProcessing(int  nelements){
82
83     DirectMulticastStrategy::beginProcessing(nelements);
84
85     nextPE = -1;
86     if(ndestpes == 1)
87         return;
88
89     for(int count = 0; count < ndestpes; count++)
90         if(destpelist[count] > CkMyPe()) {
91             nextPE = destpelist[count];
92             break;
93         }
94     if(nextPE == -1)
95         nextPE = destpelist[0];
96 }
97
98 void RingMulticastStrategy::handleMulticastMessage(void *msg){
99     register envelope *env = (envelope *)msg;
100        
101     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
102     int src_pe = cbmsg->_cookie.pe;
103     if(getType() == GROUP_STRATEGY){               
104
105         if(!isEndOfRing(nextPE, src_pe)) {
106             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
107             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env);        
108         }
109         CmiSetHandler(env, _charmHandlerIdx);
110         CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
111         
112         return;
113     }
114
115     int status = cbmsg->_cookie.sInfo.cInfo.status;
116     ComlibPrintf("[%d] In handle multicast message %d\n", CkMyPe(), status);
117
118     if(status == COMLIB_MULTICAST_ALL) {                        
119         if(src_pe != CkMyPe() && !isEndOfRing(nextPE, src_pe)) {
120             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), nextPE);
121             CmiSyncSend(nextPE, env->getTotalsize(), (char *)env); 
122         }
123
124         ainfo.localBroadcast(env);
125     }   
126     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
127         CkUnpackMessage(&env);
128         ComlibPrintf("[%d] Received message for new section src=%d\n", 
129                      CkMyPe(), cbmsg->_cookie.pe);
130
131         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
132         
133         RingMulticastHashObject *robj = 
134             createHashObject(ccmsg->nIndices, ccmsg->indices);
135         
136         envelope *usrenv = (envelope *) ccmsg->usrMsg;
137         
138         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
139         memcpy(newenv, usrenv, usrenv->getTotalsize());
140
141         ComlibArrayInfo::localMulticast(&robj->indices, newenv);
142
143         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
144                                  cbmsg->_cookie.sInfo.cInfo.id);
145
146         RingMulticastHashObject *old_robj = 
147             (RingMulticastHashObject*)sec_ht.get(key);
148         if(old_robj != NULL)
149             delete old_robj;
150         
151         sec_ht.put(key) = robj;
152
153         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
154             ComlibPrintf("[%d] Forwarding Message of %d to %d\n", CkMyPe(), 
155                          cbmsg->_cookie.pe, robj->nextPE);
156             CkPackMessage(&env);
157             CmiSyncSendAndFree(robj->nextPE, env->getTotalsize(), 
158                                (char *)env);
159         }
160         else
161             CmiFree(env);       
162     }
163     else {
164         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
165         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
166                                  cbmsg->_cookie.sInfo.cInfo.id);    
167         RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.
168             get(key);
169         
170         if(robj == NULL)
171             CkAbort("Destination indices is NULL\n");
172         
173         if(src_pe != CkMyPe() && !isEndOfRing(robj->nextPE, src_pe)) {
174             CmiSyncSend(robj->nextPE, env->getTotalsize(), (char *)env);
175             ComlibPrintf("[%d] Forwarding Message to %d\n", CkMyPe(), 
176                          robj->nextPE);
177         }
178         
179         ComlibArrayInfo::localMulticast(&robj->indices, env);
180     }
181 }
182
183 void RingMulticastStrategy::initSectionID(CkSectionID *sid){
184
185     CkPrintf("Ring Init section ID\n");
186     sid->pelist = NULL;
187     sid->npes = 0;
188
189     RingMulticastHashObject *robj = 
190         createHashObject(sid->_nElems, sid->_elems);
191     
192     ComlibSectionHashKey key(CkMyPe(), sid->_cookie.sInfo.cInfo.id);
193     sec_ht.put(key) = robj;
194 }
195
196 RingMulticastHashObject *RingMulticastStrategy::createHashObject
197 (int nelements, CkArrayIndexMax *elements){
198
199     RingMulticastHashObject *robj = new RingMulticastHashObject;
200
201     int next_pe = CkNumPes();
202     int acount = 0;
203     int min_dest = CkNumPes();
204     for(acount = 0; acount < nelements; acount++){
205         //elements[acount].print();
206         
207         CkArrayID dest;
208         int nidx;
209         CkArrayIndexMax *idx_list;        
210         ainfo.getDestinationArray(dest, idx_list, nidx);
211
212         int p = CkArrayID::CkLocalBranch(dest)->lastKnown(elements[acount]);
213         
214         if(p < min_dest)
215             min_dest = p;
216         
217         if(p > CkMyPe() && next_pe > p) 
218             next_pe = p;       
219
220         if (p == CkMyPe())
221             robj->indices.insertAtEnd(elements[acount]);
222     }
223     
224     //Recycle the destination pelist and start from the begining
225     if(next_pe == CkNumPes() && min_dest != CkMyPe())        
226         next_pe = min_dest;
227     
228     if(next_pe == CkNumPes())
229         next_pe = -1;
230
231     robj->nextPE = next_pe;
232
233     return robj;
234 }
235
236
237 RingMulticastHashObject *RingMulticastStrategy::getHashObject(int pe, int id){
238     
239     ComlibSectionHashKey key(pe, id);
240     RingMulticastHashObject *robj = (RingMulticastHashObject *)sec_ht.get(key);
241     return robj;
242 }
243
244 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
245
246     if(next_pe < 0)
247         return 1;
248
249     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
250
251     if(next_pe > CkMyPe()){
252         if(src_pe <= next_pe && src_pe > CkMyPe())
253             return 1;
254
255         return 0;
256     }
257     
258     //next_pe < CkMyPe()
259
260     if(src_pe > CkMyPe() || src_pe <= next_pe)
261         return 1;
262     
263     return 0;
264 }