Feature #1834: Zerocopy Broadcast Sender-side API 95/4395/35
authorNitin Bhat <nbhat4@illinois.edu>
Fri, 5 Oct 2018 19:39:48 +0000 (15:39 -0400)
committerNitin Bhat <nbhat4@illinois.edu>
Wed, 10 Apr 2019 17:22:05 +0000 (12:22 -0500)
Add support to broadcast a large buffer using the Zerocopy API
to multiple recipients. Uses memcpy, CMA or RDMA internally
depending upon the the location of the recipient. The received
buffer is Readonly. This change uses the existing infrastructure
and tree algorithm used for broadcasting regular messages.

Change-Id: Ib9b1be1a52c9cf488cec7f4052f949f14c0bbbb0

33 files changed:
examples/charm++/zerocopy/entry_method_api/Makefile
examples/charm++/zerocopy/entry_method_api/simpleBcast/Makefile [new file with mode: 0644]
examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.C [new file with mode: 0644]
examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.ci [new file with mode: 0644]
src/arch/common/conv-mach-common.h
src/arch/gni/conv-common.h
src/arch/gni/conv-mach-syncft.h
src/arch/mpi/conv-common.h
src/arch/mpi/conv-mach-syncft.h
src/arch/netlrts/conv-common.h
src/arch/ofi/conv-common.h
src/arch/pamilrts/conv-common.h
src/arch/pamilrts/conv-mach-async.h
src/arch/util/machine-broadcast.c
src/arch/verbs/conv-common.h
src/arch/verbs/conv-mach-syncft.h
src/arch/verbs/machine-onesided.c
src/ck-core/ck.C
src/ck-core/ck.h
src/ck-core/ckrdma.C
src/ck-core/ckrdma.h
src/ck-core/debug-message.C
src/ck-core/envelope.h
src/ck-core/init.C
src/conv-core/conv-config.h
src/conv-core/conv-header.h
src/conv-core/conv-rdma.c
src/conv-core/conv-rdma.h
src/conv-core/converse.h
src/util/cmirdmautils.c
src/xlat-i/xi-Entry.C
src/xlat-i/xi-Entry.h
src/xlat-i/xi-Parameter.C

index 4cc65d57535128690450183b0f338401a1a58c4e..2430a23cda9189abf56732640d59a0b630220e5c 100644 (file)
@@ -3,6 +3,7 @@ DIRS = \
   prereg \
   reg \
   misc \
+  simpleBcast \
 
 TESTDIRS = $(DIRS)
 
diff --git a/examples/charm++/zerocopy/entry_method_api/simpleBcast/Makefile b/examples/charm++/zerocopy/entry_method_api/simpleBcast/Makefile
new file mode 100644 (file)
index 0000000..4703933
--- /dev/null
@@ -0,0 +1,21 @@
+-include ../../../../common.mk
+CHARMC=../../../../../bin/charmc $(OPTS)
+
+all: simpleBcast
+
+simpleBcast:  simpleBcast.o
+       $(CHARMC) simpleBcast.o -o simpleBcast -language charm++ -module CommonLBs
+
+cifiles: simpleBcast.ci
+       $(CHARMC) -c simpleBcast.ci
+       touch cifiles
+
+simpleBcast.o : simpleBcast.C cifiles
+       $(CHARMC) -c simpleBcast.C
+
+test: all
+       $(call run, +p4 ./simpleBcast)
+       $(call run, +p6 ./simpleBcast)
+
+clean:
+       rm -f *.def.h *.decl.h *.o *~ *.exe cifiles charmrun simpleBcast
diff --git a/examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.C b/examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.C
new file mode 100644 (file)
index 0000000..2809cec
--- /dev/null
@@ -0,0 +1,122 @@
+#include "simpleBcast.decl.h"
+
+#define DEBUG(x) //x
+
+CProxy_Main mainProxy;
+
+template<class T>
+void assignValues(T *&arr, int size){
+  arr = new T[size];
+  for(int i=0; i<size; i++)
+     arr[i] = i;
+}
+
+template<class T>
+void checkArrValues(T *&arr, int size){
+  for(int i=0; i<size; i++)
+     CkAssert(arr[i] == i);
+}
+
+class Main : public CBase_Main{
+  int size;
+  int counter;
+  public:
+  Main(CkArgMsg *m) {
+    // Create an array of size received in arguments
+    if(m->argc > 2) {
+      // print error message
+      CkAbort("Usage: ./simpleBcast <array-size>");
+    } else if(m->argc == 2 ) {
+      size = atoi(m->argv[1]);
+    } else {
+      size = CkNumPes() * 10; // default with 10 chare array elements per pe
+    }
+
+    delete m;
+
+    counter = 0;
+    mainProxy = thisProxy;
+
+    // Create a chare array
+    CProxy_zcArray arrProxy = CProxy_zcArray::ckNew(size);
+
+    // Create a group
+    CProxy_zcGroup grpProxy = CProxy_zcGroup::ckNew();
+
+    // Create a nodegroup
+    CProxy_zcNodegroup ngrpProxy = CProxy_zcNodegroup::ckNew();
+
+    // allocate a large array
+    int bufferSize = 200000;
+    int *buffer = new int[bufferSize];
+    assignValues(buffer, bufferSize);
+
+    // create a callback method
+    int idx_zerocopySent = CkIndex_Main::zerocopySent(NULL);
+    CkCallback cb = CkCallback(idx_zerocopySent, thisProxy);
+
+    CkCallback doneCb = CkCallback(CkReductionTarget(Main, done), thisProxy);
+
+    // invoking bcast on chare array
+    arrProxy.recvLargeArray(CkSendBuffer(buffer, cb), bufferSize, doneCb);
+
+    // invoking bcast on group
+    grpProxy.recvLargeArray(CkSendBuffer(buffer, cb), bufferSize, doneCb);
+
+    // invoking bcast on nodegroup
+    ngrpProxy.recvLargeArray(CkSendBuffer(buffer, cb), bufferSize, doneCb);
+  }
+
+  void zerocopySent(CkDataMsg *m) {
+    CkPrintf("[%d][%d][%d] Source callback invoked\n", CkMyPe(), CkMyNode(), CmiMyRank());
+    done();
+    delete m;
+  }
+
+  void done() {
+    // Wait for 3 reductions to complete: Chare Array, Group, Nodegroup and
+    // 3 more calls from zerocopySent callback method on completion of sending the buffer
+    if(++counter == 6) {
+      CkPrintf("[%d][%d][%d] All operations completed\n", CkMyPe(), CkMyNode(), CmiMyRank());
+      CkExit();
+    }
+  }
+};
+
+class zcArray : public CBase_zcArray {
+  public:
+  zcArray() {}
+
+  void recvLargeArray(int *ptr1, int n1, CkCallback doneCb) {
+    DEBUG(CkPrintf("[%d][%d] APP Chare array element received large array %p\n", thisIndex, CkMyPe(), ptr1);)
+    checkArrValues(ptr1, n1);
+
+    contribute(doneCb);
+  }
+};
+
+class zcGroup : public CBase_zcGroup {
+  public:
+  zcGroup() {}
+
+  void recvLargeArray(int *ptr1, int n1, CkCallback doneCb) {
+    DEBUG(CkPrintf("[%d][%d] APP Group element received large array %p\n", thisIndex, CkMyPe(), ptr1);)
+    checkArrValues(ptr1, n1);
+
+    contribute(doneCb);
+  }
+};
+
+class zcNodegroup : public CBase_zcNodegroup {
+  public:
+  zcNodegroup() {}
+
+  void recvLargeArray(int *ptr1, int n1, CkCallback doneCb) {
+    DEBUG(CkPrintf("[%d][%d] APP Nodegroup element received large array %p\n", thisIndex, CkMyPe(), ptr1);)
+    checkArrValues(ptr1, n1);
+
+    contribute(doneCb);
+  }
+};
+
+#include "simpleBcast.def.h"
diff --git a/examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.ci b/examples/charm++/zerocopy/entry_method_api/simpleBcast/simpleBcast.ci
new file mode 100644 (file)
index 0000000..6c1e158
--- /dev/null
@@ -0,0 +1,25 @@
+mainmodule simpleBcast {
+
+  readonly CProxy_Main mainProxy;
+
+  mainchare Main {
+    entry Main(CkArgMsg *m);
+    entry void zerocopySent(CkDataMsg *m);
+    entry [reductiontarget] void done();
+  };
+
+  array [1D] zcArray {
+    entry zcArray();
+    entry void recvLargeArray(nocopy int ptr1[n1], int n1, CkCallback doneCb);
+  };
+
+  group zcGroup {
+    entry zcGroup();
+    entry void recvLargeArray(nocopy int ptr1[n1], int n1, CkCallback doneCb);
+  };
+
+  nodegroup zcNodegroup {
+    entry zcNodegroup();
+    entry void recvLargeArray(nocopy int ptr1[n1], int n1, CkCallback doneCb);
+  };
+}
index 98490eb35f0beb0655478aee1636b8a4202a40b7..46c0242164558a992767c4a673dc492072d3a8d3 100644 (file)
@@ -45,7 +45,9 @@ enum ncpyRegModes {
 enum ncpyOperationMode {
   CMK_DIRECT_API          = 0,
   CMK_EM_API              = 1,
-  CMK_EM_API_REVERSE      = 2
+  CMK_EM_API_REVERSE      = 2,
+  CMK_BCAST_EM_API        = 3,
+  CMK_BCAST_EM_API_REVERSE= 4
 };
 
 // Enum for the method of acknowledglement handling after the completion of a zerocopy operation
@@ -62,3 +64,11 @@ enum ncpyFreeNcpyOpInfoMode {
   CMK_FREE_NCPYOPINFO           = 0,
   CMK_DONT_FREE_NCPYOPINFO           = 1
 };
+
+// Enum for the type of converse message
+// TODO: Convert to a bool variable post C++ conversion
+enum cmiZCMsgType {
+  CMK_REG_NO_ZC_MSG = 0,
+  CMK_ZC_P2P_SEND_MSG = 1,
+  CMK_ZC_BCAST_SEND_MSG = 2
+};
index f414baa97e6100803bf0cc6c958e02d91f93af2a..af1a0c30111cb1529106c445c55f4a1dc9583217 100644 (file)
 
 #if DELTA_COMPRESS
 #if CMK_ERROR_CHECKING
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; CmiUInt1 zcMsgType:2;
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; CmiUInt1 zcMsgType:2;
 #endif
 #else 
 #if CMK_ERROR_CHECKING
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root;  
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt2 seq; unsigned char cksum, magic; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt1 zcMsgType:2;
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root;  
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt1 zcMsgType:2;
 #endif
 #endif
 
index d62f03bc3a34499d687c87ef994d47f521ce2cfe..b215afb399c8565e6ce4efdfb9013ddbb1902bfc 100644 (file)
@@ -3,7 +3,7 @@
 //#undef CMK_MSG_HEADER_EXT
 //#undef CMK_MSG_HEADER_BIGSIM_
 /* expand the header to store the restart phase counter(pn) */
-#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID,pn,d9; CmiInt4 root; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt4 size; CmiUInt4 seq; CmiUInt2 rank,hdl,xhdl,info,type,redID,pn,d9; CmiInt4 root; CmiUInt1 zcMsgType:2;
 //#define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
 //#define CMK_MSG_HEADER_BIGSIM_    { CmiUInt2 d0,d1,d2,d3,d4,d5,hdl,xhdl,pn,info; int nd, n; double rt; CmiInt2 tID; CmiUInt2 hID; char t; int msgID; int srcPe;}
 //#define CMK_MSG_HEADER_BIGSIM_  { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
index 1ebbd62d1c554e0f79fd81e4b59c13e1b4bd8708..3ebc52277b568786b562f1c99465ee327b23cef8 100644 (file)
@@ -15,9 +15,9 @@
 #define CMK_HANDLE_SIGUSR                                  1
 
 #if CMK_ERROR_CHECKING
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type, redID; CmiInt4 root; unsigned char cksum, magic, msgType;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type, redID; CmiInt4 root; unsigned char cksum, magic, msgType; CmiUInt1 zcMsgType:2;
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type, redID; CmiInt4 root; unsigned char msgType;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type, redID; CmiInt4 root; unsigned char msgType; CmiUInt1 zcMsgType:2;
 #endif
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
index 97d8529e16b51020bcc6bc7143bdfe39dc3f2111..c6baf661ec6fb6aaa42174f4d33a035a7aed3a5d 100644 (file)
@@ -3,7 +3,7 @@
 //#undef CMK_MSG_HEADER_EXT
 //#undef CMK_MSG_HEADER_BIGSIM_
 /* expand the header to store the restart phase counter(pn) */
-#define CMK_MSG_HEADER_EXT_   CmiUInt2 rank, root, hdl,xhdl,info, type, pn,d7; unsigned char cksum, magic, msgType; CmiUInt2 redID;
+#define CMK_MSG_HEADER_EXT_   CmiUInt2 rank, root, hdl,xhdl,info, type, pn,d7; unsigned char cksum, magic, msgType; CmiUInt2 redID; CmiUInt1 zcMsgType:2;
 //#define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
 //#define CMK_MSG_HEADER_BIGSIM_    { CmiUInt2 d0,d1,d2,d3,d4,d5,hdl,xhdl,pn,info; int nd, n; double rt; CmiInt2 tID; CmiUInt2 hID; char t; int msgID; int srcPe;}
 //#define CMK_MSG_HEADER_BIGSIM_  { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
index 7135b34f83fb6ef929670f10d22ea134f0546874..63c7a26c016ef27efd0246f50ce9f16c5b0802e4 100644 (file)
@@ -23,7 +23,7 @@
    between a REG, CMA_MD and CMA_ACK message
 */
 #define CMK_MSG_HEADER_BASIC   CMK_MSG_HEADER_EXT
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,type,xhdl,info,redID,rank; CmiInt4 root, size;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,type,xhdl,info,redID,rank; CmiInt4 root, size; CmiUInt1 zcMsgType:2;
 #define CMK_MSG_HEADER_EXT       { CMK_MSG_HEADER_EXT_ }
 #define CMK_MSG_HEADER_BIGSIM_  { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
 
index cd99808796efb0d85d588d2e0c9d2a70b54901a1..c14b576676d07e9385b59c2255a71aaebab187f2 100644 (file)
@@ -23,7 +23,7 @@
  * - startid, redID
  * - rank is needed by broadcast
  */
-#define CMK_MSG_HEADER_UNIQUE    CmiUInt4 size; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root;
+#define CMK_MSG_HEADER_UNIQUE    CmiUInt4 size; CmiUInt2 rank,hdl,xhdl,info,type,redID; CmiInt4 root; CmiUInt1 zcMsgType:2;
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
 #define CMK_MSG_HEADER_EXT            { CMK_MSG_HEADER_UNIQUE }
index 1b35ec97909b23516764966460ad5c8b1968a3dd..9f1e95e2fde20478e3ae8de6be1ff1ec3c5f5482 100644 (file)
@@ -11,9 +11,9 @@
 
 //#define  DELTA_COMPRESS                                     1
 #if DELTA_COMPRESS
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; CmiUInt1 zcMsgType:2;
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; 
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; CmiUInt1 zcMsgType:2;
 #endif
 
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
index ad17e59b763c471978c682ee91360682cd588753..21154f413abd7e3ee6e385dcc0d4f35bfee3cbe1 100644 (file)
@@ -3,10 +3,10 @@
 //#define  DELTA_COMPRESS                                     1
 
 #undef CMK_MSG_HEADER_EXT_ 
-#if DELTA_COMPRESS
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(void *)]; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler;
+#if DELTA_COMPRESkS
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(void *)]; CmiUInt4 compressStart; CmiUInt2 compress_flag,xxhdl; CmiUInt8 persistRecvHandler; CmiUInt1 zcMsgType:2;
 #else
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(void *)];
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, type; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(void *)]; CmiUInt1 zcMsgType:2;
 #endif
 
 
index 215ca9aada5adb3af5cf5d32d1bf57d889ba89c4..e1e3510410facc16b41fefa5ed9770cf49abd7c1 100644 (file)
@@ -50,6 +50,24 @@ static void processBcastQs(void) {
 #endif
 }
 
+// Method to forward the received proc message to my child nodes
+static INLINE_KEYWORD void forwardProcBcastMsg(int size, char *msg) {
+#if CMK_BROADCAST_SPANNING_TREE
+  SendSpanningChildrenProc(size, msg);
+#elif CMK_BROADCAST_HYPERCUBE
+  SendHyperCubeProc(size, msg);
+#endif
+#if CMK_BROADCAST_SPANNING_TREE && CMK_BROADCAST_USE_CMIREFERENCE
+  /* same message may be sent out, make a copy of it */
+  if (CmiNumNodes()>1 && CmiGetReference(msg)>1) {
+    void *newmsg;
+    newmsg = CopyMsg(msg, size);
+    CmiFree(msg);
+    msg = newmsg;
+  }
+#endif
+}
+
 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg) {
     /* Since this function is only called on intermediate nodes,
      * the rank of this msg should be 0.
@@ -57,33 +75,39 @@ static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg) {
     CmiAssert(CMI_DEST_RANK(msg)==0);
     /*CmiPushPE(CMI_DEST_RANK(msg), msg);*/
 
-    //CmiPrintf("[%d][%d] Received the bcast message \n", CmiMyPe(), CmiMyNode());
-#if CMK_BROADCAST_SPANNING_TREE
-    SendSpanningChildrenProc(size, msg);
-#elif CMK_BROADCAST_HYPERCUBE
-    SendHyperCubeProc(size, msg);
-#endif
-#if CMK_BROADCAST_SPANNING_TREE && CMK_BROADCAST_USE_CMIREFERENCE
-      /* same message may be sent out, make a copy of it */
-    if (CmiNumNodes()>1 && CmiGetReference(msg)>1) {
-      void *newmsg;
-      newmsg = CopyMsg(msg, size);
-      CmiFree(msg);
-      msg = newmsg;
-    }
+    // Forward regular messages, do not forward ncpy bcast messages as those messages
+    // are forwarded separately after the completion of the payload transfer
+#if CMK_ONESIDED_IMPL
+    if(!CMI_IS_ZC_BCAST(msg))
 #endif
+      forwardProcBcastMsg(size, msg);
+
     CmiPushPE(0, msg);
 
 }
 
-
 #if CMK_NODE_QUEUE_AVAILABLE
-static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
+// Method to forward the received node message to my child nodes
+static INLINE_KEYWORD void forwardNodeBcastMsg(int size, char *msg) {
 #if CMK_BROADCAST_SPANNING_TREE
-    SendSpanningChildrenNode(size, msg);
+  SendSpanningChildrenNode(size, msg);
 #elif CMK_BROADCAST_HYPERCUBE
-    SendHyperCubeNode(size, msg);
+  SendHyperCubeNode(size, msg);
+#endif
+}
+
+// API to forward node bcast msg
+void CmiForwardNodeBcastMsg(int size, char *msg) {
+  forwardNodeBcastMsg(size, msg);
+}
+
+static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
+    // Forward regular messages, do not forward ncpy bcast messages as those messages
+    // are forwarded separately after the completion of the payload transfer
+#if CMK_ONESIDED_IMPL
+    if(!CMI_IS_ZC_BCAST(msg))
 #endif
+      forwardNodeBcastMsg(size, msg);
 
     /* In SMP mode, this push operation needs to be executed
      * after forwarding broadcast messages. If it is executed
@@ -96,6 +120,18 @@ static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
 }
 #endif
 
+// API to forward proc bcast msg
+void CmiForwardProcBcastMsg(int size, char *msg) {
+  forwardProcBcastMsg(size, msg);
+}
+
+#if CMK_SMP
+// API to forward message to peer PEs
+void CmiForwardMsgToPeers(int size, char *msg) {
+  SendToPeers(size, msg);
+}
+#endif
+
 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode) {
 #if CMK_BROADCAST_SPANNING_TREE
     int i, oldRank;
@@ -202,9 +238,14 @@ static void SendSpanningChildrenProc(int size, char *msg) {
     int startnode = CMI_BROADCAST_ROOT(msg)-1;
     SendSpanningChildren(size, msg, 0, startnode);
 #if CMK_SMP
-    /* second send msgs to my peers on this node */
-    SendToPeers(size, msg);
-#endif
+    // Forward regular messages, do not forward ncpy bcast messages as those messages
+    // are forwarded separately after the completion of the payload transfer
+#if CMK_ONESIDED_IMPL
+    if(!CMI_IS_ZC_BCAST(msg))
+#endif // end of CMK_ONESIDED_IMPL
+      /* second send msgs to my peers on this node */
+      SendToPeers(size, msg);
+#endif // end of CMK_SMP
 }
 
 /* send msg along the hypercube in broadcast. (Sameer) */
@@ -215,10 +256,16 @@ static void SendHyperCubeProc(int size, char *msg) {
     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
 #endif
     SendHyperCube(size, msg, 0, startnode);
+
 #if CMK_SMP
-    /* second send msgs to my peers on this node */
-    SendToPeers(size, msg);
-#endif
+    // Forward regular messages, do not forward ncpy bcast messages as those messages
+    // are forwarded separately after the completion of the payload transfer
+#if CMK_ONESIDED_IMPL
+    if(!CMI_IS_ZC_BCAST(msg))
+#endif // end of CMK_ONESIDED_IMPL
+      /* second send msgs to my peers on this node */
+      SendToPeers(size, msg);
+#endif // end of CMK_SMP
 }
 
 #if CMK_NODE_QUEUE_AVAILABLE
index 4adbe69677c759d0229e3962f841990c4505ebfc..d9c4594c3edd35b72c025e89f6cd760ebe4529f9 100644 (file)
@@ -24,7 +24,7 @@
    of the message and used in the LRTS based CMA implementaion.
 */
 #define CMK_MSG_HEADER_BASIC   CMK_MSG_HEADER_EXT
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,type,xhdl,info,redID,rank; CmiInt4 root, size;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,type,xhdl,info,redID,rank; CmiInt4 root, size; CmiUInt1 zcMsgType:2;
 #define CMK_MSG_HEADER_EXT       { CMK_MSG_HEADER_EXT_ }
 #define CMK_MSG_HEADER_BIGSIM_  { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
 
index c509e16784357dd9acef4d033e0c3f8fcaec1961..2581cf0ea93c1211ef4c43d657c442b869fdbe0a 100644 (file)
@@ -5,7 +5,7 @@
 //#undef CMK_MSG_HEADER_BIGSIM_
 /* expand the header to store the restart phase counter(pn) */
 #define CMK_MSG_HEADER_BASIC   CMK_MSG_HEADER_EXT
-#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,pn,d4,type,xhdl,info,dd,redID,pad2,rank; CmiUInt4 root,size;
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 d0,d1,d2,d3,hdl,pn,d4,type,xhdl,info,dd,redID,pad2,rank; CmiUInt4 root,size; CmiUInt1 zcMsgType:2;
 //#define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
 //#define CMK_MSG_HEADER_BIGSIM_    { CmiUInt2 d0,d1,d2,d3,d4,d5,hdl,xhdl,pn,info; int nd, n; double rt; CmiInt2 tID; CmiUInt2 hID; char t; int msgID; int srcPe;}
 //#define CMK_MSG_HEADER_BIGSIM_  { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
index 259b43a2b4ccf538b2ca120b762be73aace2c15c..0b4fc454b74feb36e3a0a8ed6f416b7acb8b3da0 100644 (file)
@@ -103,7 +103,7 @@ void LrtsIssueRget(NcpyOperationInfo *ncpyOpInfo) {
     struct ibv_mr *packetKey;
     if(ncpyOpInfo->opMode == CMK_DIRECT_API) {
       packetKey = METADATAFIELD(ncpyOpInfo)->key;
-    } else if(ncpyOpInfo->opMode == CMK_EM_API) {
+    } else if(ncpyOpInfo->opMode == CMK_EM_API || ncpyOpInfo->opMode == CMK_BCAST_EM_API) {
       // Register the small message in order to send it to the other side
       packetKey = ibv_reg_mr(context->pd, (void *)ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ);
       if (!packetKey) {
index fb68f13a8eb8c61de387e3b1a1e941ba5113aed7..0cd5c93a66b6ba5ded87c714f1846c637512577f 100644 (file)
@@ -1212,15 +1212,23 @@ void _processHandler(void *converseMsg,CkCoreState *ck)
   MESSAGE_PHASE_CHECK(env);
 
 #if CMK_ONESIDED_IMPL
-  if(env->isRdma()){
+  if(CMI_ZC_MSGTYPE(env) == CMK_ZC_P2P_SEND_MSG || CMI_ZC_MSGTYPE(env) == CMK_ZC_BCAST_SEND_MSG){
     envelope *prevEnv = env;
-    env = CkRdmaIssueRgets(prevEnv);
+
+    ncpyEmApiMode mode = ncpyEmApiMode::P2P; // Ncpy p2p API
+
+    if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg) {
+      mode = ncpyEmApiMode::BCAST; // Ncpy Bcast API
+    }
+
+    env = CkRdmaIssueRgets(env, mode, prevEnv);
+
     if(env) {
-      // Within pe or logical node, env points to new message with data
+      // memcpyGet or cmaGet completed, env contains the payload and will be enqueued
 
       // Free prevEnv
       CkFreeMsg(EnvToUsr(prevEnv));
-    } else{
+    } else {
       // async rdma call in place, asynchronous return and ack handling
       return;
     }
@@ -1420,6 +1428,12 @@ void _skipCldEnqueue(int pe,envelope *env, int infoFn)
   }
 #endif
 
+#if CMK_ONESIDED_IMPL
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(env))
+    CkRdmaPrepareBcastMsg(env);
+#endif
+
 #if CMK_FAULT_EVAC
   if(pe == CkMyPe() ){
     if(!CmiNodeAlive(CkMyPe())){
@@ -1521,6 +1535,12 @@ static void _noCldEnqueue(int pe, envelope *env)
   }
 #endif
 
+#if CMK_ONESIDED_IMPL
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(env))
+    CkRdmaPrepareBcastMsg(env);
+#endif
+
   CkPackMessage(&env);
   int len=env->getTotalsize();
   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
@@ -1544,6 +1564,12 @@ void _noCldNodeEnqueue(int node, envelope *env)
   }
 #endif
 
+#if CMK_ONESIDED_IMPL
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(env))
+    CkRdmaPrepareBcastMsg(env);
+#endif
+
   CkPackMessage(&env);
   int len=env->getTotalsize();
   if (node==CLD_BROADCAST) { 
@@ -1658,7 +1684,7 @@ void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
   envelope *env = UsrToEnv(msg);
 #if CMK_ERROR_CHECKING
   //Allow rdma metadata messages marked as immediate to go through
-  if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
+  if (opts & CK_MSG_IMMEDIATE && (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)) {
     CmiAbort("Immediate message is not allowed in Chare!");
   }
 #endif
@@ -1850,7 +1876,7 @@ void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
   }
   envelope *env=UsrToEnv(msg);
   //Allow rdma metadata messages marked as immediate to go through
-  if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
+  if (opts & CK_MSG_IMMEDIATE && (CMI_ZC_MSGTYPE(env) == CMK_REG_NO_ZC_MSG)) {
     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
     return;
   }
@@ -1979,7 +2005,7 @@ void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
 extern "C"
 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
 {
-  if (node==CkMyNode() && ((envelope *)(UsrToEnv(msg)))->isRdma() == false)
+  if (node==CkMyNode() && CMI_ZC_MSGTYPE((envelope *)(UsrToEnv(msg))) == CMK_REG_NO_ZC_MSG)
   {
     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
index b2b85239c036fafa71daf86e875c352859ee9d24..351b0da3fdbb16eca0f17598cfc455cbdb36e041 100644 (file)
@@ -28,6 +28,12 @@ inline void _CldEnqueue(int pe, void *msg, int infofn) {
     CmiFree(msg);
     return;
   }
+#if CMK_ONESIDED_IMPL
+  envelope *env = (envelope *)msg;
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(msg))
+    CkRdmaPrepareBcastMsg(env);
+#endif
   CldEnqueue(pe, msg, infofn);
 }
 inline void _CldEnqueueMulti(int npes, int *pes, void *msg, int infofn) {
@@ -49,15 +55,33 @@ inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
     CmiFree(msg);
     return;
   }
+#if CMK_ONESIDED_IMPL
+  envelope *env = (envelope *)msg;
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(msg))
+    CkRdmaPrepareBcastMsg(env);
+#endif
   CldNodeEnqueue(node, msg, infofn);
 }
 #else
 
 inline void _CldEnqueue(int pe, void *msg, int infofn) {
+#if CMK_ONESIDED_IMPL
+  envelope *env = (envelope *)msg;
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(msg))
+    CkRdmaPrepareBcastMsg(env);
+#endif
   CldEnqueue(pe, msg, infofn);
 }
 
 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
+#if CMK_ONESIDED_IMPL
+  envelope *env = (envelope *)msg;
+  // Store source information to handle acknowledgements on completion
+  if(CMI_IS_ZC_BCAST(msg))
+    CkRdmaPrepareBcastMsg(env);
+#endif
   CldNodeEnqueue(node, msg, infofn);
 }
 #define _CldEnqueueMulti  CldEnqueueMulti
index ee262421978bafc056cc2feabaf96672f7c48406..829b22399a96ad07815817141ae8c0db8ba9a34f 100644 (file)
@@ -13,7 +13,6 @@
 #endif
 
 /*********************************** Zerocopy Direct API **********************************/
-
 // Get Methods
 void CkNcpyBuffer::memcpyGet(CkNcpyBuffer &source) {
   // memcpy the data from the source buffer into the destination buffer
@@ -141,7 +140,7 @@ CkNcpyStatus CkNcpyBuffer::get(CkNcpyBuffer &source){
     return CkNcpyStatus::incomplete;
 
   } else {
-    CkAbort("Invalid CkNcpyMode");
+    CkAbort("CkNcpyBuffer::get : Invalid CkNcpyMode");
   }
 }
 
@@ -270,7 +269,7 @@ CkNcpyStatus CkNcpyBuffer::put(CkNcpyBuffer &destination){
     return CkNcpyStatus::incomplete;
 
   } else {
-    CkAbort("Invalid CkNcpyMode");
+    CkAbort("CkNcpyBuffer::put : Invalid CkNcpyMode");
   }
 }
 
@@ -342,17 +341,23 @@ void CkRdmaDirectAckHandler(void *ack) {
   CkCallback *destCb = (CkCallback *)(info->destAck);
 
   switch(info->opMode) {
-    case CMK_DIRECT_API    : handleDirectApiCompletion(info); // Ncpy Direct API
-                             break;
+    case CMK_DIRECT_API           : handleDirectApiCompletion(info); // Ncpy Direct API
+                                    break;
 #if CMK_ONESIDED_IMPL
-    case CMK_EM_API        : handleEntryMethodApiCompletion(info); // Ncpy EM API invoked through a GET
-                             break;
+    case CMK_EM_API               : handleEntryMethodApiCompletion(info); // Ncpy EM API invoked through a GET
+                                    break;
+
+    case CMK_EM_API_REVERSE       : handleReverseEntryMethodApiCompletion(info); // Ncpy EM API invoked through a PUT
+                                    break;
 
-    case CMK_EM_API_REVERSE: handleReverseEntryMethodApiCompletion(info); // Ncpy EM API invoked through a PUT
-                             break;
+    case CMK_BCAST_EM_API         : handleBcastEntryMethodApiCompletion(info); // Ncpy EM Bcast API
+                                    break;
+
+    case CMK_BCAST_EM_API_REVERSE : handleBcastReverseEntryMethodApiCompletion(info); // Ncpy EM Bcast API invoked through a PUT
+                                    break;
 #endif
-    default                : CmiAbort("Unknown opMode");
-                             break;
+    default                       : CkAbort("CkRdmaDirectAckHandler: Unknown ncpyOpInfo->opMode");
+                                    break;
   }
 }
 
@@ -383,20 +388,102 @@ CkNcpyMode findTransferMode(int srcPe, int destPe) {
     return CkNcpyMode::RDMA;
 }
 
+void enqueueNcpyMessage(int destPe, void *msg){
+  // invoke the charm message handler to enqueue the messsage
+#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRES
+  // invoked from the comm thread, so send message to the worker thread
+  CmiPushPE(CmiRankOf(destPe), msg);
+#else
+  // invoked from the worker thread, process message
+  CmiHandleMessage(msg);
+#endif
+}
 
 
+/*********************************** Zerocopy Entry Method API ****************************/
+#if CMK_ONESIDED_IMPL
 
+// Method called on completion of an Zcpy EM API (P2P or BCAST)
+void CkRdmaEMAckHandler(int destPe, void *ack) {
 
+  if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
+  CmiSpanningTreeInfo &t = *_topoTree;
+
+  NcpyEmBufferInfo *emBuffInfo = (NcpyEmBufferInfo *)(ack);
+
+  char *ref = (char *)(emBuffInfo);
+
+  int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
+  int ncpyObjSize = getNcpyOpInfoTotalSize(
+                    layerInfoSize,
+                    sizeof(CkCallback),
+                    layerInfoSize,
+                    0);
+
+
+  NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)(ref - (emBuffInfo->index) * (sizeof(NcpyEmBufferInfo) + ncpyObjSize - sizeof(NcpyOperationInfo)) - sizeof(NcpyEmInfo));
+  ncpyEmInfo->counter++; // Operation completed, update counter
+
+#if CMK_REG_REQUIRED
+  if(ncpyEmInfo->mode == ncpyEmApiMode::P2P || (ncpyEmInfo->mode == ncpyEmApiMode::BCAST && t.child_count == 0)) { // EM P2P API or EM BCAST API's child node
+    NcpyOperationInfo *ncpyOpInfo = &(emBuffInfo->ncpyOpInfo);
+    // De-register the destination buffer
+    CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destMode);
+  }
+#endif
+
+  if(ncpyEmInfo->counter == ncpyEmInfo->numOps) {
+    // All operations have been completed
+
+    if(ncpyEmInfo->mode == ncpyEmApiMode::P2P) { // EM P2P API
+
+      // enqueue message
+      enqueueNcpyMessage(destPe, ncpyEmInfo->msg);
+
+    } else if(ncpyEmInfo->mode == ncpyEmApiMode::BCAST) { // EM BCAST API
+
+      envelope *myMsg = (envelope *)(ncpyEmInfo->msg);
+
+      if(t.child_count != 0) {  // Intermediate Node
+
+        envelope *myChildrenMsg = (envelope *)(ncpyEmInfo->forwardMsg);
+
+        // Replace received message with my pointers for my children
+        allocateObjAndReplacePointers(myMsg, myChildrenMsg, ncpyEmInfo->pe, ncpyEmInfo);
+
+        // Send message to children for them to Rget from me
+        forwardMessageToChildNodes(myChildrenMsg, myMsg->getMsgtype());
+
+      } else {  // Child Node
+
+        // Send a message to the parent node to signal completion
+        int srcPe;
+
+        char *ref = (char *)(getParentBcastAckInfo(ncpyEmInfo->msg, srcPe)); // srcPe is passed by reference and written inside the method
+
+        CmiInvokeBcastAckHandler(srcPe,ref); // Invoke BcastAckHandler on the parent node to notify completion
+
+        // Since I am a child node, no more forwarding to any more childing
+        // Only forwarding is to peer PEs
+        forwardMessageToPeerNodes(myMsg, myMsg->getMsgtype());
+
+        // enquque message to execute EM on the child node
+        enqueueNcpyMessage(destPe, myMsg);
+      }
+    } else {
+      CkAbort("CkRdmaEMAckHandler: Invalid Ncpy Operation Mode");
+    }
+  }
+}
 
-/*********************************** Zerocopy Entry Method API ****************************/
-#if CMK_ONESIDED_IMPL
 /*
  * Extract ncpy buffer information from the metadata message, allocate buffers
  * and issue ncpy calls (either memcpy or cma read or rdma get). Main method called on
  * the destination to perform zerocopy operations as a part of the Zerocopy Entry Method
  * API
  */
-envelope* CkRdmaIssueRgets(envelope *env){
+envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg){
+
   int numops=0, bufsize=0, msgsize=0;
 
   CkUnpackMessage(&env); // Unpack message to access msgBuf inside getRdmaNumopsAndBufsize
@@ -406,14 +493,16 @@ envelope* CkRdmaIssueRgets(envelope *env){
   msgsize = env->getTotalsize();
 
   CkNcpyMode ncpyMode = findTransferMode(env->getSrcPe(), CkMyPe());
+  if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
+  CmiSpanningTreeInfo &t = *_topoTree;
 
   int totalMsgSize = CK_ALIGN(msgsize, 16) + bufsize;
   char *ref;
   int layerInfoSize, ncpyObjSize, extraSize;
 
-  if(ncpyMode == CkNcpyMode::RDMA) {
+  layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
 
-    layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
+  if(ncpyMode == CkNcpyMode::RDMA) {
 
     ncpyObjSize = getNcpyOpInfoTotalSize(
                   layerInfoSize,
@@ -437,9 +526,9 @@ envelope* CkRdmaIssueRgets(envelope *env){
    */
   copyenv->setTotalsize(totalMsgSize);
 
-  // Set rdma flag to be false to prevent message handler on the receiver
+  // Make the message a regular message to prevent message handler on the receiver
   // from intercepting it
-  copyenv->setRdma(false);
+  CMI_ZC_MSGTYPE(copyenv) = CMK_REG_NO_ZC_MSG;
 
   if(ncpyMode == CkNcpyMode::RDMA) {
     ref = (char *)copyenv + CK_ALIGN(msgsize, 16) + bufsize;
@@ -458,9 +547,26 @@ envelope* CkRdmaIssueRgets(envelope *env){
   up|numops;
   p|numops;
 
+  if(ncpyMode == CkNcpyMode::RDMA) {
+
+    ncpyObjSize = getNcpyOpInfoTotalSize(
+                  layerInfoSize,
+                  sizeof(CkCallback),
+                  layerInfoSize,
+                  0);
+
+    extraSize = ncpyObjSize - sizeof(NcpyOperationInfo);
+
+    NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)ref;
+    ncpyEmInfo->forwardMsg = forwardMsg; // useful only for BCAST, NULL for P2P
+    ncpyEmInfo->pe = CkMyPe();
+    ncpyEmInfo->mode = emMode; // P2P or BCAST
+  }
+
+  // source buffer
+  CkNcpyBuffer source;
+
   for(int i=0; i<numops; i++){
-    // source buffer
-    CkNcpyBuffer source;
     up|source;
 
     // destination buffer
@@ -474,17 +580,29 @@ envelope* CkRdmaIssueRgets(envelope *env){
 
       dest.memcpyGet(source);
 
-      // Invoke source callback
-      source.cb.send(sizeof(CkNcpyBuffer), &source);
+      if(emMode == ncpyEmApiMode::P2P) {
+        // Invoke source callback
+        source.cb.send(sizeof(CkNcpyBuffer), &source);
+      } // send a message to the parent to indicate completion
+      else if (emMode == ncpyEmApiMode::BCAST) {
+        // Invoke the bcast handler
+        CkRdmaEMBcastAckHandler((void *)source.bcastAckInfo);
+      }
 
 #if CMK_USE_CMA
     } else if(ncpyMode == CkNcpyMode::CMA) {
 
       dest.cmaGet(source);
 
-      // Invoke source callback
-      source.cb.send(sizeof(CkNcpyBuffer), &source);
-
+      if(emMode == ncpyEmApiMode::P2P) {
+        // Invoke source callback
+        source.cb.send(sizeof(CkNcpyBuffer), &source);
+      }
+      else if (emMode == ncpyEmApiMode::BCAST && dest.mode == CK_BUFFER_UNREG && t.child_count != 0) {
+        // register intermediate nodes for Bcast operations for child nodes to Rget
+        CmiSetRdmaBufferInfo(dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.ptr, dest.cnt, dest.mode);
+        dest.isRegistered = true;
+      }
 #endif
     } else if(ncpyMode == CkNcpyMode::RDMA) {
 
@@ -522,8 +640,14 @@ envelope* CkRdmaIssueRgets(envelope *env){
                     (char *)(ncpyEmBufferInfo), // destRef
                     ncpyOpInfo);
 
-      // set opMode for entry method API
-      ncpyOpInfo->opMode = CMK_EM_API;
+      // set opMode
+      if(emMode == ncpyEmApiMode::BCAST)
+        ncpyOpInfo->opMode = CMK_BCAST_EM_API;  // mode for bcast
+      else if(emMode == ncpyEmApiMode::P2P)
+        ncpyOpInfo->opMode = CMK_EM_API;  // mode for p2p
+      else
+        CkAbort("CkRdmaIssueRgets: Invalid ncpyEmApiMode");
+
       ncpyOpInfo->freeMe = CMK_DONT_FREE_NCPYOPINFO; // Since ncpyOpInfo is a part of the charm message, don't explicitly free it
                                                      // It'll be freed when the message is freed by the RTS after the execution of the entry method
       ncpyOpInfo->refPtr = ncpyEmBufferInfo;
@@ -533,12 +657,14 @@ envelope* CkRdmaIssueRgets(envelope *env){
       // on the comm. thread as the message is being inside this for loop on the worker thread
 
     } else {
-      CkAbort("Invalid Mode");
+      CkAbort("CkRdmaIssueRgets: Invalid CkNcpyMode");
     }
 
     //Update the CkRdmaWrapper pointer of the new message
     source.ptr = buf;
 
+    memcpy(source.layerInfo, dest.layerInfo, layerInfoSize);
+
     //Update the pointer
     buf += CK_ALIGN(source.cnt, 16);
     p|source;
@@ -547,11 +673,47 @@ envelope* CkRdmaIssueRgets(envelope *env){
   // Substitute buffer pointers by their offsets from msgBuf to handle migration
   CkPackRdmaPtrs(((CkMarshallMsg *)EnvToUsr(copyenv))->msgBuf);
 
-  if(ncpyMode == CkNcpyMode::MEMCPY || ncpyMode == CkNcpyMode::CMA ) {
+  if(ncpyMode == CkNcpyMode::MEMCPY || (emMode == ncpyEmApiMode::P2P && ncpyMode == CkNcpyMode::CMA)) {
+
+    if(emMode == ncpyEmApiMode::BCAST) {
+
+      // Pack message as it will be forwarded to peers
+      CkPackMessage(&copyenv);
+
+      forwardMessageToPeerNodes(copyenv, copyenv->getMsgtype());
+    }
+
     // All operations have completed
     return copyenv; // to indicate the completion of the gets
     // copyenv represents the new message which consists of the destination buffers
 
+  } else if(emMode == ncpyEmApiMode::BCAST && ncpyMode == CkNcpyMode::CMA) {
+
+    // Pack message as it will be forwarded to peers
+    CkPackMessage(&copyenv);
+
+    if(t.child_count != 0) { // intermediate node
+
+      // Replace received message with my pointers for my children
+      allocateObjAndReplacePointers(copyenv, env, CkMyPe(), NULL);
+
+      // Send message to children for them to Rget from me
+      forwardMessageToChildNodes(env, copyenv->getMsgtype());
+
+    } else {
+
+      // Send a message to the parent node to signal completion
+      CmiInvokeBcastAckHandler(source.pe, (void *)source.bcastAckInfo);
+
+      // Only forwarding is to peer PEs
+      forwardMessageToPeerNodes(copyenv, copyenv->getMsgtype());
+
+      // enqueue message to execute EM on the child node
+      enqueueNcpyMessage(CkMyPe(), copyenv);
+    }
+
+    return NULL; // to take no further action
+
   } else{
 
     // Launch rgets
@@ -564,6 +726,7 @@ envelope* CkRdmaIssueRgets(envelope *env){
   }
 }
 
+/* Zerocopy Entry Method API Functions */
 // Method called to unpack rdma pointers
 void CkPackRdmaPtrs(char *msgBuf){
   PUP::toMem p((void *)msgBuf);
@@ -631,42 +794,245 @@ void handleReverseEntryMethodApiCompletion(NcpyOperationInfo *info) {
     CmiFree(info);
 }
 
-// Ack handler function called when a Zerocopy Entry Method buffer completes
-void CkRdmaEMAckHandler(int destPe, void *ack) {
 
-  NcpyEmBufferInfo *emBuffInfo = (NcpyEmBufferInfo *)(ack);
+/***************************** Zerocopy Bcast Entry Method API ****************************/
 
-  char *ref = (char *)(emBuffInfo);
+// Method called on the bcast source to store some information for ack handling
+void CkRdmaPrepareBcastMsg(envelope *env) {
 
-  int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
-  int ncpyObjSize = getNcpyOpInfoTotalSize(
-                    layerInfoSize,
-                    sizeof(CkCallback),
-                    layerInfoSize,
-                    0);
+  int numops;
+  CkUnpackMessage(&env);
+  PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
+  PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
 
+  up|numops;
+  p|numops;
 
-  NcpyEmInfo *ncpyEmInfo = (NcpyEmInfo *)(ref - (emBuffInfo->index) * (sizeof(NcpyEmBufferInfo) + ncpyObjSize - sizeof(NcpyOperationInfo)) - sizeof(NcpyEmInfo));
-  ncpyEmInfo->counter++; // A zerocopy get completed, update the counter
+  NcpyBcastRootAckInfo *bcastAckInfo = (NcpyBcastRootAckInfo *)CmiAlloc(sizeof(NcpyBcastRootAckInfo) + numops * sizeof(CkNcpyBuffer));
 
-#if CMK_REG_REQUIRED
-  NcpyOperationInfo *ncpyOpInfo = &(emBuffInfo->ncpyOpInfo);
-  // De-register the destination buffer
-  CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destMode);
+  CmiSpanningTreeInfo &t = *_topoTree;
+  bcastAckInfo->numChildren = t.child_count + 1;
+  bcastAckInfo->counter = 0;
+  bcastAckInfo->isRoot  = true;
+  bcastAckInfo->numops  = numops;
+  bcastAckInfo->pe = CkMyPe();
+
+  for(int i=0; i<numops; i++) {
+    CkNcpyBuffer source;
+    up|source;
+
+    bcastAckInfo->src[i] = source;
+
+    source.bcastAckInfo = bcastAckInfo;
+
+    p|source;
+  }
+  CkPackMessage(&env);
+}
+
+// Method called on intermediate nodes after RGET to switch old source pointers with my pointers
+void CkReplaceSourcePtrsInBcastMsg(envelope *prevEnv, envelope *env, void *bcastAckInfo, int origPe) {
+
+  int numops;
+
+  CkUnpackMessage(&prevEnv);
+  PUP::toMem p_prev((void *)(((CkMarshallMsg *)EnvToUsr(prevEnv))->msgBuf));
+  PUP::fromMem up_prev((void *)((CkMarshallMsg *)EnvToUsr(prevEnv))->msgBuf);
+
+  CkUnpackMessage(&env);
+  CkUnpackRdmaPtrs((((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
+  PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
+  PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
+
+  up_prev|numops;
+  up|numops;
+
+  p|numops;
+  p_prev|numops;
+
+  for(int i=0; i<numops; i++){
+    // source buffer
+    CkNcpyBuffer prev_source, source;
+
+    // unpack from previous message
+    up_prev|prev_source;
+
+    // unpack from current message
+    up|source;
+
+    const void *bcastAckInfoTemp = source.bcastAckInfo;
+    int orig_source_pe = source.pe;
+
+    source.bcastAckInfo = bcastAckInfo;
+    source.pe = origPe;
+
+    // pack updated CkNcpyBuffer into previous message
+    p_prev|source;
+
+    source.bcastAckInfo = bcastAckInfoTemp;
+    source.pe = orig_source_pe;
+
+    // pack back CkNcpyBuffer into current message
+    p|source;
+  }
+
+  CkPackMessage(&prevEnv);
+
+  CkPackRdmaPtrs((((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
+  CkPackMessage(&env);
+}
+
+// Method called to extract the parent bcastAckInfo from the received message for ack handling
+const void *getParentBcastAckInfo(void *msg, int &srcPe) {
+  int numops;
+  CkNcpyBuffer source;
+  envelope *env = (envelope *)msg;
+  CkUnpackMessage(&env);
+  PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(env))->msgBuf));
+  PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(env))->msgBuf);
+
+  up|numops;
+  p|numops;
+
+  CkAssert(numops >= 1);
+
+  up|source;
+  p|source;
+
+  srcPe = source.pe;
+
+  CkPackMessage(&env);
+  return source.bcastAckInfo;
+}
+
+// Executed only on intermediate nodes
+// Called only on intermediate nodes
+// Allocate a NcpyBcastInterimAckInfo and forward the message to my children
+void allocateObjAndReplacePointers(envelope *myMsg, envelope *myChildrenMsg, int pe, NcpyEmInfo *ncpyEmInfo) {
+  if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
+  CmiSpanningTreeInfo &t = *_topoTree;
+
+  // Allocate a NcpyBcastInterimAckInfo object
+  NcpyBcastInterimAckInfo *bcastAckInfo = (NcpyBcastInterimAckInfo *)CmiAlloc(sizeof(NcpyBcastInterimAckInfo));
+
+  // Initialize fields of bcastAckInfo
+  bcastAckInfo->numChildren = t.child_count;
+  bcastAckInfo->counter = 0;
+  bcastAckInfo->isRoot = false;
+  bcastAckInfo->pe = pe;
+
+  // initialize derived calss NcpyBcastInterimAckInfo fields
+  bcastAckInfo->msg = myMsg; // this message will be enqueued after the completion of all operations
+  bcastAckInfo->ncpyEmInfo = ncpyEmInfo; // NULL in the case of CMA
+
+  // Replace parent pointers with my pointers for my children
+  CkReplaceSourcePtrsInBcastMsg(myChildrenMsg, myMsg, bcastAckInfo, pe);
+}
+
+// Called only on intermediate nodes
+// Method forwards a message to all the children
+void forwardMessageToChildNodes(envelope *myChildrenMsg, UChar msgType) {
+#if CMK_SMP && CMK_NODE_QUEUE_AVAILABLE
+  if(msgType == ForNodeBocMsg) {
+    // Node level forwarding for nodegroup bcasts
+    CmiForwardNodeBcastMsg(myChildrenMsg->getTotalsize(), (char *)myChildrenMsg);
+  } else
 #endif
+  // Proc level forwarding
+  CmiForwardProcBcastMsg(myChildrenMsg->getTotalsize(), (char *)myChildrenMsg);
+}
 
-  // Check if all rdma operations are complete
-  if(ncpyEmInfo->counter == ncpyEmInfo->numOps) {
+// Method forwards a message to all the peer nodes
+void forwardMessageToPeerNodes(envelope *myMsg, UChar msgType) {
+#if CMK_SMP
+#if CMK_NODE_QUEUE_AVAILABLE
+  if(msgType == ForBocMsg)
+#endif // CMK_NODE_QUEUE_AVAILABLE
+    CmiForwardMsgToPeers(myMsg->getTotalsize(), (char *)myMsg);
+#endif
+}
 
-    // invoke the charm message handler to enqueue the messsage
-#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRES
-    // invoked from the comm thread, so send message to the worker thread
-    CmiPushPE(CmiRankOf(destPe), ncpyEmInfo->msg);
-#else
-    // invoked from the worker thread, process message
-    CmiHandleMessage(ncpyEmInfo->msg);
+void handleBcastEntryMethodApiCompletion(NcpyOperationInfo *info){
+  if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
+    // invoking the entry method
+    // Invoke the ackhandler function to update the counter
+    CkRdmaEMAckHandler(info->destPe, info->refPtr);
+  }
+}
+
+void handleBcastReverseEntryMethodApiCompletion(NcpyOperationInfo *info) {
+  if(info->ackMode == CMK_SRC_DEST_ACK || info->ackMode == CMK_DEST_ACK) {
+    // Invoke the remote ackhandler function
+    CmiInvokeRemoteAckHandler(info->destPe, info->refPtr);
+  }
+  if(info->freeMe == CMK_FREE_NCPYOPINFO)
+    CmiFree(info);
+}
+
+// Method called on the root node and other intermediate parent nodes on completion of RGET through ZC Bcast
+void CkRdmaEMBcastAckHandler(void *ack) {
+  NcpyBcastAckInfo *bcastAckInfo = (NcpyBcastAckInfo *)ack;
+
+  bcastAckInfo->counter++; // Increment counter to indicate that another child was completed
+
+  if(bcastAckInfo->counter == bcastAckInfo->numChildren) {
+    // All child nodes have completed RGETs
+
+    if(bcastAckInfo->isRoot) {
+
+      NcpyBcastRootAckInfo *bcastRootAckInfo = (NcpyBcastRootAckInfo *)(bcastAckInfo);
+      // invoke the callback with the pointer
+      for(int i=0; i<bcastRootAckInfo->numops; i++) {
+        invokeCallback(&(bcastRootAckInfo->src[i].cb),
+                       bcastRootAckInfo->pe,
+                       bcastRootAckInfo->src[i]);
+      }
+
+      CmiFree(bcastRootAckInfo);
+
+    } else {
+      CmiSpanningTreeInfo &t = *_topoTree;
+
+      NcpyBcastInterimAckInfo *bcastInterimAckInfo = (NcpyBcastInterimAckInfo *)(bcastAckInfo);
+
+      envelope *myMsg = (envelope *)(bcastInterimAckInfo->msg);
+
+      // deregister using the message
+#if CMK_REG_REQUIRED
+      CkUnpackMessage(&myMsg);
+      PUP::toMem p((void *)(((CkMarshallMsg *)EnvToUsr(myMsg))->msgBuf));
+      PUP::fromMem up((void *)((CkMarshallMsg *)EnvToUsr(myMsg))->msgBuf);
+      int numops;
+      up|numops;
+      p|numops;
+
+      CkNcpyBuffer dest;
+
+      for(int i=0; i<numops; i++){
+        up|dest;
+
+        // De-register the destination buffer
+        CmiDeregisterMem(dest.ptr, (char *)dest.layerInfo + CmiGetRdmaCommonInfoSize(), dest.pe, dest.mode);
+
+        p|dest;
+      }
+      CkPackMessage(&myMsg);
 #endif
+
+      // send a message to the parent to signal completion
+      int srcPe;
+      char *ref = (char *)(getParentBcastAckInfo(bcastInterimAckInfo->msg, srcPe));
+      CmiInvokeBcastAckHandler(srcPe,ref);
+
+      forwardMessageToPeerNodes(myMsg, myMsg->getMsgtype());
+
+      // enquque message to execute EM on the intermediate node
+      enqueueNcpyMessage(bcastAckInfo->pe, bcastInterimAckInfo->msg);
+
+      CmiFree(bcastInterimAckInfo);
+    }
   }
 }
+
 #endif
 /* End of CMK_ONESIDED_IMPL */
index 2730e33613f06f48651264ba08a97cbe0ce2e2e4..d8cda27316d002ec02566462d87515df3736c623 100644 (file)
@@ -47,6 +47,10 @@ enum class CkNcpyMode : char { MEMCPY, CMA, RDMA };
 // RDMA transfers use a remote asynchronous call and hence return CkNcpyStatus::incomplete
 enum class CkNcpyStatus : char { incomplete, complete };
 
+// P2P mode is used for EM P2P API
+// BCAST mode is used for EM BCAST API
+enum class ncpyEmApiMode : char { P2P, BCAST };
+
 // Class to represent an Zerocopy buffer
 // CkSendBuffer(....) passed by the user internally translates to a CkNcpyBuffer
 class CkNcpyBuffer{
@@ -85,7 +89,10 @@ class CkNcpyBuffer{
   // reference pointer
   const void *ref;
 
-  CkNcpyBuffer() : isRegistered(false), ptr(NULL), cnt(0), pe(-1), mode(CK_BUFFER_REG), ref(NULL) {}
+  // bcast ack handling pointer
+  const void *bcastAckInfo;
+
+  CkNcpyBuffer() : isRegistered(false), ptr(NULL), cnt(0), pe(-1), mode(CK_BUFFER_REG), ref(NULL), bcastAckInfo(NULL) {}
 
   explicit CkNcpyBuffer(const void *address, unsigned short int mode_=CK_BUFFER_REG) {
     ptr = address;
@@ -107,6 +114,10 @@ class CkNcpyBuffer{
     init(ptr_, cnt_, cb_, mode_);
   }
 
+  void print() {
+    CkPrintf("[%d][%d][%d] CkNcpyBuffer print: ptr:%p, size:%d, pe:%d, mode=%d, ref:%p, bcastAckInfo:%p\n", CmiMyPe(), CmiMyNode(), CmiMyRank(), ptr, cnt, pe, mode, ref, bcastAckInfo);
+  }
+
   void init(const void *ptr_, size_t cnt_, CkCallback &cb_, unsigned short int mode_=CK_BUFFER_REG) {
     ptr  = ptr_;
     cnt  = cnt_;
@@ -185,6 +196,7 @@ class CkNcpyBuffer{
   void pup(PUP::er &p) {
     p((char *)&ptr, sizeof(ptr));
     p((char *)&ref, sizeof(ref));
+    p((char *)&bcastAckInfo, sizeof(bcastAckInfo));
     p|cnt;
     p|cb;
     p|pe;
@@ -195,10 +207,12 @@ class CkNcpyBuffer{
 
   friend void CkRdmaDirectAckHandler(void *ack);
 
+  friend void CkRdmaEMBcastAckHandler(void *ack);
+
   friend void constructSourceBufferObject(NcpyOperationInfo *info, CkNcpyBuffer &src);
   friend void constructDestinationBufferObject(NcpyOperationInfo *info, CkNcpyBuffer &dest);
 
-  friend envelope* CkRdmaIssueRgets(envelope *env);
+  friend envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg);
 };
 
 // Ack handler for the Zerocopy Direct API
@@ -215,13 +229,12 @@ void invokeCallback(void *cb, int pe, CkNcpyBuffer &buff);
 // Returns CkNcpyMode::RDMA if RDMA needs to be used
 CkNcpyMode findTransferMode(int srcPe, int destPe);
 
-
 void invokeSourceCallback(NcpyOperationInfo *info);
 
 void invokeDestinationCallback(NcpyOperationInfo *info);
 
-
-
+// Method to enqueue a message after the completion of an payload transfer
+void enqueueNcpyMessage(int destPe, void *msg);
 
 /*********************************** Zerocopy Entry Method API ****************************/
 #define CkSendBuffer(...) CkNcpyBuffer(__VA_ARGS__)
@@ -241,9 +254,13 @@ void invokeDestinationCallback(NcpyOperationInfo *info);
 struct NcpyEmInfo{
   int numOps; // number of zerocopy operations i.e number of buffers sent using CkSendBuffer
   int counter; // used for tracking the number of completed RDMA operations
+  int pe;
+  ncpyEmApiMode mode; // used to distinguish between p2p and bcast
   void *msg; // pointer to the Charm++ message which will be enqueued after completion of all Rgets
+  void *forwardMsg; // used for the ncpy broadcast api
 };
 
+
 // This structure is used to store the buffer information specific to each buffer being sent
 // using the Zerocopy Entry Method API. A variable of the structure stores the information associated
 // with each buffer
@@ -252,11 +269,12 @@ struct NcpyEmBufferInfo{
   NcpyOperationInfo ncpyOpInfo; // Stores all the information required for the zerocopy operation
 };
 
+
 /*
  * Extract ncpy buffer information from the metadata message,
  * allocate buffers and issue ncpy calls (either memcpy or cma read or rdma get)
  */
-envelope* CkRdmaIssueRgets(envelope *env);
+envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg = NULL);
 
 void handleEntryMethodApiCompletion(NcpyOperationInfo *info);
 
@@ -275,6 +293,49 @@ void getRdmaNumopsAndBufsize(envelope *env, int &numops, int &bufsize);
 // Ack handler function for the nocopy EM API
 void CkRdmaEMAckHandler(int destPe, void *ack);
 
+
+
+/***************************** Zerocopy Bcast Entry Method API ****************************/
+struct NcpyBcastAckInfo{
+  int numChildren;
+  int counter;
+  bool isRoot;
+  int pe;
+  int numops;
+};
+
+struct NcpyBcastRootAckInfo : public NcpyBcastAckInfo {
+  CkNcpyBuffer src[0];
+};
+
+struct NcpyBcastInterimAckInfo : public NcpyBcastAckInfo {
+  NcpyEmInfo *ncpyEmInfo;
+  void *msg;
+};
+
+// Method called on the bcast source to store some information for ack handling
+void CkRdmaPrepareBcastMsg(envelope *env);
+
+// Method called on intermediate nodes after RGET to switch old source pointers with my pointers
+void CkReplaceSourcePtrsInBcastMsg(envelope *prevEnv, envelope *env, void *bcastAckInfo, int origPe);
+
+// Method called to extract the parent bcastAckInfo from the received message for ack handling
+const void *getParentBcastAckInfo(void *msg, int &srcPe);
+
+// Allocate a NcpyBcastInterimAckInfo and forward the message to my children
+void allocateObjAndReplacePointers(envelope *myMsg, envelope *myChildrenMsg, int pe, NcpyEmInfo *ncpyEmInfo);
+
+void forwardMessageToChildNodes(envelope *myChildrenMsg, UChar msgType);
+
+void forwardMessageToPeerNodes(envelope *myMsg, UChar msgType);
+
+void handleBcastEntryMethodApiCompletion(NcpyOperationInfo *info);
+
+void handleBcastReverseEntryMethodApiCompletion(NcpyOperationInfo *info);
+
+// Method called on the root node and other intermediate parent nodes on completion of RGET through ZC Bcast
+void CkRdmaEMBcastAckHandler(void *ack);
+
 #endif /* End of CMK_ONESIDED_IMPL */
 
 #endif
index da98f173d323749d1212c3d4f93af6069eba16bd..4aeb9e7d89ebbf632bccce90723495bef9011c18 100644 (file)
@@ -105,9 +105,6 @@ void envelope::pup(PUP::er &p) {
         if (!p.isUnpacking()) d = attribs.isUsed;
         p|d;
         if (p.isUnpacking()) attribs.isUsed = d; 
-        if (!p.isUnpacking()) d = attribs.isRdma;
-        p|d;
-        if (p.isUnpacking()) attribs.isRdma = d;
        p(epIdx);
        p(pe);
 #if CMK_REPLAYSYSTEM || CMK_TRACE_ENABLED
index 6d41967792d863172c210ff6ceab58c72d4c908d..eea3b9f766b1962bb450d7e6263d6487fc905c21 100644 (file)
@@ -180,7 +180,6 @@ namespace ck {
       UChar queueing:4; ///< Queueing strategy (FIFO, LIFO, PFIFO, ...)
       UChar isPacked:1; ///< If true, message must be unpacked before use
       UChar isUsed:1;   ///< Marker bit to prevent message re-send.
-      UChar isRdma:1;   ///< True if msg has Rdma parameters
       UChar isVarSysMsg:1; ///< True if msg is a variable sized sys message that doesn't use a pool
     };
 
@@ -306,9 +305,7 @@ public:
     }
 
     UChar  isPacked(void) const { return attribs.isPacked; }
-    UChar  isRdma(void) const { return attribs.isRdma; }
     void   setPacked(const UChar p) { attribs.isPacked = p; }
-    void   setRdma(const UChar b) { attribs.isRdma = b; }
     UChar  isVarSysMsg(void) const { return attribs.isVarSysMsg; }
     void   setIsVarSysMsg(const UChar d) { attribs.isVarSysMsg = d; }
     UShort getPriobits(void) const { return priobits; }
@@ -350,13 +347,14 @@ public:
       env->totalsize = tsize;
       env->priobits = prio;
       env->setPacked(0);
-      env->setRdma(0);
       env->setGroupDepNum((int)groupDepNumRequest);
       _SET_USED(env, 0);
       env->setRef(0);
       env->setEpIdx(0);
       env->setIsVarSysMsg(0);
 
+      CMI_ZC_MSGTYPE(env) = CMK_REG_NO_ZC_MSG; // Set the default as CMK_REG_NO_ZC_MSG
+
 #if USE_CRITICAL_PATH_HEADER_ARRAY
       env->pathHistory.reset();
 #endif
index 012b4f79cd62fcb3fb7f516fc1caaf0402ef05a5..11bfba493a7230ab4b49a3a3afa6c6e348db1681 100644 (file)
@@ -1286,11 +1286,12 @@ void _initCharm(int unused_argc, char **argv)
     CkpvAccess(envelopeEventID) = 0;
        CkMessageWatcherInit(argv,CkpvAccess(_coreState));
        
-       // Set the ack handler function used for the direct nocopy api and the entry method nocopy api
+       // Set the ack handler function used for the direct nocopy api
        CmiSetDirectNcpyAckHandler(CkRdmaDirectAckHandler);
 
 #if CMK_ONESIDED_IMPL
-       CmiSetEMNcpyAckHandler(CkRdmaEMAckHandler);
+       // Set the ack handler function used for the entry method p2p api and entry method bcast api
+       CmiSetEMNcpyAckHandler(CkRdmaEMAckHandler, CkRdmaEMBcastAckHandler);
 #endif
        /**
          The rank-0 processor of each node calls the 
index c718aaf1080ff958544c7ad4c4d53ccc7b4fafb5..822ad3be6edc6057cdcbc7aa4ee136f5715046c2 100644 (file)
    corresponding conv-mach.h */
 #if CMK_HAS_STDINT_H && !defined(CMK_TYPEDEF_INT2)
 #include <stdint.h>
+typedef int8_t  CMK_TYPEDEF_INT1;
 typedef int16_t CMK_TYPEDEF_INT2;
 typedef int32_t CMK_TYPEDEF_INT4;
 typedef int64_t CMK_TYPEDEF_INT8;
+typedef uint8_t  CMK_TYPEDEF_UINT1;
 typedef uint16_t CMK_TYPEDEF_UINT2;
 typedef uint32_t CMK_TYPEDEF_UINT4;
 typedef uint64_t CMK_TYPEDEF_UINT8;
index 9d462549475762d8480c90379cfec39711f601de..f6a21a261f183e9002e560c99383a014e9d4b7e3 100644 (file)
@@ -4,10 +4,11 @@
 #include "conv-config.h"
 
 /******** CMI: TYPE DEFINITIONS ********/
-
+typedef CMK_TYPEDEF_INT1      CmiInt1;
 typedef CMK_TYPEDEF_INT2      CmiInt2;
 typedef CMK_TYPEDEF_INT4      CmiInt4;
 typedef CMK_TYPEDEF_INT8      CmiInt8;
+typedef CMK_TYPEDEF_UINT1     CmiUInt1;
 typedef CMK_TYPEDEF_UINT2     CmiUInt2;
 typedef CMK_TYPEDEF_UINT4     CmiUInt4;
 typedef CMK_TYPEDEF_UINT8     CmiUInt8;
index af9a6c1c42935cb0e6a9f237dcc4dd8d7022f55e..3ea3a32b1fff9ce4decf8a2529e1ce51fff124f7 100644 (file)
@@ -17,7 +17,7 @@ int CmiGetRdmaCommonInfoSize() {
 /* Support for generic implementation */
 
 // Function Pointer to Acknowledement handler function for the Direct API
-RdmaDirectAckCallerFn ncpyDirectAckHandlerFn;
+RdmaAckCallerFn ncpyDirectAckHandlerFn;
 
 // An Rget initiator PE sends this message to the target PE that will be the source of the data
 typedef struct _converseRdmaMsg {
@@ -65,7 +65,7 @@ void CmiOnesidedDirectInit(void) {
   put_data_handler_idx = CmiRegisterHandler((CmiHandler)putDataHandler);
 }
 
-void CmiSetDirectNcpyAckHandler(RdmaDirectAckCallerFn fn) {
+void CmiSetDirectNcpyAckHandler(RdmaAckCallerFn fn) {
   ncpyDirectAckHandlerFn = fn;
 }
 
@@ -125,9 +125,11 @@ void CmiDeregisterMem(const void *ptr, void *info, int pe, unsigned short int mo
 
 // Support for sending an ack message for the Entry Method API
 
-RdmaEMAckCallerFn ncpyEMAckHandlerFn;
+RdmaEMAckCallerFn ncpyEMAckHandlerFn; // P2P API ack handler function
 
-static int invoke_entry_method_ack_handler_idx;
+RdmaAckCallerFn ncpyEMBcastAckHandlerFn; // BCast API ack handler function
+
+static int invoke_entry_method_ack_handler_idx, ncpy_bcast_ack_handler_idx;
 
 // Ack Message is typically used in case of reverse operation (when a reverse put is used instead of a get)
 typedef struct _ackEntryMethodMsg{
@@ -142,6 +144,10 @@ static void ackEntryMethodHandler(ackEntryMethodMsg *msg) {
   ncpyEMAckHandlerFn(CmiMyPe(), msg->ref);
 }
 
+// This handler invokes the ncpyEMBcastAckHandler on the source (root node or intermediate nodes)
+static void bcastAckHandler(ackEntryMethodMsg *msg) {
+  ncpyEMBcastAckHandlerFn(msg->ref);
+}
 // Method to create a ackEntryMethodMsg and send it
 void CmiInvokeRemoteAckHandler(int pe, void *ref) {
   ackEntryMethodMsg *msg = (ackEntryMethodMsg *)CmiAlloc(sizeof(ackEntryMethodMsg));
@@ -154,10 +160,23 @@ void CmiInvokeRemoteAckHandler(int pe, void *ref) {
 // Register converse handler for invoking ack on reverse operation
 void CmiOnesidedDirectInit(void) {
   invoke_entry_method_ack_handler_idx = CmiRegisterHandler((CmiHandler)ackEntryMethodHandler);
+  ncpy_bcast_ack_handler_idx = CmiRegisterHandler((CmiHandler)bcastAckHandler);
 }
 
-void CmiSetEMNcpyAckHandler(RdmaEMAckCallerFn fn) {
+void CmiSetEMNcpyAckHandler(RdmaEMAckCallerFn fn, RdmaAckCallerFn bcastFn) {
   // set the EM Ack caller function
   ncpyEMAckHandlerFn = fn;
+
+  // set the EM Bcast Ack caller function
+  ncpyEMBcastAckHandlerFn = bcastFn;
+}
+
+void CmiInvokeBcastAckHandler(int pe, void *ref) {
+
+  ackEntryMethodMsg *msg = (ackEntryMethodMsg *)CmiAlloc(sizeof(ackEntryMethodMsg));
+  msg->ref = ref;
+
+  CmiSetHandler(msg, ncpy_bcast_ack_handler_idx);
+  CmiSyncSendAndFree(pe, sizeof(ackEntryMethodMsg), msg);
 }
 #endif
index 5316a368e5f736e533dcb614824e55a3da07c3e0..b3c25f7a151ba460054aaf525b5f34eaa9a7fae8 100644 (file)
@@ -4,7 +4,7 @@
 #include "cmirdmautils.h"
 
 typedef void (*RdmaEMAckCallerFn)(int destPe, void *token);
-typedef void (*RdmaDirectAckCallerFn)(void *token);
+typedef void (*RdmaAckCallerFn)(void *token);
 
 /* Support for Direct API */
 void CmiSetRdmaCommonInfo(void *info, const void *ptr, int size);
@@ -13,10 +13,10 @@ int CmiGetRdmaCommonInfoSize(void);
 void CmiSetRdmaBufferInfo(void *info, const void *ptr, int size, unsigned short int mode);
 
 // Function to set the ack handler for the Direct API
-void CmiSetDirectNcpyAckHandler(RdmaDirectAckCallerFn fn);
+void CmiSetDirectNcpyAckHandler(RdmaAckCallerFn fn);
 
 // Function to set the ack handler for the Entry Method API
-void CmiSetEMNcpyAckHandler(RdmaEMAckCallerFn fn);
+void CmiSetEMNcpyAckHandler(RdmaEMAckCallerFn fn, RdmaAckCallerFn bcastFn);
 
 /* CmiIssueRget initiates an RDMA read operation, transferring 'size' bytes of data from the address space of 'srcPe' to local address, 'destAddr'.
  * When the runtime invokes srcAck on the source (target), it indicates safety to overwrite or free the srcAddr buffer.
@@ -63,7 +63,15 @@ int CmiDoesCMAWork(void);
 // Method used to send an ack after completion of a reverse rdma operation
 void CmiInvokeRemoteAckHandler(int pe, void *ref);
 
+// Method used to send an ack to my parent after completion of an RGET in the receiver
+void CmiInvokeBcastAckHandler(int pe, void *ref);
+
 // Function declaration for onesided initialization
 void CmiOnesidedDirectInit(void);
 
+// Broadcast API support
+void CmiForwardProcBcastMsg(int size, char *msg); // for forwarding proc messages to my child nodes
+void CmiForwardNodeBcastMsg(int size, char *msg); // for forwarding node queue messages to my child nodes
+
+void CmiForwardMsgToPeers(int size, char *msg); // for forwarding messages to my peer PEs
 #endif
index 68751a5b11b4755c6ecc9fdfbd0efc391c645c82..b858d31a2d505b83f0e1a012976b4dcdd2efbe4c 100644 (file)
@@ -73,6 +73,9 @@
 
 #include "conv-header.h"
 
+#define CMI_ZC_MSGTYPE(msg)                  ((CmiMsgHeaderBasic *)msg)->zcMsgType
+#define CMI_IS_ZC_BCAST(msg)                 (CMI_ZC_MSGTYPE(msg) == CMK_ZC_BCAST_SEND_MSG)
+
 #define CMIALIGN(x,n)       (size_t)((~((size_t)n-1))&((x)+(n-1)))
 /*#define ALIGN8(x)        (size_t)((~7)&((x)+7)) */
 #define ALIGN8(x)          CMIALIGN(x,8)
index 0dac1812cf782c64397fbd2121e0aae3dc36a36d..3d4557bf7a7b61116ba4456790c5dada2fac0408 100644 (file)
@@ -126,6 +126,8 @@ void setReverseModeForNcpyOpInfo(NcpyOperationInfo *ncpyOpInfo) {
                                break;
     case CMK_DIRECT_API      : // Do nothing
                                break;
+    case CMK_BCAST_EM_API    : ncpyOpInfo->opMode = CMK_BCAST_EM_API_REVERSE;
+                               break;
     default                  : CmiAbort("Unknown opcode");
                                break;
   }
index 44a3f7cae3ebf3f5b6740c1db16bdb167075b7f6..443f96324a8e3f4ca2354acc6f2e5493d7982821 100644 (file)
@@ -526,107 +526,93 @@ void Entry::genArrayDefs(XStr& str) {
           << ") \n";  // no const
     str << "{\n";
     // regular broadcast and section broadcast for an entry method with rdma
-    if (param->hasRdma() && !container->isForElement()) {
-      str << "  CkAbort(\"Broadcast not supported for entry methods with nocopy "
-             "parameters\");\n";
+    str << "  ckCheck();\n";
+    XStr inlineCall;
+    if (!isNoTrace())
+      inlineCall
+          << "    _TRACE_BEGIN_EXECUTE_DETAILED(0,ForArrayEltMsg,(" << epIdx()
+          << "),CkMyPe(), 0, ((CkArrayIndex&)ckGetIndex()).getProjectionID(), obj);\n";
+    if (isAppWork()) inlineCall << "    _TRACE_BEGIN_APPWORK();\n";
+    inlineCall << "#if CMK_LBDB_ON\n"
+               << "    LDObjHandle objHandle;\n"
+               << "    int objstopped=0;\n"
+               << "    objHandle = obj->timingBeforeCall(&objstopped);\n"
+               << "#endif\n";
+    inlineCall << "#if CMK_CHARMDEBUG\n"
+                  "    CpdBeforeEp("
+               << epIdx()
+               << ", obj, NULL);\n"
+                  "#endif\n";
+    inlineCall << "    ";
+    if (!retType->isVoid()) inlineCall << retType << " retValue = ";
+    inlineCall << "obj->" << (tspec ? "template " : "") << name;
+    if (tspec) {
+      inlineCall << "<";
+      tspec->genShort(inlineCall);
+      inlineCall << ">";
+    }
+    inlineCall << "(";
+    param->unmarshallForward(inlineCall, true);
+    inlineCall << ");\n";
+    inlineCall << "#if CMK_CHARMDEBUG\n"
+                  "    CpdAfterEp("
+               << epIdx()
+               << ");\n"
+                  "#endif\n";
+    inlineCall << "#if CMK_LBDB_ON\n    "
+                  "obj->timingAfterCall(objHandle,&objstopped);\n#endif\n";
+    if (isAppWork()) inlineCall << "    _TRACE_END_APPWORK();\n";
+    if (!isNoTrace()) inlineCall << "    _TRACE_END_EXECUTE();\n";
+    if (!retType->isVoid()) {
+      inlineCall << "    return retValue;\n";
     } else {
-      str << "  ckCheck();\n";
-      XStr inlineCall;
-      if (!isNoTrace())
-        // Create a dummy envelope to represent the "message send" to the local/inline method
-        // so that Projections can trace the method back to its caller
-        inlineCall
-            << "    envelope env;\n"
-            << "    env.setMsgtype(ForArrayEltMsg);\n"
-            << "    env.setTotalsize(0);\n"
-            // NOTE: extra parentheses around the second argument to _TRACE_CREATION_DETAILED here
-            //       are needed for templated entry methods
-            << "    _TRACE_CREATION_DETAILED(&env, (" << epIdx() << "));\n"
-            << "    _TRACE_CREATION_DONE(1);\n"
-            << "    _TRACE_BEGIN_EXECUTE_DETAILED(CpvAccess(curPeEvent),ForArrayEltMsg,(" << epIdx()
-            << "),CkMyPe(), 0, ((CkArrayIndex&)ckGetIndex()).getProjectionID(), obj);\n";
-      if (isAppWork()) inlineCall << "    _TRACE_BEGIN_APPWORK();\n";
-      inlineCall << "#if CMK_LBDB_ON\n"
-                 << "    LDObjHandle objHandle;\n"
-                 << "    int objstopped=0;\n"
-                 << "    objHandle = obj->timingBeforeCall(&objstopped);\n"
-                 << "#endif\n";
-      inlineCall << "#if CMK_CHARMDEBUG\n"
-                    "    CpdBeforeEp("
-                 << epIdx()
-                 << ", obj, NULL);\n"
-                    "#endif\n";
-      inlineCall << "    ";
-      if (!retType->isVoid()) inlineCall << retType << " retValue = ";
-      inlineCall << "obj->" << (tspec ? "template " : "") << name;
-      if (tspec) {
-        inlineCall << "<";
-        tspec->genShort(inlineCall);
-        inlineCall << ">";
-      }
-      inlineCall << "(";
-      param->unmarshallForward(inlineCall, true);
-      inlineCall << ");\n";
-      inlineCall << "#if CMK_CHARMDEBUG\n"
-                    "    CpdAfterEp("
-                 << epIdx()
-                 << ");\n"
-                    "#endif\n";
-      inlineCall << "#if CMK_LBDB_ON\n    "
-                    "obj->timingAfterCall(objHandle,&objstopped);\n#endif\n";
-      if (isAppWork()) inlineCall << "    _TRACE_END_APPWORK();\n";
-      if (!isNoTrace()) inlineCall << "    _TRACE_END_EXECUTE();\n";
-      if (!retType->isVoid()) {
-        inlineCall << "    return retValue;\n";
-      } else {
-        inlineCall << "    return;\n";
-      }
+      inlineCall << "    return;\n";
+    }
 
-      XStr prepareMsg;
-      prepareMsg << marshallMsg();
-      prepareMsg << "  UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);\n";
-      prepareMsg << "  CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;\n";
-      prepareMsg << "  impl_amsg->array_setIfNotThere(" << ifNot << ");\n";
+    XStr prepareMsg;
+    prepareMsg << marshallMsg();
+    prepareMsg << "  UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);\n";
+    prepareMsg << "  CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;\n";
+    prepareMsg << "  impl_amsg->array_setIfNotThere(" << ifNot << ");\n";
 
-      if (!isLocal()) {
-        if (isInline() && container->isForElement()) {
-          str << "  " << container->baseName() << " *obj = ckLocal();\n";
-          str << "  if (obj) {\n" << inlineCall << "  }\n";
-        }
-        str << prepareMsg;
-      } else {
+    if (!isLocal()) {
+      if (isInline() && container->isForElement()) {
         str << "  " << container->baseName() << " *obj = ckLocal();\n";
-        str << "#if CMK_ERROR_CHECKING\n";
-        str << "  if (obj==NULL) CkAbort(\"Trying to call a LOCAL entry method on a "
-               "non-local element\");\n";
-        str << "#endif\n";
-        str << inlineCall;
-      }
-      if (isIget()) {
-        str << "  CkFutureID f=CkCreateAttachedFutureSend(impl_amsg," << epIdx()
-            << ",ckGetArrayID(),ckGetIndex(),&CProxyElement_ArrayBase::ckSendWrapper);"
-            << "\n";
+        str << "  if (obj) {\n" << inlineCall << "  }\n";
       }
+      str << prepareMsg;
+    } else {
+      str << "  " << container->baseName() << " *obj = ckLocal();\n";
+      str << "#if CMK_ERROR_CHECKING\n";
+      str << "  if (obj==NULL) CkAbort(\"Trying to call a LOCAL entry method on a "
+             "non-local element\");\n";
+      str << "#endif\n";
+      str << inlineCall;
+    }
+    if (isIget()) {
+      str << "  CkFutureID f=CkCreateAttachedFutureSend(impl_amsg," << epIdx()
+          << ",ckGetArrayID(),ckGetIndex(),&CProxyElement_ArrayBase::ckSendWrapper);"
+          << "\n";
+    }
 
-      if (isSync()) {
-        str << syncPreCall() << "ckSendSync(impl_amsg, " << epIdx() << ");\n";
-        str << syncPostCall();
-      } else if (!isLocal()) {
-        XStr opts;
-        opts << ",0";
-        if (isSkipscheduler()) opts << "+CK_MSG_EXPEDITED";
-        if (isInline()) opts << "+CK_MSG_INLINE";
-        if (!isIget()) {
-          if (container->isForElement() || container->isForSection()) {
-            str << "  ckSend(impl_amsg, " << epIdx() << opts << ");\n";
-          } else
-            str << "  ckBroadcast(impl_amsg, " << epIdx() << opts << ");\n";
-        }
-      }
-      if (isIget()) {
-        str << "  return f;\n";
+    if (isSync()) {
+      str << syncPreCall() << "ckSendSync(impl_amsg, " << epIdx() << ");\n";
+      str << syncPostCall();
+    } else if (!isLocal()) {
+      XStr opts;
+      opts << ",0";
+      if (isSkipscheduler()) opts << "+CK_MSG_EXPEDITED";
+      if (isInline()) opts << "+CK_MSG_INLINE";
+      if (!isIget()) {
+        if (container->isForElement() || container->isForSection()) {
+          str << "  ckSend(impl_amsg, " << epIdx() << opts << ");\n";
+        } else
+          str << "  ckBroadcast(impl_amsg, " << epIdx() << opts << ");\n";
       }
     }
+    if (isIget()) {
+      str << "  return f;\n";
+    }
     str << "}\n";
 
     if (!tspec && !container->isTemplated() && !isIget() && (isLocal() || isInline()) && container->isForElement()) {
@@ -850,10 +836,6 @@ void Entry::genGroupDefs(XStr& str) {
   str << makeDecl(retStr, 1) << "::" << name << "(" << msgTypeStr << ")\n";
   str << "{\n";
   // regular broadcast and section broadcast for an entry method with rdma
-  if (param->hasRdma() && !container->isForElement()) {
-    str << "  CkAbort(\"Broadcast not supported for entry methods with nocopy "
-           "parameters\");\n";
-  } else {
     str << "  ckCheck();\n";
     if (!isLocal()) str << marshallMsg();
 
@@ -943,7 +925,6 @@ void Entry::genGroupDefs(XStr& str) {
             << ");\n";
       }
     }
-  }
   str << "}\n";
 
   // entry method on multiple PEs declaration
@@ -951,25 +932,16 @@ void Entry::genGroupDefs(XStr& str) {
       !container->isNodeGroup()) {
     str << "" << makeDecl(retStr, 1) << "::" << name << "(" << paramComma(0, 0)
         << "int npes, int *pes" << eo(0) << ") {\n";
-    if (param->hasRdma()) {
-      str << "  CkAbort(\"Broadcast not supported for entry methods with nocopy "
              "parameters\");\n";
-    } else {
-      str << marshallMsg();
-      str << "  CkSendMsg" << node << "BranchMulti(" << paramg << ", npes, pes" << opts
-          << ");\n";
-    }
+    str << marshallMsg();
+    str << "  CkSendMsg" << node << "BranchMulti(" << paramg << ", npes, pes" << opts
+        << ");\n";
     str << "}\n";
     str << "" << makeDecl(retStr, 1) << "::" << name << "(" << paramComma(0, 0)
         << "CmiGroup &grp" << eo(0) << ") {\n";
-    if (param->hasRdma()) {
-      str << "  CkAbort(\"Broadcast not supported for entry methods with nocopy "
-             "parameters\");\n";
-    } else {
-      str << marshallMsg();
-      str << "  CkSendMsg" << node << "BranchGroup(" << paramg << ", grp" << opts
-          << ");\n";
-    }
+    str << marshallMsg();
+    str << "  CkSendMsg" << node << "BranchGroup(" << paramg << ", grp" << opts
+        << ");\n";
     str << "}\n";
   }
 }
@@ -2470,6 +2442,15 @@ void Entry::genCall(XStr& str, const XStr& preCall, bool redn_wrapper, bool uses
         param->unmarshall(str);
         str << ");\n";
       }
+      // pack pointers if it's a broadcast message
+      if(param->hasRdma() && !container->isForElement()) {
+        // pack rdma pointers for broadcast unmarshall
+        // this is done to support broadcasts before all chare array elements are
+        // finished with their EM execution using the same msg
+        str << "#if CMK_ONESIDED_IMPL\n";
+        str << "  CkPackRdmaPtrs(impl_buf_begin);\n";
+        str << "#endif\n";
+      }
     }
   }
 }
@@ -3007,5 +2988,6 @@ int Entry::isReductionTarget(void) { return (attribs & SREDUCE); }
 
 char* Entry::getEntryName() { return name; }
 int Entry::getLine() { return line; }
+Chare* Entry::getContainer(void) const { return container; }
 
 }  // namespace xi
index d145ed5ef9fb448b8770029e24d82783eb772bf6..7bae47eadc7bbd57f74bc51079177050d98d63b4 100644 (file)
@@ -212,6 +212,8 @@ class Entry : public Member {
   int getLine();
   void genTramRegs(XStr& str);
   void genTramPups(XStr& scope, XStr& decls, XStr& defs);
+
+  Chare* getContainer(void) const;
 };
 
 // TODO(Ralf): why not simply use list<Entry*> instead?
index 396fada11793a8885a77d61c0418689011d204a6..0f77776d117b96ec9d1bfe8ede92bbffd42be725 100644 (file)
@@ -2,6 +2,7 @@
 #include "xi-Parameter.h"
 #include "xi-Type.h"
 #include "xi-Value.h"
+#include "xi-Chare.h"
 
 namespace xi {
 
@@ -194,7 +195,7 @@ void ParamList::callEach(rdmafn_t f, XStr& str, bool isArray) {
 int ParamList::hasConditional() { return orEach(&Parameter::isConditional); }
 
 /** marshalling: pack fields into flat byte buffer **/
-void ParamList::marshall(XStr& str, XStr& entry) {
+void ParamList::marshall(XStr& str, XStr& entry_str) {
   if (isVoid())
     str << "  void *impl_msg = CkAllocSysMsg(impl_e_opts);\n";
   else if (isMarshalled()) {
@@ -242,8 +243,8 @@ void ParamList::marshall(XStr& str, XStr& entry) {
     str << "  }\n";
     // Now that we know the size, allocate the packing buffer
     if (hasConditional())
-      str << "  MarshallMsg_" << entry << " *impl_msg=CkAllocateMarshallMsgT<MarshallMsg_"
-          << entry << ">(impl_off,impl_e_opts);\n";
+      str << "  MarshallMsg_" << entry_str << " *impl_msg=CkAllocateMarshallMsgT<MarshallMsg_"
+          << entry_str << ">(impl_off,impl_e_opts);\n";
     else
       str << "  CkMarshallMsg *impl_msg=CkAllocateMarshallMsg(impl_off,impl_e_opts);\n";
     // Second pass: write the data
@@ -266,7 +267,11 @@ void ParamList::marshall(XStr& str, XStr& entry) {
     }
     if (hasrdma) {
       str << "#if CMK_ONESIDED_IMPL\n";
-      str << "  UsrToEnv(impl_msg)->setRdma(true);\n";
+      if(entry->getContainer()->isForElement()) {
+        str << "  CMI_ZC_MSGTYPE((char *)UsrToEnv(impl_msg)) = CMK_ZC_P2P_SEND_MSG;\n";
+      } else { // Mark a Ncpy Bcast message to intercept it in the send code path
+        str << "  CMI_ZC_MSGTYPE((char *)UsrToEnv(impl_msg)) = CMK_ZC_BCAST_SEND_MSG;\n";
+      }
       str << "#else\n";
       if (!hasArrays) str << "  char *impl_buf=impl_msg->msgBuf+impl_arrstart;\n";
       callEach(&Parameter::marshallRdmaArrayData, str);
@@ -417,7 +422,8 @@ void ParamList::beginRednWrapperUnmarshall(XStr& str, bool needsClosure) {
         if (!needsClosure) {
           if (hasRdma()) {
             str << "#if CMK_ONESIDED_IMPL\n";
-            str << "  CkUnpackRdmaPtrs(impl_buf);\n";
+            str << "  char *impl_buf_begin = impl_buf;\n";
+            str << "  CkUnpackRdmaPtrs(impl_buf_begin);\n";
             str << "  int impl_num_rdma_fields; implP|impl_num_rdma_fields;\n";
             callEach(&Parameter::beginUnmarshallRdma, str, true);
             str << "#else\n";
@@ -428,7 +434,8 @@ void ParamList::beginRednWrapperUnmarshall(XStr& str, bool needsClosure) {
         } else {
           if (hasRdma()) {
             str << "#if CMK_ONESIDED_IMPL\n";
-            str << "  CkUnpackRdmaPtrs(impl_buf);\n";
+            str << "  char *impl_buf_begin = impl_buf;\n";
+            str << "  CkUnpackRdmaPtrs(impl_buf_begin);\n";
             callEach(&Parameter::beginUnmarshallSDAGCallRdma, str, true);
             str << "#else\n";
             callEach(&Parameter::beginUnmarshallSDAGCallRdma, str, false);
@@ -451,7 +458,8 @@ void ParamList::beginRednWrapperUnmarshall(XStr& str, bool needsClosure) {
       if (!needsClosure) {
         if (hasRdma()) {
           str << "#if CMK_ONESIDED_IMPL\n";
-          str << "  CkUnpackRdmaPtrs(impl_buf);\n";
+          str << "  char *impl_buf_begin = impl_buf;\n";
+          str << "  CkUnpackRdmaPtrs(impl_buf_begin);\n";
           str << "  int impl_num_rdma_fields; implP|impl_num_rdma_fields;\n";
           callEach(&Parameter::beginUnmarshallRdma, str, true);
           str << "#else\n";
@@ -483,7 +491,8 @@ void ParamList::beginUnmarshall(XStr& str) {
     str << "  PUP::fromMem implP(impl_buf);\n";
     if (hasRdma()) {
       str << "#if CMK_ONESIDED_IMPL\n";
-      str << "  CkUnpackRdmaPtrs(impl_buf);\n";
+      str << "  char *impl_buf_begin = impl_buf;\n";
+      str << "  CkUnpackRdmaPtrs(impl_buf_begin);\n";
       str << "  int impl_num_rdma_fields; implP|impl_num_rdma_fields; \n";
       callEach(&Parameter::beginUnmarshallRdma, str, true);
       str << "#else\n";
@@ -570,7 +579,8 @@ void ParamList::beginUnmarshallSDAGCall(XStr& str, bool usesImplBuf) {
         << ";\n";
     if (hasRdma()) {
       str << "#if CMK_ONESIDED_IMPL\n";
-      str << "  CkUnpackRdmaPtrs(impl_buf);\n";
+      str << "  char *impl_buf_begin = impl_buf;\n";
+      str << "  CkUnpackRdmaPtrs(impl_buf_begin);\n";
       callEach(&Parameter::beginUnmarshallSDAGCallRdma, str, true);
       str << "#else\n";
       callEach(&Parameter::beginUnmarshallSDAGCallRdma, str, false);