Changes included to send and receive messages through the unreliable layer of Infiniband.
author Esteban Meneses <emenese2@illinois.edu>
Mon, 8 Sep 2008 19:20:45 +0000 (19:20 +0000)
committer Esteban Meneses <emenese2@illinois.edu>
Mon, 8 Sep 2008 19:20:45 +0000 (19:20 +0000)
src/arch/net/machine-ibud.c

index 4cbebc567aab0ab6770edc4af2912ae93f4d897f..cee00c523a3c967d412e91c0f28d102f6c701dbe 100644 (file)
  * @{
  */
 
+/** NOTES
+- Every message sent using the unreliable layer of Infiniband must include the GRH (Global Routing Header). The GRH occupies the first 40 bytes of every packet, and the machine layer's only responsibility for it is to reserve that space in the packet.
+*/
+
+
 // FIXME: Note: Charm does not guarantee in order messages - can use for bettter performance
 
 
@@ -61,6 +66,7 @@ struct infiAddr {
 typedef struct infiPacketStruct {
        char *buf;
        int size;
+       char extra[40];                 // FIXME: check this 40 extra stuff
        struct infiPacketHeader header;
        struct ibv_mr *keyHeader;
        struct OtherNodeStruct *destNode;
@@ -96,6 +102,7 @@ struct infiContext {
        int sendCqSize,recvCqSize;
 
        void *buffer; // Registered memory buffer for msg's
+
 };
 
 static struct infiContext *context;
@@ -121,7 +128,7 @@ typedef struct {
   CmiState cs; /*Machine state*/
 } CmiIdleState;
 
-
+#define BUFFER_RECV 1
 struct infiBuffer{
        int type;
        char *buf;
@@ -412,19 +419,15 @@ void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int siz
     int startBufferIdx=0;
     MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
     for(j=0;j<numRecvs;j++){
-
-
         sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
-        sgElements[j].length = sizePerBuffer;
+        sgElements[j].length = sizePerBuffer + 40;                                             // we add the 40 bytes of the GRH
         sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
-
         workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
         workRequests[j].sg_list = &sgElements[j];
         workRequests[j].num_sge = 1;
         if(j != numRecvs-1){
             workRequests[j].next = &workRequests[j+1];
         }
-
     }
     workRequests[numRecvs-1].next = NULL;
     MACHSTATE(3,"About to call ibv_post_recv");
@@ -432,54 +435,54 @@ void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int siz
 
     free(workRequests);
     free(sgElements);
+
 }
 
 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
-    int numBuffers;
-    int i;
-    int bigSize;
-    char *bigBuf;
-    struct infiBufferPool *ret;
-    struct ibv_mr *bigKey;
-
-    MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
-
-    //page_size = sysconf(_SC_PAGESIZE);
-    ret = malloc(sizeof(struct infiBufferPool));
-    ret->next = NULL;
-    numBuffers=ret->numBuffers = numRecvs;
-
-    ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
-
-    bigSize = numBuffers*sizePerBuffer;
-    bigBuf=malloc(bigSize);
-    bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
-    CmiAssert(bigKey != NULL);
-
-    for(i=0;i<numBuffers;i++){
-        struct infiBuffer *buffer =  &(ret->buffers[i]);
-        buffer->type = 1;
-        buffer->size = sizePerBuffer;
-
-        buffer->buf = &bigBuf[i*sizePerBuffer];
-        buffer->key = bigKey;
-
-        if(buffer->key == NULL){
-            MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
-            CmiAssert(buffer->key != NULL);
-        }
-    }
-    return ret;
+       int numBuffers;
+       int i;
+       int bigSize;
+       char *bigBuf;
+       struct infiBufferPool *ret;
+       struct ibv_mr *bigKey;
+       int page_size;
+
+       MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
+
+       page_size = sysconf(_SC_PAGESIZE);
+       ret = malloc(sizeof(struct infiBufferPool));
+       ret->next = NULL;
+       numBuffers=ret->numBuffers = numRecvs;
+       ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
+       bigSize = numBuffers*sizePerBuffer;
+       bigBuf = memalign(page_size,bigSize);
+       bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
+       CmiAssert(bigKey != NULL);
+
+       for(i=0;i<numBuffers;i++){
+               struct infiBuffer *buffer =  &(ret->buffers[i]);
+               buffer->type = BUFFER_RECV;
+               buffer->size = sizePerBuffer;
+               buffer->buf = &bigBuf[i*sizePerBuffer];
+               buffer->key = bigKey;
+
+               if(buffer->key == NULL){
+                       MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
+                       CmiAssert(buffer->key != NULL);
+               }
+       }
+       return ret;
 };
 
 
 
-void    infiPostInitialRecvs(){
-    //create the pool and post the receives 
-    int numPosts;
+void infiPostInitialRecvs(){
+       //create the pool and post the receives 
+       int numPosts;
     
-    context->recvBufferPool=allocateInfiBufferPool(maxrecvbuffers,packetsize);
-    postInitialRecvs(context->recvBufferPool,maxrecvbuffers,packetsize);
+       // each receive buffer is allocated 40 bytes larger than packetsize to leave room for the GRH
+       context->recvBufferPool = allocateInfiBufferPool(maxrecvbuffers, packetsize + 40);      // we add 40 bytes to hold the GRH of Infiniband 
+       postInitialRecvs(context->recvBufferPool,maxrecvbuffers,packetsize);
 
 }   
 
@@ -609,9 +612,9 @@ static inline infiPacket newPacket(){
        pkt->elemList[0].length = sizeof(struct infiPacketHeader);
        pkt->elemList[0].lkey = pkt->keyHeader->lkey;
        
-       pkt->wr.wr_id = (uint64_t)pkt;
+       pkt->wr.wr_id = (uint64_t)pkt; 
        pkt->wr.sg_list = &(pkt->elemList[0]);
-       pkt->wr.num_sge = 1; // FIXME: original ibverbs has 2, but this breaks sending
+       pkt->wr.num_sge = 2; //FIXME: should be 2 here
        pkt->wr.opcode = IBV_WR_SEND;
        pkt->wr.send_flags = IBV_SEND_SIGNALED;
        pkt->wr.next = NULL;
@@ -620,10 +623,8 @@ static inline infiPacket newPacket(){
 };
 
 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
-
 /*
-
-       struct ibv_send_wr wr,*bad_wr=NULL;
+struct ibv_send_wr wr,*bad_wr=NULL;
     struct ibv_sge list;
     void *buffer;
     struct ibv_mr *mr;
@@ -633,7 +634,6 @@ static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struc
 buffer=malloc(128);
 mr=ibv_reg_mr(context->pd, buffer, 128, IBV_ACCESS_LOCAL_WRITE);
 
-
     //memset(&list, 0, sizeof(struct ibv_sge));
     list.addr = (uintptr_t) buffer + 40;
     list.length = 128;
@@ -677,8 +677,10 @@ mr=ibv_reg_mr(context->pd, buffer, 128, IBV_ACCESS_LOCAL_WRITE);
     MACHSTATE1(2," here qp=%i",context->qp);
     MACHSTATE1(2," here wr=%i",&(packet->wr));
     MACHSTATE1(2," here wr=%i",&bad_wr);
+
+
        if(ibv_post_send(context->qp,&(packet->wr),&bad_wr)){ 
-        MACHSTATE(2," problem sending");
+               MACHSTATE(2," problem sending");
                CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
                CmiAssert(0);
        }
@@ -936,14 +938,13 @@ static inline int pollCq(const int toBuffer,struct ibv_cq *cq) {
                 MACHSTATE3(3,"   wr_id=%i qp_num=%i vendor_err=%i",wc[i].wr_id,wc[i].qp_num,wc[i].vendor_err); 
                 MACHSTATE1(3,"  key=%p ",
                     ((struct infiBuffer *)((&wc[i])->wr_id))->key);
-/*
-                MACHSTATE4(3,"  lkey=%p buffer=%d length=%d end=%d",
+/*                MACHSTATE4(3,"  lkey=%p buffer=%d length=%d end=%d",
                     ((struct infiBuffer *)((&wc[i])->wr_id))->key->lkey,
                     ((struct infiBuffer *)((&wc[i])->wr_id))->buf, 
                     ((struct infiBuffer *)((&wc[i])->wr_id))->size,
                     ((struct infiBuffer *)((&wc[i])->wr_id))->buf+((struct infiBuffer *)((&wc[i])->wr_id))->size); 
-*/
-                       CmiAssert(wc[i].status==IBV_WC_SUCCESS);
+
+*/                     CmiAssert(wc[i].status==IBV_WC_SUCCESS);
         }
 
 
@@ -1078,15 +1079,15 @@ static void sendBarrierMessage(int pe) {
        int size=32;
        OtherNode node=nodes+pe;
        infiPacket packet;
-       //MallocInfiPacket(packet); // FIXME: put back in!
-       packet=newPacket();
+       MallocInfiPacket(packet); 
        packet->size = size;
        packet->buf = CmiAlloc(size); 
        packet->header.code=INFIBARRIERPACKET;
        packet->wr.wr.ud.ah=context->ah[pe];
        packet->wr.wr.ud.remote_qpn=nodes[pe].infiData->qp.qpn;
-       packet->wr.wr.ud.remote_qkey=0;
+       packet->wr.wr.ud.remote_qkey = 0x11111111;
 
+       MACHSTATE1(3,"HERE -> %d",packet->header.code);
 MACHSTATE2(3,"sending to qpn=%i pe=%i",nodes[pe].infiData->qp.qpn,pe); 
        struct ibv_mr *key=METADATAFIELD(packet->buf)->key;
        MACHSTATE3(3,"Barrier packet to %d size %d wr_id %d",node->infiData->nodeNo,size,packet->wr.wr_id);
@@ -1137,8 +1138,10 @@ MACHSTATE(3,"recvBarrierMessage 0"); // FIXME: REMOVE this debug
                                case IBV_WC_RECV: /* we have something to consider*/
                                    MACHSTATE(3," IN HERE !!!!!!!!!!");
                                        recvWC=&wc[i];
+
                                        buffer = (struct infiBuffer *) recvWC->wr_id;
-                                       header = (struct infiPacketHeader *)buffer->buf;
+                                       header = (struct infiPacketHeader *)(buffer->buf + 40);         // add 40 bytes to skip the GRH
+
                                        nodeNo = header->nodeNo;
                                        len = recvWC->byte_len-sizeof(struct infiPacketHeader);
                                        if(header->code & INFIPACKETCODE_DATA){
@@ -1148,7 +1151,8 @@ MACHSTATE(3,"recvBarrierMessage 0"); // FIXME: REMOVE this debug
                                        } else if(header->code & INFIBARRIERPACKET){
                                                MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);
                                                barrierReached=1;
-                                       }
+                                       }else // FIXME: erase this else clause
+                                               MACHSTATE2(3,"Ups... %d %d",header->code,nodeNo);
                                        {
                                                struct ibv_sge list = {
                                                        .addr     = (uintptr_t) buffer->buf,
@@ -1163,6 +1167,9 @@ MACHSTATE(3,"recvBarrierMessage 0"); // FIXME: REMOVE this debug
                                                        .next = NULL
                                                };
                                                struct ibv_recv_wr *bad_wr;
+
+                                               CmiAssert(ibv_post_recv(context->qp,&wr,&bad_wr)==0); 
+
                                        }
                                        break;
                                default:
@@ -1187,7 +1194,7 @@ int CmiBarrier() {
        int numnodes = CmiNumNodes();
 MACHSTATE1(3,"Barrier 1 rank=%i",CmiMyRank());
        if (CmiMyRank() == 0) { /* every one send to pe 0 */
-               if (CmiMyNode() != 0) { // FIXME: CHANGE TO !=0      FIXME FIXME FIXME
+               if (CmiMyNode() != 0) { 
 
 MACHSTATE(3,"Barrier sendmsg");
                        sendBarrierMessage(0);
@@ -1241,13 +1248,6 @@ int CmiBarrierZero() {
 }
 
 
-
-
-
-
-
-
-
 void createqp(struct ibv_device *dev){
 
        context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
@@ -1282,7 +1282,7 @@ void createqp(struct ibv_device *dev){
        attr.qp_state        = IBV_QPS_INIT;
        attr.pkey_index      = 0;
        attr.port_num        = context->ibPort; 
-        attr.qkey            = 0;
+        attr.qkey            = 0x11111111;
            if(ibv_modify_qp(context->qp, &attr,
                    IBV_QP_STATE              |
                IBV_QP_PKEY_INDEX         |
@@ -1336,40 +1336,6 @@ void createah() {
 }
 
 
-
-
-static int pp_post_recv() { 
-MACHSTATE(4,"pp_post_recv ");
-    int count=100;
-    void *buffer;
-    struct ibv_mr *mr;
-buffer=malloc(sizeof(int)*maxrecvbuffers);
-mr=ibv_reg_mr(context->pd, buffer, sizeof(int)*maxrecvbuffers, IBV_ACCESS_LOCAL_WRITE);
-
-
-        struct ibv_sge list = {
-            .addr   = (uintptr_t) buffer, 
-            .length = sizeof(int)*maxrecvbuffers,
-            .lkey   = mr->lkey,
-        };
-
-        struct ibv_recv_wr wr = {
-            .wr_id = (uint64_t)buffer,
-            .sg_list = &list,
-            .num_sge = 1,
-        };
-        struct ibv_recv_wr *bad_wr;
-        int i;
-        for (i=0;i<count;i++)
-            if(ibv_post_recv(context->qp,&wr,&bad_wr))
-                break;
-
-        ibv_dereg_mr(mr);
-        free(buffer);
-        CmiAssert(i==count);
-        return i;
-}
-
 void CmiMachineInit(char **argv)
 {
        int i;
@@ -1426,14 +1392,14 @@ void CmiMachineInit(char **argv)
                createqp(ibud.dev);
 //MACHSTATE1(3,"pp post recv=%i",pp_post_recv());      
     }
-
+       
        MACHSTATE(3,"} CmiMachineInit");
 }
 
 void CmiCommunicationInit(char **argv) {
        MACHSTATE(3,"CmiCommunicationInit {");
        if(_Cmi_numnodes>1) {
-        infiPostInitialRecvs();
+               infiPostInitialRecvs();
                createah();
        }
        MACHSTATE(3,"} CmiCommunicationInit");