Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / DirectMulticastStrategy.C
1 #include "DirectMulticastStrategy.h"
2
// Three-way comparison of two ints handed over as untyped pointers.
// Returns -1 when *a < *b, 0 when both are equal and 1 when *a > *b.
int intCompare(void *a, void *b){
    const int lhs = *static_cast<int *>(a);
    const int rhs = *static_cast<int *>(b);

    // Each relational test yields 0 or 1, so the difference is -1, 0 or 1.
    return (lhs > rhs) - (lhs < rhs);
}    
18
19 //ComlibSectionHashKey CODE
20 int ComlibSectionHashKey::staticCompare(const void *k1,const void *k2,size_t ){
21     return ((const ComlibSectionHashKey *)k1)->
22                 compare(*(const ComlibSectionHashKey *)k2);
23 }
24
25 CkHashCode ComlibSectionHashKey::staticHash(const void *v,size_t){
26     return ((const ComlibSectionHashKey *)v)->hash();
27 }
28
29 CkpvExtern(CkGroupID, cmgrID);
30
31 void *DMHandler(void *msg){
32     ComlibPrintf("[%d]:In CallbackHandler\n", CkMyPe());
33     DirectMulticastStrategy *nm_mgr;    
34     
35     CkMcastBaseMsg *bmsg = (CkMcastBaseMsg *)EnvToUsr((envelope *)msg);
36     int instid = bmsg->_cookie.sInfo.cInfo.instId;
37     
38     nm_mgr = (DirectMulticastStrategy *) 
39         CProxy_ComlibManager(CkpvAccess(cmgrID)).
40         ckLocalBranch()->getStrategy(instid);
41     
42     nm_mgr->handleMulticastMessage(msg);
43     return NULL;
44 }
45
46 //Group Constructor
47 DirectMulticastStrategy::DirectMulticastStrategy(int ndest, int *pelist)
48     : CharmStrategy() {
49  
50     isDestinationArray = 0;
51     isDestinationGroup = 1;
52
53     setType(GROUP_STRATEGY);
54     
55     ndestpes = ndest;
56     destpelist = pelist;
57
58     commonInit();
59 }
60
61 DirectMulticastStrategy::DirectMulticastStrategy(CkArrayID aid)
62     : destArrayID(aid), CharmStrategy() {
63
64     isDestinationArray = 1;
65     isDestinationGroup = 0;
66
67     setType(ARRAY_STRATEGY);
68
69     ndestpes = 0;
70     destpelist = 0;
71
72     commonInit();
73 }
74
75 void DirectMulticastStrategy::commonInit(){
76     if(ndestpes == 0) {
77         ndestpes = CkNumPes();
78         destpelist = new int[CkNumPes()];
79         for(int count = 0; count < CkNumPes(); count ++)
80             destpelist[count] = count;        
81     }
82 }
83
84 void DirectMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
85     if(messageBuf == NULL) {
86         CkPrintf("ERROR MESSAGE BUF IS NULL\n");
87         return;
88     }
89
90     ComlibPrintf("[%d] Comlib Direct Multicast: insertMessage \n", 
91                  CkMyPe());   
92    
93     if(cmsg->dest_proc == IS_MULTICAST && cmsg->sec_id != NULL) {        
94         int cur_sec_id = cmsg->sec_id->_cookie.sInfo.cInfo.id;
95
96         if(cur_sec_id > 0) {        
97             //Old section id, send the id with the message
98             CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)cmsg->getCharmMessage();
99             cbmsg->_cookie.sInfo.cInfo.id = cur_sec_id;
100             cbmsg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_OLD_SECTION;
101         }
102         else {
103             CkSectionID *sid = cmsg->sec_id;
104
105             //New sec id, so send it along with the message
106             void *newmsg = (void *)getNewMulticastMessage(cmsg);
107             CkFreeMsg(cmsg->getCharmMessage());
108             delete cmsg;
109             
110             initSectionID(sid);
111
112             cmsg = new CharmMessageHolder((char *)newmsg, IS_MULTICAST); 
113             cmsg->sec_id = sid;
114         }        
115     }
116     
117     messageBuf->enq(cmsg);
118     if(!isBracketed())
119         doneInserting();
120 }
121
122 void DirectMulticastStrategy::doneInserting(){
123     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
124     
125     if(messageBuf->length() == 0) {
126         return;
127     }
128
129     while(!messageBuf->isEmpty()) {
130         CharmMessageHolder *cmsg = messageBuf->deq();
131         char *msg = cmsg->getCharmMessage();
132                 
133         if(cmsg->dest_proc == IS_MULTICAST) {      
134             if(isDestinationArray)
135                 CmiSetHandler(UsrToEnv(msg), handlerId);
136             
137             int *cur_map = destpelist;
138             int cur_npes = ndestpes;
139             if(cmsg->sec_id != NULL && cmsg->sec_id->pelist != NULL) {
140                 cur_map = cmsg->sec_id->pelist;
141                 cur_npes = cmsg->sec_id->npes;
142             }
143             
144             ComlibPrintf("[%d] Calling Direct Multicast %d %d %d\n", CkMyPe(),
145                          UsrToEnv(msg)->getTotalsize(), cur_npes, 
146                          cmsg->dest_proc);
147             /*
148             for(int i=0; i < cur_npes; i++)
149                 CkPrintf("[%d] Sending to %d %d\n", CkMyPe(), 
150                          cur_map[i], cur_npes);
151             */
152
153             CmiSyncListSendAndFree(cur_npes, cur_map, 
154                                    UsrToEnv(msg)->getTotalsize(), 
155                                    (char*)(UsrToEnv(msg)));            
156         }
157         else {
158             //CkPrintf("SHOULD NOT BE HERE\n");
159             CmiSyncSendAndFree(cmsg->dest_proc, UsrToEnv(msg)->getTotalsize(), 
160                                (char *)UsrToEnv(msg));
161         }        
162         
163         delete cmsg; 
164     }
165 }
166
167 void DirectMulticastStrategy::pup(PUP::er &p){
168
169     CharmStrategy::pup(p);
170
171     p | ndestpes;
172     p | destArrayID;
173     p | isDestinationArray;
174     p | isDestinationGroup;           
175
176     if(p.isUnpacking())
177         destpelist = new int[ndestpes];
178     p(destpelist, ndestpes);        
179 }
180
181 void DirectMulticastStrategy::beginProcessing(int numElements){
182     
183     messageBuf = new CkQ<CharmMessageHolder *>;    
184     handlerId = CkRegisterHandler((CmiHandler)DMHandler);    
185     
186     if(isDestinationArray) {
187         CkArray *dest_array = CkArrayID::CkLocalBranch(destArrayID);
188         dest_array->getComlibArrayListener()->getLocalIndices
189             (localDestIndices);
190     }
191
192     MaxSectionID = 1;
193 }
194
195 void DirectMulticastStrategy::handleMulticastMessage(void *msg){
196     register envelope *env = (envelope *)msg;
197     
198     CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
199
200     int status = cbmsg->_cookie.sInfo.cInfo.status;
201     ComlibPrintf("[%d] In local multicast %d\n", CkMyPe(), status);
202     
203     CkVec<CkArrayIndexMax> *dest_indices; 
204     if(status == COMLIB_MULTICAST_ALL) {        
205         //Multicast to all destination elements on current processor        
206         ComlibPrintf("[%d] Local multicast sending all %d\n", CkMyPe(), 
207                      localDestIndices.size());
208
209         localMulticast(&localDestIndices, env);
210     }   
211     else if(status == COMLIB_MULTICAST_NEW_SECTION){        
212         CkUnpackMessage(&env);
213         dest_indices = new CkVec<CkArrayIndexMax>;
214
215         ComlibPrintf("[%d] Received message for new section %d %d\n", 
216                      CkMyPe(), cbmsg->_cookie.pe, 
217                      cbmsg->_cookie.sInfo.cInfo.id);
218
219         ComlibMulticastMsg *ccmsg = (ComlibMulticastMsg *)cbmsg;
220         for(int count = 0; count < ccmsg->nIndices; count++){
221             CkArrayIndexMax idx = ccmsg->indices[count];
222             //idx.print();
223             int dest_proc =CkArrayID::CkLocalBranch(destArrayID)
224                 ->lastKnown(idx);
225             
226             if(dest_proc == CkMyPe())
227                 dest_indices->insertAtEnd(idx);                        
228         }            
229         
230         envelope *usrenv = (envelope *) ccmsg->usrMsg;
231         envelope *newenv = (envelope *)CmiAlloc(usrenv->getTotalsize());
232         memcpy(newenv, ccmsg->usrMsg, usrenv->getTotalsize());
233         localMulticast(dest_indices, newenv);
234
235         CkVec<CkArrayIndexMax> *old_dest_indices;
236         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
237                                  cbmsg->_cookie.sInfo.cInfo.id);
238
239         old_dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
240         if(old_dest_indices != NULL)
241             delete old_dest_indices;
242         
243         sec_ht.put(key) = dest_indices;
244         CmiFree(env);                
245     }
246     else {
247         //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
248         ComlibSectionHashKey key(cbmsg->_cookie.pe, 
249                                  cbmsg->_cookie.sInfo.cInfo.id);    
250         dest_indices = (CkVec<CkArrayIndexMax> *)sec_ht.get(key);
251         
252         if(dest_indices == NULL)
253             CkAbort("Destination indices is NULL\n");
254         
255         localMulticast(dest_indices, env);
256     }
257 }
258
259 #include "register.h"
260 void DirectMulticastStrategy::localMulticast(CkVec<CkArrayIndexMax>*vec, 
261                                                  envelope *env){
262     
263     //Multicast the messages to all elements in vec
264     int nelements = vec->size();
265     if(nelements == 0) {
266         CmiFree(env);
267         return;
268     }
269     
270     void *msg = EnvToUsr(env);    
271     int ep = env->getsetArrayEp();
272     CkUnpackMessage(&env);
273
274     env->getsetArrayMgr() = destArrayID;
275     env->setPacked(0); 
276     env->getsetArrayHops()=1;  
277     env->setUsed(0);
278
279     for(int count = 0; count < nelements; count ++){        
280         CkArrayIndexMax idx = (*vec)[count];
281         
282         ComlibPrintf("[%d] Sending multicast message to ", CkMyPe());        
283         if(comm_debug) idx.print();     
284         /*
285         CProxyElement_ArrayBase ap(destArrayID, idx);
286         ArrayElement *elem = ap.ckLocal();
287         if(elem != NULL) {
288             CkDeliverMessageReadonly(ep, msg, elem);        
289         }
290         else { //Element migrated away?
291             void *newmsg = CkCopyMsg(&msg);
292             ap.ckSend((CkArrayMessage *)newmsg, ep);            
293         }        
294         */        
295
296         env->getsetArrayIndex() = idx;
297
298         CkArray *a=(CkArray *)_localBranch(destArrayID);
299         if(_entryTable[ep]->noKeep) 
300             a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CmiFalse);        
301         else {
302             void *newmsg = CkCopyMsg(&msg);
303             a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue, CmiTrue);             
304         }
305         
306     }
307     
308     CmiFree(env);
309 }
310
311 void DirectMulticastStrategy::initSectionID(CkSectionID *sid){
312     
313     if(sid->npes > 0) 
314         return;
315     
316     //CkPrintf("NDESTPES = %d\n", ndestpes);
317
318     sid->pelist = new int[ndestpes];
319     sid->npes = 0;
320     
321     int count = 0, acount = 0;
322     for(acount = 0; acount < sid->_nElems; acount++){
323         int p = CkArrayID::CkLocalBranch(destArrayID)->
324             lastKnown(sid->_elems[acount]);
325         
326         for(count = 0; count < sid->npes; count ++)
327             if(sid->pelist[count] == p)
328                 break;
329         
330         if(count == sid->npes) {
331             sid->pelist[sid->npes ++] = p;
332         }
333     } 
334     
335 }
336
337
338 ComlibMulticastMsg * DirectMulticastStrategy::getNewMulticastMessage
339 (CharmMessageHolder *cmsg){
340     
341     if(cmsg->sec_id == NULL || cmsg->sec_id->_nElems == 0)
342         return NULL;
343
344     void *m = cmsg->getCharmMessage();
345     envelope *env = UsrToEnv(m);
346     
347     //if(cmsg->sec_id->_cookie.sInfo.cInfo.id == 0) {  //New Section ID;
348     CkPackMessage(&env);
349     int sizes[2];
350     sizes[0] = cmsg->sec_id->_nElems;
351     sizes[1] = env->getTotalsize();                
352     
353     cmsg->sec_id->_cookie.sInfo.cInfo.id = MaxSectionID ++;
354     
355     ComlibPrintf("Creating new comlib multicast message %d, %d\n", sizes[0], sizes[1]);
356     
357     ComlibMulticastMsg *msg = new(sizes, 0) ComlibMulticastMsg;
358     msg->nIndices = cmsg->sec_id->_nElems;
359     msg->_cookie.sInfo.cInfo.instId = myInstanceID;
360     msg->_cookie.sInfo.cInfo.id = MaxSectionID - 1;
361     msg->_cookie.sInfo.cInfo.status = COMLIB_MULTICAST_NEW_SECTION;
362     msg->_cookie.type = COMLIB_MULTICAST_MESSAGE;
363     msg->_cookie.pe = CkMyPe();
364     
365     memcpy(msg->indices, cmsg->sec_id->_elems, 
366            sizes[0] * sizeof(CkArrayIndexMax));
367     memcpy(msg->usrMsg, env, sizes[1] * sizeof(char));         
368     envelope *newenv = UsrToEnv(msg);
369     
370     newenv->getsetArrayMgr() = env->getsetArrayMgr();
371     newenv->getsetArraySrcPe() = env->getsetArraySrcPe();
372     newenv->getsetArrayEp() = env->getsetArrayEp();
373     newenv->getsetArrayHops() = env->getsetArrayHops();
374     newenv->getsetArrayIndex() = env->getsetArrayIndex();
375     // for trace projections
376     newenv->setEvent(env->getEvent());
377     newenv->setSrcPe(env->getSrcPe());
378     
379     CkPackMessage(&newenv);        
380     return (ComlibMulticastMsg *)EnvToUsr(newenv);
381     //}   
382
383     return NULL;
384 }
385