added sendbuffer control
authorYanhua Sun <yanhuas@jyc1.(none)>
Tue, 3 Apr 2012 05:24:10 +0000 (00:24 -0500)
committerYanhua Sun <yanhuas@jyc1.(none)>
Tue, 3 Apr 2012 05:24:10 +0000 (00:24 -0500)
src/arch/gemini_gni-crayxe/conv-mach.h
src/arch/gemini_gni/machine.c

index 85d83abdb6a211e9b24213b43215daee4da793a5..86cc172ab36b74afc57c850ed2bd49dc9f5bcf7f 100644 (file)
@@ -52,7 +52,7 @@
 #define CMK_THREADS_USE_JCONTEXT                           0
 #define CMK_THREADS_USE_PTHREADS                           0
 
-#define CMK_USE_SPINLOCK                                   0
+#define CMK_USE_SPINLOCK                                   1
 
 /* Specifies what kind of timer to use, and the correspondent headers will be
    included in convcore.c. If none is selected, then the machine.c file needs to
index 01f84e35e9ee4b93a9b7284100fd240f2608180c..05fd6a2d35f81602645384d179b5c6f381a08863 100644 (file)
 #define REMOTE_EVENT               1
 #define CQWRITE                    0
 
-#define CMI_EXERT_SEND_CAP     0
-#define        CMI_EXERT_RECV_CAP      0
-#define CMI_EXERT_RDMA_CAP      0
+#define CMI_EXERT_SEND_LARGE_CAP       0
+#define CMI_EXERT_RECV_RDMA_CAP      0
 
-#if CMI_EXERT_SEND_CAP
-int SEND_large_cap = 100;
-int SEND_large_pending = 0;
+
+#define CMI_SENDBUFFERSMSG_CAP 0
+#define CMI_PUMPNETWORKSMSG_CAP 0
+
+#if CMI_SENDBUFFERSMSG_CAP
+int     SendBufferMsg_cap  =10;
 #endif
 
-#if CMI_EXERT_RECV_CAP
-#define RECV_CAP  4                  /* cap <= 2 sometimes hang */
+#if CMI_PUMPNETWORKSMSG_CAP
+int     PumpNetworkSmsg_cap = 10;
+#endif
+
+#if CMI_EXERT_SEND_LARGE_CAP
+int SEND_large_cap = 100;
+int SEND_large_pending = 0;
 #endif
 
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
 int   RDMA_cap =   100;
 int   RDMA_pending = 0;
 #endif
@@ -85,7 +92,7 @@ int   RDMA_pending = 0;
 
 // Trace communication thread
 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
-#define TRACE_THRESHOLD     0.00005
+#define TRACE_THRESHOLD     0.00001
 #define CMI_MPI_TRACE_MOREDETAILED 0
 #undef CMI_MPI_TRACE_USEREVENTS
 #define CMI_MPI_TRACE_USEREVENTS 1
@@ -469,7 +476,7 @@ typedef struct  rmda_msg
 
 
 #if CMK_SMP
-#define SMP_LOCKS                       0
+#define SMP_LOCKS               0
 #define ONE_SEND_QUEUE                  0
 PCQueue sendRdmaBuf;
 typedef struct  msg_list_index
@@ -1690,7 +1697,7 @@ inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode,
         register_size = 0;  
     }
 
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
     if(SEND_large_pending >= SEND_large_cap)
     {
         status = GNI_RC_ERROR_NOMEM;
@@ -1702,7 +1709,7 @@ inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode,
        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
         if(status == GNI_RC_SUCCESS)
         {
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
             SEND_large_pending++;
 #endif
             buffered_send_msg += register_size;
@@ -2166,11 +2173,12 @@ static void PumpNetworkSmsg()
 #if   CMK_DIRECT
     cmidirectMsg        *direct_msg;
 #endif
-#if CMI_EXERT_RECV_CAP
+#if CMI_PUMPNETWORKSMSG_CAP 
     int                  recv_cnt = 0;
+    while(recv_cnt< PumpNetworkSmsg_cap) {
+#else
+    while(1) {
 #endif
-    while(1)
-    {
         CMI_GNI_LOCK(smsg_rx_cq_lock)
         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
@@ -2198,6 +2206,9 @@ static void PumpNetworkSmsg()
                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
                 break;
             }
+#if         CMI_PUMPNETWORKSMSG_CAP
+            recv_cnt++; 
+#endif
 #if PRINT_SYH
             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
@@ -2248,7 +2259,7 @@ static void PumpNetworkSmsg()
                     buffered_send_msg -= GetMempoolsize(msg);
                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                 CmiFree(msg);
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
                 SEND_large_pending--;
 #endif
                 break;
@@ -2264,7 +2275,7 @@ static void PumpNetworkSmsg()
                 header_tmp = (CONTROL_MSG *) header;
 #endif
                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
                     SEND_large_pending--;
 #endif
                 void *msg = (void*)(header_tmp->source_addr);
@@ -2343,9 +2354,6 @@ static void PumpNetworkSmsg()
 #endif
             smsg_recv_count ++;
             msg_tag = GNI_SMSG_ANY_TAG;
-#if CMI_EXERT_RECV_CAP
-            if (status == GNI_RC_SUCCESS && ++recv_cnt == RECV_CAP) return;
-#endif
         } //endwhile GNI_SmsgGetNextWTag
     }   //end while GetEvent
     if(status == GNI_RC_ERROR_RESOURCE)
@@ -2477,7 +2485,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     pd->src_cq_hndl     = rdma_tx_cqh;
     pd->rdma_mode       = 0;
     pd->amo_cmd         = 0;
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
 #endif
     //memory registration success
@@ -2509,7 +2517,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
 
         if(status == GNI_RC_SUCCESS )
         {
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
             RDMA_pending++;
 #endif
             if(pd->cqwrite_value == 0)
@@ -2697,7 +2705,7 @@ static void PumpRemoteTransactions()
                 buffered_send_msg -= GetMempoolsize(msg);
             CmiFree(msg);
             IndexPool_freeslot(&ackPool, index);
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
             SEND_large_pending--;
 #endif
             break;
@@ -2754,7 +2762,7 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
         if (type == GNI_CQ_EVENT_TYPE_POST)
         {
 
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
             if(RDMA_pending <=0) CmiAbort(" pending error\n");
             RDMA_pending--;
 #endif
@@ -2929,7 +2937,7 @@ static void  SendRdmaMsg()
     int len = PCQueueLength(sendRdmaBuf);
     for (i=0; i<len; i++)
     {
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
         if( RDMA_pending >= RDMA_cap) break;
 #endif
         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
@@ -2940,7 +2948,7 @@ static void  SendRdmaMsg()
     ptr = sendRdmaBuf;
     while (ptr!=0 )
     {
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
          if( RDMA_pending >= RDMA_cap) break;
 #endif
 #endif 
@@ -2980,16 +2988,17 @@ static void  SendRdmaMsg()
 #if CMK_SMP_TRACE_COMMTHREAD
 //            int oldpe = -1;
 //            int oldeventid = -1;
-//            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
+//            START_EVENT();
+//            if(IS_PUT(pd->type))
 //            { 
 //                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
 //                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
 //            }
-              if(IS_PUT(pd->type) )
-              { 
-                  START_EVENT();
-                  TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
-              }
+            if(IS_PUT(pd->type))
+            {
+                 START_EVENT();
+                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
+            }
 #endif
 
             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
@@ -3003,14 +3012,14 @@ static void  SendRdmaMsg()
             CMI_GNI_UNLOCK(lock);
             
 #if CMK_SMP_TRACE_COMMTHREAD
-//            if(pd->type == GNI_POST_RDMA_PUT || pd->type == GNI_POST_FMA_PUT)
+//            if(IS_PUT(pd->type))
 //            { 
 //                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
 //            }
 #endif
             if(status == GNI_RC_SUCCESS)    //post good
             {
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
                 RDMA_pending ++;
 #endif
 #if !CMK_SMP
@@ -3028,6 +3037,10 @@ static void  SendRdmaMsg()
                 {
 #if CMK_SMP_TRACE_COMMTHREAD 
                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
+//                    if(IS_PUT(pd->type))
+//                    { 
+//                        TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
+//                    }
 #endif
                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
                 }
@@ -3081,7 +3094,9 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     uint64_t            register_size;
     void                *register_addr;
     int                 index_previous = -1;
-
+#if     CMI_SENDBUFFERSMSG_CAP
+    int                 sent_length = 0;
+#endif
 #if CMK_SMP
     int          index = 0;
 #if ONE_SEND_QUEUE
@@ -3102,8 +3117,12 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #else
 #if SMP_LOCKS
     int nonempty = PCQueueLength(nonEmptyQueues);
-    for(index =0; index<nonempty; index++)
-    {
+#if CMI_SENDBUFFERSMSG_CAP
+    for(index =0; index<nonempty ; index++) {
+        if ( sent_length < SendBufferMsg_cap) { done = 0; return done;}
+#else
+    for(index =0; index<nonempty; index++) {
+#endif
         CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
         CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
@@ -3114,14 +3133,16 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         current_list->pushed = 0;
         CmiUnlock(current_list->lock);
 #else
-    for(index =0; index<mysize; index++)
-    {
+#if CMI_SENDBUFFERSMSG_CAP
+    for(index =0; index<mysize && sent_length < SendBufferMsg_cap; index++) {
+#else
+    for(index =0; index<mysize; index++) {
+#endif
         //if (index == myrank) continue;
         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
         int i, len = PCQueueLength(current_queue);
 #endif
-        for (i=0; i<len; i++) 
-        {
+        for (i=0; i<len; i++)  {
             CMI_PCQUEUEPOP_LOCK(current_queue)
             ptr = (MSG_LIST*)PCQueuePop(current_queue);
             CMI_PCQUEUEPOP_UNLOCK(current_queue)
@@ -3196,6 +3217,9 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 printf("Weird tag\n");
                 CmiAbort("should not happen\n");
             }       // end switch
+#if CMI_SENDBUFFERSMSG_CAP
+            sent_length++;
+#endif
             if(status == GNI_RC_SUCCESS)
             {
 #if PRINT_SYH
@@ -3698,10 +3722,18 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
 #endif
 
-#if CMI_EXERT_SEND_CAP
+#if CMI_EXERT_SEND_LARGE_CAP
     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
 #endif
 
+#if CMI_SENDBUFFERSMSG_CAP
+    CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
+#endif
+
+#if CMI_PUMPNETWORKSMSG_CAP 
+    CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
+#endif
+
     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
     
     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
@@ -3867,7 +3899,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     if (env) _checkProgress = 0;
     if (mysize == 1) _checkProgress = 0;
 
-#if CMI_EXERT_RDMA_CAP
+#if CMI_EXERT_RECV_RDMA_CAP
     env = getenv("CHARM_UGNI_RDMA_MAX");
     if (env)  {
         RDMA_pending = atoi(env);