Two features:
author Gengbin Zheng <gzheng@illinois.edu>
Wed, 7 Mar 2012 23:27:48 +0000 (17:27 -0600)
committer Gengbin Zheng <gzheng@illinois.edu>
Wed, 7 Mar 2012 23:27:48 +0000 (17:27 -0600)
1. Use LARGEPAGE with mmap from hugetlbfs.
2. Piggyback ACK messages on small data messages.
Both are turned off for now.

src/arch/gemini_gni/machine.c

index b0729c43a4dfe4979fb8688593b217c1b54bbaa8..d77ec390d8a1f7049f14fad1012054eb663d31c5 100644 (file)
@@ -34,7 +34,6 @@
 #include <malloc.h>
 #include <unistd.h>
 #include <time.h>
-
 #include <gni_pub.h>
 #include <pmi.h>
 
 
 #include "converse.h"
 
+#define     LARGEPAGE           0
+
+#if LARGEPAGE
+#include <hugetlbfs.h>
+#endif
+
 #if CMK_DIRECT
 #include "cmidirect.h"
 #endif
 #define PRINT_SYH  0
 
 #define USE_LRTS_MEMPOOL                  1
+
 #define REMOTE_EVENT                      0
 
 #define CMI_EXERT_SEND_CAP     0
 //#define MULTI_THREAD_SEND 1
 #endif
 
+#if CMK_SMP
+#define PIGGYBACK_ACK                        0
+#endif
+
 // Trace communication thread
 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
 #define TRACE_THRESHOLD     0.00005
@@ -114,11 +124,10 @@ static int BIG_MSG_PIPELINE = 4;
 static CmiInt8 buffered_send_msg = 0;
 static CmiInt8 register_memory_size = 0;
 
-#define     LARGEPAGE           0
-
 #if LARGEPAGE
 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
+static CmiInt8 register_count = 0;
 #else
 #if CMK_SMP && COMM_THREAD_SEND 
 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
@@ -274,18 +283,15 @@ static int  REMOTE_QUEUE_ENTRIES=20480;
 static int LOCAL_QUEUE_ENTRIES=20480; 
 #endif
 
-#define BIG_MSG_TAG  0x26
-#define PUT_DONE_TAG      0x28
-#define DIRECT_PUT_DONE_TAG      0x29
-#define ACK_TAG           0x30
+#define BIG_MSG_TAG             0x26
+#define PUT_DONE_TAG            0x28
+#define DIRECT_PUT_DONE_TAG     0x29
+#define ACK_TAG                 0x30
 /* SMSG is data message */
 #define SMALL_DATA_TAG          0x31
+#define SMALL_DATA_ACK_TAG      0x32
 /* SMSG is a control message to initialize a BTE */
-#define MEDIUM_HEAD_TAG         0x32
-#define MEDIUM_DATA_TAG         0x33
 #define LMSG_INIT_TAG           0x39 
-#define VERY_LMSG_INIT_TAG      0x40 
-#define VERY_LMSG_TAG           0x41 
 
 #define DEBUG
 #ifdef GNI_RC_CHECK
@@ -299,6 +305,7 @@ static int LOCAL_QUEUE_ENTRIES=20480;
 
 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
+#define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
 
 static int useStaticMSGQ = 0;
 static int useStaticFMA = 0;
@@ -464,7 +471,12 @@ typedef struct smsg_queue
 #endif
 
 SMSG_QUEUE                  smsg_queue;
+#if PIGGYBACK_ACK
+SMSG_QUEUE                  smsg_ack_queue;
+#endif
+#if CMK_USE_OOB
 SMSG_QUEUE                  smsg_oob_queue;
+#endif
 
 #if CMK_SMP
 
@@ -766,6 +778,38 @@ static uint32_t get_cookie(void)
     return cookie;
 }
 
+#if LARGEPAGE
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+
+void *my_get_huge_pages(size_t size)    /* Create a uniquely named file, mmap `size` bytes of it, and return the mapping (NULL if mmap fails; aborts if open fails). */
+{
+    char filename[512];
+    int fd;
+    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;    /* owner read/write, group read, others read */
+    void *ptr = NULL;
+
+    snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());    /* name is <dir>/charm_mempool.<pid>.<rand()>; <dir> is presumably the hugetlbfs mount for _tlbpagesize pages. NOTE(review): the returned path is used unchecked - confirm it cannot be NULL */
+//printf("[%d] my_get_huge_pages: %s %d\n", myrank, filename, size);
+    fd = open(filename, O_RDWR | O_CREAT, mode);
+    if (fd == -1) {
+        CmiAbort("my_get_huge_pages: open filed");    /* NOTE(review): "filed" looks like a typo for "failed" */
+    }
+    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);    /* private read/write mapping of the file, starting at offset 0 */
+    if (ptr == MAP_FAILED) ptr = NULL;    /* report an mmap failure to the caller as NULL */
+    close(fd);    /* the descriptor is closed whether or not mmap succeeded */
+    unlink(filename);    /* the file name is removed on both paths as well */
+    return ptr;
+}
+
+void my_free_huge_pages(void *ptr, int size)    /* Unmap the `size`-byte region that starts at ptr. NOTE(review): size is int here but size_t in my_get_huge_pages - confirm large sizes cannot truncate */
+{
+    munmap(ptr, size);    /* NOTE(review): munmap's return value is ignored */
+}
+
+#endif
+
 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
 /* TODO: add any that are related */
 /* =====End of Definitions of Message-Corruption Related Macros=====*/
@@ -1056,6 +1100,54 @@ static int connect_to(int destNode)
     return 1;
 }
 
+#if PIGGYBACK_ACK
+static void * piggyback_ack(int destNode, int msgsize, int *count)    /* Pop queued ACKs for destNode into a CmiTmpAlloc'ed uint64_t array and return it; returns NULL (leaving *count untouched) when nothing is packed. */
+{
+    int i;
+    if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;    /* no ACK queued for destNode */
+    int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
+    int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);    /* uint64_t words that fit in SMSG_MAX_MSG - msgsize bytes. NOTE(review): the division is done in size_t, so confirm msgsize never exceeds SMSG_MAX_MSG */
+    if (piggycount > len+1) piggycount = len + 1;    /* cap at the queue length plus one word for the count kept in buf[0] */
+    if (piggycount <= 5) return NULL;    /* skip when fewer than 5 ACKs would be carried (piggycount includes the count word) */
+    uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
+    CmiAssert(buf != NULL);
+    buf[0] = piggycount-1;    /* buf[0] holds the number of ACK entries that follow */
+//printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
+    for (i=0; i<piggycount-1; i++) {
+        MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
+        CmiAssert(ptr != NULL);
+        ACK_MSG *msg = ptr->msg;
+        buf[i+1] = msg->source_addr;    /* buf[1..] holds the source_addr of each popped ACK */
+        FreeAckMsg(msg);    /* the ACK message and its list node are released once the address is copied */
+        FreeMsgList(ptr);
+    }
+    *count = piggycount;    /* total number of words in buf, including buf[0] */
+    return buf;
+}
+
+
+static void piggyback_ack_done(int destNode, uint64_t *buf, int done)    /* Free buf; when done is 0, first push one ACK_TAG entry per address in buf[1..buf[0]] back onto smsg_ack_queue for destNode. */
+{
+    if (!done)    /* done == 0: the ACKs in buf were not delivered, so they are queued again */
+    {
+        int i;
+        for (i=0; i<buf[0]; i++) {    /* buf[0] is the entry count. NOTE(review): int i is compared against a uint64_t */
+            MSG_LIST *msg_tmp;
+            MallocMsgList(msg_tmp);
+            ACK_MSG  *ack_msg;
+            MallocAckMsg(ack_msg);
+            ack_msg->source_addr = buf[i+1];    /* rebuild an ACK message carrying the saved address */
+            msg_tmp->size = ACK_MSG_SIZE;
+            msg_tmp->msg = ack_msg;
+            msg_tmp->tag = ACK_TAG;
+            msg_tmp->destNode = destNode;
+            PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
+        }
+    }
+    CmiTmpFree(buf);    /* buf is released on both the done and not-done paths */
+}
+#endif
+
 inline 
 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
 {
@@ -1086,14 +1178,26 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
 #else
     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
     {
+#endif
+        uint64_t *buf = NULL;
+        int bufsize = 0;
+#if PIGGYBACK_ACK
+        if (tag == SMALL_DATA_TAG) {
+            int nack = 0;
+            buf = piggyback_ack(destNode, size, &nack);
+            if (buf) {
+                tag = SMALL_DATA_ACK_TAG;
+                bufsize = nack * sizeof(uint64_t);
+            }
+        }
 #endif
         CMI_GNI_LOCK
-        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, msg, size, 0, tag);
+        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
         CMI_GNI_UNLOCK
         if(status == GNI_RC_SUCCESS)
         {
 #if CMK_SMP_TRACE_COMMTHREAD
-            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
+            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG)
             { 
                 START_EVENT();
                 if ( tag == SMALL_DATA_TAG)
@@ -1104,11 +1208,16 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
             }
 #endif
             smsg_send_count ++;
-            return status;
         }else
             status = GNI_RC_ERROR_RESOURCE;
+#if PIGGYBACK_ACK
+        if (buf) {
+            piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
+            tag = SMALL_DATA_TAG;
+        }
+#endif
     }
-    if(inbuff ==0)
+    if(status != GNI_RC_SUCCESS && inbuff ==0)
         buffer_small_msgs(queue, msg, size, destNode, tag);
     return status;
 }
@@ -1445,6 +1554,9 @@ void LrtsPostNonLocal(){
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     SendBufferMsg(&smsg_queue);
+#if PIGGYBACK_ACK
+    SendBufferMsg(&smsg_ack_queue);
+#endif
     SendRdmaMsg();
 #endif
 #endif
@@ -1528,6 +1640,7 @@ static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
 #endif
 
 }
+
 static void getLargeMsgRequest(void* header, uint64_t inst_id);
 static void PumpNetworkSmsg()
 {
@@ -1578,6 +1691,25 @@ static void PumpNetworkSmsg()
 #endif
             /* copy msg out and then put into queue (small message) */
             switch (msg_tag) {
+#if PIGGYBACK_ACK
+            case SMALL_DATA_ACK_TAG:
+            {
+                int i;
+                uint64_t *buf = (uint64_t*)header;
+                int piggycount = buf[0];
+//printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
+                for (i=0; i<piggycount; i++) {
+                    void *msg = (void*)(buf[i+1]);
+                    CmiAssert(msg != NULL);
+                    DecreaseMsgInSend(msg);
+                    if(NoMsgInSend(msg))
+                        buffered_send_msg -= GetMempoolsize(msg);
+                    CmiFree(msg);
+                }
+                header = buf + piggycount + 1;
+                msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
+            }
+#endif
             case SMALL_DATA_TAG:
             {
                 START_EVENT();
@@ -1967,8 +2099,12 @@ static void PumpLocalRdmaTransactions()
             else
 #endif
             if (msg_tag == ACK_TAG) {
+#if ! PIGGYBACK_ACK
                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
+#else
+                buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
+#endif
             }
             else {
                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
@@ -2310,6 +2446,8 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 static void ProcessDeadlock();
 void LrtsAdvanceCommunication(int whileidle)
 {
+    static int count = 0;
+    count++;
     /*  Receive Msg first */
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
@@ -2352,7 +2490,15 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
+    {
+#if PIGGYBACK_ACK
+    //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
+    if (SendBufferMsg(&smsg_queue) == 1)
+        SendBufferMsg(&smsg_ack_queue);
+#else
     SendBufferMsg(&smsg_queue);
+#endif
+    }
     //MACHSTATE(8, "after SendBufferMsg\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
@@ -2570,6 +2716,9 @@ static void _init_smsg()
     }
 
     _init_send_queue(&smsg_queue);
+#if PIGGYBACK_ACK
+    _init_send_queue(&smsg_ack_queue);
+#endif
 #if CMK_USE_OOB
     _init_send_queue(&smsg_oob_queue);
 #endif
@@ -2730,6 +2879,9 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
 
     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
     if (*size < default_size) *size = default_size;
+#if LARGEPAGE
+    *size = _tlbpagesize;
+#endif
     total_mempool_size += *size;
     total_mempool_calls += 1;
 #if   !LARGEPAGE
@@ -2739,7 +2891,12 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
         CmiAbort("alloc_mempool_block");
     }
 #endif
+#if LARGEPAGE
+    pool = my_get_huge_pages(*size);
+    ret = pool==NULL;
+#else
     ret = posix_memalign(&pool, ALIGNBUF, *size);
+#endif
     if (ret != 0) {
 #if CMK_SMP && STEAL_MEMPOOL
       pool = steal_mempool_block(size, mem_hndl);
@@ -2753,10 +2910,12 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
     }
 #if LARGEPAGE
     CmiMemLock();
+    register_count++;
     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
+        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
     CmiMemUnlock();
     if(status != GNI_RC_SUCCESS) {
-        printf("[%d, %d] memory reigstration fails %lld ask for %lld\n", myrank, CmiMyRank(), register_memory_size, *size);
+        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
 sweep_mempool(CpvAccess(mempool));
     }
     GNI_RC_CHECK("MEMORY_REGISTER", status);
@@ -2773,7 +2932,11 @@ void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
     {
         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
     }
+#if LARGEPAGE
+    my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
+#else
     free(ptr);
+#endif
 }
 #endif
 
@@ -3018,7 +3181,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 
 void* LrtsAlloc(int n_bytes, int header)
 {
-    void *ptr;
+    void *ptr = NULL;
 #if 0
     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
 #endif
@@ -3034,12 +3197,16 @@ void* LrtsAlloc(int n_bytes, int header)
         if(n_bytes < BIG_MSG)
         {
             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
-            ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
+            if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
         }else 
         {
+#if LARGEPAGE
+            n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
+            char *res = my_get_huge_pages(n_bytes);
+#else
             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
-            ptr = res + ALIGNBUF - header;
-
+#endif
+            if (res) ptr = res + ALIGNBUF - header;
         }
 #else
         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
@@ -3054,22 +3221,29 @@ void  LrtsFree(void *msg)
 {
     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
     if (size <= SMSG_MAX_MSG)
-      free(msg);
-    else if(size>=BIG_MSG)
-    {
-        free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
-
-    }else
-    {
-#if     USE_LRTS_MEMPOOL
+        free(msg);
+    else {
+        size = ALIGN64(size);
+        if(size>=BIG_MSG)
+        {
+#if LARGEPAGE
+            int s = ALIGNHUGEPAGE(size+ALIGNBUF);
+            my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
+#else
+            free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
+#endif
+        }
+        else {
+#if    USE_LRTS_MEMPOOL
 #if CMK_SMP
-        mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
+            mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
 #else
-        mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
+            mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
 #endif
 #else
-        free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
+            free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
 #endif
+        }
     }
 }
 
@@ -3077,7 +3251,7 @@ void LrtsExit()
 {
     /* free memory ? */
 #if USE_LRTS_MEMPOOL
-    //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
+    printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
     mempool_destroy(CpvAccess(mempool));
 #endif
     PMI_Finalize();
@@ -3091,7 +3265,11 @@ void LrtsDrainResources()
 #if CMK_USE_OOB
            !SendBufferMsg(&smsg_oob_queue) ||
 #endif
-           !SendBufferMsg(&smsg_queue))
+           !SendBufferMsg(&smsg_queue) 
+#if PIGGYBACK_ACK
+        || !SendBufferMsg(&smsg_ack_queue)
+#endif
+          )
     {
         if (useDynamicSMSG)
             PumpDatagramConnection();