Split the GNI global lock into 2 locks to improve performance
author Yanhua Sun <yanhuas@jyc1.(none)>
Mon, 12 Mar 2012 04:16:00 +0000 (23:16 -0500)
committer Yanhua Sun <yanhuas@jyc1.(none)>
Mon, 12 Mar 2012 04:16:00 +0000 (23:16 -0500)
src/arch/gemini_gni/machine.c

index 09e4a6c71e758e949ddf9b829ae01f7d3a44466c..badb955cb151ead32a72e4f698d0f2ff80353d21 100644 (file)
@@ -152,14 +152,14 @@ static CmiInt8  MAX_REG_MEM    =  25*oneMB;
 
 #endif     /* end USE_LRTS_MEMPOOL */
 
-#if MULTI_THREAD_SEND
-#define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
-#define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
+#if MULTI_THREAD_SEND 
+#define     CMI_GNI_LOCK(x)       CmiLock(x);
+#define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
 #else
-#define     CMI_GNI_LOCK
-#define     CMI_GNI_UNLOCK
+#define     CMI_GNI_LOCK(x)
+#define     CMI_GNI_UNLOCK(x)
 #define     CMI_PCQUEUEPOP_LOCK(Q)   
 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
 #endif
@@ -357,16 +357,20 @@ gni_msgq_ep_attr_t      msgq_ep_attrs_size;
 static int cookie;
 static int modes = 0;
 static gni_cq_handle_t       smsg_rx_cqh = NULL;
-static gni_cq_handle_t       smsg_tx_cqh = NULL;
+static gni_cq_handle_t       default_tx_cqh = NULL;
+static gni_cq_handle_t       rdma_tx_cqh = NULL;
 static gni_cq_handle_t       post_rx_cqh = NULL;
 static gni_cq_handle_t       post_tx_cqh = NULL;
 static gni_ep_handle_t       *ep_hndl_array;
-#if MULTI_THREAD_SEND
+
 static CmiNodeLock           *ep_lock_array;
-static CmiNodeLock           tx_cq_lock; 
+static CmiNodeLock           default_tx_cq_lock; 
+static CmiNodeLock           rdma_tx_cq_lock; 
+static CmiNodeLock           global_gni_lock; 
 static CmiNodeLock           rx_cq_lock;
+static CmiNodeLock           smsg_mailbox_lock;
+static CmiNodeLock           smsg_rx_cq_lock;
 static CmiNodeLock           *mempool_lock;
-#endif
 
 typedef struct msg_list
 {
@@ -862,7 +866,7 @@ void CmiMachineProgressImpl() {
 
 static void SendRdmaMsg();
 static void PumpNetworkSmsg();
-static void PumpLocalRdmaTransactions();
+static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
 static int SendBufferMsg(SMSG_QUEUE *queue);
 
 #if MACHINE_DEBUG_LOG
@@ -1053,7 +1057,7 @@ static void setup_smsg_connection(int destNode)
     pd->local_addr      = (uint64_t) smsg_attr;
     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
-    pd->src_cq_hndl     = 0;
+    pd->src_cq_hndl     = rdma_tx_cqh;
     pd->rdma_mode       = 0;
     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
     print_smsg_attr(smsg_attr);
@@ -1118,9 +1122,9 @@ static int connect_to(int destNode)
     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
     
-    CMI_GNI_LOCK
+    CMI_GNI_LOCK(global_gni_lock)
     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
-    CMI_GNI_UNLOCK
+    CMI_GNI_UNLOCK(global_gni_lock)
     if (status == GNI_RC_ERROR_RESOURCE) {
       /* possibly destNode is making connection at the same time */
       free(smsg_attr_vector_local[destNode]);
@@ -1227,7 +1231,8 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
             }
         }
 #endif
-        CMI_GNI_LOCK
+        //CMI_GNI_LOCK(smsg_mailbox_lock)
+        CMI_GNI_LOCK(default_tx_cq_lock)
 #if CMK_SMP_TRACE_COMMTHREAD
         int oldpe = -1;
         int oldeventid = -1;
@@ -1246,7 +1251,8 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
 #if CMK_SMP_TRACE_COMMTHREAD
         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
 #endif
-        CMI_GNI_UNLOCK
+        CMI_GNI_UNLOCK(default_tx_cq_lock)
+        //CMI_GNI_UNLOCK(smsg_mailbox_lock)
         if(status == GNI_RC_SUCCESS)
         {
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -1445,7 +1451,7 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 #if PRINT_SYH
     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
 #endif 
-#if CMK_SMP && COMM_THREAD_SEND
+#if CMK_SMP 
     if(size <= SMSG_MAX_MSG)
         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
     else if (size < BIG_MSG) {
@@ -1485,6 +1491,7 @@ static void    PumpDatagramConnection();
 static      int         event_SetupConnect = 111;
 static      int         event_PumpSmsg = 222 ;
 static      int         event_PumpTransaction = 333;
+static      int         event_PumpRdmaTransaction = 444;
 static      int         event_SendBufferSmsg = 444;
 static      int         event_SendFmaRdmaMsg = 555;
 static      int         event_AdvanceCommunication = 666;
@@ -1492,7 +1499,8 @@ static void registerUserTraceEvents() {
 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
-    event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
+    event_PumpTransaction = traceRegisterUserEvent("Pump FMA local transaction" , -1);
+    event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
@@ -1616,8 +1624,8 @@ void LrtsPostNonLocal(){
     startT = CmiWallTimer();
 #endif
     PumpNetworkSmsg();
-    PumpLocalRdmaTransactions();
-    
+    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
+    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
@@ -1634,7 +1642,6 @@ void LrtsPostNonLocal(){
     }
 
     SendRdmaMsg();
-
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
@@ -1659,9 +1666,9 @@ static void    PumpDatagramConnection()
    {
        if (datagram_id >= mysize) {           /* bound endpoint */
            int pe = datagram_id - mysize;
-           CMI_GNI_LOCK
+           CMI_GNI_LOCK(global_gni_lock)
            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
-           CMI_GNI_UNLOCK
+           CMI_GNI_UNLOCK(global_gni_lock)
            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
            {
                CmiAssert(remote_id == pe);
@@ -1767,9 +1774,9 @@ static void PumpNetworkSmsg()
 #endif
     while(1)
     {
-        CMI_GNI_LOCK
+        CMI_GNI_LOCK(smsg_rx_cq_lock)
         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
-        CMI_GNI_UNLOCK
+        CMI_GNI_UNLOCK(smsg_rx_cq_lock)
         if(status != GNI_RC_SUCCESS)
             break;
         inst_id = GNI_CQ_GET_INST_ID(event_data);
@@ -1784,11 +1791,11 @@ static void PumpNetworkSmsg()
         }
         msg_tag = GNI_SMSG_ANY_TAG;
         while(1) {
-            CMI_GNI_LOCK
+            CMI_GNI_LOCK(smsg_mailbox_lock)
             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
             if (status != GNI_RC_SUCCESS)
             {
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 break;
             }
 #if PRINT_SYH
@@ -1810,7 +1817,7 @@ static void PumpNetworkSmsg()
                 msg_data    = CmiAlloc(msg_nbytes);
                 memcpy(msg_data, (char*)header, msg_nbytes);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
                 handleOneRecvedMsg(msg_nbytes, msg_data);
                 break;
@@ -1828,10 +1835,11 @@ static void PumpNetworkSmsg()
                 MallocControlMsg(control_msg_tmp);
                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 getLargeMsgRequest(control_msg_tmp, inst_id);
                 FreeControlMsg(control_msg_tmp);
 #else
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 getLargeMsgRequest(header, inst_id);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
 #endif
@@ -1842,7 +1850,7 @@ static void PumpNetworkSmsg()
                 /* Get is done, release message . Now put is not used yet*/
                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
 #if ! USE_LRTS_MEMPOOL
                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
 #else
@@ -1860,10 +1868,11 @@ static void PumpNetworkSmsg()
                 MallocControlMsg(header_tmp);
                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                    /* FIXME: leak */
 #else
                 header_tmp = (CONTROL_MSG *) header;
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
 #endif
                 void *msg = (void*)(header_tmp->source_addr);
                 int cur_seq = CmiGetMsgSeq(msg);
@@ -1908,7 +1917,7 @@ static void PumpNetworkSmsg()
                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
                 int size = ((CONTROL_MSG *) header)->length;
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 CmiReference(msg);
                 handleOneRecvedMsg(size, msg); 
                 break;
@@ -1919,7 +1928,7 @@ static void PumpNetworkSmsg()
                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
@@ -1927,7 +1936,7 @@ static void PumpNetworkSmsg()
 #endif
             default: {
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 printf("weird tag problem\n");
                 CmiAbort("Unknown tag\n");
                      }
@@ -2027,19 +2036,25 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     pd->local_addr      = (uint64_t) msg_data;
     pd->remote_addr     = request_msg->source_addr + offset;
     pd->remote_mem_hndl = request_msg->source_mem_hndl;
-    pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
+    pd->src_cq_hndl     = rdma_tx_cqh;
     pd->rdma_mode       = 0;
     pd->amo_cmd         = 0;
 
     //memory registration success
     if(status == GNI_RC_SUCCESS)
     {
-        CMI_GNI_LOCK
         if(pd->type == GNI_POST_RDMA_GET) 
+        {
+            CMI_GNI_LOCK(rdma_tx_cq_lock)
             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
+            CMI_GNI_UNLOCK(rdma_tx_cq_lock)
+        }
         else
+        {
+            CMI_GNI_LOCK(default_tx_cq_lock)
             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
-        CMI_GNI_UNLOCK
+            CMI_GNI_UNLOCK(default_tx_cq_lock)
+        }
 
         if(status == GNI_RC_SUCCESS )
         {
@@ -2098,7 +2113,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     pd->local_addr      = (uint64_t) msg_data;
     pd->remote_addr     = request_msg->source_addr;
     pd->remote_mem_hndl = request_msg->source_mem_hndl;
-    pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
+    pd->src_cq_hndl     = rdma_tx_cqh;
     pd->rdma_mode       = 0;
     pd->amo_cmd         = 0;
 
@@ -2106,12 +2121,20 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     if(status == GNI_RC_SUCCESS)
     {
         pd->local_mem_hndl  = msg_mem_hndl;
-        CMI_GNI_LOCK
+       
         if(pd->type == GNI_POST_RDMA_GET) 
+        {
+            CMI_GNI_LOCK(rdma_tx_cq_lock)
             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
+            CMI_GNI_UNLOCK(rdma_tx_cq_lock)
+        }
         else
+        {
+            CMI_GNI_LOCK(default_tx_cq_lock)
             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
-        CMI_GNI_UNLOCK
+            CMI_GNI_UNLOCK(default_tx_cq_lock)
+        }
+
     }else
     {
         SetMemHndlZero(pd->local_mem_hndl);
@@ -2129,7 +2152,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
 #endif
 }
 
-static void PumpLocalRdmaTransactions()
+static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
 {
     gni_cq_entry_t          ev;
     gni_return_t            status;
@@ -2145,9 +2168,9 @@ static void PumpLocalRdmaTransactions()
     SMSG_QUEUE         *queue = &smsg_queue;
 
     while(1) {
-        CMI_GNI_LOCK 
-        status = GNI_CqGetEvent(smsg_tx_cqh, &ev);
-        CMI_GNI_UNLOCK
+        CMI_GNI_LOCK(my_cq_lock) 
+        status = GNI_CqGetEvent(my_tx_cqh, &ev);
+        CMI_GNI_UNLOCK(my_cq_lock)
         if(status != GNI_RC_SUCCESS) break;
         
         type = GNI_CQ_GET_TYPE(ev);
@@ -2157,9 +2180,9 @@ static void PumpLocalRdmaTransactions()
 #if PRINT_SYH
             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
 #endif
-            CMI_GNI_LOCK
-            status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
-            CMI_GNI_UNLOCK
+            CMI_GNI_LOCK(my_cq_lock)
+            status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
+            CMI_GNI_UNLOCK(my_cq_lock)
 
             switch (tmp_pd->type) {
 #if CMK_PERSISTENT_COMM || CMK_DIRECT
@@ -2218,7 +2241,7 @@ static void PumpLocalRdmaTransactions()
             }
             default:
                 CmiPrintf("type=%d\n", tmp_pd->type);
-                CmiAbort("PumpLocalRdmaTransactions: unknown type!");
+                CmiAbort("PumpLocalTransactions: unknown type!");
             }      /* end of switch */
 
 #if CMK_DIRECT
@@ -2331,12 +2354,19 @@ static void  SendRdmaMsg()
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
-            CMI_GNI_LOCK
-            if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
-                status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
+            if(pd->type == GNI_POST_RDMA_GET) 
+            {
+                CMI_GNI_LOCK(rdma_tx_cq_lock)
+                    status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
+                CMI_GNI_UNLOCK(rdma_tx_cq_lock)
+            }
             else
-                status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
-            CMI_GNI_UNLOCK
+            {
+                CMI_GNI_LOCK(default_tx_cq_lock)
+                    status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
+                CMI_GNI_UNLOCK(default_tx_cq_lock)
+            }
+            
             if(status == GNI_RC_SUCCESS)    //post good
             {
 #if !CMK_SMP
@@ -2608,12 +2638,23 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
-    PumpLocalRdmaTransactions();
-    //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
+    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
+    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
 #endif
+
+#if CMK_SMP_TRACE_COMMTHREAD
+    startT = CmiWallTimer();
+#endif
+    PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
+    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
+#if CMK_SMP_TRACE_COMMTHREAD
+    endT = CmiWallTimer();
+    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
+#endif
     /* Send buffered Message */
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
@@ -2700,7 +2741,7 @@ static void _init_dynamic_smsg()
         &(mailbox_list->mem_hndl));
     GNI_RC_CHECK("MEMORY registration for smsg", status);
 
-    status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_unbound);
+    status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
     GNI_RC_CHECK("Unbound EP", status);
     
     alloc_smsg_attr(&send_smsg_attr);
@@ -3170,11 +3211,11 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
     /* create the local completion queue */
     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
-    status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_tx_cqh);
+    status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
     
-    status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_tx_cqh);
-    GNI_RC_CHECK("GNI_CqCreate post (tx)", status);
+    status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
+    GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
 
     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
@@ -3190,12 +3231,16 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
     _MEMCHECK(ep_hndl_array);
 #if MULTI_THREAD_SEND 
-    tx_cq_lock  = CmiCreateLock();
-    rx_cq_lock  = CmiCreateLock();
+    rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
+    //default_tx_cq_lock = CmiCreateLock();
+    rdma_tx_cq_lock = CmiCreateLock();
+    smsg_rx_cq_lock = CmiCreateLock();
+    //global_gni_lock  = CmiCreateLock();
+    //rx_cq_lock  = CmiCreateLock();
 #endif
     for (i=0; i<mysize; i++) {
         if(i == myrank) continue;
-        status = GNI_EpCreate(nic_hndl, smsg_tx_cqh, &ep_hndl_array[i]);
+        status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
         GNI_RC_CHECK("GNI_EpCreate ", status);   
         remote_addr = MPID_UGNI_AllAddr[i];
         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
@@ -3423,7 +3468,8 @@ void LrtsDrainResources()
         if (useDynamicSMSG)
             PumpDatagramConnection();
         PumpNetworkSmsg();
-        PumpLocalRdmaTransactions();
+        PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
+        PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
         SendRdmaMsg();
     }
     PMI_Barrier();