A fresh implementation of post-recv scheme for MPI machine layer, including a dynamic...
authorChao Mei <chaomei2@illinois.edu>
Wed, 4 Jan 2012 05:42:05 +0000 (23:42 -0600)
committerChao Mei <chaomei2@illinois.edu>
Wed, 4 Jan 2012 05:44:40 +0000 (23:44 -0600)
src/arch/mpi/machine.c

index 89655e682ee049303cfffc2ef766b16059bb8fe0..03563d8f5ef315667aab3049776de59e5c8e2025 100644 (file)
@@ -107,15 +107,80 @@ CpvStaticDeclare(double, projTraceStart);
 /* Making those parameters configurable for testing them easily */
 
 #if MPI_POST_RECV
+#define MPI_DYNAMIC_POST_RECV 0
+
+/* Note the tag offset of a msg is determined by
+ * (its size - MPI_RECV_LOWERSIZE)/MPI_POST_RECV_INC.
+ * based on POST_RECV_TAG.
+ */
 static int MPI_POST_RECV_COUNT=10;
-static int MPI_POST_RECV_LOWERSIZE=2000;
-static int MPI_POST_RECV_UPPERSIZE=4000;
+
+/* The range of msgs to be tracked for histogramming */
+static int MPI_POST_RECV_LOWERSIZE=8000;
+static int MPI_POST_RECV_UPPERSIZE=64000;
+
+/* The increment of msg size to be tracked, i.e. the histogram bucket size */
+static int MPI_POST_RECV_INC = 1000;
+
+/* The unit increment of msg cnt for increase #buf for a post recved msg */
+static int MPI_POST_RECV_MSG_INC = 400;
+
+/* If the #msg exceeds this value, post recv is created for such msg */
+static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
+
+/* The frequency of checking the existing posted recv buffers in the unit of #msgs */
+static int MPI_POST_RECV_FREQ = 1000;
+
 static int MPI_POST_RECV_SIZE;
 
+typedef struct mpiPostRecvList {
+    /* POST_RECV_TAG + msgSizeIdx is the recv tag;
+     * Based on this value, this buf corresponds to msg size ranging
+     * [msgSizeIdx*MPI_POST_RECV_INC, (msgSizeIdx+1)*MPI_POST_RECV_INC)
+     */
+    int msgSizeIdx;
+    int bufCnt;
+    MPI_Request *postedRecvReqs;
+    char **postedRecvBufs;
+    struct mpiPostRecvList *next;
+} MPIPostRecvList;
+CpvDeclare(MPIPostRecvList *, postRecvListHdr);
+CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
+CpvDeclare(int, msgRecvCnt);
+
 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
-CpvDeclare(char*,CmiPostedRecvBuffers);
+CpvDeclare(char**,CmiPostedRecvBuffers);
+
+/* Note: currently MPI doesn't provide a function whether a request is in progress.
+ * For example, a irecv has been filled partially. Then a call to MPI_Test still returns
+ * indicating it has not been finished. If only relying on this result, then calling
+ * MPI_Cancel will result in a loss of this msg. The dynamic post recv mechanism
+ * can only be safely used in a synchronized point such as load balancing.
+ */
+#if MPI_DYNAMIC_POST_RECV
+static int MSG_HISTOGRAM_BINSIZE;
+static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
+CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
+static void recordMsgHistogramInfo(int size, char *msg);
+static void reportMsgHistogramInfo();
+#endif /* end of MPI_DYNAMIC_POST_RECV defined */
+
+#endif /* end of MPI_POST_RECV defined */
+
+/* Providing functions for external usage to set up the dynamic recv buffer
+ * when the user is aware that it's safe to call such function
+ */
+void CmiSetupMachineRecvBuffers();
+
+#define CAPTURE_MSG_HISTOGRAM 0
+#if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
+static int MSG_HISTOGRAM_BINSIZE=1000;
+static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
+CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
+static void recordMsgHistogramInfo(int size, char *msg);
+static void reportMsgHistogramInfo();
 #endif
 
 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
@@ -316,10 +381,17 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
 #endif
 
 #if MPI_POST_RECV
-    if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
+    if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
+#if MPI_DYNAMIC_POST_RECV
+        int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
+        START_EVENT();
+        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,MPI_COMM_WORLD,&(smsg->req)))
+            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+#else
         START_EVENT();
         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+#endif
         /*END_EVENT(40);*/
     } else {
         START_EVENT();
@@ -329,12 +401,7 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
     }
 #else
     START_EVENT();
-#if CMK_MEM_CHECKPOINT
-    dstrank = petorank[node];
-#else
-    dstrank = node;
-#endif
-    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
+    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
     /*END_EVENT(40);*/
 #endif
@@ -358,7 +425,7 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
 #endif
 #endif
 
-    MACHSTATE(3,"}MPI_send end");
+    MACHSTATE(3,"}MPI_Isend end");
     CpvAccess(MsgQueueLen)++;
     if (CpvAccess(sent_msgs)==0)
         CpvAccess(sent_msgs) = smsg;
@@ -517,36 +584,51 @@ static int PumpMsgs(void) {
         if (recvCnt >= dynamicRecvCap) break;
 #endif
 
+#if CMI_SMP_TRACE_COMMTHREAD
+        START_EVENT();
+#endif
+
         /* First check posted recvs then do  probe unmatched outstanding messages */
 #if MPI_POST_RECV
-        int completed_index=-1;
-        if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
+        MPIPostRecvList *postedOne = NULL;
+        int completed_index = -1;
+        flg = 0;
+#if MPI_DYNAMIC_POST_RECV
+        MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
+        if (oldPostRecvPtr) {
+            /* post recv buf inited */
+            do {
+                /* round-robin iteration over the list */
+                MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
+                if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
+                    CmiAbort("PumpMsgs: MPI_Testany failed!\n");
+
+                if (flg) {
+                    postedOne = cur;
+                    break;
+                }
+                CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
+            } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
+        }
+#else
+        MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
+        if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
+#endif
         if (flg) {
             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
 
             recd = 1;
-            msg = (char *) CmiAlloc(nbytes);
-            memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
-            /* and repost the recv */
-
-            START_EVENT();
-
-            if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])      ,
-                                           MPI_POST_RECV_SIZE,
-                                           MPI_BYTE,
-                                           MPI_ANY_SOURCE,
-                                           POST_RECV_TAG,
-                                           MPI_COMM_WORLD,
-                                           &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
-                CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
-
-            END_EVENT(50);
+#if !MPI_DYNAMIC_POST_RECV
+            postedOne = CpvAccess(curPostRecvPtr);
+#endif
+            msg = (postedOne->postedRecvBufs)[completed_index];
+            (postedOne->postedRecvBufs)[completed_index] = NULL;
 
             CpvAccess(Cmi_posted_recv_total)++;
         } else {
-            res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
+            res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flg, &sts);
             if (res != MPI_SUCCESS)
                 CmiAbort("MPI_Iprobe failed\n");
             if (!flg) break;
@@ -592,7 +674,7 @@ static int PumpMsgs(void) {
 
         /*END_EVENT(30);*/
 
-#endif
+#endif /*end of not MPI_POST_RECV */
 
 #if CMK_SMP_TRACE_COMMTHREAD
         traceBeginCommOp(msg);
@@ -622,6 +704,54 @@ static int PumpMsgs(void) {
 #endif
 
         handleOneRecvedMsg(nbytes, msg);
+        
+#if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
+        recordMsgHistogramInfo(nbytes, msg);
+#endif
+
+#if  MPI_POST_RECV
+#if MPI_DYNAMIC_POST_RECV
+        if (postedOne) {
+            //printf("[%d]: get one posted recv\n", CmiMyPe());
+            /* Get the upper size of this buffer */
+            int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
+            int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
+            /* Has to re-allocate the buffer for the message */
+            (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
+
+            /* and repost the recv */
+            START_EVENT();
+
+            if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
+                                         postRecvBufSize,
+                                         MPI_BYTE,
+                                         MPI_ANY_SOURCE,
+                                         postRecvTag,
+                                         MPI_COMM_WORLD,
+                                         &((postedOne->postedRecvReqs)[completed_index])  ))
+                CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
+            END_EVENT(50);
+        }
+#else
+        if (postedOne) {
+            /* Has to re-allocate the buffer for the message */
+            (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
+
+            /* and repost the recv */
+            START_EVENT();
+
+            if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
+                                         MPI_POST_RECV_SIZE,
+                                         MPI_BYTE,
+                                         MPI_ANY_SOURCE,
+                                         POST_RECV_TAG,
+                                         MPI_COMM_WORLD,
+                                         &((postedOne->postedRecvReqs)[completed_index])  ))
+                CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
+            END_EVENT(50);
+        }
+#endif /* not MPI_DYNAMIC_POST_RECV */
+#endif
 
 #if CMI_EXERT_RECV_CAP
         recvCnt++;
@@ -634,14 +764,14 @@ static int PumpMsgs(void) {
              * by MPI
              */
         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
-                || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
+                || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
         }
 #else
         /* MsgQueueLen indicates the number of messages that have not been released
              * by MPI
              */
-        if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
+        if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
         }
 #endif
@@ -746,7 +876,7 @@ static int SendMsgBuf() {
             if (++sentCnt == SEND_CAP) break;
 #elif CMI_DYNAMIC_EXERT_CAP
             if (++sentCnt >= dynamicSendCap) break;
-            if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
+            if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
 #endif
 
@@ -926,21 +1056,30 @@ void DrainResourcesForMPI() {
 }
 
 void MachineExitForMPI() {
+    int i;
 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
     int doPrint = 0;
-#if CMK_SMP
     if (CmiMyNode()==0) doPrint = 1;
-#else
-    if (CmiMyPe()==0) doPrint = 1;
-#endif
 
-    if (doPrint) {
+    if (doPrint /*|| CmiMyNode()%11==0 */) {
 #if MPI_POST_RECV
-        CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
+        CmiPrintf("node[%d]: %llu posted receives,  %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
 #endif
     }
 #endif
 
+#if MPI_POST_RECV
+    {
+        MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
+        if (ptr) {
+            do {
+                for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
+                ptr = ptr->next;
+            } while (ptr!=CpvAccess(postRecvListHdr));
+        }
+    }
+#endif
+
 #if REPORT_COMM_METRICS
 #if CMK_SMP
     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
@@ -1212,12 +1351,17 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
+    CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
+    CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
+    CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
+    CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
     if (myNID==0) {
-        printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
-               MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
+        printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
+               MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
+               MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
     }
 #endif
 
@@ -1276,32 +1420,67 @@ static void MachinePreCommonInitForMPI(int everReturn) {
     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
-    CpvInitialize(char*,CmiPostedRecvBuffers);
+    CpvInitialize(char **, CmiPostedRecvBuffers);
+
+    CpvAccess(CmiPostedRecvRequests) = NULL;
+    CpvAccess(CmiPostedRecvBuffers) = NULL;
+
+    CpvInitialize(MPIPostRecvList *, postRecvListHdr);
+    CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
+    CpvInitialize(int, msgRecvCnt);
+
+    CpvAccess(postRecvListHdr) = NULL;
+    CpvAccess(curPostRecvPtr) = NULL;
+    CpvAccess(msgRecvCnt) = 0;
+
+#if MPI_DYNAMIC_POST_RECV
+    CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
+#endif
 
     if (doInit) {
+#if MPI_DYNAMIC_POST_RECV
+        MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
+        /* including two more buckets that are out of the range [LOWERSIZE, UPPERSIZE] */
+        MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
+        CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
+        memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
+#else
         /* Post some extra recvs to help out with incoming messages */
         /* On some MPIs the messages are unexpected and thus slow */
 
-        /* An array of request handles for posted recvs */
-        CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
+        CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
 
+        /* An array of request handles for posted recvs */
+        CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
+        CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
+        CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
         /* An array of buffers for posted recvs */
-        CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
+        CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
+        CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
+        CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
 
         /* Post Recvs */
         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
-            if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])    ,
-                                           MPI_POST_RECV_SIZE,
-                                           MPI_BYTE,
-                                           MPI_ANY_SOURCE,
-                                           POST_RECV_TAG,
-                                           MPI_COMM_WORLD,
-                                           &(CpvAccess(CmiPostedRecvRequests)[i])  ))
+            char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE); /* Note: could be aligned allocation?? */
+            CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
+            if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
+                                         MPI_POST_RECV_SIZE,
+                                         MPI_BYTE,
+                                         MPI_ANY_SOURCE,
+                                         POST_RECV_TAG,
+                                         MPI_COMM_WORLD,
+                                         CpvAccess(postRecvListHdr)->postedRecvReqs+i  ))
                 CmiAbort("MPI_Irecv failed\n");
         }
-    }
 #endif
+    }
+#endif /* end of MPI_POST_RECV */
 
+#if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
+    CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
+    CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
+    memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
+#endif
 }
 
 static void MachinePostCommonInitForMPI(int everReturn) {
@@ -1621,5 +1800,296 @@ void CkDieNow()
 
 #endif
 
+#if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
+/* Functions related with capturing msg histogram */
+
+#if MPI_DYNAMIC_POST_RECV
+/* Consume all messages in the request buffers */
+static void consumeAllMsgs()
+{
+    MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
+    if (ptr) {
+        do {
+            int i;
+            for (i=0; i<ptr->bufCnt; i++) {
+                int done = 0;
+                MPI_Status sts;
+
+                /* Indicating this entry has been tested before */
+                if (ptr->postedRecvBufs[i] == NULL) continue;
+
+                if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
+                    CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
+                if (done) {
+                    int nbytes;
+                    char *msg;
+                    if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
+                        CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
+                    /* ready to handle this msg */
+                    msg = (ptr->postedRecvBufs)[i];
+                    (ptr->postedRecvBufs)[i] = NULL;
+
+                    handleOneRecvedMsg(nbytes, msg);
+                } else {
+                    if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
+                        CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
+                }
+            }
+            ptr = ptr->next;
+        } while (ptr != CpvAccess(curPostRecvPtr));
+    }
+}
+
+static void recordMsgHistogramInfo(int size, char *msg)
+{
+    int idx = 0;
+    size -= MPI_POST_RECV_LOWERSIZE;
+    if (size > 0)
+        idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
+
+    if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
+    CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
+}
+
+#define POST_RECV_USE_STATIC_PARAM 0
+#define POST_RECV_REPORT_STS 0
+
+#if POST_RECV_REPORT_STS
+static int buildDynCallCnt = 0;
+#endif
+
+static void buildDynamicRecvBuffers()
+{
+    int i;
+
+    int local_MSG_CNT_THRESHOLD;
+    int local_MSG_INC;
+
+#if POST_RECV_REPORT_STS
+    buildDynCallCnt++;
+#endif
+
+    /* For debugging usage */
+    reportMsgHistogramInfo();
+
+    CpvAccess(msgRecvCnt) = 0;
+    /* consume all outstanding msgs */
+    consumeAllMsgs();
+
+#if POST_RECV_USE_STATIC_PARAM
+    local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
+    local_MSG_INC = MPI_POST_RECV_MSG_INC;
+#else
+    {
+        int total = 0;
+        int count = 0;
+        for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
+            int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
+            /* avg is temporarily used for counting how many buckets are non-zero */
+            if (tmp > 0)  {
+                total += tmp;
+                count++;
+            }
+        }
+        if (count == 1) local_MSG_CNT_THRESHOLD = 1; /* Just filter out those zero-count msgs */
+        else local_MSG_CNT_THRESHOLD = total / count /3; /* Catch >50% msgs NEED-BETTER-SCHEME HERE!!*/
+        local_MSG_INC = total/count; /* Not having a good heuristic right now */
+#if POST_RECV_REPORT_STS
+        printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
+#endif
+    }
+#endif
+
+    /* First continue to find the first msg range that requires post recv */
+    /* Ignore the fist and the last one because they are not tracked */
+    MPIPostRecvList *newHdr = NULL;
+    MPIPostRecvList *newListPtr = newHdr;
+    MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
+    for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
+        int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
+        if (count >= local_MSG_CNT_THRESHOLD) {
+
+#if POST_RECV_REPORT_STS
+            /* Report histogram results */
+            int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
+            int high = low + MSG_HISTOGRAM_BINSIZE;
+            int reportCnt;
+            if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
+            else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
+            printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
+#endif
+            /* find if this msg idx exists, the "i" is the msgSizeIdx, in the current list */
+            int notFound = 1;
+            MPIPostRecvList *newEntry = NULL;
+            while (ptr) {
+                if (ptr->msgSizeIdx < i) {
+                    /* free the buffer for this range of msg size */
+                    MPIPostRecvList *nextptr = ptr->next;
+
+                    free(ptr->postedRecvReqs);
+                    int j;
+                    for (j=0; j<ptr->bufCnt; j++) {
+                        if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
+                    }
+                    free(ptr->postedRecvBufs);
+                    ptr = nextptr;
+                } else if (ptr->msgSizeIdx == i) {
+                    int newBufCnt, j;
+                    int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
+                    newEntry = ptr;
+                    /* Do some adjustment according to the current statistics */
+                    if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
+                    else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
+                    if (newBufCnt != ptr->bufCnt) {
+                        /* free old buffers, and allocate new buffers */
+                        free(ptr->postedRecvReqs);
+                        ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
+                        for (j=0; j<ptr->bufCnt; j++) {
+                            if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
+                        }
+                        free(ptr->postedRecvBufs);
+                        ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
+                    }
+
+                    /* re-post those buffers */
+                    ptr->bufCnt = newBufCnt;
+                    for (j=0; j<ptr->bufCnt; j++) {
+                        ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
+                        if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
+                                                     MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
+                                                     MPI_COMM_WORLD, ptr->postedRecvReqs+j))
+                            CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
+                    }
+
+                    /* We already posted bufs for this range of msg size */
+                    ptr = ptr->next;
+                    /* Need to set ptr to NULL as the buf list comes to an end and the while loop exits */
+                    if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
+                    notFound = 0;
+                    break;
+                } else {
+                    /* The msgSizeIdx is larger than i */
+                    break;
+                }
+                if (ptr == CpvAccess(postRecvListHdr)) {
+                    ptr = NULL;
+                    break;
+                }
+            } /* end while(ptr): iterating the posted recv buffer list */
+
+            if (notFound) {
+                /* the current range of msg size is not found in the list */
+                int j;
+                int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
+                newEntry = malloc(sizeof(MPIPostRecvList));
+                MPIPostRecvList *one = newEntry;
+                one->msgSizeIdx = i;
+                if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
+                else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
+                one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
+                one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
+                for (j=0; j<one->bufCnt; j++) {
+                    one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
+                    if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
+                                                 MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
+                                                 MPI_COMM_WORLD, one->postedRecvReqs+j))
+                        CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
+                }
+            } /* end if notFound */
+
+            /* Update the new list with the newEntry */
+            CmiAssert(newEntry != NULL);
+            if (newHdr == NULL) {
+                newHdr = newEntry;
+                newListPtr = newEntry;
+                newHdr->next = newHdr;
+            } else {
+                newListPtr->next = newEntry;
+                newListPtr = newEntry;
+                newListPtr->next = newHdr;
+            }
+        } /* end if the count of this msg size range exceeds the threshold */
+    } /* end for loop over the histogram buckets */
+
+    /* Free remaining entries in the list */
+    while (ptr) {
+        /* free the buffer for this range of msg size */
+        MPIPostRecvList *nextptr = ptr->next;
+
+        free(ptr->postedRecvReqs);
+        int j;
+        for (j=0; j<ptr->bufCnt; j++) {
+            if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
+        }
+        free(ptr->postedRecvBufs);
+        ptr = nextptr;
+        if (ptr == CpvAccess(postRecvListHdr)) break;
+    }
+
+    CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
+    memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
+} /* end of function buildDynamicRecvBuffers */
+
+static void examineMsgHistogramInfo(int size, char *msg)
+{
+    int total = CpvAccess(msgRecvCnt)++;
+    if (total < MPI_POST_RECV_FREQ) {
+        recordMsgHistogramInfo(size, msg);
+    } else {
+        buildDynamicRecvBuffers();
+    }
+}
+#else
+/* case when CAPTURE_MSG_HISTOGRAM is defined */
+static void recordMsgHistogramInfo(int size, char *msg)
+{
+    int idx = size/MSG_HISTOGRAM_BINSIZE;
+    if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
+    CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
+}
+#endif /* end of MPI_DYNAMIC_POST_RECV */
+
+static void reportMsgHistogramInfo()
+{
+#if MPI_DYNAMIC_POST_RECV
+    int i, count;
+    count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
+    if (count > 0) {
+        printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
+    }
+    for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
+        int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
+        if (count > 0) {
+            int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
+            int high = low + MSG_HISTOGRAM_BINSIZE;
+            printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
+        }
+    }
+    count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
+    if (count > 0) {
+        printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
+    }
+#else
+    int i;
+    for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
+        int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
+        if (count > 0) {
+            int low = i*MSG_HISTOGRAM_BINSIZE;
+            int high = low + MSG_HISTOGRAM_BINSIZE;
+            printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
+        }
+    }
+#endif
+}
+#endif /* end of CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV */
+
+void CmiSetupMachineRecvBuffersUser()
+{
+#if MPI_DYNAMIC_POST_RECV
+    buildDynamicRecvBuffers();
+#endif
+}
+
+
 /*@}*/