make send persistent table link list (instead of fixed size table)
authorGengbin Zheng <gzheng@illinois.edu>
Sat, 24 Mar 2012 18:26:41 +0000 (13:26 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Sat, 24 Mar 2012 18:26:41 +0000 (13:26 -0500)
dynamically determine SHIFT
fixed a race condition in PumpRemote for persistent

src/arch/gemini_gni/machine-persistent.c
src/arch/gemini_gni/machine-persistent.h
src/arch/gemini_gni/machine.c
src/arch/util/persist-comm.c

index c2ff67c2c567148df9525816619d3a2d11489885..3535e0fb3b12c3ff409845ce6140e5240f2f0eb5 100644 (file)
@@ -25,7 +25,6 @@ void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
     
     PersistentSendsTable *slot = (PersistentSendsTable *)h;
     if (h==NULL) CmiAbort("LrtsSendPersistentMsg: not a valid PersistentHandle");
-    CmiAssert(slot->used == 1);
     CmiAssert(CmiNodeOf(slot->destPE) == destNode);
     if (size > slot->sizeMax) {
         CmiPrintf("size: %d sizeMax: %d mype=%d destPe=%d\n", size, slot->sizeMax, CmiMyPe(), destNode);
@@ -242,7 +241,6 @@ void persist_machine_init(void)
 void initSendSlot(PersistentSendsTable *slot)
 {
   int i;
-  slot->used = 0;
   slot->destPE = -1;
   slot->sizeMax = 0;
   slot->destHandle = 0; 
@@ -255,6 +253,7 @@ void initSendSlot(PersistentSendsTable *slot)
   memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
   slot->messageBuf = 0;
   slot->messageSize = 0;
+  slot->prev = slot->next = NULL;
 }
 
 void initRecvSlot(PersistentReceivesTable *slot)
index 3f8f3cdbda880eedec92e8bb866e5fbff6d9daf3..d0ae570351fd7840d56508d6e06224ddbbd2b20d 100644 (file)
@@ -29,7 +29,7 @@ typedef struct _PersistentSendsTable {
   PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
   void *messageBuf;
   int messageSize;
-  char used;
+  struct _PersistentSendsTable *prev, *next;
 } PersistentSendsTable;
 
 typedef struct _PersistentReceivesTable {
index b7ae02db3fb95743896eab8fbfcde725c093a3ba..ade054c44b45ae934442a82ae9b409ab131e5857 100644 (file)
@@ -651,7 +651,7 @@ CpvDeclare(mempool_type*, mempool);
 #if REMOTE_EVENT
 /* ack pool for remote events */
 
-#define SHIFT                   18
+static int  SHIFT   =           18;
 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
 #define RANK_MASK               ((1<<SHIFT) - 1)
 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
@@ -660,7 +660,13 @@ CpvDeclare(mempool_type*, mempool);
 #define GET_RANK(evt)           ((evt) & RANK_MASK)
 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
 
-#define PERSIST_EVENT(idx)      (1<<31 | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
+#define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
+
+#if CMK_SMP
+#define INIT_SIZE                4096
+#else
+#define INIT_SIZE                1024
+#endif
 
 struct IndexStruct {
 void *addr;
@@ -688,11 +694,12 @@ static void IndexPool_init(IndexPool *pool)
     int i;
     if ((1<<SHIFT) < mysize) 
         CmiAbort("Charm++ Error: Remote event's rank field overflow.");
-    pool->size = 1024;
+    pool->size = INIT_SIZE;
+    if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
     for (i=0; i<pool->size-1; i++) {
         pool->indexes[i].next = i+1;
-        pool->indexes[i].type = -1;
+        pool->indexes[i].type = 0;
     }
     pool->indexes[i].next = -1;
     pool->freehead = 0;
@@ -714,16 +721,17 @@ inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
     s = pool->freehead;
     if (s == -1) {
         int newsize = pool->size * 2;
-        printf("[%d] IndexPool_getslot expand to: %d\n", myrank, newsize);
+        printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
         if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
         struct IndexStruct *old_ackpool = pool->indexes;
         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
         for (i=pool->size; i<newsize-1; i++) {
             pool->indexes[i].next = i+1;
-            pool->indexes[i].type = -1;
+            pool->indexes[i].type = 0;
         }
         pool->indexes[i].next = -1;
+        pool->indexes[i].type = 0;
         pool->freehead = pool->size;
         s = pool->size;
         pool->size = newsize;
@@ -731,6 +739,7 @@ inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
     }
     pool->freehead = pool->indexes[s].next;
     pool->indexes[s].addr = addr;
+    CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
     pool->indexes[s].type = type;
 #if MULTI_THREAD_SEND
     CmiUnlock(pool->lock);
@@ -746,7 +755,7 @@ inline  void IndexPool_freeslot(IndexPool *pool, int s)
     CmiLock(pool->lock);
 #endif
     pool->indexes[s].next = pool->freehead;
-    pool->indexes[s].type = -1;
+    pool->indexes[s].type = 0;
     pool->freehead = s;
 #if MULTI_THREAD_SEND
     CmiUnlock(pool->lock);
@@ -2645,10 +2654,11 @@ static void PumpRemoteTransactions()
             break;
 #if CMK_PERSISTENT_COMM
         case 1:  {    // PERSISTENT
-            CmiAssert(index>=0 && index<persistPool.size);
+            CmiLock(persistPool.lock);
             CmiAssert(GetIndexType(persistPool, index) == 2);
-            START_EVENT();
             PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
+            CmiUnlock(persistPool.lock);
+            START_EVENT();
             msg = slot->destBuf[0].destAddress;
             size = CmiGetMsgSize(msg);
             CmiReference(msg);
@@ -3842,6 +3852,10 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
 #if CMK_PERSISTENT_COMM
     IndexPool_init(&persistPool);
 #endif
+    SHIFT = 1;
+    while (1<<SHIFT < mysize) SHIFT++;
+    CmiAssert(SHIFT < 31);
+printf("SHIFT: %d\n", SHIFT);
 #endif
 
 #if CMK_WITH_STATS
index c60b6fb36d387d36e66d40c9791c3490caa8fcd9..57643801a9a2f9a6f62d9d35526e3b0f6256d9d2 100644 (file)
@@ -14,9 +14,8 @@
 
 #include "machine-persistent.h"
 
-CpvDeclare(int, TABLESIZE);
-
-CpvDeclare(PersistentSendsTable *, persistentSendsTable);
+CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
+CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
 CpvDeclare(int, persistentSendsTableCount);
 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
@@ -100,19 +99,23 @@ void swapRecvSlotBuffers(PersistentReceivesTable *slot)
 
 PersistentHandle getFreeSendSlot()
 {
-  int i;
-  if (CpvAccess(persistentSendsTableCount) == CpvAccess(TABLESIZE)) {
-    CmiAbort("Charm++> too many persistent channels on sender.");
+  PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
+  initSendSlot(slot);
+  if (CpvAccess(persistentSendsTableHead) == NULL) {
+    CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
+  }
+  else {
+    CpvAccess(persistentSendsTableTail)->next = slot;
+    slot->prev = CpvAccess(persistentSendsTableTail);
+    CpvAccess(persistentSendsTableTail) = slot;
   }
   CpvAccess(persistentSendsTableCount)++;
-  for (i=1; i<CpvAccess(TABLESIZE); i++)
-    if (CpvAccess(persistentSendsTable)[i].used == 0) break;
-  return &CpvAccess(persistentSendsTable)[i];
+  return slot;
 }
 
 PersistentHandle getFreeRecvSlot()
 {
-  PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
+  PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
   initRecvSlot(slot);
   if (CpvAccess(persistentReceivesTableHead) == NULL) {
     CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
@@ -160,7 +163,6 @@ PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
   h = getFreeSendSlot();
   slot = (PersistentSendsTable *)h;
 
-  slot->used = 1;
   slot->destPE = destPE;
   slot->sizeMax = maxBytes;
 
@@ -218,9 +220,6 @@ static void persistentReqGrantedHandler(void *env)
 
   /* CmiPrintf("[%d] Persistent handler granted  h:%p\n", CmiMyPe(), h); */
 
-  CmiAssert(slot->used == 1);
-
-
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
 #if 0
     slot->destAddress[i] = msg->msgAddr[i];
@@ -275,7 +274,6 @@ PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
   PersistentHandle h = getFreeSendSlot();
 
   PersistentSendsTable *slot = (PersistentSendsTable *)h;
-  slot->used = 1;
   slot->destPE = recvHand.pe;
   slot->sizeMax = recvHand.maxBytes;
 
@@ -323,7 +321,7 @@ void persistentDestoryHandler(void *env)
 
   clearRecvSlot(slot);
 
-  CmiFree(slot);
+  free(slot);
 }
 
 /* FIXME: need to buffer until ReqGranted message come back? */
@@ -342,7 +340,17 @@ void CmiDestoryPersistent(PersistentHandle h)
   CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
 
   /* free this slot */
-  initSendSlot(slot);
+  if (slot->prev) {
+    slot->prev->next = slot->next;
+  }
+  else
+    CpvAccess(persistentSendsTableHead) = slot->next;
+  if (slot->next) {
+    slot->next->prev = slot->prev;
+  }
+  else
+    CpvAccess(persistentSendsTableTail) = slot->prev;
+  free(slot);
 
   CpvAccess(persistentSendsTableCount) --;
 }
@@ -350,12 +358,13 @@ void CmiDestoryPersistent(PersistentHandle h)
 
 void CmiDestoryAllPersistent()
 {
-  int i;
-  for (i=0; i<CpvAccess(TABLESIZE); i++) {
-    if (CpvAccess(persistentSendsTable)[i].messageBuf) 
-      CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
-    initSendSlot(&CpvAccess(persistentSendsTable)[i]);
+  PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
+  while (sendslot) {
+    PersistentSendsTable *next = sendslot->next;
+    free(sendslot);
+    sendslot = next;
   }
+  CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
   CpvAccess(persistentSendsTableCount) = 0;
 
   PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
@@ -367,7 +376,7 @@ void CmiDestoryAllPersistent()
         CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
       if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
     }
-    CmiFree(slot);
+    free(slot);
     slot = next;
   }
   CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
@@ -394,14 +403,9 @@ void CmiPersistentInit()
 
   persist_machine_init();
 
-  CpvInitialize(int, TABLESIZE);
-  CpvAccess(TABLESIZE) = 512;
-
-  CpvInitialize(PersistentSendsTable *, persistentSendsTable);
-  CpvAccess(persistentSendsTable) = (PersistentSendsTable *)malloc(CpvAccess(TABLESIZE) * sizeof(PersistentSendsTable));
-  for (i=0; i<CpvAccess(TABLESIZE); i++) {
-    initSendSlot(&CpvAccess(persistentSendsTable)[i]);
-  }
+  CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
+  CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
+  CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
   CpvInitialize(int, persistentSendsTableCount);
   CpvAccess(persistentSendsTableCount) = 0;