re-structure gemini send smsg
[charm.git] / src / arch / gemini_gni / machine.c
index c1d6395a3f992519ec6901c9f2eb203d13aad163..0ab9a2a5712c4e5faac43901891393a1b4f2aedf 100644 (file)
@@ -35,6 +35,7 @@ static void sleep(int secs) {
 #include <unistd.h> /*For getpid()*/
 #endif
 
+
 #define REMOTE_EVENT         0
 #define USE_LRTS_MEMPOOL   1
 
@@ -125,12 +126,23 @@ static int  SMSG_MAX_MSG;
 #define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
 
 static int Mempool_MaxSize = 1024*1024*128;
-static int useStaticSMSG   = 1;
+static int useDynamicSMSG   = 0;
 static int useStaticMSGQ = 0;
 static int useStaticFMA = 0;
 static int mysize, myrank;
 static gni_nic_handle_t      nic_hndl;
 
+typedef struct {
+    gni_mem_handle_t mdh;
+    uint64_t addr;
+} mdh_addr_t;
+// this is related to dynamic SMSG
+
+#define     SMSG_CONN_SIZE     24
+int     *smsg_connected_flag= 0;
+char    *smsg_connection_addr = 0;
+mdh_addr_t    *smsg_connection_vec = 0;
+gni_mem_handle_t    smsg_connection_memhndl;
 
 static void             *smsg_mailbox_base;
 gni_msgq_attr_t         msgq_attrs;
@@ -138,11 +150,6 @@ gni_msgq_handle_t       msgq_handle;
 gni_msgq_ep_attr_t      msgq_ep_attrs;
 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
 
-/* preallocated memory buffer for FMA for short message and control message */
-typedef struct {
-    gni_mem_handle_t mdh;
-    uint64_t addr;
-} mdh_addr_t;
 
 
 /* preallocated DMA buffer */
@@ -563,17 +570,16 @@ static void delay_send_small_msg(void *msg, int size, int destNode, uint8_t tag)
 #endif
 }
 
-// messages smaller than max single SMSG
-inline
-static void send_small_messages(int destNode, int size, char *msg)
+inline 
+static gni_return_t send_smsg_message(int destNode, void *header, int size_header, void *msg, int size, uint8_t tag, int inbuff )
 {
-    gni_return_t        status  =   GNI_RC_SUCCESS;
-    const uint8_t       tag_data    = SMALL_DATA_TAG;
+    gni_return_t status = GNI_RC_NOT_DONE;
     
-    if(smsg_msglist_index[destNode].head == 0)
+    //if(useDynamicSMSG == 0 ) 
+    if(smsg_msglist_index[destNode].head == 0 || inbuff==1)
     {
-        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, SMALL_DATA_TAG);
-        if (status == GNI_RC_SUCCESS)
+        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], header, size_header, msg, size, 0, tag);
+        if(status == GNI_RC_SUCCESS)
         {
 #if PRINT_SYH
             lrts_smsg_success++;
@@ -582,15 +588,13 @@ static void send_small_messages(int destNode, int size, char *msg)
             else
                 CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
 #endif
-            CmiFree(msg);
-            return;
-        }else if (status == GNI_RC_INVALID_PARAM)
-        {
-            CmiAbort("SmsgSen fails\n");
+            return status;
         }
+    }else {
+        if(inbuff ==0)
+            delay_send_small_msg(msg, size, destNode, tag);
     }
-    //buffer this message when buffer_msg_head!=0 or send fails
-    delay_send_small_msg(msg, size, destNode, SMALL_DATA_TAG);
+    return status;
 }
 
 // Get first 0 in DMA_tags starting from index
@@ -665,7 +669,7 @@ static int send_medium_messages(int destNode, int size, char *msg)
 
 // Large message, send control to receiver, receiver register memory and do a GET 
 inline
-static int send_large_messages(int destNode, int size, char *msg)
+static void send_large_messages(int destNode, int size, char *msg)
 {
 #if     USE_LRTS_MEMPOOL
     gni_return_t        status  =   GNI_RC_SUCCESS;
@@ -682,22 +686,11 @@ static int send_large_messages(int destNode, int size, char *msg)
     lrts_send_msg_id++;
     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
 #endif
-    status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
+    status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
     if(status == GNI_RC_SUCCESS)
     {
-#if PRINT_SYH
-        lrts_smsg_success++;
-        if(lrts_smsg_success == lrts_send_msg_id)
-            CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
-        else
-            CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
-#endif
         FreeControlMsg(control_msg_tmp);
-        return 1;
     }
-
-    delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
-    return 0;
 // NOT use mempool, should slow 
 #else
     gni_return_t        status  =   GNI_RC_SUCCESS;
@@ -714,32 +707,21 @@ static int send_large_messages(int destNode, int size, char *msg)
     lrts_send_msg_id++;
     CmiPrintf("Large LrtsSend PE:%d==>%d, size=%d, messageid:%d LMSG\n", myrank, destNode, size, lrts_send_msg_id);
 #endif
-    if(smsg_msglist_index[destNode].head == 0 )
+    status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
+    if(status == GNI_RC_SUCCESS)
     {
-        status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
+        status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
         if(status == GNI_RC_SUCCESS)
         {
-            status = GNI_SmsgSendWTag(ep_hndl_array[destNode], 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
-            if(status == GNI_RC_SUCCESS)
-            {
-#if PRINT_SYH
-                lrts_smsg_success++;
-                if(lrts_smsg_success == lrts_send_msg_id)
-                    CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
-                else
-                    CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
-#endif
-                FreeControlMsg(control_msg_tmp);
-                return 1;
-            }
-
-        } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
-        {
-            CmiAbort("Memory registor for large msg\n");
+            FreeControlMsg(control_msg_tmp);
         }
+    } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
+    {
+        CmiAbort("Memory registor for large msg\n");
+    }else 
+    {
+        delay_send_small_msg(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
-    delay_send_small_msg((char*)control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
-    return 0;
 #endif
 }
 
@@ -750,6 +732,7 @@ inline void LrtsPrepareEnvelope(char *msg, int size)
 
 static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 {
+    gni_return_t        status  =   GNI_RC_SUCCESS;
     LrtsPrepareEnvelope(msg, size);
 
     if(size <= SMSG_MAX_MSG)
@@ -758,7 +741,11 @@ static CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
         lrts_send_msg_id++;
         CmiPrintf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
 #endif
-        send_small_messages(destNode, size, msg);
+        status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
+        if(status == GNI_RC_SUCCESS)
+        {
+            CmiFree(msg);
+        }
     }
     else
     {
@@ -1044,9 +1031,6 @@ static void PumpLocalRdmaTransactions()
         //lrts_local_done_msg++;
         CmiPrintf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
 #endif
-        /*if(type == GNI_CQ_EVENT_TYPE_SMSG)
-        {
-        }else */
         if (type == GNI_CQ_EVENT_TYPE_POST)
         {
             inst_id     = GNI_CQ_GET_INST_ID(ev);
@@ -1055,7 +1039,6 @@ static void PumpLocalRdmaTransactions()
 #endif
             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
-            //CmiPrintf("**[%d] SMSGPumpLocalTransactions local done(type=%d) length=%d, size=%d\n", myrank, type, tmp_pd->length, SIZEFIELD((void*)(tmp_pd->local_addr)) );
             ////Message is sent, free message , put is not used now
             if(tmp_pd->type == GNI_POST_RDMA_PUT || tmp_pd->type == GNI_POST_FMA_PUT)
             {   //persistent message 
@@ -1074,25 +1057,10 @@ static void PumpLocalRdmaTransactions()
             lrts_send_msg_id++;
             CmiPrintf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
 #endif
-            if(smsg_msglist_index[inst_id].head != 0 )
-            {
-                delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
-            }else
+            status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
+            if(status == GNI_RC_SUCCESS)
             {
-                status = GNI_SmsgSendWTag(ep_hndl_array[inst_id], 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), 0, msg_tag);
-                if(status == GNI_RC_SUCCESS)
-                {
-#if PRINT_SYH
-                    lrts_smsg_success++;
-                    CmiPrintf("[%d==>%d] sent ACK done%d\n", myrank, inst_id, lrts_smsg_success);
-#endif
-                    FreeControlMsg(ack_msg_tmp);
-                }else if(status == GNI_RC_NOT_DONE || status == GNI_RC_ERROR_RESOURCE)
-                {
-                    delay_send_small_msg(ack_msg_tmp, sizeof(CONTROL_MSG), inst_id, msg_tag);
-                }
-                else
-                    GNI_RC_CHECK("GNI_SmsgSendWTag", status);
+                FreeControlMsg(ack_msg_tmp);
             }
 #if     !USE_LRTS_MEMPOOL
             MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
@@ -1163,22 +1131,13 @@ static int SendBufferMsg()
         ptr = smsg_msglist_index[index].head;
        
         while(ptr!=0)
-        {
-        if(useStaticSMSG)
         {
             CmiAssert(ptr!=NULL);
             if(ptr->tag == SMALL_DATA_TAG)
             {
-                status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], NULL, 0, ptr->msg, ptr->size, 0, SMALL_DATA_TAG);
-               // CmiPrintf("[%d==>%d] buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
-                if(status == GNI_RC_SUCCESS) {
-#if PRINT_SYH
-                    lrts_smsg_success++;
-                    if(lrts_smsg_success == lrts_send_msg_id)
-                        CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-                    else
-                        CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-#endif
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
+                if(status == GNI_RC_SUCCESS)
+                {
                     CmiFree(ptr->msg);
                 }
             }
@@ -1196,28 +1155,17 @@ static int SendBufferMsg()
                         break;
                     }
                 }
-                status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, LMSG_INIT_TAG);
-                if(status == GNI_RC_SUCCESS) {
-#if PRINT_SYH
-                    lrts_smsg_success++;
-                    CmiPrintf("[%d==>%d] sent LMSG done%d\n", myrank, ptr->destNode, lrts_smsg_success);
-#endif
+                
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
+                if(status == GNI_RC_SUCCESS)
+                {
                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
                 }
             }else if (ptr->tag == ACK_TAG)
             {
-#if PRINT_SYH
-                CmiPrintf("[%d==>%d] ACK buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
-#endif
-                status = GNI_SmsgSendWTag(ep_hndl_array[ptr->destNode], 0, 0, ptr->msg, sizeof(CONTROL_MSG), 0, ACK_TAG);
-                if(status == GNI_RC_SUCCESS) {
-#if PRINT_SYH
-                    lrts_smsg_success++;
-                    if(lrts_smsg_success == lrts_send_msg_id)
-                        CmiPrintf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-                    else
-                        CmiPrintf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-#endif
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
+                if(status == GNI_RC_SUCCESS)
+                {
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
                 }
             }else
@@ -1225,7 +1173,7 @@ static int SendBufferMsg()
                 CmiPrintf("Weird tag\n");
                 CmiAbort("should not happen\n");
             }
-        } 
+        } //end while 
         if(status == GNI_RC_SUCCESS)
         {
             smsg_msglist_index[index].head = smsg_msglist_index[index].head->next;
@@ -1242,7 +1190,6 @@ static int SendBufferMsg()
             done = 0;
             break;
         }
-        } //end pooling this i-th core
         if(ptr == 0)
         {
             if(index_previous != -1)
@@ -1280,6 +1227,20 @@ static void LrtsAdvanceCommunication()
     SendRdmaMsg();
 }
 
+static void _init_dynamic_smsg()
+{
+    mdh_addr_t current_addr;
+    gni_return_t status;
+    smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
+    memset(smsg_connected_flag, 0, mysize*sizeof(int));
+
+    current_addr.addr = (uint64_t)malloc(mysize * SMSG_CONN_SIZE);
+    status = MEMORY_REGISTER(onesided_hnd, nic_hndl, smsg_connection_addr, mysize * SMSG_CONN_SIZE,  &(current_addr.mdh), &omdh);
+    
+    smsg_connection_vec = (mdh_addr_t*) malloc(mysize*sizeof(mdh_addr_t)); 
+    allgather(&current_addr, smsg_connection_vec, sizeof(mdh_addr_t));
+}
+
 static void _init_static_smsg()
 {
     gni_smsg_attr_t      *smsg_attr;
@@ -1494,7 +1455,14 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
    
     //Mempool_MaxSize = CmiGetArgFlag(*argv, "+useMemorypoolSize");
-    //useStaticSMSG = CmiGetArgFlag(*argv, "+useStaticSmsg");
+    useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
+    if (myrank==0) 
+    {
+        if(useDynamicSMSG) 
+            CmiPrintf("Charm++> use Dynamic SMSG\n"); 
+        else 
+            CmiPrintf("Charm++> use Static SMSG\n");
+    };
     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
     
     status = PMI_Init(&first_spawned);
@@ -1577,12 +1545,12 @@ static void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
     if(mysize > 1)
     {
-        if(useStaticSMSG == 1)
+        if(useDynamicSMSG == 0)
         {
-            _init_static_smsg(mysize);
-        }else if(useStaticMSGQ == 1)
+            _init_static_smsg();
+        }else
         {
-            _init_static_msgq();
+            _init_dynamic_smsg();
         }
     }
 #if     USE_LRTS_MEMPOOL