Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / ck-com / MPIStrategy.C
1 #include "MPIStrategy.h"
2
3 #if CHARM_MPI
4 MPI_Comm groupComm;
5 MPI_Group group, groupWorld;
6 #endif
7
8 MPIStrategy::MPIStrategy() {
9     messageBuf = NULL;
10     messageCount = 0;
11     npes = CkNumPes();
12     pelist = NULL;
13 }
14
15 MPIStrategy::MPIStrategy(int npes, int *pelist) {
16     messageBuf = NULL;
17     messageCount = 0;
18
19     this->npes = npes;
20     this->pelist = pelist;
21 }
22
23 void MPIStrategy::insertMessage(CharmMessageHolder *cmsg){
24     cmsg->next = messageBuf;
25     messageBuf = cmsg;    
26 }
27
28 void MPIStrategy::doneInserting(){
29 #if CHARM_MPI
30     ComlibPrintf("[%d] In MPI strategy\n", CkMyPe());
31     
32     CharmMessageHolder *cmsg = messageBuf;
33     char *buf_ptr = mpi_sndbuf;
34     
35     //if(npes == 0)
36     //  npes = CkNumPes();
37     
38     for(count = 0; count < npes; count ++) {
39         ((int *)buf_ptr)[0] = 0;
40         buf_ptr += MPI_MAX_MSG_SIZE;
41     }
42     
43     buf_ptr = mpi_sndbuf;
44     for(count = 0; count < messageCount; count ++) {
45         if(npes < CkNumPes()) {
46             ComlibPrintf("[%d] Copying data to %d and rank %d\n", 
47                          cmsg->dest_proc, procMap[cmsg->dest_proc]);
48             buf_ptr = mpi_sndbuf + MPI_MAX_MSG_SIZE * procMap[cmsg->dest_proc];  
49         }
50         else
51             buf_ptr = mpi_sndbuf + MPI_MAX_MSG_SIZE * cmsg->dest_proc; 
52         
53         char * msg = cmsg->getCharmMessage();
54         envelope * env = UsrToEnv(msg);
55         
56         ((int *)buf_ptr)[0] = env->getTotalsize();
57         
58         ComlibPrintf("[%d] Copying message\n", CkMyPe());
59         memcpy(buf_ptr + sizeof(int), (char *)env, env->getTotalsize());
60         
61         ComlibPrintf("[%d] Deleting message\n", CkMyPe());
62         CmiFree((char *) env);
63         CharmMessageHolder *prev = cmsg;
64         cmsg = cmsg->next;
65         delete prev;
66     }
67     
68     //ComlibPrintf("[%d] Calling Barrier\n", CkMyPe());
69     //PMPI_Barrier(groupComm);
70     
71     ComlibPrintf("[%d] Calling All to all\n", CkMyPe());
72     MPI_Alltoall(mpi_sndbuf, MPI_MAX_MSG_SIZE, MPI_CHAR, mpi_recvbuf, 
73                   MPI_MAX_MSG_SIZE, MPI_CHAR, groupComm);
74     
75     ComlibPrintf("[%d] All to all finished\n", CkMyPe());
76     buf_ptr = mpi_recvbuf;
77     for(count = 0; count < npes; count ++) {
78         int recv_msg_size = ((int *)buf_ptr)[0];
79         char * recv_msg = buf_ptr + sizeof(int);
80         
81         if((recv_msg_size > 0) && recv_msg_size < MPI_MAX_MSG_SIZE) {
82             ComlibPrintf("[%d] Receiving message of size %d\n", CkMyPe(), 
83                          recv_msg_size);
84             CmiSyncSend(CkMyPe(), recv_msg_size, recv_msg);
85         }
86         buf_ptr += MPI_MAX_MSG_SIZE;
87     }
88 #endif
89 }
90
91 void MPIStrategy::pup(PUP::er &p) {
92     p | messageCount;
93     p | npes; 
94        
95     if(p.isUnpacking())
96         pelist = new int[npes];
97     p(pelist , npes);
98
99     messageBuf = NULL;
100     
101     if(p.isUnpacking()){
102 #if CHARM_MPI
103         if(npes < CkNumPes()){
104             MPI_Comm_group(MPI_COMM_WORLD, &groupWorld);
105             MPI_Group_incl(groupWorld, npes, pelist, &group);
106             MPI_Comm_create(MPI_COMM_WORLD, group, &groupComm);
107         }
108         else groupComm = MPI_COMM_WORLD;
109 #endif
110     }
111 }
112
113 //PUPable_def(MPIStrategy);