added sender side throttling control
authorYanhua Sun <yanhuas@jyc1.(none)>
Wed, 28 Mar 2012 22:29:10 +0000 (17:29 -0500)
committerYanhua Sun <yanhuas@jyc1.(none)>
Wed, 28 Mar 2012 22:29:10 +0000 (17:29 -0500)
src/arch/gemini_gni/machine.c

index 668e4d84a93762dad8ab022b2968499cf3fba7c5..0871abc9fadbd916115e9095b58c19acad83f314 100644 (file)
@@ -64,9 +64,9 @@
 #define CMI_EXERT_SEND_CAP     0
 #define        CMI_EXERT_RECV_CAP      0
 #define CMI_EXERT_RDMA_CAP      0
-
 #if CMI_EXERT_SEND_CAP
-#define SEND_CAP  32
+int SEND_large_cap = 100;
+int SEND_large_pending = 0;
 #endif
 
 #if CMI_EXERT_RECV_CAP
@@ -1687,11 +1687,21 @@ inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode,
         register_size = 0;  
     }
 
+#if CMI_EXERT_SEND_CAP
+    if(SEND_large_pending >= SEND_large_cap)
+    {
+        status = GNI_RC_ERROR_NOMEM;
+    }
+#endif
     if(status == GNI_RC_SUCCESS)
     {
-        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
+       status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr); 
         if(status == GNI_RC_SUCCESS)
         {
+#if CMI_EXERT_SEND_CAP
+            SEND_large_pending++;
+#endif
             buffered_send_msg += register_size;
             if(control_msg_tmp->seq_id == 0)
             {
@@ -2239,6 +2249,9 @@ static void PumpNetworkSmsg()
                     buffered_send_msg -= GetMempoolsize(msg);
                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                 CmiFree(msg);
+#if CMI_EXERT_SEND_CAP
+                SEND_large_pending--;
+#endif
                 break;
             }
 #endif
@@ -2252,6 +2265,9 @@ static void PumpNetworkSmsg()
                 header_tmp = (CONTROL_MSG *) header;
 #endif
                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
+#if CMI_EXERT_SEND_CAP
+                    SEND_large_pending--;
+#endif
                 void *msg = (void*)(header_tmp->source_addr);
                 int cur_seq = CmiGetMsgSeq(msg);
                 int offset = ONE_SEG*(cur_seq+1);
@@ -2678,6 +2694,9 @@ static void PumpRemoteTransactions()
                 buffered_send_msg -= GetMempoolsize(msg);
             CmiFree(msg);
             IndexPool_freeslot(&ackPool, index);
+#if CMI_EXERT_SEND_CAP
+            SEND_large_pending--;
+#endif
             break;
 #if CMK_PERSISTENT_COMM
         case 1:  {    // PERSISTENT
@@ -3037,9 +3056,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     uint64_t            register_size;
     void                *register_addr;
     int                 index_previous = -1;
-#if CMI_EXERT_SEND_CAP
-    int                        sent_cnt = 0;
-#endif
 
 #if CMK_SMP
     int          index = 0;
@@ -3173,9 +3189,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 FreeMsgList(tmp_ptr);
 #else
                 FreeMsgList(ptr);
-#endif
-#if CMI_EXERT_SEND_CAP
-                if(++sent_cnt == SEND_CAP) break;
 #endif
             }else {
 #if CMK_SMP
@@ -3225,12 +3238,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #endif
 #endif
 
-#if CMI_EXERT_SEND_CAP
-       if(sent_cnt == SEND_CAP) {
-            done = 0;
-           break;
-        }
-#endif
     }   // end pooling for all cores
     return done;
 }
@@ -3667,6 +3674,12 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
 #endif
 
+#if CMI_EXERT_SEND_CAP
+    CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
+#endif
+
+    CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
+    
     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);