Add support for CmiBarrier to net ibverbs layer.
authorEric Bohm <ebohm@illinois.edu>
Mon, 9 Jun 2008 22:22:20 +0000 (22:22 +0000)
committerEric Bohm <ebohm@illinois.edu>
Mon, 9 Jun 2008 22:22:20 +0000 (22:22 +0000)
Should support the +isomalloc_sync feature as shown by successful executions
of test/util/check with +isomalloc_sync up to +p16 on abe

src/arch/net/machine-dgram.c
src/arch/net/machine-ibverbs.c

index 896a63ce1562f76a28a5b56dd51a2ab0bd74cd34..e1f61c180dd654811158aae651d3c3187c9dd14b 100644 (file)
@@ -799,7 +799,7 @@ void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int
 #elif CMK_USE_IBVERBS
 
 #include "machine-ibverbs.c"
-#define BARRIER_NULL           1
+/*#define BARRIER_NULL           1*/
 
 #else
 
index 1fc5fe0f3b5aba9227f76a3183175a46802ac2c4..1578a7d9dcbc8991f32895db150ad25946bbeea3 100644 (file)
@@ -114,6 +114,7 @@ Data Structures
 #define INFIDIRECT_REQUEST 16
 #define INFIPACKETCODE_INCTOKENSACK 32
 #define INFIDUMMYPACKET 64
+#define INFIBARRIERPACKET 128
 
 struct infiPacketHeader{
        char code;
@@ -883,6 +884,9 @@ static void CmiNotifyStillIdle(CmiIdleState *s) {
 #endif
 }
 
+
+
+
 static inline void increaseTokens(OtherNode node);
 
 static inline int pollRecvCq(const int toBuffer);
@@ -1571,6 +1575,10 @@ static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
        if(header->code & INFIDUMMYPACKET){
                MACHSTATE(3,"Dummy packet");
        }
+       if(header->code & INFIBARRIERPACKET){
+               MACHSTATE(3,"Barrier packet");
+               CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
+       }
 #if CMK_IBVERBS_INCTOKENS      
        if(header->code & INFIPACKETCODE_INCTOKENS){
                increasePostedRecvs(nodeNo);
@@ -2778,3 +2786,189 @@ void processDirectWC(struct infiRdmaPacket *rdmaPacket){
 };
 */
 
+static void sendBarrierMessage(int pe)
+{
+  /* we will only need one packet */
+  int size=32;
+  OtherNode  node = nodes + pe;
+  infiPacket packet;
+  MallocInfiPacket(packet);
+  packet->size = size;
+  packet->buf = CmiAlloc(size);
+  packet->header.code = INFIBARRIERPACKET;
+  struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
+  MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
+  pollSendCq(0);
+  EnqueuePacket(node,packet,size,key);
+}
+
+static void recvBarrierMessage()
+{
+  int i;
+  int ne;
+  /*  struct ibv_wc wc[WC_LIST_SIZE];*/
+  struct ibv_wc wc[1];
+  struct ibv_wc *recvWC;
+  /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
+  int toBuffer=1; // buffer without processing recvd messages
+  int barrierReached=0;
+  struct infiBuffer *buffer = NULL;
+  struct infiPacketHeader *header = NULL;
+  int nodeNo=-1;
+  int len=-1;
+  while(!barrierReached)
+    {
+      /* gengbin's semantic will implode if more than one q is polled at a time */
+      ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
+      //       CmiAssert(ne >=0);
+      if(ne != 0){
+       MACHSTATE1(3,"recvBarrier ne %d",ne);
+      }
+      pollSendCq(0);
+      for(i=0;i<ne;i++){
+       if(wc[i].status != IBV_WC_SUCCESS){
+         CmiAssert(0);
+       }
+       switch(wc[i].opcode){
+       case IBV_WC_RECV: /* we have something to consider*/
+         recvWC=&wc[i];
+         buffer = (struct infiBuffer *) recvWC->wr_id; 
+         header = (struct infiPacketHeader *)buffer->buf;
+         nodeNo = header->nodeNo;
+         len = recvWC->byte_len-sizeof(struct infiPacketHeader);
+
+         if(header->code & INFIPACKETCODE_DATA){
+           processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
+         }
+         if(header->code & INFIDUMMYPACKET){
+           MACHSTATE(3,"Dummy packet");
+         }
+         if(header->code & INFIBARRIERPACKET){
+           MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
+           // now we are done
+           barrierReached=1;
+           /* semantically questionable */
+           //processAllBufferedMsgs();
+           //return;
+         }
+         if(rdma && header->code & INFIRDMA_START){
+           struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
+           //          if(toBuffer){
+           //TODO: make a function of this and use for both acks and requests
+           struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
+           struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
+           *copyPacket = *rdmaPacket;
+           copyPacket->fromNodeNo = nodeNo;
+           MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
+           context->bufferedRdmaRequests = copyPacket;
+           copyPacket->next = tmp;
+           copyPacket->prev = NULL;
+           if(tmp != NULL){
+             tmp->prev = copyPacket;
+           }
+           /*          }else{
+                       processRdmaRequest(rdmaPacket,nodeNo,0);
+                       }*/
+         }
+         if(rdma && header->code & INFIRDMA_ACK){
+           struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
+           processRdmaAck(rdmaPacket);
+         }
+         {
+           struct ibv_sge list = {
+             .addr     = (uintptr_t) buffer->buf,
+             .length = buffer->size,
+             .lkey     = buffer->key->lkey
+           };
+       
+           struct ibv_recv_wr wr = {
+             .wr_id = (uint64_t)buffer,
+             .sg_list = &list,
+             .num_sge = 1,
+             .next = NULL
+           };
+           struct ibv_recv_wr *bad_wr;
+       
+           if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
+             CmiAssert(0);
+           }
+         }
+
+         break;
+       default:
+         CmiAbort("Wrong type of work completion object in recvq");
+         break;
+       }
+      }
+    }
+  /* semantically questionable */
+  //  processAllBufferedMsgs();
+}
+
+
+/* happen at node level */
+int CmiBarrier()
+{
+  int len, size, i;
+  int status;
+  int count = 0;
+  OtherNode node;
+  int numnodes = CmiNumNodes();
+  if (CmiMyRank() == 0) {
+    /* every one send to pe 0 */
+    if (CmiMyNode() != 0) {
+      sendBarrierMessage(0);
+    }
+    /* printf("[%d] HERE\n", CmiMyPe()); */
+    if (CmiMyNode() == 0) 
+    {
+      for (count = 1; count < numnodes; count ++) 
+      {
+        recvBarrierMessage();
+      }
+      /* pe 0 broadcast */
+      for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
+        int p = i;
+        if (p > numnodes - 1) break;
+        /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
+        sendBarrierMessage(p);
+      }
+    }
+    /* non 0 node waiting */
+    if (CmiMyNode() != 0) 
+    {
+      recvBarrierMessage();
+      for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
+        int p = CmiMyNode();
+        p = BROADCAST_SPANNING_FACTOR*p + i;
+        if (p > numnodes - 1) break;
+        p = p%numnodes;
+        /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
+        sendBarrierMessage(p);
+      }
+    }
+  }
+  CmiNodeAllBarrier();
+  processAllBufferedMsgs();
+  /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
+}
+
+/* everyone sends a message to pe 0 and go on */
+int CmiBarrierZero()
+{
+  int i;
+
+  if (CmiMyRank() == 0) {
+    if (CmiMyNode()) {
+      sendBarrierMessage(0);
+    }
+    else {
+      for (i=0; i<CmiNumNodes()-1; i++)
+      {
+        recvBarrierMessage();
+      }
+    }
+  }
+  CmiNodeAllBarrier();
+  processAllBufferedMsgs();
+}