CkLoop: add hybrid version 59/4459/26
authorMatthias Diener <mdiener@illinois.edu>
Wed, 8 Aug 2018 02:34:14 +0000 (21:34 -0500)
committerSam White <white67@illinois.edu>
Thu, 18 Oct 2018 19:12:45 +0000 (14:12 -0500)
This adds the hybrid mode to CkLoop. In this mode,
each PE's share of the iteration space is divided between
a static and a dynamic part. The dynamic part is inserted
in a PE's task queue, and PE's can steal work from other PE's
queues after finishing their own static and dynamic work.
This also adds drone mode that can be enabled by passing
--enable-drone-mode to the build command. Drone mode maps
chares to only rank 0 of nodes, other ranks perform work
through work stealing. --enable-task-queue to be used
with build command has been added to allow enabling
task queue.

Change-Id: I36ad2052adfecc2336bbdda47204144372f95313

12 files changed:
doc/charm++/looplevel.tex
examples/charm++/ckloop/dotProd/Makefile [new file with mode: 0644]
examples/charm++/ckloop/dotProd/dotProd.C [new file with mode: 0644]
examples/charm++/ckloop/dotProd/dotProd.ci [new file with mode: 0644]
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-ldb/CentralLB.C
src/conv-core/convcore.c
src/libs/ck-libs/ckloop/CkLoop.C
src/libs/ck-libs/ckloop/CkLoop.h
src/libs/ck-libs/ckloop/CkLoopAPI.h
src/scripts/configure.ac

index 652118e5a02a2541fb3b070b64f05b2b27ef28c9..c7025573a88e65d0c0e56c3d1949dd1f5659ade2 100644 (file)
@@ -25,6 +25,8 @@ shared-memory multithreading runtime will waste computational power because
 those dedicated cores are not utilized at all during most of the application's
 execution time. This case indicates the necessity of a unified
 runtime supporting both types of parallelism.
+
+
 \section{CkLoop library}
 The \emph{CkLoop} library is an add-on to the \charmpp{} runtime to achieve such
 a unified runtime.  The library implements a simple OpenMP-like shared-memory
@@ -97,10 +99,60 @@ Examples using this library can be found in \examplerefdir{ckloop} and the
 widely used molecular dynamics simulation application
 NAMD\footnote{http://www.ks.uiuc.edu/Research/namd}.
 
+\subsection{The CkLoop Hybrid library}
+The CkLoop\_Hybrid library is a mode of CkLoop that incorporates specific
+adaptive scheduling strategies aimed at providing a tradeoff between dynamic
+load balance and spatial locality. It is used in a build of Charm++ where all
+chares are placed on core 0 of each node (called the drone-mode, or
+all-drones-mode). It incorporates a strategy called staggered static-dynamic
+scheduling (from dissertation work of Vivek Kale). The iteration space is
+first tentatively divided approximately equally to all available PEs. Each
+PE's share of the iteration space is divided into a static portion, specified by
+the staticFraction parameter below, and the remaining dynamic portion. The dynamic
+portion of a PE is divided into chunks of specified chunksize, and enqueued in
+the task-queue associated with that PE. Each PE works on its static portion,
+and then on its own task queue (thus preserving spatial locality, as well as
+persistence of allocations across outer iterations), and after finishing that,
+steals work from other PE’s task queues.
+
+CkLoopHybrid support requires the SMP mode of \charmpp{} and the additional flags --enable-drone-mode and
+--enable-task-queue to be passed as build options when Charm++ is built.
+
+The changes to the CkLoop API call are the following:
+
+\begin{itemize}
+  \item \textbf{CkLoop\_Init} does not need to be called
+  \item \textbf{CkLoop\_SetSchedPolicy} is not supported
+  \item \textbf{CkLoop\_Exit} does not need to be called
+  \item \textbf{CkLoop\_Parallelize} call is similar to CkLoop but has an additional
+  variable that provides the fraction of iterations that are statically scheduled:
+  \begin{verbatim}
+void CkLoop_ParallelizeHybrid(
+float staticFraction,
+HelperFn func, /* the function that finishes partial work on another thread */
+int paramNum, /* the number of parameters for func */
+void * param, /* the input parameters for the above func */
+int numChunks, /* number of chunks to be partitioned */
+int lowerRange, /* lower range of the loop-like parallelization [lowerRange, upperRange] */
+int upperRange, /* upper range of the loop-like parallelization [lowerRange, upperRange] */
+int sync=1, /* toggle implicit barrier after each parallelized loop */
+void *redResult=NULL, /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+REDUCTION_TYPE type=CKLOOP_NONE /* type of the reduction result */
+CallerFn cfunc=NULL, /* caller PE will call this function before ckloop is done and before starting to work on its chunks */
+int cparamNum=0, void *cparam=NULL /* the input parameters to the above function */
+)
+\end{verbatim}
+
+\end{itemize}
+
+Reduction is supported for type CKLOOP\_INT\_SUM, CKLOOP\_FLOAT\_SUM,
+CKLOOP\_DOUBLE\_SUM. It is recommended to use this mode without reduction.
+
+
 \section{Charm++/Converse Runtime Scheduler Integrated OpenMP}
-The compiler-provided OpenMP runtime library can work with Charm++ but it creates its own thread pool so that Charm++ 
-and OpenMP can have oversubscription problem. The integrated OpenMP runtime library parallelizes OpenMP regions in each chare 
-and runs on the Charm++ runtime without oversubscription. The integrated runtime creates OpenMP user-level threads, which can migrate among PEs within 
+The compiler-provided OpenMP runtime library can work with Charm++ but it creates its own thread pool so that Charm++
+and OpenMP can have oversubscription problem. The integrated OpenMP runtime library parallelizes OpenMP regions in each chare
+and runs on the Charm++ runtime without oversubscription. The integrated runtime creates OpenMP user-level threads, which can migrate among PEs within
 a node. This fine-grained parallelism by the integrated runtime helps resolve load imbalance within a node easily. When PEs become idle, they help other busy PEs within a node via work-stealing.
 \subsection{Instructions to build and use the integrated OpenMP library}
 \subsubsection{Instructions to build}
@@ -118,19 +170,19 @@ The following is the list of compilers which are verified to support this integr
   \item Clang: 3.7 or newer
 \end{itemize}
 
-You can use this integrated OpenMP with clang on IBM Bluegene machines without special compilation flags.  
+You can use this integrated OpenMP with clang on IBM Bluegene machines without special compilation flags.
 (Don't need to add -fopenmp or -openmp on Bluegene clang)
 
-On Linux, the OpenMP supported version of clang has been installed in default recently. For example, 
+On Linux, the OpenMP supported version of clang has been installed in default recently. For example,
 Ubuntu has been released with clang higher than 3.7 since 15.10.
 Depending on which version of clang is installed in your working environments, you should follow additional instructions
-to use this integrated OpenMP with Clang. The following is the instruction to use 
-clang on Ubuntu where the default clang is older than 3.7. If you want to use clang on other Linux 
-distributions, you can use package managers on those Linux distributions to install clang and OpenMP library. 
+to use this integrated OpenMP with Clang. The following is the instruction to use
+clang on Ubuntu where the default clang is older than 3.7. If you want to use clang on other Linux
+distributions, you can use package managers on those Linux distributions to install clang and OpenMP library.
 This installation of clang will add headers for OpenMP environmental routines and allow you to parse the OpenMP directives.
 However, on Ubuntu, the installation of clang doesn't come with its OpenMP runtime library so it results in an error message saying that
 it fails to link the compiler provided OpenMP library. This library is not needed to use the integrated OpenMP runtime but you
-need to avoid this error to succeed compiling your codes. The following is the instruction to avoid the error. 
+need to avoid this error to succeed compiling your codes. The following is the instruction to avoid the error.
 
 \begin{verbatim}
 /* When you want to compile Integrated OpenMP on Ubuntu where the pre-installed clang
@@ -140,13 +192,13 @@ sudo apt-get install clang-3.8 //you can use any version of clang higher than 3.
 sudo ln -svT /usr/bin/clang-3.8 /usr/bin/clang
 sudo ln -svT /usr/bin/clang++-3.8 /usr/bin/clang
 
-$(CHARM_DIR)/build charm++ multicore-linux64 clang omp --with-production -j8 
+$(CHARM_DIR)/build charm++ multicore-linux64 clang omp --with-production -j8
 echo '!<arch>' > $(CHARM_DIR)/lib/libomp.a //Dummy library. This will make you avoid the error message.
 \end{verbatim}
 
-On Mac, the Apple-provided clang installed in default doesn't have OpenMP feature. We're working on the support of 
+On Mac, the Apple-provided clang installed in default doesn't have OpenMP feature. We're working on the support of
 this library on Mac with OpenMP enabled clang which can be downloaded and installed through `Homebrew or MacPorts`.
-Currently, this integrated library is built and compiled on Mac with the normal GCC which can be downloaded and 
+Currently, this integrated library is built and compiled on Mac with the normal GCC which can be downloaded and
 installed via Homebrew and MacPorts. If installed globally, GCC will be accessible by appending the major version
 number and adding it to the invocation of the Charm++ build script. For example:\\
 \begin{verbatim}
@@ -179,25 +231,25 @@ In addition, this library will be supported on Windows in the next release of Ch
 
 \subsubsection{How to use the integrated OpenMP on Charm++}
 
-To use this library on your applications, you have to add `-module OmpCharm' in compile flags 
-to link this library instead of the compiler-provided library in compilers. Without `-module OmpCharm', 
-your application will use the compiler-provided OpenMP library which running on its own separate runtime. 
-(You don't need to add `-fopenmp or -openmp' with gcc and icc. These flags are included 
+To use this library on your applications, you have to add `-module OmpCharm' in compile flags
+to link this library instead of the compiler-provided library in compilers. Without `-module OmpCharm',
+your application will use the compiler-provided OpenMP library which running on its own separate runtime.
+(You don't need to add `-fopenmp or -openmp' with gcc and icc. These flags are included
 in the predefined compile options when you build Charm++ with `omp')
 
-This integrated OpenMP adjusts the number of OpenMP instances on each chare so the number of 
+This integrated OpenMP adjusts the number of OpenMP instances on each chare so the number of
 OpenMP instances can be changed for each OpenMP region over execution.
-If your code shares some data structures among OpenMP instances in a parallel region, you can set the size of 
-the data structures before the start of the OpenMP region with ``omp\_get\_max\_threads()'' 
-and use the data structure within each OpenMP instance with ``omp\_get\_thread\_num()''. 
-After the OpenMP region, you can iterate over the data structure to combine partial results 
-with ``CmiGetCurKnownOmpThreads()''. ``CmiGetCurKnownOmpThreads() returns the number of OpenMP 
+If your code shares some data structures among OpenMP instances in a parallel region, you can set the size of
+the data structures before the start of the OpenMP region with ``omp\_get\_max\_threads()''
+and use the data structure within each OpenMP instance with ``omp\_get\_thread\_num()''.
+After the OpenMP region, you can iterate over the data structure to combine partial results
+with ``CmiGetCurKnownOmpThreads()''. ``CmiGetCurKnownOmpThreads() returns the number of OpenMP
 threads for the latest OpenMP region on the PE where a chare is running.'' The following is an
-example to describe how you can use shared data structures for OpenMP regions on the integrated 
+example to describe how you can use shared data structures for OpenMP regions on the integrated
 OpenMP with Charm++.
 \begin{verbatim}
 /* Maximum possible number of OpenMP threads in the upcoming OpenMP region.
-   Users can restrict this number with 'omp_set_num_threads()' for each chare 
+   Users can restrict this number with 'omp_set_num_threads()' for each chare
    and the environmental variable, 'OMP_NUM_THREADS' for all chares.
    By default, omp_get_max_threads() returns the number of PEs for each logical node.
 */
@@ -205,7 +257,7 @@ int maxAvailableThreads = omp_get_max_threads();
 int *partialResult = new int[maxAvailableThreads]{0};
 
 /* Partial sum for subsets of iterations assigned to each OpenMP thread.
-   The integrated OpenMP runtime decides how many OpenMP threads to create 
+   The integrated OpenMP runtime decides how many OpenMP threads to create
    with some heuristics internally.
 */
 #pragma omp parallel for
@@ -220,8 +272,8 @@ for (int j = 0; j < CmiCurKnownOmpThreads(); j++)
 \end{verbatim}
 
 \subsection{The list of supported pragmas}
-This library is forked from LLVM OpenMP Library supporting OpenMP 4.0. Among many number of 
-directives specified in OpenMP 4.0,  limited set of directives are supported. 
+This library is forked from LLVM OpenMP Library supporting OpenMP 4.0. Among many number of
+directives specified in OpenMP 4.0,  limited set of directives are supported.
 The following list of supported pragmas is verified from the openmp conformance test suite which forked from LLVM OpenMP library and ported to Charm++ program running multiple OpenMP instances on chares. The test suite can be found in \emph{tests/converse/openmp\_test}.
 \begin{verbatim}
 /* omp_<directive>_<clauses> */
@@ -275,10 +327,10 @@ omp_single_private
 \end{verbatim}
 The other directives in OpenMP standard will be supported in the next version.
 
-A simple example using this library can be found in \examplerefdir{openmp}. You can compare ckloop 
-and the integrated OpenMP with this example. You can see that the total execution time of 
-this example with enough big size of problem is faster with OpenMP than CkLoop thanks to 
-load balancing through work-stealing between threads within a node while the execution 
+A simple example using this library can be found in \examplerefdir{openmp}. You can compare ckloop
+and the integrated OpenMP with this example. You can see that the total execution time of
+this example with enough big size of problem is faster with OpenMP than CkLoop thanks to
+load balancing through work-stealing between threads within a node while the execution
 time of each chare can be slower on OpenMP because idle PEs helping busy PEs.
 \section{API to control which PEs participating in CkLoop/OpenMP work}
 User may want certain PE not to be involved in other PE's loop-level parallelization for some cases because it may add latency to works in the PE by helping other PEs. User can enable or disable each PE to participate in the loop-level parallelization through the following API.
diff --git a/examples/charm++/ckloop/dotProd/Makefile b/examples/charm++/ckloop/dotProd/Makefile
new file mode 100644 (file)
index 0000000..1d986dc
--- /dev/null
@@ -0,0 +1,67 @@
+-include ../../../common.mk
+#CHARMC ?=
+USEROPTS= -O3 -g
+#-lpapi
+CHARMDIR=../../../..
+CHARMINC=$(CHARMDIR)/include
+OPTS=-I$(CHARMINC) $(USEROPTS)
+CHARMC=$(CHARMDIR)/bin/charmc $(OPTS)
+CHARMLIB=$(CHARMDIR)/lib
+CXX = ${CHARMC}
+CC = ${CXX}
+DOTPROD = dotProd
+DOTPRODOMP = dotProd_withomp
+
+default: ${DOTPRODOMP}
+
+dotProd: dotProd.o
+       $(CHARMC) -language charm++ -o dotProd dotProd.o -module CkLoop
+#-DPAPI_PROFILING -DCKL_HYBRID
+
+test1: dotProd
+       ./charmrun +p4 dotProd 800 2 0.7 5 ++ppn 2 ++local
+
+test2: dotProd
+       ./charmrun +p4 dotProd 800 2 0.7 5 ++ppn 2 ++local
+
+test3: dotProd
+       ./charmrun +p4 dotProd 800 2 0.7 5 ++ppn 2 ++local
+
+test4: dotProd
+       ./charmrun +p4 dotProd 800 4 0.7 5 ++ppn 2 ++local
+
+test5: dotProd
+       ./charmrun +p6 dotProd 800 2 0.7 5 ++ppn 2 ++local
+
+dotProd_withomp: dotProd_withomp.o
+       $(CHARMC) -language charm++ -o dotProd_withomp dotProd_withomp.o -fopenmp
+
+%.decl.h: %.ci
+       $(CXX) $<
+
+dotProd.decl.h: dotProd.ci
+       $(CHARMC) dotProd.ci
+
+dotProd.o: dotProd.C dotProd.decl.h
+       $(CHARMC) -c dotProd.C
+
+dotProd_withomp.o: dotProd.C dotProd.decl.h
+       $(CHARMC) -c dotProd.C -fopenmp -o dotProd_withomp.o
+
+${DOTPROD}.o: ${DOTPROD}.decl.h
+
+${DOTPRODOMP}.o: ${DOTPROD}.decl.h
+
+module: $(CHARMLIB)/libmoduleCkLoop.a
+
+$(CHARMLIB)/libmoduleCkLoop.a: CkLoop.o
+       $(CHARMC)  -o $(CHARMLIB)/libmoduleCkLoop.a CkLoop.o
+
+CkLoop.decl.h: CkLoop.ci
+       $(CHARMC) CkLoop.ci
+
+CkLoop.o: CkLoop.C CkLoop.decl.h
+       $(CHARMC) -c CkLoop.C
+
+clean:
+       rm -rf $(wildcard *.o *decl.h *def.h charmrun $(DOTPROD) $(DOTPRODOMP))
diff --git a/examples/charm++/ckloop/dotProd/dotProd.C b/examples/charm++/ckloop/dotProd/dotProd.C
new file mode 100644 (file)
index 0000000..43f6f86
--- /dev/null
@@ -0,0 +1,255 @@
+/** Authors: Vivek Kale and Karthik Senthil **/
+/** Description: Charm++ + CkLoopHybrid code to compute dot product of two vectors whose sizes can be given as input by the user. **/
+/** Guide to compiling and running this code:
+
+ To compile the code, you can type:
+
+ $(CHARMC) -language charm++ -o dotProd dotProd.o -module CkLoop
+
+
+ To run the code on 1 physical node with 4 logical PEs per node, you can type:
+
+./charmrun ./dotProd 1 +p4 ++ppn 4 ++local
+
+
+ To run the code with a vector size of 800 and static fraction of .7, you can type:
+
+ ./charmrun +p4 dotProd 800 2 0.7 5 ++ppn 2 ++local
+
+
+ You can also look at the Makefile given to try other experiments.
+
+
+**/
+#include "dotProd.decl.h"
+#include <cstdlib>
+#include "charm++.h"
+#include <math.h>
+
+#include "CkLoopAPI.h"
+
+//#define OMP_HYBRID - uncomment this for runs with OMP_HYBRID
+
+#ifdef OMP_HYBRID
+#include <omp.h>
+#endif
+
+#ifdef PAPI_PROFILING
+#include <papi.h>
+#endif
+
+#define MAX_ITERS 1000000 // TODO: The constant MAX_ITERS ought to be a command-line parameter, but leave for now.
+#define DEFAULT_SIZE 512 // Size of vectors, i.e., global size of the array.
+
+#define DEFAULT_NUM_ELEMS 4 // Default number of chares per node.
+#define DEFAULT_STATIC_FRACTION 0.5 // Size of vectors, i.e., global size of the array.
+#define DEFAULT_CHUNK_SIZE 1 // Size of vectors, i.e., global size of the array.
+#define DEFAULT_OP_NUM 0
+#define DEFAULT_NUM_ITERS 110
+
+#define DEFAULT_NUM_THREADS 4
+
+#define SIZE 512 // Size of vectors, i.e., global size of the array.
+int numElemsPerNode; // This is the number of chares per node, i.e., process.
+int probSize; //Global problem size.
+double staticFraction; // Fraction of loop iterations to be scheduled statically.
+int chunkSize; // Number of iterations in a task for static scheduling portion and dynamic scheduling portion of hybrid static/dynamic scheduling.
+int opNum;
+int numIters;
+int numThreads;
+
+class Main : public CBase_Main {
+public:
+  CProxy_Elem vChunks;
+  int iter = DEFAULT_NUM_ITERS;
+  int warm_up = 10;
+  double t1;
+  Main(CkArgMsg* msg) {
+    probSize = msg->argc > 1 ?  atoi(msg->argv[1]) : DEFAULT_SIZE;
+    numElemsPerNode = msg->argc > 2 ? atoi(msg->argv[2]) : DEFAULT_NUM_ELEMS;
+    if (probSize % numElemsPerNode != 0)
+      CkAbort("Error! probSize not divisible by number of chares per node.\n");
+    // if (numElemsPerNode < CkNumNodes())  // shouldn't need an error check involving numElemsPerNode here.
+    staticFraction = msg->argc > 3 ?  atof(msg->argv[3]) : DEFAULT_STATIC_FRACTION;
+    chunkSize = msg->argc > 4 ?  atoi(msg->argv[4]) : DEFAULT_CHUNK_SIZE;
+    opNum = msg->argc > 5 ?  atoi(msg->argv[5]) : DEFAULT_OP_NUM;
+    iter = msg->argc > 6 ?  atoi(msg->argv[6]) : DEFAULT_NUM_ITERS;
+    delete msg;
+    CkPrintf("Running dot product code with %d iterations.\n", iter);
+#ifdef OMP_HYBRID
+    omp_set_num_threads(omp_get_max_threads());
+#pragma omp parallel
+    {
+      numThreads = omp_get_num_threads();
+      if(numThreads <= 1)
+  {
+    printf("Number of threads is %d. Resetting a number greater than 1. \n", numThreads);
+    omp_set_num_threads(DEFAULT_NUM_THREADS);
+  }
+      printf("Number of threads running on is: %d .\n", numThreads);
+    }
+#else
+    CkLoop_Init(-1); // Run without shared memory of Charm.
+#endif
+    CProxy_rank0BlockMap myMap = CProxy_rank0BlockMap::ckNew();
+    CkArrayOptions opts(numElemsPerNode*CkNumNodes());
+    opts.setMap(myMap);
+    vChunks = CProxy_Elem::ckNew(thisProxy, opts);
+    int ind2 = CkIndex_Main::initializeStructures();
+    CkStartQD(ind2, &thishandle); // Quiescience detection.
+  }
+  void doTests(){vChunks.doDotProduct();}
+  void initializeStructures(){vChunks.doInitVectors(); int ind = CkIndex_Main::doTests(); CkStartQD(ind, &thishandle); } // Quiescience detection.
+  void printResult(float result) {
+    iter--;
+    warm_up--;
+    if (warm_up == 0) {
+      t1 = CkWallTimer();
+      CkPrintf("[main] Started wallclock timer\n");
+    }
+    if(iter == 0)
+      {
+  double t2 = CkWallTimer();
+  //TODO: add print of number of iterations here.
+  CkPrintf("dotP: probSize = %lld \t \t charesPerNode = %d \t staticFraction = %f \t chunkSize=%d \t time = %f \n", probSize, numElemsPerNode, staticFraction, chunkSize, t2-t1);
+  CkPrintf("dotP: result = %f \n", result);
+/*
+  float errorPercent = fabs( 100.0* (result - 6.00*probSize*CkNumNodes())/result);
+        if ( errorPercent > 0.1)
+          CkPrintf("result %f is wrong. shd be: %f\n", result, (6.0*probSize*CkNumNodes()));
+*/
+  CkExit();
+      }
+    else
+      vChunks.doDotProduct();
+  }
+};
+
+extern "C" void initVectors(int start, int end, void * result, int numParams, void* params)
+{
+  float** z = (float**) params;
+  float* v1 = z[0]; float* v2 = z[1];
+  for(int i=start; i<end; i++)
+    {
+      v1[i] = drand48() * 1000.0;
+      v2[i] = drand48() * 1000.0;
+    }
+}
+
+extern "C" void dotP_chunked(int start, int end, void* result, int numParams, void* params)
+{
+  float** z = (float**) params;
+  float* v1 = z[0];  float* v2 = z[1];
+  float x = 0.0;
+  //CkPrintf("DOTP:DEBUG: Executing chunk %d: %d on PE %d \n", start, end, CkMyPe());
+  // Make this easily handle sparse vectors: One way to do this is to use hash tables to represent
+  // non-zeros in sparse vectors. A more complicated example will add Sparse Matrix Vector Multiplication
+  // with CSR format to handle sparsity of the matrix.
+  for(int i=start; i<end; i++)
+    x += v1[i]*v2[i];
+  * ((double*)result) = x;
+  // printf("DOTP:DEBUG: result = %f \t x = %f. \n", *((float*) result), x);
+}
+
+class Elem : public CBase_Elem {
+public:
+  float* a;
+  float* b;
+  CProxy_Main mainProxy;
+  Elem(CProxy_Main _mainProxy) {
+    int value = thisIndex;
+    // CkPrintf("DEBUG: Elem constructor %d on PE: %d \n", thisIndex, CkMyPe());
+    mainProxy = _mainProxy;
+    a = new float[probSize/numElemsPerNode]; // Each array on a node is of the size total problem size divided by number of chares per PE.
+    b = new float[probSize/numElemsPerNode];
+    #ifdef OMP_HYBRID
+
+    #else
+    #endif
+  }
+
+  void doInitVectors()
+  {
+    float* params[2]; // Used for parameters to be passed to initVectors.
+    params[0] = a;
+    params[1] = b;
+    int numberOfChunks= (probSize/numElemsPerNode)/chunkSize;
+    srand48(time(NULL));
+#ifdef OMP_HYBRID
+    float r =0.0;
+#pragma omp parallel
+    {
+      printf("OpenMP implementation: Initializing vectors: The number of threads is %d\n", omp_get_num_threads());
+#pragma omp for
+      for(int i=0; i < (int) (ceil) (probSize/numElemsPerNode); i++)
+  {
+    a[i] = drand48() * 1000.0;
+    b[i] = drand48() * 1000.0;
+  }
+    }
+#else
+    CkLoop_ParallelizeHybrid(1.0, initVectors, 2, params, numberOfChunks, 0, (probSize/numElemsPerNode), 1); // Use static scheduling for initializing array.
+#endif
+  }
+
+  void doDotProduct()
+  {
+    float r = 0.0;
+    int numberOfChunks= (probSize/numElemsPerNode)/chunkSize;
+#ifdef OMP_HYBRID
+#pragma omp parallel
+    {
+
+      if(omp_get_thread_num() == 0)
+  printf("OpenMP implementation: Doing computation: The number of threads is %d\n", omp_get_num_threads());
+#pragma omp for nowait reduction(+:r)
+      for(int i=0; i < (int) (ceil((staticFraction*(probSize/numElemsPerNode)))); i++)
+  r = r + a[i]*b[i];
+#pragma omp for schedule(dynamic,chunkSize) reduction(+:r) // The clause reduction is a hint to LLVM OpenMP to optimize the code for reduct\ion operation.
+      for(int i=(int) ((floor) (staticFraction*(probSize/numElemsPerNode))); i< (int) (probSize/numElemsPerNode); i++)
+  r = r + a[i]*b[i];
+    }
+#else
+    float* params[2]; // Used for parameters to be passed to dotP_chunked.
+    params[0] = a;
+    params[1] = b;
+    // CkPrintf("numberOfChunks = %d \n", numberOfChunks);
+    CkLoop_ParallelizeHybrid(staticFraction, dotP_chunked, 2, params, numberOfChunks, 0, (probSize/numElemsPerNode), 1, &r, CKLOOP_FLOAT_SUM);
+    // CkPrintf("End CkLoop_Parallelize function.\n");
+#endif
+    CkCallback cb(CkReductionTarget(Main, printResult), mainProxy);
+    contribute(sizeof(float), &r, CkReduction::sum_float, cb);
+    // CkPrintf("End contribute with r = %f\n.", r);
+  }
+
+  Elem(CkMigrateMessage*){}
+  float dotP(){
+    float x = 0;
+    for(int i=0; i<probSize/numElemsPerNode; i++)
+      {
+  x += a[i]*b[i];
+      }
+    return x;
+  }
+};
+
+class rank0BlockMap : public CkArrayMap
+{
+public:
+  rank0BlockMap(void) {}
+  rank0BlockMap(CkMigrateMessage *m){}
+  int registerArray(CkArrayIndex& numElements,CkArrayID aid) {
+    return 0;
+  }
+  // Assign chares to rank 0 of each process.
+  int procNum(int /*arrayHdl*/, const CkArrayIndex &idx) {
+    int elem=*(int *)idx.data();
+    int charesPerNode = numElemsPerNode;
+    int nodeNum = (elem/(charesPerNode));
+    int numPEsPerNode = CkNumPes()/CkNumNodes();
+    int penum = nodeNum*numPEsPerNode;
+    //CkPrintf("DEBUG: procNum: Assigning elem index %d to %d\n", elem, penum);
+    return penum;
+  }
+};
+#include "dotProd.def.h"
diff --git a/examples/charm++/ckloop/dotProd/dotProd.ci b/examples/charm++/ckloop/dotProd/dotProd.ci
new file mode 100644 (file)
index 0000000..616ded0
--- /dev/null
@@ -0,0 +1,29 @@
+mainmodule dotProd {
+  readonly int probSize;
+  readonly int numElemsPerNode;
+  readonly double staticFraction;
+  readonly int chunkSize;
+  readonly int opNum;
+
+  mainchare Main {
+
+    entry Main(CkArgMsg*);
+    entry [reductiontarget] void printResult(float result);
+    entry void doTests();
+    entry void initializeStructures();
+
+  }; // entry method can be called remotely.
+
+  array [1D] Elem {
+    entry Elem(CProxy_Main mainProxy);
+    entry void dotP();
+    entry void doDotProduct();
+    entry void doInitVectors();
+
+  };
+
+  group rank0BlockMap
+  {
+    entry rank0BlockMap();
+  }
+};
index d5f5b9dc9a8c9d5a3e8e0e3e963572e24cef4bc2..ba4cf3c3a7ecd142458c99888c5125652964204b 100644 (file)
@@ -217,7 +217,7 @@ void CkArrayMap::populateInitial(int arrayHdl,CkArrayOptions& options,void *ctor
         /* The CkArrayIndex is supposed to have at most 3 dimensions, which
            means that all the fields are ints, and numElements.nInts represents
            how many of them are used */
-        CKARRAYMAP_POPULATE_INITIAL(procNum(arrayHdl,idx)==thisPe);
+        CKARRAYMAP_POPULATE_INITIAL(CMK_RANK_0(procNum(arrayHdl,idx))==thisPe);
 
 #if CMK_BIGSIM_CHARM
         BgEntrySplit("split-array-new-end");
index 0c3ccdabfbef15c0d39c46bbf1a93b2276922d1b..08a73b5deeb220e4c16009f09f426c36ab23fcb1 100644 (file)
@@ -267,6 +267,16 @@ enum CkElementCreation_t : uint8_t {
 typedef void (*CkLocFn)(CkArray *,void *,CkLocRec *,CkArrayIndex *);
 #endif
 
+// Returns rank 0 for a pe for drone mode
+#if CMK_DRONE_MODE
+#define CMK_RANK_0(pe) ({\
+  CkNodeOf(pe)*CkNodeSize(0);\
+})
+#else
+  #define CMK_RANK_0(pe) ({\
+  pe;\
+})
+#endif
 
 /**
  * A group which manages the location of an indexed set of
@@ -297,14 +307,14 @@ typedef std::unordered_map<CmiUInt8, CkLocRec*> LocRecHash;
 
 //Interface used by external users:
        /// Home mapping
-       inline int homePe(const CkArrayIndex &idx) const {return map->homePe(mapHandle,idx);}
+       inline int homePe(const CkArrayIndex &idx) const {return CMK_RANK_0(map->homePe(mapHandle,idx));}
         inline int homePe(const CmiUInt8 id) const {
           if (compressor)
-            return homePe(compressor->decompress(id));
+            return CMK_RANK_0(homePe(compressor->decompress(id)));
 
-          return id >> 24;
+          return CMK_RANK_0(id >> 24);
         }
-       inline int procNum(const CkArrayIndex &idx) const {return map->procNum(mapHandle,idx);}
+       inline int procNum(const CkArrayIndex &idx) const {return CMK_RANK_0(map->procNum(mapHandle,idx));}
        inline bool isHome (const CkArrayIndex &idx) const {return (bool)(homePe(idx)==CkMyPe());}
   int whichPE(const CkArrayIndex &idx) const;
   int whichPE(const CmiUInt8 id) const;
index 0203331fdda0e5387f5ce181c5c05749ab0f2602..81112593dc0173af02895c660db184706dceb228 100644 (file)
@@ -1178,6 +1178,12 @@ void CentralLB::ProcessReceiveMigration()
     MigrateInfo& move = m->moves[i];
     const int me = CkMyPe();
     if (move.from_pe == me && move.to_pe != me) {
+#if CMK_DRONE_MODE
+      int to_pe_rank0 = CMK_RANK_0(move.to_pe);
+      if(move.from_pe == to_pe_rank0) continue;
+      move.to_pe = to_pe_rank0;
+#endif
+
       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
       // migrate object, in case it is already gone, inform toPe
 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
@@ -1201,6 +1207,10 @@ void CentralLB::ProcessReceiveMigration()
             }
 #endif
     } else if (move.from_pe != me && move.to_pe == me) {
+#if CMK_DRONE_MODE
+      int to_pe_rank0 = CMK_RANK_0(move.to_pe);
+      if(me != to_pe_rank0) continue;
+#endif
        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
       if (!move.async_arrival) migrates_expected++;
       else future_migrates_expected++;
index 0f7dfa6966d45d83bc3ada172d17066129b4f2d7..26d2e95da63a8b8d64c6389c6d3d220bb44a68de 100644 (file)
@@ -1809,10 +1809,12 @@ void *CsdNextMessage(CsdSchedulerState_t *s) {
        }
 #endif
 #if CMK_SMP && CMK_TASKQUEUE
+#if CMK_OMP
        msg = CmiSuspendedTaskPop();
        if (msg != NULL) {
          return (msg);
        }
+#endif
        msg = TaskQueuePop((TaskQueue)s->taskQ);
        if (msg != NULL) {
          return (msg);
@@ -2252,7 +2254,7 @@ void CthSchedInit()
   CpvInitialize(CthThread, CthSleepingStandins);
   CpvInitialize(int      , CthResumeNormalThreadIdx);
   CpvInitialize(int      , CthResumeSchedulingThreadIdx);
-#if CMK_SMP && CMK_TASKQUEUE
+#if CMK_OMP
   CpvInitialize(int      , CthResumeStealableThreadIdx);
   CpvInitialize(int      , CthResumeSuspendedStealableThreadIdx);
 #endif
index fbc689a5bbb82040001527e4bc9e799ac42a68be..aac5af6054e6d6063a214af38d46ed8034611a63 100644 (file)
@@ -60,7 +60,7 @@ static void *ndhThreadWork(void *id) {
 
     //further improvement of this affinity setting!!
     int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
-    //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
+    //CkPrintf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
     myPhyRank = myId;
     CmiSetCPUAffinity(myPhyRank);
 
@@ -73,7 +73,7 @@ static void *ndhThreadWork(void *id) {
     __sync_add_and_fetch(&gCrtCnt, 1);
 
     while (1) {
-        //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
+        //CkPrintf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
         if (exitFlag) break;
         pthread_mutex_lock(&thdLock);
         pthread_cond_wait(&thdCondition, &thdLock);
@@ -140,6 +140,8 @@ void FuncCkLoop::exit() {
 /* Note: Those event ids should be unique globally!! */
 #define CKLOOP_TOTAL_WORK_EVENTID  139
 #define CKLOOP_FINISH_SIGNAL_EVENTID 143
+#define CKLOOP_STATIC_CHUNK_WORK 998
+#define CKLOOP_DYNAMIC_CHUNK_WORK 999
 
 static FuncCkLoop *globalCkLoop = NULL;
 
@@ -318,7 +320,7 @@ void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
                 one->ptr = (void *)curLoop;
                 envelope *env = UsrToEnv(one);
                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
-                //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
+                //CkPrintf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
                 CmiPushPE(i, (void *)(env));
             }
             for (int i=0; i<CmiMyRank(); i++) {
@@ -327,7 +329,7 @@ void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
                 one->ptr = (void *)curLoop;
                 envelope *env = UsrToEnv(one);
                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
-                //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
+                //CkPrintf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
                 CmiPushPE(i, (void *)(env));
             }
         }
@@ -360,19 +362,123 @@ void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
     if(curLoop) curLoop->stealWork();
     TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
 
-    //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+    //CkPrintf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
 
     TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
     curLoop->waitLoopDone(sync);
     TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
 
-    //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+    //CkPrintf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
 
     if (type!=CKLOOP_NONE)
         reduce(curLoop->getRedBufs(), redResult, type, numChunks);
     return;
 }
 
+CpvStaticDeclare(int, chunkHandler);
+CpvStaticDeclare(int, hybridHandler);
+
+void FuncCkLoop::parallelizeFuncHybrid(float staticFraction, HelperFn func, int paramNum, void * param,
+                                     int numChunks, int lowerRange,
+                                     int upperRange, int sync,
+                                     void *redResult, REDUCTION_TYPE type,
+                                     CallerFn cfunc,
+                                     int cparamNum, void * cparam) {
+  double _start; //may be used for tracing
+  if (numChunks > MAX_CHUNKS) {
+    numChunks = MAX_CHUNKS;
+  }
+
+#ifdef CMK_PAPI_PROFILING
+  int num_hwcntrs = 2;
+  int Events[2] = {PAPI_L2_DCM, PAPI_L3_DCM}; // Obtain L1 and L2 data cache misses.
+  long_long values[2];
+#endif
+
+#ifdef CMK_PAPI_PROFILING
+  if (PAPI_start_counters(Events, num_hwcntrs) != PAPI_OK) CkPrintf("Error with creating event set \n");
+#endif
+
+  //for using nodequeue
+  TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
+  /*
+ CkPrintf("debug [%d]:, CkMyRank=%d: funchybrid. \n", CkMyPe(), CkMyRank());
+  FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
+#if ALLOW_MULTIPLE_UNSYNC
+  CkPrintf("debug: %d: funchybrid. 1b thisHelper: %d \n", CkMyPe(), (long) thisHelper);
+      ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
+CkPrintf("ptr %d\n", (long) notifyMsg->ptr);
+#else
+  CkPrintf("debug: %d: funchybrid. 1c \n", CkMyPe());
+      ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
+#endif
+  CkPrintf("debug: %d: funchybrid. 2 \n", CkMyPe());
+      curLoop = (CurLoopInfo *)(notifyMsg->ptr);
+  */
+
+  //curLoop = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
+
+  CurLoopInfo* curLoop = new CurLoopInfo(numHelpers);
+  curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+  curLoop->setStaticFraction(staticFraction);
+  curLoop->setReductionType(type);
+  void** redBufs = curLoop->getRedBufs();
+  if(type == CKLOOP_INT_SUM)
+    for(int i=0; i<numHelpers; i++)
+      *((int*)redBufs[i]) = 0;
+  else if((type == CKLOOP_DOUBLE_SUM) || (type == CKLOOP_DOUBLE_MAX))
+    for(int i=0; i<numHelpers; i++)
+      *((double*)redBufs[i]) = 0.0;
+  else if(type == CKLOOP_FLOAT_SUM)
+    for(int i=0; i<numHelpers; i++)
+      *((float*)redBufs[i]) = 0.0;
+  LoopChunkMsg* msg = new LoopChunkMsg;
+  msg->loopRec = curLoop;
+  CmiSetHandler(msg, CpvAccess(hybridHandler));
+  for (int i=1; i<numHelpers; i++) {
+    CmiPushPE(i, (void*)msg); // New work coming, send message to other ranks to notify the ranks.
+  }
+  // Call the function on the caller PE before it starts working on chunks
+  if (cfunc != NULL) {
+    cfunc(cparamNum, cparam); //user code
+  }
+  curLoop->doWorkForMyPe();
+  // Rank 0 processor continues dequeuing its dynamic chunks from its own task queue.
+#if CMK_SMP && CMK_TASKQUEUE
+  while(1) {
+      void* msg = TaskQueuePop((TaskQueue)CpvAccess(CsdTaskQueue));
+      if (msg == NULL) break;
+      CmiHandleMessage(msg);
+  }
+#endif
+  // TODO: Should core 0 steal?
+  // If so, do randomized steals in a loop until all chunks are done.
+  curLoop->waitLoopDoneHybrid(1);
+  // NOTE: use 1 in parameter of function waitLoopDone to force exit.
+
+  // CkPrintf("DEBUG: Exiting loop : numChunks = %d \t numStaticChunksCompleted = %d \t numDynamicChunksFired = %d \t numDynamicChunksCompleted = %d \t \n", numChunks, curLoop->numStaticChunksCompleted, curLoop->numDynamicChunksFired, curLoop->numDynamicChunksCompleted);
+
+  TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
+
+#ifdef CMK_PAPI_PROFILING
+ if (PAPI_stop_counters(values, num_hwcntrs) != PAPI_OK)   CkPrintf("Error with stopping counters!\n");
+#endif
+
+#ifdef CMK_PAPI_PROFILING
+    if (PAPI_read_counters(values, num_hwcntrs) != PAPI_OK)  CkPrintf("Error with reading counters!\n");
+#endif
+
+  //CkPrintf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+  TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
+  TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
+  //CkPrintf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+  if (type!=CKLOOP_NONE)
+    reduce(curLoop->getRedBufs(), redResult, type, numHelpers);
+
+  delete curLoop;
+  delete msg;
+}
+
 #define COMPUTE_REDUCTION(T) {\
     for(int i=0; i<numChunks; i++) {\
      result += *((T *)(redBufs[i])); \
@@ -449,6 +555,13 @@ static int _ckloopEP;
 CpvStaticDeclare(int, NdhStealWorkHandler);
 static void RegisterCkLoopHdlrs() {
     CpvInitialize(int, NdhStealWorkHandler);
+
+    // The following four lines are for the hybrid static/dynamic scheduler.
+    CpvInitialize(int, hybridHandler);
+    CpvAccess(hybridHandler) = CmiRegisterHandler((CmiHandler)hybridHandlerFunc);
+    CpvInitialize(int, chunkHandler);
+    CpvAccess(chunkHandler) = CmiRegisterHandler((CmiHandler)executeChunk);
+
 #if CMK_TRACE_ENABLED
     CpvInitialize(envelope*, dummyEnv);
     CpvAccess(dummyEnv) = envelope::alloc(ForChareMsg,0,0); //Msgtype is the same as the one used for TRACE_BEGIN_EXECUTED_DETAILED
@@ -678,6 +791,89 @@ void CkLoop_Exit(CProxy_FuncCkLoop ckLoop) {
     ckLoop.exit();
 }
 
+void hybridHandlerFunc(LoopChunkMsg *msg)
+{
+  CurLoopInfo* loop = msg->loopRec;
+  loop->doWorkForMyPe(); // Do or enqueue work for the current loop for my PE.
+}
+
+void CurLoopInfo::doWorkForMyPe() {
+  int numHelpers = CmiMyNodeSize();
+  int myRank = CmiMyRank();
+  if (upperIndex-lowerIndex < numHelpers)
+    numHelpers = upperIndex-lowerIndex;
+  int myStaticBegin = lowerIndex + myRank*(upperIndex - lowerIndex)/numHelpers;
+
+  int myDynamicBegin = myStaticBegin + ((upperIndex - lowerIndex)/numHelpers)*staticFraction;
+  int lastDynamic = lowerIndex + (myRank+1)*(upperIndex - lowerIndex)/numHelpers;
+  if(lastDynamic > upperIndex) lastDynamic = upperIndex; // for the last PE.
+
+  int i, j;
+
+  // TODO: make numChunks smaller as needed.
+
+  chunkSize = (upperIndex - lowerIndex)/numChunks;
+  if(chunkSize == 0) chunkSize = 1;
+  LoopChunkMsg* msgBlock = new LoopChunkMsg[1 + (lastDynamic - myDynamicBegin)/chunkSize];
+  // TODO: msgBlock should be freed when the whole CkLoop loop is done and all stealers have finished using it.
+
+  // Enqueue dynamic work first, because before a PE starts work on static, other PE's should be ready to steal from its dynamic.
+  // TODO: the order of enqueues should be reversed since the task queue is run as a stack.
+  /* for (i=myDynamicBegin, j=0; i<lastDynamic; i+=chunkSize, j++)
+   {
+     LoopChunkMsg* msg = (LoopChunkMsg*)(&(msgBlock[j]));
+     msg->startIndex = i;
+      msg->endIndex = i + chunkSize > lastDynamic ? lastDynamic : i+chunkSize;
+      msg->loopRec = this;
+      CmiSetHandler(msg, CpvAccess(chunkHandler));
+      CsdTaskEnqueue(msg);
+      } */
+
+  //TODO: test with    400  404
+
+  // TODO: size : 402 / 404 4 threads.
+  // TODO: size:
+#if CMK_SMP && CMK_TASKQUEUE
+  for (i=lastDynamic, j=0; i>myDynamicBegin; i-=chunkSize, j++)
+    {
+      LoopChunkMsg* msg = (LoopChunkMsg*)(&(msgBlock[j]));
+      //  msg->startIndex = i;
+      // msg->endIndex = i + chunkSize > lastDynamic ? lastDynamic : i+chunkSize;
+      msg->endIndex = i;
+      msg->startIndex = i - chunkSize  < myDynamicBegin ? myDynamicBegin  :  i - chunkSize;
+      msg->loopRec = this;
+      CmiSetHandler(msg, CpvAccess(chunkHandler));
+      CsdTaskEnqueue(msg);
+    }
+#endif
+
+  double _start; //may be used for tracing
+  TRACE_START(CKLOOP_STATIC_CHUNK_WORK);
+  // do PE's static part
+  double x = 0.0;
+  fnPtr(myStaticBegin, myDynamicBegin, (void*) &x, paramNum, param);
+  TRACE_BRACKET(CKLOOP_STATIC_CHUNK_WORK);
+  // TODO: the code block below may not be needed since the hybrid scheduler doesn't use finishFlag.
+  /* int tmp  = (myDynamicBegin - myStaticBegin)/chunkSize;
+     finishFlag+= tmp; */
+
+  //CkPrintf("DEBUG: chunk [%d:\t %d] function returned %f for reduction\n" , myStaticBegin, myDynamicBegin, x);
+
+  localReduce(x, type);
+  numDynamicChunksFired += j;
+  numStaticRegionsCompleted++;
+}
+
+void executeChunk(LoopChunkMsg *msg) {
+  double _start; //may be used for tracing
+  TRACE_START(CKLOOP_DYNAMIC_CHUNK_WORK);
+  // This is the function executed when a task is dequeued from the task queue in the hybrid scheduler.
+  CurLoopInfo* linfo = msg->loopRec;
+  linfo->runChunk(msg->startIndex, msg->endIndex);
+  TRACE_BRACKET(CKLOOP_DYNAMIC_CHUNK_WORK);
+  /* free(msg); */ // Free not needed since we are using a block allocation in doWorkForMyPe().
+}
+
 void CkLoop_Parallelize(HelperFn func,
                             int paramNum, void * param,
                             int numChunks, int lowerRange, int upperRange,
@@ -690,6 +886,25 @@ void CkLoop_Parallelize(HelperFn func,
         upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
 }
 
+void CkLoop_ParallelizeHybrid(float staticFraction,
+             HelperFn func,
+             int paramNum, void * param,
+             int numChunks, int lowerRange, int upperRange,
+             int sync,
+             void *redResult, REDUCTION_TYPE type,
+             CallerFn cfunc,
+             int cparamNum, void* cparam) {
+#if CMK_SMP && CMK_TASKQUEUE
+  if (0 != CkMyRank()) CkAbort("CkLoop_ParallelizeHybrid() must be called from rank 0 PE on a node.\n");
+  if (numChunks > upperRange - lowerRange + 1) numChunks = upperRange - lowerRange + 1;
+  // Not doing anything with loop history for now.
+  globalCkLoop->parallelizeFuncHybrid(staticFraction, func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
+#else
+  globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange,
+        upperRange, sync, redResult, type, cfunc, cparamNum, cparam);
+#endif
+}
+
 void CkLoop_SetSchedPolicy(CkLoop_sched schedPolicy) {
   globalCkLoop->setSchedPolicy(schedPolicy);
   std::atomic_thread_fence(std::memory_order_release);
index 3e6532487bfa1e5929b37d2c08d1e6bf7be19e90..50437f5ba3e1f97faad65bcbbd49df096089dc1f 100644 (file)
@@ -2,6 +2,8 @@
 #define _CKLOOP_H
 #include <assert.h>
 
+#include "converse.h"
+#include "taskqueue.h"
 #include "charm++.h"
 #include "CkLoopAPI.h"
 #include <atomic>
@@ -31,8 +33,11 @@ class CurLoopInfo {
     friend class FuncSingleHelper;
 
 private:
+    float staticFraction;
     std::atomic<int> curChunkIdx;
     int numChunks;
+    int chunkSize;
+    REDUCTION_TYPE type; // only used in hybrid mode
     HelperFn fnPtr;
     int lowerIndex;
     int upperIndex;
@@ -48,6 +53,11 @@ private:
     //this tag is needed to prevent other helpers to run the old task
     std::atomic<int> inited;
 
+    // For Hybrid mode:
+    std::atomic<int> numStaticRegionsCompleted{0};
+    std::atomic<int> numDynamicChunksCompleted{0};
+    std::atomic<int> numDynamicChunksFired{0};
+
 public:
     CurLoopInfo(int maxChunks):numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0),
             paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL), bufSpace(NULL), inited(0) {
@@ -84,6 +94,54 @@ public:
         CmiUnlock(loop_info_inited_lock);
     }
 
+    void setReductionType(REDUCTION_TYPE p) {
+      type = p;
+    }
+
+    void setStaticFraction(float _staticFraction) {
+      staticFraction = _staticFraction;
+    }
+
+    #define LOCALSUM(T) *((T*) redBufs[CmiMyRank()]) += (T) x;
+
+    void localReduce(double x, REDUCTION_TYPE type) {
+      // TODO: add more data types here
+      switch(type)
+        {
+        case CKLOOP_INT_SUM: {
+          LOCALSUM(int)
+        break;
+        }
+        case CKLOOP_FLOAT_SUM: {
+          LOCALSUM(float)
+        break;
+        }
+        case CKLOOP_DOUBLE_SUM: {
+          LOCALSUM(double)
+        break;
+        }
+        case CKLOOP_DOUBLE_MAX: {
+          if( *((double *)(redBufs[CmiMyRank()])) < x ) *((double *)(redBufs[CmiMyRank()])) = x;
+          break;
+        }
+        default:
+          break;
+        }
+    }
+
+     //This function is called from hybrid scheduler functions.
+     void runChunk(int sInd, int eInd) {
+       int myRank = CmiMyRank();
+       int numHelpers = CmiMyNodeSize();
+       int nextPesStaticBegin = lowerIndex + (myRank+1)*(upperIndex - lowerIndex)/numHelpers;
+       double x;  // Just allocating an 8-byte scalar.
+       fnPtr(sInd, eInd, (void*) &x, paramNum, param); // Calling user's function to do one chunk of iterations.
+
+       // "Add" *x to *(redBufs[myRank]). The meaning of "Add" depends on the type.
+       localReduce(x, type);
+       numDynamicChunksCompleted++;
+     }
+
     void waitLoopDone(int sync) {
         //while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
         if (sync) while (finishFlag.load(std::memory_order_relaxed)!=numChunks);
@@ -93,9 +151,26 @@ public:
         inited = 0;
         CmiUnlock(loop_info_inited_lock);
     }
+
+    void waitLoopDoneHybrid(int sync) {
+        int count = 0;
+        int numHelpers = CmiMyNodeSize();
+        if (sync)
+        while ((numStaticRegionsCompleted != numHelpers) || (numDynamicChunksCompleted != numDynamicChunksFired))
+        {
+            // debug print in case the function is stuck in an infinite loop.
+            // count++; if ((count % 100000) == 0);
+            // printf("DEBUG: nsrc= %d \t ndcf = %d \t ndcc = %d \n" , (int) numStaticRegionsCompleted, (int) numDynamicChunksFired, (int) numDynamicChunksCompleted);
+        };
+        CmiLock(loop_info_inited_lock);
+        inited = 0;
+        CmiUnlock(loop_info_inited_lock);
+    }
+
     int getNextChunkIdx() {
         return curChunkIdx.fetch_add(1, std::memory_order_relaxed) + 1;
     }
+
     void reportFinished(int counter) {
         if (counter==0) return;
         finishFlag.fetch_add(counter, std::memory_order_release);
@@ -111,8 +186,18 @@ public:
     }
 
     void stealWork();
+    void doWorkForMyPe();
 };
 
+// To be used for hybridHandler and chunkHandler.
+typedef struct loopChunkMsg
+{
+  char hdr[CmiMsgHeaderSizeBytes];
+  CurLoopInfo* loopRec;
+  int startIndex;
+  int endIndex;
+} LoopChunkMsg;
+
 /* FuncCkLoop is a nodegroup object */
 
 typedef enum CkLoop_queueID { NODE_Q=0, PE_Q} CkLoop_queueID;
@@ -199,12 +284,24 @@ public:
                          CallerFn cfunc=NULL, /* the caller PE will call this function before starting to work on the chunks */
                          int cparamNum=0, void* cparam=NULL /* the input parameters to the above function */
                         );
+    void parallelizeFuncHybrid(float sf,
+               HelperFn func, /* the function that finishes a partial work on another thread */
+               int paramNum, void * param, /* the input parameters for the above func */
+               int numChunks, /* number of chunks to be partitioned. Note that some of the chunks may be subsumed into a large static section. */
+               int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */
+               int sync=1, /* whether the flow will continue until all chunks have finished */
+               void *redResult=NULL, REDUCTION_TYPE type=CKLOOP_NONE, /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+               CallerFn cfunc=NULL, /* the caller PE will call this function before starting to work on the chunks */
+               int cparamNum=0, void* cparam=NULL /* the input parameters to the above function */
+               );
     void destroyHelpers();
     void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
     void pup(PUP::er &p);
 };
 
+void executeChunk(LoopChunkMsg* msg);
 void SingleHelperStealWork(ConverseNotifyMsg *msg);
+void hybridHandlerFunc(LoopChunkMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
 //allowing arbitrary combination of sync and unsync parallelizd loops
index be566cbb76d316e50f2ebe4117cb00eee6b698a4..f3a6eeba42b491b951775a530aa8534aba49463a 100644 (file)
@@ -27,7 +27,7 @@ class CProxy_FuncCkLoop;
 extern CProxy_FuncCkLoop CkLoop_Init(int numThreads=0);
 
 /* used to free resources if using the library in non-SMP mode. It should be called on just one PE, say PE 0 */
-extern void CkLoop_Exit(CProxy_FuncCkLoop ckLoop); 
+extern void CkLoop_Exit(CProxy_FuncCkLoop ckLoop);
 
 extern void CkLoop_Parallelize(
     HelperFn func, /* the function that finishes a partial work on another thread */
@@ -40,6 +40,19 @@ extern void CkLoop_Parallelize(
     int cparamNum=0, void *cparam=NULL /* the input parameters to the above function */
 );
 
+extern void CkLoop_ParallelizeHybrid(
+    float staticFraction,
+    HelperFn func, /* the function that finishes a partial work on another thread */
+    int paramNum, void * param, /* the input parameters for the above func */
+    int numChunks, /* number of chunks to be partitioned */
+    int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */
+    int sync=1, /* whether the flow will continue unless all chunks have finished */
+    void *redResult=NULL, REDUCTION_TYPE type=CKLOOP_NONE, /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+    CallerFn cfunc=NULL, /* caller PE will call this function before ckloop is done and before starting to work on its chunks */
+    int cparamNum=0, void *cparam=NULL /* the input parameters to the above function */
+);
+
+
 extern void CkLoop_SetSchedPolicy(CkLoop_sched schedPolicy);
 
 extern void CkLoop_DestroyHelpers();
index affe10f5db972c861d96f02d839ae02b3c83a67f..afba205399eadc3b51e77c419cb396f580540dac 100644 (file)
@@ -191,6 +191,38 @@ else
 fi
 
 
+# enable task queue
+AC_ARG_ENABLE([task_queue],
+            [AS_HELP_STRING([--enable-task-queue],
+              [enable task queue])],
+            [enable_task_queue=$enableval],
+            [enable_task_queue=no])
+
+if test "$enable_task_queue" = "no"
+then
+  Echo "Task Queue is disabled"
+  AC_DEFINE_UNQUOTED(CMK_TASKQUEUE, 0, [disable task queue])
+else
+  Echo "Task Queue is enabled"
+  AC_DEFINE_UNQUOTED(CMK_TASKQUEUE, 1, [enable task queue])
+fi
+
+# enable drone mode
+AC_ARG_ENABLE([drone_mode],
+            [AS_HELP_STRING([--enable-drone-mode],
+              [enable drone mode])],
+            [enable_drone_mode=$enableval],
+            [enable_drone_mode=no])
+
+if test "$enable_drone_mode" = "no"
+then
+  Echo "Drone mode is disabled"
+  AC_DEFINE_UNQUOTED(CMK_DRONE_MODE, 0, [disable drone mode])
+else
+  Echo "Drone mode is enabled"
+  AC_DEFINE_UNQUOTED(CMK_DRONE_MODE, 1, [enable drone mode])
+fi
+
 AC_ARG_ENABLE([charmdebug],
             [AS_HELP_STRING([--enable-charmdebug],
               [enable charmDebug])], ,