use two indexpool, one dedicated for persistent message to reduce locking in SMP.
authorGengbin Zheng <gzheng@illinois.edu>
Thu, 22 Mar 2012 04:02:56 +0000 (21:02 -0700)
committerGengbin Zheng <gzheng@illinois.edu>
Thu, 22 Mar 2012 04:02:56 +0000 (21:02 -0700)
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/gemini_gni/machine.c
src/arch/util/persist-comm.c

index ff59652084adccd32296970b37fc1575e04a5b3a..4ed95f3d1eea5c158053816460671fdc981a9f0b 100644 (file)
@@ -68,23 +68,24 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 #else
         bufferRdmaMsg(destNode, pd, -1);
 #endif
-#else
+
+#else                      /* non smp */
+
 #if REMOTE_EVENT
         pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
-        int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT((int)(size_t)(slot->destHandle)));
+        int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT((int)(size_t)(slot->destHandle)));
         GNI_RC_CHECK("GNI_EpSetEventData", sts);
 #endif
-
         status = registerMessage((void*)(pd->local_addr), pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
         if (status == GNI_RC_SUCCESS) 
         {
 #if CMK_WITH_STATS
             RDMA_TRY_SEND(pd->type)
 #endif
-         if(pd->type == GNI_POST_RDMA_PUT) 
-            status = GNI_PostRdma(ep_hndl_array[destNode], pd);
-        else
-            status = GNI_PostFma(ep_hndl_array[destNode],  pd);
+            if(pd->type == GNI_POST_RDMA_PUT) 
+                status = GNI_PostRdma(ep_hndl_array[destNode], pd);
+            else
+                status = GNI_PostFma(ep_hndl_array[destNode],  pd);
         }
         else
             status = GNI_RC_ERROR_RESOURCE;
@@ -95,8 +96,8 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 #else
             bufferRdmaMsg(destNode, pd, -1);
 #endif
-        }else
-        {
+        }
+        else {
             GNI_RC_CHECK("AFter posting", status);
 #if  CMK_WITH_STATS
             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
@@ -104,7 +105,7 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 #endif
         }
 #endif
-    }
+  }
   else {
 #if 1
     if (slot->messageBuf != NULL) {
@@ -292,10 +293,20 @@ void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
   }
   slot->sizeMax = maxBytes;
 #if REMOTE_EVENT
-  slot->index = IndexPool_getslot(&ackPool, slot, 2);
+  CmiLock(persistPool.lock);
+  slot->index = IndexPool_getslot(&persistPool, slot, 2);
+  CmiUnlock(persistPool.lock);
 #endif
 }
 
+void clearRecvSlot(PersistentReceivesTable *slot)
+{
+#if REMOTE_EVENT
+  CmiLock(persistPool.lock);
+  IndexPool_freeslot(&persistPool, slot->index);
+  CmiUnlock(persistPool.lock);
+#endif
+}
 
 PersistentHandle getPersistentHandle(PersistentHandle h, int toindex)
 {
@@ -303,7 +314,7 @@ PersistentHandle getPersistentHandle(PersistentHandle h, int toindex)
   if (toindex)
     return (PersistentHandle)(((PersistentReceivesTable*)h)->index);
   else {
-    return (PersistentHandle)GetIndexAddress((int)(size_t)h);
+    return (PersistentHandle)GetIndexAddress(persistPool, (int)(size_t)h);
   }
 #else
   return h;
index 334c379dbbcac370ffc6ae9c300498dca330c9ad..3f8f3cdbda880eedec92e8bb866e5fbff6d9daf3 100644 (file)
@@ -57,6 +57,7 @@ int PumpPersistent();
 void swapSendSlotBuffers(PersistentSendsTable *slot);
 void swapRecvSlotBuffers(PersistentReceivesTable *slot);
 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes);
+void clearRecvSlot(PersistentReceivesTable *slot);
 
 /*@}*/
 
index ba62d94ee60eac83a7cbf0c2d31c0b6134883adb..84e49b1ad0f19d1d21afe6291c1ed5ae938b0074 100644 (file)
@@ -46,7 +46,7 @@
 #include "cmidirect.h"
 #endif
 
-#define     LARGEPAGE              1
+#define     LARGEPAGE              0
 
 #if CMK_SMP
 #define MULTI_THREAD_SEND          0
@@ -57,7 +57,7 @@
 #define CMK_WORKER_SINGLE_TASK     0
 #endif
 
-#define REMOTE_EVENT               1
+#define REMOTE_EVENT               0
 #define CQWRITE                    0
 
 #define CMI_EXERT_SEND_CAP     0
@@ -644,10 +644,16 @@ CpvDeclare(mempool_type*, mempool);
 #if REMOTE_EVENT
 /* ack pool for remote events */
 
-#define ACK_SHIFT                  19
-#define ACK_EVENT(idx)             (((idx)<<ACK_SHIFT) | myrank)
-#define ACK_GET_RANK(evt)          ((evt) & ((1<<ACK_SHIFT)-1))
-#define ACK_GET_INDEX(evt)         ((evt) >> ACK_SHIFT)
+#define SHIFT                   18
+#define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
+#define RANK_MASK               ((1<<SHIFT) - 1)
+#define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
+
+#define GET_TYPE(evt)           (((evt) >> 31) & 1)
+#define GET_RANK(evt)           ((evt) & RANK_MASK)
+#define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
+
+#define PERSIST_EVENT(idx)      (1<<31 | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
 
 struct IndexStruct {
 void *addr;
@@ -663,25 +669,30 @@ typedef struct IndexPool {
 } IndexPool;
 
 static IndexPool  ackPool;
+#if CMK_PERSISTENT_COMM
+static IndexPool  persistPool;
+#endif
 
-
-#define  GetIndexType(s)             (ackPool.indexes[s].type)
-#define  GetIndexAddress(s)          (ackPool.indexes[s].addr)
+#define  GetIndexType(pool, s)             (pool.indexes[s].type)
+#define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
 
 static void IndexPool_init(IndexPool *pool)
 {
     int i;
-    if ((1<<ACK_SHIFT) < mysize) 
+    if ((1<<SHIFT) < mysize) 
         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
-    pool->size = 2048;
+    pool->size = 1024;
     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
     for (i=0; i<pool->size-1; i++) {
         pool->indexes[i].next = i+1;
+        pool->indexes[i].type = -1;
     }
     pool->indexes[i].next = -1;
     pool->freehead = 0;
-#if MULTI_THREAD_SEND  || REMOTE_EVENT
+#if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
     pool->lock  = CmiCreateLock();
+#else
+    pool->lock  = NULL;
 #endif
 }
 
@@ -690,19 +701,20 @@ inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
 {
     int i;
     int s;
-#if MULTI_THREAD_SEND  || REMOTE_EVENT
+#if MULTI_THREAD_SEND
     CmiLock(pool->lock);
 #endif
     s = pool->freehead;
     if (s == -1) {
         int newsize = pool->size * 2;
         printf("[%d] IndexPool_getslot expand to: %d\n", myrank, newsize);
-        if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
+        if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
         struct IndexStruct *old_ackpool = pool->indexes;
         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
         for (i=pool->size; i<newsize-1; i++) {
             pool->indexes[i].next = i+1;
+            pool->indexes[i].type = -1;
         }
         pool->indexes[i].next = -1;
         pool->freehead = pool->size;
@@ -713,7 +725,7 @@ inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
     pool->freehead = pool->indexes[s].next;
     pool->indexes[s].addr = addr;
     pool->indexes[s].type = type;
-#if MULTI_THREAD_SEND  || REMOTE_EVENT
+#if MULTI_THREAD_SEND
     CmiUnlock(pool->lock);
 #endif
     return s;
@@ -723,12 +735,13 @@ static
 inline  void IndexPool_freeslot(IndexPool *pool, int s)
 {
     CmiAssert(s>=0 && s<pool->size);
-#if MULTI_THREAD_SEND  || REMOTE_EVENT
+#if MULTI_THREAD_SEND
     CmiLock(pool->lock);
 #endif
     pool->indexes[s].next = pool->freehead;
+    pool->indexes[s].type = -1;
     pool->freehead = s;
-#if MULTI_THREAD_SEND  || REMOTE_EVENT
+#if MULTI_THREAD_SEND
     CmiUnlock(pool->lock);
 #endif
 }
@@ -2095,7 +2108,7 @@ static void PumpNetworkSmsg()
             break;
         inst_id = GNI_CQ_GET_INST_ID(event_data);
 #if REMOTE_EVENT
-        inst_id = ACK_GET_RANK(inst_id);
+        inst_id = GET_RANK(inst_id);      /* important */
 #endif
         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
 #if PRINT_SYH
@@ -2567,27 +2580,23 @@ static void PumpRemoteTransactions()
     gni_cq_entry_t          ev;
     gni_return_t            status;
     void                    *msg;   
-    int                     slot, type, size;
+    int                     inst_id, index, type, size;
 
     while(1) {
         CMI_GNI_LOCK(global_gni_lock)
         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
         CMI_GNI_UNLOCK(global_gni_lock)
-        if(status != GNI_RC_SUCCESS) {
-            break;
-        }
 
-        slot = GNI_CQ_GET_INST_ID(ev);
-        slot = ACK_GET_INDEX(slot);
-        //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
+        if(status != GNI_RC_SUCCESS) break;
 
-        //CMI_GNI_LOCK(ackpool_lock);
-        type = GetIndexType(slot);
-        msg = GetIndexAddress(slot);
-        //CMI_GNI_UNLOCK(ackpool_lock);
+        inst_id = GNI_CQ_GET_INST_ID(ev);
+        index = GET_INDEX(inst_id);
+        type = GET_TYPE(inst_id);
 
         switch (type) {
-        case 1:    // ACK
+        case 0:    // ACK
+            CmiAssert(GetIndexType(ackPool, index) == 1);
+            msg = GetIndexAddress(ackPool, index);
             DecreaseMsgInSend(msg);
 #if ! USE_LRTS_MEMPOOL
            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
@@ -2597,21 +2606,23 @@ static void PumpRemoteTransactions()
             if(NoMsgInSend(msg))
                 buffered_send_msg -= GetMempoolsize(msg);
             CmiFree(msg);
-            IndexPool_freeslot(&ackPool, slot);
+            IndexPool_freeslot(&ackPool, index);
             break;
 #if CMK_PERSISTENT_COMM
-        case 2:     // PERSISTENT
-            msg = ((PersistentReceivesTable*)msg)->destBuf[0].destAddress;
+        case 1:  {    // PERSISTENT
+            CmiAssert(GetIndexType(persistPool, index) == 2);
+            PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
+            msg = slot->destBuf[0].destAddress;
             size = CmiGetMsgSize(msg);
             CmiReference(msg);
             CMI_CHECK_CHECKSUM(msg, size);
             handleOneRecvedMsg(size, msg); 
             break;
+            }
 #endif
-        default: {
+        default:
             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
             CmiAbort("PumpRemoteTransactions: unknown type");
-            }
         }
     }
     if(status == GNI_RC_ERROR_RESOURCE)
@@ -2841,16 +2852,18 @@ static void  SendRdmaMsg()
             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
             CMI_GNI_LOCK(lock);
 #if REMOTE_EVENT
-            if( pd->cqwrite_value == 0
-#if CMK_PERSISTENT_COMM
-                || pd->cqwrite_value == PERSIST_SEQ
-#endif
-              )
-            {
+            if( pd->cqwrite_value == 0) {
                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
             }
+#if CMK_PERSISTENT_COMM
+            else if (pd->cqwrite_value == PERSIST_SEQ) {
+                pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+                int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, PERSIST_EVENT(ptr->ack_index));
+                GNI_RC_CHECK("GNI_EpSetEventData", sts);
+            }
+#endif
 #endif
 #if CMK_WITH_STATS
             RDMA_TRY_SEND(pd->type)
@@ -3763,6 +3776,9 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 
 #if  REMOTE_EVENT
     IndexPool_init(&ackPool);
+#if CMK_PERSISTENT_COMM
+    IndexPool_init(&persistPool);
+#endif
 #endif
 
 #if CMK_WITH_STATS
index acc57f06bac0da27d446cb31aad398dc3a976a3a..c60b6fb36d387d36e66d40c9791c3490caa8fcd9 100644 (file)
@@ -321,6 +321,8 @@ void persistentDestoryHandler(void *env)
     if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
       PerFree((char*)slot->destBuf[i].destAddress);
 
+  clearRecvSlot(slot);
+
   CmiFree(slot);
 }