Feature #1742: Use ZC infrastructure for bcast of large readonly variables 37/4637/27
authorNitin Bhat <nbhat4@illinois.edu>
Fri, 5 Oct 2018 21:17:12 +0000 (17:17 -0400)
committerNitin Bhat <nbhat4@illinois.edu>
Wed, 10 Apr 2019 18:19:25 +0000 (13:19 -0500)
A macro CMK_ONESIDED_RO_THRESHOLD is used to specify the threshold,
i.e. the size in bytes at or above which the Zerocopy API is used
to broadcast large readonly variables. Currently, it is defined
in src/arch/util/lrts-common.h. In the future, it should become a
layer-dependent macro whose value is determined by experiments and
that is defined in src/arch/<layer>/conv-common.h.

Change-Id: I261345f2e8bc1520c6b67f08ae8abf79f9534ce3

15 files changed:
examples/charm++/zerocopy/Makefile
examples/charm++/zerocopy/large_readonly/Makefile [new file with mode: 0644]
examples/charm++/zerocopy/large_readonly/readonly.C [new file with mode: 0644]
examples/charm++/zerocopy/large_readonly/readonly.ci [new file with mode: 0644]
src/arch/common/conv-mach-common.h
src/arch/util/lrts-common.h
src/ck-core/charm++.h
src/ck-core/charm.h
src/ck-core/ckrdma.C
src/ck-core/ckrdma.h
src/ck-core/init.C
src/util/pup.h
src/xlat-i/xi-Member.C
src/xlat-i/xi-Type.h
tests/util/check.C

index 83d44b3179b8e6f874cf08ad349b1cc9884f8dad..5ab3974a6295433de6a5966307281cb64296bced 100644 (file)
@@ -2,6 +2,7 @@ DIRS = \
   direct_api \
   entry_method_api \
   entry_method_bcast_api \
+  large_readonly \
 
 TESTDIRS = $(DIRS)
 
diff --git a/examples/charm++/zerocopy/large_readonly/Makefile b/examples/charm++/zerocopy/large_readonly/Makefile
new file mode 100644 (file)
index 0000000..bf665e3
--- /dev/null
@@ -0,0 +1,23 @@
+-include ../../../common.mk
+CHARMC=../../../../bin/charmc $(OPTS)
+# Default target: build the 'readonly' example binary.
+all: readonly
+
+OBJS = readonly.o
+# Link the example as a charm++ program.
+readonly: $(OBJS)
+       $(CHARMC) -language charm++ -o readonly $(OBJS)
+# 'cifiles' is a stamp file: it is touched after charmc has processed readonly.ci.
+cifiles: readonly.ci
+       $(CHARMC) readonly.ci
+       touch cifiles
+# The object depends on the stamp so that readonly.ci is processed before readonly.C is compiled.
+readonly.o: readonly.C cifiles
+       $(CHARMC) -c readonly.C
+# Run the example with +p4; 'run' is presumably provided by common.mk (confirm).
+test: all
+       $(call run, +p4 ./readonly)
+# Remove generated headers, objects, the binary and the stamp file.
+clean:
+       rm -f *.decl.h *.def.h conv-host *.o readonly charmrun cifiles
+
diff --git a/examples/charm++/zerocopy/large_readonly/readonly.C b/examples/charm++/zerocopy/large_readonly/readonly.C
new file mode 100644 (file)
index 0000000..71d53a9
--- /dev/null
@@ -0,0 +1,44 @@
+#include "ro_example.decl.h"
+
+int arr_size;
+int vec_size;
+
+int num_arr1[2000000];
+std::vector<int> num_vec1;
+CProxy_Main mProxy;
+
+class Main : public CBase_Main {
+
+  public:
+  Main(CkArgMsg *msg) {
+    CkPrintf("[%d][%d][%d] Hello, inside main\n", CmiMyPe(), CmiMyNode(), CmiMyRank());
+
+    arr_size = 2000000;
+    vec_size = 2000000;
+    mProxy = thisProxy;
+
+    for(int i=0; i<arr_size; i++) num_arr1[i] = i;
+    for(int i=0; i<vec_size; i++) num_vec1.push_back(i);
+
+    CProxy_arr1::ckNew(2*CkNumPes());
+  }
+
+  void done() {
+    CkPrintf("[%d][%d][%d] Verified successful Readonly transfer\n", CmiMyPe(), CmiMyNode(), CmiMyRank());
+    CkExit();
+  }
+};
+
+// Chare array element: its constructor asserts that every entry of num_arr1
+// and num_vec1 equals its own index and then contributes to a reduction
+// whose target is Main::done on mProxy.
+class arr1 : public CBase_arr1 {
+  public:
+  arr1() {
+
+    // Verify the plain array first ...
+    int idx = 0;
+    while(idx < arr_size) {
+      CkAssert(num_arr1[idx] == idx);
+      ++idx;
+    }
+
+    // ... and then the vector
+    idx = 0;
+    while(idx < vec_size) {
+      CkAssert(num_vec1[idx] == idx);
+      ++idx;
+    }
+
+    contribute(CkCallback(CkReductionTarget(Main, done), mProxy));
+  }
+};
+
+#include "ro_example.def.h"
diff --git a/examples/charm++/zerocopy/large_readonly/readonly.ci b/examples/charm++/zerocopy/large_readonly/readonly.ci
new file mode 100644 (file)
index 0000000..a588933
--- /dev/null
@@ -0,0 +1,19 @@
+mainmodule ro_example {
+
+  readonly int arr_size;  // number of entries of num_arr1 that are filled and checked
+  readonly int vec_size;  // number of entries of num_vec1 that are filled and checked
+
+  readonly int num_arr1[2000000];  // 2000000 ints: larger than 1 MiB assuming a 4-byte int
+  readonly std::vector<int> num_vec1;  // filled with vec_size entries by Main
+
+  readonly CProxy_Main mProxy;  // proxy used by arr1 elements as the reduction target
+
+  mainchare Main{
+    entry Main(CkArgMsg *msg);
+    entry [reductiontarget] void done();  // reached once all arr1 elements have contributed
+  };
+
+  array [1D] arr1 {
+    entry arr1();  // constructor verifies the readonly data
+  }
+};
index 46c0242164558a992767c4a673dc492072d3a8d3..3d94808004f258a81e35f3217b54101153ef30cc 100644 (file)
@@ -47,7 +47,8 @@ enum ncpyOperationMode {
   CMK_EM_API              = 1,
   CMK_EM_API_REVERSE      = 2,
   CMK_BCAST_EM_API        = 3,
-  CMK_BCAST_EM_API_REVERSE= 4
+  CMK_BCAST_EM_API_REVERSE= 4,
+  CMK_READONLY_BCAST      = 5
 };
 
 // Enum for the method of acknowledglement handling after the completion of a zerocopy operation
index 3705dfce7431bb8b933c03029fc433762c02e185..6b885fc327ab315c5a747f67d0f07d8a3426630d 100644 (file)
@@ -36,3 +36,14 @@ enum cmiCMAMsgType {
 #undef  CMK_COMMON_NOCOPY_DIRECT_BYTES // previous definition is in conv-mach-common.h
 #define CMK_COMMON_NOCOPY_DIRECT_BYTES sizeof(pid_t)
 #endif
+
+#if CMK_ONESIDED_IMPL
+// This macro specifies the threshold size in bytes, at or above which
+// the Zerocopy API is used to broadcast large readonly variables to all
+// processes
+#define CMK_ONESIDED_RO_THRESHOLD      1048576
+
+// TODO: Tune this macro and move it to <layer>/conv-common.h after running experiments on each layer
+// CMK_ONESIDED_RO_THRESHOLD is currently set to 1 MiB arbitrarily until the experiments are conducted
+
+#endif
index fc8579ff268c77f527fad4296c6e5e637bff2094..cd97e34ab62f798e18a04db7d7889e719b724c8e 100644 (file)
@@ -1391,6 +1391,9 @@ public:
   }
 };
 
+// Method to check if the Charm RTS initialization phase has completed
+void checkForInitDone();
+
 #endif
 
 
index 0c8881389f53fe776934a70d25f3296a9c0a334e..22dd30f11af369e1b066d877f8f20e3de9909131 100644 (file)
@@ -316,32 +316,36 @@ extern void CkMigrateExt(int aid, int ndims, int *index, int toPe);
  *   Make sure the two remain synchronized if changing this one.
  ***/
 typedef enum {
-  NewChareMsg     =1,               // Singleton chare creation message
-  NewVChareMsg    =2,               // Singleton virtual chare creation message
-  BocInitMsg      =3,               // Group creation message
-  ForChareMsg     =4,               // Singleton chare entry method message (non creation)
-  ForBocMsg       =5,               // Group entry method message (non creation)
-  ForVidMsg       =6,               // Singleton virtual chare entry method message (non creation)
-  FillVidMsg      =7,               // Message sent to fill a VidBlock on a virtual chare PE
-  DeleteVidMsg    =8,               // Message sent to delete a VidBlock on a virtual chare PE
-  RODataMsg       =9,               // Readonly Data Message (for user declared readonly variables)
-  ROMsgMsg        =10,              // Readonly message Message (for user declared readonly messages)
-  StartExitMsg    =11,              // Exit sequence trigger message
-  ExitMsg         =12,              // Exit sequence trigger message using user registered exit function
-  ReqStatMsg      =13,              // Request stats and warnings message
-  StatMsg         =14,              // Stats data message (Reduction)
-  StatDoneMsg     =15,              // Signal completion of stats reduction (Broadcast)
-  NodeBocInitMsg  =16,              // Nodegroup creation message
-  ForNodeBocMsg   =17,              // Nodegroup entry method message (non creation)
-  ArrayEltInitMsg =18,              // Array Element Initialization message
-  ForArrayEltMsg  =19,              // Array Element entry method message
-  ForIDedObjMsg   =20,
+  NewChareMsg          =1,               // Singleton chare creation message
+  NewVChareMsg         =2,               // Singleton virtual chare creation message
+  BocInitMsg           =3,               // Group creation message
+  ForChareMsg          =4,               // Singleton chare entry method message (non creation)
+  ForBocMsg            =5,               // Group entry method message (non creation)
+  ForVidMsg            =6,               // Singleton virtual chare entry method message (non creation)
+  FillVidMsg           =7,               // Message sent to fill a VidBlock on a virtual chare PE
+  DeleteVidMsg         =8,               // Message sent to delete a VidBlock on a virtual chare PE
+  RODataMsg            =9,               // Readonly Data Message (for user declared readonly variables)
+  ROMsgMsg             =10,              // Readonly message Message (for user declared readonly messages)
+  ROPeerCompletionMsg  =11,              // Message to signal completion of RO Data transfer using Zcpy API
+                                         // ^(sent to the first PE of the sender's own node, SMP mode only)
+  ROChildCompletionMsg =12,              // Message to signal completion of RO Data transfer using Zcpy API
+                                         // ^(used by child nodes to signal completion to their parent node in the bcast spanning tree)
+  StartExitMsg         =13,              // Exit sequence trigger message
+  ExitMsg              =14,              // Exit sequence trigger message using user registered exit function
+  ReqStatMsg           =15,              // Request stats and warnings message
+  StatMsg              =16,              // Stats data message (Reduction)
+  StatDoneMsg          =17,              // Signal completion of stats reduction (Broadcast)
+  NodeBocInitMsg       =18,              // Nodegroup creation message
+  ForNodeBocMsg        =19,              // Nodegroup entry method message (non creation)
+  ArrayEltInitMsg      =20,              // Array Element Initialization message
+  ForArrayEltMsg       =21,              // Array Element entry method message
+  ForIDedObjMsg        =22,
 #if CMK_LOCKLESS_QUEUE
-  WarnMsg         =21,              // Warning data message (Reduction)
-  WarnDoneMsg     =22,              // Signal completion of warnings reduction (Broadcast)
-  LAST_CK_ENVELOPE_TYPE =23         // Used for error-checking
+  WarnMsg              =23,              // Warning data message (Reduction)
+  WarnDoneMsg          =24,              // Signal completion of warnings reduction (Broadcast)
+  LAST_CK_ENVELOPE_TYPE =25              // Used for error-checking
 #else
-  LAST_CK_ENVELOPE_TYPE =21         // Used for error-checking
+  LAST_CK_ENVELOPE_TYPE =23              // Used for error-checking
 #endif
 } CkEnvelopeType;
 
index 829b22399a96ad07815817141ae8c0db8ba9a34f..3f72721c4e77ee7a1f575d694123913913b2f94a 100644 (file)
@@ -355,6 +355,8 @@ void CkRdmaDirectAckHandler(void *ack) {
 
     case CMK_BCAST_EM_API_REVERSE : handleBcastReverseEntryMethodApiCompletion(info); // Ncpy EM Bcast API invoked through a PUT
                                     break;
+    case CMK_READONLY_BCAST       : readonlyGetCompleted(info);
+                                    break;
 #endif
     default                       : CkAbort("CkRdmaDirectAckHandler: Unknown ncpyOpInfo->opMode");
                                     break;
@@ -570,8 +572,7 @@ envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg
     up|source;
 
     // destination buffer
-    CkNcpyBuffer dest((const void *)buf, CK_BUFFER_UNREG);
-    dest.cnt = source.cnt;
+    CkNcpyBuffer dest((const void *)buf, source.cnt, CK_BUFFER_UNREG);
 
     // Set the common layerInfo for the destination
     CmiSetRdmaCommonInfo(dest.layerInfo, dest.ptr, dest.cnt);
@@ -1034,5 +1035,148 @@ void CkRdmaEMBcastAckHandler(void *ack) {
   }
 }
 
+
+
+/***************************** Zerocopy Readonly Bcast Support ****************************/
+
+extern int _roRdmaDoneHandlerIdx,_initHandlerIdx;
+CksvExtern(int, _numPendingRORdmaTransfers);
+extern UInt numZerocopyROops, curROIndex;
+extern NcpyROBcastAckInfo *roBcastAckInfo;
+
+// Count one more readonly variable that is going to use the Zerocopy API
+void readonlyUpdateNumops() {
+  ++numZerocopyROops;
+}
+
+// Method to allocate the object (roBcastAckInfo) that records this PE's source
+// buffers so that they can be de-registered after the bcast completes.
+// numZerocopyROops must already hold the number of zerocopy readonly
+// operations, because it determines the size of the trailing buffAckInfo array.
+// Aborts if the spanning tree (_topoTree) has not been set up yet.
+void readonlyAllocateOnSource() {
+
+  if(_topoTree == NULL) CkAbort("readonlyAllocateOnSource:: topo tree has not been calculated \n");
+  CmiSpanningTreeInfo &t = *_topoTree;
+
+  // allocate the buffer to keep track of the completed operations
+  roBcastAckInfo = (NcpyROBcastAckInfo *)CmiAlloc(sizeof(NcpyROBcastAckInfo) + numZerocopyROops * sizeof(NcpyROBcastBuffAckInfo));
+
+  // no child has reported completion yet
+  roBcastAckInfo->counter = 0;
+  // a parent of -1 marks the root of the spanning tree
+  roBcastAckInfo->isRoot = (t.parent == -1);
+  roBcastAckInfo->numChildren = t.child_count;
+  roBcastAckInfo->numops = numZerocopyROops;
+}
+
+// Method to record one source buffer in the previously allocated roBcastAckInfo.
+// The buffer is stored in the slot at curROIndex, which is then advanced.
+void readonlyCreateOnSource(CkNcpyBuffer &src) {
+  NcpyROBcastBuffAckInfo &slot = roBcastAckInfo->buffAckInfo[curROIndex++];
+
+  src.bcastAckInfo = roBcastAckInfo;
+
+  slot.ptr  = src.ptr;
+  slot.mode = src.mode;
+  slot.pe   = src.pe;
+
+  // keep a copy of the source layer information, it is needed for de-registration
+  const size_t layerInfoBytes = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
+  memcpy(slot.layerInfo, src.layerInfo, layerInfoBytes);
+}
+
+// Method to perform an get for Readonly transfer
+int readonlyGet(CkNcpyBuffer &src, CkNcpyBuffer &dest, void *refPtr) {
+  CkNcpyMode transferMode = findTransferMode(src.pe, dest.pe);
+  if(transferMode == CkNcpyMode::MEMCPY) {
+    CmiAbort("memcpy: should not happen\n");
+  }
+#if CMK_USE_CMA
+  else if(transferMode == CkNcpyMode::CMA) {
+    dest.cmaGet(src);
+  }
+#endif
+  else {
+
+    CksvAccess(_numPendingRORdmaTransfers)++;
+
+    int layerInfoSize = CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES;
+    int ackSize = 0;
+    int ncpyObjSize = getNcpyOpInfoTotalSize(
+                      layerInfoSize,
+                      ackSize,
+                      layerInfoSize,
+                      ackSize);
+    NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyObjSize);
+    setNcpyOpInfo(src.ptr,
+                  (char *)(src.layerInfo),
+                  layerInfoSize,
+                  NULL,
+                  ackSize,
+                  src.cnt,
+                  src.mode,
+                  src.isRegistered,
+                  src.pe,
+                  src.ref,
+                  dest.ptr,
+                  (char *)(dest.layerInfo),
+                  layerInfoSize,
+                  NULL,
+                  ackSize,
+                  dest.cnt,
+                  dest.mode,
+                  dest.isRegistered,
+                  dest.pe,
+                  dest.ref,
+                  ncpyOpInfo);
+
+    ncpyOpInfo->opMode = CMK_READONLY_BCAST;
+    ncpyOpInfo->refPtr = refPtr;
+
+    readonlyCreateOnSource(dest);
+
+    CmiIssueRget(ncpyOpInfo);
+  }
+}
+
+void readonlyGetCompleted(NcpyOperationInfo *ncpyOpInfo) {
+
+  if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
+  CmiSpanningTreeInfo &t = *_topoTree;
+
+  CksvAccess(_numPendingRORdmaTransfers)--;
+
+  if(CksvAccess(_numPendingRORdmaTransfers) == 0) {
+
+    if(t.child_count != 0) {  // Intermediate Node
+
+      envelope *env = (envelope *)(ncpyOpInfo->refPtr);
+
+      // send a message to my child nodes
+      CmiForwardProcBcastMsg(env->getTotalsize(), (char *)env);
+
+      //TODO:QD support
+
+    } else {
+
+      // deregister dest buffer
+      CmiDeregisterMem(ncpyOpInfo->destPtr, ncpyOpInfo->destLayerInfo + CmiGetRdmaCommonInfoSize(), ncpyOpInfo->destPe, ncpyOpInfo->destMode);
+
+      // Send a message to the parent to signal completion in order to deregister
+      envelope *compEnv = _allocEnv(ROChildCompletionMsg);
+      compEnv->setSrcPe(CkMyPe());
+      CmiSetHandler(compEnv, _roRdmaDoneHandlerIdx);
+      CmiSyncSendAndFree(ncpyOpInfo->srcPe, compEnv->getTotalsize(), (char *)compEnv);
+    }
+
+#if CMK_SMP
+    // Send a message to my first node to signal completion
+    envelope *sigEnv = _allocEnv(ROPeerCompletionMsg);
+    sigEnv->setSrcPe(CkMyPe());
+    CmiSetHandler(sigEnv, _roRdmaDoneHandlerIdx);
+    //CmiSetHandler(sigEnv, _initHandlerIdx);
+    CmiSyncSendAndFree(CmiNodeFirst(CmiMyNode()), sigEnv->getTotalsize(), (char *)sigEnv);
+#else
+    // Directly call checkInitDone
+    checkForInitDone();
+#endif
+
+  }
+}
 #endif
 /* End of CMK_ONESIDED_IMPL */
index d8cda27316d002ec02566462d87515df3736c623..777c40106ef734e96aa1aa60d89e5a1f26831948 100644 (file)
@@ -110,6 +110,11 @@ class CkNcpyBuffer{
     isRegistered = false;
   }
 
+  // Construct a buffer object without a user callback: the callback is set to
+  // CkCallback::ignore and the remaining fields are set up by init()
+  explicit CkNcpyBuffer(const void *ptr_, size_t cnt_, unsigned short int mode_)
+    : cb(CkCallback::ignore) {
+    init(ptr_, cnt_, mode_);
+  }
+
   CkNcpyBuffer(const void *ptr_, size_t cnt_, CkCallback &cb_, unsigned short int mode_=CK_BUFFER_REG) {
     init(ptr_, cnt_, cb_, mode_);
   }
@@ -119,9 +124,13 @@ class CkNcpyBuffer{
   }
 
   void init(const void *ptr_, size_t cnt_, CkCallback &cb_, unsigned short int mode_=CK_BUFFER_REG) {
+    cb   = cb_;
+    init(ptr_, cnt_, mode_);
+  }
+
+  void init(const void *ptr_, size_t cnt_, unsigned short int mode_=CK_BUFFER_REG) {
     ptr  = ptr_;
     cnt  = cnt_;
-    cb   = cb_;
     pe   = CkMyPe();
     mode = mode_;
 
@@ -213,6 +222,8 @@ class CkNcpyBuffer{
   friend void constructDestinationBufferObject(NcpyOperationInfo *info, CkNcpyBuffer &dest);
 
   friend envelope* CkRdmaIssueRgets(envelope *env, ncpyEmApiMode emMode, void *forwardMsg);
+  friend int readonlyGet(CkNcpyBuffer &src, CkNcpyBuffer &dest, void *refPtr);
+  friend void readonlyCreateOnSource(CkNcpyBuffer &src);
 };
 
 // Ack handler for the Zerocopy Direct API
@@ -336,6 +347,49 @@ void handleBcastReverseEntryMethodApiCompletion(NcpyOperationInfo *info);
 // Method called on the root node and other intermediate parent nodes on completion of RGET through ZC Bcast
 void CkRdmaEMBcastAckHandler(void *ack);
 
+
+
+/***************************** Zerocopy Readonly Bcast Support ****************************/
+
+/* Support for Zerocopy Broadcast of large readonly variables */
+// Declared with the Cksv macro to match the CksvDeclare/CksvInitialize/CksvAccess uses of this counter
+CksvExtern(int, _numPendingRORdmaTransfers);
+
+struct NcpyROBcastBuffAckInfo { // Per-buffer record filled by readonlyCreateOnSource and read back when de-registering
+  const void *ptr; // address of the buffer (copied from CkNcpyBuffer::ptr)
+
+  int mode; // buffer mode (copied from CkNcpyBuffer::mode), passed to CmiDeregisterMem
+
+  int pe; // pe of the buffer (copied from CkNcpyBuffer::pe), passed to CmiDeregisterMem
+
+  // machine specific information about the buffer (copied from CkNcpyBuffer::layerInfo)
+  #ifdef __GNUC__
+  #pragma GCC diagnostic push
+  #pragma GCC diagnostic ignored "-Wpedantic"
+  #endif
+  char layerInfo[CMK_COMMON_NOCOPY_DIRECT_BYTES + CMK_NOCOPY_DIRECT_BYTES];
+  #ifdef __GNUC__
+  #pragma GCC diagnostic pop
+  #endif
+};
+
+struct NcpyROBcastAckInfo { // Tracking object allocated by readonlyAllocateOnSource
+  int numChildren; // child_count of this PE in the spanning tree
+  int counter; // number of ROChildCompletionMsg messages received so far
+  bool isRoot; // true when this PE has no parent (parent == -1) in the spanning tree
+  int numops; // number of zerocopy readonly operations, i.e. entries of buffAckInfo
+  NcpyROBcastBuffAckInfo buffAckInfo[0]; // trailing array; its storage is allocated together with the struct
+};
+
+void readonlyUpdateNumops();
+
+void readonlyAllocateOnSource();
+
+void readonlyCreateOnSource(CkNcpyBuffer &src);
+
+int readonlyGet(CkNcpyBuffer &src, CkNcpyBuffer &dest, void *refPtr);
+
+void readonlyGetCompleted(NcpyOperationInfo *ncpyOpInfo);
+
 #endif /* End of CMK_ONESIDED_IMPL */
 
 #endif
index 11bfba493a7230ab4b49a3a3afa6c6e348db1681..541ebf0b4ca2fd52e7fdb3aeef64d64564d91705 100644 (file)
@@ -113,6 +113,15 @@ UInt  _numInitMsgs = 0;
  * given node, this value must be seen by all processors in a node.
  */
 CksvDeclare(UInt,_numInitNodeMsgs);
+
+#if CMK_ONESIDED_IMPL
+UInt  numZerocopyROops;
+UInt  curROIndex;
+NcpyROBcastAckInfo *roBcastAckInfo;
+int   _roRdmaDoneHandlerIdx;
+CksvDeclare(int,  _numPendingRORdmaTransfers);
+#endif
+
 int   _infoIdx;
 int   _charmHandlerIdx;
 int   _initHandlerIdx;
@@ -883,11 +892,8 @@ void _initDone(void)
  */
 static void _triggerHandler(envelope *env)
 {
-  if (_numExpectInitMsgs && CkpvAccess(_numInitsRecd) + CksvAccess(_numInitNodeMsgs) == _numExpectInitMsgs)
-  {
-    DEBUGF(("Calling Init Done from _triggerHandler\n"));
-    _initDone();
-  }
+  DEBUGF(("Calling Init Done from _triggerHandler\n"));
+  checkForInitDone();
   if (env!=NULL) CmiFree(env);
 }
 
@@ -902,12 +908,38 @@ static inline void _processRODataMsg(envelope *env)
 {
   //Unpack each readonly:
   if(!CmiMyRank()) {
+#if CMK_ONESIDED_IMPL && CMK_SMP
+    if(CMI_IS_ZC_BCAST(env)) {
+      // Send message to peers
+      CmiForwardMsgToPeers(env->getTotalsize(), (char *)env);
+    }
+#endif
+
+    //Unpack each readonly
     PUP::fromMem pu((char *)EnvToUsr(env));
+
+#if CMK_ONESIDED_IMPL
+    pu|numZerocopyROops;
+    if(numZerocopyROops > 0) {
+      readonlyAllocateOnSource();
+    }
+#endif
+
     for(size_t i=0;i<_readonlyTable.size();i++) {
       _readonlyTable[i]->pupData(pu);
     }
+
+#if CMK_ONESIDED_IMPL
+    if(CMI_IS_ZC_BCAST(env)) {
+      if(findTransferMode(env->getSrcPe(), CkMyPe()) == CkNcpyMode::CMA) {
+        // Forward the env after replacing the pointers
+        CmiForwardProcBcastMsg(env->getTotalsize(), (char *)env);
+      }
+    }
+#endif
+  } else {
+    CmiFree(env);
   }
-  CmiFree(env);
 }
 
 /**
@@ -932,6 +964,56 @@ static void _roRestartHandler(void *msg)
   _triggerHandler(NULL);
 }
 
+#if CMK_ONESIDED_IMPL
+static void _roRdmaDoneHandler(envelope *env) {
+
+  switch(env->getMsgtype()) {
+    case ROPeerCompletionMsg:
+      checkForInitDone();
+      if (env!=NULL) CmiFree(env);
+      break;
+    case ROChildCompletionMsg:
+      roBcastAckInfo->counter++;
+      if(roBcastAckInfo->counter == roBcastAckInfo->numChildren) {
+        // deregister
+        for(int i=0; i < roBcastAckInfo->numops; i++) {
+          NcpyROBcastBuffAckInfo *buffAckInfo = &(roBcastAckInfo->buffAckInfo[i]);
+          CmiDeregisterMem(buffAckInfo->ptr,
+                           buffAckInfo->layerInfo +CmiGetRdmaCommonInfoSize(),
+                           buffAckInfo->pe,
+                           buffAckInfo->mode);
+        }
+
+        if(roBcastAckInfo->isRoot != 1) {
+          if(_topoTree == NULL) CkAbort("CkRdmaIssueRgets:: topo tree has not been calculated \n");
+          CmiSpanningTreeInfo &t = *_topoTree;
+
+          //forward message to root
+          // Send a message to the parent to signal completion in order to deregister
+          envelope *compEnv = _allocEnv(ROChildCompletionMsg);
+          compEnv->setSrcPe(CkMyPe());
+          CmiSetHandler(compEnv, _roRdmaDoneHandlerIdx);
+          CmiSyncSendAndFree(t.parent, compEnv->getTotalsize(), (char *)compEnv);
+        }
+      }
+      break;
+    default:
+      CmiAbort("Invalid msg type\n");
+      break;
+  }
+}
+#endif
+
+// Calls _initDone() once all expected init messages have been received and,
+// when the Zerocopy API is compiled in, no readonly RDMA transfer is pending.
+void checkForInitDone() {
+#if CMK_ONESIDED_IMPL
+  // readonly data is still being transferred
+  if (CksvAccess(_numPendingRORdmaTransfers) != 0) return;
+#endif
+
+  // the number of expected init messages is not known yet
+  if (!_numExpectInitMsgs) return;
+
+  if (CkpvAccess(_numInitsRecd) + CksvAccess(_numInitNodeMsgs) != _numExpectInitMsgs) return;
+
+  _initDone();
+}
+
 /**
  * This handler is used only during initialization. It receives messages from
  * processor zero regarding Readonly Data (in one single message), Readonly Messages,
@@ -991,9 +1073,7 @@ static void _initHandler(void *msg, CkCoreState *ck)
       CmiAbort("Internal Error: Unknown-msg-type. Contact Developers.\n");
   }
   DEBUGF(("[%d,%.6lf] _numExpectInitMsgs %d CkpvAccess(_numInitsRecd)+CksvAccess(_numInitNodeMsgs) %d+%d\n",CmiMyPe(),CmiWallTimer(),_numExpectInitMsgs,CkpvAccess(_numInitsRecd),CksvAccess(_numInitNodeMsgs)));
-  if(_numExpectInitMsgs&&(CkpvAccess(_numInitsRecd)+CksvAccess(_numInitNodeMsgs)==_numExpectInitMsgs)) {
-    _initDone();
-  }
+  checkForInitDone();
 }
 
 #if CMK_SHRINK_EXPAND
@@ -1200,6 +1280,10 @@ void _initCharm(int unused_argc, char **argv)
        CksvInitialize(bool, _triggersSent);
        CksvAccess(_triggersSent) = false;
 
+#if CMK_ONESIDED_IMPL
+       CksvInitialize(int, _numPendingRORdmaTransfers);
+#endif
+
        CkpvInitialize(_CkOutStream*, _ckout);
        CkpvInitialize(_CkErrStream*, _ckerr);
        CkpvInitialize(Stats*, _myStats);
@@ -1218,6 +1302,11 @@ void _initCharm(int unused_argc, char **argv)
        {
                CksvAccess(_numNodeGroups) = 1; //make 0 an invalid group number
                CksvAccess(_numInitNodeMsgs) = 0;
+
+#if CMK_ONESIDED_IMPL
+               CksvAccess(_numPendingRORdmaTransfers) = 0;
+#endif
+
                CksvAccess(_nodeLock) = CmiCreateLock();
                CksvAccess(_nodeGroupTable) = new GroupTable();
                CksvAccess(_nodeGroupTable)->init();
@@ -1244,6 +1333,11 @@ void _initCharm(int unused_argc, char **argv)
        CmiAssignOnce(&_charmHandlerIdx, CkRegisterHandler(_bufferHandler));
        CmiAssignOnce(&_initHandlerIdx, CkRegisterHandlerEx(_initHandler, CkpvAccess(_coreState)));
        CmiAssignOnce(&_roRestartHandlerIdx, CkRegisterHandler(_roRestartHandler));
+
+#if CMK_ONESIDED_IMPL
+       CmiAssignOnce(&_roRdmaDoneHandlerIdx, CkRegisterHandler(_roRdmaDoneHandler));
+#endif
+
        CmiAssignOnce(&_exitHandlerIdx, CkRegisterHandler(_exitHandler));
        //added for interoperabilitY
        CmiAssignOnce(&_libExitHandlerIdx, CkRegisterHandler(_libExitHandler));
@@ -1659,20 +1753,45 @@ void _initCharm(int unused_argc, char **argv)
                        _numInitMsgs++;
                }
 
+#if CMK_ONESIDED_IMPL
+               numZerocopyROops = 0;
+               curROIndex = 0;
+#endif
+
                //Determine the size of the RODataMessage
                PUP::sizer ps;
+
+#if CMK_ONESIDED_IMPL
+               ps|numZerocopyROops;
+#endif
+
                for(i=0;i<_readonlyTable.size();i++) _readonlyTable[i]->pupData(ps);
 
+#if CMK_ONESIDED_IMPL
+               if(numZerocopyROops > 0)
+                       readonlyAllocateOnSource();
+#endif
+
                //Allocate and fill out the RODataMessage
                envelope *env = _allocEnv(RODataMsg, ps.size());
                PUP::toMem pp((char *)EnvToUsr(env));
+#if CMK_ONESIDED_IMPL
+               pp|numZerocopyROops;
+#endif
                for(i=0;i<_readonlyTable.size();i++) _readonlyTable[i]->pupData(pp);
 
                env->setCount(++_numInitMsgs);
                env->setSrcPe(CkMyPe());
                CmiSetHandler(env, _initHandlerIdx);
                DEBUGF(("[%d,%.6lf] RODataMsg being sent of size %d \n",CmiMyPe(),CmiWallTimer(),env->getTotalsize()));
-               CmiSyncBroadcastAndFree(env->getTotalsize(), (char *)env);
+               CmiSyncBroadcast(env->getTotalsize(), (char *)env);
+#if CMK_ONESIDED_IMPL && CMK_SMP
+               if(numZerocopyROops > 0) {
+                       // Send message to peers
+                       CmiForwardMsgToPeers(env->getTotalsize(), (char *)env);
+               }
+#endif
+               CmiFree(env);
                CpvAccess(_qd)->create(CkNumPes()-1);
                _initDone();
        } else {
index eb67b482abf15d78e60e9bcf0e6821dc98b5917f..e6c3078a7c14a3daf90e3e3e85e6c7f43e5bab65 100644 (file)
@@ -432,6 +432,10 @@ class mem : public er { //Memory-buffer packers and unpackers
     return reinterpret_cast<char*>(buf);
   }
 
+  inline char* get_orig_pointer() const { // returns origBuf as a char* (presumably the start of the underlying buffer; confirm against the class definition)
+    return reinterpret_cast<char*>(origBuf);
+  }
+
   inline void advance(size_t const offset) {
     buf += offset;
   }
index 0ee14c1e25e78fc113d3280fa542b78de7b9f9bd..f4e9d4e4ca82a94ee7f45a78a9753ec2b14cfb2e 100644 (file)
@@ -72,13 +72,110 @@ void Readonly::genDefs(XStr& str) {
     str << "(void *_impl_pup_er) {\n";
     str << "  PUP::er &_impl_p=*(PUP::er *)_impl_pup_er;\n";
     if (dims) {
+      // Setting CMK_ONESIDED_RO_DISABLE provides a compile time switch to turn off RO ZC Bcast
+      str <<"#if !CMK_ONESIDED_RO_DISABLE && CMK_ONESIDED_IMPL\n";
+      // Conditional to selectively pup ncpy buffers
+      str <<"  if(";
+      dims->printValueProduct(str);
+      str <<" * sizeof(";
+      type->print(str);
+      str << ") >= CMK_ONESIDED_RO_THRESHOLD) {\n";
+      str <<"    CkNcpyBuffer myBuffer(& "<< qName();
+      dims->printZeros(str);
+      str <<", (";
+      dims->printValueProduct(str);
+      str <<" * sizeof(";
+      type->print(str);
+      str <<")), CK_BUFFER_REG);\n";
+      str <<"    if(_impl_p.isPacking() || _impl_p.isSizing()) {\n";
+      str <<"      _impl_p|myBuffer;\n";
+      str <<"      if(_impl_p.isSizing())\n";
+      str <<"        readonlyUpdateNumops();\n";
+      str <<"      else\n";
+      str <<"        readonlyCreateOnSource(myBuffer);\n";
+      str <<"    }\n";
+      str <<"    if(_impl_p.isPacking()) {\n";
+      str <<"      PUP::toMem &_impl_p_toMem = *(PUP::toMem *)_impl_pup_er;\n";
+      str <<"      envelope *env = UsrToEnv(_impl_p_toMem.get_orig_pointer());\n";
+      str <<"      CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_SEND_MSG;\n";
+      str <<"    }\n";
+      str <<"    if(_impl_p.isUnpacking()) {\n";
+      str <<"      PUP::fromMem &_impl_p_fromMem = *(PUP::fromMem *)_impl_pup_er;\n";
+      str <<"      char *ptr = _impl_p_fromMem.get_current_pointer();\n";
+      str <<"      PUP::toMem _impl_p_toMem = (PUP::toMem)((void *)ptr);\n";
+      str <<"      envelope *env = UsrToEnv(_impl_p_toMem.get_orig_pointer());\n";
+      str <<"      CkNcpyBuffer srcBuffer;\n";
+      str <<"      _impl_p|srcBuffer;\n";
+      str <<"      _impl_p_toMem|myBuffer;\n";
+      str <<"      readonlyGet(srcBuffer, myBuffer, (void *)env);\n";
+      str <<"    }\n";
+      str <<"  } else\n";
+      str <<"#endif\n";
+      str << "  {\n";
       str << "  _impl_p(&" << qName();
       dims->printZeros(str);
       str << ", (";
       dims->printValueProduct(str);
       str << ") );\n";
+      str << "  }\n";
     } else {
-      str << "  _impl_p|" << qName() << ";\n";
+      if(strcmp("vector",type->getBaseName()) == 0) {
+        // Setting CMK_ONESIDED_RO_DISABLE provides a compile time switch to turn off RO ZC Bcast
+        str <<"#if !CMK_ONESIDED_RO_DISABLE && CMK_ONESIDED_IMPL\n";
+        NamedType *nType = (NamedType *)type;
+
+        // Determine if the vector is going to be using ZC bcast
+        str <<"  bool vecUsesZC = false;\n";
+        str <<"  if(_impl_p.isPacking() || _impl_p.isSizing()) {\n";
+        // Add conditional to selectively pup ncpy buffers
+        str <<"    vecUsesZC = (" << qName() <<".size()";
+        str <<" * sizeof(" << nType->getTparams() <<")";
+        str <<" >= CMK_ONESIDED_RO_THRESHOLD);\n";
+        str <<"  }\n";
+
+        str <<"  _impl_p|vecUsesZC;\n";
+
+        str <<"  if(vecUsesZC) {\n";
+        str <<"    if(_impl_p.isPacking() || _impl_p.isSizing()) {\n";
+        str <<"      CkNcpyBuffer myBuffer("<< qName() << ".data()";
+        str <<", " << "sizeof(" << nType->getTparams() << ") * "<< qName() <<".size()";
+        str <<", CK_BUFFER_REG);\n";
+        str <<"      _impl_p|myBuffer;\n";
+        str <<"      if(_impl_p.isSizing())\n";
+        str <<"        readonlyUpdateNumops();\n";
+        str <<"      else\n";
+        str <<"        readonlyCreateOnSource(myBuffer);\n";
+        str <<"    }\n";
+        str <<"    if(_impl_p.isPacking()) {\n";
+        str <<"      PUP::toMem &_impl_p_toMem_orig = *(PUP::toMem *)_impl_pup_er;\n";
+        str <<"      envelope *env = UsrToEnv(_impl_p_toMem_orig.get_orig_pointer());\n";
+        str <<"      CMI_ZC_MSGTYPE(env) = CMK_ZC_BCAST_SEND_MSG;\n";
+        str <<"    }\n";
+        str <<"    if(_impl_p.isUnpacking()) {\n";
+        str <<"      PUP::fromMem &_impl_p_fromMem = *(PUP::fromMem *)_impl_pup_er;\n";
+        str <<"      PUP::toMem &_impl_p_toMem_orig = *(PUP::toMem *)_impl_pup_er;\n";
+        str <<"      envelope *env = UsrToEnv(_impl_p_toMem_orig.get_orig_pointer());\n";
+        str <<"      char *ptr = _impl_p_fromMem.get_current_pointer();\n";
+        str <<"      PUP::toMem _impl_p_toMem = (PUP::toMem)((void *)ptr);\n";
+        str <<"      CkNcpyBuffer srcBuffer;\n";
+        str <<"      _impl_p|srcBuffer;\n";
+        str <<"      size_t nElem = ";
+        str <<"(srcBuffer.cnt)/sizeof("<<nType->getTparams()<<");\n";
+        str <<"      " << qName() <<".resize(nElem);\n";
+        str <<"      " << qName() <<".shrink_to_fit();\n";
+        str <<"      CkNcpyBuffer myBuffer("<< qName() << ".data()";
+        str <<", " << "srcBuffer.cnt, CK_BUFFER_REG);\n";
+        str <<"      _impl_p_toMem|myBuffer;\n";
+        str <<"      readonlyGet(srcBuffer, myBuffer, (void *)env);\n";
+        str <<"    }\n";
+        str <<"  } else\n";
+        str <<"#endif\n";
+        str <<"  {\n";
+        str <<"    _impl_p|" << qName() << ";\n";
+        str <<"  }\n";
+      } else {
+        str <<"  _impl_p|" << qName() << ";\n";
+      }
     }
     str << "}\n";
     templateGuardEnd(str);
index be564625fdc889d439dfdf33bb71f31152e48a84..ea669dbf5dcea092c02c6f9abe36bbe7be345790 100644 (file)
@@ -85,8 +85,10 @@ class NamedType : public Type {
   int isTemplated(void) const { return (tparams != 0); }
   int isCkArgMsg(void) const { return 0 == strcmp(name, "CkArgMsg"); }
   int isCkMigMsg(void) const { return 0 == strcmp(name, "CkMigrateMessage"); }
+  int isVector(void) const { return 0 == strcmp(name, "vector"); }
   void print(XStr& str);
   int isNamed(void) const { return 1; }
+  TParamList *getTparams(void) const { return tparams; }
   virtual const char* getBaseName(void) const { return name; }
   virtual const char* getScope(void) const { return scope; }
   virtual void genProxyName(XStr& str, forWhom forElement);
index 822dd49a919aaf7ec6c2fd3c046447adde74af4e..a77e8f38369089323abfd683192c50bfd0b63cbb 100644 (file)
@@ -95,9 +95,9 @@ void check_test(int argc, char** argv) {
   // incremented, and the CharmDebug correspondant enumeration (in
   // charm.debug.pdata.MsgInfo.java) is updated accordingly.
 #if CMK_LOCKLESS_QUEUE
-  if (LAST_CK_ENVELOPE_TYPE != 23) {
+  if (LAST_CK_ENVELOPE_TYPE != 25) {
 #else
-  if (LAST_CK_ENVELOPE_TYPE != 21) {
+  if (LAST_CK_ENVELOPE_TYPE != 23) {
 #endif
     CmiPrintf("Error: LAST_CK_ENVELOPE_TYPE changed. Update CharmDebug and fix this test:\n");
     CmiPrintf("       BEFORE changing this test, make sure the CHARMDEBUG_MINOR version number is incremented, and the CharmDebug correspondant enumeration (in charm.debug.pdata.MsgInfo.java) is updated accordingly.");