add get-based persistent messaging to avoid put-based contention
authorYanhua Sun <suny@titan-ext5.ccs.ornl.gov>
Fri, 18 Jan 2013 22:22:48 +0000 (17:22 -0500)
committerYanhua Sun <suny@titan-ext5.ccs.ornl.gov>
Fri, 18 Jan 2013 22:22:48 +0000 (17:22 -0500)
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine.c

index 00debed73f1e22590f2df238f0e0cdceb3984347..86ebc32bf647d0a8e5191d42f12bbe6d1cbf00fb 100644 (file)
   * persist_machine_init  // machine specific initialization call
 */
 
+#if   PERSISTENT_GET_BASE
+void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *msg)
+{
+    PersistentSendsTable *slot = (PersistentSendsTable *)h;
+    int         destIndex; 
+    uint8_t tag = LMSG_PERSISTENT_INIT_TAG;
+    SMSG_QUEUE *queue = &smsg_queue;
+
+    LrtsPrepareEnvelope(msg, size);
+    if (size < BIG_MSG) {
+        CONTROL_MSG *control_msg_tmp =  construct_control_msg(size, msg, -1);
+        control_msg_tmp -> dest_addr = (uint64_t)slot->destBuf[destIndex].destAddress;
+        control_msg_tmp -> dest_mem_hndl = slot->destBuf[destIndex].mem_hndl;
+        registerMessage(msg, size, PERSIST_SEQ, &(control_msg_tmp -> source_mem_hndl));
+        buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
+    }
+}
+#else
 void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 {
     gni_post_descriptor_t *pd;
@@ -141,6 +159,8 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
   }
 }
 
+#endif
+
 #if 0
 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
 {
index 15e3c752b7bb0e2ede78608ec16f7fee90daa821..d7788769e458965d433abc56fc6af1c1ba7d5409 100644 (file)
@@ -47,9 +47,9 @@
 #include "cmidirect.h"
 #endif
 
-#if !defined(LARGEPAGE)
-#define     LARGEPAGE              0
-#endif
+//#if !defined(LARGEPAGE)
+#define     LARGEPAGE              1
+//#endif
 
 #if CMK_SMP
 #define MULTI_THREAD_SEND          0
@@ -241,6 +241,7 @@ int         lrts_send_rdma_success = 0;
 #include "mempool.h"
 
 #if CMK_PERSISTENT_COMM
+#define PERSISTENT_GET_BASE 0 
 #include "machine-persistent.h"
 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
 #else  
@@ -348,6 +349,7 @@ static int LOCAL_QUEUE_ENTRIES=20480;
 #define SMALL_DATA_TAG          0x31
 /* SMSG is a control message to initialize a BTE */
 #define LMSG_INIT_TAG           0x33 
+#define LMSG_PERSISTENT_INIT_TAG           0x34 
 #define LMSG_OOB_INIT_TAG       0x35
 
 #define DEBUG
@@ -440,7 +442,10 @@ typedef struct control_msg
 #endif
     uint8_t             seq_id;         //big message   0 meaning single message
     gni_mem_handle_t    source_mem_hndl;
-    struct control_msg *next;
+#if PERSISTENT_GET_BASE
+    gni_mem_handle_t    dest_mem_hndl;
+#endif
+    struct control_msg  *next;
 } CONTROL_MSG;
 
 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
@@ -1437,7 +1442,7 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
 #if CMK_SMP_TRACE_COMMTHREAD
         int oldpe = -1;
         int oldeventid = -1;
-        if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
+        if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
         { 
             START_EVENT();
             if ( tag == SMALL_DATA_TAG)
@@ -1449,7 +1454,7 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
         }
 #endif
 #if REMOTE_EVENT
-        if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
+        if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
             {
@@ -1487,7 +1492,7 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
             SMSG_SENT_DONE(creation_time,tag) 
 #endif
 #if CMK_SMP_TRACE_COMMTHREAD
-            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
+            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
             { 
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
             }
@@ -2092,6 +2097,7 @@ static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_
 }
 
 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
+static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
 
 static void PumpNetworkSmsg()
 {
@@ -2167,6 +2173,11 @@ static void PumpNetworkSmsg()
                 handleOneRecvedMsg(msg_nbytes, msg_data);
                 break;
             }
+            case LMSG_PERSISTENT_INIT_TAG:
+                CMI_GNI_UNLOCK(smsg_mailbox_lock)
+                getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+            break;
             case LMSG_INIT_TAG:
             case LMSG_OOB_INIT_TAG:
             {
@@ -2368,6 +2379,39 @@ static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_han
     return status;
 }
 
+static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
+{
+#if   PERSISTENT_GET_BASE
+    CONTROL_MSG         *request_msg;
+    gni_return_t        status;
+    gni_post_descriptor_t *pd;
+    request_msg = (CONTROL_MSG *) header;
+
+    MallocPostDesc(pd);
+    pd->cqwrite_value = request_msg->seq_id;
+    if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
+        pd->type            = GNI_POST_FMA_GET;
+    else
+        pd->type            = GNI_POST_RDMA_GET;
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
+    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
+    pd->length          = ALIGN64(request_msg->length);
+    pd->local_addr      = (uint64_t) request_msg->dest_addr;
+    pd->local_mem_hndl  = request_msg->dest_mem_hndl;
+    pd->remote_addr     = (uint64_t) request_msg->source_addr;
+    pd->remote_mem_hndl = request_msg->source_mem_hndl;
+    pd->rdma_mode       = 0;
+    pd->amo_cmd         = 0;
+
+#if REMOTE_EVENT
+    bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
+#else
+    bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
+#endif
+
+#endif
+}
+
 // for BIG_MSG called on receiver side for receiving control message
 // LMSG_INIT_TAG
 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
@@ -2813,6 +2857,8 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                 } 
                 else
                 {
+                    if(seq_id < 0)
+                        CmiReference((void*)tmp_pd->local_addr);
                     msg_tag = ACK_TAG; 
 #if  !REMOTE_EVENT && !CQWRITE
                     MallocAckMsg(ack_msg);