modify remote event scheme to work with multi sender
author Gengbin Zheng <gzheng@illinois.edu>
Tue, 13 Mar 2012 22:36:46 +0000 (17:36 -0500)
committer Gengbin Zheng <gzheng@illinois.edu>
Tue, 13 Mar 2012 22:36:46 +0000 (17:36 -0500)
src/arch/gemini_gni/machine.c

index fbad8904206067d8aa353c5e28e65dae773f6d5f..df78445f414e4a681e8df3e70210ddf67f0372b9 100644 (file)
 
 #include "converse.h"
 
-#define     LARGEPAGE              0
-
-#if LARGEPAGE
-#include <hugetlbfs.h>
-#endif
-
 #if CMK_DIRECT
 #include "cmidirect.h"
 #endif
 
+#define     LARGEPAGE              0
+
 #if CMK_SMP
 #define MULTI_THREAD_SEND          0
 #define COMM_THREAD_SEND           1
 #endif
 
-
 #if MULTI_THREAD_SEND
 #define CMK_WORKER_SINGLE_TASK     0
 #endif
 
+#define REMOTE_EVENT               0
+#define CQWRITE                    0
+
 #define CMI_EXERT_SEND_CAP     0
 #define        CMI_EXERT_RECV_CAP      0
 
@@ -74,9 +72,6 @@
 
 #define USE_LRTS_MEMPOOL                  1
 
-#define CQWRITE                           0
-#define REMOTE_EVENT                      0
-
 #define PRINT_SYH                         0
 
 // Trace communication thread
@@ -654,6 +649,7 @@ int next;
 static struct AckPool   *ackpool;
 static int    ackpoolsize;
 static int    ackpool_freehead;
+static CmiNodeLock  ackpool_lock;
 
 #define  GetAckAddress(s)          (ackpool[s].addr)
 
@@ -667,13 +663,18 @@ static void AckPool_init()
     }
     ackpool[i].next = -1;
     ackpool_freehead = 0;
+#if MULTI_THREAD_SEND 
+    ackpool_lock  = CmiCreateLock();
+#endif
 }
 
 static
 inline  int  AckPool_getslot(void *addr)
 {
     int i;
-    int s = ackpool_freehead;
+    int s;
+    CMI_GNI_LOCK(ackpool_lock);
+    s = ackpool_freehead;
     if (s == -1) {
         // printf("[%d] AckPool_getslot expand: %d\n", myrank, ackpoolsize);
         int newsize = ackpoolsize * 2;
@@ -692,6 +693,7 @@ inline  int  AckPool_getslot(void *addr)
     }
     ackpool_freehead = ackpool[s].next;
     ackpool[s].addr = addr;
+    CMI_GNI_UNLOCK(ackpool_lock);
     return s;
 }
 
@@ -699,8 +701,10 @@ static
 inline  void AckPool_freeslot(int s)
 {
     CmiAssert(s>=0 && s<ackpoolsize);
+    CMI_GNI_LOCK(ackpool_lock);
     ackpool[s].next = ackpool_freehead;
     ackpool_freehead = s;
+    CMI_GNI_UNLOCK(ackpool_lock);
 }
 
 
@@ -905,6 +909,7 @@ static uint32_t get_cookie(void)
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <sys/mman.h>
+#include <hugetlbfs.h>
 
 // size must be _tlbpagesize aligned
 void *my_get_huge_pages(size_t size)
@@ -951,11 +956,16 @@ void CmiMachineProgressImpl() {
 }
 #endif
 
+static int SendBufferMsg(SMSG_QUEUE *queue);
 static void SendRdmaMsg();
 static void PumpNetworkSmsg();
 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
+#if CQWRITE
 static void PumpCqWriteTransactions();
-static int SendBufferMsg(SMSG_QUEUE *queue);
+#endif
+#if REMOTE_EVENT
+static void PumpRemoteTransactions();
+#endif
 
 #if MACHINE_DEBUG_LOG
 FILE *debugLog = NULL;
@@ -1283,7 +1293,7 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
                 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
         }
-        GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
+        // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
 #endif
         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -1530,6 +1540,7 @@ static      int         event_PumpRdmaTransaction = 444;
 static      int         event_SendBufferSmsg = 444;
 static      int         event_SendFmaRdmaMsg = 555;
 static      int         event_AdvanceCommunication = 666;
+
 static void registerUserTraceEvents() {
 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
@@ -1633,12 +1644,14 @@ void LrtsPostCommonInit(int everReturn)
     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
 #endif
 
-   /* if (_checkProgress)
+#if ! LARGEPAGE
+    if (_checkProgress)
 #if CMK_SMP
     if (CmiMyRank() == 0)
 #endif
     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
-    */
+#endif
 #if !LARGEPAGE
     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
 #endif
@@ -1895,12 +1908,10 @@ static void PumpNetworkSmsg()
                 MallocControlMsg(header_tmp);
                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
-                CMI_GNI_UNLOCK(smsg_mailbox_lock)
-                   /* FIXME: leak */
 #else
                 header_tmp = (CONTROL_MSG *) header;
-                CMI_GNI_UNLOCK(smsg_mailbox_lock)
 #endif
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 void *msg = (void*)(header_tmp->source_addr);
                 int cur_seq = CmiGetMsgSeq(msg);
                 int offset = ONE_SEG*(cur_seq+1);
@@ -1987,23 +1998,22 @@ static void printDesc(gni_post_descriptor_t *pd)
     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
 }
 
+#if CQWRITE
 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
 {
     gni_post_descriptor_t *pd;
     gni_return_t        status = GNI_RC_SUCCESS;
     
     MallocPostDesc(pd);
-
     pd->type = GNI_POST_CQWRITE;
     pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
     pd->cqwrite_value = data;
     pd->remote_mem_hndl = mem_hndl;
     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
-
     GNI_RC_CHECK("GNI_PostCqWrite", status);
-
 }
+#endif
 
 // for BIG_MSG called on receiver side for receiving control message
 // LMSG_INIT_TAG
@@ -2085,6 +2095,8 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     //memory registration success
     if(status == GNI_RC_SUCCESS)
     {
+        CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
+        CMI_GNI_LOCK(lock)
 #if REMOTE_EVENT
         if( request_msg->seq_id == 0)
         {
@@ -2099,16 +2111,13 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
 #endif
         if(pd->type == GNI_POST_RDMA_GET) 
         {
-            CMI_GNI_LOCK(rdma_tx_cq_lock)
             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
-            CMI_GNI_UNLOCK(rdma_tx_cq_lock)
         }
         else
         {
-            CMI_GNI_LOCK(default_tx_cq_lock)
             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
-            CMI_GNI_UNLOCK(default_tx_cq_lock)
         }
+        CMI_GNI_UNLOCK(lock)
 
         if(status == GNI_RC_SUCCESS )
         {
@@ -2206,6 +2215,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
 #endif
 }
 
+#if CQWRITE
 static void PumpCqWriteTransactions()
 {
 
@@ -2234,6 +2244,7 @@ static void PumpCqWriteTransactions()
         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
     }
 }
+#endif
 
 #if REMOTE_EVENT
 static void PumpRemoteTransactions()
@@ -2244,15 +2255,20 @@ static void PumpRemoteTransactions()
     int                     slot;
 
     while(1) {
-        //CMI_GNI_LOCK(my_cq_lock) 
+        CMI_GNI_LOCK(global_gni_lock)
         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
-        //CMI_GNI_UNLOCK(my_cq_lock)
-        if(status != GNI_RC_SUCCESS) break;
+        CMI_GNI_UNLOCK(global_gni_lock)
+        if(status != GNI_RC_SUCCESS) {
+            break;
+        }
 
         slot = GNI_CQ_GET_INST_ID(ev);
         slot = ACK_GET_INDEX(slot);
         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
+
+        //CMI_GNI_LOCK(ackpool_lock);
         msg = GetAckAddress(slot);
+        //CMI_GNI_UNLOCK(ackpool_lock);
 
         DecreaseMsgInSend(msg);
 #if ! USE_LRTS_MEMPOOL
@@ -2264,7 +2280,7 @@ static void PumpRemoteTransactions()
             buffered_send_msg -= GetMempoolsize(msg);
         CmiFree(msg);
         AckPool_freeslot(slot);
-    };
+    }
     if(status == GNI_RC_ERROR_RESOURCE)
     {
         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
@@ -2377,7 +2393,7 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
 #endif
             if (msg_tag == ACK_TAG) {
 #if !REMOTE_EVENT
-#if         !CQWRITE
+#if   !CQWRITE
                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
 #else
@@ -2486,6 +2502,8 @@ static void  SendRdmaMsg()
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
+            CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET? rdma_tx_cq_lock:default_tx_cq_lock;
+            CMI_GNI_LOCK(lock);
 #if REMOTE_EVENT
             if( pd->cqwrite_value == 0)
             {
@@ -2500,16 +2518,13 @@ static void  SendRdmaMsg()
 #endif
             if(pd->type == GNI_POST_RDMA_GET) 
             {
-                CMI_GNI_LOCK(rdma_tx_cq_lock)
-                    status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
-                CMI_GNI_UNLOCK(rdma_tx_cq_lock)
+                status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
             }
             else
             {
-                CMI_GNI_LOCK(default_tx_cq_lock)
-                    status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
-                CMI_GNI_UNLOCK(default_tx_cq_lock)
+                status = GNI_PostFma(ep_hndl_array[ptr->destNode],  pd);
             }
+            CMI_GNI_UNLOCK(lock);
             
             if(status == GNI_RC_SUCCESS)    //post good
             {
@@ -2834,9 +2849,9 @@ void LrtsAdvanceCommunication(int whileidle)
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
 #endif
 
-//#if CMK_SMP
-//    if (_detected_hang)  ProcessDeadlock();
-//#endif
+#if CMK_SMP && ! LARGEPAGE
+    if (_detected_hang)  ProcessDeadlock();
+#endif
 }
 
 /* useDynamicSMSG */
@@ -3433,7 +3448,7 @@ void* LrtsAlloc(int n_bytes, int header)
 
 void  LrtsFree(void *msg)
 {
-    int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
+    CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
     if (size <= SMSG_MAX_MSG)
         free(msg);
     else {