Merge branch 'charm' of charmgit:charm into charm
authorYanhua Sun <sun51@hopper07.(none)>
Thu, 8 Mar 2012 23:03:52 +0000 (15:03 -0800)
committerYanhua Sun <sun51@hopper07.(none)>
Thu, 8 Mar 2012 23:03:52 +0000 (15:03 -0800)
Conflicts:
src/arch/gemini_gni/machine.c

doc/converse/code/msgs/Makefile [new file with mode: 0644]
doc/converse/code/msgs/interNodeMsg.C [new file with mode: 0644]
doc/converse/tutorial.tex
src/arch/gemini_gni/machine.c
src/arch/util/machine-pxshm.c
src/arch/util/machine-xpmem.c
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

diff --git a/doc/converse/code/msgs/Makefile b/doc/converse/code/msgs/Makefile
new file mode 100644 (file)
index 0000000..d31eddc
--- /dev/null
@@ -0,0 +1,19 @@
+CHARMC=../../../../bin/charmc $(OPTS)
+
+all: interNodeMsg 
+
+interNodeMsg: interNodeMsg.o
+       $(CHARMC) -language converse++ -o interNodeMsg interNodeMsg.o
+
+interNodeMsg.o: interNodeMsg.C
+       $(CHARMC) -language converse++ -c interNodeMsg.C
+
+
+test: interNodeMsg
+       ./charmrun ./interNodeMsg +p2 $(TESTOPTS)
+
+clean:
+       rm -f core *.cpm.h
+       rm -f TAGS *.o
+       rm -f interNodeMsg 
+       rm -f conv-host charmrun
diff --git a/doc/converse/code/msgs/interNodeMsg.C b/doc/converse/code/msgs/interNodeMsg.C
new file mode 100644 (file)
index 0000000..61d8823
--- /dev/null
@@ -0,0 +1,61 @@
+#include <stdlib.h>
+#include <converse.h>
+
+CpvDeclare(int,msgSize);
+CpvDeclare(int,userData);
+CpvDeclare(int,recvHandler);
+CpvDeclare(int,exitHandler);
+
+void sendData()
+{
+  //Allocate message
+  char *msg = (char *)CmiAlloc(CpvAccess(msgSize)+CmiMsgHeaderSizeBytes );
+  //set allocated space to contain user data
+  *((int *)(msg+CmiMsgHeaderSizeBytes)) =  CpvAccess(userData) ;
+  //set Handler
+  CmiSetHandler(msg,CpvAccess(recvHandler));
+  //Send Message
+  CmiSyncSendAndFree(0, CpvAccess(msgSize)+CmiMsgHeaderSizeBytes, msg);
+}
+//We finished for all message sizes. Exit now
+CmiHandler recvHandlerFunc(char *msg)
+{
+       int myData = *((int *)(msg+CmiMsgHeaderSizeBytes));
+       if (myData == CpvAccess(userData))
+                       CmiPrintf ("Received Expected Value\n");        
+    CmiFree(msg);
+       // Broadcast message 
+    void *sendmsg = CmiAlloc(CmiMsgHeaderSizeBytes);
+    CmiSetHandler(sendmsg,CpvAccess(exitHandler));
+    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes,sendmsg);
+}
+// Exit now
+CmiHandler exitHandlerFunc(char *msg)
+{
+    CmiFree(msg);
+    CsdExitScheduler();
+    return 0;
+}
+
+//Converse main. Initialize variables and register handlers
+CmiStartFn mymain()
+{
+    CpvInitialize(int,msgSize);
+    CpvInitialize(int,userData);
+    CpvInitialize(int,recvHandler);
+    CpvInitialize(int,exitHandler);
+    CpvAccess(recvHandler) = CmiRegisterHandler((CmiHandler) recvHandlerFunc);
+    CpvAccess(exitHandler) = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
+    CpvAccess(msgSize) = 4;
+    CpvAccess(userData) = 1454;
+    if (CmiMyPe() == 0)
+        sendData();
+    return 0;
+}
+
+int main(int argc,char *argv[])
+{
+    ConverseInit(argc,argv,(CmiStartFn)mymain,0,0);
+    return 0;
+}
index 47f27914ba18c42fea8a2dd7f851148a54f488a7..0b812dadda2b19f33777f53fb7ec3de35acec37c 100644 (file)
@@ -61,4 +61,31 @@ done.
 \end{figure}
 
 
+\section{Interprocessor Messaging}
+
+Figure~\ref{fig:converse-msg} illustrates how to write a simple program that sends a message from one processor to other. In the example program, function \textttP{sendData} shows how to send a message. First, one must create a buffer to hold the message. The buffer must be large enough to hold the header and the data. This is done by \texttt{CmiAlloc} call. \texttt{CmiMsgHeaderSizeBytes} is a constant which contains the size of the header. Next, the handler method should be set for the outgoing message using \texttt{CmiSetHandler} call. User data can be set after the header. In this example, we use \texttt{CmiSyncSendAndFree} call to send the message. This function sends the message and frees the buffer. There are various alternatives. The first argument of this function call is the processor number where we want to send the message, second argument is the size of the message (including header) and the last argument is a pointer to the message.
+\begin{itemize}
+\item
+sync: a version that is as simple as possible, pushing the data into the network and not returning until the data is ``in the network''. As soon as a sync function returns, you can reuse the message buffer.
+
+\item
+async: a version that returns almost instantaneously, and then continues working in the background. The background job transfers the data from the message buffer into the network. Since the background job is still using the message buffer when the function returns, you can't reuse the message buffer immediately. The background job sets a flag when it is done and you can then reuse the message buffer.
+
+\item
+send and free: a version that returns almost instantaneously, and then continues working in the background. The background job transfers the data from the message buffer into the network. When the background job finishes, it CmiFrees the message buffer. In this situation, you can't reuse the message buffer at all. To use a function of this type, you must allocate the message buffer using CmiAlloc.
+
+\item
+node: a version that send a message to a node instead of a specific processor. This means that when the message is received, any ``free'' processor within than node can handle it. 
+\end{itemize}
+
+On the receiver, appropiate handler function is called and a pointer to the message (starting from the header) is provided to the user. User must free the message after using the data using \texttt{CmiFree} call. Finally we also illustrate a broadcast message using \texttt{CmiSyncBroadcastAllAndFree}
+
+TODO: Examples of other modes
+
+\begin{figure}
+\lstinputlisting[language=pseudo,basicstyle=\footnotesize,numbers=left,escapechar=\%]{code/interNodeMsg.C}
+\caption{Sending Message across Converse Processes}
+\label{fig:converse-pingpong}
+\end{figure}
+
 \end{document}
index cd334625f8c869dfaa1c05da0c7e6c342574b5d7..6a83c15fdc541a6e1a1c6f839a42210c78862e26 100644 (file)
 #include <malloc.h>
 #include <unistd.h>
 #include <time.h>
-
 #include <gni_pub.h>
 #include <pmi.h>
 
+//#include <numatoolkit.h>
+
 #include "converse.h"
 
+#define     LARGEPAGE           0
+
+#if LARGEPAGE
+#include <hugetlbfs.h>
+#endif
+
 #if CMK_DIRECT
 #include "cmidirect.h"
 #endif
 #define PRINT_SYH  0
 
 #define USE_LRTS_MEMPOOL                  1
+
 #define REMOTE_EVENT                      0
 
 #define CMI_EXERT_SEND_CAP     0
 //#define MULTI_THREAD_SEND 1
 #endif
 
+#if CMK_SMP
+#define PIGGYBACK_ACK                        0
+#endif
+
 // Trace communication thread
 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
 #define TRACE_THRESHOLD     0.00005
@@ -104,23 +116,23 @@ static CmiInt8 _mempool_size_limit = 0;
 
 static CmiInt8 _totalmem = 0.8*oneGB;
 
+#if LARGEPAGE
+static int BIG_MSG  =  16*oneMB;
+static int ONE_SEG  =  4*oneMB;
+#else
 static int BIG_MSG  =  4*oneMB;
 static int ONE_SEG  =  2*oneMB;
+#endif
 static int BIG_MSG_PIPELINE = 4;
 
 // dynamic flow control
 static CmiInt8 buffered_send_msg = 0;
-static CmiInt8  register_memory_size = 0;
+static CmiInt8 register_memory_size = 0;
 
-#define     LARGEPAGE           0
 #if LARGEPAGE
-#if CMK_SMP && COMM_THREAD_SEND 
 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
-#else
-static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
-static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
-#endif
+static CmiInt8 register_count = 0;
 #else
 #if CMK_SMP && COMM_THREAD_SEND 
 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
@@ -145,6 +157,8 @@ static CmiInt8  MAX_REG_MEM    =  25*oneMB;
 
 static int _tlbpagesize = 4096;
 
+//static int _smpd_count  = 0;
+
 static int   user_set_flag  = 0;
 
 static int _checkProgress = 1;             /* check deadlock */
@@ -274,18 +288,15 @@ static int  REMOTE_QUEUE_ENTRIES=20480;
 static int LOCAL_QUEUE_ENTRIES=20480; 
 #endif
 
-#define BIG_MSG_TAG  0x26
-#define PUT_DONE_TAG      0x28
-#define DIRECT_PUT_DONE_TAG      0x29
-#define ACK_TAG           0x30
+#define BIG_MSG_TAG             0x26
+#define PUT_DONE_TAG            0x28
+#define DIRECT_PUT_DONE_TAG     0x29
+#define ACK_TAG                 0x30
 /* SMSG is data message */
 #define SMALL_DATA_TAG          0x31
+#define SMALL_DATA_ACK_TAG      0x32
 /* SMSG is a control message to initialize a BTE */
-#define MEDIUM_HEAD_TAG         0x32
-#define MEDIUM_DATA_TAG         0x33
 #define LMSG_INIT_TAG           0x39 
-#define VERY_LMSG_INIT_TAG      0x40 
-#define VERY_LMSG_TAG           0x41 
 
 #define DEBUG
 #ifdef GNI_RC_CHECK
@@ -299,6 +310,7 @@ static int LOCAL_QUEUE_ENTRIES=20480;
 
 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
+#define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
 
 static int useStaticMSGQ = 0;
 static int useStaticFMA = 0;
@@ -464,7 +476,12 @@ typedef struct smsg_queue
 #endif
 
 SMSG_QUEUE                  smsg_queue;
+#if PIGGYBACK_ACK
+SMSG_QUEUE                  smsg_ack_queue;
+#endif
+#if CMK_USE_OOB
 SMSG_QUEUE                  smsg_oob_queue;
+#endif
 
 #if CMK_SMP
 
@@ -766,6 +783,41 @@ static uint32_t get_cookie(void)
     return cookie;
 }
 
+#if LARGEPAGE
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+
+// size must be _tlbpagesize aligned
+void *my_get_huge_pages(size_t size)
+{
+    char filename[512];
+    int fd;
+    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+    void *ptr = NULL;
+
+    snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
+    fd = open(filename, O_RDWR | O_CREAT, mode);
+    if (fd == -1) {
+        CmiAbort("my_get_huge_pages: open filed");
+    }
+    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
+    if (ptr == MAP_FAILED) ptr = NULL;
+//printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
+    close(fd);
+    unlink(filename);
+    return ptr;
+}
+
+void my_free_huge_pages(void *ptr, int size)
+{
+//printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
+    int ret = munmap(ptr, size);
+    if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
+}
+
+#endif
+
 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
 /* TODO: add any that are related */
 /* =====End of Definitions of Message-Corruption Related Macros=====*/
@@ -795,11 +847,12 @@ int         lrts_received_msg = 0;
 
 static void sweep_mempool(mempool_type *mptr)
 {
+    int n = 0;
     block_header *current = &(mptr->block_head);
 
-    printf("[n %d] sweep_mempool slot START.\n", myrank);
+    printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
     while( current!= NULL) {
-        printf("[n %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
+        printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
     }
     printf("[n %d] sweep_mempool slot END.\n", myrank);
@@ -824,7 +877,7 @@ static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
 }
 
 inline 
-static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
+static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl)
 {
     gni_return_t status = GNI_RC_SUCCESS;
     //int size = GetMempoolsize(msg);
@@ -861,7 +914,7 @@ static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int
 }
 
 inline 
-static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
+static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t)
 {
     static int rank = -1;
     int i;
@@ -1055,6 +1108,54 @@ static int connect_to(int destNode)
     return 1;
 }
 
+#if PIGGYBACK_ACK
+static void * piggyback_ack(int destNode, int msgsize, int *count)
+{
+    int i;
+    if (PCQueueEmpty(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf)) return NULL;
+    int len = PCQueueLength(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
+    int piggycount = (SMSG_MAX_MSG - msgsize)/sizeof(uint64_t);
+    if (piggycount > len+1) piggycount = len + 1;
+    if (piggycount <= 5) return NULL;
+    uint64_t * buf = (uint64_t*)CmiTmpAlloc(piggycount * sizeof(uint64_t));
+    CmiAssert(buf != NULL);
+    buf[0] = piggycount-1;
+//printf("[%d] piggyback_ack: %d\n", myrank, piggycount);
+    for (i=0; i<piggycount-1; i++) {
+        MSG_LIST *ptr = (MSG_LIST*)PCQueuePop(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf);
+        CmiAssert(ptr != NULL);
+        ACK_MSG *msg = ptr->msg;
+        buf[i+1] = msg->source_addr;
+        FreeAckMsg(msg);
+        FreeMsgList(ptr);
+    }
+    *count = piggycount;
+    return buf;
+}
+
+
+static void piggyback_ack_done(int destNode, uint64_t *buf, int done)
+{
+    if (!done)
+    {
+        int i;
+        for (i=0; i<buf[0]; i++) {
+            MSG_LIST *msg_tmp;
+            MallocMsgList(msg_tmp);
+            ACK_MSG  *ack_msg;
+            MallocAckMsg(ack_msg);
+            ack_msg->source_addr = buf[i+1];
+            msg_tmp->size = ACK_MSG_SIZE;
+            msg_tmp->msg = ack_msg;
+            msg_tmp->tag = ACK_TAG;
+            msg_tmp->destNode = destNode;
+            PCQueuePush(smsg_ack_queue.smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
+        }
+    }
+    CmiTmpFree(buf);
+}
+#endif
+
 inline 
 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
 {
@@ -1085,8 +1186,21 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
 #else
     if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
     {
+#endif
+        uint64_t *buf = NULL;
+        int bufsize = 0;
+#if PIGGYBACK_ACK
+        if (tag == SMALL_DATA_TAG) {
+            int nack = 0;
+            buf = piggyback_ack(destNode, size, &nack);
+            if (buf) {
+                tag = SMALL_DATA_ACK_TAG;
+                bufsize = nack * sizeof(uint64_t);
+            }
+        }
 #endif
         CMI_GNI_LOCK
+<<<<<<< HEAD
 #if CMK_SMP_TRACE_COMMTHREAD
         int oldpe = -1;
         int oldeventid = -1;
@@ -1105,21 +1219,29 @@ static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg
 #if CMK_SMP_TRACE_COMMTHREAD
         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
 #endif
+=======
+        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
+>>>>>>> 6bc5ef7da3aa42d00053cc7680a14257efe4ee42
         CMI_GNI_UNLOCK
         if(status == GNI_RC_SUCCESS)
         {
 #if CMK_SMP_TRACE_COMMTHREAD
-            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
+            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG)
             { 
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
             }
 #endif
             smsg_send_count ++;
-            return status;
         }else
             status = GNI_RC_ERROR_RESOURCE;
+#if PIGGYBACK_ACK
+        if (buf) {
+            piggyback_ack_done(destNode, buf, status==GNI_RC_SUCCESS);
+            tag = SMALL_DATA_TAG;
+        }
+#endif
     }
-    if(inbuff ==0)
+    if(status != GNI_RC_SUCCESS && inbuff ==0)
         buffer_small_msgs(queue, msg, size, destNode, tag);
     return status;
 }
@@ -1456,6 +1578,9 @@ void LrtsPostNonLocal(){
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     SendBufferMsg(&smsg_queue);
+#if PIGGYBACK_ACK
+    SendBufferMsg(&smsg_ack_queue);
+#endif
     SendRdmaMsg();
 #endif
 #endif
@@ -1539,6 +1664,7 @@ static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd)
 #endif
 
 }
+
 static void getLargeMsgRequest(void* header, uint64_t inst_id);
 static void PumpNetworkSmsg()
 {
@@ -1589,6 +1715,25 @@ static void PumpNetworkSmsg()
 #endif
             /* copy msg out and then put into queue (small message) */
             switch (msg_tag) {
+#if PIGGYBACK_ACK
+            case SMALL_DATA_ACK_TAG:
+            {
+                int i;
+                uint64_t *buf = (uint64_t*)header;
+                int piggycount = buf[0];
+//printf("[%d] got piggyback msg: %d\n", myrank, piggycount);
+                for (i=0; i<piggycount; i++) {
+                    void *msg = (void*)(buf[i+1]);
+                    CmiAssert(msg != NULL);
+                    DecreaseMsgInSend(msg);
+                    if(NoMsgInSend(msg))
+                        buffered_send_msg -= GetMempoolsize(msg);
+                    CmiFree(msg);
+                }
+                header = buf + piggycount + 1;
+                msg_nbytes -= (piggycount+1)*sizeof(uint64_t);
+            }
+#endif
             case SMALL_DATA_TAG:
             {
                 START_EVENT();
@@ -1713,7 +1858,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     gni_post_descriptor_t *pd;
     gni_mem_handle_t    msg_mem_hndl;
     int source, size, transaction_size, offset = 0;
-    int     register_size = 0;
+    size_t     register_size = 0;
 
     // initial a get to transfer data from the sender side */
     request_msg = (CONTROL_MSG *) header;
@@ -1978,8 +2123,12 @@ static void PumpLocalRdmaTransactions()
             else
 #endif
             if (msg_tag == ACK_TAG) {
+#if ! PIGGYBACK_ACK
                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0); 
                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
+#else
+                buffer_small_msgs(&smsg_ack_queue, ack_msg, ACK_MSG_SIZE, inst_id, msg_tag);
+#endif
             }
             else {
                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0); 
@@ -2036,8 +2185,9 @@ static void  SendRdmaMsg()
     gni_mem_handle_t        msg_mem_hndl;
     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
     RDMA_REQUEST            *pre = 0;
-    int i, register_size = 0;
+    uint64_t                register_size = 0;
     void                    *msg;
+    int                     i;
 #if CMK_SMP
     int len = PCQueueLength(sendRdmaBuf);
     for (i=0; i<len; i++)
@@ -2137,7 +2287,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     CONTROL_MSG         *control_msg_tmp;
     gni_return_t        status;
     int                 done = 1;
-    int                 register_size;
+    uint64_t            register_size;
     void                *register_addr;
     int                 index_previous = -1;
 #if CMI_EXERT_SEND_CAP
@@ -2316,6 +2466,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 static void ProcessDeadlock();
 void LrtsAdvanceCommunication(int whileidle)
 {
+    static int count = 0;
     /*  Receive Msg first */
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
@@ -2358,7 +2509,17 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
+    {
+#if PIGGYBACK_ACK
+    //if (count%10 == 0) SendBufferMsg(&smsg_ack_queue);
+    if (SendBufferMsg(&smsg_queue) == 1) {
+        //if (count++ % 10 == 0) 
+        SendBufferMsg(&smsg_ack_queue);
+    }
+#else
     SendBufferMsg(&smsg_queue);
+#endif
+    }
     //MACHSTATE(8, "after SendBufferMsg\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
@@ -2576,6 +2737,9 @@ static void _init_smsg()
     }
 
     _init_send_queue(&smsg_queue);
+#if PIGGYBACK_ACK
+    _init_send_queue(&smsg_ack_queue);
+#endif
 #if CMK_USE_OOB
     _init_send_queue(&smsg_oob_queue);
 #endif
@@ -2724,8 +2888,8 @@ printf("[%d:%d:%d] TRIED but fails: %d wanted: %d %d\n", CmiMyPe(), CmiMyNode(),
 }
 #endif
 
-static long long int total_mempool_size = 0;
-static long long int total_mempool_calls = 0;
+static CmiUInt8 total_mempool_size = 0;
+static CmiUInt8 total_mempool_calls = 0;
 
 #if USE_LRTS_MEMPOOL
 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
@@ -2736,6 +2900,11 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
 
     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
     if (*size < default_size) *size = default_size;
+#if LARGEPAGE
+    // round up to be multiple of _tlbpagesize
+    //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
+    *size = ALIGNHUGEPAGE(*size);
+#endif
     total_mempool_size += *size;
     total_mempool_calls += 1;
 #if   !LARGEPAGE
@@ -2745,7 +2914,12 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
         CmiAbort("alloc_mempool_block");
     }
 #endif
+#if LARGEPAGE
+    pool = my_get_huge_pages(*size);
+    ret = pool==NULL;
+#else
     ret = posix_memalign(&pool, ALIGNBUF, *size);
+#endif
     if (ret != 0) {
 #if CMK_SMP && STEAL_MEMPOOL
       pool = steal_mempool_block(size, mem_hndl);
@@ -2759,10 +2933,13 @@ void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_f
     }
 #if LARGEPAGE
     CmiMemLock();
+    register_count++;
     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, status);
     CmiMemUnlock();
-    if(status != GNI_RC_SUCCESS)
-        printf("[%d, %d] memory reigstration fails %lld ask for %lld\n", myrank, CmiMyRank(), register_memory_size, *size);
+    if(status != GNI_RC_SUCCESS) {
+        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
+sweep_mempool(CpvAccess(mempool));
+    }
     GNI_RC_CHECK("MEMORY_REGISTER", status);
 #else
     SetMemHndlZero((*mem_hndl));
@@ -2777,7 +2954,11 @@ void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
     {
         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
     }
+#if LARGEPAGE
+    my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
+#else
     free(ptr);
+#endif
 }
 #endif
 
@@ -2957,8 +3138,8 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
 
     if (myrank==0) {
-        printf("Charm++> memory pool init size: %1.fMB, max size: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
-        printf("Charm++> memory pool max size: %1.fMB, max for send: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
+        printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
+        printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
             /* memblock can expand to BIG_MSG * 2 size */
             printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
@@ -3000,6 +3181,12 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
     }
 
+#if LARGEPAGE
+    if (_tlbpagesize == 4096) {
+        CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
+    }
+#endif
+
     /* init DMA buffer for medium message */
 
     //_init_DMA_buffer();
@@ -3016,11 +3203,13 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     debugLog=fopen(ln,"w");
 #endif
 
+//    NTK_Init();
+//    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
 }
 
 void* LrtsAlloc(int n_bytes, int header)
 {
-    void *ptr;
+    void *ptr = NULL;
 #if 0
     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
 #endif
@@ -3036,12 +3225,17 @@ void* LrtsAlloc(int n_bytes, int header)
         if(n_bytes < BIG_MSG)
         {
             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
-            ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
+            if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
         }else 
         {
+#if LARGEPAGE
+            //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
+            n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
+            char *res = my_get_huge_pages(n_bytes);
+#else
             char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
-            ptr = res + ALIGNBUF - header;
-
+#endif
+            if (res) ptr = res + ALIGNBUF - header;
         }
 #else
         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
@@ -3056,22 +3250,29 @@ void  LrtsFree(void *msg)
 {
     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
     if (size <= SMSG_MAX_MSG)
-      free(msg);
-    else if(size>=BIG_MSG)
-    {
-        free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
-
-    }else
-    {
-#if     USE_LRTS_MEMPOOL
+        free(msg);
+    else {
+        size = ALIGN64(size);
+        if(size>=BIG_MSG)
+        {
+#if LARGEPAGE
+            int s = ALIGNHUGEPAGE(size+ALIGNBUF);
+            my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
+#else
+            free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
+#endif
+        }
+        else {
+#if    USE_LRTS_MEMPOOL
 #if CMK_SMP
-        mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
+            mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
 #else
-        mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
+            mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
 #endif
 #else
-        free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
+            free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
 #endif
+        }
     }
 }
 
@@ -3093,7 +3294,11 @@ void LrtsDrainResources()
 #if CMK_USE_OOB
            !SendBufferMsg(&smsg_oob_queue) ||
 #endif
-           !SendBufferMsg(&smsg_queue))
+           !SendBufferMsg(&smsg_queue) 
+#if PIGGYBACK_ACK
+        || !SendBufferMsg(&smsg_ack_queue)
+#endif
+          )
     {
         if (useDynamicSMSG)
             PumpDatagramConnection();
index 2a99e644192c3f666e47cbaf947edfcdeea0bc80..e698e6fe57010a525fe69604be51c01ce5d5fbe1 100644 (file)
@@ -292,13 +292,16 @@ void CmiInitPxshm(char **argv){
  * shutdown shmem objects and semaphores
  *
  * *******************/
+static int pxshm_freed = 0;
 void tearDownSharedBuffers();
+void freeSharedBuffers();
 
 void CmiExitPxshm(){
         if (pxshmContext == NULL) return;
        if(pxshmContext->nodesize != 1){
                 int i;
-               tearDownSharedBuffers();
+               if (!pxshm_freed)
+                    tearDownSharedBuffers();
        
                for(i=0;i<pxshmContext->nodesize;i++){
                        if(i != pxshmContext->noderank){
@@ -557,6 +560,11 @@ void setupSharedBuffers(){
                        pxshmContext->sendBufs[i].header->bytes = 0;
                }
        }
+
+        if (CmiBarrier() == 0) {
+            freeSharedBuffers();
+            pxshm_freed = 1;
+        }
 }
 
 void allocBufNameStrings(char ***bufName){
@@ -634,6 +642,20 @@ void createShmObject(char *name,int size,char **pPtr){
        close(fd);
 }
 
+void freeSharedBuffers(){
+       int i;
+       for(i= 0;i<pxshmContext->nodesize;i++){
+           if(i != pxshmContext->noderank){
+               if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
+                   fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
+               }
+#if PXSHM_LOCK
+               sem_unlink(pxshmContext->recvBufNames[i]);
+#endif
+           }
+       }
+};
+
 void tearDownSharedBuffers(){
        int i;
        for(i= 0;i<pxshmContext->nodesize;i++){
@@ -641,11 +663,9 @@ void tearDownSharedBuffers(){
                if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
                    fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
                }
-               sem_unlink(pxshmContext->sendBufNames[i]);
 #if PXSHM_LOCK
                sem_close(pxshmContext->recvBufs[i].mutex);
                sem_close(pxshmContext->sendBufs[i].mutex);
-               sem_unlink(pxshmContext->sendBufNames[i]);
                sem_unlink(pxshmContext->recvBufNames[i]);
                 pxshmContext->recvBufs[i].mutex = NULL;
                 pxshmContext->sendBufs[i].mutex = NULL;
index ffce1b0fdaeea982eb98e21b50504427d8660c51..6de6b561c171abd2410bc2d7b79208213e4cc510 100644 (file)
@@ -261,7 +261,9 @@ void CmiInitXpmem(char **argv){
  * shutdown shmem objects and semaphores
  *
  * *******************/
+static int pxshm_freed = 0;
 void tearDownSharedBuffers();
+void freeSharedBuffers();
 
 void CmiExitXpmem(){
        int i=0;
@@ -269,7 +271,7 @@ void CmiExitXpmem(){
         if (xpmemContext == NULL) return;
 
        if(xpmemContext->nodesize != 1) {
-               tearDownSharedBuffers();
+                //tearDownSharedBuffers();
        
                for(i=0;i<xpmemContext->nodesize;i++){
                        if(i != xpmemContext->noderank){
@@ -513,6 +515,7 @@ void setupSharedBuffers(){
        createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
         CmiBarrier();
         removeXpmemFiles();
+        freeSharedBuffers();
        
        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank){
@@ -688,6 +691,18 @@ void removeXpmemFiles()
         unlink(fname);
 }
 
+void freeSharedBuffers(){
+       int i;
+       for(i= 0;i<xpmemContext->nodesize;i++){
+           if(i != xpmemContext->noderank){
+#if XPMEM_LOCK
+               sem_unlink(xpmemContext->sendBufNames[i]);
+               sem_unlink(xpmemContext->recvBufNames[i]);
+#endif
+           }
+       }
+}
+
 void tearDownSharedBuffers(){
        int i;
        for(i= 0;i<xpmemContext->nodesize;i++){
@@ -837,7 +852,7 @@ inline void flushAllSendQs(){
         for(i=0;i<xpmemContext->nodesize;i++) {
                 if (i == xpmemContext->noderank) continue;
                 XpmemSendQ *sendQ = xpmemContext->sendQs[i];
-                if(SendQ->numEntries > 0) {
+                if(sendQ->numEntries > 0) {
 #endif
        
 #if XPMEM_OSSPINLOCK
index 6b01994848cc99534129d3fcee3d86bd4e445cf9..af1d653e30da53738987e93cc0e625026278abe6 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef NDMESH_STREAMER_H_
-#define NDMESH_STREAMER_H_
+#ifndef NDMESH_STREAMER_H
+#define NDMESH_STREAMER_H
 
 #include <algorithm>
 #include "NDMeshStreamer.decl.h"
@@ -12,6 +12,7 @@
 // #define DEBUG_STREAMER
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
+#define CACHE_ARRAY_METADATA
 
 struct MeshLocation {
   int dimension; 
@@ -86,19 +87,15 @@ private:
     MeshStreamerMessage<dtype> ***dataBuffers_;
 
 #ifdef CACHE_LOCATIONS
-    MeshLocation *cachedLocations;
-    bool *isCached; 
+    MeshLocation *cachedLocations_;
+    bool *isCached_
 #endif
 
     MeshLocation determineLocation(int destinationPe);
 
     void storeMessage(int destinationPe, 
                      const MeshLocation &destinationCoordinates, 
-                     void *dataItem);
-
-    virtual int copyDataItemIntoMessage(
-               MeshStreamerMessage<dtype> *destinationBuffer, 
-               void *dataItemHandle);
+                     void *dataItem, bool copyIndirectly = false);
 
     virtual void deliverToDestination(
                  int destinationPe, 
@@ -108,6 +105,12 @@ private:
 
     void flushLargestBuffer();
 
+protected:
+
+    virtual int copyDataItemIntoMessage(
+               MeshStreamerMessage<dtype> *destinationBuffer, 
+               void *dataItemHandle, bool copyIndirectly = false);
+
 public:
 
     MeshStreamer(int totalBufferCapacity, int numDimensions, 
@@ -125,7 +128,7 @@ public:
       return isPeriodicFlushEnabled_;
     }
     virtual void insertData(dtype &dataItem, int destinationPe); 
-    void insertData(void *dataItem, int destinationPe);
+    void insertData(void *dataItemHandle, int destinationPe);
     void doneInserting();
     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
       userCallback_ = cb;
@@ -216,9 +219,9 @@ MeshStreamer<dtype>::MeshStreamer(
   isPeriodicFlushEnabled_ = false; 
 
 #ifdef CACHE_LOCATIONS
-  cachedLocations = new MeshLocation[numMembers_];
-  isCached = new bool[numMembers_];
-  std::fill(isCached, isCached + numMembers_, false);
+  cachedLocations_ = new MeshLocation[numMembers_];
+  isCached_ = new bool[numMembers_];
+  std::fill(isCached_, isCached_ + numMembers_, false);
 #endif
 
 }
@@ -238,8 +241,8 @@ MeshStreamer<dtype>::~MeshStreamer() {
   delete[] myLocationIndex_;
 
 #ifdef CACHE_LOCATIONS
-  delete[] cachedLocations;
-  delete[] isCached; 
+  delete[] cachedLocations_;
+  delete[] isCached_
 #endif
 
 }
@@ -250,8 +253,8 @@ inline
 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
 
 #ifdef CACHE_LOCATIONS
-  if (isCached[destinationPe]) {    
-    return cachedLocations[destinationPe]; 
+  if (isCached_[destinationPe]) {    
+    return cachedLocations_[destinationPe]; 
   }
 #endif
 
@@ -265,8 +268,8 @@ MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) {
       destinationLocation.dimension = i; 
       destinationLocation.bufferIndex = dimensionIndex; 
 #ifdef CACHE_LOCATIONS
-      cachedLocations[destinationPe] = destinationLocation;
-      isCached[destinationPe] = true; 
+      cachedLocations_[destinationPe] = destinationLocation;
+      isCached_[destinationPe] = true; 
 #endif
       return destinationLocation;
     }
@@ -284,7 +287,7 @@ template <class dtype>
 inline 
 int MeshStreamer<dtype>::copyDataItemIntoMessage(
                         MeshStreamerMessage<dtype> *destinationBuffer,
-                        void *dataItemHandle) {
+                        void *dataItemHandle, bool copyIndirectly) {
   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
 }
 
@@ -293,7 +296,7 @@ inline
 void MeshStreamer<dtype>::storeMessage(
                          int destinationPe, 
                          const MeshLocation& destinationLocation,
-                         void *dataItem) {
+                         void *dataItem, bool copyIndirectly) {
 
   int dimension = destinationLocation.dimension;
   int bufferIndex = destinationLocation.bufferIndex; 
@@ -319,7 +322,7 @@ void MeshStreamer<dtype>::storeMessage(
   
   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
   int numBuffered = 
-    copyDataItemIntoMessage(destinationBuffer, dataItem);
+    copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
   if (dimension != 0) {
     destinationBuffer->markDestination(numBuffered-1, destinationPe);
   }  
@@ -361,11 +364,14 @@ void MeshStreamer<dtype>::storeMessage(
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::insertData(void *dataItem, int destinationPe) {
+inline
+void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
   static int count = 0;
+  const static bool copyIndirectly = true;
 
   MeshLocation destinationLocation = determineLocation(destinationPe);
-  storeMessage(destinationPe, destinationLocation, dataItem); 
+  storeMessage(destinationPe, destinationLocation, dataItemHandle, 
+              copyIndirectly); 
 
   // release control to scheduler if requested by the user, 
   //   assume caller is threaded entry
@@ -381,7 +387,10 @@ inline
 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 
   if (destinationPe == CkMyPe()) {
-    localDeliver(dataItem);
+    // copying here is necessary - user code should not be 
+    // passed back a reference to the original item
+    dtype dataItemCopy = dataItem;
+    localDeliver(dataItemCopy);
     return;
   }
 
@@ -603,8 +612,12 @@ private:
 
   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
   CkArray *clientArrayMgr_;
-  MeshStreamerArrayClient<dtype> *clientObj_;
-
+  int numArrayElements_;
+  MeshStreamerArrayClient<dtype> **clientObjs_;
+#ifdef CACHE_ARRAY_METADATA
+  int *destinationPes_;
+  bool *isCachedArrayMetadata_;
+#endif
 
   void deliverToDestination(
        int destinationPe, 
@@ -615,13 +628,11 @@ private:
 
   void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
     int arrayId = packedDataItem.arrayIndex; 
-    MeshStreamerArrayClient<dtype> *clientObj = 
-      clientProxy_[arrayId].ckLocal();
 
-    if (clientObj != NULL) {
-      clientObj->process(packedDataItem.dataItem);
+    if (clientObjs_[arrayId] != NULL) {
+      clientObjs_[arrayId]->process(packedDataItem.dataItem);
     }
-    else {
+    else { 
       // array element is no longer present locally - redeliver using proxy
       clientProxy_[arrayId].process(packedDataItem.dataItem);
     }
@@ -644,6 +655,27 @@ public:
   {
     clientProxy_ = clientProxy; 
     clientArrayMgr_ = clientProxy_.ckLocalBranch();
+
+    numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
+
+    clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
+    for (int i = 0; i < numArrayElements_; i++) {
+      clientObjs_[i] = clientProxy_[i].ckLocal();
+    }
+
+#ifdef CACHE_ARRAY_METADATA
+    destinationPes_ = new int[numArrayElements_];
+    isCachedArrayMetadata_ = new bool[numArrayElements_];
+    std::fill(isCachedArrayMetadata_, isCachedArrayMetadata_ + numArrayElements_, false);
+#endif
+  }
+
+  ~ArrayMeshStreamer() {
+    delete [] clientObjs_;
+#ifdef CACHE_ARRAY_METADATA
+    delete [] destinationPes_;
+    delete [] isCachedArrayMetadata_; 
+#endif
   }
 
   void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
@@ -656,8 +688,22 @@ public:
 
   void insertData(dtype &dataItem, int arrayIndex) {
 
-    int destinationPe = 
+    int destinationPe; 
+#ifdef CACHE_ARRAY_METADATA
+  if (isCachedArrayMetadata_[arrayIndex]) {    
+    destinationPe =  destinationPes_[arrayIndex];
+  }
+  else {
+    destinationPe = 
       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
+    isCachedArrayMetadata_[arrayIndex] = true;
+    destinationPes_[arrayIndex] = destinationPe;
+  }
+#else 
+  destinationPe = 
+    clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
+#endif
+
     static ArrayDataItem<dtype> packedDataItem;
     if (destinationPe == CkMyPe()) {
       // copying here is necessary - user code should not be 
@@ -680,15 +726,23 @@ public:
 
   int copyDataItemIntoMessage(
       MeshStreamerMessage<ArrayDataItem <dtype> > *destinationBuffer, 
-      void *dataItemHandle) {
-
-    int numDataItems = destinationBuffer->numDataItems;
-    DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
-    (destinationBuffer->data)[numDataItems].dataItem = 
-      *(tempHandle->dataItem);
-    (destinationBuffer->data)[numDataItems].arrayIndex = 
-      tempHandle->arrayIndex;
-    return ++destinationBuffer->numDataItems;
+      void *dataItemHandle, bool copyIndirectly) {
+
+    if (copyIndirectly == true) {
+      // newly inserted items are passed through a handle to avoid copying
+      int numDataItems = destinationBuffer->numDataItems;
+      DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
+      (destinationBuffer->data)[numDataItems].dataItem = 
+       *(tempHandle->dataItem);
+      (destinationBuffer->data)[numDataItems].arrayIndex = 
+       tempHandle->arrayIndex;
+      return ++destinationBuffer->numDataItems;
+    }
+    else {
+      // this is an item received along the route to destination
+      // we can copy it from the received message
+      return MeshStreamer<ArrayDataItem<dtype> >::copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
+    }
   }
 
 };