An alternative implementation of MPI machine layer as suggested by
authorChao Mei <chaomei2@illinois.edu>
Fri, 7 Sep 2012 23:29:52 +0000 (18:29 -0500)
committerChao Mei <chaomei2@illinois.edu>
Fri, 7 Sep 2012 23:30:33 +0000 (18:30 -0500)
Pavan.

src/arch/mpi/machine-ctrlmsg.c [new file with mode: 0644]
src/arch/mpi/machine.c
src/conv-core/convcore.c

diff --git a/src/arch/mpi/machine-ctrlmsg.c b/src/arch/mpi/machine-ctrlmsg.c
new file mode 100644 (file)
index 0000000..8da4dfd
--- /dev/null
@@ -0,0 +1,122 @@
+/* An alternative way to implement MPI-based machine layer */
+/* Control flows of this scheme:
+ * SEND SIDE:
+ * 1. send a fixed-size small control msg to destination (MPI_Send)
+ *     so the ctrl msg buffer can be reused every time.
+ * 2. immediately following the 1st step, send the actual msg (MPI_Isend)
+ * 3. free the buffer for the sent msg
+ * 
+ * RECV SIDE:
+ * 1. Pre-post buffers for those small ctrl msgs (MPI_Irecv)
+ * 2. If any ctrl msg is received, issue a (i)recv call for the actual msg 
+ *     (differentiate small/large msgs)
+ *
+ * MEMORY ALLOCATION:
+ * use MPI_Alloc_mem and MPI_Free_mem so that CmiAllloc/CmiFree needs to be changed
+ */
+
+/* This file contains variables and function declarations that are used for this alternative implementation */
+
+/* This file contains function and variables definitions that are used for this alternative implementation */
+
+#if USE_MPI_CTRLMSG_SCHEME
+
+#define CTRL_MSG_TAG         (TAG-13)
+
+static int MPI_CTRL_MSG_CNT=10;
+
+typedef struct MPICtrlMsgEntry{
+       int src;
+       int size;
+}MPICtrlMsgEntry;
+
+typedef struct RecvCtrlMsgEntry{
+       int bufCnt;
+       MPI_Request *ctrlReqs; /* sizeof(MPI_Request* bufCnt */
+       MPICtrlMsgEntry *bufs; /*sizeof(MPICtrlMsgEntry)*bufCnt*/
+}RecvCtrlMsgEntry;
+
+static RecvCtrlMsgEntry recvCtrlMsgList;
+
+static void createCtrlMsgIrecvBufs(){
+       int i;
+       MPICtrlMsgEntry *bufPtr = NULL;
+       MPI_Request *reqPtr = NULL;
+       int count = MPI_CTRL_MSG_CNT;
+       
+       recvCtrlMsgList.bufCnt = count;
+       recvCtrlMsgList.ctrlReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*count);
+       recvCtrlMsgList.bufs = (MPICtrlMsgEntry *)malloc(sizeof(MPICtrlMsgEntry)*count);
+       
+       bufPtr = recvCtrlMsgList.bufs;
+       reqPtr = recvCtrlMsgList.ctrlReqs;
+       
+       for(i=0; i<count; i++, bufPtr++, reqPtr++){
+               if(MPI_SUCCESS != MPI_Irecv(bufPtr, sizeof(MPICtrlMsgEntry), 
+                                                                                                        MPI_BYTE, MPI_ANY_SOURCE, CTRL_MSG_TAG, charmComm, reqPtr)){
+                       CmiAbort("MPI_Irecv failed in creating pre-posted ctrl msg buffers\n");
+               }
+       }
+}
+
+static void sendViaCtrlMsg(int node, int size, char *msg, SMSG_LIST *smsg){
+       MPICtrlMsgEntry one;
+
+       one.src = CmiMyNode();
+       one.size = size;
+       
+       START_TRACE_SENDCOMM(msg);
+       if(MPI_SUCCESS != MPI_Send((void *)&one, sizeof(MPICtrlMsgEntry), MPI_BYTE, node, 
+                                                                                                  CTRL_MSG_TAG, charmComm)){
+               CmiAbort("MPI_Send failed in sending ctrl msg\n");
+       }
+       
+       if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
+            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+       END_TRACE_SENDCOMM(msg);
+}
+
+/* returns the size of msg to be received. If there's no msg to be received, then -1 is returned */
+static int recvViaCtrlMsg(){
+       int count = recvCtrlMsgList.bufCnt;
+       MPI_Request *ctrlReqs = recvCtrlMsgList.ctrlReqs;
+       MPICtrlMsgEntry *ctrlMsgs = recvCtrlMsgList.bufs;
+       
+       int completed_index = -1;
+       int flg = 0;
+       int nbytes = -1;
+       MPI_Status sts;
+       
+       if(MPI_SUCCESS != MPI_Testany(count, ctrlReqs, &completed_index, &flg, &sts)){
+               CmiAbort("MPI_Testany failed for checking if ctrl msg is received\n");
+       }
+       
+       if(flg){
+               int src = ctrlMsgs[completed_index].src;
+               int msgsize = ctrlMsgs[completed_index].size;
+               nbytes = msgsize;
+               char *actualMsg = (char *)CmiAlloc(msgsize);
+               
+               IRecvList one = irecvListEntryAllocate();
+               
+               /* repost the ctrl msg */
+               if(MPI_SUCCESS != MPI_Irecv(ctrlMsgs+completed_index, sizeof(MPICtrlMsgEntry), MPI_BYTE,
+                                                                 MPI_ANY_SOURCE, CTRL_MSG_TAG, charmComm, ctrlReqs+completed_index)){
+                       CmiAbort("MPI_Irecv failed in re-posting a ctrl msg is received\n");
+               }
+               
+               /* irecv the actual msg */
+               if(MPI_SUCCESS != MPI_Irecv(actualMsg, msgsize, MPI_BYTE, src, TAG, charmComm, &(one->req))){
+                       CmiAbort("MPI_Irecv failed after a ctrl msg is received\n");
+               }
+               one->msg = actualMsg;
+               one->size = msgsize;
+               one->next = NULL;
+               waitIrecvListTail->next = one;
+               waitIrecvListTail = one;
+       }
+       
+       return nbytes;
+}
+
+#endif
\ No newline at end of file
index ebfba7f93094bf427ed6a582d45b846435cf086e..aff8bb1fb6f0dbfa973b8c03698794733537f8f3 100644 (file)
@@ -198,13 +198,24 @@ static void reportMsgHistogramInfo();
 
 #endif /* end of MPI_POST_RECV defined */
 
+/* to avoid MPI's in order delivery, changing MPI Tag all the time */
+#define TAG     1375
+#if MPI_POST_RECV
+#define POST_RECV_TAG       (TAG+1)
+#define BARRIER_ZERO_TAG  TAG
+#else
+#define BARRIER_ZERO_TAG   (TAG-1)
+#endif
+
+#define USE_MPI_CTRLMSG_SCHEME 1
+
 /* Defining this macro will use MPI_Irecv instead of MPI_Recv for
  * large messages. This could save synchronization overhead caused by
  * the rzv protocol used by MPI
  */
 #define USE_ASYNC_RECV_FUNC 0
 
-#ifdef USE_ASYNC_RECV_FUNC
+#if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
 static int IRECV_MSG_THRESHOLD = 8000;
 typedef struct IRecvListEntry{
     MPI_Request req;
@@ -233,7 +244,7 @@ static void irecvListEntryFree(IRecvList used){
     freedIrecvList = used;
 }
 
-#endif /* end of USE_ASYNC_RECV_FUNC */
+#endif /* end of USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME */
 
 /* Providing functions for external usage to set up the dynamic recv buffer
  * when the user is aware that it's safe to call such function
@@ -249,14 +260,6 @@ static void recordMsgHistogramInfo(int size);
 static void reportMsgHistogramInfo();
 #endif
 
-/* to avoid MPI's in order delivery, changing MPI Tag all the time */
-#define TAG     1375
-#if MPI_POST_RECV
-#define POST_RECV_TAG       (TAG+1)
-#define BARRIER_ZERO_TAG  TAG
-#else
-#define BARRIER_ZERO_TAG   (TAG-1)
-#endif
 /* ###End of POST_RECV related related macros ### */
 
 #if CMK_BLUEGENEL
@@ -375,6 +378,10 @@ void CmiNotifyIdleForMPI(void);
 #include "machine-lrts.h"
 #include "machine-common-core.c"
 
+#if USE_MPI_CTRLMSG_SCHEME
+#include "machine-ctrlmsg.c"
+#endif
+
 /* The machine specific msg-sending function */
 
 #if CMK_SMP
@@ -434,8 +441,10 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
         END_TRACE_SENDCOMM(msg);
     }
+#elif USE_MPI_CTRLMSG_SCHEME
+       sendViaCtrlMsg(node, size, msg, smsg);
 #else
-/* branch not using MPI_POST_RECV */
+/* branch not using MPI_POST_RECV or USE_MPI_CTRLMSG_SCHEME */
 
 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
        dstrank = petorank[node];
@@ -612,9 +621,12 @@ static int PumpMsgs(void) {
 #endif
 
         START_TRACE_RECVCOMM(NULL);
-
-        /* First check posted recvs then do  probe unmatched outstanding messages */
-#if MPI_POST_RECV
+#if USE_MPI_CTRLMSG_SCHEME
+       doSyncRecv = 0;
+       nbytes = recvViaCtrlMsg();
+       if(nbytes == -1) break;
+#elif MPI_POST_RECV
+               /* First check posted recvs then do  probe unmatched outstanding messages */
         MPIPostRecvList *postedOne = NULL;
         int completed_index = -1;
         flg = 0;
@@ -691,7 +703,7 @@ static int PumpMsgs(void) {
             CpvAccess(Cmi_unposted_recv_total)++;
         }
 #else
-        /* Original version */
+        /* Original version of not using MPI_POST_RECV and USE_MPI_CTRLMSG_SCHEME */
         START_EVENT();
         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
         if (res != MPI_SUCCESS)
@@ -728,20 +740,20 @@ static int PumpMsgs(void) {
         }
 #endif
 
-#endif /*end of not MPI_POST_RECV */
-
-        MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
-        CMI_CHECK_CHECKSUM(msg, nbytes);
-#if CMK_ERROR_CHECKING
-        if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
-            CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
-            CmiFree(msg);
-            CmiAbort("Abort!\n");
-            continue;
-        }
-#endif
+#endif /*end of !MPI_POST_RECV and !USE_MPI_CTRLMSG_SCHEME*/
 
-        if(doSyncRecv){
+               if(doSyncRecv){
+                       MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
+                       CMI_CHECK_CHECKSUM(msg, nbytes);
+       #if CMK_ERROR_CHECKING
+                       if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
+                               CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
+                               CmiFree(msg);
+                               CmiAbort("Abort!\n");
+                               continue;
+                       }
+       #endif
+        
             END_TRACE_RECVCOMM(msg);
             handleOneRecvedMsg(nbytes, msg);
         }
@@ -820,9 +832,10 @@ static int PumpMsgs(void) {
 
     }
 
-#if USE_ASYNC_RECV_FUNC
+#if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
 /* Another loop to check the irecved msgs list */
 {
+       /*TODO: msg cap (throttling) is not exerted here */
     IRecvList irecvEnt;
     int irecvDone = 0;
     MPI_Status sts;
@@ -1446,6 +1459,13 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
                MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
     }
 #endif
+       
+#if USE_MPI_CTRLMSG_SCHEME
+       CmiGetArgInt(largv, "+ctrlMsgCnt", &MPI_CTRL_MSG_CNT);
+       if(myNID == 0){
+               printf("Charm++: using the alternative ctrl msg scheme with %d pre-posted ctrl msgs\n", MPI_CTRL_MSG_CNT);
+       }
+#endif
 
 #if CMI_EXERT_SEND_CAP
     CmiGetArgInt(largv, "+dynCapSend", &SEND_CAP);
@@ -1503,7 +1523,13 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
 
 void LrtsPreCommonInit(int everReturn) {
 
-#if MPI_POST_RECV
+#if USE_MPI_CTRLMSG_SCHEME
+       #if CMK_SMP
+               if(CmiMyRank() == CmiMyNodeSize()) createCtrlMsgIrecvBufs();
+       #else
+               createCtrlMsgIrecvBufs();
+       #endif
+#elif MPI_POST_RECV
     int doInit = 1;
     int i;
 
@@ -1575,15 +1601,15 @@ void LrtsPreCommonInit(int everReturn) {
         }
 #endif
     }
-#endif /* end of MPI_POST_RECV */
-
+#endif /* end of MPI_POST_RECV  and USE_MPI_CTRLMSG_SCHEME */
+       
 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
 #endif
 
-#if USE_ASYNC_RECV_FUNC
+#if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
 #if CMK_SMP
     /* allocate the guardian entry only on comm thread considering NUMA */
     if(CmiMyRank() == CmiMyNodeSize()) {
index f79dfca5783503501a597f3f75e938305fee5a7d..46c011b2e0c9465a3a22eac925f4e67f180d6e20 100644 (file)
@@ -131,6 +131,11 @@ extern void CldModuleInit(char **);
 #endif
 
 #include "quiescence.h"
+
+#if USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
+#include <mpi.h>
+#endif
+
 //int cur_restart_phase = 1;      /* checkpointing/restarting phase counter */
 CpvDeclare(int,_curRestartPhase);
 static int CsdLocalMax = CSD_LOCAL_MAX_DEFAULT;
@@ -2846,6 +2851,8 @@ void *CmiAlloc(int size)
   res =(char *) LrtsAlloc(size, sizeof(CmiChunkHeader));
 #elif CONVERSE_POOL
   res =(char *) CmiPoolAlloc(size+sizeof(CmiChunkHeader));
+#elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
+  MPI_Alloc_mem(size+sizeof(CmiChunkHeader), MPI_INFO_NULL, &res);
 #else
   res =(char *) malloc_nomigrate(size+sizeof(CmiChunkHeader));
 #endif
@@ -2947,6 +2954,8 @@ void CmiFree(void *blk)
     LrtsFree(BLKSTART(parentBlk));
 #elif CONVERSE_POOL
     CmiPoolFree(BLKSTART(parentBlk));
+#elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
+    MPI_Free_mem(parentBlk);
 #else
     free_nomigrate(BLKSTART(parentBlk));
 #endif