linklist for nonempty pcqueue in smp
authorYanhua Sun <yanhuas@jyc1.(none)>
Mon, 5 Mar 2012 18:51:09 +0000 (12:51 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Mon, 5 Mar 2012 18:51:09 +0000 (12:51 -0600)
src/arch/gemini_gni/machine.c

index dcc28a4e64edef9aea6bac5905448aae325ab7b3..a538722b04d7baa2f68e8a16082678ca45026295 100644 (file)
@@ -411,14 +411,19 @@ typedef struct  rmda_msg
 
 
 #if CMK_SMP
-
+#define SMP_LOCKS               0
 #define ONE_SEND_QUEUE                  0
 PCQueue sendRdmaBuf;
 typedef struct  msg_list_index
 {
     PCQueue     sendSmsgBuf;
+    int         pushed;
+    CmiNodeLock   lock;
 } MSG_LIST_INDEX;
-
+char                *destpe_avail;
+#if  !ONE_SEND_QUEUE && SMP_LOCKS
+    PCQueue     nonEmptyQueues;
+#endif
 #else         /* non-smp */
 
 static RDMA_REQUEST        *sendRdmaBuf = 0;
@@ -437,9 +442,7 @@ typedef struct  msg_list_index
 typedef struct smsg_queue
 {
     MSG_LIST_INDEX   *smsg_msglist_index;
-#if ! CMK_SMP
     int               smsg_head_index;
-#endif
 } SMSG_QUEUE;
 #else
 typedef struct smsg_queue
@@ -892,10 +895,20 @@ static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNo
 #else
 #if ONE_SEND_QUEUE
     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
+#else
+#if SMP_LOCKS
+    CmiLock(queue->smsg_msglist_index[destNode].lock);
+    if(queue->smsg_msglist_index[destNode].pushed == 0)
+    {
+        PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
+    }
+    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
+    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
 #else
     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
 #endif
 #endif
+#endif
 #if PRINT_SYH
     buffered_smsg_counter++;
 #endif
@@ -1338,6 +1351,7 @@ static void ProcessDeadlock()
             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %d and %d).\n", MAX_REG_MEM, MAX_BUFF_SEND);
             CmiAbort("Fatal> Deadlock detected.");
         }
+
     }
     _detected_hang = 0;
 }
@@ -2109,30 +2123,39 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #endif
 
 #if CMK_SMP
-    static int          index = 0;
-    int                 idx;
+    int          index = 0;
 #if ONE_SEND_QUEUE
-    char                *destpe;
-    destpe = (char*)CmiTmpAlloc(mysize * sizeof(char));
-    memset(destpe, 0, mysize * sizeof(char));
-    for (idx=0; idx<1; idx++)
+    memset(destpe_avail, 0, mysize * sizeof(char));
+    for (index=0; index<1; index++)
     {
         int i, len = PCQueueLength(queue->sendMsgBuf);
         for (i=0; i<len; i++) 
         {
             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
-            if (ptr == 0) break;
-            if (destpe[ptr->destNode] == 1) {       /* can't send to this pe */
+            if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                 continue;
             }
 #else
-    for (idx=0; idx<mysize; idx++)
+#if SMP_LOCKS
+    int nonempty = PCQueueLength(nonEmptyQueues);
+    for(index =0; index<nonempty; index++)
     {
-        int i, len = PCQueueLength(queue->smsg_msglist_index[index].sendSmsgBuf);
+        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
+        PCQueue current_queue= current_list-> sendSmsgBuf;
+        CmiLock(current_list->lock);
+        int i, len = PCQueueLength(current_queue);
+        current_list->pushed = 0;
+        CmiUnlock(current_list->lock);
+#else
+    for(index =0; index<mysize; index++)
+    {
+        PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
+        int i, len = PCQueueLength(current_queue);
+#endif
         for (i=0; i<len; i++) 
         {
-            ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
+            ptr = (MSG_LIST*)PCQueuePop(current_queue);
             if (ptr == 0) break;
 #endif
 #else
@@ -2217,7 +2240,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #if ONE_SEND_QUEUE
                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
 #else
-                PCQueuePush(queue->smsg_msglist_index[index].sendSmsgBuf, (char*)ptr);
+                PCQueuePush(current_queue, (char*)ptr);
 #endif
 #else
                 pre = ptr;
@@ -2227,7 +2250,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 if(status == GNI_RC_ERROR_RESOURCE)
                 {
 #if CMK_SMP && ONE_SEND_QUEUE 
-                    destpe[ptr->destNode] = 1;
+                    destpe_avail[ptr->destNode] = 1;
 #else
                     break;
 #endif
@@ -2249,7 +2272,15 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         }
         index = queue->smsg_msglist_index[index].next;
 #else
-        index = (index+1)%mysize;
+#if !ONE_SEND_QUEUE && SMP_LOCKS
+        CmiLock(current_list->lock);
+        if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
+        {
+            current_list->pushed = 1;
+            PCQueuePush(nonEmptyQueues, current_list);
+        }
+        CmiUnlock(current_list->lock); 
+#endif
 #endif
 
 #if CMI_EXERT_SEND_CAP
@@ -2257,9 +2288,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                break;
 #endif
     }   // end pooling for all cores
-#if ONE_SEND_QUEUE
-    CmiTmpFree(destpe);
-#endif
     return done;
 }
 
@@ -2491,21 +2519,27 @@ static void _init_send_queue(SMSG_QUEUE *queue)
      int i;
 #if ONE_SEND_QUEUE
      queue->sendMsgBuf = PCQueueCreate();
+     destpe_avail = (char*)malloc(mysize * sizeof(char));
 #else
      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
+#if CMK_SMP && SMP_LOCKS
+     nonEmptyQueues = PCQueueCreate();
+#endif
      for(i =0; i<mysize; i++)
      {
 #if CMK_SMP
-        queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
+         queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
+#if SMP_LOCKS
+         queue->smsg_msglist_index[i].pushed = 0;
+         queue->smsg_msglist_index[i].lock = CmiCreateLock();
+#endif
 #else
-        queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
-        queue->smsg_msglist_index[i].next = -1;
+         queue->smsg_msglist_index[i].sendSmsgBuf = 0; 
+         queue->smsg_msglist_index[i].next = -1;
+         queue->smsg_head_index = -1;
 #endif
         
      }
-#if ! CMK_SMP
-     queue->smsg_head_index = -1;
-#endif
 #endif
 }