Merge branch 'charm' of charmgit:charm into charm
authorNikhil Jain <nikhil@illinois.edu>
Thu, 10 Nov 2011 01:29:57 +0000 (19:29 -0600)
committerNikhil Jain <nikhil@illinois.edu>
Thu, 10 Nov 2011 01:29:57 +0000 (19:29 -0600)
src/arch/gemini_gni/machine.c
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index b6eae7d6100851f4f85463eb4b28e512a4fca960..72a455415aa718e93f710067ab0d5f1e490a626b 100644 (file)
@@ -46,11 +46,11 @@ static void sleep(int secs) {
 
 #define oneMB (1024ll*1024)
 #if CMK_SMP
-static CmiInt8 _mempool_size = 32*oneMB;
+static CmiInt8 _mempool_size = 4*oneMB;
 #else
 static CmiInt8 _mempool_size = 32*oneMB;
 #endif
-static CmiInt8 _expand_mem =  16*oneMB;
+static CmiInt8 _expand_mem =  1*oneMB;
 #endif
 
 #define PRINT_SYH  0
@@ -105,6 +105,11 @@ uint8_t   onesided_hnd, omdh;
 
 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
 /* If SMSG is not used */
+#define BIG_MSG       1*oneMB
+#define ONE_SEG       1*oneMB
+
+//#define BIG_MSG        65536 
+//#define ONE_SEG        16384
 #define FMA_PER_CORE  1024
 #define FMA_BUFFER_SIZE 1024
 /* If SMSG is used */
@@ -120,6 +125,7 @@ static int  SMSG_MAX_MSG = 1024;
 #define REMOTE_QUEUE_ENTRIES  20480 
 #define LOCAL_QUEUE_ENTRIES   20480 
 
+#define BIG_MSG_TAG  0x26
 #define PUT_DONE_TAG      0x29
 #define ACK_TAG           0x30
 /* SMSG is data message */
@@ -128,6 +134,8 @@ static int  SMSG_MAX_MSG = 1024;
 #define MEDIUM_HEAD_TAG         0x32
 #define MEDIUM_DATA_TAG         0x33
 #define LMSG_INIT_TAG     0x39 
+#define VERY_LMSG_INIT_TAG     0x40 
+#define VERY_LMSG_TAG     0x41 
 
 #define DEBUG
 #ifdef GNI_RC_CHECK
@@ -140,7 +148,7 @@ static int  SMSG_MAX_MSG = 1024;
 #endif
 
 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
-#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
+//#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
 
 #define     useDynamicSMSG    0
 //static int useDynamicSMSG   = 1;
@@ -245,8 +253,10 @@ typedef struct medium_msg_list
 typedef struct control_msg
 {
     uint64_t            source_addr;
+    uint64_t            dest_addr;
     int                 source;               /* source rank */
     int                 length;
+    int                 seq_id;                 //big message   -1 meaning single message
     gni_mem_handle_t    source_mem_hndl;
     struct control_msg *next;
 }CONTROL_MSG;
@@ -662,10 +672,12 @@ static void setup_smsg_connection(int destNode)
         rdma_request_msg->pd = pd;
         /* buffer this request */
     }
+#if PRINT_SYH
     if(status != GNI_RC_SUCCESS)
        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
     else
         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
+#endif
     //GNI_RC_CHECK("SMSG Dynamic link", status);
 }
 
@@ -703,10 +715,7 @@ static gni_return_t send_smsg_message(int destNode, void *header, int size_heade
         {
 #if PRINT_SYH
             lrts_smsg_success++;
-            if(lrts_smsg_success == lrts_send_msg_id)
-                printf("GOOD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
-            else
-                printf("BAD [%d==>%d] sent done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
+            printf("[%d==>%d] send done%d (msgs=%d)\n", myrank, destNode, lrts_smsg_success, lrts_send_msg_id);
 #endif     
             return status;
         }
@@ -793,9 +802,15 @@ inline static CONTROL_MSG* construct_control_msg(int size, char *msg)
     MallocControlMsg(control_msg_tmp);
     control_msg_tmp->source_addr    = (uint64_t)msg;
     control_msg_tmp->source         = myrank;
-    control_msg_tmp->length         =ALIGN4(size); //for GET 4 bytes aligned 
+    control_msg_tmp->length         =ALIGN64(size); //for GET 4 bytes aligned 
 #if     USE_LRTS_MEMPOOL
-    control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
+    if(size < BIG_MSG)
+        control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
+    else
+    {
+        control_msg_tmp->source_mem_hndl.qword1 = 0;
+        control_msg_tmp->source_mem_hndl.qword2 = 0;
+    }
 #else
     control_msg_tmp->source_mem_hndl.qword1 = 0;
     control_msg_tmp->source_mem_hndl.qword2 = 0;
@@ -805,21 +820,44 @@ inline static CONTROL_MSG* construct_control_msg(int size, char *msg)
 
 // Large message, send control to receiver, receiver register memory and do a GET 
 inline
-static void send_large_messages(int destNode, int size, char *msg)
+static void send_large_messages(int destNode, CONTROL_MSG  *control_msg_tmp)
 {
     gni_return_t        status  =   GNI_RC_SUCCESS;
-    CONTROL_MSG         *control_msg_tmp;
     uint32_t            vmdh_index  = -1;
+    int                 size;
 
-   control_msg_tmp =  construct_control_msg(size, msg);
+    size    =   control_msg_tmp->length;
 #if     USE_LRTS_MEMPOOL
-    status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
-    if(status == GNI_RC_SUCCESS)
+    if( control_msg_tmp ->seq_id == 0 ){
+        status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
+        if(status == GNI_RC_SUCCESS)
+        {
+            FreeControlMsg(control_msg_tmp);
+        }
+    }else
     {
-        FreeControlMsg(control_msg_tmp);
+        if( control_msg_tmp->seq_id == 1)
+            size = ONE_SEG;
+
+        status = MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
+        if(status == GNI_RC_SUCCESS)
+        {
+            status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
+            if(status == GNI_RC_SUCCESS)
+            {
+                FreeControlMsg(control_msg_tmp);
+            }
+        } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
+        {
+            CmiAbort("Memory registor for large msg\n");
+        }else 
+        {
+            buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+        }
+
     }
 #else
-    status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN4(size), &(control_msg_tmp->source_mem_hndl), &omdh);
+    status = MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh);
     if(status == GNI_RC_SUCCESS)
     {
         status = send_smsg_message( destNode, 0, 0, control_msg_tmp, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 0);  
@@ -829,7 +867,6 @@ static void send_large_messages(int destNode, int size, char *msg)
         }
     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
     {
-        printf("!!!!!!!!!!!Please try to use large page + module load craype-hugepages8m+ or contact charm++ developer for help\n");
         CmiAbort("Memory registor for large msg\n");
     }else 
     {
@@ -847,25 +884,28 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 {
 
     gni_return_t        status  =   GNI_RC_SUCCESS;
+    uint8_t tag;
+    CONTROL_MSG         *control_msg_tmp;
     LrtsPrepareEnvelope(msg, size);
 #if CMK_SMP
-    CONTROL_MSG         *control_msg_tmp;
 #if COMM_THREAD_SEND
     if(size <= SMSG_MAX_MSG)
         buffer_small_msgs(msg, size, destNode, SMALL_DATA_TAG);
     else
     {
         control_msg_tmp =  construct_control_msg(size, msg);
+        if(size < BIG_MSG)
+            control_msg_tmp->seq_id = 0;
+        else
+        {
+            control_msg_tmp->seq_id = 1;
+        }
         buffer_small_msgs(control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
     }
 #endif
 #else
     if(size <= SMSG_MAX_MSG)
     {
-#if PRINT_SYH
-        lrts_send_msg_id++;
-        printf("SMSG LrtsSend PE:%d==>%d, size=%d, messageid:%d\n", myrank, destNode, size, lrts_send_msg_id);
-#endif
         status = send_smsg_message( destNode, 0, 0, msg, size, SMALL_DATA_TAG, 0);  
         if(status == GNI_RC_SUCCESS)
         {
@@ -874,7 +914,18 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
     }
     else
     {
-        send_large_messages(destNode, size, msg);
+        control_msg_tmp =  construct_control_msg(size, msg);
+#if     USE_LRTS_MEMPOOL
+        if(size < BIG_MSG)
+            control_msg_tmp->seq_id = 0;
+        else
+        {
+            control_msg_tmp->seq_id = 1;
+        }
+#else
+        control_msg_tmp->seq_id = 0;
+#endif
+        send_large_messages(destNode, control_msg_tmp);
     }
 #endif
     return 0;
@@ -932,12 +983,13 @@ static void PumpNetworkSmsg()
     gni_smsg_attr_t     *smsg_attr;
     gni_smsg_attr_t     *remote_smsg_attr;
     int                 init_flag;
+    CONTROL_MSG         *control_msg_tmp, *header_tmp;
     while ((status =GNI_CqGetEvent(smsg_rx_cqh, &event_data)) == GNI_RC_SUCCESS)
     {
         inst_id = GNI_CQ_GET_INST_ID(event_data);
         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
 #if PRINT_SYH
-        printf("[%d] PumpNetworkMsgs small msgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
+        printf("[%d] PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, inst_id,  gni_err_str[status]);
 #endif
 #if     useDynamicSMSG
         rdma_id++;
@@ -969,10 +1021,6 @@ static void PumpNetworkSmsg()
         msg_tag = GNI_SMSG_ANY_TAG;
         while( (status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag)) == GNI_RC_SUCCESS)
         {
-#if PRINT_SYH
-            lrts_received_msg++;
-            printf("+++[%d] PumpNetwork msg is received, messageid:%d tag=%d\n", myrank, lrts_received_msg, msg_tag);
-#endif
             /* copy msg out and then put into queue (small message) */
             switch (msg_tag) {
             case SMALL_DATA_TAG:
@@ -986,12 +1034,12 @@ static void PumpNetworkSmsg()
             case LMSG_INIT_TAG:
             {
 #if PRINT_SYH
-                printf("+++[%d] from %d PumpNetwork Rdma Request msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
+                printf("[%d] from %d request for Large msg is received, messageid:%d tag=%d\n", myrank, inst_id, lrts_received_msg, msg_tag);
 #endif
                 getLargeMsgRequest(header, inst_id);
                 break;
             }
-            case ACK_TAG:
+            case ACK_TAG:   //msg fit into mempool
             {
                 /* Get is done, release message . Now put is not used yet*/
 #if         !USE_LRTS_MEMPOOL
@@ -1001,21 +1049,34 @@ static void PumpNetworkSmsg()
                 SendRdmaMsg();
                 break;
             }
+            case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
+            {
+                header_tmp = (CONTROL_MSG *) header;
+                MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh);
+                if(header_tmp->length <= ONE_SEG) //transaction done
+                {
+                    CmiFree((void*)(header_tmp->source_addr) - ONE_SEG*(header_tmp->seq_id-1));
+                }else
+                {
+                    MallocControlMsg(control_msg_tmp);
+                    control_msg_tmp->source         = myrank;
+                    control_msg_tmp->source_addr    = (uint64_t)((void*)(header_tmp->source_addr + ONE_SEG));
+                    control_msg_tmp->dest_addr      = (uint64_t)((void*)(header_tmp->dest_addr) + ONE_SEG);
+                    control_msg_tmp->length         = header_tmp->length-ONE_SEG; 
+                    control_msg_tmp->seq_id         = header_tmp->seq_id+1;
+                    //send next seg
+                    send_large_messages(inst_id, control_msg_tmp);
+                }
+                SendRdmaMsg();
+                break;
+            }
+
 #if CMK_PERSISTENT_COMM
             case PUT_DONE_TAG: //persistent message
             {
                 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
                 int size = ((CONTROL_MSG *) header)->length;
-#if 0
-                void *dupmsg;
-                dupmsg = CmiAlloc(size);
-                _MEMCHECK(dupmsg);
-                memcpy(dupmsg, msg, size);
-                msg = dupmsg;
-
-#else
                 CmiReference(msg);
-#endif
                 handleOneRecvedMsg(size, msg); 
                 break;
             }
@@ -1023,7 +1084,7 @@ static void PumpNetworkSmsg()
             default: {
                 printf("weird tag problem\n");
                 CmiAbort("Unknown tag\n");
-            }
+                     }
             }
             GNI_SmsgRelease(ep_hndl_array[inst_id]);
             msg_tag = GNI_SMSG_ANY_TAG;
@@ -1034,26 +1095,47 @@ static void PumpNetworkSmsg()
         GNI_RC_CHECK("Smsg_rx_cq full", status);
     }
 }
-
-static void getLargeMsgRequest(void* header, uint64_t inst_id)
+static void printDesc(gni_post_descriptor_t *pd)
+{
+    printf(" addr=%p, ", pd->local_addr); 
+}
+static void getLargeMsgRequest(void* header, uint64_t inst_id )
 {
 #if     USE_LRTS_MEMPOOL
     CONTROL_MSG         *request_msg;
-    gni_return_t        status;
+    gni_return_t        status = GNI_RC_SUCCESS;
     void                *msg_data;
     gni_post_descriptor_t *pd;
     RDMA_REQUEST        *rdma_request_msg;
     gni_mem_handle_t    msg_mem_hndl;
-    int source;
+    int source, size, transaction_size;
     // initial a get to transfer data from the sender side */
     request_msg = (CONTROL_MSG *) header;
     source = request_msg->source;
-    msg_data = CmiAlloc(request_msg->length);
+    size = request_msg->length; 
+    if(request_msg->seq_id < 2)  
+        msg_data = CmiAlloc(size);
+    else
+        msg_data = request_msg-> dest_addr;
     _MEMCHECK(msg_data);
-    //memcpy(&msg_mem_hndl, GetMemHndl(msg_data), sizeof(gni_mem_handle_t));
-    msg_mem_hndl = GetMemHndl(msg_data);
-
+   
     MallocPostDesc(pd);
+    pd->cqwrite_value = request_msg->seq_id;
+    if( request_msg->seq_id == 0)
+    {
+        msg_mem_hndl = GetMemHndl(msg_data);
+        transaction_size = ALIGN64(size);
+    }
+    else{
+        transaction_size = size > ONE_SEG?ONE_SEG: ALIGN64(size);
+        status = MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &msg_mem_hndl, &omdh);
+        if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
+        {
+            GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
+        }
+        pd->first_operand = size;
+    }
+
     if(request_msg->length < LRTS_GNI_RDMA_THRESHOLD) 
         pd->type            = GNI_POST_FMA_GET;
     else
@@ -1064,7 +1146,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id)
     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
 #endif
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
-    pd->length          = ALIGN4(request_msg->length);
+    pd->length          = transaction_size;
     pd->local_addr      = (uint64_t) msg_data;
     pd->local_mem_hndl  = msg_mem_hndl;
     pd->remote_addr     = request_msg->source_addr;
@@ -1072,19 +1154,29 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id)
     pd->src_cq_hndl     = 0;//post_tx_cqh;     /* smsg_tx_cqh;  */
     pd->rdma_mode       = 0;
 
-    if(pd->type == GNI_POST_RDMA_GET) 
-        status = GNI_PostRdma(ep_hndl_array[source], pd);
-    else
-        status = GNI_PostFma(ep_hndl_array[source],  pd);
+    //memory registration success
+    if(status == GNI_RC_SUCCESS)
+    {
+        if(pd->type == GNI_POST_RDMA_GET) 
+            status = GNI_PostRdma(ep_hndl_array[source], pd);
+        else
+            status = GNI_PostFma(ep_hndl_array[source],  pd);
+    }else
+    {
+        pd->local_mem_hndl.qword1  = 0; 
+        pd->local_mem_hndl.qword1  = 0; 
+    }
     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
     {
         MallocRdmaRequest(rdma_request_msg);
+        rdma_request_msg->next = 0;
         rdma_request_msg->destNode = inst_id;
         rdma_request_msg->pd = pd;
         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
-    }else
+    }else {
+        /* printf("source: %d pd:%p\n", source, pd); */
         GNI_RC_CHECK("AFter posting", status);
-
+    }
 #else
     CONTROL_MSG         *request_msg;
     gni_return_t        status;
@@ -1092,7 +1184,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id)
     gni_post_descriptor_t *pd;
     RDMA_REQUEST        *rdma_request_msg;
     gni_mem_handle_t    msg_mem_hndl;
-    int source;
+    //int source;
     // initial a get to transfer data from the sender side */
     request_msg = (CONTROL_MSG *) header;
     source = request_msg->source;
@@ -1103,8 +1195,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id)
 
     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
     {
-        printf("!!!!!!!!!!!Please try to use large page + module load craype-hugepages8m+ or contact charm++ developer for help\n");
-        GNI_RC_CHECK("Mem Register before post", status);
+        GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
     }
 
     MallocPostDesc(pd);
@@ -1118,7 +1209,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id)
     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
 #endif
     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
-    pd->length          = ALIGN4(request_msg->length);
+    pd->length          = ALIGN64(request_msg->length);
     pd->local_addr      = (uint64_t) msg_data;
     pd->remote_addr     = request_msg->source_addr;
     pd->remote_mem_hndl = request_msg->source_mem_hndl;
@@ -1163,7 +1254,6 @@ static void PumpLocalSmsgTransactions()
         type        = GNI_CQ_GET_TYPE(ev);
 #if PRINT_SYH
         lrts_local_done_msg++;
-        printf("[%d] SMSGPumpLocalSMSGTransactions (type=%d)\n", myrank, type);
         printf("*[%d]  PumpLocalSmsgTransactions GNI_CQ_GET_TYPE %d. Localdone=%d\n", myrank, GNI_CQ_GET_TYPE(ev), lrts_local_done_msg);
 #endif
         if(GNI_CQ_OVERRUN(ev))
@@ -1193,18 +1283,15 @@ static void PumpLocalRdmaTransactions()
     while ( (status = GNI_CqGetEvent(smsg_tx_cqh, &ev)) == GNI_RC_SUCCESS) 
     {
         type        = GNI_CQ_GET_TYPE(ev);
-#if PRINT_SYH
-        //lrts_local_done_msg++;
-        printf("**[%d] SMSGPumpLocalTransactions (type=%d)\n", myrank, type);
-#endif
         if (type == GNI_CQ_EVENT_TYPE_POST)
         {
             inst_id     = GNI_CQ_GET_INST_ID(ev);
 #if PRINT_SYH
-            printf("**[%d] SMSGPumpLocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
+            printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
 #endif
             //status = GNI_GetCompleted(post_tx_cqh, ev, &tmp_pd);
             status = GNI_GetCompleted(smsg_tx_cqh, ev, &tmp_pd);
+            MallocControlMsg(ack_msg_tmp);
             ////Message is sent, free message , put is not used now
             switch (tmp_pd->type) {
 #if CMK_PERSISTENT_COMM
@@ -1224,25 +1311,31 @@ static void PumpLocalRdmaTransactions()
 #endif
             case GNI_POST_RDMA_GET:
             case GNI_POST_FMA_GET:
-                msg_tag = ACK_TAG;  
 #if     !USE_LRTS_MEMPOOL
                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
+                msg_tag = ACK_TAG;  
+#else
+                if(tmp_pd->cqwrite_value > 0)
+                {
+                    msg_tag = BIG_MSG_TAG; 
+                    MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh);
+                } 
+                else
+                {
+                    msg_tag = ACK_TAG;  
+                }
+                ack_msg_tmp->seq_id = tmp_pd->cqwrite_value;
+                ack_msg_tmp->length = tmp_pd->first_operand;
+                ack_msg_tmp->dest_addr = tmp_pd->local_addr;
 #endif
                 break;
             default:
                 CmiPrintf("type=%d\n", tmp_pd->type);
                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
             }
-
-            MallocControlMsg(ack_msg_tmp);
-            ack_msg_tmp->source             = myrank;
+            //ack_msg_tmp->source             = myrank;
             ack_msg_tmp->source_addr        = tmp_pd->remote_addr;
-            ack_msg_tmp->length             = tmp_pd->length; 
             ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
-#if PRINT_SYH
-            lrts_send_msg_id++;
-            printf("ACK LrtsSend PE:%d==>%d, size=%d, messageid:%d ACK\n", myrank, inst_id, sizeof(CONTROL_MSG), lrts_send_msg_id);
-#endif
             status = send_smsg_message(inst_id, 0, 0, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
             if(status == GNI_RC_SUCCESS)
             {
@@ -1252,9 +1345,19 @@ static void PumpLocalRdmaTransactions()
             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
 #endif
             {
-              CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
-              handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
-              SendRdmaMsg(); 
+                if( msg_tag == ACK_TAG){    //msg fit in mempool 
+#if PRINT_SYH
+                    printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
+#endif
+                    CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
+                    handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr); 
+                }else if (tmp_pd->first_operand <= ONE_SEG) {
+#if PRINT_SYH
+                    printf("Pipeline msg done [%d]\n", myrank);
+#endif
+                    handleOneRecvedMsg(tmp_pd->length + (tmp_pd->cqwrite_value-1)*ONE_SEG, (void*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG); 
+                }
+                    SendRdmaMsg();
             }
             FreePostDesc(tmp_pd);
         }
@@ -1335,7 +1438,7 @@ static int SendBufferMsg()
     CONTROL_MSG         *control_msg_tmp;
     gni_return_t        status;
     int done = 1;
-    register    int     i;
+    register    int     i, register_size;
     int                 index_previous = -1;
     int                 index = smsg_head_index;
 #if !CMK_SMP
@@ -1360,43 +1463,44 @@ static int SendBufferMsg()
                 break;
 #endif
             CmiAssert(ptr!=NULL);
-            if(ptr->tag == SMALL_DATA_TAG)
+            switch(ptr->tag)
             {
-                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, SMALL_DATA_TAG, 1);  
+            case SMALL_DATA_TAG:
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, ptr->size, ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     CmiFree(ptr->msg);
                 }
-            }
-            else if(ptr->tag == LMSG_INIT_TAG)
-            {
+                break;
+            case LMSG_INIT_TAG:
                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
-#if PRINT_SYH
-                printf("[%d==>%d] LMSG buffer send call(%d)%s\n", myrank, ptr->destNode, lrts_smsg_success, gni_err_str[status] );
-#endif
                 if(control_msg_tmp->source_mem_hndl.qword1 == 0 && control_msg_tmp->source_mem_hndl.qword2 == 0)
                 {
-                    MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, control_msg_tmp->length, &(control_msg_tmp->source_mem_hndl), &omdh);
+                    if(control_msg_tmp->seq_id >0)
+                        register_size = control_msg_tmp->length>=ONE_SEG?ONE_SEG:control_msg_tmp->length;
+                    else
+                        register_size = control_msg_tmp->length;
+                    status = MEMORY_REGISTER(onesided_hnd, nic_hndl, control_msg_tmp->source_addr, register_size, &(control_msg_tmp->source_mem_hndl), &omdh);
                     if(status != GNI_RC_SUCCESS) {
                         done = 0;
                         break;
                     }
                 }
-                
-                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), LMSG_INIT_TAG, 1);  
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     FreeControlMsg((CONTROL_MSG*)(ptr->msg));
                 }
-            }else if (ptr->tag == ACK_TAG)
-            {
-                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ACK_TAG, 1);  
+                break;
+            case   ACK_TAG:
+            case   BIG_MSG_TAG:
+                status = send_smsg_message( ptr->destNode, 0, 0, ptr->msg, sizeof(CONTROL_MSG), ptr->tag, 1);  
                 if(status == GNI_RC_SUCCESS)
                 {
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
                 }
-            }else
-            {
+                break;
+            default:
                 printf("Weird tag\n");
                 CmiAbort("should not happen\n");
             }
@@ -1404,10 +1508,7 @@ static int SendBufferMsg()
             {
 #if PRINT_SYH
                 buffered_smsg_counter--;
-                if(lrts_smsg_success == lrts_send_msg_id)
-                    printf("GOOD send buff [%d==>%d] send buffer sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
-                else
-                    printf("BAD send buff [%d==>%d] sent done%d (msgs=%d)\n", myrank, ptr->destNode, lrts_smsg_success, lrts_send_msg_id);
+                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
 #endif
                 FreeMsgList(ptr);
             }else {
@@ -1795,7 +1896,8 @@ printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(),
     return NULL;
 }
 #endif
-
+static long long int total_mempool_size = 0;
+static long long int total_mempool_calls = 0;
 #if USE_LRTS_MEMPOOL
 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
 {
@@ -1804,6 +1906,8 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
 
     int default_size =  expand_flag? _expand_mem : _mempool_size;
     if (*size < default_size) *size = default_size;
+    total_mempool_size += *size;
+    total_mempool_calls += 1;
     ret = posix_memalign(&pool, ALIGNBUF, *size);
     if (ret != 0) {
 #if CMK_SMP && STEAL_MEMPOOL
@@ -1825,7 +1929,7 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
     }
 #endif
     if(status != GNI_RC_SUCCESS)
-        printf("[%d] Charm++> Fatal error with registering memory of %d bytes: Please try to use large page (module load craype-hugepages8m) or contact charm++ developer for help.\n", CmiMyPe(), *size);
+        printf("[%d] Charm++> Fatal error with registering memory of %d bytes: Please try to use large page (module load craype-hugepages8m) or contact charm++ developer for help.[%lld, %lld]\n", CmiMyPe(), *size, total_mempool_size, total_mempool_calls);
     GNI_RC_CHECK("Mempool register", status);
     return pool;
 }
@@ -1989,10 +2093,18 @@ void* LrtsAlloc(int n_bytes, int header)
         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
 #if     USE_LRTS_MEMPOOL
         n_bytes = ALIGN64(n_bytes);
-        char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
-        ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
+        if(n_bytes <= BIG_MSG)
+        {
+            char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
+            ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
+        }else 
+        {
+            char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
+            ptr = res + ALIGNBUF - header;
+
+        }
 #else
-        n_bytes = ALIGN4(n_bytes);           /* make sure size if 4 aligned */
+        n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
         char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
         ptr = res + ALIGNBUF - header;
 #endif
@@ -2008,7 +2120,11 @@ void  LrtsFree(void *msg)
     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
     if (size <= SMSG_MAX_MSG)
       free(msg);
-    else
+    else if(size>BIG_MSG)
+    {
+        free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
+
+    }else
     {
 #if 0
         printf("[PE:%d] Free lrts for bytes=%d, ptr=%p\n", CmiMyPe(), size, (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
index 1f8e310881821c863eb428091fe70eca91b092dd..0ea84af2bd9295be900cfb1cb62a6a6d423b07d5 100644 (file)
@@ -400,7 +400,7 @@ void MeshStreamer<dtype>::insertData(const dtype &dataItem, const int destinatio
 
 template <class dtype>
 void MeshStreamer<dtype>::doneInserting() {
-  contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL)), thisProxy);
+  contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), thisProxy));
 }
 
 template <class dtype>