Several new changes
[charm.git] / src / ck-com / MPIStrategy.C
1 #include "MPIStrategy.h"
2
3 #if CHARM_MPI
4 MPI_Comm groupComm;
5 MPI_Group group, groupWorld;
6 #endif
7
8 MPIStrategy::MPIStrategy() {
9     messageBuf = NULL;
10     messageCount = 0;
11     npes = CkNumPes();
12     pelist = NULL;
13 }
14
15 MPIStrategy::MPIStrategy(int npes, int *pelist) {
16     messageBuf = NULL;
17     messageCount = 0;
18
19     this->npes = npes;
20     this->pelist = pelist;
21 }
22
23 void MPIStrategy::insertMessage(CharmMessageHolder *cmsg){
24     cmsg->next = messageBuf;
25     messageBuf = cmsg;    
26 }
27
28 void MPIStrategy::doneInserting(){
29 #if CHARM_MPI
30     ComlibPrintf("[%d] In MPI strategy\n", CkMyPe());
31     
32     CharmMessageHolder *cmsg = messageBuf;
33     char *buf_ptr = mpi_sndbuf;
34     
35     //if(npes == 0)
36     //  npes = CkNumPes();
37     
38     for(count = 0; count < npes; count ++) {
39         ((int *)buf_ptr)[0] = 0;
40         buf_ptr += MPI_MAX_MSG_SIZE;
41     }
42     
43     buf_ptr = mpi_sndbuf;
44     for(count = 0; count < messageCount; count ++) {
45         if(npes < CkNumPes()) {
46             ComlibPrintf("[%d] Copying data to %d and rank %d\n", 
47                          cmsg->dest_proc, procMap[cmsg->dest_proc]);
48             buf_ptr = mpi_sndbuf + MPI_MAX_MSG_SIZE * procMap[cmsg->dest_proc];  
49         }
50         else
51             buf_ptr = mpi_sndbuf + MPI_MAX_MSG_SIZE * cmsg->dest_proc; 
52         
53         char * msg = cmsg->getCharmMessage();
54         envelope * env = UsrToEnv(msg);
55         
56         ((int *)buf_ptr)[0] = env->getTotalsize();
57         
58         ComlibPrintf("[%d] Copying message\n", CkMyPe());
59         memcpy(buf_ptr + sizeof(int), (char *)env, env->getTotalsize());
60         
61         ComlibPrintf("[%d] Deleting message\n", CkMyPe());
62         CmiFree((char *) env);
63         CharmMessageHolder *prev = cmsg;
64         cmsg = cmsg->next;
65         delete prev;
66     }
67     
68     //ComlibPrintf("[%d] Calling Barrier\n", CkMyPe());
69     //PMPI_Barrier(groupComm);
70     
71     ComlibPrintf("[%d] Calling All to all\n", CkMyPe());
72     MPI_Alltoall(mpi_sndbuf, MPI_MAX_MSG_SIZE, MPI_CHAR, mpi_recvbuf, 
73                   MPI_MAX_MSG_SIZE, MPI_CHAR, groupComm);
74     
75     ComlibPrintf("[%d] All to all finished\n", CkMyPe());
76     buf_ptr = mpi_recvbuf;
77     for(count = 0; count < npes; count ++) {
78         int recv_msg_size = ((int *)buf_ptr)[0];
79         char * recv_msg = buf_ptr + sizeof(int);
80         
81         if((recv_msg_size > 0) && recv_msg_size < MPI_MAX_MSG_SIZE) {
82             ComlibPrintf("[%d] Receiving message of size %d\n", CkMyPe(), 
83                          recv_msg_size);
84             CmiSyncSend(CkMyPe(), recv_msg_size, recv_msg);
85         }
86         buf_ptr += MPI_MAX_MSG_SIZE;
87     }
88 #endif
89 }
90
91 void MPIStrategy::pup(PUP::er &p) {
92     CharmStrategy::pup(p);
93
94     p | messageCount;
95     p | npes; 
96        
97     if(p.isUnpacking())
98         pelist = new int[npes];
99     p(pelist , npes);
100
101     messageBuf = NULL;
102     
103     if(p.isUnpacking()){
104 #if CHARM_MPI
105         if(npes < CkNumPes()){
106             MPI_Comm_group(MPI_COMM_WORLD, &groupWorld);
107             MPI_Group_incl(groupWorld, npes, pelist, &group);
108             MPI_Comm_create(MPI_COMM_WORLD, group, &groupComm);
109         }
110         else groupComm = MPI_COMM_WORLD;
111 #endif
112     }
113 }
114
115 //PUPable_def(MPIStrategy);