split RDMA queue, pooling persistent more often
authorYanhua Sun <yanhuas@jyc1.(none)>
Thu, 5 Apr 2012 17:51:02 +0000 (12:51 -0500)
committerYanhua Sun <yanhuas@jyc1.(none)>
Thu, 5 Apr 2012 17:51:02 +0000 (12:51 -0500)
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine.c

index b2be082f556cf37a270c44e223962ad7d86dcfbf..dd11aa0742648ba0400466526f9707b23333f534 100644 (file)
@@ -71,9 +71,9 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
          /* always buffer */
 #if CMK_SMP || 1
 #if REMOTE_EVENT
-        bufferRdmaMsg(destNode, pd, (int)(size_t)(slot->destHandle));
+        bufferRdmaMsg(sendPersistentBuf, destNode, pd, (int)(size_t)(slot->destHandle));
 #else
-        bufferRdmaMsg(destNode, pd, -1);
+        bufferRdmaMsg(sendPersistentBuf, destNode, pd, -1);
 #endif
 
 #else                      /* non smp */
@@ -99,9 +99,9 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
         {
 #if REMOTE_EVENT
-            bufferRdmaMsg(destNode, pd, (int)(size_t)(slot->destHandle));
+            bufferRdmaMsg(sendRdmaBuf, destNode, pd, (int)(size_t)(slot->destHandle));
 #else
-            bufferRdmaMsg(destNode, pd, -1);
+            bufferRdmaMsg(sendRdmaBuf, destNode, pd, -1);
 #endif
         }
         else {
@@ -226,7 +226,7 @@ void *PerAlloc(int size)
   char *ptr;
   size = ALIGN64(size + sizeof(CmiChunkHeader));
   //printf("[%d] PerAlloc %p %p %d. \n", myrank, res, ptr, size);
-  res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+size-sizeof(mempool_header), 1);
+  res = mempool_malloc(CpvAccess(persistent_mempool), ALIGNBUF+size-sizeof(mempool_header), 1);
   if (res) ptr = (char*)res - sizeof(mempool_header) + ALIGNBUF;
   SIZEFIELD(ptr)=size;
   REFFIELD(ptr)= PERSIST_SEQ;
@@ -238,7 +238,7 @@ void PerFree(char *msg)
 #if CMK_SMP
   mempool_free_thread((char*)msg - ALIGNBUF + sizeof(mempool_header));
 #else
-  mempool_free(CpvAccess(mempool), (char*)msg - ALIGNBUF + sizeof(mempool_header));
+  mempool_free(CpvAccess(persistent_mempool), (char*)msg - ALIGNBUF + sizeof(mempool_header));
 #endif
 }
 
index aba88213e8b9d7f31831faf3872c05aa632129a2..eb86aba4418941c00019732f789af304a625e274 100644 (file)
@@ -55,7 +55,7 @@
 #endif
 
 #if MULTI_THREAD_SEND
-#define CMK_WORKER_SINGLE_TASK     0
+#define CMK_WORKER_SINGLE_TASK     1
 #endif
 
 #define REMOTE_EVENT               1
@@ -230,6 +230,11 @@ int         lrts_send_rdma_success = 0;
 
 #if CMK_PERSISTENT_COMM
 #include "machine-persistent.h"
+#define  SEND_PERSISTENT    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendPersistentBuf));
+#define  RECV_PERSISTENT    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(persistent_rx_cqh) );
+#else  
+#define  SEND_PERSISTENT   
+#define  RECV_PERSISTENT
 #endif
 
 //#define  USE_ONESIDED 1
@@ -382,6 +387,7 @@ static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
+static gni_cq_handle_t       persistent_rx_cqh = NULL;      // mempool - remote event
 static gni_ep_handle_t       *ep_hndl_array;
 
 static CmiNodeLock           *ep_lock_array;
@@ -480,6 +486,7 @@ typedef struct  rmda_msg
 #define SMP_LOCKS                       1
 #define ONE_SEND_QUEUE                  0
 PCQueue sendRdmaBuf;
+PCQueue sendPersistentBuf;
 typedef struct  msg_list_index
 {
     PCQueue     sendSmsgBuf;
@@ -493,6 +500,7 @@ char                *destpe_avail;
 #else         /* non-smp */
 
 static RDMA_REQUEST        *sendRdmaBuf = 0;
+static RDMA_REQUEST        *sendPersistentBuf = 0;
 static RDMA_REQUEST        *sendRdmaTail = 0;
 typedef struct  msg_list_index
 {
@@ -659,6 +667,10 @@ static MSG_LIST *buffered_fma_tail = 0;
 
 CpvDeclare(mempool_type*, mempool);
 
+#if CMK_PERSISTENT_COMM
+CpvDeclare(mempool_type*, persistent_mempool);
+#endif
+
 #if REMOTE_EVENT
 /* ack pool for remote events */
 
@@ -1211,14 +1223,18 @@ void CmiMachineProgressImpl() {
 #endif
 
 static int SendBufferMsg(SMSG_QUEUE *queue);
+#if CMK_SMP
+static void SendRdmaMsg(PCQueue );
+#else
 static void SendRdmaMsg();
+#endif 
 static void PumpNetworkSmsg();
 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
 #if CQWRITE
 static void PumpCqWriteTransactions();
 #endif
 #if REMOTE_EVENT
-static void PumpRemoteTransactions();
+static void PumpRemoteTransactions(gni_cq_handle_t);
 #endif
 
 #if MACHINE_DEBUG_LOG
@@ -2014,6 +2030,7 @@ void LrtsPostCommonInit(int everReturn)
 
 /* this is called by worker thread */
 void LrtsPostNonLocal(){
+#if 1
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
 #endif
@@ -2045,7 +2062,7 @@ void LrtsPostNonLocal(){
 #if CMK_WORKER_SINGLE_TASK
     if (CmiMyRank() % 6 == 3)
 #endif
-    PumpRemoteTransactions();         // rdma_rx_cqh
+    PumpRemoteTransactions( rdma_rx_cqh);         // rdma_rx_cqh
 #endif
 
 #if CMK_WORKER_SINGLE_TASK
@@ -2061,7 +2078,11 @@ void LrtsPostNonLocal(){
 #if CMK_WORKER_SINGLE_TASK
     if (CmiMyRank() % 6 == 5)
 #endif
-    SendRdmaMsg();
+#if CMK_SMP
+    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
+#else
+    STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
+#endif
 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
@@ -2069,6 +2090,7 @@ void LrtsPostNonLocal(){
     traceBeginIdle();
 #endif
 
+#endif
 #endif
 }
 
@@ -2130,8 +2152,9 @@ static void PumpNetworkRdmaMsgs()
 
 }
 
+#if CMK_SMP
 inline 
-static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
+static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
 {
     RDMA_REQUEST        *rdma_request_msg;
     MallocRdmaRequest(rdma_request_msg);
@@ -2140,20 +2163,31 @@ static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
 #if REMOTE_EVENT
     rdma_request_msg->ack_index = ack_index;
 #endif
-#if CMK_SMP
-    PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
+    PCQueuePush(bufferqueue, (char*)rdma_request_msg);
+}
+
 #else
-    if(sendRdmaBuf == 0)
+inline 
+static void bufferRdmaMsg(RDMA_REQUEST *bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
+{
+    RDMA_REQUEST        *rdma_request_msg;
+    MallocRdmaRequest(rdma_request_msg);
+    rdma_request_msg->destNode = inst_id;
+    rdma_request_msg->pd = pd;
+#if REMOTE_EVENT
+    rdma_request_msg->ack_index = ack_index;
+#endif
+    if(bufferqueue == 0)
     {
         sendRdmaBuf = sendRdmaTail = rdma_request_msg;
     }else{
         sendRdmaTail->next = rdma_request_msg;
         sendRdmaTail =  rdma_request_msg;
     }
-#endif
-
 }
 
+#endif
+
 static void getLargeMsgRequest(void* header, uint64_t inst_id);
 
 static void PumpNetworkSmsg()
@@ -2549,9 +2583,9 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
     {
 #if REMOTE_EVENT
-        bufferRdmaMsg(inst_id, pd, request_msg->ack_index); 
+        bufferRdmaMsg(sendRdmaBuf, inst_id, pd, request_msg->ack_index); 
 #else
-        bufferRdmaMsg(inst_id, pd, -1); 
+        bufferRdmaMsg(sendRdmaBuf, inst_id, pd, -1); 
 #endif
     }else if (status != GNI_RC_SUCCESS) {
         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
@@ -2677,7 +2711,7 @@ static void PumpCqWriteTransactions()
 #endif
 
 #if REMOTE_EVENT
-static void PumpRemoteTransactions()
+static void PumpRemoteTransactions( gni_cq_handle_t rx_cqh)
 {
     gni_cq_entry_t          ev;
     gni_return_t            status;
@@ -2687,7 +2721,7 @@ static void PumpRemoteTransactions()
     while(1) {
         CMI_GNI_LOCK(global_gni_lock)
 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
-        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
+        status = GNI_CqGetEvent(rx_cqh, &ev);
 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
         CMI_GNI_UNLOCK(global_gni_lock)
 
@@ -2934,7 +2968,9 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
     }
 }
 
-static void  SendRdmaMsg()
+#if CMK_SMP
+
+static void  SendRdmaMsg( PCQueue sendqueue)
 {
     gni_return_t            status = GNI_RC_SUCCESS;
     gni_mem_handle_t        msg_mem_hndl;
@@ -2943,25 +2979,123 @@ static void  SendRdmaMsg()
     uint64_t                register_size = 0;
     void                    *msg;
     int                     i;
-#if CMK_SMP
-    int len = PCQueueLength(sendRdmaBuf);
+    int len = PCQueueLength(sendqueue);
     for (i=0; i<len; i++)
     {
 #if CMI_EXERT_RECV_RDMA_CAP
         if( RDMA_pending >= RDMA_cap) break;
 #endif
-        CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
-        ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
-        CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
+        CMI_PCQUEUEPOP_LOCK( sendqueue)
+        ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
+        CMI_PCQUEUEPOP_UNLOCK( sendqueue)
         if (ptr == NULL) break;
+        
+        MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
+        gni_post_descriptor_t *pd = ptr->pd;
+        
+        msg = (void*)(pd->local_addr);
+        status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
+        register_size = 0;
+        if(pd->cqwrite_value == 0) {
+            if(NoMsgInRecv(msg))
+                register_size = GetMempoolsize(msg);
+        }
+
+        if(status == GNI_RC_SUCCESS)        //mem register good
+        {
+            int destNode = ptr->destNode;
+            CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
+            CMI_GNI_LOCK(lock);
+#if REMOTE_EVENT
+            if( pd->cqwrite_value == 0) {
+                pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+                int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
+                GNI_RC_CHECK("GNI_EpSetEventData", sts);
+            }
+#if CMK_PERSISTENT_COMM
+            else if (pd->cqwrite_value == PERSIST_SEQ) {
+                pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+                int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
+                GNI_RC_CHECK("GNI_EpSetEventData", sts);
+            }
+#endif
+#endif
+#if CMK_WITH_STATS
+            RDMA_TRY_SEND(pd->type)
+#endif
+#if CMK_SMP_TRACE_COMMTHREAD
+            if(IS_PUT(pd->type))
+            {
+                 START_EVENT();
+                 TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
+            }
+#endif
+
+            if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
+            {
+                status = GNI_PostRdma(ep_hndl_array[destNode], pd);
+            }
+            else
+            {
+                status = GNI_PostFma(ep_hndl_array[destNode],  pd);
+            }
+            CMI_GNI_UNLOCK(lock);
+            
+            if(status == GNI_RC_SUCCESS)    //post good
+            {
+#if CMI_EXERT_RECV_RDMA_CAP
+                RDMA_pending ++;
+#endif
+                if(pd->cqwrite_value == 0)
+                {
+#if CMK_SMP_TRACE_COMMTHREAD 
+                    pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
+#endif
+                    IncreaseMsgInRecv(((void*)(pd->local_addr)));
+                }
+#if  CMK_WITH_STATS
+                pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
+                RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
+#endif
+#if MACHINE_DEBUG_LOG
+                buffered_recv_msg += register_size;
+                MACHSTATE(8, "GO request from buffered\n"); 
+#endif
+#if PRINT_SYH
+                printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
+#endif
+            }else           // cannot post
+            {
+                PCQueuePush(sendRdmaBuf, (char*)ptr);
+#if PRINT_SYH
+                printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
+#endif
+                break;
+            }
+        } else          //memory registration fails
+        {
+            PCQueuePush(sendqueue, (char*)ptr);
+        }
+    } //end while
+}
+
+
 #else
+static void  SendRdmaMsg()
+{
+    gni_return_t            status = GNI_RC_SUCCESS;
+    gni_mem_handle_t        msg_mem_hndl;
+    RDMA_REQUEST            *ptr = 0, *tmp_ptr;
+    RDMA_REQUEST            *pre = 0;
+    uint64_t                register_size = 0;
+    void                    *msg;
+    int                     i;
     ptr = sendRdmaBuf;
     while (ptr!=0 )
     {
 #if CMI_EXERT_RECV_RDMA_CAP
          if( RDMA_pending >= RDMA_cap) break;
 #endif
-#endif 
         MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
         gni_post_descriptor_t *pd = ptr->pd;
         
@@ -2996,14 +3130,6 @@ static void  SendRdmaMsg()
             RDMA_TRY_SEND(pd->type)
 #endif
 #if CMK_SMP_TRACE_COMMTHREAD
-//            int oldpe = -1;
-//            int oldeventid = -1;
-//            START_EVENT();
-//            if(IS_PUT(pd->type))
-//            { 
-//                TRACE_COMM_GET_MSGID((void*)pd->local_addr, &oldpe, &oldeventid);
-//                TRACE_COMM_SET_COMM_MSGID((void*)pd->local_addr);
-//            }
             if(IS_PUT(pd->type))
             {
                  START_EVENT();
@@ -3021,18 +3147,11 @@ static void  SendRdmaMsg()
             }
             CMI_GNI_UNLOCK(lock);
             
-#if CMK_SMP_TRACE_COMMTHREAD
-//            if(IS_PUT(pd->type))
-//            { 
-//                if (oldpe != -1)  TRACE_COMM_SET_MSGID((void*)pd->local_addr, oldpe, oldeventid);
-//            }
-#endif
             if(status == GNI_RC_SUCCESS)    //post good
             {
 #if CMI_EXERT_RECV_RDMA_CAP
                 RDMA_pending ++;
 #endif
-#if !CMK_SMP
                 tmp_ptr = ptr;
                 if(pre != 0) {
                     pre->next = ptr->next;
@@ -3042,15 +3161,10 @@ static void  SendRdmaMsg()
                 }
                 ptr = ptr->next;
                 FreeRdmaRequest(tmp_ptr);
-#endif
                 if(pd->cqwrite_value == 0)
                 {
 #if CMK_SMP_TRACE_COMMTHREAD 
                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
-//                    if(IS_PUT(pd->type))
-//                    { 
-//                        TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
-//                    }
 #endif
                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
                 }
@@ -3067,12 +3181,8 @@ static void  SendRdmaMsg()
 #endif
             }else           // cannot post
             {
-#if CMK_SMP
-                PCQueuePush(sendRdmaBuf, (char*)ptr);
-#else
                 pre = ptr;
                 ptr = ptr->next;
-#endif
 #if PRINT_SYH
                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
 #endif
@@ -3080,19 +3190,14 @@ static void  SendRdmaMsg()
             }
         } else          //memory registration fails
         {
-#if CMK_SMP
-            PCQueuePush(sendRdmaBuf, (char*)ptr);
-#else
             pre = ptr;
             ptr = ptr->next;
-#endif
         }
     } //end while
-#if ! CMK_SMP
     if(ptr == 0)
         sendRdmaTail = pre;
-#endif
 }
+#endif
 
 static inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
 {
@@ -3386,6 +3491,7 @@ void LrtsAdvanceCommunication(int whileidle)
 #endif
     }
 
+    // Receiving small messages and persistent
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
@@ -3396,6 +3502,33 @@ void LrtsAdvanceCommunication(int whileidle)
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
 #endif
 
+#if REMOTE_EVENT
+    RECV_PERSISTENT
+#endif
+    SEND_PERSISTENT
+
+    //sending small messages
+    /* Send buffered Message */
+#if CMK_SMP_TRACE_COMMTHREAD
+    startT = CmiWallTimer();
+#endif
+#if CMK_USE_OOB
+    if (SendBufferMsg(&smsg_oob_queue) == 1)
+#endif
+    {
+        STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
+    }
+#if CMK_SMP_TRACE_COMMTHREAD
+    endT = CmiWallTimer();
+    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
+#endif
+
+#if REMOTE_EVENT
+    RECV_PERSISTENT
+#endif
+    SEND_PERSISTENT
+
+    //Pump Get messages or PUT messages
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
@@ -3403,12 +3536,18 @@ void LrtsAdvanceCommunication(int whileidle)
 #if MULTI_THREAD_SEND
     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
 #endif
-    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
 #endif
 
+#if REMOTE_EVENT
+    RECV_PERSISTENT
+#endif
+    SEND_PERSISTENT
+
+    //Pump Remote event
+
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
@@ -3418,41 +3557,38 @@ void LrtsAdvanceCommunication(int whileidle)
 #endif
 
 #if REMOTE_EVENT
-    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions());
+    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions( rdma_rx_cqh));
 #endif
 
-    //MACHSTATE(8, "after PumpLocalTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
 #endif
  
+    //Send pending RDMA transactions
+#if REMOTE_EVENT
+    RECV_PERSISTENT
+#endif
+    SEND_PERSISTENT
+
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
+#if CMK_SMP
+    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
+#else
     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
+#endif
     //MACHSTATE(8, "after SendRdmaMsg\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
 #endif
 
-    /* Send buffered Message */
-#if CMK_SMP_TRACE_COMMTHREAD
-    startT = CmiWallTimer();
-#endif
-#if CMK_USE_OOB
-    if (SendBufferMsg(&smsg_oob_queue) == 1)
-#endif
-    {
-        STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue));
-    }
-    //MACHSTATE(8, "after SendBufferMsg\n") ; 
-#if CMK_SMP_TRACE_COMMTHREAD
-    endT = CmiWallTimer();
-    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
+#if REMOTE_EVENT
+    RECV_PERSISTENT
 #endif
-
+    SEND_PERSISTENT
 #if CMK_SMP && ! LARGEPAGE
     if (_detected_hang)  ProcessDeadlock();
 #endif
@@ -3681,7 +3817,9 @@ static CmiUInt8 total_mempool_size = 0;
 static CmiUInt8 total_mempool_calls = 0;
 
 #if USE_LRTS_MEMPOOL
-void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
+
+#if CMK_PERSISTENT_COMM
+void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
 {
     void *pool;
     int ret;
@@ -3710,10 +3848,58 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
     ret = posix_memalign(&pool, ALIGNBUF, *size);
 #endif
     if (ret != 0) {
-#if CMK_SMP && STEAL_MEMPOOL
-      pool = steal_mempool_block(size, mem_hndl);
-      if (pool != NULL) return pool;
+      printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
+      if (ret == ENOMEM)
+        CmiAbort("alloc_mempool_block: out of memory.");
+      else
+        CmiAbort("alloc_mempool_block: posix_memalign failed");
+    }
+#if LARGEPAGE
+    CmiMemLock();
+    register_count++;
+    MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, persistent_rx_cqh, status);
+    CmiMemUnlock();
+    if(status != GNI_RC_SUCCESS) {
+        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
+sweep_mempool(CpvAccess(mempool));
+    }
+    GNI_RC_CHECK("MEMORY_REGISTER", status);
+#else
+    SetMemHndlZero((*mem_hndl));
 #endif
+    return pool;
+}
+#endif
+
+void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
+{
+    void *pool;
+    int ret;
+    gni_return_t status = GNI_RC_SUCCESS;
+
+    size_t default_size =  expand_flag? _expand_mem : _mempool_size;
+    if (*size < default_size) *size = default_size;
+#if LARGEPAGE
+    // round up to be multiple of _tlbpagesize
+    //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
+    *size = ALIGNHUGEPAGE(*size);
+#endif
+    total_mempool_size += *size;
+    total_mempool_calls += 1;
+#if   !LARGEPAGE
+    if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
+    {
+        printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
+        CmiAbort("alloc_mempool_block");
+    }
+#endif
+#if LARGEPAGE
+    pool = my_get_huge_pages(*size);
+    ret = pool==NULL;
+#else
+    ret = posix_memalign(&pool, ALIGNBUF, *size);
+#endif
+    if (ret != 0) {
       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
       if (ret == ENOMEM)
         CmiAbort("alloc_mempool_block: out of memory.");
@@ -3755,6 +3941,10 @@ void LrtsPreCommonInit(int everReturn){
 #if USE_LRTS_MEMPOOL
     CpvInitialize(mempool_type*, mempool);
     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
+#if CMK_PERSISTENT_COMM
+    CpvInitialize(mempool_type*, persistent_mempool);
+    CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
+#endif
     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
 #endif
 }
@@ -3875,7 +4065,11 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     
     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
     GNI_RC_CHECK("Create Post CQ (rx)", status);
-    
+   
+#if CMK_PERSISTENT_COMM
+    status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &persistent_rx_cqh);
+    GNI_RC_CHECK("Create Post CQ (rx)", status);
+#endif
     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
     //GNI_RC_CHECK("Create BTE CQ", status);
 
@@ -4022,8 +4216,10 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 
 #if CMK_SMP
     sendRdmaBuf = PCQueueCreate();
+    sendPersistentBuf = PCQueueCreate();
 #else
     sendRdmaBuf = 0;
+    sendPersistentBuf = 0;
 #endif
 
 #if MACHINE_DEBUG_LOG
@@ -4153,9 +4349,14 @@ void LrtsDrainResources()
         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
 #endif
 #if REMOTE_EVENT
-        PumpRemoteTransactions();
+        PumpRemoteTransactions(rdma_rx_cqh);
 #endif
+#if CMK_SMP
+        SendRdmaMsg(sendRdmaBuf);
+        SendRdmaMsg(sendPersistentBuf);
+#else
         SendRdmaMsg();
+#endif
     }
     PMI_Barrier();
 }