Implemented an alternative to use MPI_Irecv for msgs that use rzv protocol.
authorChao Mei <chaomei2@illinois.edu>
Fri, 6 Jan 2012 23:15:43 +0000 (17:15 -0600)
committerChao Mei <chaomei2@illinois.edu>
Fri, 6 Jan 2012 23:15:43 +0000 (17:15 -0600)
src/arch/mpi/machine.c

index 03563d8f5ef315667aab3049776de59e5c8e2025..c2e054555c771201d5f33b932f2aa5ae5a994dd8 100644 (file)
@@ -163,12 +163,49 @@ CpvDeclare(char**,CmiPostedRecvBuffers);
 static int MSG_HISTOGRAM_BINSIZE;
 static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
-static void recordMsgHistogramInfo(int size, char *msg);
+static void recordMsgHistogramInfo(int size);
 static void reportMsgHistogramInfo();
 #endif /* end of MPI_DYNAMIC_POST_RECV defined */
 
 #endif /* end of MPI_POST_RECV defined */
 
+/* Defining this macro will use MPI_Irecv instead of MPI_Recv for
+ * large messages. This could save synchronization overhead caused by
+ * the rzv protocol used by MPI
+ */
+#define USE_ASYNC_RECV_FUNC 0
+
+#ifdef USE_ASYNC_RECV_FUNC
+static int IRECV_MSG_THRESHOLD = 8000;
+typedef struct IRecvListEntry{
+    MPI_Request req;
+    char *msg;
+    int size;
+    struct IRecvListEntry *next;
+}*IRecvList;
+
+static IRecvList freedIrecvList = NULL; /* used to recycle the entries */
+static IRecvList waitIrecvListHead = NULL; /* points to the guardian entry, i.e., the next of it points to the first entry */
+static IRecvList waitIrecvListTail = NULL; /* points to the last entry */
+
+static IRecvList irecvListEntryAllocate(){
+    IRecvList ret;
+    if(freedIrecvList == NULL) {
+        ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));        
+        return ret;
+    } else {
+        ret = freedIrecvList;
+        freedIrecvList = freedIrecvList->next;
+        return ret;
+    }
+}
+static void irecvListEntryFree(IRecvList used){
+    used->next = freedIrecvList;
+    freedIrecvList = used;
+}
+
+#endif /* end of USE_ASYNC_RECV_FUNC */
+
 /* Providing functions for external usage to set up the dynamic recv buffer
  * when the user is aware that it's safe to call such function
  */
@@ -179,7 +216,7 @@ void CmiSetupMachineRecvBuffers();
 static int MSG_HISTOGRAM_BINSIZE=1000;
 static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
-static void recordMsgHistogramInfo(int size, char *msg);
+static void recordMsgHistogramInfo(int size);
 static void reportMsgHistogramInfo();
 #endif
 
@@ -342,7 +379,8 @@ static void MachinePostNonLocalForMPI();
 
 #if CMK_SMP
 static void EnqueueMsg(void *m, int size, int node, int mode) {
-    SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
+    /*SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
+    SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
     msg_tmp->msg = m;
     msg_tmp->size = size;
@@ -461,7 +499,8 @@ static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg
     }
 #endif
     /* non smp */
-    msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
+    /*msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
+    msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
     msg_tmp->msg = msg;
     msg_tmp->destpe = destNode;
     msg_tmp->size = size;
@@ -539,7 +578,8 @@ static void CmiReleaseSentMessages(void) {
             else
                 prev->next = temp;
             CmiFree(msg_tmp->msg);
-            CmiFree(msg_tmp);
+            /* CmiFree(msg_tmp); */
+            free(msg_tmp);
             msg_tmp = temp;
         } else {
             prev = msg_tmp;
@@ -578,6 +618,7 @@ static int PumpMsgs(void) {
 #endif
 
     while (1) {
+        int doSyncRecv = 1;
 #if CMI_EXERT_RECV_CAP
         if (recvCnt==RECV_CAP) break;
 #elif CMI_DYNAMIC_EXERT_CAP
@@ -632,16 +673,31 @@ static int PumpMsgs(void) {
             if (res != MPI_SUCCESS)
                 CmiAbort("MPI_Iprobe failed\n");
             if (!flg) break;
+            
             recd = 1;
             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
             msg = (char *) CmiAlloc(nbytes);
 
+#if USE_ASYNC_RECV_FUNC
+            if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
+#endif
             START_EVENT();
-
-            if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
-                CmiAbort("PumpMsgs: MPI_Recv failed!\n");
-
-            END_EVENT(30);
+            if(doSyncRecv){            
+                if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
+                    CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
+            }
+#if USE_ASYNC_RECV_FUNC        
+            else {
+                IRecvList one = irecvListEntryAllocate();
+                if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, MPI_COMM_WORLD, &(one->req));
+                    CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
+                one->msg = msg;
+                one->size = nbytes;
+                one->next = NULL;
+                waitIrecvListTail->next = one;            
+            }
+#endif
+            /*END_EVENT(30);*/
 
             CpvAccess(Cmi_unposted_recv_total)++;
         }
@@ -662,16 +718,31 @@ static int PumpMsgs(void) {
             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
         }
 #endif
-
         recd = 1;
         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
         msg = (char *) CmiAlloc(nbytes);
 
+#if USE_ASYNC_RECV_FUNC
+        if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
+#endif
         START_EVENT();
-
-        if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
-            CmiAbort("PumpMsgs: MPI_Recv failed!\n");
-
+        if(doSyncRecv){            
+            if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
+                CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
+        }
+#if USE_ASYNC_RECV_FUNC        
+        else {
+            IRecvList one = irecvListEntryAllocate();
+            if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, MPI_COMM_WORLD, &(one->req)))
+                CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
+            one->msg = msg;
+            one->size = nbytes;
+            one->next = NULL;
+            waitIrecvListTail->next = one;
+            waitIrecvListTail = one;
+            /*printf("PE[%d]: MPI_Irecv msg=%p, size=%d, entry=%p\n", CmiMyPe(), msg, nbytes, one);*/
+        }
+#endif        
         /*END_EVENT(30);*/
 
 #endif /*end of not MPI_POST_RECV */
@@ -702,11 +773,12 @@ static int PumpMsgs(void) {
             continue;
         }
 #endif
-
-        handleOneRecvedMsg(nbytes, msg);
+        if(doSyncRecv){
+            handleOneRecvedMsg(nbytes, msg);
+        }
         
 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
-        recordMsgHistogramInfo(nbytes, msg);
+        recordMsgHistogramInfo(nbytes);
 #endif
 
 #if  MPI_POST_RECV
@@ -780,6 +852,42 @@ static int PumpMsgs(void) {
 
     }
 
+#if USE_ASYNC_RECV_FUNC
+/* Another loop to check the irecved msgs list */
+{
+    IRecvList irecvEnt;
+    int irecvDone = 0;
+    MPI_Status sts;
+    while(waitIrecvListHead->next) {
+        IRecvList irecvEnt = waitIrecvListHead->next;
+#if CMK_SMP_TRACE_COMMTHREAD
+        START_EVENT();
+#endif        
+        
+        /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
+        if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
+            CmiAbort("PumpMsgs: MPI_Test failed!\n");
+        if(!irecvDone) break; /* in-order recv */
+
+#if CMK_SMP_TRACE_COMMTHREAD
+        traceBeginCommOp(irecvEnt->msg);
+        traceChangeLastTimestamp(CpvAccess(projTraceStart));
+        traceEndCommOp(irecvEnt->msg);
+#endif
+    
+        /*printf("PE[%d]: irecv entry=%p finished with size=%d, msg=%p\n", CmiMyPe(), irecvEnt, irecvEnt->size, irecvEnt->msg);*/
+        
+        handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);        
+        waitIrecvListHead->next = irecvEnt->next;
+        irecvListEntryFree(irecvEnt);
+        recd = 1;        
+    }
+    if(waitIrecvListHead->next == NULL)
+        waitIrecvListTail = waitIrecvListHead;
+}
+#endif
+
+
     MACHSTATE(2,"} PumpMsgs end ");
     return recd;
 }
@@ -1375,6 +1483,13 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
     }
 #endif
 
+#if USE_ASYNC_RECV_FUNC
+    CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
+    if(myNID==0) {
+        printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
+    }
+#endif
+
     /* checksum flag */
     if (CmiGetArgFlag(largv,"+checksum")) {
 #if CMK_ERROR_CHECKING
@@ -1481,6 +1596,19 @@ static void MachinePreCommonInitForMPI(int everReturn) {
     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
 #endif
+
+#if USE_ASYNC_RECV_FUNC
+#if CMK_SMP
+    /* allocate the guardian entry only on comm thread considering NUMA */
+    if(CmiMyRank() == CmiMyNodeSize()) {
+        waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
+        waitIrecvListHead->next = NULL;
+    }
+#else    
+    waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
+    waitIrecvListHead->next = NULL;
+#endif
+#endif
 }
 
 static void MachinePostCommonInitForMPI(int everReturn) {
@@ -1800,6 +1928,7 @@ void CkDieNow()
 
 #endif
 
+/*======Beginning of Msg Histogram or Dynamic Post-Recv Related Funcs=====*/
 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
 /* Functions related with capturing msg histogram */
 
@@ -1840,7 +1969,7 @@ static void consumeAllMsgs()
     }
 }
 
-static void recordMsgHistogramInfo(int size, char *msg)
+static void recordMsgHistogramInfo(int size)
 {
     int idx = 0;
     size -= MPI_POST_RECV_LOWERSIZE;
@@ -1958,7 +2087,7 @@ static void buildDynamicRecvBuffers()
                         if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
                                                      MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
                                                      MPI_COMM_WORLD, ptr->postedRecvReqs+j))
-                            CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
+                            CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
                     }
 
                     /* We already posted bufs for this range of msg size */
@@ -1993,7 +2122,7 @@ static void buildDynamicRecvBuffers()
                     if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
                                                  MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
                                                  MPI_COMM_WORLD, one->postedRecvReqs+j))
-                        CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
+                        CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
                 }
             } /* end if notFound */
 
@@ -2030,18 +2159,18 @@ static void buildDynamicRecvBuffers()
     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
 } /* end of function buildDynamicRecvBuffers */
 
-static void examineMsgHistogramInfo(int size, char *msg)
+static void examineMsgHistogramInfo(int size)
 {
     int total = CpvAccess(msgRecvCnt)++;
     if (total < MPI_POST_RECV_FREQ) {
-        recordMsgHistogramInfo(size, msg);
+        recordMsgHistogramInfo(size);
     } else {
         buildDynamicRecvBuffers();
     }
 }
 #else
 /* case when CAPTURE_MSG_HISTOGRAM is defined */
-static void recordMsgHistogramInfo(int size, char *msg)
+static void recordMsgHistogramInfo(int size)
 {
     int idx = size/MSG_HISTOGRAM_BINSIZE;
     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
@@ -2089,6 +2218,7 @@ void CmiSetupMachineRecvBuffersUser()
     buildDynamicRecvBuffers();
 #endif
 }
+/*=======End of Msg Histogram or Dynamic Post-Recv Related Funcs======*/
 
 
 /*@}*/