fixed compile error in gemini
authorYanhua Sun <sun51@illinois.edu>
Mon, 19 Sep 2011 23:22:17 +0000 (18:22 -0500)
committerYanhua Sun <sun51@illinois.edu>
Mon, 19 Sep 2011 23:22:17 +0000 (18:22 -0500)
src/arch/gemini_gni/machine.c

index ac0571aa8080ce5b6a526455d89c3f7bab33b4d7..28f50a079b134437d0b9fe67a1d596e0c523dc2c 100644 (file)
@@ -40,7 +40,7 @@ static void sleep(int secs) {
 #define USE_LRTS_MEMPOOL   1
 
 #if USE_LRTS_MEMPOOL
-static size_t _mempool_size = 1024ll*1024*32;
+static CmiInt8 _mempool_size = 1024ll*1024*32;
 #endif
 
 #define PRINT_SYH  0
@@ -126,7 +126,7 @@ static int  SMSG_MAX_MSG;
 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
 
 static int Mempool_MaxSize = 1024*1024*128;
-static int useDynamicSMSG   = 0;
+static int useDynamicSMSG   = 1;
 static int useStaticMSGQ = 0;
 static int useStaticFMA = 0;
 static int mysize, myrank;
@@ -140,20 +140,22 @@ typedef struct {
 
 typedef struct mdh_addr_list{
     gni_mem_handle_t mdh;
-    uint64_t addr;
+   void *addr;
     struct mdh_addr_list *next;
 }mdh_addr_list_t;
 
 static unsigned int         smsg_memlen;
-#define     SMSG_CONN_SIZE     24
-int     *smsg_connected_flag= 0;
-char    *smsg_connection_addr = 0;
-mdh_addr_t    *smsg_connection_vec = 0;
+#define     SMSG_CONN_SIZE     sizeof(gni_smsg_attr_t)
+gni_smsg_attr_t    **smsg_local_attr_vec = 0;
+int                 *smsg_connected_flag= 0;
+char                *smsg_connection_addr = 0;
+mdh_addr_t          setup_mem;
+mdh_addr_t          *smsg_connection_vec = 0;
 gni_mem_handle_t    smsg_connection_memhndl;
-static int            smsg_expand_slots = 10;
-static int            smsg_available_slot = 0;
-static void           *smsg_mailbox_mempool = 0;
-mdh_addr_list_t       *smsg_dynamic_list = 0;
+static int          smsg_expand_slots = 10;
+static int          smsg_available_slot = 0;
+static void         *smsg_mailbox_mempool = 0;
+mdh_addr_list_t     *smsg_dynamic_list = 0;
 
 static void             *smsg_mailbox_base;
 gni_msgq_attr_t         msgq_attrs;
@@ -581,54 +583,75 @@ static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
 #endif
 }
 
+inline
+static void setup_smsg_connection(int destNode)
+{
+    mdh_addr_list_t  *new_entry = 0;
+    gni_post_descriptor_t *pd;
+    gni_smsg_attr_t      *smsg_attr;
+    gni_return_t status = GNI_RC_NOT_DONE;
+    if(smsg_available_slot == smsg_expand_slots)
+    {
+        new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
+        new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
+        bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
+
+        status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
+            smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
+            GNI_MEM_READWRITE,   
+            -1,
+            &(new_entry->mdh));
+        smsg_available_slot = 0; 
+        new_entry->next = smsg_dynamic_list;
+        smsg_dynamic_list = new_entry;
+    }
+    smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
+    smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
+    smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
+    smsg_attr->msg_maxsize = SMSG_MAX_MSG;
+    smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
+    smsg_attr->buff_size = smsg_memlen;
+    smsg_attr->msg_buffer = smsg_dynamic_list->addr;
+    smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
+    smsg_local_attr_vec[destNode] = smsg_attr;
+    smsg_available_slot++;
+    MallocPostDesc(pd);
+    pd->type            = GNI_POST_FMA_PUT;
+    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
+    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
+    pd->length          = sizeof(gni_smsg_attr_t);
+    pd->local_addr      = (uint64_t) smsg_attr;
+    pd->remote_addr     = smsg_connection_vec[destNode].addr;
+    pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
+    status = GNI_PostFma(ep_hndl_array[destNode],  pd);
+    GNI_RC_CHECK("SMSG Dynamic link", status);
+}
+
 inline 
 static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
 {
     gni_return_t status = GNI_RC_NOT_DONE;
-    gni_smsg_attr_t      smsg_attr;
+    gni_smsg_attr_t      *smsg_attr;
     gni_post_descriptor_t *pd;
-    
-    mdh_addr_list_t  *new_entry = 0;
+  
+#if PRINT_SYH
+    CmiPrintf("[%d] send_smsg_message\n", CmiMyPe());
+#endif
+
     if(useDynamicSMSG == 1)
     {
         if(smsg_connected_flag[destNode] == 0)
         {
-            if(smsg_available_slot == smsg_expand_slots)
-            {
-                new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
-                new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
-                bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
-    
-                status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
-                    smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
-                    GNI_MEM_READWRITE,   
-                    vmdh_index,
-                    &(new_entry->mdh));
-                smsg_available_slot = 0; 
-                new_entry->next = smsg_dynamic_list;
-                smsg_dynamic_list = new_entry;
-            }
-            smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
-            smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
-            smsg_attr.msg_maxsize = SMSG_MAX_MSG;
-            smsg_attr.mbox_offset = smsg_available_slot * smsg_memlen;
-            smsg_attr.buff_size = smsg_memlen;
-            smsg_attr.msg_buffer = smsg_dynamic_list->addr ;
-            smsg_attr.mem_hndl = smsg_dynamic_list->mdh;
-            MallocPostDesc(pd);
-            pd->type            = GNI_POST_FMA_PUT;
-            pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
-            pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
-            pd->length          = ALIGN4(request_msg->length);
-            pd->local_addr      = (uint64_t) smsg_attr;
-            pd->remote_addr     = smsg_connection_vec[destNode].addr;
-            pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
-            status = GNI_PostFma(ep_hndl_array[destNode],  pd);
-            GNI_RC_CHECK("SMSG Dynamic link", status);
-
-        } else if (smsg_connected_flag[destNode] == 1)
+            CmiPrintf("[%d]Init smsg connection\n", CmiMyPe());
+            setup_smsg_connection(destNode);
+            delay_send_small_msg(msg, size, destNode, tag);
+            return status;
+        }
+        else  if(smsg_connected_flag[destNode] == 1)
         {
-            //connection request sent
+            if(inbuff == 0)
+                delay_send_small_msg(msg, size, destNode, tag);
+            return status;
         }
     }
     if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
@@ -852,8 +875,7 @@ static void PumpNetworkSmsg()
     int                 msg_nbytes;
     void                *msg_data;
     gni_mem_handle_t    msg_mem_hndl;
-
+    gni_smsg_attr_t     *smsg_attr; 
     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
     {
         inst_id = GNI_CQ_GET_INST_ID(event_data);
@@ -861,6 +883,26 @@ static void PumpNetworkSmsg()
 #if PRINT_SYH
         CmiPrintf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
 #endif
+
+        if(useDynamicSMSG == 1)
+        {
+            if(smsg_connected_flag[inst_id] == 0)
+            {
+                CmiPrintf("[%d]pump Init smsg connection\n", CmiMyPe());
+                smsg_connected_flag[inst_id] = 2;
+                setup_smsg_connection(inst_id);
+                status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id],  &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
+                GNI_RC_CHECK("SmsgInit", status);
+                continue;
+            } else if (smsg_connected_flag[inst_id] == 1)
+            {
+                CmiPrintf("[%d]pump setup smsg connection\n", CmiMyPe());
+                smsg_connected_flag[inst_id] = 2;
+                status = GNI_SmsgInit(ep_hndl_array[inst_id], smsg_local_attr_vec[inst_id], &(((gni_smsg_attr_t*)(setup_mem.addr))[inst_id]));
+                GNI_RC_CHECK("SmsgInit", status);
+                continue;
+            }
+        }
         msg_tag = GNI_SMSG_ANY_TAG;
         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
         {
@@ -1096,9 +1138,13 @@ static void PumpLocalRdmaTransactions()
             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
             ////Message is sent, free message , put is not used now
             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
-            {   //persistent message 
-                CmiFree((void *)tmp_pd->local_addr);
-                msg_tag = PUT_DONE_TAG;  
+            {
+                if(smsg_connected_flag[inst_id] == 2) 
+                {
+                    //persistent message 
+                    CmiFree((void *)tmp_pd->local_addr);
+                    msg_tag = PUT_DONE_TAG; 
+                }
             }else if(tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
             {
                 msg_tag = ACK_TAG;  
@@ -1259,7 +1305,7 @@ static int SendBufferMsg()
     }   // end pooling for all cores
 #if PRINT_SYH
     if(lrts_send_msg_id-lrts_smsg_success !=0)
-        CmiPrintf("WRONG [%d buffered msg is empty(%p) (actually=%d)(buffercounter=%d)\n", myrank, smsg_msglist_head[i], (lrts_send_msg_id-lrts_smsg_success, buffered_smsg_counter));
+        CmiPrintf("WRONG [%d buffered msg is empty]\n", myrank);
 #endif
     return done;
 }
@@ -1267,34 +1313,37 @@ static int SendBufferMsg()
 static void LrtsAdvanceCommunication()
 {
     /*  Receive Msg first */
-    //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", CmiMyPe());
+    //CmiPrintf("Calling Lrts Pump Msg PE:%d\n", myrank);
     PumpNetworkSmsg();
     //CmiPrintf("Calling Lrts Pump RdmaMsg PE:%d\n", CmiMyPe());
     //PumpNetworkRdmaMsgs();
     /* Release Sent Msg */
     //CmiPrintf("Calling Lrts Rlease Msg PE:%d\n", CmiMyPe());
     //PumpLocalSmsgTransactions();
-    //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", CmiMyPe());
+    //CmiPrintf("Calling Lrts Rlease RdmaMsg PE:%d\n", myrank);
     PumpLocalRdmaTransactions();
-    //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", CmiMyPe());
+    //CmiPrintf("Calling Lrts Send Buffmsg PE:%d\n", myrank);
     /* Send buffered Message */
     SendBufferMsg();
+    //CmiPrintf("Calling Lrts rdma PE:%d\n", myrank);
     SendRdmaMsg();
+    //CmiPrintf("done PE:%d\n", myrank);
 }
 
 static void _init_dynamic_smsg()
 {
     gni_smsg_attr_t smsg_attr;
-    mdh_addr_t current_addr;
     gni_return_t status;
     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
     memset(smsg_connected_flag, 0, mysize*sizeof(int));
 
-    current_addr.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
-    status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(current_addr.mdh), &omdh);
+    CmiPrintf("start Dynamic init done %d\n", myrank);
+    smsg_local_attr_vec = (gni_smsg_attr_t**) malloc(sizeof(gni_smsg_attr_t*) *mysize);
+    setup_mem.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
+    status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(setup_mem.mdh), &omdh);
     
     smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
-    allgather(&current_addr, smsg_connection_vec, sizeof(mdh_addr_t));
+    allgather(&setup_mem, smsg_connection_vec, sizeof(mdh_addr_t));
     
     //pre-allocate some memory as mailbox for dynamic connection
     if(mysize <=4096)
@@ -1321,9 +1370,10 @@ static void _init_dynamic_smsg()
     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_dynamic_list->addr,
             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
             GNI_MEM_READWRITE,   
-            vmdh_index,
+            -1,
             &(smsg_dynamic_list->mdh));
    smsg_available_slot = 0;  
+   CmiPrintf(" Dynamic init done %d\n", myrank);
 }
 
 static void _init_static_smsg()
@@ -1371,7 +1421,6 @@ static void _init_static_smsg()
 
     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
 
-#if 1
     base_infor.addr =  (uint64_t)smsg_mailbox_base;
     base_infor.mdh =  my_smsg_mdh_mailbox;
     base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
@@ -1409,65 +1458,27 @@ static void _init_static_smsg()
     } //end initialization
 
     free(base_addr_vec);
-#else
-    for(i=0; i<mysize; i++)
-    {
-        if(i==myrank)
-            continue;
-        smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
-        smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
-        smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
-        if(i<myrank)
-            smsg_attr[i].mbox_offset = i*smsg_memlen;
-        else
-            smsg_attr[i].mbox_offset = (i-1)*smsg_memlen;
 
-        smsg_attr[i].msg_buffer = smsg_mailbox_base;
-        smsg_attr[i].buff_size = smsg_memlen;
-        smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
-#if 0 
-        if(i<2)
-        {
-            CmiPrintf("[assign %d==%d]  offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, (smsg_attr[i]).mbox_offset, (smsg_attr[i]).msg_buffer, (smsg_attr[i]).mbox_maxcredit, (smsg_attr[i]).msg_maxsize, (smsg_attr[i]).buff_size ); 
-#endif 
-        }
-    }
-    smsg_attr_vec = (gni_smsg_attr_t*)malloc(mysize * mysize * sizeof(gni_smsg_attr_t));
-    _MEMCHECK(smsg_attr_vec);
-    if(myrank==0)
-        CmiPrintf("total size=%d\n", mysize*mysize*sizeof(gni_smsg_attr_t));
-    allgather(smsg_attr, smsg_attr_vec,  mysize*sizeof(gni_smsg_attr_t));
-    
-    CmiBarrier();
-    for(i=0; i<mysize; i++)
-    {
-        if (myrank == i) continue;
-        /* initialize the smsg channel */
-        status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &smsg_attr_vec[i*mysize+myrank]);
-#if 0 
-        if(i<2)
-        {
-            CmiPrintf("[%d==%d] SmsgInit rc=%s, offset=%d buf=%p,credit=%d size=%d, bufsize=%d\n", myrank, i, gni_err_str[status], (smsg_attr_vec[i*mysize+myrank]).mbox_offset, (smsg_attr_vec[i*mysize+myrank]).msg_buffer, (smsg_attr_vec[i*mysize+myrank]).mbox_maxcredit, (smsg_attr_vec[i*mysize+myrank]).msg_maxsize, (smsg_attr_vec[i*mysize+myrank]).buff_size ); 
-        }
-#endif
-        GNI_RC_CHECK("SMSG Init", status);
-    } //end initialization
-    CmiBarrier();
-    free(smsg_attr_vec);
-#endif
     free(smsg_attr);
     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
-    GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
-    smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
-    for(i =0; i<mysize; i++)
-    {
+     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
+} 
+
+inline
+static void _init_smsg()
+{
+    int i;
+
+     smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
+     for(i =0; i<mysize; i++)
+     {
         smsg_msglist_index[i].next = -1;
         smsg_msglist_index[i].head = 0;
         smsg_msglist_index[i].tail = 0;
-    }
-    smsg_head_index = -1;
-} 
+     }
+     smsg_head_index = -1;
+
+}
 
 static void _init_static_msgq()
 {
@@ -1539,7 +1550,7 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
    
     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
-    useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
+    //useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
     if (myrank==0) 
     {
         if(useDynamicSMSG) 
@@ -1636,6 +1647,8 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
         {
             _init_dynamic_smsg();
         }
+
+        _init_smsg();
     }
 #if     USE_LRTS_MEMPOOL
     CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size);