fix race condition in setting dynamic connections.
authorGengbin Zheng <gzheng@illinois.edu>
Fri, 30 Dec 2011 01:14:02 +0000 (17:14 -0800)
committerGengbin Zheng <gzheng@illinois.edu>
Fri, 30 Dec 2011 01:14:02 +0000 (17:14 -0800)
src/arch/gemini_gni/machine.c

index ca8aa834aeb1ec19c7512570c38c0470d63d8ee6..d1b29cf9ff75517aa6000a5fe57faea0a3e662ee 100644 (file)
@@ -40,12 +40,12 @@ static void sleep(int secs) {
 #if useDynamicSMSG
 #define             AVG_SMSG_CONNECTION     10
 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
-int                 *smsg_connected_flag= 0;
-gni_smsg_attr_t     **smsg_attr_vector_local;
-gni_smsg_attr_t     **smsg_attr_vector_remote;
-gni_ep_handle_t       ep_hndl_unbound;
-gni_smsg_attr_t     send_smsg_attr;
-gni_smsg_attr_t     recv_smsg_attr;
+static int                 *smsg_connected_flag= 0;
+static gni_smsg_attr_t     **smsg_attr_vector_local;
+static gni_smsg_attr_t     **smsg_attr_vector_remote;
+static gni_ep_handle_t     ep_hndl_unbound;
+static gni_smsg_attr_t     send_smsg_attr;
+static gni_smsg_attr_t     recv_smsg_attr;
 
 typedef struct _dynamic_smsg_mailbox{
    void     *mailbox_base;
@@ -55,7 +55,7 @@ typedef struct _dynamic_smsg_mailbox{
    struct      _dynamic_smsg_mailbox  *next;
 }dynamic_smsg_mailbox_t;
 
-dynamic_smsg_mailbox_t  *mailbox_list;
+static dynamic_smsg_mailbox_t  *mailbox_list;
 #endif
 
 #define REMOTE_EVENT                      0
@@ -744,21 +744,20 @@ static void setup_smsg_connection(int destNode)
         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
 #endif
 }
+
 #if useDynamicSMSG
 inline 
 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
 {
     gni_return_t status = GNI_RC_NOT_DONE;
-    dynamic_smsg_mailbox_t *new_mailbox_entry;
 
-    if(mailbox_list->offset == mailbox_list->size-1)
+    if(mailbox_list->offset == mailbox_list->size)
     {
+        dynamic_smsg_mailbox_t *new_mailbox_entry;
         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
-
-        new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
-        new_mailbox_entry->mailbox_base = malloc(smsg_memlen*AVG_SMSG_CONNECTION);
-        bzero(mailbox_list->mailbox_base, smsg_memlen*AVG_SMSG_CONNECTION);
         new_mailbox_entry->size = smsg_memlen*AVG_SMSG_CONNECTION;
+        new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
+        bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
         new_mailbox_entry->offset = 0;
         
         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
@@ -780,32 +779,51 @@ static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
     local_smsg_attr->buff_size = smsg_memlen;
     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
-
 }
 #endif
+
+static void PumpDatagramConnection();
+
 inline 
 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
 {
     unsigned int          remote_address;
-    uint32_t             remote_id;
+    uint32_t              remote_id;
     gni_return_t status = GNI_RC_NOT_DONE;
     gni_smsg_attr_t      *smsg_attr;
     gni_post_descriptor_t *pd;
     gni_post_state_t      post_state;
     
 #if useDynamicSMSG
-        if(smsg_connected_flag[destNode] == 0)
-        {
+    loop:
+        status = GNI_RC_NOT_DONE;
+        switch (smsg_connected_flag[destNode]) {
+        case 0: {
             smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
             alloc_smsg_attr(smsg_attr_vector_local[destNode]);
             smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
             
-            status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode);
+            do {
+              status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
+              if (status == GNI_RC_SUCCESS) break;
+              PumpDatagramConnection();
+              if (smsg_connected_flag[destNode] == 2) break;
+            } while (status == GNI_RC_ERROR_RESOURCE);
+            if (smsg_connected_flag[destNode] == 2) goto loop; // fixme, leak
+#if 0
+            if (status == GNI_RC_ERROR_RESOURCE) {
+              status = GNI_RC_NOT_DONE;
+              break;
+            }
+#endif
             GNI_RC_CHECK("GNI_Post", status);
             smsg_connected_flag[destNode] = 1;
             status = GNI_RC_NOT_DONE;
-        }else if (smsg_connected_flag[destNode] == 1)   //already sending out connection_setup infor
+            break;
+        }
+        case 1:   //already sending out connection_setup infor
         {
+#if 0
             //check whether connection is done
             status = GNI_EpPostDataTest( ep_hndl_array[destNode], &post_state, &remote_address, &remote_id);
             if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED){
@@ -815,9 +833,13 @@ static gni_return_t send_smsg_message(int destNode, void *header, int size_heade
                 printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, destNode);
 #endif
                 smsg_connected_flag[destNode] = 2;
+                goto loop;
             }
+#endif
             status = GNI_RC_NOT_DONE;
-        }else if (smsg_connected_flag[destNode] == 2) // connection done
+            break;
+        }
+        case 2:  // connection done
         {
             if(PCQueueEmpty(smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
             {
@@ -832,7 +854,8 @@ static gni_return_t send_smsg_message(int destNode, void *header, int size_heade
                 }
             }
             status = GNI_RC_NOT_DONE;
-
+            break;
+        }
         }
 
         if(inbuff ==0)
@@ -1122,9 +1145,47 @@ static void    PumpDatagramConnection()
     uint32_t          remote_id;
     gni_return_t status;
     gni_post_state_t  post_state;
-    int i;
     uint64_t             datagram_id;
+    int i;
+
+#if 1         /* rewrite */
+   while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
+   {
+     if (datagram_id >= mysize) {           /* bound endpoint */
+       int pe = datagram_id - mysize;
+       status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
+       if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
+       {
+          CmiAssert(remote_id == pe);
+          status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
+          GNI_RC_CHECK("Dynamic SMSG Init", status);
+#if PRINT_SYH
+          printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
+#endif
+         CmiAssert(smsg_connected_flag[pe] == 1);
+          smsg_connected_flag[pe] = 2;
+       }
+     }
+     else {         /* unbound ep */
+       status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
+       if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
+       {
+          CmiAssert(remote_id<mysize);
+         //CmiAssert(smsg_connected_flag[remote_id] == 0);
+          status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
+          GNI_RC_CHECK("Dynamic SMSG Init", status);
+#if PRINT_SYH
+          printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
+#endif
+          smsg_connected_flag[remote_id] = 2;
 
+          alloc_smsg_attr(&send_smsg_attr);
+          status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
+          GNI_RC_CHECK("post unbound datagram", status);
+        }
+     }
+   }
+#else                       /*  ORIGINAL,  NOT USED */
     for(i=0; i<mysize; i++)
     {
         if(smsg_connected_flag[i] ==0 || smsg_connected_flag[i] == 2)
@@ -1134,7 +1195,8 @@ static void    PumpDatagramConnection()
             continue;
 
         status = GNI_EpPostDataTestById( ep_hndl_array[i], datagram_id, &post_state, &remote_address, &remote_id);
-        CmiAssert(remote_id == i);
+printf("[%d] PumpDatagramConnection: %d %d %d\n", myrank, datagram_id, remote_id, i);
+        //CmiAssert(remote_id == i);
         if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
         {
             status = GNI_SmsgInit(ep_hndl_array[i], smsg_attr_vector_local[i], smsg_attr_vector_remote[i]);
@@ -1167,6 +1229,7 @@ static void    PumpDatagramConnection()
             GNI_RC_CHECK("post unbound datagram", status);
         }
     };
+#endif
 }
 #endif
 
@@ -1201,6 +1264,9 @@ static void PumpNetworkSmsg()
         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
 #if PRINT_SYH
         printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
+#endif
+#if useDynamicSMSG
+        if (smsg_connected_flag[inst_id] != 2) continue;
 #endif
         msg_tag = GNI_SMSG_ANY_TAG;
         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
@@ -1283,10 +1349,12 @@ static void PumpNetworkSmsg()
         GNI_RC_CHECK("Smsg_rx_cq full", status);
     }
 }
+
 static void printDesc(gni_post_descriptor_t *pd)
 {
     printf(" addr=%p, ", pd->local_addr); 
 }
+
 static void getLargeMsgRequest(void* header, uint64_t inst_id )
 {
 #if     USE_LRTS_MEMPOOL
@@ -1676,6 +1744,12 @@ static int SendBufferMsg()
     {
         while(!PCQueueEmpty(smsg_msglist_index[index].sendSmsgBuf))
         {
+#if useDynamicSMSG
+            if (smsg_connected_flag[index] != 2) {    /* connection exists */
+              done = 0;
+              break;
+            }
+#endif
             ptr = (MSG_LIST*)PCQueuePop(smsg_msglist_index[index].sendSmsgBuf);
 #if         CMK_SMP
             if(ptr == NULL)
@@ -1821,15 +1895,16 @@ void LrtsAdvanceCommunication()
     printf("done PE:%d\n", myrank);
 #endif
 }
+
 #if useDynamicSMSG
 static void _init_dynamic_smsg()
 {
     gni_return_t status;
+    uint32_t     vmdh_index = -1;
     int i;
 
     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
-    
     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
     for(i=0; i<mysize; i++) {
         smsg_connected_flag[i] = 0;
@@ -1852,17 +1927,20 @@ static void _init_dynamic_smsg()
     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
+    GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
 
     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
-    mailbox_list->mailbox_base = malloc(smsg_memlen*AVG_SMSG_CONNECTION);
-    bzero(mailbox_list->mailbox_base, smsg_memlen*AVG_SMSG_CONNECTION);
     mailbox_list->size = smsg_memlen*AVG_SMSG_CONNECTION;
+    //mailbox_list->mailbox_base = malloc(mailbox_list->size);
+    posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
+    bzero(mailbox_list->mailbox_base, mailbox_list->size);
     mailbox_list->offset = 0;
+    mailbox_list->next = 0;
     
     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
         mailbox_list->size, smsg_rx_cqh,
         GNI_MEM_READWRITE,   
-        -1,
+        vmdh_index,
         &(mailbox_list->mem_hndl));
     //status = MEMORY_REGISTER(onesided_hnd, nic_hndl, mailbox_list->mailbox_base, mailbox_list->size, &(mailbox_list->mem_hndl), &omdh);
     GNI_RC_CHECK("MEMORY registration for smsg", status);
@@ -1876,6 +1954,7 @@ static void _init_dynamic_smsg()
     GNI_RC_CHECK("post unbound datagram", status);
 }
 #endif
+
 static void _init_static_smsg()
 {
     gni_smsg_attr_t      *smsg_attr;
@@ -2326,9 +2405,9 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     if(mysize > 1)
     {
 #if useDynamicSMSG
-            _init_dynamic_smsg();
+        _init_dynamic_smsg();
 #else
-            _init_static_smsg();
+        _init_static_smsg();
 #endif
         _init_smsg();
         PMI_Barrier();
@@ -2438,7 +2517,7 @@ void  LrtsFree(void *msg)
 void LrtsExit()
 {
     /* free memory ? */
-#if     USE_LRTS_MEMPOOL
+#if USE_LRTS_MEMPOOL
     mempool_destroy(CpvAccess(mempool));
 #endif
     PMI_Finalize();
@@ -2449,10 +2528,14 @@ void LrtsDrainResources()
 {
     if(mysize == 1) return;
     while (!SendBufferMsg()) {
+#if useDynamicSMSG
+        PumpDatagramConnection();
+#endif
         PumpNetworkSmsg();
-        PumpNetworkRdmaMsgs();
-        PumpLocalSmsgTransactions();
+        //PumpNetworkRdmaMsgs();
         PumpLocalRdmaTransactions();
+        //PumpLocalSmsgTransactions();
+        SendRdmaMsg();
     }
     PMI_Barrier();
 }