fixed dynamic smsg problem
author Yanhua Sun <sun51@hopper09.(none)>
Tue, 20 Sep 2011 03:09:52 +0000 (20:09 -0700)
committer Yanhua Sun <sun51@hopper09.(none)>
Tue, 20 Sep 2011 03:09:52 +0000 (20:09 -0700)
src/arch/gemini_gni/machine.c

index 28f50a079b134437d0b9fe67a1d596e0c523dc2c..b87eebb04b0f5121eccec4758e9b1223283cfa9b 100644 (file)
@@ -43,7 +43,7 @@ static void sleep(int secs) {
 static CmiInt8 _mempool_size = 1024ll*1024*32;
 #endif
 
-#define PRINT_SYH  0
+#define PRINT_SYH  1
 
 #if PRINT_SYH
 int         lrts_smsg_success = 0;
@@ -126,7 +126,7 @@ static int  SMSG_MAX_MSG;
 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
 
 static int Mempool_MaxSize = 1024*1024*128;
-static int useDynamicSMSG   = 1;
+static int useDynamicSMSG   = 0;
 static int useStaticMSGQ = 0;
 static int useStaticFMA = 0;
 static int mysize, myrank;
@@ -617,14 +617,17 @@ static void setup_smsg_connection(int destNode)
     smsg_available_slot++;
     MallocPostDesc(pd);
     pd->type            = GNI_POST_FMA_PUT;
-    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
-    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT |  GNI_CQMODE_REMOTE_EVENT;
+//pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
     pd->length          = sizeof(gni_smsg_attr_t);
     pd->local_addr      = (uint64_t) smsg_attr;
-    pd->remote_addr     = smsg_connection_vec[destNode].addr;
+    pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
+    pd->src_cq_hndl     = 0;
+    pd->rdma_mode       = 0;
     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
     GNI_RC_CHECK("SMSG Dynamic link", status);
+    CmiPrintf("[%d=%d] send post FMA successful (%p) %s\n", myrank, destNode, (void*)pd->remote_addr, gni_err_str[status]);
 }
 
 inline 
@@ -634,10 +637,6 @@ static gni_return_t send_smsg_message(int destNode, void *header, int size_heade
     gni_smsg_attr_t      *smsg_attr;
     gni_post_descriptor_t *pd;
   
-#if PRINT_SYH
-    CmiPrintf("[%d] send_smsg_message\n", CmiMyPe());
-#endif
-
     if(useDynamicSMSG == 1)
     {
         if(smsg_connected_flag[destNode] == 0)
@@ -645,15 +644,17 @@ static gni_return_t send_smsg_message(int destNode, void *header, int size_heade
             CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
             setup_smsg_connection(destNode);
             delay_send_small_msg(msg, size, destNode, tag);
+            smsg_connected_flag[destNode] =1;
             return status;
         }
-        else  if(smsg_connected_flag[destNode] == 1)
+        else  if(smsg_connected_flag[destNode] < 3)
         {
             if(inbuff == 0)
                 delay_send_small_msg(msg, size, destNode, tag);
             return status;
         }
     }
+    CmiPrintf("[%d] reach send\n", myrank);
     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
     {
         status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
@@ -875,7 +876,7 @@ static void PumpNetworkSmsg()
     int                 msg_nbytes;
     void                *msg_data;
     gni_mem_handle_t    msg_mem_hndl;
-    gni_smsg_attr_t     *smsg_attr; 
+    gni_smsg_attr_t     *smsg_attr;
     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
     {
         inst_id = GNI_CQ_GET_INST_ID(event_data);
@@ -886,18 +887,18 @@ static void PumpNetworkSmsg()
 
         if(useDynamicSMSG == 1)
         {
-            if(smsg_connected_flag[inst_id] == 0)
+            if(smsg_connected_flag[inst_id] == 0 )
             {
                 CmiPrintf("[%d]pump Init smsg connection\n", CmiMyPe());
-                smsg_connected_flag[inst_id] = 2;
+                smsg_connected_flag[inst_id] =2;
                 setup_smsg_connection(inst_id);
                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
                 GNI_RC_CHECK("SmsgInit", status);
                 continue;
-            } else if (smsg_connected_flag[inst_id] == 1)
+            } else if (smsg_connected_flag[inst_id] <3) 
             {
                 CmiPrintf("[%d]pump setup smsg connection\n", CmiMyPe());
-                smsg_connected_flag[inst_id] = 2;
+                smsg_connected_flag[inst_id] += 1;
                 status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id], &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
                 GNI_RC_CHECK("SmsgInit", status);
                 continue;
@@ -1132,14 +1133,19 @@ static void PumpLocalRdmaTransactions()
         {
             inst_id     = GNI_CQ_GET_INST_ID(ev);
 #if PRINT_SYH
-            CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
+            CmiPrintf("**[%d] SMSGPumpLocalTransactions localdone=%d, %d\n", myrank,  lrts_local_done_msg, smsg_connected_flag[inst_id]);
 #endif
             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
             ////Message is sent, free message , put is not used now
             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
             {
-                if(smsg_connected_flag[inst_id] == 2) 
+                if(smsg_connected_flag[inst_id] <3)
+                {
+                    smsg_connected_flag[inst_id] += 1;
+                    continue;
+                }
+                else  
                 {
                     //persistent message 
                     CmiFree((void *)tmp_pd->local_addr);
@@ -1274,23 +1280,24 @@ static int SendBufferMsg()
                 CmiPrintf("Weird tag\n");
                 CmiAbort("should not happen\n");
             }
-        } //end while 
-        if(status == GNI_RC_SUCCESS)
-        {
-            smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
-            FreeMsgList(ptr);
-            ptr= smsg_msglist_index[index].head;
+            if(status == GNI_RC_SUCCESS)
+            {
+                smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
+                FreeMsgList(ptr);
+                ptr= smsg_msglist_index[index].head;
 #if PRINT_SYH
-            buffered_smsg_counter--;
-            if(lrts_smsg_success == lrts_send_msg_id)
-                CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-            else
-                CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
+                buffered_smsg_counter--;
+                if(lrts_smsg_success == lrts_send_msg_id)
+                    CmiPrintf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
+                else
+                    CmiPrintf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
 #endif
-        }else {
-            done = 0;
-            break;
-        }
+            }else {
+                done = 0;
+                break;
+            } 
+        
+        } //end while
         if(ptr == 0)
         {
             if(index_previous != -1)
@@ -1303,31 +1310,32 @@ static int SendBufferMsg()
         }
         index = smsg_msglist_index[index].next;
     }   // end pooling for all cores
-#if PRINT_SYH
-    if(lrts_send_msg_id-lrts_smsg_success !=0)
-        CmiPrintf("WRONG [%d buffered msg is empty]\n", myrank);
-#endif
     return done;
 }
 
 static void LrtsAdvanceCommunication()
 {
     /*  Receive Msg first */
-    //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
+    if(myrank == 0)
+    CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
     PumpNetworkSmsg();
     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
     //PumpNetworkRdmaMsgs();
     /* Release Sent Msg */
     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
-    //PumpLocalSmsgTransactions();
-    //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
+    ////PumpLocalSmsgTransactions();
+    if(myrank == 0)
+    CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
     PumpLocalRdmaTransactions();
-    //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
+    if(myrank == 0)
+    CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
     /* Send buffered Message */
     SendBufferMsg();
-    //CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
+    if(myrank == 0)
+    CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
     SendRdmaMsg();
-    //CmiPrintf("done PE:%d\n", myrank);
+    if(myrank == 0)
+    CmiPrintf("done PE:%d\n", myrank);
 }
 
 static void _init_dynamic_smsg()
@@ -1339,9 +1347,12 @@ static void _init_dynamic_smsg()
 
     CmiPrintf("start Dynamic init done %d\n", myrank);
     smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
-    setup_mem.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
-    status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(setup_mem.mdh), &omdh);
     
+    setup_mem.addr = (uint64_t)malloc(mysize * sizeof(gni_smsg_attr_t));
+    status = GNI_MemRegister(nic_hndl, setup_mem.addr,  mysize * SMSG_CONN_SIZE, smsg_rx_cqh,  GNI_MEM_READWRITE, -1,  &(setup_mem.mdh));
+   
+    GNI_RC_CHECK("Smsg dynamic allocation \n", status);
+    CmiPrintf("[%d]Smsg dynamic allocation (%p)\n", myrank, (void*)setup_mem.addr);
     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
     allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));