New code for handling different pools per PE in one node was added.
author Esteban Meneses <emenese2@illinois.edu>
Mon, 16 Jun 2008 19:39:23 +0000 (19:39 +0000)
committer Esteban Meneses <emenese2@illinois.edu>
Mon, 16 Jun 2008 19:39:23 +0000 (19:39 +0000)
To turn this code on, just set the flag THREAD_MULTI_POOL to 1.

src/arch/net/machine-ibverbs.c

index ce38c368413aafdcc767261c0ae3a40fe6f7464c..87f6e6f98eccafec80f806eae5f3c55fab04ae71 100644 (file)
@@ -67,9 +67,22 @@ static int processBufferedCount;
 #define WC_LIST_SIZE 32
 /*#define WC_BUFFER_SIZE 100*/
 
+
+
+
 #define INCTOKENS_FRACTION 0.04
 #define INCTOKENS_INCREASE .50
 
+// Compile-time switch: 1 gives every PE its own set of chunk pools, 0 keeps the single shared pool array
+#define THREAD_MULTI_POOL 0
+
+#if THREAD_MULTI_POOL 
+#include "pcqueue.h"
+PCQueue **queuePool; // queuePool[owner][freer]: buffers that PE 'freer' hands back to the PE that allocated them
+#endif
+
+#define INFIBARRIERPACKET 128 // packet-header code bit marking a barrier packet (tested in processRecvWC)
+
 struct infiIncTokenAckPacket{ // NOTE(review): presumably the payload of an INFIPACKETCODE_INCTOKENSACK packet; confirm against the sender
        int a; // NOTE(review): the meaning of this field is not visible in this excerpt
 };
@@ -114,7 +127,6 @@ Data Structures
 #define INFIDIRECT_REQUEST 16
 #define INFIPACKETCODE_INCTOKENSACK 32
 #define INFIDUMMYPACKET 64
-#define INFIBARRIERPACKET 128
 
 struct infiPacketHeader{
        char code;
@@ -301,6 +313,9 @@ typedef struct infiCmiChunkMetaDataStruct {
        void *nextBuf;
        struct infiCmiChunkHeaderStruct *owner;
        int count;
+#if THREAD_MULTI_POOL
+       int parentPe;                                           // the PE that allocated the buffer and must release it
+#endif
 } infiCmiChunkMetaData;
 
 
@@ -318,8 +333,12 @@ typedef struct {
 #define INFIMAXPERPOOL 100
 #define INFIMULTIPOOL -5
 
-
+#if THREAD_MULTI_POOL
+infiCmiChunkPool **infiCmiChunkPools; // indexed [pe][poolIdx]; both levels are malloc'ed in initInfiCmiChunkPools
+//TODO Find a proper place to release the memory acquired for infiCmiChunkPools (it is never freed at present)
+#else
 infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
+#endif // THREAD_MULTI_POOL
 
 static void initInfiCmiChunkPools();
 
@@ -884,9 +903,6 @@ static void CmiNotifyStillIdle(CmiIdleState *s) {
 #endif
 }
 
-
-
-
 static inline void increaseTokens(OtherNode node);
 
 static inline int pollRecvCq(const int toBuffer);
@@ -1576,9 +1592,10 @@ static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
                MACHSTATE(3,"Dummy packet");
        }
        if(header->code & INFIBARRIERPACKET){
-               MACHSTATE(3,"Barrier packet");
-               CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
-       }
+                MACHSTATE(3,"Barrier packet");
+                CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
+        }
+
 #if CMK_IBVERBS_INCTOKENS      
        if(header->code & INFIPACKETCODE_INCTOKENS){
                increasePostedRecvs(nodeNo);
@@ -2002,21 +2019,6 @@ static inline void processAllBufferedMsgs(){
 };
 
 
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
 /*************************
        Increase tokens when short of them
 **********/
@@ -2038,8 +2040,6 @@ static inline void increaseTokens(OtherNode node){
        context->sendCqSize+= increase;
 };
 
-
-
 static void increasePostedRecvs(int nodeNo){
        OtherNode node = &nodes[nodeNo];
        int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
@@ -2084,15 +2084,44 @@ static void increasePostedRecvs(int nodeNo){
 */
 
 static void initInfiCmiChunkPools(){ // Mark every chunk pool empty and give pool i the bin size firstBinSize * 2^i
-       int i;
+       int i,j;
        int size = firstBinSize;
-       
+
+       //printf("Hello %d of %d\n",CmiMyPe(),CmiNumPes());
+
+#if THREAD_MULTI_POOL
+       infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * CmiNumPes()); // table of per-PE pool rows; NOTE(review): none of the malloc results in this function are checked
+       for(i = 0; i < CmiNumPes(); i++){
+               infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS); // one row of INFINUMPOOLS size classes for PE i
+       }
+       for(j = 0; j < CmiNumPes(); j++){
+               size = firstBinSize; // restart the doubling sequence for each PE's row
+               for(i=0;i<INFINUMPOOLS;i++){
+                       infiCmiChunkPools[j][i].size = size;
+                       infiCmiChunkPools[j][i].startBuf = NULL; // empty free list
+                       infiCmiChunkPools[j][i].count = 0;
+                       size *= 2;
+               }
+       }
+
+       // create the n^2 system of queues: queuePool[owner][freer] carries buffers that PE 'freer' hands back to PE 'owner'
+       queuePool = malloc(sizeof(PCQueue *) * CmiNumPes()); // NOTE(review): sized by CmiNumPes(); confirm whether a per-node PE count would be enough
+       for(i = 0; i < CmiNumPes(); i++){
+               queuePool[i] = malloc(sizeof(PCQueue) * CmiNumPes());
+       }
+       for(i = 0; i < CmiNumPes(); i++)
+               for(j = 0; j < CmiNumPes(); j++)
+                       queuePool[i][j] = PCQueueCreate();
+
+#else  // single pool array shared by all callers
        for(i=0;i<INFINUMPOOLS;i++){
                infiCmiChunkPools[i].size = size;
                infiCmiChunkPools[i].startBuf = NULL;
                infiCmiChunkPools[i].count = 0;
                size *= 2;
        }
+#endif
+
 }
 
 /***********
@@ -2109,7 +2138,8 @@ infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
 };
 
 
-static inline void *getInfiCmiChunk(int dataSize){
+#if THREAD_MULTI_POOL
+static inline void *getInfiCmiChunkThread(int dataSize){
        //find out to which pool this dataSize belongs to
        // poolIdx = floor(log2(dataSize/firstBinSize))+1
        int ratio = dataSize/firstBinSize;
@@ -2121,7 +2151,7 @@ static inline void *getInfiCmiChunk(int dataSize){
                poolIdx++;
        }
        MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
-       if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
+       if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
                infiCmiChunkMetaData *metaData;         
                infiCmiChunkHeader *hdr;
                int allocSize;
@@ -2132,7 +2162,7 @@ static inline void *getInfiCmiChunk(int dataSize){
                
                
                if(poolIdx < INFINUMPOOLS ){
-                       allocSize = infiCmiChunkPools[poolIdx].size;
+                       allocSize = infiCmiChunkPools[CmiMyPe()][poolIdx].size;
                }else{
                        allocSize = dataSize;
                }
@@ -2153,15 +2183,16 @@ static inline void *getInfiCmiChunk(int dataSize){
                        metaData->key = key;
                        metaData->owner = hdr;
                        metaData->poolIdx = poolIdx;
+                       metaData->parentPe = CmiMyPe();                                         // setting the parent PE
 
                        if(i == 0){
                                metaData->owner->metaData->count = count;
                                metaData->nextBuf = NULL;
                        }else{
                                void *startBuf = res - sizeof(infiCmiChunkHeader);
-                               metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
-                               infiCmiChunkPools[poolIdx].startBuf = startBuf;
-                               infiCmiChunkPools[poolIdx].count++;
+                               metaData->nextBuf = infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf;
+                               infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf = startBuf;
+                               infiCmiChunkPools[CmiMyPe()][poolIdx].count++;
                                
                        }
                        if(i != count-1){
@@ -2177,19 +2208,19 @@ static inline void *getInfiCmiChunk(int dataSize){
        if(poolIdx < INFINUMPOOLS){
                infiCmiChunkMetaData *metaData;                         
        
-               res = infiCmiChunkPools[poolIdx].startBuf;
+               res = infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf;
                res += sizeof(infiCmiChunkHeader);
 
                MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
                metaData = METADATAFIELD(res);
 
-               infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
-               MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
+               infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf = metaData->nextBuf;
+               MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf);
                
                metaData->nextBuf = NULL;
 //             CmiAssert(metaData->poolIdx == poolIdx);
 
-               infiCmiChunkPools[poolIdx].count--;
+               infiCmiChunkPools[CmiMyPe()][poolIdx].count--;
                return res;
        }
 
@@ -2197,12 +2228,106 @@ static inline void *getInfiCmiChunk(int dataSize){
 
        
 };
+#else
+static inline void *getInfiCmiChunk(int dataSize){ // Returns a buffer for dataSize bytes: reuses a free chunk of the matching pool, otherwise mallocs and registers new memory
+        // find out which pool (size class) this dataSize belongs to
+        // poolIdx = floor(log2(dataSize/firstBinSize))+1, or 0 when dataSize < firstBinSize
+        int ratio = dataSize/firstBinSize;
+        int poolIdx=0;
+        void *res;
+
+        while(ratio > 0){ // counts the significant bits of ratio
+                ratio  = ratio >> 1;
+                poolIdx++;
+        }
+        MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
+        if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){ // pool is empty, or the request is larger than any pool: allocate fresh memory
+                infiCmiChunkMetaData *metaData;
+                infiCmiChunkHeader *hdr;
+                int allocSize;
+                int count=1;
+                int i;
+                struct ibv_mr *key;
+                void *origres;
+
+
+                if(poolIdx < INFINUMPOOLS ){
+                        allocSize = infiCmiChunkPools[poolIdx].size; // pooled request: round up to the pool's bin size
+                }else{
+                        allocSize = dataSize; // oversized request: allocate exactly what was asked for
+                }
+
+                if(poolIdx < blockThreshold){
+                        count = blockAllocRatio; // small size classes are allocated several chunks at a time
+                }
+                res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count); // NOTE(review): result is not checked before it is used below
+                hdr = res; // header of the first chunk; recorded as 'owner' in every chunk's metadata
+
+                key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); // one registration covers the whole allocation; all chunks share this key
+                CmiAssert(key != NULL);
+
+                origres = (res += sizeof(infiCmiChunkHeader)); // payload of the first chunk: this is what the caller receives
+
+                for(i=0;i<count;i++){
+                        metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData)); // each chunk gets its own separately malloc'ed metadata record
+                        metaData->key = key;
+                        metaData->owner = hdr;
+                        metaData->poolIdx = poolIdx;
+
+                        if(i == 0){
+                                metaData->owner->metaData->count = count; // the first chunk's metadata records how many chunks the allocation holds
+                                metaData->nextBuf = NULL;
+                        }else{
+                                void *startBuf = res - sizeof(infiCmiChunkHeader); // chunks after the first are pushed onto the pool's free list
+                                metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
+                                infiCmiChunkPools[poolIdx].startBuf = startBuf;
+                                infiCmiChunkPools[poolIdx].count++;
+
+                        }
+                        if(i != count-1){
+                                res += (allocSize+sizeof(infiCmiChunkHeader)); // step to the next chunk's payload
+                        }
+          }
+
+
+                MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key); // note: res points at the last chunk here, not at the returned one
+
+                return origres;
+        }
+        if(poolIdx < INFINUMPOOLS){ // reuse path: pop the head of the pool's free list
+                infiCmiChunkMetaData *metaData;
+
+                res = infiCmiChunkPools[poolIdx].startBuf;
+                res += sizeof(infiCmiChunkHeader); // free-list entries point at the chunk header; the caller gets the payload
+
+                MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
+                metaData = METADATAFIELD(res);
 
+                infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
+                MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
 
+                metaData->nextBuf = NULL;
+//              CmiAssert(metaData->poolIdx == poolIdx);
+
+                infiCmiChunkPools[poolIdx].count--;
+                return res;
+        }
+
+        CmiAssert(0); // not expected to be reached: both branches above return
+
+
+};
+
+#endif
 
 
 void * infi_CmiAlloc(int size){
        void *res;
+
+#if THREAD_MULTI_POOL
+       res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
+       return res;
+#else
 #if CMK_SMP    
        CmiMemLock();
 #endif
@@ -2217,8 +2342,106 @@ void * infi_CmiAlloc(int size){
                res = malloc(size);
        }*/
        return res;
+#endif
+}
+
+#if THREAD_MULTI_POOL
+/*
+ * Release a buffer on the calling PE (THREAD_MULTI_POOL build).
+ *
+ * ptr is the block pointer in the form infi_CmiFree receives it, i.e. before
+ * the CmiChunkHeader offset has been applied; the offset is applied here.
+ *
+ * If the buffer belongs to a pooled size class and the calling PE's pool
+ * holds no more than INFIMAXPERPOOL buffers, it is pushed back onto that
+ * pool's free list. Otherwise the chunk is released: the outstanding-chunk
+ * count kept in the owner's metadata is decremented and, once it reaches
+ * zero, the memory region is deregistered and the owning allocation freed.
+ */
+void infi_CmiFreeDirect(void *ptr){
+        infiCmiChunkMetaData *metaData;
+        int poolIdx;
+        void *freePtr;
+
+        ptr += sizeof(CmiChunkHeader);
+        //there is a infiniband specific header
+        freePtr = ptr - sizeof(infiCmiChunkHeader);
+        metaData = METADATAFIELD(ptr);
+        poolIdx = metaData->poolIdx;
+
+        MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
+//      CmiAssert(poolIdx >= 0);
+        if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyPe()][poolIdx].count <= INFIMAXPERPOOL){
+                // push the chunk onto the head of this PE's free list
+                metaData->nextBuf = infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf;
+                infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf = freePtr;
+                infiCmiChunkPools[CmiMyPe()][poolIdx].count++;
+
+                // the pool table is indexed [pe][poolIdx]; the trace must use both indices
+                MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyPe()][poolIdx].startBuf,infiCmiChunkPools[CmiMyPe()][poolIdx].count);
+        }else{
+                MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
+                metaData->owner->metaData->count--;
+                if(metaData->owner->metaData == metaData){
+                        //I am the owner
+                        if(metaData->owner->metaData->count == 0){
+                                //all the chunks have been freed
+                                ibv_dereg_mr(metaData->key);
+                                free(freePtr);
+                                free(metaData);
+                        }
+                        //if I am the owner and all the chunks have not been
+                        // freed dont free my metaData. will need later
+                }else{
+                        if(metaData->owner->metaData->count == 0){
+                                //need to free the owner's buffer and metadata
+                                freePtr = metaData->owner;
+                                ibv_dereg_mr(metaData->key);
+                                free(metaData->owner->metaData);
+                                free(freePtr);
+                        }
+                        free(metaData);
+                }
+        }
+}
+
+
+/*
+ * Free a buffer in the THREAD_MULTI_POOL build.
+ *
+ * ptr is the block pointer handed in by the caller; the chunk metadata is
+ * located relative to the payload that follows the CmiChunkHeader.
+ *
+ *  - Buffers tagged INFIMULTIPOOL are part of a received multi-message and
+ *    are left alone (they are released elsewhere).
+ *  - A buffer allocated by another PE is not released here: it is pushed on
+ *    queuePool[parentPe][CmiMyPe()] so that the allocating PE releases it.
+ *  - A buffer allocated by this PE is released via infi_CmiFreeDirect, after
+ *    which the buffers other PEs have queued for this PE are released too.
+ *
+ * infi_CmiFreeDirect applies the CmiChunkHeader offset itself, so it (and
+ * the queue) must always be given the original, un-offset pointer.
+ *
+ * NOTE(review): queued buffers are only drained when this PE frees one of
+ * its own buffers; confirm that this is frequent enough.
+ */
+void infi_CmiFree(void *ptr){
+        int i,j;
+        int parentPe;
+        int pending;
+        void *pointer;
+        void *payload;
+        infiCmiChunkMetaData *metaData;
+
+        // leave ptr untouched: it is what gets queued / passed on below
+        payload = (char *)ptr + sizeof(CmiChunkHeader);
+        metaData = METADATAFIELD(payload);
+
+        if(metaData->poolIdx == INFIMULTIPOOL){
+                /** this is a part of a received mult message
+                    it will be freed correctly later
+                **/
+
+                return;
+        }
+
+        // checking if this free operation is my responsibility
+        parentPe = metaData->parentPe;
+        if(parentPe != CmiMyPe()){
+                PCQueuePush(queuePool[parentPe][CmiMyPe()],(char *)ptr);
+                return;
+        }
+
+        infi_CmiFreeDirect(ptr);
+
+        // checking free request queues
+        for(i = 0; i < CmiNumPes(); i++){
+                // snapshot the length: it shrinks with every pop, so it must
+                // not be re-read in the loop condition
+                pending = PCQueueLength(queuePool[CmiMyPe()][i]);
+                for(j = 0; j < pending; j++){
+                        pointer = (void *)PCQueuePop(queuePool[CmiMyPe()][i]);
+                        if(pointer == NULL){
+                                // defensive: nothing was actually available
+                                break;
+                        }
+                        infi_CmiFreeDirect(pointer);
+                }
+        }
+}
 
+#else
 void infi_CmiFree(void *ptr){
        int size;
        void *freePtr = ptr;
@@ -2281,7 +2504,7 @@ void infi_CmiFree(void *ptr){
                free(freePtr);
        }*/
 }
-
+#endif
 
 /*********************************************************************************************
 This section is for CmiDirect. This is a variant of the  persistent communication in which 
@@ -2972,3 +3195,4 @@ int CmiBarrierZero()
   CmiNodeAllBarrier();
   processAllBufferedMsgs();
 }
+