AMPI: Add support for and example of primitive Charm++ interoperation 66/4366/23
authorSam White <white67@illinois.edu>
Tue, 17 Jul 2018 21:10:48 +0000 (16:10 -0500)
committerEvan Ramos <evan@hpccharm.com>
Fri, 15 Feb 2019 20:45:12 +0000 (14:45 -0600)
- Adds a new header ampi-interoperate.h
- Adds a new entry method 'void injectMsg(int size, char buf[size])' to
  the ampi class, for use in sending messages b/w AMPI ranks and chares.
- Adds an example application that uses a Group to interoperate an AMPI
  program with a Charm++ one.

Co-authored-by: Evan Ramos <evan@hpccharm.com>
Change-Id: I58fa26958eb70ca8b9974c6f0ea0e7c0036071c4

15 files changed:
doc/ampi/manual.rst
doc/ampi/manual.tex
examples/charm++/AMPI-interop/AmpiInterop.C [new file with mode: 0644]
examples/charm++/AMPI-interop/AmpiInterop.ci [new file with mode: 0644]
examples/charm++/AMPI-interop/AmpiInterop.h [new file with mode: 0644]
examples/charm++/AMPI-interop/Makefile [new file with mode: 0644]
examples/charm++/AMPI-interop/README [new file with mode: 0644]
examples/charm++/AMPI-interop/exampleMpi.C [new file with mode: 0644]
examples/charm++/AMPI-interop/hello.C [new file with mode: 0644]
examples/charm++/AMPI-interop/hello.ci [new file with mode: 0644]
src/libs/ck-libs/ampi/Makefile
src/libs/ck-libs/ampi/ampi-interoperate.h [new file with mode: 0644]
src/libs/ck-libs/ampi/ampi.C
src/libs/ck-libs/ampi/ampi.ci
src/libs/ck-libs/ampi/ampiimpl.h

index 917e6a3a6bc59a1493a7ea4dc37a2e11ed9ebc5c..3906ecfc10c7342838bd2267416168f03cc24a07 100644 (file)
@@ -1182,6 +1182,17 @@ this data:
    ierr = MPI_Recv(InitialTime, 1, MPI_DOUBLE, tag,
                    36, MPI_Comm_Universe(Solids_Comm), &stat);
 
+Charm++ Interoperability
+------------------------
+
+There is preliminary support for interoperating AMPI programs with Charm++
+programs. This allows users to launch an AMPI program with an arbitrary number
+of virtual processes in the same executable as a Charm++ program that contains
+arbitrary collections of chares, with both AMPI ranks and chares being co-scheduled
+by the runtime system. We also provide an entry method ``void injectMsg(int n, char buf[n])``
+for chares to communicate with AMPI ranks. An example program can be found in
+``examples/charm++/AMPI-interop``.
+
 Extensions for Sequential Re-run of a Parallel Node
 ---------------------------------------------------
 
index 2e18a0fce356def3614faa1376ddc87060ea4d32..71ad0da6083e4e099a5e65699dbd6d43819db37a 100644 (file)
@@ -1100,6 +1100,15 @@ ierr = MPI_Recv(InitialTime, 1, MPI_DOUBLE, tag,
                 \emph{36, MPI_Comm_Universe(Solids_Comm)}, &stat);
 \end{alltt}
 
+\subsection{\charmpp{} Interoperability}
+There is preliminary support for interoperating AMPI programs with \charmpp{}
+programs. This allows users to launch an AMPI program with an arbitrary number
+of virtual processes in the same executable as a \charmpp{} program that contains
+arbitrary collections of chares, with both AMPI ranks and chares being co-scheduled
+by the runtime system. We also provide an entry method "void injectMsg(int n, char buf[n])"
+for chares to communicate with AMPI ranks. An example program can be found in
+\examplerefdir{AMPI$-$interop}.
+
 \subsection{Extensions for Sequential Re-run of a Parallel Node}
 In some scenarios, a sequential re-run of a parallel node is desired. One
 example is instruction-level accurate architecture simulations, in which case
diff --git a/examples/charm++/AMPI-interop/AmpiInterop.C b/examples/charm++/AMPI-interop/AmpiInterop.C
new file mode 100644 (file)
index 0000000..ec75112
--- /dev/null
@@ -0,0 +1,77 @@
+#include "stdio.h"
+#include "AmpiInterop.h"
+/*readonly*/ CProxy_AmpiInterop ampiInteropProxy;
+/*readonly*/ int nRanks;
+
+void dummy_mpi_fn(void* in, void* out) {}
+
+class AmpiInterop : public CBase_AmpiInterop {
+  AmpiInterop_SDAG_CODE
+  CProxy_ampi ampiProxy;
+  bool finished;
+
+  public:
+  AmpiInterop() {
+    finished = false;
+  }
+  bool isFinished() {
+    return finished;
+  }
+  void call_mpi_fn(int pe, int chareIndex, MpiCallData buf) {
+    // Wrap chare array index around number of AMPI ranks?
+    //if (chareIndex >= nRanks) chareIndex = chareIndex % nRanks;
+
+    // Send a message
+    ampiProxy[chareIndex].injectMsg(sizeof(buf), (char*)&buf);
+  }
+  void finish() {
+    // target called on processor 0 only
+    finished = true;
+    CkCallback cb(CkReductionTarget(AmpiInterop, finalize), 0, thisProxy);
+    contribute(cb);
+  }
+  void finalize() {
+    // send a dummy message
+    MpiCallData buf;
+    buf.fn = dummy_mpi_fn;
+    buf.cb = CkCallback(CkCallback::ignore);
+    ampiProxy.injectMsg(sizeof(buf), (char*)&buf);
+  }
+};
+
+void AmpiInteropInit() {
+  ampiInteropProxy = CProxy_AmpiInterop::ckNew();
+  ampiInteropProxy.run();
+}
+
+int main(int argc, char **argv) {
+  int rank;
+  MPI_Status status;
+
+  MPI_Init(&argc, &argv);
+  MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+  if (rank == 0) {
+    CkPrintf("[%d] In AMPI: running on %d PEs with %d AMPI ranks\n", CkMyPe(), CkNumPes(), nRanks);
+    ampi *ptr = getAmpiInstance(MPI_COMM_WORLD);
+    const ampiCommStruct &dest = ptr->comm2CommStruct(MPI_COMM_WORLD);
+    ampiInteropProxy.init(dest.getProxy());
+  }
+  CkPrintf("[%d] In AMPI: created AMPI rank %d\n", CkMyPe(), rank);
+
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  while (!ampiInteropProxy.ckLocalBranch()->isFinished()) {
+    MpiCallData buf;
+    MPI_Recv((void*)&buf, sizeof(buf), MPI_BYTE, rank, 0, MPI_COMM_WORLD, &status);
+    buf.fn(buf.in, buf.out);
+    buf.cb.send(NULL);
+  }
+
+  CkPrintf("[%d] In AMPI: finalizing AMPI rank %d\n", CkMyPe(), rank);
+  MPI_Finalize();
+  return 0;
+}
+
+#include "AmpiInterop.def.h"
diff --git a/examples/charm++/AMPI-interop/AmpiInterop.ci b/examples/charm++/AMPI-interop/AmpiInterop.ci
new file mode 100644 (file)
index 0000000..8fb88e3
--- /dev/null
@@ -0,0 +1,24 @@
+module AmpiInterop {
+
+  readonly CProxy_AmpiInterop ampiInteropProxy;
+  readonly int nRanks;
+
+  group AmpiInterop {
+    entry void AmpiInterop();
+    entry void init(CProxy_ampi);
+    entry void callMpiFn(int pe, int index, MpiCallData mcall);
+    entry void finish();
+    entry [reductiontarget] void finalize();
+
+    entry void run() {
+      when init(CProxy_ampi _ampiP) serial {
+        ampiProxy = _ampiP;
+      }
+      while (!finished) {
+        when callMpiFn(int pe, int index, MpiCallData mcall) serial {
+          call_mpi_fn(pe, index, mcall);
+        }
+      }
+    };
+  };
+};
diff --git a/examples/charm++/AMPI-interop/AmpiInterop.h b/examples/charm++/AMPI-interop/AmpiInterop.h
new file mode 100644 (file)
index 0000000..c3d5de6
--- /dev/null
@@ -0,0 +1,41 @@
+
+typedef void (*LibMpiFn)(void*,void*);
+
+#include "mpi.h"
+#include "ampi-interoperate.h"
+
+struct MpiCallData {
+
+  static constexpr size_t size = sizeof(LibMpiFn)+2*sizeof(void*);
+
+  LibMpiFn fn;
+  void *in;
+  void *out;
+  CkCallback cb;
+  unsigned char pup_dat[size];
+
+  MpiCallData() {
+  }
+
+  ~MpiCallData() {
+  }
+
+  void pup(PUP::er &p) {
+    if (p.isPacking()) {
+      memcpy(pup_dat, &fn, sizeof(LibMpiFn));
+      memcpy(pup_dat+sizeof(LibMpiFn), &in, sizeof(void*));
+      memcpy(pup_dat+sizeof(LibMpiFn)+sizeof(void*), &out, sizeof(void*));
+    }
+    PUParray(p, pup_dat, size);
+    if (p.isUnpacking()) {
+      memcpy(&fn, pup_dat, sizeof(LibMpiFn));
+      memcpy(&in, pup_dat+sizeof(LibMpiFn), sizeof(void*));
+      memcpy(&out, pup_dat+sizeof(LibMpiFn)+sizeof(void*), sizeof(void*));
+    }
+    p|cb;
+  }
+};
+
+#include "AmpiInterop.decl.h"
+extern CProxy_AmpiInterop ampiInteropProxy;
+void AmpiInteropInit();
diff --git a/examples/charm++/AMPI-interop/Makefile b/examples/charm++/AMPI-interop/Makefile
new file mode 100644 (file)
index 0000000..279ccc6
--- /dev/null
@@ -0,0 +1,31 @@
+-include ../../../common.mk
+CHARMDIR=../../..
+CHARMC=$(CHARMDIR)/bin/charmc $(OPTS)
+AMPICC=$(CHARMDIR)/bin/ampicxx $(OPTS)
+
+all: hello
+
+AmpiInterop.decl.h: AmpiInterop.ci
+       $(CHARMC) AmpiInterop.ci
+
+AmpiInterop.o: AmpiInterop.C AmpiInterop.decl.h AmpiInterop.h
+       $(AMPICC) -c AmpiInterop.C
+
+hello.decl.h: hello.ci
+       $(CHARMC) hello.ci
+
+hello.o: hello.C hello.decl.h
+       $(CHARMC) -c hello.C
+
+exampleMpi.o: exampleMpi.C
+       $(AMPICC) -c exampleMpi.C
+
+hello: AmpiInterop.o  hello.o exampleMpi.o
+       $(AMPICC) -o hello hello.o exampleMpi.o AmpiInterop.o
+
+test: all
+       $(call run, ./hello +p4 8 +vp8 )
+
+clean:
+       rm -f hello *.o *.a charmrun ampirun AmpiInterop.decl.h AmpiInterop.def.h hello.decl.h hello.def.h
+
diff --git a/examples/charm++/AMPI-interop/README b/examples/charm++/AMPI-interop/README
new file mode 100644 (file)
index 0000000..23d58f2
--- /dev/null
@@ -0,0 +1,14 @@
+This is a 'proof of concept' example program showing AMPI + Charm++
+  interoperation. It runs with virtualization in both the number of
+  chares and AMPI ranks.
+
+Warning: This functionality is experimental and may not be production-ready.
+
+To build:
+  $ make
+
+To run:
+  $ ./charmrun +p <numPEs> ./hello <numChares> +vp <numRanks>
+
+For example:
+  $ ./charmrun +p4 ./hello 8 +vp8 ++local
diff --git a/examples/charm++/AMPI-interop/exampleMpi.C b/examples/charm++/AMPI-interop/exampleMpi.C
new file mode 100644 (file)
index 0000000..b5359d6
--- /dev/null
@@ -0,0 +1,9 @@
+#include <stdio.h>
+#include "charm++.h"
+#include "mpi.h"
+
+void exm_mpi_fn(void* in, void* out) {
+  int rank;
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  printf("[%d] In AMPI: Hello[%d] from AMPI rank %d\n", CkMyPe(), *((int*)in), rank);
+}
diff --git a/examples/charm++/AMPI-interop/hello.C b/examples/charm++/AMPI-interop/hello.C
new file mode 100644 (file)
index 0000000..bdc1e6c
--- /dev/null
@@ -0,0 +1,70 @@
+#include <stdio.h>
+#include "AmpiInterop.h"
+#include "hello.decl.h"
+
+void exm_mpi_fn(void* in, void* out);
+
+/*readonly*/ CProxy_Main mainProxy;
+/*readonly*/ int nElements;
+
+/*mainchare*/
+class Main : public CBase_Main {
+public:
+  Main(CkArgMsg* m) {
+    // Process command-line arguments
+    nElements = 2;
+    if (m->argc > 1) nElements=atoi(m->argv[1]);
+    delete m;
+
+    // Init the AMPI chare group
+    AmpiInteropInit();
+
+    // Init the Hello chare array
+    CkPrintf("[%d] In Charm++: running on %d PEs with %d chares\n",
+             CkMyPe(), CkNumPes(), nElements);
+    mainProxy = thisProxy;
+    CProxy_Hello arr = CProxy_Hello::ckNew(nElements);
+    arr[0].sayHi(0);
+  };
+
+  void done() {
+    CkPrintf("[%d] In Charm++: done\n", CkMyPe());
+  };
+};
+
+/*array [1D]*/
+class Hello : public CBase_Hello {
+  int myDat, resDat;
+
+public:
+  Hello() {
+    myDat = thisIndex;
+    CkPrintf("[%d] In Charm++: created Chare idx %d\n", CkMyPe(), thisIndex);
+  }
+
+  Hello(CkMigrateMessage *m) {}
+
+  void sayHi(int n) {
+    CkPrintf("[%d] In Charm++: hello[%d] from Chare idx %d\n", CkMyPe(), n, thisIndex);
+    MpiCallData mpiCall;
+    mpiCall.fn  = exm_mpi_fn;
+    mpiCall.in  = &myDat;
+    mpiCall.out = &resDat;
+    mpiCall.cb  = CkCallback(CkIndex_Hello::doneLibCall(), thisProxy[thisIndex]);
+    ampiInteropProxy[CkMyPe()].callMpiFn(CkMyPe(), thisIndex, mpiCall);
+
+    // Pass the hello on
+    if (thisIndex < nElements-1) {
+      thisProxy[thisIndex+1].sayHi(thisIndex+1);
+    }
+  }
+
+  void doneLibCall() {
+    if (thisIndex == nElements-1) {
+      ampiInteropProxy.finish();
+      mainProxy.done();
+    }
+  }
+};
+
+#include "hello.def.h"
diff --git a/examples/charm++/AMPI-interop/hello.ci b/examples/charm++/AMPI-interop/hello.ci
new file mode 100644 (file)
index 0000000..0747ed8
--- /dev/null
@@ -0,0 +1,18 @@
+mainmodule hello {
+
+  extern module AmpiInterop;
+
+  readonly CProxy_Main mainProxy;
+  readonly int nElements;
+
+  mainchare Main {
+    entry Main(CkArgMsg *m);
+    entry void done();
+  };
+
+  array [1D] Hello {
+    entry Hello();
+    entry void sayHi(int n);
+    entry void doneLibCall();
+  };
+};
index ed1f08d5e11802923a990bd886579cf3ed0034b4..25bd3b752f143c26de142b0ef375c8272cb874d2 100644 (file)
@@ -2,8 +2,8 @@ CDIR := $(shell cd ../../../.. && pwd)
 -include $(CDIR)/include/conv-mach-opt.mak
 CHARMC=$(CDIR)/bin/charmc $(OPTS)
 
-HEADERS=ampi.h ampif.h mpio.h mpiof.h ampiimpl.h
-HEADDEP=$(HEADERS) ampiimpl.h ddt.h ampi.decl.h \
+HEADERS=ampi.h ampif.h mpio.h mpiof.h ampiimpl.h ampi-interoperate.h ampi.decl.h
+HEADDEP=$(HEADERS) ampiimpl.h ddt.h \
                ../tcharm/tcharm.h ../tcharm/tcharm_impl.h
 COMPAT=compat_ampius.o compat_ampifus.o compat_ampi.o \
        compat_ampim.o compat_ampifm.o compat_ampicm.o \
@@ -81,6 +81,9 @@ headers: $(HEADERS)
        cp $(HEADERS) $(CDIR)/include/
        cp ampi.h $(CDIR)/include/mpi.h
        cp ampif.h $(CDIR)/include/mpif.h
+       cp ampiimpl.h $(CDIR)/include/          # For AMPI + Charm++ interop
+       cp ddt.h $(CDIR)/include/               # For AMPI + Charm++ interop
+       cp ampi-interoperate.h $(CDIR)/include/ # For AMPI + Charm++ interop
        cp ampiCC $(CDIR)/bin/ampicc
        cp ampiCC $(CDIR)/bin/ampiCC
        cp ampiCC $(CDIR)/bin/ampicxx
diff --git a/src/libs/ck-libs/ampi/ampi-interoperate.h b/src/libs/ck-libs/ampi/ampi-interoperate.h
new file mode 100644 (file)
index 0000000..9ff3a8e
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef _AMPI_INTEROPERATE_H
+#define _AMPI_INTEROPERATE_H
+
+/* This header is used for AMPI + Charm++ interoperation. */
+#include "ampiimpl.h"
+#include "ddt.h"
+
+#endif //_AMPI_INTEROPERATE_H
\ No newline at end of file
index 45f3be4af7628641010d1bf10aa08860f0aaeaa5..dad98cf5040973cf89c6439534b0a7da168e3ef4 100644 (file)
@@ -2624,6 +2624,11 @@ void ampi::ssend_ack(int sreq_idx) noexcept {
   }
 }
 
+void ampi::injectMsg(int size, char* buf) noexcept
+{
+  generic(makeAmpiMsg(thisIndex, 0, thisIndex, (void*)buf, size, MPI_CHAR, MPI_COMM_WORLD, 0));
+}
+
 void ampi::generic(AmpiMsg* msg) noexcept
 {
   MSG_ORDER_DEBUG(
index e6aef666f57e592a105ec4de12cd92a2bfa3b7f2..5e878eca73f5bbc0b8c434e22b8e6fc6ee62a63a 100644 (file)
@@ -44,6 +44,7 @@ module ampi {
     entry EXPEDITED void setInitDoneFlag();
     entry EXPEDITED void unblock(void);
     entry EXPEDITED void ssend_ack(int);
+    entry EXPEDITED void injectMsg(int size, char buf[size]);
     entry EXPEDITED void generic(AmpiMsg *);
     entry EXPEDITED void genericRdma(nocopy char buf[size], int size, CMK_REFNUM_TYPE seq, int tag,
                                      int srcRank, MPI_Comm destcomm, int ssendReq);
index a870eba403c18aae9d02e93177903d53b6cda3d3..cd97f2ce8ee60baa4bd4261b917aa879b6226246 100644 (file)
@@ -2473,6 +2473,7 @@ class ampi final : public CBase_ampi {
   void setInitDoneFlag() noexcept;
 
   void unblock() noexcept;
+  void injectMsg(int size, char* buf) noexcept;
   void generic(AmpiMsg *) noexcept;
   void genericRdma(char* buf, int size, CMK_REFNUM_TYPE seq, int tag, int srcRank,
                    MPI_Comm destcomm, int ssendReq) noexcept;