support REMOTE_EVENT in persistent
author Gengbin Zheng <gzheng@illinois.edu>
Mon, 19 Mar 2012 22:49:40 +0000 (15:49 -0700)
committer Gengbin Zheng <gzheng@illinois.edu>
Mon, 19 Mar 2012 22:49:40 +0000 (15:49 -0700)
src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/gemini_gni/machine.c
src/arch/util/persist-comm.c

index 06d5c264c6f17170168c9dd0416055c50c2467cb..fcae4b260c030b1de946e46e5e61ca60067056cf 100644 (file)
@@ -64,8 +64,18 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
 
         SetMemHndlZero(pd->local_mem_hndl);
 #if CMK_SMP
+#if REMOTE_EVENT
+        bufferRdmaMsg(destNode, pd, (int)(size_t)(slot->destHandle));
+#else
         bufferRdmaMsg(destNode, pd, -1);
+#endif
 #else
+#if REMOTE_EVENT
+        pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
+        int sts = GNI_EpSetEventData(ep_hndl_array[destNode], inst_id, ACK_EVENT((int)(slot->destHandle)));
+        GNI_RC_CHECK("GNI_EpSetEventData", sts);
+#endif
+
         status = registerMessage((void*)(pd->local_addr), pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
         if (status == GNI_RC_SUCCESS) 
         {
@@ -196,10 +206,10 @@ void *PerAlloc(int size)
   char *ptr = (char*)res+sizeof(CmiChunkHeader);
   SIZEFIELD(ptr)=size;
   REFFIELD(ptr)=1;
-#if  CQWRITE
-  MEMORY_REGISTER(onesided_hnd, nic_hndl,  res, size , &MEMHFIELD((char*)res+sizeof(CmiChunkHeader)), &omdh, rdma_rx_cqh, status);
+#if  CQWRITE || CMK_PERSISTENT_COMM
+  MEMORY_REGISTER(onesided_hnd, nic_hndl,  res, size , &MEMHFIELD(ptr), &omdh, rdma_rx_cqh, status);
 #else
-  MEMORY_REGISTER(onesided_hnd, nic_hndl,  res, size , &MEMHFIELD((char*)res+sizeof(CmiChunkHeader)), &omdh, NULL, status);
+  MEMORY_REGISTER(onesided_hnd, nic_hndl,  res, size , &MEMHFIELD(ptr), &omdh, NULL, status);
 #endif
   GNI_RC_CHECK("Mem Register before post", status);
   return ptr;
@@ -218,6 +228,39 @@ void persist_machine_init(void)
 {
 }
 
+void initSendSlot(PersistentSendsTable *slot)
+{
+  int i;
+  slot->used = 0;
+  slot->destPE = -1;
+  slot->sizeMax = 0;
+  slot->destHandle = 0; 
+#if 0
+  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+    slot->destAddress[i] = NULL;
+    slot->destSizeAddress[i] = NULL;
+  }
+#endif
+  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
+  slot->messageBuf = 0;
+  slot->messageSize = 0;
+}
+
+void initRecvSlot(PersistentReceivesTable *slot)
+{
+  int i;
+#if 0
+  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+    slot->messagePtr[i] = NULL;
+    slot->recvSizePtr[i] = NULL;
+  }
+#endif
+  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
+  slot->sizeMax = 0;
+  slot->index = -1;
+  slot->prev = slot->next = NULL;
+}
+
 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
 {
   int i;
@@ -236,6 +279,17 @@ void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
     // FIXME:  assume always succeed
   }
   slot->sizeMax = maxBytes;
+#if REMOTE_EVENT
+  slot->index = IndexPool_getslot(&ackPool, slot->destBuf[0].destAddress, 2);
+#endif
 }
 
 
+PersistentHandle getPersistentHandle(PersistentHandle h)
+{
+#if REMOTE_EVENT
+  return (PersistentHandle)(((PersistentReceivesTable*)h)->index);
+#else
+  return h;
+#endif
+}
index cfe872fda72ad6006a8d7428acf1b55009ee78b1..dc7ce76d87b2bfcf31f2293da4e468e56c902422 100644 (file)
@@ -37,16 +37,18 @@ typedef struct _PersistentReceivesTable {
 #endif
   PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
   int sizeMax;
+  size_t               index;
   struct _PersistentReceivesTable *prev, *next;
 } PersistentReceivesTable;
 
-extern PersistentReceivesTable *persistentReceivesTableHead;
-extern PersistentReceivesTable *persistentReceivesTableTail;
+CpvExtern(PersistentReceivesTable *, persistentReceivesTableHead);
+CpvExtern(PersistentReceivesTable *, persistentReceivesTableTail);
 
 CpvExtern(PersistentHandle *, phs);
 CpvExtern(int, phsSize);
 CpvExtern(int, curphs);
 
+PersistentHandle getPersistentHandle(PersistentHandle h);
 void *PerAlloc(int size);
 void PerFree(char *msg);
 int PumpPersistent();
index 827517e004db6213fc0035f2b5a3abd2f8d65bc8..3f7c06d86ee13ac66006afc369e004b1e6590444 100644 (file)
@@ -652,6 +652,7 @@ CpvDeclare(mempool_type*, mempool);
 struct IndexStruct {
 void *addr;
 int next;
+int type;     // 1: ACK   2: Persistent
 };
 
 typedef struct IndexPool {
@@ -664,7 +665,8 @@ typedef struct IndexPool {
 static IndexPool  ackPool;
 
 
-#define  GetAckAddress(s)          (ackPool.indexes[s].addr)
+#define  GetIndexType(s)             (ackPool.indexes[s].type)
+#define  GetIndexAddress(s)          (ackPool.indexes[s].addr)
 
 static void IndexPool_init(IndexPool *pool)
 {
@@ -684,7 +686,7 @@ static void IndexPool_init(IndexPool *pool)
 }
 
 static
-inline int IndexPool_getslot(IndexPool *pool, void *addr)
+inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
 {
     int i;
     int s;
@@ -708,6 +710,7 @@ inline int IndexPool_getslot(IndexPool *pool, void *addr)
     }
     pool->freehead = pool->indexes[s].next;
     pool->indexes[s].addr = addr;
+    pool->indexes[s].type = type;
     CMI_GNI_UNLOCK(pool->lock);
     return s;
 }
@@ -1371,7 +1374,7 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
         if (tag == LMSG_INIT_TAG) {
             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
             if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
-                control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr);
+                control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
         }
         // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
 #endif
@@ -2129,9 +2132,9 @@ static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_han
     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
 
 #if CMK_PERSISTENT_COMM
-        // persistent message is always registered
-        // BIG_MSG small pieces do not have malloc chunk header
-    if (seqno == PERSIST_SEQ && !IsMemHndlZero(MEMHFIELD(msg))) {
+      // persistent message is always registered
+      // BIG_MSG small pieces do not have malloc chunk header
+    if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
         *memh = MEMHFIELD(msg);
         return GNI_RC_SUCCESS;
     }
@@ -2398,7 +2401,7 @@ static void PumpRemoteTransactions()
     gni_cq_entry_t          ev;
     gni_return_t            status;
     void                    *msg;   
-    int                     slot;
+    int                     slot, type, size;
 
     while(1) {
         CMI_GNI_LOCK(global_gni_lock)
@@ -2413,19 +2416,32 @@ static void PumpRemoteTransactions()
         //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
 
         //CMI_GNI_LOCK(ackpool_lock);
-        msg = GetAckAddress(slot);
+        type = GetIndexType(slot);
+        msg = GetIndexAddress(slot);
         //CMI_GNI_UNLOCK(ackpool_lock);
 
-        DecreaseMsgInSend(msg);
+        switch (type) {
+        case 1:    // ACK
+            DecreaseMsgInSend(msg);
 #if ! USE_LRTS_MEMPOOL
-       // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
+           // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
 #else
-        DecreaseMsgInSend(msg);
+            DecreaseMsgInSend(msg);
 #endif
-        if(NoMsgInSend(msg))
-            buffered_send_msg -= GetMempoolsize(msg);
-        CmiFree(msg);
-        IndexPool_freeslot(&ackPool, slot);
+            if(NoMsgInSend(msg))
+                buffered_send_msg -= GetMempoolsize(msg);
+            CmiFree(msg);
+            IndexPool_freeslot(&ackPool, slot);
+            break;
+        case 2:     // PERSISTENT
+            size = CmiGetMsgSize(msg);
+            CmiReference(msg);
+            CMI_CHECK_CHECKSUM(msg, size);
+            handleOneRecvedMsg(size, msg); 
+            break;
+        default:
+            CmiAbort("PumpRemoteTransactions: unknown type");
+        }
     }
     if(status == GNI_RC_ERROR_RESOURCE)
     {
@@ -2483,16 +2499,19 @@ static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_l
                 }
                 else {
                     CmiFree((void *)tmp_pd->local_addr);
-#if     !CQWRITE
+#if REMOTE_EVENT
+                    FreePostDesc(tmp_pd);
+                    continue;
+#elif CQWRITE
+                    sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
+                    FreePostDesc(tmp_pd);
+                    continue;
+#else
                     MallocControlMsg(ack_msg_tmp);
                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
                     ack_msg_tmp->length  = tmp_pd->length;
                     msg_tag = PUT_DONE_TAG;
-#else
-                    sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
-                    FreePostDesc(tmp_pd);
-                    continue;
 #endif
                 }
                 break;
@@ -2651,7 +2670,7 @@ static void  SendRdmaMsg()
             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
             CMI_GNI_LOCK(lock);
 #if REMOTE_EVENT
-            if( pd->cqwrite_value == 0)
+            if( pd->cqwrite_value == 0 || pd->cqwrite_value == PERSIST_SEQ)
             {
                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
                 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
index d320c0f8b198b1d94544c44ffc957a266d938853..766b36ebb7f8e169e4a2935df266f769ff7be2cb 100644 (file)
 
 #include "machine-persistent.h"
 
-#define TABLESIZE  512
-PersistentSendsTable persistentSendsTable[TABLESIZE];
-int persistentSendsTableCount = 0;
-PersistentReceivesTable *persistentReceivesTableHead;
-PersistentReceivesTable *persistentReceivesTableTail;
-int persistentReceivesTableCount = 0;
+CpvDeclare(int, TABLESIZE);
+
+CpvDeclare(PersistentSendsTable *, persistentSendsTable);
+CpvDeclare(int, persistentSendsTableCount);
+CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
+CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
+CpvDeclare(int, persistentReceivesTableCount);
 
 /* Converse message type */
 typedef struct _PersistentRequestMsg {
   char core[CmiMsgHeaderSizeBytes];
   int requestorPE;
   int maxBytes;
-  PersistentHandle sourceHandlerIndex;
+  PersistentHandle sourceHandler;
 } PersistentRequestMsg;
 
 typedef struct _PersistentReqGrantedMsg {
@@ -36,8 +37,8 @@ typedef struct _PersistentReqGrantedMsg {
   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
 */
   PersistentBuf    buf[PERSIST_BUFFERS_NUM];
-  PersistentHandle sourceHandlerIndex;
-  PersistentHandle destHandlerIndex;
+  PersistentHandle sourceHandler;
+  PersistentHandle destHandler;
 } PersistentReqGrantedMsg;
 
 typedef struct _PersistentDestoryMsg {
@@ -58,23 +59,8 @@ CpvDeclare(int, curphs);
      Utilities
 ******************************************************************************/
 
-void initSendSlot(PersistentSendsTable *slot)
-{
-  int i;
-  slot->used = 0;
-  slot->destPE = -1;
-  slot->sizeMax = 0;
-  slot->destHandle = 0; 
-#if 0
-  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
-    slot->destAddress[i] = NULL;
-    slot->destSizeAddress[i] = NULL;
-  }
-#endif
-  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
-  slot->messageBuf = 0;
-  slot->messageSize = 0;
-}
+extern void initRecvSlot(PersistentReceivesTable *slot);
+extern void initSendSlot(PersistentSendsTable *slot);
 
 void swapSendSlotBuffers(PersistentSendsTable *slot)
 {
@@ -94,20 +80,6 @@ void swapSendSlotBuffers(PersistentSendsTable *slot)
   }
 }
 
-void initRecvSlot(PersistentReceivesTable *slot)
-{
-  int i;
-#if 0
-  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
-    slot->messagePtr[i] = NULL;
-    slot->recvSizePtr[i] = NULL;
-  }
-#endif
-  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
-  slot->sizeMax = 0;
-  slot->prev = slot->next = NULL;
-}
-
 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
 {
   if (PERSIST_BUFFERS_NUM == 2) {
@@ -129,26 +101,28 @@ void swapRecvSlotBuffers(PersistentReceivesTable *slot)
 PersistentHandle getFreeSendSlot()
 {
   int i;
-  if (persistentSendsTableCount == TABLESIZE) CmiAbort("persistentSendsTable full.\n");
-  persistentSendsTableCount++;
-  for (i=1; i<TABLESIZE; i++)
-    if (persistentSendsTable[i].used == 0) break;
-  return &persistentSendsTable[i];
+  if (CpvAccess(persistentSendsTableCount) == CpvAccess(TABLESIZE)) {
+    CmiAbort("Charm++> too many persistent channels on sender.");
+  }
+  CpvAccess(persistentSendsTableCount)++;
+  for (i=1; i<CpvAccess(TABLESIZE); i++)
+    if (CpvAccess(persistentSendsTable)[i].used == 0) break;
+  return &CpvAccess(persistentSendsTable)[i];
 }
 
 PersistentHandle getFreeRecvSlot()
 {
   PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
   initRecvSlot(slot);
-  if (persistentReceivesTableHead == NULL) {
-    persistentReceivesTableHead = persistentReceivesTableTail = slot;
+  if (CpvAccess(persistentReceivesTableHead) == NULL) {
+    CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
   }
   else {
-    persistentReceivesTableTail->next = slot;
-    slot->prev = persistentReceivesTableTail;
-    persistentReceivesTableTail = slot;
+    CpvAccess(persistentReceivesTableTail)->next = slot;
+    slot->prev = CpvAccess(persistentReceivesTableTail);
+    CpvAccess(persistentReceivesTableTail) = slot;
   }
-  persistentReceivesTableCount++;
+  CpvAccess(persistentReceivesTableCount)++;
   return slot;
 }
 
@@ -185,7 +159,7 @@ PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
 
   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
   msg->maxBytes = maxBytes;
-  msg->sourceHandlerIndex = h;
+  msg->sourceHandler = h;
   msg->requestorPE = CmiMyPe();
 
   CmiSetHandler(msg, persistentRequestHandlerIdx);
@@ -218,8 +192,8 @@ static void persistentRequestHandler(void *env)
 #endif
   }
 
-  gmsg->sourceHandlerIndex = msg->sourceHandlerIndex;
-  gmsg->destHandlerIndex = h;
+  gmsg->sourceHandler = msg->sourceHandler;
+  gmsg->destHandler = getPersistentHandle(h);
 
   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
@@ -232,7 +206,7 @@ static void persistentReqGrantedHandler(void *env)
   int i;
 
   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
-  PersistentHandle h = msg->sourceHandlerIndex;
+  PersistentHandle h = msg->sourceHandler;
   PersistentSendsTable *slot = (PersistentSendsTable *)h;
 
   /* CmiPrintf("[%d] Persistent handler granted  h:%p\n", CmiMyPe(), h); */
@@ -248,7 +222,7 @@ static void persistentReqGrantedHandler(void *env)
     slot->destBuf[i] = msg->buf[i];
 #endif
   }
-  slot->destHandle = msg->destHandlerIndex;
+  slot->destHandle = msg->destHandler;
 
   if (slot->messageBuf) {
     LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
@@ -324,17 +298,17 @@ void persistentDestoryHandler(void *env)
   CmiFree(msg);
   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
 
-  persistentReceivesTableCount --;
+  CpvAccess(persistentReceivesTableCount) --;
   if (slot->prev) {
     slot->prev->next = slot->next;
   }
   else
-   persistentReceivesTableHead = slot->next;
+    CpvAccess(persistentReceivesTableHead) = slot->next;
   if (slot->next) {
     slot->next->prev = slot->prev;
   }
   else
-    persistentReceivesTableTail = slot->prev;
+    CpvAccess(persistentReceivesTableTail) = slot->prev;
 
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
     if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
@@ -361,21 +335,21 @@ void CmiDestoryPersistent(PersistentHandle h)
   /* free this slot */
   initSendSlot(slot);
 
-  persistentSendsTableCount --;
+  CpvAccess(persistentSendsTableCount) --;
 }
 
 
 void CmiDestoryAllPersistent()
 {
   int i;
-  for (i=0; i<TABLESIZE; i++) {
-    if (persistentSendsTable[i].messageBuf) 
+  for (i=0; i<CpvAccess(TABLESIZE); i++) {
+    if (CpvAccess(persistentSendsTable)[i].messageBuf) 
       CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
-    initSendSlot(&persistentSendsTable[i]);
+    initSendSlot(&CpvAccess(persistentSendsTable)[i]);
   }
-  persistentSendsTableCount = 0;
+  CpvAccess(persistentSendsTableCount) = 0;
 
-  PersistentReceivesTable *slot = persistentReceivesTableHead;
+  PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
   while (slot) {
     PersistentReceivesTable *next = slot->next;
     int i;
@@ -387,13 +361,14 @@ void CmiDestoryAllPersistent()
     CmiFree(slot);
     slot = next;
   }
-  persistentReceivesTableHead = persistentReceivesTableTail = NULL;
-  persistentReceivesTableCount = 0;
+  CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
+  CpvAccess(persistentReceivesTableCount) = 0;
 }
 
 void CmiPersistentInit()
 {
   int i;
+
   persistentRequestHandlerIdx = 
        CmiRegisterHandler((CmiHandler)persistentRequestHandler);
   persistentReqGrantedHandlerIdx = 
@@ -401,6 +376,7 @@ void CmiPersistentInit()
   persistentDestoryHandlerIdx = 
        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
 
   CpvInitialize(PersistentHandle*, phs);
   CpvAccess(phs) = NULL;
   CpvInitialize(int, phsSize);
@@ -409,12 +385,22 @@ void CmiPersistentInit()
 
   persist_machine_init();
 
-  for (i=0; i<TABLESIZE; i++) {
-    initSendSlot(&persistentSendsTable[i]);
+  CpvInitialize(int, TABLESIZE);
+  CpvAccess(TABLESIZE) = 512;
+
+  CpvInitialize(PersistentSendsTable *, persistentSendsTable);
+  CpvAccess(persistentSendsTable) = (PersistentSendsTable *)malloc(CpvAccess(TABLESIZE) * sizeof(PersistentSendsTable));
+  for (i=0; i<CpvAccess(TABLESIZE); i++) {
+    initSendSlot(&CpvAccess(persistentSendsTable)[i]);
   }
-  persistentSendsTableCount = 0;
-  persistentReceivesTableHead = persistentReceivesTableTail = NULL;
-  persistentReceivesTableCount = 0;
+  CpvInitialize(int, persistentSendsTableCount);
+  CpvAccess(persistentSendsTableCount) = 0;
+
+  CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
+  CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
+  CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
+  CpvInitialize(int, persistentReceivesTableCount);
+  CpvAccess(persistentReceivesTableCount) = 0;
 }