Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / NodeMulticast.C
1 #include "NodeMulticast.h"
2 #include "converse.h"
3
4 #define MAX_BUF_SIZE 165000
5 #define MAX_SENDS_PER_BATCH 16
6 #define MULTICAST_DELAY 5
7
8 static NodeMulticast *nm_mgr;
9
10 static void call_doneInserting(void *ptr,double curWallTime){
11     NodeMulticast *mgr = (NodeMulticast *)ptr;
12     mgr->doneInserting();
13 }
14
15 static void* NodeMulticastHandler(void *msg){
16     ComlibPrintf("In Node MulticastHandler\n");
17     nm_mgr->recvHandler(msg);
18     return NULL;
19 }
20
21 static void* NodeMulticastCallbackHandler(void *msg){
22     ComlibPrintf("[%d]:In Node MulticastCallbackHandler\n", CkMyPe());
23     register envelope *env = (envelope *)msg;
24     CkUnpackMessage(&env);
25     //nm_mgr->getCallback().send(EnvToUsr(env));
26
27     //nm_mgr->getHandler()(env);
28     return NULL;
29 }
30
31 //Handles multicast by sending only one message to a nodes and making 
32 //them multicast locally
33 void NodeMulticast::setDestinationArray(CkArrayID a, int nelem, 
34                                         CkArrayIndexMax **idx, int ep){
35
36     mode = ARRAY_MODE;
37     messageBuf = NULL;
38     pes_per_node = 4;
39     if(getenv("RMS_NODES") != NULL)
40         pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES"));
41
42     mAid = a;
43     nelements = nelem;
44     entryPoint = ep;
45   
46     numNodes = CkNumPes()/pes_per_node;
47     numCurDestPes = CkNumPes();
48     myRank = 0;
49     nodeMap = new int[numNodes];
50   
51     ComlibPrintf("In SetDestinationArray %d, %d, %d, %d\n", numNodes, 
52                  pes_per_node, nelements, ep);
53   
54     indexVec = new CkVec<CkArrayIndexMax> [CkNumPes()];
55     
56     for(int count = 0; count < nelements; count++) {
57         ComlibPrintf("Before lastKnown %d\n", count);
58         int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(*idx[count]);
59         ComlibPrintf("After lastKnown %d\n", dest_proc);
60         nodeMap[dest_proc/pes_per_node] = 1;
61         
62         indexVec[dest_proc].insertAtEnd(*idx[count]);
63     }    
64
65     ComlibPrintf("After SetDestinationArray\n");
66 }
67 /*
68 void NodeMulticast::setPeList(int npes, int *pelist, ComlibMulticastHandler handler){
69     mode = PROCESSOR_MODE;
70     messageBuf = NULL;
71     pes_per_node = 4;
72     //if(getenv("RMS_NODES") != NULL)
73     //pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES"));
74
75     //cb = callback;
76     this->handler = (long)handler;
77   
78     numNodes = CkNumPes()/pes_per_node;
79     numCurDestPes = npes;
80     
81     myRank = 0;
82     nodeMap = new int[numNodes];
83   
84     this->npes = npes;
85     this->pelist = new int[npes];
86     memcpy(this->pelist, pelist, npes * sizeof(int));
87
88     ComlibPrintf("In setPeList %d, %d, %d\n", numNodes, 
89                  pes_per_node, npes);
90     
91     for(int count = 0; count < npes; count++)
92         nodeMap[pelist[count]/pes_per_node] = 1;        
93     
94     ComlibPrintf("After setPeList\n");
95 }
96 */
97
98 void NodeMulticast::recvHandler(void *msg) {
99     register envelope* env = (envelope *)msg;
100     void *charm_msg = (void *)EnvToUsr(env);
101
102     env->setUsed(0);
103     ComlibPrintf("In receive Handler\n");
104     if(mode == ARRAY_MODE) {
105         env->getsetArrayMgr()=mAid;
106         env->getsetArrayEp()=entryPoint;
107         env->getsetArrayHops()=0;       
108         CkUnpackMessage(&env);
109
110         for(int count = 0; count < pes_per_node; count ++){
111             int dest_pe = (CkMyPe()/pes_per_node) * pes_per_node + count;
112             int size = indexVec[dest_pe].size();
113             
114             ComlibPrintf("[%d], %d elements to send to %d of size %d\n", CkMyPe(), size, dest_pe, env->getTotalsize());
115             
116             CkArrayIndexMax * idx_arr = indexVec[dest_pe].getVec();
117             for(int itr = 0; itr < size; itr ++) {
118                 void *newcharmmsg = CkCopyMsg(&charm_msg); 
119                 envelope* newenv = UsrToEnv(newcharmmsg);
120                 CProxyElement_ArrayBase ap(mAid, idx_arr[itr]);         
121                 newenv->getsetArrayIndex()=idx_arr[itr];
122                 ap.ckSend((CkArrayMessage *)newcharmmsg, entryPoint);
123             }
124         }
125     }
126     else {
127       CkUnpackMessage(&env);
128       for(int count = 0; count < pes_per_node; count++) 
129         if(validRank[count]){
130             void *newcharmmsg;
131             envelope* newenv;
132           
133             if(count <  pes_per_node - 1) {
134                 newcharmmsg = CkCopyMsg(&charm_msg); 
135                 newenv = UsrToEnv(newcharmmsg);
136             }
137             else {
138                 newcharmmsg = charm_msg;
139                 newenv = UsrToEnv(newcharmmsg);
140             }
141
142             CmiSetHandler(newenv, NodeMulticastCallbackHandlerId);
143             ComlibPrintf("[%d] In receive Handler (proc mode), sending message to %d at handler %d\n", 
144                          CkMyPe(), (CkMyPe()/pes_per_node) * pes_per_node 
145                          + count, NodeMulticastCallbackHandlerId);
146             
147             CkPackMessage(&newenv);
148             CmiSyncSendAndFree((CkMyPe()/pes_per_node) *pes_per_node + count, 
149                                newenv->getTotalsize(), (char *)newenv);
150         }
151     }
152     ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(), 
153                  (long) msg - 2*sizeof(int));
154     //CmiFree(msg);
155 }
156
157 void NodeMulticast::insertMessage(CharmMessageHolder *cmsg){
158
159     ComlibPrintf("In insertMessage \n");
160     envelope *env = UsrToEnv(cmsg->getCharmMessage());
161
162     CmiSetHandler(env, NodeMulticastHandlerId);
163     messageBuf->enq(cmsg);
164 }
165
166 void NodeMulticast::doneInserting(){
167     CharmMessageHolder *cmsg;
168     char *msg;
169     register envelope *env;
170     
171     ComlibPrintf("NodeMulticast :: doneInserting\n");
172     
173     if(messageBuf->length() > 1) {
174         //CkPrintf("NodeMulticast :: doneInserting length > 1\n");
175         /*
176         char **msgComps;
177         int *sizes, msg_count;
178     
179         msgComps = new char*[messageBuf->length()];
180         sizes = new int[messageBuf->length()];
181         msg_count = 0;
182         while (!messageBuf->isEmpty()) {
183             cmsg = messageBuf->deq();
184             msg = cmsg->getCharmMessage();
185             env = UsrToEnv(msg);
186             sizes[msg_count] = env->getTotalsize();
187             msgComps[msg_count] = (char *)env;
188             msg_count++;
189             
190             delete cmsg;
191         }
192         
193         for(int count = 0; count < numNodes; count++)
194             if(nodeMap[count])
195                 CmiMultipleSend(count * pes_per_node + myRank, msg_count, 
196                                 sizes, msgComps);
197         
198         delete [] msgComps;
199         delete [] sizes;
200         */
201     }
202     else if (messageBuf->length() == 1){
203         static int prevCount = 0;
204         int count = 0;
205         ComlibPrintf("Sending Node Multicast\n");
206         cmsg = messageBuf->deq();
207         msg = cmsg->getCharmMessage();
208         env = UsrToEnv(msg);
209         
210         if(mode == ARRAY_MODE)
211             env->getsetArraySrcPe()=CkMyPe();
212         CkPackMessage(&env);
213
214         CmiSetHandler(env, NodeMulticastHandlerId);
215         ComlibPrintf("After set handler\n");
216
217         //CmiPrintf("cursedtpes = %d, %d\n", cmsg->npes, numCurDestPes);
218         
219         if((mode != ARRAY_MODE) && cmsg->npes < numCurDestPes) {
220             numCurDestPes = cmsg->npes;
221             for(count = 0; count < numNodes; count++) 
222                 nodeMap[count] = 0;        
223             
224             for(count = 0; count < cmsg->npes; count++) 
225                 nodeMap[(cmsg->pelist[count])/pes_per_node] = 1;        
226         }
227         
228         for(count = prevCount; count < numNodes; count++) {
229             //int dest_node = count;
230             int dest_node = (count + (CkMyPe()/pes_per_node))%numNodes;
231             if(nodeMap[dest_node]) {
232                 void *newcharmmsg;
233                 envelope* newenv;
234                 
235                 if(count < numNodes - 1) {
236                     newcharmmsg = CkCopyMsg((void **)&msg); 
237                     newenv = UsrToEnv(newcharmmsg);
238                 }
239                 else {
240                     newcharmmsg = msg;
241                     newenv = UsrToEnv(newcharmmsg);
242                 }
243                 
244                 ComlibPrintf("[%d]In cmisyncsend to %d\n", CkMyPe(), 
245                              dest_node * pes_per_node + myRank);
246 #if CMK_PERSISTENT_COMM
247                 if(env->getTotalsize() < MAX_BUF_SIZE)
248                     CmiUsePersistentHandle(&persistentHandlerArray[dest_node],1);
249 #endif
250                 CkPackMessage(&newenv);
251                 CmiSyncSendAndFree(dest_node * pes_per_node + myRank, 
252                                    newenv->getTotalsize(), (char *)newenv);
253 #if CMK_PERSISTENT_COMM
254                 if(env->getTotalsize() < MAX_BUF_SIZE)
255                     CmiUsePersistentHandle(NULL, 0);
256 #endif          
257             }
258             prevCount ++;
259             if((prevCount % MAX_SENDS_PER_BATCH == 0) &&
260                (prevCount != numNodes)) {
261                 CcdCallFnAfterOnPE((CcdVoidFn)call_doneInserting, (void *)this, 
262                                MULTICAST_DELAY, CkMyPe());
263                 return;
264             }
265             prevCount = 0;
266         }
267
268         ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(), (char *)env - 2*sizeof(int));
269         //CmiFree(env);
270         delete cmsg;
271     }
272 }
273
274 void NodeMulticast::pup(PUP::er &p){
275     
276     CharmStrategy::pup(p);
277
278     p | pes_per_node;
279     p | numNodes;
280     p | nelements;
281     p | entryPoint;
282     p | npes;
283     p | mode;
284     p | numCurDestPes;
285     p | mAid;
286     
287     if(p.isUnpacking()) {
288         nodeMap = new int[numNodes];
289         
290         if(mode == ARRAY_MODE) {
291             typedef CkVec<CkArrayIndexMax> CkVecArrayIndex;
292             CkVecArrayIndex *vec = new CkVecArrayIndex[CkNumPes()];
293             indexVec = vec;
294         }
295
296         if(mode == PROCESSOR_MODE)
297             pelist = new int[npes];
298     }
299
300     p | cb;
301     p | handler;
302     p(nodeMap, numNodes);
303
304     if(mode == PROCESSOR_MODE)
305       p(pelist, npes);
306
307     if(mode == ARRAY_MODE)
308       for(int count = 0; count < CkNumPes(); count++)
309         p | indexVec[count];
310     
311     if(p.isUnpacking()) {
312         messageBuf = new CkQ <CharmMessageHolder *>;
313         myRank = CkMyPe() % pes_per_node;
314         
315         NodeMulticastHandlerId = CkRegisterHandler((CmiHandler)NodeMulticastHandler);
316         NodeMulticastCallbackHandlerId = CkRegisterHandler
317             ((CmiHandler)NodeMulticastCallbackHandler);
318         
319         nm_mgr = this;
320
321         //validRank[0] =  validRank[1] = validRank[2] = validRank[3] = 0;
322         memset(validRank, 0, MAX_PES_PER_NODE * sizeof(int));
323         for(int count = 0; count < npes; count ++){
324             if(CkMyPe()/pes_per_node == pelist[count] / pes_per_node)
325                 validRank[pelist[count] % pes_per_node] = 1;
326         }
327
328 #if CMK_PERSISTENT_COMM
329         persistentHandlerArray = new PersistentHandle[numNodes];
330         for(int count = 0; count < numNodes; count ++)
331             //if(nodeMap[count])
332             persistentHandlerArray[count] = CmiCreatePersistent
333                 (count * pes_per_node + myRank, MAX_BUF_SIZE);
334 #endif
335     }
336 }
337
338 //PUPable_def(NodeMulticast);