5. TRAM

5.1 Overview

The Topological Routing and Aggregation Module (TRAM) is a library for optimizing many-to-many and all-to-all collective communication patterns in Charm++ applications. The library performs topological routing and aggregation of network communication in the context of a virtual grid topology comprising the Charm++ Processing Elements (PEs) in the parallel run. The number of dimensions and their sizes within this topology are specified by the user when initializing an instance of the library.

TRAM is implemented as a Charm++ group, so an instance of TRAM has one object on every PE used in the run. We use the term local instance to denote a member of the TRAM group on a particular PE.

Most collective communication patterns involve sending linear arrays of a single data type. To aggregate and process data more efficiently, TRAM restricts the data sent using the library to a single data type, specified by the user through a template parameter when initializing an instance of the library. We use the term data item to denote a single object of this data type submitted to the library for sending. While the library is active (i.e., after initialization and before termination), an arbitrary number of data items can be submitted to the library at each PE.

On systems with an underlying grid or torus network topology, it can be beneficial to configure the virtual topology for TRAM to match the physical topology of the network. This can be accomplished using the Charm++ Topology Manager.

The next two sections explain the routing and aggregation techniques used in the library.

5.1.1 Routing

Let the variables $j$ and $k$ denote PEs within an N-dimensional virtual topology of PEs and $x$ denote a dimension of the grid. We represent the coordinates of $j$ and $k$ within the grid as $\left(j_0, j_1, \ldots, j_{N-1}\right)$ and $\left(k_0, k_1, \ldots, k_{N-1}\right)$. Also, let

\begin{displaymath}
f(x, j, k) =
\begin{cases}
0, & \text{if } j_x = k_x \\
1, & \text{if } j_x \ne k_x
\end{cases}
\end{displaymath}

$j$ and $k$ are peers if

$\displaystyle \sum_{d=0}^{N-1} f(d, j, k) = 1.$

When using TRAM, PEs communicate directly only with their peers. Sending to a PE which is not a peer is handled inside the library by routing the data through one or more intermediate destinations along the route to the final destination.

Suppose a data item destined for PE $k$ is submitted to the library at PE $j$. If $k$ is a peer of $j$, the data item will be sent directly to $k$, possibly along with other data items for which $k$ is the final or intermediate destination. If $k$ is not a peer of $j$, the data item will be sent to an intermediate destination $m$ along the route to $k$ whose index is $\left(j_0, j_1, \ldots, j_{i-1}, k_i, j_{i+1}, \ldots, j_{N-1}\right)$, where $i$ is the greatest value of $x$ for which $f(x, j, k) = 1$.

Note that in obtaining the coordinates of $m$ from $j$, exactly one of the coordinates of $j$ which differs from the coordinates of $k$ is made to agree with $k$. It follows that $m$ is a peer of $j$, and that using this routing process at $m$ and every subsequent intermediate destination along the route eventually leads to the data item being received at $k$. Consequently, the number of messages $F(j, k)$ that will carry the data item to the destination is

$\displaystyle F(j,k) = \sum_{d=0}^{N-1}f(d, j, k) .$
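The routing rule can be illustrated with a small standalone C++ sketch. It is not TRAM's implementation: the helper names (`toCoords`, `hopCount`, `arePeers`, `nextHop`) are hypothetical, and the mapping from a linear PE index to grid coordinates (dimension 0 varying fastest) is an assumption made only for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative helpers only; none of these names belong to the TRAM API.
using Coords = std::vector<int>;

// Convert a linear PE index into grid coordinates.
// Assumption for this sketch: dimension 0 varies fastest.
Coords toCoords(int pe, const std::vector<int>& dimSizes) {
  Coords c(dimSizes.size());
  for (std::size_t d = 0; d < dimSizes.size(); ++d) {
    c[d] = pe % dimSizes[d];
    pe /= dimSizes[d];
  }
  return c;
}

// F(j, k): the number of dimensions in which j and k differ, which equals
// the number of messages that carry a data item from j to k.
int hopCount(const Coords& j, const Coords& k) {
  assert(j.size() == k.size());
  int hops = 0;
  for (std::size_t d = 0; d < j.size(); ++d) hops += (j[d] != k[d]) ? 1 : 0;
  return hops;
}

// j and k are peers when they differ in exactly one coordinate.
bool arePeers(const Coords& j, const Coords& k) { return hopCount(j, k) == 1; }

// Next destination on the route from j to k: take the greatest dimension i
// in which the two differ and replace j_i with k_i. Returns j if j == k.
Coords nextHop(const Coords& j, const Coords& k) {
  assert(j.size() == k.size());
  Coords m = j;
  for (std::size_t d = j.size(); d-- > 0;) {
    if (j[d] != k[d]) {
      m[d] = k[d];
      break;
    }
  }
  return m;
}
```

Applying `nextHop` repeatedly from the source reaches the destination after exactly `hopCount(j, k)` steps, and every step lands on a peer of the current PE.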

5.1.2 Aggregation

Communicating over the network of a parallel machine involves per-message bandwidth and processing overhead. TRAM amortizes this overhead by aggregating data items at the source and every intermediate destination along the route to the final destination.

Every local instance of the TRAM group buffers the data items that have been submitted locally or received from another PE for forwarding. Because only peers communicate directly in the virtual grid, it suffices to have a single buffer per PE for every peer. Given a dimension $d$ within the virtual topology, let $s_d$ denote its size, or the number of distinct values a coordinate for dimension $d$ can take. Consequently, each local instance allocates up to $s_d - 1$ buffers per dimension, for a total of $\sum_{d=0}^{N-1} (s_d - 1)$ buffers. Note that this is normally significantly less than the total number of PEs specified by the virtual topology, which is equal to $\prod_{d=0}^{N-1} s_d$.

Sending with TRAM is done by submitting a data item and a destination identifier, either PE or array index, using a function call to the local instance. If the index belongs to a peer, the library places the data item in the buffer for the peer's PE. Otherwise, the library calculates the index of the intermediate destination using the previously described algorithm, and places the data item in the buffer for the resulting PE, which by design is always a peer of the local PE. Buffers are sent out immediately when they become full. When a message is received at an intermediate destination, the data items comprising it are distributed into the appropriate buffers for subsequent sending. In the process, if a data item is determined to have reached its final destination, it is immediately delivered.

The total buffering capacity specified by the user may be reached even when no single buffer is completely filled up. In that case the buffer with the greatest number of buffered data items is sent.
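The buffer counts and the two sending rules described in this section can be sketched in standalone C++. This is a toy model under stated assumptions, not the library's code: `peerBufferCount`, `totalPes`, and `ToyAggregator` are hypothetical names, data items are plain `int` values, and the per-buffer size and the total capacity are treated as two independent parameters.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Peer buffers needed by one local instance: sum over dimensions of (s_d - 1).
long peerBufferCount(const std::vector<int>& dimSizes) {
  long total = 0;
  for (int s : dimSizes) {
    assert(s >= 1);
    total += s - 1;
  }
  return total;
}

// PEs in the virtual topology: product of the dimension sizes.
long totalPes(const std::vector<int>& dimSizes) {
  long total = 1;
  for (int s : dimSizes) total *= s;
  return total;
}

// Toy model of the buffers held by one local instance (not TRAM's implementation).
class ToyAggregator {
 public:
  ToyAggregator(std::size_t numPeers, std::size_t bufferSize, std::size_t maxBuffered)
      : buffers_(numPeers), bufferSize_(bufferSize), maxBuffered_(maxBuffered) {
    assert(numPeers > 0 && bufferSize > 0 && maxBuffered > 0);
  }

  // Buffers one item for `peer`. Returns the index of the buffer that was sent
  // as a consequence, or -1 if no buffer was sent.
  int insert(std::size_t peer, int item) {
    assert(peer < buffers_.size());
    buffers_[peer].push_back(item);
    ++buffered_;
    if (buffers_[peer].size() >= bufferSize_) return send(peer);  // this buffer is full
    if (buffered_ >= maxBuffered_) return send(fullest());         // total capacity reached
    return -1;
  }

  std::size_t buffered() const { return buffered_; }

 private:
  // Index of the buffer holding the most items (lowest index wins ties).
  std::size_t fullest() const {
    std::size_t best = 0;
    for (std::size_t p = 1; p < buffers_.size(); ++p)
      if (buffers_[p].size() > buffers_[best].size()) best = p;
    return best;
  }

  // "Sends" a buffer by emptying it; a real library would hand it to the network.
  int send(std::size_t peer) {
    buffered_ -= buffers_[peer].size();
    buffers_[peer].clear();
    return static_cast<int>(peer);
  }

  std::vector<std::vector<int>> buffers_;
  std::size_t bufferSize_;
  std::size_t maxBuffered_;
  std::size_t buffered_ = 0;
};
```

For an 8 x 8 x 8 virtual topology, `peerBufferCount` gives 21 buffers per local instance while `totalPes` gives 512 PEs, which shows how much smaller the per-peer buffer set is than one buffer per destination.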

5.2 Application User Interface

A typical usage scenario for TRAM involves a start-up phase followed by one or more communication steps. We next describe the application user interface and details relevant to usage of the library, which normally follows these steps:

  1. Start-up: creation of a TRAM group and setup of client arrays and groups
  2. Initialization: calling an initialization function, which returns through a callback
  3. Sending: an arbitrary number of sends using the insertData function call on the local instance of the library
  4. Receiving: processing received data items through the process function, which serves as the delivery interface for the library and must be defined by the user
  5. Termination: termination of a communication step
  6. Re-initialization: after termination of a communication step, the library instance is not active. However, re-initialization using step 2 leads to a new communication step.

5.2.1 Start-Up

Start-up is typically performed once in a program, often inside the main function of the mainchare, and involves creating an aggregator instance. An instance of TRAM is restricted to sending data items of a single user-specified type, which we denote by dtype, to a single user-specified chare array or group.

5.2.1.1 Sending to a Group

To use TRAM for sending to a group, a GroupMeshStreamer group should be created. Either of the following two GroupMeshStreamer constructors can be used for that purpose:


template<class dtype, class ClientType, class RouterType>
GroupMeshStreamer<dtype, ClientType, RouterType>::
GroupMeshStreamer(int maxNumDataItemsBuffered,
                  int numDimensions,
                  int *dimensionSizes,
                  CkGroupID clientGID,
                  bool yieldFlag = 0,
                  double progressPeriodInMs = -1.0);

template<class dtype, class ClientType, class RouterType>
GroupMeshStreamer<dtype, ClientType, RouterType>::
GroupMeshStreamer(int numDimensions,
                  int *dimensionSizes,
                  CkGroupID clientGID,
                  int bufferSize,
                  bool yieldFlag = 0,
                  double progressPeriodInMs = -1.0);
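Both constructors take the shape of the virtual grid as numDimensions and dimensionSizes. When there is no physical topology to mirror, one possible way to pick a shape is to factor the PE count into a near-square 2D grid. The helper below is a hypothetical sketch, not part of TRAM, and it assumes the product of the dimension sizes is meant to equal the number of PEs in the run.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: choose a 2D grid shape {cols, rows} with
// cols * rows == numPes and rows as close to sqrt(numPes) as possible.
std::vector<int> nearSquareGrid(int numPes) {
  assert(numPes >= 1);
  int rows = 1;
  for (int r = 1; r * r <= numPes; ++r) {
    if (numPes % r == 0) rows = r;  // keep the largest divisor not above sqrt(numPes)
  }
  return {numPes / rows, rows};
}
```

The resulting vector has two entries, so it would correspond to numDimensions equal to 2, and its `data()` pointer has the `int *` type that the dimensionSizes parameter expects. A prime PE count degenerates to a single row, in which case every PE is a peer of every other PE.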


5.2.1.2 Sending to a Chare Array

For sending to a chare array, an ArrayMeshStreamer group should be created, which has a similar constructor interface to GroupMeshStreamer:


template <class dtype, class itype, class ClientType,
          class RouterType>
ArrayMeshStreamer<dtype, itype, ClientType, RouterType>::
ArrayMeshStreamer(int maxNumDataItemsBuffered,
                  int numDimensions,
                  int *dimensionSizes,
                  CkArrayID clientAID,
                  bool yieldFlag = 0,
                  double progressPeriodInMs = -1.0);

template <class dtype, class itype, class ClientType,
          class RouterType>
ArrayMeshStreamer<dtype, itype, ClientType, RouterType>::
ArrayMeshStreamer(int numDimensions,
                  int *dimensionSizes,
                  CkArrayID clientAID,
                  int bufferSize,
                  bool yieldFlag = 0,
                  double progressPeriodInMs = -1.0);


Description of parameters:

Template parameters:

5.2.2 Initialization

A TRAM instance needs to be initialized before every communication step. There are currently three main modes of operation, depending on the type of termination used: staged completion, completion detection, or quiescence detection. The modes of termination are described later. Here, we present the interface for initializing a communication step for each of the three modes.

When using completion detection, each local instance of TRAM must be initialized using the following variant of the overloaded init function:


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
init(int numContributors,
     CkCallback startCb,
     CkCallback endCb,
     CProxy_CompletionDetector detector,
     int prio,
     bool usePeriodicFlushing);

Description of parameters:

When using staged completion, a completion detector object is not required as input, as the library performs its own specialized form of termination. In this case, each local instance of TRAM must be initialized using a different interface for the overloaded init function:


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
init(int numLocalContributors,
     CkCallback startCb,
     CkCallback endCb,
     int prio,
     bool usePeriodicFlushing);


Note that numLocalContributors denotes the number of done calls expected locally, rather than the global number as in the first interface of init.

A common case is to have a single chare array perform all the sends in a communication step, with each element of the array as a contributor. For this case there is a special version of init that takes as input the CkArrayID object for the chare array that will perform the sends, precluding the need to manually determine the number of client chares per PE:


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
init(CkArrayID senderArrayID,
     CkCallback startCb,
     CkCallback endCb,
     int prio,
     bool usePeriodicFlushing);

The init interface for using quiescence detection is:


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
init(CkCallback startCb,
     int prio);

After initialization is finished, the system invokes startCb, signalling to the user that the library is ready to accept data items for sending.

5.2.3 Sending

Sending with TRAM is done through calls to insertData and broadcast.


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
insertData(const dtype& dataItem,
           int destinationPe);

template <class dtype, class itype, class ClientType,
          class RouterType>
void ArrayMeshStreamer<dtype, itype, ClientType, RouterType>::
insertData(const dtype& dataItem,
           itype arrayIndex);

template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
broadcast(const dtype& dataItem);

Broadcasting has the effect of delivering the data item:

5.2.4 Receiving

To receive data items sent using TRAM, the user must define the process function for each client group and array:


void process(const dtype &ran);

Each item is delivered by the library using a separate call to process on the destination PE. The call is made locally, so process should not be an entry method.
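The one-call-per-item delivery contract can be pictured with a standalone C++ sketch. Everything here is a stand-in: `CountingClient` is a plain class rather than a Charm++ group or chare array element, and `deliverAll` is a hypothetical loop that only imitates how a received buffer might be handed to the client.

```cpp
#include <cassert>
#include <vector>

// Hypothetical client. In a real application this role is played by a
// Charm++ group or chare array element; a plain struct keeps the sketch
// self-contained.
struct CountingClient {
  long sum = 0;
  int calls = 0;
  // Same shape as the required delivery interface: an ordinary member
  // function (not an entry method) invoked once per data item.
  void process(const int& item) {
    sum += item;
    ++calls;
  }
};

// Stand-in for the library side: every item that has reached its final
// destination is passed to the local client with a separate process call.
template <class dtype, class ClientType>
void deliverAll(const std::vector<dtype>& received, ClientType& client) {
  for (const dtype& item : received) client.process(item);
}
```

Because each item triggers its own call, a client that needs per-step totals has to accumulate state across calls, as `CountingClient` does with `sum` and `calls`.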


5.2.5 Termination

Flushing and termination mechanisms are used in TRAM to prevent deadlock due to indefinite buffering of items. Flushing works by sending out all buffers in a local instance if no items have been submitted or received since the last progress check. Meanwhile, termination detection is used to send out partially filled buffers at the end of a communication step after it has been determined that no additional items will be submitted.

Currently, three means of termination are supported: staged completion, completion detection, and quiescence detection. Periodic flushing is a secondary mechanism which can be enabled or disabled when initiating one of the primary mechanisms.

Termination typically requires the user to issue a number of calls to the done function:


template <class dtype, class RouterType>
void MeshStreamer<dtype, RouterType>::
done(int numContributorsFinished = 1);

When using completion detection, the number of done calls that are expected globally by the TRAM instance is specified using the numContributors parameter to init. Safe termination requires that no calls to insertData or broadcast are made after the last call to done is performed globally. Because order of execution is uncertain in parallel applications, some care is required to ensure the above condition is met. A simple way to terminate safely is to set numContributors equal to the number of senders, and call done once for each sender that is done submitting items.

In contrast to using completion detection, using staged completion involves setting the local number of expected calls to done using the numLocalContributors parameter in the init function. To ensure safe termination, no insertData or broadcast calls should be made on any PE where done has been called the expected number of times.

Another version of init for staged completion, which takes a CkArrayID object as an argument, provides a simplified interface in the common case when a single chare array performs all the sends within a communication step, with each of its elements as a contributor. For this version of init, TRAM determines the appropriate number of local contributors automatically. It also correctly handles the case of PEs without any contributors by immediately marking those PEs as having finished the communication step. As such, this version of init should be preferred by the user when applicable.
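The per-PE counting rule for staged completion can be modeled with a few lines of standalone C++. This is only a sketch of the bookkeeping described above: `ToyLocalStep` and its members are hypothetical names, and the real library additionally coordinates termination across PEs, which this model ignores.

```cpp
#include <cassert>

// Toy model of the bookkeeping one PE performs under staged completion.
class ToyLocalStep {
 public:
  explicit ToyLocalStep(int numLocalContributors) : remaining_(numLocalContributors) {
    assert(numLocalContributors >= 0);
  }

  // Mirrors the shape of done(): several contributors may finish in one call.
  void done(int numContributorsFinished = 1) {
    assert(numContributorsFinished >= 0 && numContributorsFinished <= remaining_);
    remaining_ -= numContributorsFinished;
  }

  // Submitting items is only safe while some local contributor is still active.
  bool maySubmit() const { return remaining_ > 0; }

  // A PE with zero contributors is finished as soon as the step starts.
  bool locallyFinished() const { return remaining_ == 0; }

 private:
  int remaining_;
};
```

A PE constructed with two contributors stays open for submissions until the second done call, while a PE constructed with zero contributors reports `locallyFinished()` immediately, matching the special case handled by the CkArrayID version of init.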

Staged completion is not supported when array location data is not guaranteed to be correct, as this can potentially violate the termination conditions used to guarantee successful termination. In order to guarantee correct location data in applications that use load balancing, Charm++ must be compiled with -DCMK_GLOBAL_LOCATION_UPDATE, which has the effect of performing a global broadcast of location data for chare array elements that migrate during load balancing. Unfortunately, this operation is expensive when migrating large numbers of elements. As an alternative, completion detection and quiescence detection modes will work properly without the global location update mechanism, and even in the case of anytime migration.

When using quiescence detection, no end callback is used, and no done calls are required. Instead, termination of a communication step is achieved using the quiescence detection framework in Charm++, which supports passing a callback as parameter. TRAM is set up such that quiescence will not be detected until all items sent in the current communication step have been delivered to their final destinations.

The choice of which termination mechanism to use is left to the user. Using completion detection mode is more convenient when the global number of contributors is known, while staged completion is easier to use if the local number of contributors can be determined with ease, or if sending is done from the elements of a chare array. If either mode can be used with ease, staged completion should be preferred. Unlike the other mechanisms, staged completion does not involve persistent background communication to determine when the global number of expected done calls is reached. Staged completion is also generally faster at reaching termination due to not being dependent on periodic progress checks. Unlike completion detection, staged completion does incur a small bandwidth overhead (4 bytes) for every TRAM message, but in practice this is more than offset by the persistent traffic incurred by completion detection.

Periodic flushing is an auxiliary mechanism which checks at a regular interval whether any sends have taken place since the last time the check was performed. If not, the mechanism sends out all the data items buffered per local instance of the library. The period is specified by the user in the TRAM constructor. A typical use case for periodic flushing is when the submission of a data item B to TRAM happens as a result of the delivery of another data item A sent using the same TRAM instance. If A is buffered inside the library and insufficient data items are submitted to cause the buffer holding A to be sent out, a deadlock could arise. With the periodic flushing mechanism, the buffer holding A is guaranteed to be sent out eventually, and deadlock is prevented. Periodic flushing is required when using the completion detection or quiescence detection termination modes.
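The idle check behind periodic flushing can be sketched as a small standalone C++ class. It is a toy model with hypothetical names (`ToyFlusher`, `progressCheck`), it uses a single buffer of `int` items, and it assumes that "activity" means an item was submitted since the previous check.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model of periodic flushing for one local instance.
class ToyFlusher {
 public:
  // Buffer an item and record that there was activity in this period.
  void insert(int item) {
    buffered_.push_back(item);
    activitySinceLastCheck_ = true;
  }

  // Called once per period. If nothing was inserted since the previous check,
  // all buffered items are sent out; returns how many items were flushed.
  std::size_t progressCheck() {
    if (activitySinceLastCheck_) {
      activitySinceLastCheck_ = false;  // busy period: keep aggregating
      return 0;
    }
    std::size_t flushed = buffered_.size();
    buffered_.clear();
    return flushed;
  }

 private:
  std::vector<int> buffered_;
  bool activitySinceLastCheck_ = false;
};
```

In the deadlock scenario described above, item A would sit in the buffer through one busy check and then be flushed by the next idle check, which allows its delivery to trigger the submission of item B.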

5.2.6 Re-initialization

A TRAM instance that has terminated cannot be used for sending more data items until it has been re-initialized. Re-initialization is achieved by calling init, which prepares the instance of the library for a new communication step. Re-initialization is useful for iterative applications, where it is often convenient to have a single communication step per iteration of the application.
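The active and inactive phases of an instance across communication steps can be pictured as a two-state machine. The standalone C++ sketch below is only an illustration: `ToyStreamer`, `submit`, and `terminate` are hypothetical names, and rejecting a submission with a return value is a choice made for the sketch rather than documented library behavior.

```cpp
#include <cassert>

// Toy lifecycle model: an instance accepts items only between
// initialization and termination of a communication step.
class ToyStreamer {
 public:
  // Start (or restart) a communication step.
  void init() {
    active_ = true;
    ++stepsStarted_;
  }

  // Returns true if the item was accepted, false if the instance is inactive.
  bool submit(int /*item*/) {
    if (!active_) return false;
    ++accepted_;
    return true;
  }

  // End the current communication step.
  void terminate() { active_ = false; }

  int stepsStarted() const { return stepsStarted_; }
  int accepted() const { return accepted_; }

 private:
  bool active_ = false;
  int stepsStarted_ = 0;
  int accepted_ = 0;
};
```

An iterative application would call `init` at the start of each iteration, submit its items, and terminate the step before moving on, so the same instance is reused for every iteration.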

5.2.7 Charm++ Registration of Templated Classes

Due to the use of templates in TRAM, the library template instances must be explicitly registered with the Charm++ runtime by the user of the library. This must be done in the .ci file for the application, and typically involves three steps.

For GroupMeshStreamer template instances, registration is done as follows:

For ArrayMeshStreamer template instances, registration is done as follows:

5.3 New Interface

In addition to the above method of instantiating and using TRAM, a new interface has been designed in which entry method tags can be used to automate aggregation of messages. In order to use the new interface, three things must be done:

5.4 Example

For example code showing how to use TRAM, see examples/charm++/TRAM and tests/charm++/streamingAllToAll in the Charm++ repository.