Subsections

17 . Collectives


17 . 1 Reduction Clients

After the data is reduced, it is passed to you via a callback object, as described in section  11 . The message passed to the callback is of type CkReductionMsg . Unlike typed reductions briefed in Section  4.6 , here we discuss callbacks that take CkReductionMsg* argument. The important members of CkReductionMsg are getSize() , which returns the number of bytes of reduction data; and getData() , which returns a ``void *'' to the actual reduced data. The callback to be invoked when the reduction is complete is specified as an additional parameter to contribute . It is an error for chare array elements to specify different callbacks to the same reduction contribution.

     double forces[2]=get_my_forces();
    // When done, broadcast the CkReductionMsg to ``myReductionEntry''
    CkCallback cb(CkIndex_myArrayType::myReductionEntry(NULL), thisProxy);
    contribute(2*sizeof(double), forces,CkReduction::sum_double, cb);

In the case of the reduced version used for synchronization purposes, the callback parameter will be the only input parameter:

     CkCallback cb(CkIndex_myArrayType::myReductionEntry(NULL), thisProxy);
    contribute(cb);

and the corresponding callback function:


 void myReductionEntry(CkReductionMsg *msg)
{
  int reducedArrSize=msg->getSize() / sizeof(double);
  double *output=(double *) msg->getData();
  for(int i=0 ; i<reducedArrSize ; i++)
  {
   // Do something with the reduction results in each output[i] array element
   .
   .
   .
  }
  delete msg;
}

(See examples/charm++/reductions/simple_reduction for a complete example).

If the target of a reduction is an entry method defined by a when clause in SDAG(Section  5 ), one may wish to set a reference number (or tag) that SDAG can use to match the resulting reduction message. To set the tag on a reduction message, call the CkCallback::setRefNum(CMK_REFNUM_TYPE refnum) method on the callback passed to the contribute() call.

17 . 2 Defining a New Reduction Type

It is possible to define a new type of reduction, performing a user-defined operation on user-defined data. This is done by creating a reduction function , which combines separate contributions into a single combined value.

The input to a reduction function is a list of CkReductionMsg s. A CkReductionMsg is a thin wrapper around a buffer of untyped data to be reduced. The output of a reduction function is a single CkReductionMsg containing the reduced data, which you should create using the CkReductionMsg::buildNew(int nBytes,const void *data) method.

Thus every reduction function has the prototype:


 CkReductionMsg *reductionFn(int nMsg,CkReductionMsg **msgs);

For example, a reduction function to add up contributions consisting of two machine short int s would be:


 CkReductionMsg *sumTwoShorts(int nMsg,CkReductionMsg **msgs)
{
  //Sum starts off at zero
  short ret[2]=0,0;
  for (int i=0;i<nMsg;i++) {
    //Sanity check:
    CkAssert(msgs[i]->getSize()==2*sizeof(short));
    //Extract this message's data
    short *m=(short *)msgs[i]->getData();
    ret[0]+=m[0];
    ret[1]+=m[1];
  }
  return CkReductionMsg::buildNew(2*sizeof(short),ret);
}

The reduction function must be registered with Charm++ using CkReduction::addReducer(reducerFn fn, bool streamable) from an initnode routine (see section  9.1 for details on the initnode mechanism). It takes a required parameter, reducerFn fn , a function pointer to the reduction function, and an optional parameter bool streamable , which indicates if the function is streamable or not (see section  17.2.1 for more information). CkReduction::addReducer returns a CkReduction::reducerType which you can later pass to contribute . Since initnode routines are executed once on every node, you can safely store the CkReduction::reducerType in a global or class-static variable. For the example above, the reduction function is registered and used in the following manner:


 //In the .ci file:
  initnode void registerSumTwoShorts(void);

//In some .C file:
/*global*/ CkReduction::reducerType sumTwoShortsType;
/*initnode*/ void registerSumTwoShorts(void)
{
  sumTwoShortsType=CkReduction::addReducer(sumTwoShorts);
}

//In some member function, contribute data to the customized reduction:
  short data[2]=...;
  contribute(2*sizeof(short),data,sumTwoShortsType);

Note that typed reductions briefed in Section  4.6 can also be used for custom reductions. The target reduction client can be declared as in Section  4.6 but the reduction functions will be defined as explained above.
Note that you cannot call CkReduction::addReducer from anywhere but an initnode routine.
(See examples/charm++/barnes-charm for a complete example).

17 . 2 . 1 Streamable Reductions

For custom reductions over fixed sized messages, it is often desirable that the runtime process each contribution in a streaming fashion, i.e. as soon as a contribution is received from a chare array element, that data should be combined with the current aggregation of other contributions on that PE. This results in a smaller memory footprint because contributions are immediately combined as they come in rather than waiting for all contributions to be received. Users can write their own custom streamable reducers by reusing the message memory of the zeroth message in their reducer function by passing it as the last argument to CkReduction::buildNew :


 CkReductionMsg *sumTwoShorts(int nMsg,CkReductionMsg **msgs)
{
  // reuse msgs[0]'s memory:
  short *retData = (short*)msgs[0]->getData();
  for (int i=1;i<nMsg;i++) {
    //Sanity check:
    CkAssert(msgs[i]->getSize()==2*sizeof(short));
    //Extract this message's data
    short *m=(short *)msgs[i]->getData();
    retData[0]+=m[0];
    retData[1]+=m[1];
  }
  return CkReductionMsg::buildNew(2*sizeof(short), retData, sumTwoShortsReducer, msgs[0]);
}

Note that only message zero is allowed to be reused. For reducer functions that do not operate on fixed sized messages, such as set and concat, streaming would result in quadratic memory allocation and so is not desirable. Users can specify that a custom reducer is streamable when calling CkReduction::addReducer by specifying an optional boolean parameter (default is false):


 static void initNodeFn(void) 
    sumTwoShorts = CkReduction::addReducer(sumTwoShorts, /* streamable = */ true);