NDMeshStreamer: refactored MeshStreamer into an abstract class;
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 20 Feb 2012 02:21:38 +0000 (20:21 -0600)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 20 Feb 2012 02:21:38 +0000 (20:21 -0600)
implemented GroupMeshStreamer - a concrete implementation of MeshStreamer

concrete implementations correspond to the different

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index eefcfdcc192df9d357058371e5ceda5fe0228cc2..4043b7ae88a48774120ffd529b659d8b81063602 100644 (file)
@@ -12,13 +12,18 @@ module NDMeshStreamer {
 
   template<class dtype> 
   group MeshStreamer {
-    entry MeshStreamer(int totalBufferCapacity, int numDimensions, 
-                      int dimensionSizes[numDimensions], 
-                      const CProxy_MeshStreamerClient<dtype> &clientProxy, 
-                       int yieldFlag = 0, double progressPeriodInMs = -1.0);   
     entry void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
     entry void flushDirect();
     entry void finish(CkReductionMsg *msg);
   };
 
+  template<class dtype>
+  group GroupMeshStreamer : MeshStreamer<dtype> {
+    entry GroupMeshStreamer(int totalBufferCapacity, int numDimensions, 
+                           int dimensionSizes[numDimensions], 
+                           const CProxy_MeshStreamerClient<dtype> &clientProxy,
+                           int yieldFlag = 0, 
+                           double progressPeriodInMs = -1.0);
+  };
 };
+
index 9dee88842c2841703a1a445afa7891efd83fb611..32cf73b46d3bc11b70a0ec22270d38f597208ddf 100644 (file)
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
 
-typedef struct {
+struct MeshLocation {
   int dimension; 
   int bufferIndex; 
-} MeshLocation;
+}
 
 template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
@@ -55,15 +55,12 @@ private:
     int totalBufferCapacity_;
     int numDataItemsBuffered_;
 
-    int numNodes_; 
+    int numMembers_; 
     int numDimensions_;
     int *individualDimensionSizes_;
     int *combinedDimensionSizes_;
 
-    CProxy_MeshStreamerClient<dtype> clientProxy_;
-    MeshStreamerClient<dtype> *clientObj_;
-
-    int myNodeIndex_;
+    int myIndex_;
     int *myLocationIndex_;
 
     CkCallback   userCallback_;
@@ -81,27 +78,24 @@ private:
     bool *isCached; 
 #endif
 
-    /*
-#ifdef SUPPORT_INCOMPLETE_MESH
-    int numNodesInLastPlane_;
-    int numFullRowsInLastPlane_;
-    int numColumnsInLastRow_;
-#endif
-    */
-
     MeshLocation determineLocation(int destinationPe);
 
     void storeMessage(int destinationPe, 
                      const MeshLocation &destinationCoordinates, 
                      const dtype &dataItem);
 
+    virtual void deliverToDestination(
+                 int destinationPe, 
+                 MeshStreamerMessage<dtype> *destinationBuffer) = 0;
+
+    virtual void localDeliver(dtype &dataItem) = 0; 
+
     void flushLargestBuffer();
 
 public:
 
-    MeshStreamer(int totalBufferCapacity, int numDimensions,
-                int *dimensionSizes, 
-                const CProxy_MeshStreamerClient<dtype> &clientProxy,
+    MeshStreamer(int totalBufferCapacity, int numDimensions, 
+                int *dimensionSizes,
                  int yieldFlag = 0, double progressPeriodInMs = -1.0);
     ~MeshStreamer();
 
@@ -114,7 +108,7 @@ public:
     bool isPeriodicFlushEnabled() {
       return isPeriodicFlushEnabled_;
     }
-    void insertData(dtype &dataItem, int destinationPe); 
+    virtual void insertData(dtype &dataItem, int destinationPe); 
     void doneInserting();
     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
       userCallback_ = cb;
@@ -138,8 +132,8 @@ template <class dtype>
 void MeshStreamerClient<dtype>::receiveCombinedData(
                                 MeshStreamerMessage<dtype> *msg) {
   for (int i = 0; i < msg->numDataItems; i++) {
-     dtype data = ((dtype*)(msg->data))[i];
-     process(data);
+    dtype data = msg->getDataItem(i);
+    process(data);
   }
   delete msg;
 }
@@ -148,7 +142,6 @@ template <class dtype>
 MeshStreamer<dtype>::MeshStreamer(
                     int totalBufferCapacity, int numDimensions, 
                     int *dimensionSizes, 
-                     const CProxy_MeshStreamerClient<dtype> &clientProxy,
                     int yieldFlag, 
                      double progressPeriodInMs)
  :numDimensions_(numDimensions), 
@@ -184,24 +177,20 @@ MeshStreamer<dtype>::MeshStreamer(
   }
   totalBufferCapacity_ = totalBufferCapacity;
   numDataItemsBuffered_ = 0; 
-  numNodes_ = CkNumPes(); 
-  clientProxy_ = clientProxy; 
-  clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
+  numMembers_ = CkNumPes(); 
 
   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
   for (int i = 0; i < numDimensions; i++) {
-    int numNodesAlongDimension = individualDimensionSizes_[i]; 
-    dataBuffers_[i] = new MeshStreamerMessage<dtype> *[numNodesAlongDimension];
+    int numMembersAlongDimension = individualDimensionSizes_[i]; 
+    dataBuffers_[i] = new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
 
-    for (int j = 0; j < numNodesAlongDimension; j++) {
+    for (int j = 0; j < numMembersAlongDimension; j++) {
       dataBuffers_[i][j] = NULL;
     }
   }
 
-  // determine location indices for this node
-  myNodeIndex_ = CkMyPe();
-
-  int remainder = myNodeIndex_;
+  myIndex_ = CkMyPe();
+  int remainder = myIndex_;
   for (int i = numDimensions_ - 1; i >= 0; i--) {    
     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
@@ -210,19 +199,11 @@ MeshStreamer<dtype>::MeshStreamer(
   isPeriodicFlushEnabled_ = false; 
 
 #ifdef CACHE_LOCATIONS
-  cachedLocations = new MeshLocation[numNodes_];
-  isCached = new bool[numNodes_];
-  std::fill(isCached, isCached + numNodes_, false);
+  cachedLocations = new MeshLocation[numMembers_];
+  isCached = new bool[numMembers_];
+  std::fill(isCached, isCached + numMembers_, false);
 #endif
 
-  /*
-#ifdef SUPPORT_INCOMPLETE_MESH
-  numNodesInLastPlane_ = numNodes_ % planeSize_; 
-  numFullRowsInLastPlane_ = numNodesInLastPlane_ / numColumns_;
-  numColumnsInLastRow_ = numNodesInLastPlane_ - 
-    numFullRowsInLastPlane_ * numColumns_;  
-#endif
-  */
 }
 
 template <class dtype>
@@ -322,12 +303,12 @@ void MeshStreamer<dtype>::storeMessage(
 
     int destinationIndex;
 
-    destinationIndex = myNodeIndex_ + 
+    destinationIndex = myIndex_ + 
       (bufferIndex - myLocationIndex_[dimension]) * 
       combinedDimensionSizes_[dimension];
 
     if (dimension == 0) {
-      clientProxy_[destinationIndex].receiveCombinedData(destinationBuffer);      
+      deliverToDestination(destinationIndex, destinationBuffer);
     }
     else {
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
@@ -356,7 +337,7 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
   static int count = 0;
 
   if (destinationPe == CkMyPe()) {
-    clientObj_->process(dataItem);
+    localDeliver(dataItem);
     return;
   }
 
@@ -402,7 +383,7 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
     dtype &dataItem = msg->getDataItem(i);
     destinationLocation = determineLocation(destinationPe);
     if (destinationPe == CkMyPe()) {
-      clientObj_->process(dataItem);
+      localDeliver(dataItem);
     }
     else {
       storeMessage(destinationPe, destinationLocation, dataItem);   
@@ -439,7 +420,7 @@ void MeshStreamer<dtype>::flushLargestBuffer() {
 
       messageBuffers = dataBuffers_[flushDimension]; 
       destinationBuffer = messageBuffers[flushIndex];
-      destinationIndex = myNodeIndex_ + 
+      destinationIndex = myIndex_ + 
        (flushIndex - myLocationIndex_[flushDimension]) * 
        combinedDimensionSizes_[flushDimension] ;
 
@@ -452,7 +433,7 @@ void MeshStreamer<dtype>::flushLargestBuffer() {
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
       if (flushDimension == 0) {
-       clientProxy_[destinationIndex].receiveCombinedData(destinationBuffer);
+        deliverToDestination(destinationIndex, destinationBuffer);
       }
       else {
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
@@ -484,8 +465,8 @@ void MeshStreamer<dtype>::flushAllBuffers() {
       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
 
       if (i == 0) {
-       int destinationPe = myNodeIndex_ + j - myLocationIndex_[i];
-       clientProxy_[destinationPe].receiveCombinedData(messageBuffers[j]);
+       int destinationPe = myIndex_ + j - myLocationIndex_[i];
+        deliverToDestination(destinationPe, messageBuffers[j]);
       }         
       else {
 
@@ -499,7 +480,7 @@ void MeshStreamer<dtype>::flushAllBuffers() {
          int destinationPe = messageBuffers[j]->destinationPes[k]; 
          dtype &dataItem = messageBuffers[j]->getDataItem(k);   
          directMsg->addDataItem(dataItem);
-         clientProxy_[destinationPe].receiveCombinedData(directMsg);
+          deliverToDestination(destinationPe,directMsg);
        }
        delete messageBuffers[j];
       }
@@ -545,6 +526,58 @@ void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
 }
 
 
+template <class dtype>
+class GroupMeshStreamer : public MeshStreamer<dtype> {
+private:
+
+  CProxy_MeshStreamerClient<dtype> clientProxy_;
+  MeshStreamerClient<dtype> *clientObj_;
+
+  void deliverToDestination(int destinationPe, 
+                            MeshStreamerMessage<dtype> *destinationBuffer) {
+    clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
+  }
+
+  void localDeliver(dtype &dataItem) {
+    clientObj_->process(dataItem);
+  }
+
+public:
+
+  GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
+                   int *dimensionSizes, 
+                   const CProxy_MeshStreamerClient<dtype> &clientProxy,
+                   int yieldFlag = 0, double progressPeriodInMs = -1.0)
+   :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
+                         yieldFlag, progressPeriodInMs) 
+  {
+    clientProxy_ = clientProxy; 
+    clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
+  }
+
+
+};
+
+/*
+
+template<class dtype>
+struct ArrayDataItem{
+  int arrayIndex;
+  dtype dataItem;
+};
+
+template <class dtype>
+class ArrayMeshStreamer : public CBase_ArrayMeshStreamer<dtype>,
+                          public MeshStreamer<ArrayDataItem<dtype> > {
+private:
+
+public:
+
+  void insertData(dtype &dataItem, int arrayIndex);
+
+};
+*/
+
 #define CK_TEMPLATES_ONLY
 #include "NDMeshStreamer.def.h"
 #undef CK_TEMPLATES_ONLY