fix a bug in worker thread sending messages in smp
authorYanhua Sun <yanhuas@jyc1.(none)>
Tue, 21 Feb 2012 19:18:14 +0000 (13:18 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Tue, 21 Feb 2012 19:18:14 +0000 (13:18 -0600)
src/arch/gemini_gni/machine.c

index 1b5d6934f5ec45697148f665e33a43e8e243c200..7ce1726a5c04c71a18de2a0b96c7dc76f426afeb 100644 (file)
@@ -87,7 +87,7 @@ static CmiInt8 _expand_mem =  4*oneMB;
 
 #endif
 
-#if CMK_SMP
+#if CMK_SMP && COMM_THREAD_SEND 
 //Dynamic flow control about memory registration
 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
@@ -105,6 +105,15 @@ static int register_memory_size = 0;
 
 #if CMK_SMP
 #define COMM_THREAD_SEND 1
+//#define MULTI_THREAD_SEND 1
+#endif
+
+#if CMK_SMP && MULTI_THREAD_SEND
+#define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
+#define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
+#else
+#define     CMI_GNI_LOCK
+#define     CMI_GNI_UNLOCK
 #endif
 
 static int _checkProgress = 1;             /* check deadlock */
@@ -141,7 +150,6 @@ static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
 #if PRINT_SYH
 int         lrts_send_msg_id = 0;
 int         lrts_local_done_msg = 0;
-int         lrts_smsg_success = 0;
 int         lrts_send_rdma_success = 0;
 #endif
 
@@ -303,10 +311,11 @@ static gni_cq_handle_t       smsg_tx_cqh = NULL;
 static gni_cq_handle_t       post_rx_cqh = NULL;
 static gni_cq_handle_t       post_tx_cqh = NULL;
 static gni_ep_handle_t       *ep_hndl_array;
-#if CMK_SMP && !COMM_THREAD_SEND
+#if CMK_SMP && MULTI_THREAD_SEND
 static CmiNodeLock           *ep_lock_array;
 static CmiNodeLock           tx_cq_lock; 
-static CmiNodeLock           rx_cq_lock; 
+static CmiNodeLock           rx_cq_lock;
+static CmiNodeLock           *mempool_lock;
 #endif
 typedef struct msg_list
 {
@@ -664,7 +673,7 @@ void CmiMachineProgressImpl() {
 static void SendRdmaMsg();
 static void PumpNetworkSmsg();
 static void PumpLocalRdmaTransactions();
-static int SendBufferMsg();
+static int SendBufferMsg(SMSG_QUEUE *queue);
 
 #if MACHINE_DEBUG_LOG
 FILE *debugLog = NULL;
@@ -752,7 +761,7 @@ static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
 
     status = registerFromMempool(mptr1, msg, size, t);
     if (status == GNI_RC_SUCCESS) return status;
-#if CMK_SMP
+#if CMK_SMP 
     for (i=0; i<CmiMyNodeSize()+1; i++) {
       rank = (rank+1)%(CmiMyNodeSize()+1);
       mptr = CpvAccessOther(mempool, rank);
@@ -903,16 +912,10 @@ static int connect_to(int destNode)
     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
-            
-#if CMK_SMP && !COMM_THREAD_SEND
-    //CmiLock(ep_lock_array[destNode]);
-    CmiLock(tx_cq_lock);
-#endif
+    
+    CMI_GNI_LOCK
     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
-#if CMK_SMP && !COMM_THREAD_SEND
-    //CmiUnlock(ep_lock_array[destNode]);
-    CmiUnlock(tx_cq_lock);
-#endif
+    CMI_GNI_UNLOCK
     if (status == GNI_RC_ERROR_RESOURCE) {
       /* possibly destNode is making connection at the same time */
       free(smsg_attr_vector_local[destNode]);
@@ -956,13 +959,9 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
     {
 #endif
-#if CMK_SMP && !COMM_THREAD_SEND
-        CmiLock(tx_cq_lock);
-#endif
+        CMI_GNI_LOCK
         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
-#if CMK_SMP && !COMM_THREAD_SEND
-        CmiUnlock(tx_cq_lock);
-#endif
+        CMI_GNI_UNLOCK
         if(status == GNI_RC_SUCCESS)
         {
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -1275,10 +1274,13 @@ void LrtsPostCommonInit(int everReturn)
 /* this is called by worker thread */
 void LrtsPostNonLocal(){
 #if CMK_SMP
-#if !COMM_THREAD_SEND
+#if MULTI_THREAD_SEND
     if(mysize == 1) return;
     PumpLocalRdmaTransactions();
-    SendBufferMsg();
+#if CMK_USE_OOB
+    if (SendBufferMsg(&smsg_oob_queue) == 1)
+#endif
+    SendBufferMsg(&smsg_queue);
     SendRdmaMsg();
 #endif
 #endif
@@ -1298,13 +1300,9 @@ static void    PumpDatagramConnection()
    {
        if (datagram_id >= mysize) {           /* bound endpoint */
            int pe = datagram_id - mysize;
-#if CMK_SMP && !COMM_THREAD_SEND
-           CmiLock(tx_cq_lock);
-#endif
+           CMI_GNI_LOCK
            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
-#if CMK_SMP && !COMM_THREAD_SEND
-           CmiUnlock(tx_cq_lock);
-#endif
+           CMI_GNI_UNLOCK
            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
            {
                CmiAssert(remote_id == pe);
@@ -1385,22 +1383,17 @@ static void PumpNetworkSmsg()
     uint64_t            source_addr;
     SMSG_QUEUE         *queue = &smsg_queue;
 
-#if CMK_SMP && !COMM_THREAD_SEND
     while(1)
     {
-        CmiLock(tx_cq_lock);
+        CMI_GNI_LOCK
         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
-        CmiUnlock(tx_cq_lock);
+        CMI_GNI_UNLOCK
         if(status != GNI_RC_SUCCESS)
             break;
-#else
-    while ((status=GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
-    {
-#endif
         inst_id = GNI_CQ_GET_INST_ID(event_data);
         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
 #if PRINT_SYH
-        printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
+        printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
 #endif
         if (useDynamicSMSG) {
             /* subtle: smsg may come before connection is setup */
@@ -1408,15 +1401,14 @@ static void PumpNetworkSmsg()
                PumpDatagramConnection();
         }
         msg_tag = GNI_SMSG_ANY_TAG;
-#if CMK_SMP && !COMM_THREAD_SEND
         while(1) {
-            CmiLock(tx_cq_lock);
+            CMI_GNI_LOCK
             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
-            CmiUnlock(tx_cq_lock);
+            CMI_GNI_UNLOCK
             if (status != GNI_RC_SUCCESS)
                 break;
-#else
-        while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS){
+#if PRINT_SYH
+            printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
             /* copy msg out and then put into queue (small message) */
             switch (msg_tag) {
@@ -1434,9 +1426,6 @@ static void PumpNetworkSmsg()
             }
             case LMSG_INIT_TAG:
             {
-#if PRINT_SYH
-                printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
-#endif
                 getLargeMsgRequest(header, inst_id);
                 break;
             }
@@ -1509,13 +1498,12 @@ static void PumpNetworkSmsg()
                 CmiAbort("Unknown tag\n");
                      }
             }               // end switch
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiLock(tx_cq_lock);
+#if PRINT_SYH
+            printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
+            CMI_GNI_LOCK
             GNI_SmsgRelease(ep_hndl_array[inst_id]);
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiUnlock(tx_cq_lock);
-#endif
+            CMI_GNI_UNLOCK
             smsg_recv_count ++;
             msg_tag = GNI_SMSG_ANY_TAG;
         } //endwhile getNext
@@ -1612,17 +1600,13 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     //memory registration success
     if(status == GNI_RC_SUCCESS)
     {
-#if CMK_SMP && !COMM_THREAD_SEND
-        CmiLock(tx_cq_lock);
-#endif
+        CMI_GNI_LOCK
         if(pd->type == GNI_POST_RDMA_GET) 
             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
         else
             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
-#if CMK_SMP && !COMM_THREAD_SEND
-        CmiUnlock(tx_cq_lock);
-#endif
-         
+        CMI_GNI_UNLOCK
+
         if(status == GNI_RC_SUCCESS )
         {
             if(pd->cqwrite_value == 0)
@@ -1688,16 +1672,12 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     if(status == GNI_RC_SUCCESS)
     {
         pd->local_mem_hndl  = msg_mem_hndl;
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiLock(tx_cq_lock);
-#endif
+        CMI_GNI_LOCK
         if(pd->type == GNI_POST_RDMA_GET) 
             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
         else
             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiUnlock(tx_cq_lock);
-#endif
+        CMI_GNI_UNLOCK
     }else
     {
         SetMemHndlZero(pd->local_mem_hndl);
@@ -1729,16 +1709,12 @@ static void PumpLocalRdmaTransactions()
 #endif
     SMSG_QUEUE         *queue = &smsg_queue;
 
-#if CMK_SMP && !COMM_THREAD_SEND
     while(1) {
-        CmiLock(tx_cq_lock);
+        CMI_GNI_LOCK 
         status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
-        CmiUnlock(tx_cq_lock);
+        CMI_GNI_UNLOCK
         if(status != GNI_RC_SUCCESS) break;
-#else
-    while( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS)
-    {
-#endif
+        
         type = GNI_CQ_GET_TYPE(ev);
         if (type == GNI_CQ_EVENT_TYPE_POST)
         {
@@ -1746,13 +1722,9 @@ static void PumpLocalRdmaTransactions()
 #if PRINT_SYH
             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
 #endif
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiLock(tx_cq_lock);
-#endif
+            CMI_GNI_LOCK
             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiUnlock(tx_cq_lock);
-#endif
+            CMI_GNI_UNLOCK
 #ifdef CMK_DIRECT
             if(tmp_pd->amo_cmd == 1)
             {
@@ -1928,17 +1900,12 @@ static void  SendRdmaMsg()
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiLock(tx_cq_lock);
-#endif
+            CMI_GNI_LOCK
             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
                 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
             else
                 status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
-#if CMK_SMP && !COMM_THREAD_SEND
-            CmiUnlock(tx_cq_lock);
-#endif
-
+            CMI_GNI_UNLOCK
             if(status == GNI_RC_SUCCESS)    //post good
             {
 #if !CMK_SMP
@@ -2001,7 +1968,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     int                        sent_cnt = 0;
 #endif
 
-#if ! CMK_SMP
+#if !CMK_SMP
     index = queue->smsg_head_index;
 #else
     index = 0;
@@ -2109,7 +2076,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
             else
                 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
         }else
-        {
             index_previous = index;
         }
         index = queue->smsg_msglist_index[index].next;
@@ -2613,7 +2579,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     *myNodeID = myrank;
     *numNodes = mysize;
   
-#if !COMM_THREAD_SEND
+#if MULTI_THREAD_SEND
     /* Currently, we only consider the case that comm. thread will only recv msgs */
     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
 #endif
@@ -2731,6 +2697,11 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
             CmiAbort("mempool maximum size is too small. \n");
         }
+#if CMK_SMP && MULTI_THREAD_SEND
+        printf("Charm++> worker thread sending messages\n");
+#elif CMK_SMP && COMM_THREAD_SEND
+        printf("Charm++> only comm thread send/recv messages\n");
+#endif
     }
 #endif