added CQWrite to replace ACK msg
authorYanhua Sun <yanhuas@jyc1.(none)>
Mon, 12 Mar 2012 20:36:49 +0000 (15:36 -0500)
committerYanhua Sun <yanhuas@jyc1.(none)>
Mon, 12 Mar 2012 20:36:49 +0000 (15:36 -0500)
src/arch/gemini_gni/machine.c

index badb955cb151ead32a72e4f698d0f2ff80353d21..a963eaf7789f517d3e6b87dfc99e45b285a44096 100644 (file)
@@ -73,7 +73,8 @@
 
 #define USE_LRTS_MEMPOOL                  1
 
-#define REMOTE_EVENT                      0
+#define CQWRITE                     1
+#define REMOTE_EVENT                      1
 
 #define PRINT_SYH  0
 
@@ -228,18 +229,17 @@ onesided_md_t    omdh;
 #else
 uint8_t   onesided_hnd, omdh;
 #if REMOTE_EVENT
-#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size+size>= MAX_REG_MEM) { \
-         status = GNI_RC_ERROR_NOMEM;} \
-        else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
-                if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
+#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status) \
+    if(register_memory_size+size>= MAX_REG_MEM) { \
+        status = GNI_RC_ERROR_NOMEM;} \
+    else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, rdma_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
+        if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
 #else
 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) \
-    do {   \
         if (register_memory_size + size >= MAX_REG_MEM) { \
             status = GNI_RC_ERROR_NOMEM; \
         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
-            if(status == GNI_RC_SUCCESS) register_memory_size += size; } \
-    } while(0)
+            if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
 #endif
 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
@@ -359,7 +359,7 @@ static int modes = 0;
 static gni_cq_handle_t       smsg_rx_cqh = NULL;
 static gni_cq_handle_t       default_tx_cqh = NULL;
 static gni_cq_handle_t       rdma_tx_cqh = NULL;
-static gni_cq_handle_t       post_rx_cqh = NULL;
+static gni_cq_handle_t       rdma_rx_cqh = NULL;
 static gni_cq_handle_t       post_tx_cqh = NULL;
 static gni_ep_handle_t       *ep_hndl_array;
 
@@ -867,6 +867,7 @@ void CmiMachineProgressImpl() {
 static void SendRdmaMsg();
 static void PumpNetworkSmsg();
 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
+static void PumpCqWriteTransactions();
 static int SendBufferMsg(SMSG_QUEUE *queue);
 
 #if MACHINE_DEBUG_LOG
@@ -1051,7 +1052,8 @@ static void setup_smsg_connection(int destNode)
     smsg_available_slot++;
     MallocPostDesc(pd);
     pd->type            = GNI_POST_FMA_PUT;
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
+    //pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
     pd->length          = sizeof(gni_smsg_attr_t);
     pd->local_addr      = (uint64_t) smsg_attr;
@@ -1959,7 +1961,23 @@ static void printDesc(gni_post_descriptor_t *pd)
 {
     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
 }
+static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
+{
+    gni_post_descriptor_t *pd;
+    gni_return_t        status = GNI_RC_SUCCESS;
+    
+    MallocPostDesc(pd);
 
+    pd->type = GNI_POST_CQWRITE;
+    pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
+    pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
+    pd->cqwrite_value = data;
+    pd->remote_mem_hndl = mem_hndl;
+    status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
+
+    GNI_RC_CHECK("GNI_PostCqWrite", status);
+
+}
 // for BIG_MSG called on receiver side for receiving control message
 // LMSG_INIT_TAG
 static void getLargeMsgRequest(void* header, uint64_t inst_id )
@@ -2026,11 +2044,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
         pd->type            = GNI_POST_FMA_GET;
     else
         pd->type            = GNI_POST_RDMA_GET;
-#if REMOTE_EVENT
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
-#else
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
-#endif
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
     pd->length          = transaction_size;
     pd->local_addr      = (uint64_t) msg_data;
@@ -2103,11 +2117,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
         pd->type            = GNI_POST_FMA_GET;
     else
         pd->type            = GNI_POST_RDMA_GET;
-#if REMOTE_EVENT
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
-#else
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
-#endif
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
     pd->length          = ALIGN64(request_msg->length);
     pd->local_addr      = (uint64_t) msg_data;
@@ -2152,6 +2162,36 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
 #endif
 }
 
+static void PumpCqWriteTransactions()
+{
+
+    gni_cq_entry_t          ev;
+    gni_return_t            status;
+    void                    *msg;   
+    while(1) {
+        //CMI_GNI_LOCK(my_cq_lock) 
+        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
+        //CMI_GNI_UNLOCK(my_cq_lock)
+        if(status != GNI_RC_SUCCESS) break;
+        msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
+
+        DecreaseMsgInSend(msg);
+#if ! USE_LRTS_MEMPOOL
+       // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
+#else
+        DecreaseMsgInSend(msg);
+#endif
+        if(NoMsgInSend(msg))
+            buffered_send_msg -= GetMempoolsize(msg);
+        CmiFree(msg);
+    };
+    if(status == GNI_RC_ERROR_RESOURCE)
+    {
+        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
+    }
+}
+
+
 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
 {
     gni_cq_entry_t          ev;
@@ -2231,14 +2271,18 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                 } 
                 else
                 {
+                    msg_tag = ACK_TAG; 
+#if         !CQWRITE
                     MallocAckMsg(ack_msg);
                     ack_msg->source_addr = tmp_pd->remote_addr;
-                    msg_tag = ACK_TAG;  
-                    // ack_msg_tmp->dest_addr = tmp_pd->local_addr; ???
+#endif
                 }
 #endif
                 break;
             }
+            case  GNI_POST_CQWRITE:
+                   FreePostDesc(tmp_pd);
+                   continue;
             default:
                 CmiPrintf("type=%d\n", tmp_pd->type);
                 CmiAbort("PumpLocalTransactions: unknown type!");
@@ -2252,11 +2296,15 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
             else
 #endif
             if (msg_tag == ACK_TAG) {
+#if         !CQWRITE
 #if ! PIGGYBACK_ACK
                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
 #else
                 buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
+#endif
+#else
+                sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
 #endif
             }
             else {
@@ -2649,6 +2697,10 @@ void LrtsAdvanceCommunication(int whileidle)
     startT = CmiWallTimer();
 #endif
     PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock);
+
+#if CQWRITE
+    PumpCqWriteTransactions();
+#endif
     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
@@ -3221,8 +3273,8 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
     GNI_RC_CHECK("Create CQ (rx)", status);
     
-    //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &post_rx_cqh);
-    //GNI_RC_CHECK("Create Post CQ (rx)", status);
+    status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
+    GNI_RC_CHECK("Create Post CQ (rx)", status);
     
     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
     //GNI_RC_CHECK("Create BTE CQ", status);