re-implement urgent send
author Gengbin Zheng <gzheng@illinois.edu>
Thu, 5 Apr 2012 22:41:46 +0000 (17:41 -0500)
committer Gengbin Zheng <gzheng@illinois.edu>
Thu, 5 Apr 2012 22:41:46 +0000 (17:41 -0500)
src/arch/gemini_gni/machine.c

index eb86aba4418941c00019732f789af304a625e274..db20309a8a6ed84dd2fbca45bcb5433e7882b467 100644 (file)
@@ -489,14 +489,14 @@ PCQueue sendRdmaBuf;
 PCQueue sendPersistentBuf;
 typedef struct  msg_list_index
 {
-    PCQueue     sendSmsgBuf;
-    int         pushed;
+    PCQueue       sendSmsgBuf;
+#if  SMP_LOCKS
     CmiNodeLock   lock;
+    int           pushed;
+    int           destpe;
+#endif
 } MSG_LIST_INDEX;
 char                *destpe_avail;
-#if  !ONE_SEND_QUEUE && SMP_LOCKS
-    PCQueue     nonEmptyQueues;
-#endif
 #else         /* non-smp */
 
 static RDMA_REQUEST        *sendRdmaBuf = 0;
@@ -517,6 +517,9 @@ typedef struct smsg_queue
 {
     MSG_LIST_INDEX   *smsg_msglist_index;
     int               smsg_head_index;
+#if  SMP_LOCKS
+    PCQueue     nonEmptyQueues;
+#endif
 } SMSG_QUEUE;
 #else
 typedef struct smsg_queue
@@ -1222,7 +1225,7 @@ void CmiMachineProgressImpl() {
 }
 #endif
 
-static int SendBufferMsg(SMSG_QUEUE *queue);
+static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
 #if CMK_SMP
 static void SendRdmaMsg(PCQueue );
 #else
@@ -1366,7 +1369,7 @@ static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNo
     CmiLock(queue->smsg_msglist_index[destNode].lock);
     if(queue->smsg_msglist_index[destNode].pushed == 0)
     {
-        PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
+        PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
     }
     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
@@ -2034,8 +2037,8 @@ void LrtsPostNonLocal(){
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
 #endif
-#if MULTI_THREAD_SEND
 
+#if MULTI_THREAD_SEND
     if(mysize == 1) return;
 
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -2062,17 +2065,19 @@ void LrtsPostNonLocal(){
 #if CMK_WORKER_SINGLE_TASK
     if (CmiMyRank() % 6 == 3)
 #endif
-    PumpRemoteTransactions( rdma_rx_cqh);         // rdma_rx_cqh
+    PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
 #endif
 
 #if CMK_WORKER_SINGLE_TASK
     if (CmiMyRank() % 6 == 4)
 #endif
+    {
 #if CMK_USE_OOB
-    if (SendBufferMsg(&smsg_oob_queue) == 1)
+    SendBufferMsg(&smsg_oob_queue, NULL);
+    SendBufferMsg(&smsg_queue, &smsg_oob_queue);
+#else
+    SendBufferMsg(&smsg_queue, NULL);
 #endif
-    {
-        SendBufferMsg(&smsg_queue);
     }
 
 #if CMK_WORKER_SINGLE_TASK
@@ -2206,7 +2211,7 @@ static void PumpNetworkSmsg()
     CONTROL_MSG         *control_msg_tmp, *header_tmp;
     uint64_t            source_addr;
     SMSG_QUEUE         *queue = &smsg_queue;
-#if   CMK_DIRECT
+#if  CMK_DIRECT
     cmidirectMsg        *direct_msg;
 #endif
 #if CMI_PUMPNETWORKSMSG_CAP 
@@ -2711,7 +2716,7 @@ static void PumpCqWriteTransactions()
 #endif
 
 #if REMOTE_EVENT
-static void PumpRemoteTransactions( gni_cq_handle_t rx_cqh)
+static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
 {
     gni_cq_entry_t          ev;
     gni_return_t            status;
@@ -3199,7 +3204,8 @@ static void  SendRdmaMsg()
 }
 #endif
 
-static inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
+static 
+inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
 {
     CONTROL_MSG         *control_msg_tmp;
     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
@@ -3270,7 +3276,7 @@ static inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr
 
 #if ONE_SEND_QUEUE
 
-static int SendBufferMsg(SMSG_QUEUE *queue)
+static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
 {
     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
     CONTROL_MSG         *control_msg_tmp;
@@ -3323,7 +3329,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 
 #else   /*  ! ONE_SEND_QUEUE  */
 
-static int SendBufferMsg(SMSG_QUEUE *queue)
+static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
 {
     MSG_LIST            *ptr;
     gni_return_t        status;
@@ -3331,32 +3337,38 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #if     CMI_SENDBUFFERSMSG_CAP
     int                 sent_length = 0;
 #endif
-    static int          index = -1;
     int idx;
 #if SMP_LOCKS
-    int nonempty = PCQueueLength(nonEmptyQueues);
+    int          index = -1;
+    int nonempty = PCQueueLength(queue->nonEmptyQueues);
     for(idx =0; idx<nonempty; idx++) 
     {
         index++;  if (index >= nonempty) index = 0;
 #if CMI_SENDBUFFERSMSG_CAP
         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
 #endif
-        CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
-        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
-        CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
+        CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
+        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
+        CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
         if(current_list == NULL) break; 
+        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
+            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
+            continue;
+        }
         PCQueue current_queue= current_list->sendSmsgBuf;
         CmiLock(current_list->lock);
         int i, len = PCQueueLength(current_queue);
         current_list->pushed = 0;
         CmiUnlock(current_list->lock);
 #else      /* ! SMP_LOCKS */
+    static int          index = -1;
     for(idx =0; idx<mysize; idx++) 
     {
         index++;  if (index == mysize) index = 0;
 #if CMI_SENDBUFFERSMSG_CAP
         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
 #endif
+        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
         //if (index == myrank) continue;
         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
         int i, len = PCQueueLength(current_queue);
@@ -3386,13 +3398,13 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                     break;
                 }
             } 
-        } //end while
+        } //end for i
 #if SMP_LOCKS
         CmiLock(current_list->lock);
         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
         {
             current_list->pushed = 1;
-            PCQueuePush(nonEmptyQueues, (char*)current_list);
+            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
         }
         CmiUnlock(current_list->lock); 
 #endif
@@ -3404,7 +3416,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 
 #else              /* non-smp */
 
-static int SendBufferMsg(SMSG_QUEUE *queue)
+static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
 {
     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
     CONTROL_MSG         *control_msg_tmp;
@@ -3513,11 +3525,11 @@ void LrtsAdvanceCommunication(int whileidle)
     startT = CmiWallTimer();
 #endif
 #if CMK_USE_OOB
-    if (SendBufferMsg(&smsg_oob_queue) == 1)
+    SendBufferMsg(&smsg_oob_queue, NULL);
+    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
+#else
+    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
 #endif
-    {
-        STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
-    }
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
@@ -3557,7 +3569,7 @@ void LrtsAdvanceCommunication(int whileidle)
 #endif
 
 #if REMOTE_EVENT
-    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions( rdma_rx_cqh));
+    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
 #endif
 
 #if CMK_SMP_TRACE_COMMTHREAD
@@ -3758,7 +3770,7 @@ static void _init_send_queue(SMSG_QUEUE *queue)
 #else
      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
 #if CMK_SMP && SMP_LOCKS
-     nonEmptyQueues = PCQueueCreate();
+     queue->nonEmptyQueues = PCQueueCreate();
 #endif
      for(i =0; i<mysize; i++)
      {
@@ -3767,6 +3779,7 @@ static void _init_send_queue(SMSG_QUEUE *queue)
 #if SMP_LOCKS
          queue->smsg_msglist_index[i].pushed = 0;
          queue->smsg_msglist_index[i].lock = CmiCreateLock();
+         queue->smsg_msglist_index[i].destpe = i;
 #endif
 #else
          queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
@@ -4336,9 +4349,9 @@ void LrtsDrainResources()
     if(mysize == 1) return;
     while (
 #if CMK_USE_OOB
-           !SendBufferMsg(&smsg_oob_queue) ||
+           !SendBufferMsg(&smsg_oob_queue, NULL) ||
 #endif
-           !SendBufferMsg(&smsg_queue) 
+           !SendBufferMsg(&smsg_queue, NULL)
           )
     {
         if (useDynamicSMSG)