a mpi-smp that every worker threads send messages instead of going through comm thread.
authorGengbin Zheng <gzheng@illinois.edu>
Thu, 13 Oct 2011 19:37:49 +0000 (12:37 -0700)
committerGengbin Zheng <gzheng@illinois.edu>
Thu, 13 Oct 2011 19:37:49 +0000 (12:37 -0700)
src/arch/mpi/machine.c
src/arch/util/machine-common-core.c

index 5d4f5c74da7f63255226e881a95dbf48724cdbe7..61d4889dcf28dac6797c32ce4c4f9e0b40851708 100644 (file)
@@ -1,9 +1,3 @@
-/*****************************************************************************
- * $Source$
- * $Author$
- * $Date$
- * $Revision$
- *****************************************************************************/
 
 /** @file
  * MPI based machine layer
@@ -166,6 +160,7 @@ static int checksum_flag = 0;
 #endif
 /* =====End of Definitions of Message-Corruption Related Macros=====*/
 
+static int thread_level;
 
 /* =====Beginning of Declarations of Machine Specific Variables===== */
 #include <signal.h>
@@ -185,10 +180,10 @@ typedef struct msg_list {
     MPI_Request req;
 } SMSG_LIST;
 
-static SMSG_LIST *sent_msgs=0;
-static SMSG_LIST *end_sent=0;
+CpvStaticDeclare(SMSG_LIST *, sent_msgs);
+CpvStaticDeclare(SMSG_LIST *, end_sent);
 
-int MsgQueueLen=0;
+CpvStaticDeclare(int, MsgQueueLen);
 static int request_max;
 /*FLAG: consume outstanding Isends in scheduler loop*/
 static int no_outstanding_sends=0;
@@ -353,17 +348,17 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
 #endif
 
     MACHSTATE(3,"}MPI_send end");
-    MsgQueueLen++;
-    if (sent_msgs==0)
-        sent_msgs = smsg;
+    CpvAccess(MsgQueueLen)++;
+    if (CpvAccess(sent_msgs)==0)
+        CpvAccess(sent_msgs) = smsg;
     else
-        end_sent->next = smsg;
-    end_sent = smsg;
+        CpvAccess(end_sent)->next = smsg;
+    CpvAccess(end_sent) = smsg;
 
 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
     if (mode == P2P_SYNC || mode == P2P_ASYNC)
     {
-    while (MsgQueueLen > request_max) {
+    while (CpvAccess(MsgQueueLen) > request_max) {
         CmiReleaseSentMessages();
         PumpMsgs();
     }
@@ -382,9 +377,11 @@ static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg
 
     CmiAssert(destNode != CmiMyNode());
 #if CMK_SMP
-    EnqueueMsg(msg, size, destNode, mode);
-    return 0;
-#else
+    if (thread_level != MPI_THREAD_MULTIPLE) {
+      EnqueueMsg(msg, size, destNode, mode);
+      return 0;
+    }
+#endif
     /* non smp */
     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
     msg_tmp->msg = msg;
@@ -393,11 +390,10 @@ static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg
     msg_tmp->next = 0;
     msg_tmp->mode = mode;
     return MPISendOneMsg(msg_tmp);
-#endif
 }
 
 static size_t CmiAllAsyncMsgsSent(void) {
-    SMSG_LIST *msg_tmp = sent_msgs;
+    SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
     MPI_Status sts;
     int done;
 
@@ -415,7 +411,7 @@ static size_t CmiAllAsyncMsgsSent(void) {
 
 int CmiAsyncMsgSent(CmiCommHandle c) {
 
-    SMSG_LIST *msg_tmp = sent_msgs;
+    SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
     int done;
     MPI_Status sts;
 
@@ -437,7 +433,7 @@ void CmiReleaseCommHandle(CmiCommHandle c) {
 
 /* ######Beginning of functions related with communication progress ###### */
 static void CmiReleaseSentMessages(void) {
-    SMSG_LIST *msg_tmp=sent_msgs;
+    SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
     SMSG_LIST *prev=0;
     SMSG_LIST *temp;
     int done;
@@ -457,11 +453,11 @@ static void CmiReleaseSentMessages(void) {
             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
         if (done) {
             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
-            MsgQueueLen--;
+            CpvAccess(MsgQueueLen)--;
             /* Release the message */
             temp = msg_tmp->next;
             if (prev==0) /* first message */
-                sent_msgs = temp;
+                CpvAccess(sent_msgs) = temp;
             else
                 prev->next = temp;
             CmiFree(msg_tmp->msg);
@@ -479,7 +475,7 @@ static void CmiReleaseSentMessages(void) {
         }
 #endif
     }
-    end_sent = prev;
+    CpvAccess(end_sent) = prev;
     MACHSTATE(2,"} CmiReleaseSentMessages end");
 }
 
@@ -659,7 +655,7 @@ static void PumpMsgsBlocking(void) {
     if (!PCQueueEmpty(CmiGetState()->recv)) return;
     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
-    if (sent_msgs)  return;
+    if (CpvAccess(sent_msgs))  return;
 
 #if 0
     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
@@ -852,6 +848,11 @@ static void MachinePostNonLocalForMPI() {
             return 0;
     }
 #endif
+#else
+  if (thread_level == MPI_THREAD_MULTIPLE) {
+        CmiReleaseSentMessages();
+        SendMsgBuf();
+  }
 #endif
 }
 
@@ -1023,7 +1024,6 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
     int n,i;
     int ver, subver;
     int provided;
-    int thread_level;
     int myNID;
     int largc=*argc;
     char** largv=*argv;
@@ -1039,7 +1039,7 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
 
 #if CMK_MPI_INIT_THREAD
 #if CMK_SMP
-    thread_level = MPI_THREAD_FUNNELED;
+    thread_level = MPI_THREAD_MULTIPLE;
 #else
     thread_level = MPI_THREAD_SINGLE;
 #endif
@@ -1157,6 +1157,7 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
 }
 
 static void MachinePreCommonInitForMPI(int everReturn) {
+
 #if MPI_POST_RECV
     int doInit = 1;
     int i;
@@ -1204,6 +1205,14 @@ static void MachinePreCommonInitForMPI(int everReturn) {
 }
 
 static void MachinePostCommonInitForMPI(int everReturn) {
+
+    CpvInitialize(SMSG_LIST *, sent_msgs);
+    CpvInitialize(SMSG_LIST *, end_sent);
+    CpvInitialize(int, MsgQueueLen);
+    CpvAccess(sent_msgs) = NULL;
+    CpvAccess(end_sent) = NULL;
+    CpvAccess(MsgQueueLen) = 0;
+
     CmiIdleState *s=CmiNotifyGetState();
     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
 
@@ -1218,6 +1227,8 @@ static void MachinePostCommonInitForMPI(int everReturn) {
 #if CMK_SMP
     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
+    if (thread_level == MPI_THREAD_MULTIPLE)
+      CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
 #else
     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
 #endif
index 3095290fe5c2da155956dcd39c8a2c9e358b01c9..a4ec286b65bf9b8976334486a90dd42f1d02e2d3 100644 (file)
@@ -837,6 +837,8 @@ void *CmiGetNonLocal(void) {
        AdvanceCommunication();
        msg = PCQueuePop(cs->recv);
     }
+#else
+//    LrtsPostNonLocal();
 #endif
 
     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
@@ -893,8 +895,8 @@ static void CmiNotifyStillIdle(CmiIdleState *s) {
 
 #if !CMK_SMP
     AdvanceCommunication();
-//#else
-//    LrtsPostNonLocal();
+#else
+    LrtsPostNonLocal();
 #endif
 
     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())