use remote_event for ack
author: Gengbin Zheng <gzheng@illinois.edu>
Tue, 13 Mar 2012 06:03:46 +0000 (01:03 -0500)
committer: Gengbin Zheng <gzheng@illinois.edu>
Tue, 13 Mar 2012 06:03:46 +0000 (01:03 -0500)
src/arch/gemini_gni/machine.c

index b48d2e14b838b070884df275f918c52cd052cfdd..da2c3fdb6d585b56a207ab3e63b2a59b5238fe4b 100644 (file)
 #endif
 
 
+#if MULTI_THREAD_SEND
+/* When 1, each worker rank takes exactly one pump/send task (see the
+   CmiMyRank()%6 dispatch in LrtsPostNonLocal); 0 = every rank does all. */
+#define CMK_WORKER_SINGLE_TASK     0
+#endif
+
 #define CMI_EXERT_SEND_CAP     0
 #define        CMI_EXERT_RECV_CAP      0
 
 
 #define USE_LRTS_MEMPOOL                  1
 
-#define CQWRITE                     0
+#define CQWRITE                           0
 #define REMOTE_EVENT                      1
 
-#define PRINT_SYH  0
+#define PRINT_SYH                         0
 
 // Trace communication thread
 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
@@ -116,11 +120,11 @@ static CmiInt8 _mempool_size_limit = 0;
 static CmiInt8 _totalmem = 0.8*oneGB;
 
 #if LARGEPAGE
-static int BIG_MSG  =  16*oneMB;
-static int ONE_SEG  =  4*oneMB;
+static CmiInt8 BIG_MSG  =  10024*oneMB;  /* NOTE(review): ~10 GB — possible typo for 1024*oneMB? confirm intent */
+static CmiInt8 ONE_SEG  =  4*oneMB;
 #else
-static int BIG_MSG  =  4*oneMB;
-static int ONE_SEG  =  2*oneMB;
+static CmiInt8 BIG_MSG  =  4*oneMB;
+static CmiInt8 ONE_SEG  =  2*oneMB;
 #endif
 #if MULTI_THREAD_SEND
 static int BIG_MSG_PIPELINE = 1;
@@ -387,6 +391,9 @@ typedef struct control_msg
     uint64_t            dest_addr;      /* address from the start of buffer */
     int                 total_length;   /* total length */
     int                 length;         /* length of this packet */
+#if REMOTE_EVENT
+    int                 ack_index;      /* index from integer to address */
+#endif
     uint8_t             seq_id;         //big message   0 meaning single message
     gni_mem_handle_t    source_mem_hndl;
     struct control_msg *next;
@@ -436,6 +443,9 @@ void CmiDirectInit()
 typedef struct  rmda_msg
 {
     int                   destNode;
+#if REMOTE_EVENT
+    int                   ack_index;
+#endif
     gni_post_descriptor_t *pd;
 #if !CMK_SMP
     struct  rmda_msg      *next;
@@ -626,6 +636,74 @@ static MSG_LIST *buffered_fma_tail = 0;
 
 CpvDeclare(mempool_type*, mempool);
 
+#if REMOTE_EVENT
+/* ack pool for remote events: maps a small integer slot to the sender-side
+   buffer address so a remote CQ event can identify which send to ack.
+   Event layout: bits above ACK_SHIFT = slot index, low ACK_SHIFT bits = rank. */
+
+#define ACK_SHIFT                  19
+/* fix: parenthesize macro parameters so e.g. ACK_EVENT(a+b) and
+   ACK_GET_RANK(x|y) expand with the intended precedence */
+#define ACK_EVENT(idx)             (((idx)<<ACK_SHIFT) | myrank)
+#define ACK_GET_RANK(evt)          ((evt) & ((1<<ACK_SHIFT)-1))
+#define ACK_GET_INDEX(evt)         ((evt) >> ACK_SHIFT)
+
+struct AckPool {
+void *addr;      /* message buffer this slot acknowledges */
+int next;        /* free-list link; -1 terminates the list */
+};
+
+static struct AckPool   *ackpool;
+static int    ackpoolsize;
+static int    ackpool_freehead;
+
+#define  GetAckAddress(s)          (ackpool[(s)].addr)
+
+/* Initialize the ack pool: 2048 slots threaded into a free list
+   (grown on demand by AckPool_getslot). */
+static void AckPool_init()
+{
+    int i;
+    ackpoolsize = 2048;
+    ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
+    if (ackpool == NULL) CmiAbort("AckPool_init: malloc failed");   /* fix: check allocation */
+    for (i=0; i<ackpoolsize-1; i++) {
+        ackpool[i].next = i+1;
+    }
+    ackpool[i].next = -1;        /* last slot terminates the free list */
+    ackpool_freehead = 0;
+}
+
+/* Claim a free slot, record 'addr' in it, and return the slot index.
+   Doubles the pool when exhausted, capped so an index still fits in the
+   32-ACK_SHIFT high bits of a remote event.
+   NOTE(review): no lock is taken here — confirm callers serialize access
+   under MULTI_THREAD_SEND. */
+static
+inline  int  AckPool_getslot(void *addr)
+{
+    int i;
+    int s = ackpool_freehead;
+    if (s == -1) {
+        /* free list empty: double the pool and thread the new slots */
+        int newsize = ackpoolsize * 2;
+        if (ackpoolsize == 1<<(32-ACK_SHIFT)) CmiAbort("AckPool too large");
+        struct AckPool   *old_ackpool = ackpool;
+        ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
+        if (ackpool == NULL) CmiAbort("AckPool_getslot: malloc failed");  /* fix: check allocation */
+        memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
+        for (i=ackpoolsize; i<newsize-1; i++) {
+            ackpool[i].next = i+1;
+        }
+        ackpool[i].next = -1;
+        ackpool_freehead = ackpoolsize;
+        s = ackpoolsize;
+        ackpoolsize = newsize;
+        free(old_ackpool);
+    }
+    ackpool_freehead = ackpool[s].next;
+    ackpool[s].addr = addr;
+    return s;
+}
+
+/* Return slot 's' to the head of the free list; the slot's addr is
+   left stale and overwritten on the next AckPool_getslot. */
+static
+inline  void AckPool_freeslot(int s)
+{
+    CmiAssert(s>=0 && s<ackpoolsize);
+    ackpool[s].next = ackpool_freehead;
+    ackpool_freehead = s;
+}
+
+
+#endif
+
 #if CMK_WITH_STATS
 typedef struct comm_thread_stats
 {
@@ -654,13 +732,26 @@ static void init_comm_stats()
               comm_stats.max_time_in_send_buffered_ack = t;    \
         }
 
+/* Run statement 'x' (an smsg send) and accumulate its wall-clock cost
+   into the buffered-smsg counters; brace style matches STATS_ACK_TIME. */
+#define STATS_SEND_SMSGS_TIME(x)   \
+        { double t = CmiWallTimer(); \
+          x;        \
+          t = CmiWallTimer() - t;          \
+          comm_stats.count_in_send_buffered_smsg ++;        \
+          comm_stats.time_in_send_buffered_smsg += t;   \
+          if (t>comm_stats.max_time_in_send_buffered_smsg)      \
+              comm_stats.max_time_in_send_buffered_smsg = t;    \
+        }
+
 static void print_comm_stats()
 {
-    printf("PE[%d]  count/time in send buffered ack:   %d %f\n",  myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack);
-    printf("PE[%d]  max time in send buffered ack:     %f\n",  myrank, comm_stats.max_time_in_send_buffered_ack);
+    if (myrank == 0) 
+    printf("PE[%d]                    count\ttime\tmax \n", myrank);
+    printf("PE[%d]  send buffered ack:   %d\t%f\t%f\n",  myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack, comm_stats.max_time_in_send_buffered_ack);
+    printf("PE[%d]  send smsgs:          %d\t%f\t%f\n",  myrank, comm_stats.count_in_send_buffered_smsg, comm_stats.time_in_send_buffered_smsg, comm_stats.max_time_in_send_buffered_smsg);
 }
 #else
 #define STATS_ACK_TIME(x)            x
+#define STATS_SEND_SMSGS_TIME(x)     x
 #endif
 
 static int print_stats = 0;
@@ -1183,6 +1274,14 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
             TRACE_COMM_SET_COMM_MSGID(real_data);
         }
+#endif
+#if REMOTE_EVENT
+        if (tag == LMSG_INIT_TAG) {
+            CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
+            if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
+                control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
+        }
+        GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
 #endif
         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -1216,6 +1315,9 @@ static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
     control_msg_tmp->source_addr = (uint64_t)msg;
     control_msg_tmp->seq_id    = seqno;
     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
+#if REMOTE_EVENT
+    control_msg_tmp->ack_index    =  -1;
+#endif
 #if     USE_LRTS_MEMPOOL
     if(size < BIG_MSG)
     {
@@ -1280,8 +1382,8 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
             status = GNI_RC_SUCCESS;
         }
-        if(NoMsgInSend( control_msg_tmp->source_addr))
-            register_size = GetMempoolsize((void*)(control_msg_tmp->source_addr));
+        if(NoMsgInSend(source_addr))
+            register_size = GetMempoolsize((void*)(source_addr));
         else
             register_size = 0;
     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
@@ -1554,17 +1656,44 @@ void LrtsPostNonLocal(){
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
+
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 0)
+#endif
     PumpNetworkSmsg();
+
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 1)
+#endif
     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
+
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 2)
+#endif
     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
+
+#if REMOTE_EVENT
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 3)
+#endif
+    PumpRemoteTransactions();
+#endif
+
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     {
-    SendBufferMsg(&smsg_queue);
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 4)
+#endif
+        SendBufferMsg(&smsg_queue);
     }
 
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 5)
+#endif
     SendRdmaMsg();
+
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
@@ -1634,12 +1763,15 @@ static void PumpNetworkRdmaMsgs()
 }
 
 inline 
-static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
+static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
 {
     RDMA_REQUEST        *rdma_request_msg;
     MallocRdmaRequest(rdma_request_msg);
     rdma_request_msg->destNode = inst_id;
     rdma_request_msg->pd = pd;
+#if REMOTE_EVENT
+    rdma_request_msg->ack_index = ack_index;
+#endif
 #if CMK_SMP
     PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
 #else
@@ -1684,6 +1816,9 @@ static void PumpNetworkSmsg()
         if(status != GNI_RC_SUCCESS)
             break;
         inst_id = GNI_CQ_GET_INST_ID(event_data);
+#if REMOTE_EVENT
+        inst_id = ACK_GET_RANK(inst_id);
+#endif
         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
 #if PRINT_SYH
         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
@@ -1849,6 +1984,7 @@ static void printDesc(gni_post_descriptor_t *pd)
 {
     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
 }
+
 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
 {
     gni_post_descriptor_t *pd;
@@ -1866,6 +2002,7 @@ static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
     GNI_RC_CHECK("GNI_PostCqWrite", status);
 
 }
+
 // for BIG_MSG called on receiver side for receiving control message
 // LMSG_INIT_TAG
 static void getLargeMsgRequest(void* header, uint64_t inst_id )
@@ -1933,7 +2070,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
         pd->type            = GNI_POST_FMA_GET;
     else
         pd->type            = GNI_POST_RDMA_GET;
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
     pd->length          = transaction_size;
     pd->local_addr      = (uint64_t) msg_data;
@@ -1946,6 +2083,18 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     //memory registration success
     if(status == GNI_RC_SUCCESS)
     {
+#if REMOTE_EVENT
+        if( request_msg->seq_id == 0)
+        {
+            pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+            int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
+            GNI_RC_CHECK("GNI_EpSetEventData", sts);
+        }
+        else {
+            int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
+            GNI_RC_CHECK("GNI_EpSetEventData", sts);
+        }
+#endif
         if(pd->type == GNI_POST_RDMA_GET) 
         {
             CMI_GNI_LOCK(rdma_tx_cq_lock)
@@ -1976,7 +2125,11 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     }
     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
     {
-        bufferRdmaMsg(inst_id, pd); 
+#if REMOTE_EVENT
+        bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
+#else
+        bufferRdmaMsg(inst_id, pd, -1); 
+#endif
     }else {
          //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
         GNI_RC_CHECK("GetLargeAFter posting", status);
@@ -2080,6 +2233,42 @@ static void PumpCqWriteTransactions()
     }
 }
 
+#if REMOTE_EVENT
+/* Drain remote events from rdma_rx_cqh.  Each event is raised by the
+   receiver's GET completion and carries an ack-pool slot in its inst_id,
+   letting the sender free the message buffer without an ACK_TAG smsg.
+   NOTE(review): runs without a CQ lock — confirm callers serialize access
+   to rdma_rx_cqh under MULTI_THREAD_SEND. */
+static void PumpRemoteTransactions()
+{
+    gni_cq_entry_t          ev;
+    gni_return_t            status;
+    void                    *msg;
+    int                     slot;
+
+    while(1) {
+        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
+        if(status != GNI_RC_SUCCESS) break;
+
+        slot = ACK_GET_INDEX(GNI_CQ_GET_INST_ID(ev));
+        msg = GetAckAddress(slot);
+
+        /* fix: decrement exactly once per ack — the previous code called
+           DecreaseMsgInSend unconditionally AND again in the mempool
+           branch, unbalancing the send-side IncreaseMsgInSend */
+#if ! USE_LRTS_MEMPOOL
+        DecreaseMsgInSend(msg);
+       // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
+#else
+        DecreaseMsgInSend(msg);
+#endif
+        if(NoMsgInSend(msg))
+            buffered_send_msg -= GetMempoolsize(msg);
+        CmiFree(msg);
+        AckPool_freeslot(slot);
+    }
+    if(status == GNI_RC_ERROR_RESOURCE)
+    {
+        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
+    }
+}
+#endif
 
 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
 {
@@ -2161,7 +2350,7 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                 else
                 {
                     msg_tag = ACK_TAG; 
-#if         !CQWRITE
+#if  !REMOTE_EVENT && !CQWRITE
                     MallocAckMsg(ack_msg);
                     ack_msg->source_addr = tmp_pd->remote_addr;
 #endif
@@ -2185,11 +2374,13 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
             else
 #endif
             if (msg_tag == ACK_TAG) {
+#if !REMOTE_EVENT
 #if         !CQWRITE
                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
 #else
                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
+#endif
 #endif
             }
             else {
@@ -2293,6 +2484,18 @@ static void  SendRdmaMsg()
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
+#if REMOTE_EVENT
+            if( pd->cqwrite_value == 0)
+            {
+                pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+                int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
+                GNI_RC_CHECK("GNI_EpSetEventData", sts);
+            }
+            else {
+                int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
+                GNI_RC_CHECK("GNI_EpSetEventData", sts);
+            }
+#endif
             if(pd->type == GNI_POST_RDMA_GET) 
             {
                 CMI_GNI_LOCK(rdma_tx_cq_lock)
@@ -2466,6 +2669,10 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
             }       // end switch
             if(status == GNI_RC_SUCCESS)
             {
+#if PRINT_SYH
+                buffered_smsg_counter--;
+                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
+#endif
 #if !CMK_SMP
                 tmp_ptr = ptr;
                 if(pre)
@@ -2479,10 +2686,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #else
                 FreeMsgList(ptr);
 #endif
-#if PRINT_SYH
-                buffered_smsg_counter--;
-                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
-#endif
 #if CMI_EXERT_SEND_CAP
                 sent_cnt++;
                 if(sent_cnt == SEND_CAP)
@@ -2592,6 +2795,11 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CQWRITE
     PumpCqWriteTransactions();
 #endif
+
+#if REMOTE_EVENT
+    PumpRemoteTransactions();
+#endif
+
     //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
@@ -3297,6 +3505,10 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 //    NTK_Init();
 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
 
+#if  REMOTE_EVENT
+    AckPool_init();
+#endif
+
 #if CMK_WITH_STATS
     init_comm_stats();
 #endif