Merge branch 'charm' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / arch / mpi / machine-ctrlmsg.c
1 /* An alternative way to implement MPI-based machine layer */
2 /* Control flows of this scheme:
3  * SEND SIDE:
4  * 1. send a fixed-size small control msg to destination (MPI_Send)
5  *     so the ctrl msg buffer can be reused every time.
6  * 2. immediately following the 1st step, send the actual msg (MPI_Isend)
7  * 3. free the buffer for the sent msg
8  * 
9  * RECV SIDE:
10  * 1. Pre-post buffers for those small ctrl msgs (MPI_Irecv)
11  * 2. If any ctrl msg is received, issue a (i)recv call for the actual msg 
12  *     (differentiate small/large msgs)
13  *
14  * MEMORY ALLOCATION:
15  * use MPI_Alloc_mem and MPI_Free_mem so that CmiAllloc/CmiFree needs to be changed
16  */
17
18 /* This file contains variables and function declarations that are used for this alternative implementation */
19
20 /* This file contains function and variables definitions that are used for this alternative implementation */
21
22 #if USE_MPI_CTRLMSG_SCHEME
23
24 #define CTRL_MSG_TAG         (TAG-13)
25 #define USE_NUM_TAGS            1000
26
27 static int MPI_CTRL_MSG_CNT=100;
28 static int tags;
29
30 typedef struct MPICtrlMsgEntry{
31         int src;
32         int size;
33         int tag;
34 }MPICtrlMsgEntry;
35
36 typedef struct RecvCtrlMsgEntry{
37         int bufCnt;
38         MPI_Request *ctrlReqs; /* sizeof(MPI_Request* bufCnt */
39         MPICtrlMsgEntry *bufs; /*sizeof(MPICtrlMsgEntry)*bufCnt*/
40 }RecvCtrlMsgEntry;
41
42 static RecvCtrlMsgEntry recvCtrlMsgList;
43
44 static void createCtrlMsgIrecvBufs(){
45         int i;
46         MPICtrlMsgEntry *bufPtr = NULL;
47         MPI_Request *reqPtr = NULL;
48         int count = MPI_CTRL_MSG_CNT;
49
50         tags = 0;
51         
52         recvCtrlMsgList.bufCnt = count;
53         recvCtrlMsgList.ctrlReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*count);
54         recvCtrlMsgList.bufs = (MPICtrlMsgEntry *)malloc(sizeof(MPICtrlMsgEntry)*count);
55         
56         bufPtr = recvCtrlMsgList.bufs;
57         reqPtr = recvCtrlMsgList.ctrlReqs;
58         
59         for(i=0; i<count; i++, bufPtr++, reqPtr++){
60                 if(MPI_SUCCESS != MPI_Irecv(bufPtr, sizeof(MPICtrlMsgEntry), 
61                               MPI_BYTE, MPI_ANY_SOURCE, CTRL_MSG_TAG, charmComm, reqPtr)){
62                         CmiAbort("MPI_Irecv failed in creating pre-posted ctrl msg buffers\n");
63                 }
64         }
65 }
66
67 static void sendViaCtrlMsg(int node, int size, char *msg, SMSG_LIST *smsg){
68         MPICtrlMsgEntry one;
69
70         one.src = CmiMyNode();
71         one.size = size;
72         one.tag = TAG + 100 + tags;
73
74         tags = (tags+1)%USE_NUM_TAGS;
75         
76         START_TRACE_SENDCOMM(msg);
77         if(MPI_SUCCESS != MPI_Send((void *)&one, sizeof(MPICtrlMsgEntry), MPI_BYTE, node, CTRL_MSG_TAG, charmComm)){
78                 CmiAbort("MPI_Send failed in sending ctrl msg\n");
79         }
80         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,one.tag,charmComm,&(smsg->req)))
81             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
82         END_TRACE_SENDCOMM(msg);
83 }
84
85 /* returns the size of msg to be received. If there's no msg to be received, then -1 is returned */
86 static int recvViaCtrlMsg(){
87         int count = recvCtrlMsgList.bufCnt;
88         MPI_Request *ctrlReqs = recvCtrlMsgList.ctrlReqs;
89         MPICtrlMsgEntry *ctrlMsgs = recvCtrlMsgList.bufs;
90         
91         int completed_index = -1;
92         int flg = 0;
93         int nbytes = -1;
94         MPI_Status sts;
95         if(MPI_SUCCESS != MPI_Testany(count, ctrlReqs, &completed_index, &flg, &sts)){
96                 CmiAbort("MPI_Testany failed for checking if ctrl msg is received\n");
97         }
98         
99         if(flg){
100                 int src = ctrlMsgs[completed_index].src;
101                 int msgsize = ctrlMsgs[completed_index].size;
102                 nbytes = msgsize;
103                 char *actualMsg = (char *)CmiAlloc(msgsize);
104                 
105                 IRecvList one = irecvListEntryAllocate();
106                 
107                 /* repost the ctrl msg */
108                 if(MPI_SUCCESS != MPI_Irecv(ctrlMsgs+completed_index, sizeof(MPICtrlMsgEntry), MPI_BYTE,
109                                                                   MPI_ANY_SOURCE, CTRL_MSG_TAG, charmComm, ctrlReqs+completed_index)){
110                         CmiAbort("MPI_Irecv failed in re-posting a ctrl msg is received\n");
111                 }
112                 
113                 /* irecv the actual msg */
114                 if(MPI_SUCCESS != MPI_Irecv(actualMsg, msgsize, MPI_BYTE, src, ctrlMsgs[completed_index].tag, charmComm, &(one->req))){
115                         CmiAbort("MPI_Irecv failed after a ctrl msg is received\n");
116                 }
117                 one->msg = actualMsg;
118                 one->size = msgsize;
119                 one->next = NULL;
120                 waitIrecvListTail->next = one;
121                 waitIrecvListTail = one;
122         }
123         
124         return nbytes;
125 }
126
127 #endif