make send_smsg_buffer more efficient
authorYanhua Sun <sun51@hopper03.(none)>
Thu, 15 Sep 2011 21:03:39 +0000 (14:03 -0700)
committerYanhua Sun <sun51@hopper03.(none)>
Thu, 15 Sep 2011 21:03:39 +0000 (14:03 -0700)
src/arch/gemini_gni/machine.c

index 305705946384037d35228e87e49f4f5cedffdc96..7410425a2da2d47898d0debee32703448300bf5b 100644 (file)
@@ -194,6 +194,7 @@ typedef struct msg_list
     uint32_t size;
     void *msg;
     struct msg_list *next;
+    struct msg_list *pehead_next;
     uint8_t tag;
 }MSG_LIST;
 
@@ -231,11 +232,18 @@ typedef struct  rmda_msg
     struct  rmda_msg      *next;
 }RDMA_REQUEST;
 
+typedef struct  msg_list_index
+{
+    int         next;
+    MSG_LIST    *head;
+    MSG_LIST    *tail;
+} MSG_LIST_INDEX;
+
 /* reuse PendingMsg memory */
 static CONTROL_MSG          *control_freelist=0;
 static MSG_LIST             *msglist_freelist=0;
-static MSG_LIST             **smsg_msglist_head= 0;
-static MSG_LIST             **smsg_msglist_tail= 0;
+static int                  smsg_head_index;
+static MSG_LIST_INDEX       *smsg_msglist_index= 0;
 static MSG_LIST             *smsg_free_head=0;
 static MSG_LIST             *smsg_free_tail=0;
 
@@ -538,13 +546,15 @@ static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
     msg_tmp->msg    = msg;
     msg_tmp->tag    = tag;
     msg_tmp->next   = 0;
-    if (smsg_msglist_head[destNode] == 0) {
-        smsg_msglist_head[destNode]  = msg_tmp;
+    if (smsg_msglist_index[destNode].head == 0 ) {
+        smsg_msglist_index[destNode].head = msg_tmp;
+        smsg_msglist_index[destNode].next = smsg_head_index;
+        smsg_head_index = destNode;
     }
     else {
-      smsg_msglist_tail[destNode]->next    = msg_tmp;
+      (smsg_msglist_index[destNode].tail)->next    = msg_tmp;
     }
-    smsg_msglist_tail[destNode]          = msg_tmp;
+    smsg_msglist_index[destNode].tail          = msg_tmp;
 #if PRINT_SYH
     buffered_smsg_counter++;
 #endif
@@ -557,7 +567,7 @@ static void send_small_messages(int destNode, int size, char *msg)
     gni_return_t        status  =   GNI_RC_SUCCESS;
     const uint8_t       tag_data    = SMALL_DATA_TAG;
     
-    if(smsg_msglist_head[destNode] == 0)
+    if(smsg_msglist_index[destNode].head == 0)
     {
         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, SMALL_DATA_TAG);
         if (status == GNI_RC_SUCCESS)
@@ -701,7 +711,7 @@ static int send_large_messages(int destNode, int size, char *msg)
     lrts_send_msg_id++;
     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
 #endif
-    if(smsg_msglist_head[destNode] == 0)
+    if(smsg_msglist_index[destNode].head == 0 )
     {
         status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
         if(status == GNI_RC_SUCCESS)
@@ -1057,7 +1067,7 @@ static void PumpLocalRdmaTransactions()
             lrts_send_msg_id++;
             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
 #endif
-            if(smsg_msglist_head[inst_id]!=0)
+            if(smsg_msglist_index[inst_id].head != 0 )
             {
                 delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
             }else
@@ -1131,22 +1141,23 @@ static int SendRdmaMsg()
 // return 1 if all messages are sent
 static int SendBufferMsg()
 {
-    MSG_LIST            *ptr;
+    MSG_LIST            *ptr, *previous_head, *current_head;
     CONTROL_MSG         *control_msg_tmp;
     gni_return_t        status;
     int done = 1;
     register    int     i;
+    int                 index_previous = -1;
+    int                 index = smsg_head_index;
     //if( smsg_msglist_head == 0 && buffered_smsg_counter!= 0 ) {CmiPrintf("WRONGWRONG on rank%d, buffermsg=%d, (msgid-succ:%d)\n", myrank, buffered_smsg_counter, (lrts_send_msg_id-lrts_smsg_success)); CmiAbort("sendbuf");}
     /* can add flow control here to control the number of messages sent before handle message */
-    for(i=0; i<mysize; i++)
+    while(index != -1)
     {
-        if(i==myrank || smsg_msglist_head[i]==0) continue;
+        ptr = smsg_msglist_index[index].head;
        
-        while(smsg_msglist_head[i]!=0)
+        while(ptr!=0)
         {
         if(useStaticSMSG)
         {
-            ptr = smsg_msglist_head[i];
             CmiAssert(ptr!=NULL);
             if(ptr->tag == SMALL_DATA_TAG)
             {
@@ -1209,8 +1220,9 @@ static int SendBufferMsg()
         } 
         if(status == GNI_RC_SUCCESS)
         {
-            smsg_msglist_head[i] = smsg_msglist_head[i]->next;
+            smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
             FreeMsgList(ptr);
+            ptr= smsg_msglist_index[index].head;
 #if PRINT_SYH
             buffered_smsg_counter--;
             if(lrts_smsg_success == lrts_send_msg_id)
@@ -1223,6 +1235,17 @@ static int SendBufferMsg()
             break;
         }
         } //end pooling this i-th core
+        if(ptr == 0)
+        {
+            if(index_previous != -1)
+                smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
+            else
+                smsg_head_index = smsg_msglist_index[index].next;
+        }else
+        {
+            index_previous = index;
+        }
+        index = smsg_msglist_index[index].next;
     }   // end pooling for all cores
 #if PRINT_SYH
     if(lrts_send_msg_id-lrts_smsg_success !=0)
@@ -1382,10 +1405,14 @@ static void _init_static_smsg()
     free(smsg_attr);
     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
-    smsg_msglist_head = (MSG_LIST**) malloc(mysize*sizeof(MSG_LIST*));
-    memset(smsg_msglist_head, 0, mysize*sizeof(MSG_LIST*));
-    smsg_msglist_tail = (MSG_LIST**) malloc(mysize*sizeof(MSG_LIST*));
-    memset(smsg_msglist_tail, 0, mysize*sizeof(MSG_LIST*));
+    smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
+    for(i =0; i<mysize; i++)
+    {
+        smsg_msglist_index[i].next = -1;
+        smsg_msglist_index[i].head = 0;
+        smsg_msglist_index[i].tail = 0;
+    }
+    smsg_head_index = -1;
 } 
 
 static void _init_static_msgq()