Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / KDirectMulticastStrategy.C
1 #include "KDirectMulticastStrategy.h"
2
3 //Group Constructor
4 KDirectMulticastStrategy::KDirectMulticastStrategy(int kf, 
5                                                    int ndest, int *pelist) 
6     : DirectMulticastStrategy(ndest, pelist), kfactor(kf) {
7     //FIXME: verify the list is sorted
8     commonKDirectInit();
9 }
10
11 //Array Constructor
12 KDirectMulticastStrategy::KDirectMulticastStrategy(int kf, 
13                                                    CkArrayID dest_aid)
14     : DirectMulticastStrategy(dest_aid), kfactor(kf){
15     commonKDirectInit();    
16 }
17
18 void KDirectMulticastStrategy::commonKDirectInit(){
19     //sort list and create a reverse map
20 }
21
22 extern int _charmHandlerIdx;
23 void KDirectMulticastStrategy::doneInserting(){
24     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
25     
26     if(messageBuf->length() == 0) {
27         return;
28     }
29     
30     while(!messageBuf->isEmpty()) {
31         CharmMessageHolder *cmsg = messageBuf->deq();
32         char *msg = cmsg->getCharmMessage();
33         register envelope* env = UsrToEnv(msg);
34
35         ComlibPrintf("[%d] Calling KDirect %d %d %d\n", CkMyPe(),
36                      env->getTotalsize(), ndestpes, cmsg->dest_proc);
37                 
38         if(cmsg->dest_proc == IS_MULTICAST) {      
39             CmiSetHandler(env, handlerId);
40             
41             int *cur_pelist = NULL;
42             int cur_npes = 0;
43             
44             if(cmsg->sec_id == NULL) {
45                 cur_pelist = kdestpelist;
46                 cur_npes = kfactor;
47             }
48             else {                
49                 cur_npes = (kfactor <= cmsg->sid.npes)?kfactor : 
50                     cmsg->sid.npes;
51                 cur_pelist = cmsg->sid.pe_list;
52             }
53             
54             ComlibPrintf("[%d] Sending Message to %d\n", CkMyPe(), cur_npes);
55             CmiSyncListSendAndFree(cur_npes, cur_pelist, 
56                                    UsrToEnv(msg)->getTotalsize(), 
57                                    UsrToEnv(msg));
58         }
59         else {
60             CmiSyncSendAndFree(cmsg->dest_proc, 
61                                UsrToEnv(msg)->getTotalsize(), 
62                                (char *)UsrToEnv(msg));
63         }        
64         
65         delete cmsg; 
66     }
67 }
68
69 void KDirectMulticastStrategy::pup(PUP::er &p){
70     DirectMulticastStrategy::pup(p);
71
72     p | kfactor;
73 }
74
75 void KDirectMulticastStrategy::beginProcessing(int  nelements){
76
77     DirectMulticastStrategy::beginProcessing(nelements);
78
79     kndestpelist = new int[kfactor]; 
80
81     int next_pe = 0, count = 0;
82     //Assuming the destination pe list is sorted.
83     for(count = 0; count < ndestpes; count++)        
84         if(destpelist[count] > CkMyPe()) {
85             next_pe = count;
86             break;
87         }
88
89     int kpos = 0;
90     for(count = next_pe; count < next_pe + kfactor; count++){
91         int pe = destpelist[count % ndestpes];
92         kdestpelist[kpos ++] = pe;
93     }
94 }
95
96 void KDirectMulticastStrategy::handleMulticastMessage(void *msg){
97     register envelope *env = (envelope *)msg;
98     
99     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
100     int src_pe = cbmsg->_cookie.pe;
101     if(isDestinationGroup){               
102         CmiSetHandler(env, _charmHandlerIdx);
103         CmiSyncSend(CkMyPe(), env->getTotalsize(), (char *)env);
104         
105         int nmsgs = getNumMessagesToSend(src_pe, CkMyPe, CkNumPes());
106         if(nmsgs > 0){            
107             CmiSetHandler(env, handlerId);            
108             CmiSyncListSendAndFree(nmsgs, kdestpelist, 
109                                    env->getTotalsize(), env);
110         }        
111         return;
112     }
113
114     int status = cbmsg->_cookie.sInfo.cInfo.status;
115     ComlibPrintf("[%d] In handle multicast message %d\n", CkMyPe(), status);
116
117     if(status == COMLIB_MULTICAST_ALL) {                        
118         int nmsgs = getNumMessagesToSend(src_pe. CkMyPe(), CkNumPes());
119         if(nmsgs > 0){ //Have to forward the messages           
120             void *msg = EnvToUsr(env);
121             void *newmsg = CkCopyMsg(&msg);
122             envelope *newenv = UsrToEnv(newmsg);        
123             CmiSyncListSendAndFree(nmsgs, kdestpelist, 
124                                    newenv->getTotalsize(), newenv);
125         }
126
127         //Multicast to all destination elements on current processor        
128         ComlibPrintf("[%d] Local multicast sending all %d\n", CkMyPe(), 
129                      localDestIndices.size());
130         
131         localMulticast(&localDestIndices, env);
132     }   
133     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
134         CkUnpackMessage(&env);
135         ComlibPrintf("[%d] Received message for new section src=%d\n", 
136                      CkMyPe(), cbmsg->_cookie.pe);
137
138         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
139         
140         KDirectHashObject *kobj = 
141             createHashObject(ccmsg->nIndices, ccmsg->indices);
142         
143         envelope *usrenv = (envelope *) ccmsg->usrMsg;
144         
145         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
146         memcpy(newenv, usrenv, usrenv->getTotalsize());
147
148         localMulticast(&kobj->indices, newenv);
149
150         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
151                                  cbmsg->_cookie.sInfo.cInfo.id);
152
153         KDirectHashObject *old_kobj = 
154             (KDirectHashObject*)sec_ht.get(key);
155         if(old_kobj != NULL)
156             delete old_kobj;
157         
158         sec_ht.put(key) = kobj;
159
160         if(kobj->npes > 0) {
161             ComlibPrintf("[%d] Forwarding Message of %d to %d pes\n", 
162                          CkMyPe(), cbmsg->_cookie.pe, kobj->npes);
163             CkPackMessage(&env);
164             CmiSyncListSendAndFree(kpbj->npes, kobj->pelist, 
165                                    env->getTotalsize(), env);
166         }
167         else
168             CmiFree(env);       
169     }
170     else {
171         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
172         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
173                                  cbmsg->_cookie.sInfo.cInfo.id);    
174         KDirectHashObject *kobj = (KDirectHashObject *)sec_ht.get(key);
175         
176         if(kobj == NULL)
177             CkAbort("Destination indices is NULL\n");
178         
179         if(kobj->npes > 0){
180             void *msg = EnvToUsr(env);
181             void *newmsg = CkCopyMsg(&msg);
182             envelope *newenv = UsrToEnv(newmsg);        
183             CmiSyncListSendAndFree(kpbj->npes, kobj->pelist, 
184                                    newenv->getTotalsize(), newenv);
185
186         }
187         
188         localMulticast(&kobj->indices, env);
189     }
190 }
191
192 void KDirectMulticastStrategy::initSectionID(CkSectionID *sid){
193
194     ComlibPrintf("KDirect Init section ID\n");
195     sid->pelist = NULL;
196     sid->npes = 0;
197
198     int *pelist = new int[kfactor];
199     int npes;
200     getPeList(sid->_nElem,  sid->_elems, pelist, npes);
201     
202     sid->destpelist = pelist;
203     sid->ndestpes = npes;    
204 }
205
206 KDirectHashObject *KDirectMulticastStrategy::createHashObject(int nelements, CkArrayIndexMax *elements){
207
208     KDirectHashObject *kobj = new KDirectHashObject;
209     kobj->pelist = new int[kfactor];
210     getPeList(nelements,  elements, kobj->pelist, kobj->npes);
211
212     return kobj;
213 }
214
215
216 void KDirectMulticastStrategy::getPeList(int nelements, 
217                                          CkArrayIndexMax *elements, 
218                                          int *pelist, int &npes, 
219                                          int src_pe){
220     
221     npes = 0;
222     
223     int *tmp_pelist = new int[CkNumPes()];
224     int num_pes;
225     
226     //make this a reusable function call later.
227     int count = 0, acount = 0;
228     for(acount = 0; acount < nelements; acount++){
229         int p = CkArrayID::CkLocalBranch(destArrayID)->
230             lastKnown(elements[acount]);
231         
232         for(count = 0; count < num_pes; count ++)
233             if(tmp_pelist[count] == p)
234                 break;
235         
236         if(count == num_pes) {
237             tmp_pelist[num_pes ++] = p;
238         }
239     }
240
241     if(num_pes == 0) {
242         delete [] tmp_pelist;
243         return;
244     }
245
246     qsort(tmp_pelist, num_pes, sizeof(int), intCompare);
247     
248     int pdiff = 0;
249     int my_pos = 0;
250     int src_pos = 0;
251
252     int count;
253     for(count = 0; count < num_pes; count ++) {
254         if(tmp_pelist[count] == CkMyPe()){
255             my_pos = count;
256         }
257
258         if(tmp_pelist[count] == src_pos){
259             src_pos = count;
260         }        
261     }            
262
263     int n_tosend = getNumMessagesToSend(src_pos, my_pos, num_pes);
264     for(count = 0; count < n_tosend; count ++) {
265         pelist[npes ++] = tmp_pelist[(src_pos + count)%num_pes];
266     }    
267
268     delete [] tmp_pelist;    
269 }
270
271 int KDirectMulticastStrategy::getNumMessagesToSend(int src_pe, int my_pe, 
272                                                    int num_pes){
273     
274     if(src_pe == my_pe) {
275         retutn 0;
276     }
277
278     int nToSend = 0;
279
280     int pdiff = my_pe - src_pe;
281     
282     if(pdiff < 0)
283         pdiff += num_pes;
284     
285     if(pdiff % kfactor != 0)
286         return 0;
287     
288     return (num_pes - pdiff > kfactor)? kfactor : num_pes - pdiff;
289 }