a new mode that both worker and comm thread send and receive messages.
authorYanhua Sun <yanhuas@jyc1.(none)>
Fri, 9 Mar 2012 04:42:36 +0000 (22:42 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Fri, 9 Mar 2012 04:42:36 +0000 (22:42 -0600)
src/arch/gemini_gni/machine.c
src/arch/util/machine-common-core.c

index 95fe2d1998769a1bdaa09f593c8df76acae10554..3ce60e2d03cb6b5ef4c2bc38a191ec557cae49aa 100644 (file)
 #endif
 
 #if CMK_SMP
+#if LARGEPAGE
+#define MULTI_THREAD_SEND 1
+#else
 #define COMM_THREAD_SEND 1
-//#define MULTI_THREAD_SEND 1
+#endif
 #endif
 
 #if CMK_SMP
@@ -123,7 +126,11 @@ static int ONE_SEG  =  4*oneMB;
 static int BIG_MSG  =  4*oneMB;
 static int ONE_SEG  =  2*oneMB;
 #endif
+#if MULTI_THREAD_SEND
+static int BIG_MSG_PIPELINE = 1;
+#else
 static int BIG_MSG_PIPELINE = 4;
+#endif
 
 // dynamic flow control
 static CmiInt8 buffered_send_msg = 0;
@@ -147,12 +154,16 @@ static CmiInt8  MAX_REG_MEM    =  25*oneMB;
 
 #endif     /* end USE_LRTS_MEMPOOL */
 
-#if CMK_SMP && MULTI_THREAD_SEND
+#if MULTI_THREAD_SEND
 #define     CMI_GNI_LOCK        CmiLock(tx_cq_lock);
 #define     CMI_GNI_UNLOCK        CmiUnlock(tx_cq_lock);
+#define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
+#define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
 #else
 #define     CMI_GNI_LOCK
 #define     CMI_GNI_UNLOCK
+#define     CMI_PCQUEUEPOP_LOCK(Q)   
+#define     CMI_PCQUEUEPOP_UNLOCK(Q)
 #endif
 
 static int _tlbpagesize = 4096;
@@ -1567,19 +1578,29 @@ void LrtsPostCommonInit(int everReturn)
 
 /* this is called by worker thread */
 void LrtsPostNonLocal(){
-#if CMK_SMP
 #if MULTI_THREAD_SEND
     if(mysize == 1) return;
+    //printf("[%d,%d] worker call communication\n", CmiMyNode(), CmiMyRank());
+    PumpNetworkSmsg();
     PumpLocalRdmaTransactions();
+    
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
-    SendBufferMsg(&smsg_queue);
+    {
 #if PIGGYBACK_ACK
-    SendBufferMsg(&smsg_ack_queue);
+    //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
+    if (SendBufferMsg(&smsg_queue) == 1) {
+        //if (count++ % 10 == 0) 
+        SendBufferMsg(&smsg_ack_queue);
+    }
+#else
+    SendBufferMsg(&smsg_queue);
 #endif
+    }
+
     SendRdmaMsg();
-#endif
+    //LrtsAdvanceCommunication(1);
 #endif
 }
 
@@ -1724,9 +1745,11 @@ static void PumpNetworkSmsg()
         while(1) {
             CMI_GNI_LOCK
             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
-            CMI_GNI_UNLOCK
             if (status != GNI_RC_SUCCESS)
+            {
+                CMI_GNI_UNLOCK
                 break;
+            }
 #if PRINT_SYH
             printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
@@ -1746,6 +1769,8 @@ static void PumpNetworkSmsg()
                 msg_nbytes = CmiGetMsgSize(header);
                 msg_data    = CmiAlloc(msg_nbytes);
                 memcpy(msg_data, (char*)header, msg_nbytes);
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
 #if CMK_SMP_TRACE_COMMTHREAD
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
 #endif
@@ -1762,13 +1787,19 @@ static void PumpNetworkSmsg()
 #endif
             case LMSG_INIT_TAG:
             {
-                getLargeMsgRequest(header, inst_id);
+                MallocControlMsg(control_msg_tmp);
+                memcpy(control_msg_tmp, header, sizeof(CONTROL_MSG));
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
+                getLargeMsgRequest(control_msg_tmp, inst_id);
                 break;
             }
             case ACK_TAG:   //msg fit into mempool
             {
                 /* Get is done, release message . Now put is not used yet*/
                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
 #if ! USE_LRTS_MEMPOOL
                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
 #else
@@ -1782,7 +1813,10 @@ static void PumpNetworkSmsg()
             }
             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
             {
-                header_tmp = (CONTROL_MSG *) header;
+                MallocControlMsg(header_tmp);
+                memcpy(header_tmp, header, sizeof(CONTROL_MSG));
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
                 void *msg = (void*)(header_tmp->source_addr);
                 int cur_seq = CmiGetMsgSeq(msg);
                 int offset = ONE_SEG*(cur_seq+1);
@@ -1820,6 +1854,8 @@ static void PumpNetworkSmsg()
             case PUT_DONE_TAG: //persistent message
                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
                 int size = ((CONTROL_MSG *) header)->length;
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
                 CmiReference(msg);
                 handleOneRecvedMsg(size, msg); 
                 break;
@@ -1829,12 +1865,16 @@ static void PumpNetworkSmsg()
                 //create a trigger message
                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
                 break;
 #endif
             default: {
+                GNI_SmsgRelease(ep_hndl_array[inst_id]);
+                CMI_GNI_UNLOCK
                 printf("weird tag problem\n");
                 CmiAbort("Unknown tag\n");
                      }
@@ -1842,9 +1882,6 @@ static void PumpNetworkSmsg()
 #if PRINT_SYH
             printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
 #endif
-            CMI_GNI_LOCK
-            GNI_SmsgRelease(ep_hndl_array[inst_id]);
-            CMI_GNI_UNLOCK
             smsg_recv_count ++;
             msg_tag = GNI_SMSG_ANY_TAG;
         } //endwhile getNext
@@ -2206,7 +2243,9 @@ static void  SendRdmaMsg()
     int len = PCQueueLength(sendRdmaBuf);
     for (i=0; i<len; i++)
     {
+        CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
+        CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
         if (ptr == NULL) break;
 #else
     ptr = sendRdmaBuf;
@@ -2317,7 +2356,10 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         int i, len = PCQueueLength(queue->sendMsgBuf);
         for (i=0; i<len; i++) 
         {
+            CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
+            CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
+            if(ptr == NULL) break;
             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                 continue;
@@ -2327,7 +2369,10 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     int nonempty = PCQueueLength(nonEmptyQueues);
     for(index =0; index<nonempty; index++)
     {
+        CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
+        CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
+        if(current_list == NULL) break; 
         PCQueue current_queue= current_list-> sendSmsgBuf;
         CmiLock(current_list->lock);
         int i, len = PCQueueLength(current_queue);
@@ -2341,7 +2386,9 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 #endif
         for (i=0; i<len; i++) 
         {
+            CMI_PCQUEUEPOP_LOCK(current_queue)
             ptr = (MSG_LIST*)PCQueuePop(current_queue);
+            CMI_PCQUEUEPOP_UNLOCK(current_queue)
             if (ptr == 0) break;
 #endif
 #else
@@ -3021,7 +3068,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
   
 #if MULTI_THREAD_SEND
     /* Currently, we only consider the case that comm. thread will only recv msgs */
-    Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
+    Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
 #endif
 
     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
@@ -3092,7 +3139,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     /* create the endpoints. they need to be bound to allow later CQWrites to them */
     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
     _MEMCHECK(ep_hndl_array);
-#if CMK_SMP && !COMM_THREAD_SEND
+#if MULTI_THREAD_SEND 
     tx_cq_lock  = CmiCreateLock();
     rx_cq_lock  = CmiCreateLock();
 #endif
index 3d668248fccfbcb31433a3eb0f50aa466f4bc077..d7356279c2d78fa1e195a7059d6ebb9dd8ed2f04 100644 (file)
@@ -153,6 +153,7 @@ enum MACHINE_SMP_MODE {
     INVALID_MODE,
     COMM_THREAD_SEND_RECV = 0,
     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
+    COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
 };
 /* The default mode of smp charm runtime */
@@ -672,6 +673,8 @@ if (  MSG_STATISTIC)
                        printf("Charm++> The comm. thread both sends and receives messages\n");
                } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
                        printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
+               } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
+                       printf("Charm++> Both  comm. thread and worker thread send and messages\n");
                } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
                        printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
                } else {