Implement urgent (out-of-band) send queue, disabled by default
author Gengbin Zheng <gzheng@illinois.edu>
Tue, 14 Feb 2012 07:00:11 +0000 (01:00 -0600)
committer Gengbin Zheng <gzheng@illinois.edu>
Tue, 14 Feb 2012 07:00:11 +0000 (01:00 -0600)
src/arch/gemini_gni/machine.c
src/arch/util/machine-common-core.c

index 9853550e5dbb81554127a3cf4978d1e61d14a719..9c1bbde2b02f7acb94ecec157674ea7210d5a957 100644 (file)
@@ -6,6 +6,14 @@
              Gengbin Zheng
  * Date:   07-01-2011
  *
+ *  Flow control via the memory pool, configured with environment variables:
+
+    export CHARM_UGNI_MEMPOOL_SIZE=8M
+    export CHARM_UGNI_MEMPOOL_MAX=20M
+    export CHARM_UGNI_SEND_MAX=10M
+
+    CHARM_UGNI_MEMPOOL_MAX can be set to maximum_register_mem/number_of_processes.
+    CHARM_UGNI_SEND_MAX can be set to half of CHARM_UGNI_MEMPOOL_MAX.
  */
 /*@{*/
 
@@ -22,6 +30,8 @@
 #include "converse.h"
 #include <time.h>
 
+#define USE_OOB                     0
+
 #define PRINT_SYH  0
 
 // Trace communication thread
@@ -331,13 +341,25 @@ typedef struct  msg_list_index
     MSG_LIST    *tail;
 } MSG_LIST_INDEX;
 #endif
+
 /* reuse PendingMsg memory */
 static CONTROL_MSG          *control_freelist=0;
 static MSG_LIST             *msglist_freelist=0;
+
+typedef struct smsg_queue
+{
+    MSG_LIST_INDEX   *smsg_msglist_index;
+    int               smsg_head_index;
+} SMSG_QUEUE;
+
+SMSG_QUEUE                  smsg_queue;
+SMSG_QUEUE                  smsg_oob_queue;
+/*
 static int                  smsg_head_index = -1;
 static MSG_LIST_INDEX       *smsg_msglist_index= 0;
-static MSG_LIST             *smsg_free_head=0;
-static MSG_LIST             *smsg_free_tail=0;
+static int                  smsg_oob_head_index = -1;
+static MSG_LIST_INDEX       *smsg_oob_msglist_index= 0;    // out of band
+*/
 
 /*
 #define FreeMsgList(msg_head, msg_tail, free_head, free_tail)       \
@@ -714,7 +736,7 @@ static gni_return_t registerMempool(void *msg)
 }
 
 inline
-static void buffer_small_msgs(void *msg, int size, int destNode, uint8_t tag)
+static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
 {
     MSG_LIST        *msg_tmp;
     MallocMsgList(msg_tmp);
@@ -725,17 +747,17 @@ static void buffer_small_msgs(void *msg, int size, int destNode, uint8_t tag)
     //msg_tmp->next   = 0;
 
 #if !CMK_SMP
-    if (smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
-        smsg_msglist_index[destNode].next = smsg_head_index;
-        smsg_head_index = destNode;
-        smsg_msglist_index[destNode].tail = smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
+    if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
+        queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
+        queue->smsg_head_index = destNode;
+        queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
     }else
     {
-        smsg_msglist_index[destNode].tail->next = msg_tmp;
-        smsg_msglist_index[destNode].tail = msg_tmp;
+        queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
+        queue->smsg_msglist_index[destNode].tail = msg_tmp;
     }
 #else
-    PCQueuePush(smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
+    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
 #endif
 #if PRINT_SYH
     buffered_smsg_counter++;
@@ -880,7 +902,7 @@ static int connect_to(int destNode)
 }
 
 //inline static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
-inline static gni_return_t send_smsg_message(int destNode, void *msg, int size, uint8_t tag, int inbuff )
+inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
 {
     unsigned int          remote_address;
     uint32_t              remote_id;
@@ -897,15 +919,15 @@ inline static gni_return_t send_smsg_message(int destNode, void *msg, int size,
         case 1:                           /* pending connection, do nothing */
             status = GNI_RC_NOT_DONE;
             if(inbuff ==0)
-                buffer_small_msgs(msg, size, destNode, tag);
+                buffer_small_msgs(queue, msg, size, destNode, tag);
             return status;
         }
     }
 #if CMK_SMP
-    if(PCQueueEmpty(smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
+    if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
     {
 #else
-    if(smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
+    if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
     {
 #endif
 #if CMK_SMP && !COMM_THREAD_SEND
@@ -937,7 +959,7 @@ inline static gni_return_t send_smsg_message(int destNode, void *msg, int size,
         }
     }
     if(inbuff ==0)
-        buffer_small_msgs(msg, size, destNode, tag);
+        buffer_small_msgs(queue, msg, size, destNode, tag);
     return status;
 }
 
@@ -971,7 +993,7 @@ inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t se
 // Large message, send control to receiver, receiver register memory and do a GET, 
 // return 1 - send no success
 inline
-static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
+static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
 {
     gni_return_t        status  =   GNI_RC_NOT_DONE;
     uint32_t            vmdh_index  = -1;
@@ -988,7 +1010,7 @@ static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int
         if(buffered_send_msg >= MAX_BUFF_SEND)
         {
             if(!inbuff)
-                buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+                buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
             return status;
         }
         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
@@ -1020,7 +1042,7 @@ static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int
 
     if(status == GNI_RC_SUCCESS)
     {
-        status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
+        status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, inbuff);  
         if(status == GNI_RC_SUCCESS)
         {
             buffered_send_msg += register_size;
@@ -1041,14 +1063,14 @@ static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int
     }else 
     {
         if(!inbuff)
-            buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+            buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
     return status;
 #else
     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
     if(status == GNI_RC_SUCCESS)
     {
-        status = send_smsg_message( destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
+        status = send_smsg_message(queue, destNode, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
         if(status == GNI_RC_SUCCESS)
         {
             FreeControlMsg(control_msg_tmp);
@@ -1058,7 +1080,7 @@ static int send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp, int
         CmiAbort("Memory registor for large msg\n");
     }else 
     {
-        buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+        buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
     return status;
 #endif
@@ -1074,6 +1096,14 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
     gni_return_t        status  =   GNI_RC_SUCCESS;
     uint8_t tag;
     CONTROL_MSG         *control_msg_tmp;
+    int                 oob = ( mode & OUT_OF_BAND);
+    SMSG_QUEUE          *queue;
+
+#if USE_OOB
+    queue = oob? &smsg_oob_queue : &smsg_queue;
+#else
+    queue = &smsg_queue;
+#endif
 
     LrtsPrepareEnvelope(msg, size);
 
@@ -1082,35 +1112,35 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 #endif 
 #if CMK_SMP && COMM_THREAD_SEND
     if(size <= SMSG_MAX_MSG)
-        buffer_small_msgs(msg, size, destNode, SMALL_DATA_TAG);
+        buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
     else if (size < BIG_MSG) {
         control_msg_tmp =  construct_control_msg(size, msg, 0);
-        buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+        buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
     else {
           CmiSetMsgSeq(msg, 0);
           control_msg_tmp =  construct_control_msg(size, msg, 1);
-          buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+          buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
 #else //non-smp, smp(worker sending)
     if(size <= SMSG_MAX_MSG)
     {
-        status = send_smsg_message( destNode,  msg, size, SMALL_DATA_TAG, 0);  
+        status = send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0);  
         if(status == GNI_RC_SUCCESS)
             CmiFree(msg);
     }
     else if (size < BIG_MSG) {
         control_msg_tmp =  construct_control_msg(size, msg, 0);
-        send_large_messages(destNode, control_msg_tmp, 0);
+        send_large_messages(queue, destNode, control_msg_tmp, 0);
     }
     else {
 #if     USE_LRTS_MEMPOOL
         CmiSetMsgSeq(msg, 0);
         control_msg_tmp =  construct_control_msg(size, msg, 1);
-        send_large_messages(destNode, control_msg_tmp, 0);
+        send_large_messages(queue, destNode, control_msg_tmp, 0);
 #else
         control_msg_tmp =  construct_control_msg(size, msg, 0);
-        send_large_messages(destNode, control_msg_tmp, 0);
+        send_large_messages(queue, destNode, control_msg_tmp, 0);
 #endif
     }
 #endif
@@ -1240,6 +1270,8 @@ static void PumpNetworkSmsg()
     int                 init_flag;
     CONTROL_MSG         *control_msg_tmp, *header_tmp;
     uint64_t            source_addr;
+    SMSG_QUEUE         *queue = &smsg_queue;
+
 #if CMK_SMP && !COMM_THREAD_SEND
     while(1)
     {
@@ -1348,7 +1380,7 @@ static void PumpNetworkSmsg()
                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
                     control_msg_tmp->seq_id         = cur_seq+1+1;
                     //send next seg
-                    send_large_messages(inst_id, control_msg_tmp, 0);
+                    send_large_messages(queue, inst_id, control_msg_tmp, 0);
                          // pipelining
                     if (header_tmp->seq_id == 1) {
                       int i;
@@ -1357,7 +1389,7 @@ static void PumpNetworkSmsg()
                         CmiSetMsgSeq(header_tmp->source_addr, seq-1);
                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)header_tmp->source_addr, seq);
                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
-                        send_large_messages(inst_id, control_msg_tmp, 0);
+                        send_large_messages(queue, inst_id, control_msg_tmp, 0);
                         if (header_tmp->total_length <= ONE_SEG*seq) break;
                       }
                     }
@@ -1625,6 +1657,8 @@ static void PumpLocalRdmaTransactions()
 #ifdef CMK_DIRECT
     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
 #endif
+    SMSG_QUEUE         *queue = &smsg_queue;
+
 #if CMK_SMP && !COMM_THREAD_SEND
     while(1) {
         CmiLock(tx_cq_lock);
@@ -1717,10 +1751,10 @@ static void PumpLocalRdmaTransactions()
             }
 #if CMK_DIRECT
             if(tmp_pd->amo_cmd == 1)
-                status = send_smsg_message(inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
+                status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
             else
 #endif
-                status = send_smsg_message(inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
+                status = send_smsg_message(queue, inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0); 
             if(status == GNI_RC_SUCCESS)
             {
 #if CMK_DIRECT
@@ -1895,7 +1929,7 @@ static void  SendRdmaMsg()
 }
 
 // return 1 if all messages are sent
-static int SendBufferMsg()
+static int SendBufferMsg(SMSG_QUEUE *queue)
 {
     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
     CONTROL_MSG         *control_msg_tmp;
@@ -1904,13 +1938,13 @@ static int SendBufferMsg()
     int                 register_size;
     void                *register_addr;
     int                 index_previous = -1;
-    int                 index = smsg_head_index;
+    int                 index = queue->smsg_head_index;
 #if CMI_EXERT_SEND_CAP
     int                        sent_cnt = 0;
 #endif
 
 #if ! CMK_SMP
-    index = smsg_head_index;
+    index = queue->smsg_head_index;
 #else
     index = 0;
 #endif
@@ -1920,15 +1954,15 @@ static int SendBufferMsg()
 #if CMK_SMP
     while(index <mysize)
     {
-        int i, len = PCQueueLength(smsg_msglist_index[index].sendSmsgBuf);
+        int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
         for (i=0; i<len; i++) 
         {
-            ptr = (MSG_LIST*)PCQueuePop(smsg_msglist_index[index].sendSmsgBuf);
+            ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
             if (ptr == NULL) break;
 #else
     while(index != -1)
     {
-        ptr = smsg_msglist_index[index].sendSmsgBuf;
+        ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
         pre = 0;
         while(ptr != 0)
         {
@@ -1945,7 +1979,7 @@ static int SendBufferMsg()
             switch(ptr->tag)
             {
             case SMALL_DATA_TAG:
-                status = send_smsg_message( ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
+                status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     CmiFree(ptr->msg);
@@ -1953,13 +1987,13 @@ static int SendBufferMsg()
                 break;
             case LMSG_INIT_TAG:
                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
-                status = send_large_messages( ptr->destNode, control_msg_tmp, 1);
+                status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
                 if(status != GNI_RC_SUCCESS)
                     done = 0;
                 break;
             case   ACK_TAG:
             case   BIG_MSG_TAG:
-                status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
+                status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
@@ -1968,7 +2002,7 @@ static int SendBufferMsg()
                 break;
 #ifdef CMK_DIRECT
             case DIRECT_PUT_DONE_TAG:
-                status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
+                status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     free((CMK_DIRECT_HEADER*)ptr->msg);
@@ -1989,7 +2023,7 @@ static int SendBufferMsg()
                     ptr = pre ->next = ptr->next;
                 }else
                 {
-                    ptr = smsg_msglist_index[index].sendSmsgBuf = smsg_msglist_index[index].sendSmsgBuf->next;
+                    ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
                 }
                 FreeMsgList(tmp_ptr);
 #else
@@ -2006,7 +2040,7 @@ static int SendBufferMsg()
 #endif
             }else {
 #if CMK_SMP
-                PCQueuePush(smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
+                PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
 #else
                 pre = ptr;
                 ptr=ptr->next;
@@ -2015,18 +2049,18 @@ static int SendBufferMsg()
             } 
         } //end while
 #if !CMK_SMP
-        smsg_msglist_index[index].tail = pre;
-        if(smsg_msglist_index[index].sendSmsgBuf == 0)
+        queue->smsg_msglist_index[index].tail = pre;
+        if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
         {
             if(index_previous != -1)
-                smsg_msglist_index[index_previous].next = smsg_msglist_index[index].next;
+                queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
             else
-                smsg_head_index = smsg_msglist_index[index].next;
+                queue->smsg_head_index = queue->smsg_msglist_index[index].next;
         }else
         {
             index_previous = index;
         }
-        index = smsg_msglist_index[index].next;
+        index = queue->smsg_msglist_index[index].next;
 #else
         index++;
 #endif
@@ -2092,7 +2126,10 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
-    SendBufferMsg();
+#if USE_OOB
+    SendBufferMsg(&smsg_oob_queue);
+#endif
+    SendBufferMsg(&smsg_queue);
 #if DEBUG_POOL
     MACHSTATE(8, "after SendBufferMsg\n") ; 
 #endif
@@ -2275,10 +2312,26 @@ static void _init_static_smsg()
 } 
 
 inline
-static void _init_smsg()
+static void _init_send_queue(SMSG_QUEUE *queue)
 {
-    int i;
+     int i;
+     queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
+     for(i =0; i<mysize; i++)
+     {
+        queue->smsg_msglist_index[i].next = -1;
+#if CMK_SMP
+        queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
+#else
+        queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
+#endif
+        
+     }
+     queue->smsg_head_index = -1;
+}
 
+inline
+static void _init_smsg()
+{
     if(mysize > 1) {
         if (useDynamicSMSG)
             _init_dynamic_smsg();
@@ -2286,18 +2339,10 @@ static void _init_smsg()
             _init_static_smsg();
     }
 
-     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
-     for(i =0; i<mysize; i++)
-     {
-        smsg_msglist_index[i].next = -1;
-#if CMK_SMP
-        smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
-#else
-        smsg_msglist_index[i].sendSmsgBuf = 0; 
+    _init_send_queue(&smsg_queue);
+#if USE_OOB
+    _init_send_queue(&smsg_oob_queue);
 #endif
-        
-     }
-     smsg_head_index = -1;
 }
 
 static void _init_static_msgq()
@@ -2750,7 +2795,12 @@ void LrtsExit()
 void LrtsDrainResources()
 {
     if(mysize == 1) return;
-    while (!SendBufferMsg()) {
+    while (
+#if USE_OOB
+           !SendBufferMsg(&smsg_oob_queue) ||
+#endif
+           !SendBufferMsg(&smsg_queue))
+    {
         if (useDynamicSMSG)
             PumpDatagramConnection();
         PumpNetworkSmsg();
index 8f9142edf7b5eaa7da2fe4e6cc76509da32b2293..3f508eb532d64c82f7828591f2cb9365778a6f64 100644 (file)
@@ -139,10 +139,11 @@ int               _Cmi_numpes;    /* Total number of processors */
 CpvDeclare(void*, CmiLocalQueue);
 
 /* different modes for sending a message */
-#define P2P_SYNC 0x1
-#define P2P_ASYNC 0x2
-#define BCAST_SYNC 0x3
-#define BCAST_ASYNC 0x4
+#define P2P_SYNC      0x1
+#define P2P_ASYNC     0x2
+#define BCAST_SYNC    0x4
+#define BCAST_ASYNC   0x8
+#define OUT_OF_BAND   0x10
 
 enum MACHINE_SMP_MODE {
     INVALID_MODE,