speedup sendbuffersmsg sendrdma
authorYanhua Sun <sun51@hopper05.(none)>
Wed, 15 Feb 2012 21:26:50 +0000 (13:26 -0800)
committerYanhua Sun <sun51@hopper05.(none)>
Wed, 15 Feb 2012 21:26:50 +0000 (13:26 -0800)
src/arch/gemini_gni/machine.c

index 43954b372429d6b0419f6b991939a242ccfa4478..73a008bc769f8b23d8a66dfda41ad539b106bf8c 100644 (file)
@@ -212,6 +212,13 @@ static int  REMOTE_QUEUE_ENTRIES=20480;
 static int LOCAL_QUEUE_ENTRIES=20480; 
 #endif
 
+typedef enum send_gni_return
+{
+    SEND_GNI_SUCCESS=10,
+    SEND_GNI_NO_RESOURCE,
+    SEND_GNI_NO_MEM
+}send_gni_return_t;
+
 #define BIG_MSG_TAG  0x26
 #define PUT_DONE_TAG      0x28
 #define DIRECT_PUT_DONE_TAG      0x29
@@ -675,7 +682,6 @@ static  gni_return_t deregisterMempool(mempool_type *mptr, block_header **from)
     status = MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh);
     GNI_RC_CHECK("registerMemorypool de-register", status);
     register_mempool -= GetSizeFromHeader(current);
-//printf("[n %d] deregisterMempool done. register_mempool: %d\n", myrank, register_mempool);
     SetMemHndlZero(GetMemHndlFromHeader(current));
     return GNI_RC_SUCCESS;
 }
@@ -967,9 +973,10 @@ inline static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t se
 // Large message, send control to receiver, receiver register memory and do a GET, 
 // return 1 - send no success
 inline
-static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
+static send_gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff)
 {
-    gni_return_t        status  =   GNI_RC_NOT_DONE;
+    gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
+    send_gni_return_t   send_status = SEND_GNI_NO_MEM;
     uint32_t            vmdh_index  = -1;
     int                 size;
     int                 offset = 0;
@@ -985,7 +992,7 @@ static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *co
         {
             if(!inbuff)
                 buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
-            return status;
+            return SEND_GNI_NO_MEM;
         }
         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
         {
@@ -1024,8 +1031,11 @@ static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *co
                 IncreaseMsgInSend(source_addr);
             }
             FreeControlMsg(control_msg_tmp);
+            send_status = SEND_GNI_SUCCESS;
             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_mempool, LMSG_INIT_TAG); 
-        }
+        }else
+            send_status = SEND_GNI_NO_RESOURCE;
+
     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
     {
         CmiAbort("Memory registor for large msg\n");
@@ -1034,7 +1044,7 @@ static int send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *co
         if(!inbuff)
             buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
-    return status;
+    return send_status;
 #else
     status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
     if(status == GNI_RC_SUCCESS)
@@ -1796,7 +1806,6 @@ static void  SendRdmaMsg()
             }else
             {
                 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
-                status = GNI_RC_SUCCESS;
             }
             if(NoMsgInRecv( (void*)(pd->local_addr)))
                 register_size = GetMempoolsize((void*)(pd->local_addr));
@@ -1806,7 +1815,7 @@ static void  SendRdmaMsg()
         {
             status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh);
         }
-        if(status == GNI_RC_SUCCESS)
+        if(status == GNI_RC_SUCCESS)        //mem register good
         {
 #if CMK_SMP && !COMM_THREAD_SEND
             CmiLock(tx_cq_lock);
@@ -1818,29 +1827,39 @@ static void  SendRdmaMsg()
 #if CMK_SMP && !COMM_THREAD_SEND
             CmiUnlock(tx_cq_lock);
 #endif
-        }
-        if(status == GNI_RC_SUCCESS)
-        {
+
+            if(status == GNI_RC_SUCCESS)    //post good
+            {
 #if !CMK_SMP
-            tmp_ptr = ptr;
-            if(pre != 0) {
-                pre->next = ptr->next;
-            }
-            else {
-                sendRdmaBuf = ptr->next;
-            }
-            ptr = ptr->next;
-            FreeRdmaRequest(tmp_ptr);
+                tmp_ptr = ptr;
+                if(pre != 0) {
+                    pre->next = ptr->next;
+                }
+                else {
+                    sendRdmaBuf = ptr->next;
+                }
+                ptr = ptr->next;
+                FreeRdmaRequest(tmp_ptr);
 #endif
-            if(pd->cqwrite_value == 0)
-            {
-                IncreaseMsgInRecv(((void*)(pd->local_addr)));
-            }
+                if(pd->cqwrite_value == 0)
+                {
+                    IncreaseMsgInRecv(((void*)(pd->local_addr)));
+                }
 #if MACHINE_DEBUG_LOG
-            buffered_recv_msg += register_size;
-            MACHSTATE(8, "GO request from buffered\n"); 
+                buffered_recv_msg += register_size;
+                MACHSTATE(8, "GO request from buffered\n"); 
 #endif
-        }else
+            }else           // cannot post
+            {
+#if CMK_SMP
+                PCQueuePush(sendRdmaBuf, (char*)ptr);
+#else
+                pre = ptr;
+                ptr = ptr->next;
+#endif
+                break;
+            }
+        } else          //memory registration fails
         {
 #if CMK_SMP
             PCQueuePush(sendRdmaBuf, (char*)ptr);
@@ -1851,7 +1870,8 @@ static void  SendRdmaMsg()
         }
     } //end while
 #if ! CMK_SMP
-    sendRdmaTail = pre;
+    if(ptr == 0)
+        sendRdmaTail = pre;
 #endif
 }
 
@@ -1860,6 +1880,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 {
     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
     CONTROL_MSG         *control_msg_tmp;
+    send_gni_return_t  send_status = SEND_GNI_NO_RESOURCE;
     gni_return_t        status;
     int                 done = 1;
     int                 register_size;
@@ -1898,19 +1919,21 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
               break;
             }
             status = GNI_RC_ERROR_RESOURCE;
+            send_status = SEND_GNI_NO_RESOURCE;
             switch(ptr->tag)
             {
             case SMALL_DATA_TAG:
                 status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
+                    send_status = SEND_GNI_SUCCESS;
                     CmiFree(ptr->msg);
                 }
                 break;
             case LMSG_INIT_TAG:
                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
-                status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
-                if(status != GNI_RC_SUCCESS)
+                send_status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
+                if(send_status != SEND_GNI_SUCCESS)
                     done = 0;
                 break;
             case   ACK_TAG:
@@ -1918,6 +1941,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
+                    send_status = SEND_GNI_SUCCESS;
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
                 }else
                     done = 0;
@@ -1927,6 +1951,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
+                    send_status = SEND_GNI_SUCCESS;
                     free((CMK_DIRECT_HEADER*)ptr->msg);
                 }
                 break;
@@ -1936,7 +1961,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 printf("Weird tag\n");
                 CmiAbort("should not happen\n");
             }
-            if(status == GNI_RC_SUCCESS)
+            if(send_status == SEND_GNI_SUCCESS)
             {
 #if !CMK_SMP
                 tmp_ptr = ptr;
@@ -1968,10 +1993,13 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 ptr=ptr->next;
 #endif
                 done = 0;
+                if(send_status == SEND_GNI_NO_RESOURCE)
+                    break;
             } 
         } //end while
 #if !CMK_SMP
-        queue->smsg_msglist_index[index].tail = pre;
+        if(ptr == 0)
+            queue->smsg_msglist_index[index].tail = pre;
         if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
         {
             if(index_previous != -1)