1. make the dynamic cap parameter configurable by reading from command line
authorChao Mei <chaomei2@illinois.edu>
Wed, 25 May 2011 22:18:59 +0000 (17:18 -0500)
committerChao Mei <chaomei2@illinois.edu>
Wed, 25 May 2011 22:18:59 +0000 (17:18 -0500)
2. simplified the send op for both non-SMP and SMP that they share the same
   code of calling MPI_Isend function

src/arch/mpi/machine.c

index 7a64c8a2421ccaee0ca72c7d127c751b25006e35..c2ae27121f09273f03b729c6d9414869f359cf34 100644 (file)
@@ -55,10 +55,10 @@ static void sleep(int secs) {
 /* This macro defines the max number of msgs in the sender msg buffer
  * that is allowed for recving operation to continue
  */
-#define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
+static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
 #define CMI_DYNAMIC_MAXCAPSIZE 1000
-#define CMI_DYNAMIC_SEND_CAPSIZE 4
-#define CMI_DYNAMIC_RECV_CAPSIZE 3
+static int CMI_DYNAMIC_SEND_CAPSIZE=4;
+static int CMI_DYNAMIC_RECV_CAPSIZE=3;
 /* initial values, -1 indiates there's no cap */
 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
@@ -281,6 +281,7 @@ static void EnqueueMsg(void *m, int size, int node) {
     msg_tmp->msg = m;
     msg_tmp->size = size;
     msg_tmp->destpe = node;
+    msg_tmp->next = 0;
 
 #if CMK_SMP_TRACE_COMMTHREAD
     msg_tmp->srcpe = CmiMyPe();
@@ -298,64 +299,93 @@ static void EnqueueMsg(void *m, int size, int node) {
 }
 #endif
 
-static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
-    /* Ignoring the mode for MPI layer */
-
-    CmiState cs = CmiGetState();
-    SMSG_LIST *msg_tmp;
-    int  rank;
+/* The function that calls MPI_Isend so that both non-SMP and SMP could use */
+static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
+    int node = smsg->destpe;
+    int size = smsg->size;
+    char *msg = smsg->msg;
 
-    CmiAssert(destNode != CmiMyNode());
-#if CMK_SMP
-    EnqueueMsg(msg, size, destNode);
-    return 0;
-#else
-    /* non smp */
-    msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
-    msg_tmp->msg = msg;
-    msg_tmp->next = 0;
+#if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
     while (MsgQueueLen > request_max) {
-        /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
         CmiReleaseSentMessages();
         PumpMsgs();
     }
+#endif
+
+    MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
 #if CMK_ERROR_CHECKING
     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
-#endif
     CMI_SET_CHECKSUM(msg, size);
+#endif
 
 #if MPI_POST_RECV
     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
         START_EVENT();
-        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-            CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
-        END_EVENT(40);
+        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
+            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+        /*END_EVENT(40);*/
     } else {
         START_EVENT();
-        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-            CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
-        END_EVENT(40);
+        if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
+            CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
+        /*END_EVENT(40);*/
     }
 #else
     START_EVENT();
-    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-        CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
+    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
+        CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
     /*END_EVENT(40);*/
-#if CMK_TRACE_COMMOVERHEAD
+#endif
+
+#if CMK_SMP_TRACE_COMMTHREAD
+    traceBeginCommOp(msg);
+    traceChangeLastTimestamp(CpvAccess(projTraceStart));
+    /* traceSendMsgComm must execute after traceBeginCommOp because
+         * we pretend we execute an entry method, and inside this we
+         * pretend we will send another message. Otherwise how could
+         * a message creation just before an entry method invocation?
+         * If such logic is broken, the projections will not trace
+         * messages correctly! -Chao Mei
+         */
+    traceSendMsgComm(msg);
+    traceEndCommOp(msg);
+#if CMI_MPI_TRACE_MOREDETAILED
     char tmp[64];
-    sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destNode);
+    sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
 #endif
 #endif
 
+    MACHSTATE(3,"}MPI_send end");
     MsgQueueLen++;
     if (sent_msgs==0)
-        sent_msgs = msg_tmp;
+        sent_msgs = smsg;
     else
-        end_sent->next = msg_tmp;
-    end_sent = msg_tmp;
-    return (CmiCommHandle) &(msg_tmp->req);
-#endif              /* non-smp */
+        end_sent->next = smsg;
+    end_sent = smsg;
+    return (CmiCommHandle) &(smsg->req);
+}
+
+static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
+    /* Ignoring the mode for MPI layer */
+
+    CmiState cs = CmiGetState();
+    SMSG_LIST *msg_tmp;
+    int  rank;
+
+    CmiAssert(destNode != CmiMyNode());
+#if CMK_SMP
+    EnqueueMsg(msg, size, destNode);
+    return 0;
+#else
+    /* non smp */
+    msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
+    msg_tmp->msg = msg;
+    msg_tmp->destpe = destNode;
+    msg_tmp->size = size;
+    msg_tmp->next = 0;
+    return MPISendOneMsg(msg_tmp);
+#endif
 }
 
 static size_t CmiAllAsyncMsgsSent(void) {
@@ -694,70 +724,7 @@ static int SendMsgBuf() {
     /* CmiUnlock(sendMsgBufLock); */
     while (NULL != msg_tmp) {
 #endif
-            node = msg_tmp->destpe;
-            size = msg_tmp->size;
-            msg = msg_tmp->msg;
-            msg_tmp->next = 0;
-
-#if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
-            while (MsgQueueLen > request_max) {
-                CmiReleaseSentMessages();
-                PumpMsgs();
-            }
-#endif
-
-            MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
-#if CMK_ERROR_CHECKING
-            CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
-#endif
-            CMI_SET_CHECKSUM(msg, size);
-
-#if MPI_POST_RECV
-            if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
-                START_EVENT();
-                if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-                    CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
-                END_EVENT(40);
-            } else {
-                START_EVENT();
-                if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-                    CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
-                END_EVENT(40);
-            }
-#else
-            START_EVENT();
-            if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
-                CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
-            /*END_EVENT(40);*/
-#endif
-
-#if CMK_SMP_TRACE_COMMTHREAD
-            traceBeginCommOp(msg);
-            traceChangeLastTimestamp(CpvAccess(projTraceStart));
-            /* traceSendMsgComm must execute after traceBeginCommOp because
-                 * we pretend we execute an entry method, and inside this we
-                 * pretend we will send another message. Otherwise how could
-                 * a message creation just before an entry method invocation?
-                 * If such logic is broken, the projections will not trace
-                 * messages correctly! -Chao Mei
-                 */
-            traceSendMsgComm(msg);
-            traceEndCommOp(msg);
-#if CMI_MPI_TRACE_MOREDETAILED
-            char tmp[64];
-            sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
-            traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
-#endif
-#endif
-
-
-            MACHSTATE(3,"}MPI_send end");
-            MsgQueueLen++;
-            if (sent_msgs==0)
-                sent_msgs = msg_tmp;
-            else
-                end_sent->next = msg_tmp;
-            end_sent = msg_tmp;
+            MPISendOneMsg(msg_tmp);
             sent=1;
 
 #if CMI_EXERT_SEND_CAP
@@ -806,8 +773,8 @@ static int RecvQueueEmpty() {
 #define REPORT_COMM_METRICS 0
 #if REPORT_COMM_METRICS
 static double pumptime = 0.0;
-static double releasetime = 0.0;
-static double sendtime = 0.0;
+                         static double releasetime = 0.0;
+                                                     static double sendtime = 0.0;
 #endif
 
 #endif //end of CMK_SMP
@@ -1127,6 +1094,16 @@ static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeI
     }
 #endif
 
+#if CMI_DYNAMIC_EXERT_CAP
+    CmiGetArgInt(argv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
+    CmiGetArgInt(argv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
+    CmiGetArgInt(argv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
+    if (myNID==0) {
+        printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
+               CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
+    }
+#endif
+
     /* checksum flag */
     if (CmiGetArgFlag(argv,"+checksum")) {
 #if CMK_ERROR_CHECKING
@@ -1275,9 +1252,9 @@ void CmiAbort(const char *message) {
 
 /* MPI calls are not threadsafe, even the timer on some machines */
 static CmiNodeLock  timerLock = 0;
-static int _absoluteTime = 0;
-static double starttimer = 0;
-static int _is_global = 0;
+                                static int _absoluteTime = 0;
+                                                           static double starttimer = 0;
+                                                                                      static int _is_global = 0;
 
 int CmiTimerIsSynchronized() {
     int  flag;