NDMeshStreamer: added GroupMeshStreamer and ArrayMeshStreamer constructors
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 18 Jun 2012 22:44:58 +0000 (17:44 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 18 Jun 2012 22:44:58 +0000 (17:44 -0500)
that accept buffer size, rather than total buffering limit

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 359044df63aed0508032a14169526385a239c210..0a6781d59d81ced377662400d3f653f419d28664 100644 (file)
@@ -37,6 +37,12 @@ module NDMeshStreamer {
          int dimensionSizes[numDimensions], 
          const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
+
+    entry GroupMeshStreamer(
+          int numDimensions, int dimensionSizes[numDimensions], 
+         const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+         int bufferSize, bool yieldFlag = 0, 
+         double progressPeriodInMs = -1.0);
   };
 
   template<class dtype, class itype>
@@ -46,6 +52,11 @@ module NDMeshStreamer {
          int dimensionSizes[numDimensions],
          const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
+    entry ArrayMeshStreamer(
+         int numDimensions, int dimensionSizes[numDimensions],
+         const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+         int bufferSize, bool yieldFlag = 0, 
+         double progressPeriodInMs = -1.0);
   };
 
 };
index 9c6505500d74f90b3969f9951617a2dfd64b1c0c..c999f8d09ec6cb37777304a67242823437f0484c 100644 (file)
@@ -172,7 +172,7 @@ protected:
 public:
 
   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
-               int *dimensionSizes,
+               int *dimensionSizes, int bufferSize,
                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
   ~MeshStreamer();
 
@@ -270,12 +270,14 @@ template <class dtype>
 MeshStreamer<dtype>::MeshStreamer(
                     int maxNumDataItemsBuffered, int numDimensions, 
                     int *dimensionSizes, 
+                     int bufferSize,
                     bool yieldFlag, 
                      double progressPeriodInMs)
  :numDimensions_(numDimensions), 
   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
   yieldFlag_(yieldFlag), 
-  progressPeriodInMs_(progressPeriodInMs)
+  progressPeriodInMs_(progressPeriodInMs), 
+  bufferSize_(bufferSize)
 {
 
   int sumAlongAllDimensions = 0;   
@@ -292,10 +294,19 @@ MeshStreamer<dtype>::MeshStreamer(
       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
   }
 
-  // except for personalized messages, the buffers for dimensions with the 
-  //   same index as the sender's are not used
-  bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
-    / (sumAlongAllDimensions - numDimensions_ + 1); 
+  // a bufferSize input of 0 indicates it should be calculated by the library
+  if (bufferSize_ == 0) {
+    CkAssert(maxNumDataItemsBuffered_ > 0);
+    // except for personalized messages, the buffers for dimensions with the 
+    //   same index as the sender's are not used
+    bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
+      / (sumAlongAllDimensions - numDimensions_ + 1); 
+  }
+  else {
+    maxNumDataItemsBuffered_ = 
+      bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1);
+  }
+
   if (bufferSize_ <= 0) {
     bufferSize_ = 1; 
     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
@@ -940,8 +951,20 @@ public:
                    int *dimensionSizes, 
                    const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
-   :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
-                         yieldFlag, progressPeriodInMs) 
+   :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
+                        0, yieldFlag, progressPeriodInMs) 
+  {
+    clientProxy_ = clientProxy; 
+    clientObj_ = 
+      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
+  }
+
+  GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
+                   const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+                   int bufferSize, bool yieldFlag = 0, 
+                    double progressPeriodInMs = -1.0)
+   :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
+                        yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
     clientObj_ = 
@@ -1048,6 +1071,17 @@ private:
 #endif
   }
 
+  void commonInit() {
+#ifdef CACHE_ARRAY_METADATA
+    numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
+    clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
+    destinationPes_ = new int[numArrayElements_];
+    isCachedArrayMetadata_ = new bool[numArrayElements_];
+    std::fill(isCachedArrayMetadata_, 
+             isCachedArrayMetadata_ + numArrayElements_, false);
+#endif    
+  }
+
 public:
 
   struct DataItemHandle {
@@ -1060,20 +1094,26 @@ public:
                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
     :MeshStreamer<ArrayDataItem<dtype, itype> >(
-      maxNumDataItemsBuffered, numDimensions, dimensionSizes, yieldFlag
-      progressPeriodInMs) 
+                  maxNumDataItemsBuffered, numDimensions, dimensionSizes
+                  0, yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
     clientArrayMgr_ = clientProxy_.ckLocalBranch();
+    commonInit();
+  }
+
+  ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
+                   const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                   int bufferSize, bool yieldFlag = 0, 
+                    double progressPeriodInMs = -1.0)
+    :MeshStreamer<ArrayDataItem<dtype,itype> >(
+                  0, numDimensions, dimensionSizes, 
+                  bufferSize, yieldFlag, progressPeriodInMs) 
+  {
+    clientProxy_ = clientProxy; 
+    clientArrayMgr_ = clientProxy_.ckLocalBranch();
+    commonInit();
 
-#ifdef CACHE_ARRAY_METADATA
-    numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
-    clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
-    destinationPes_ = new int[numArrayElements_];
-    isCachedArrayMetadata_ = new bool[numArrayElements_];
-    std::fill(isCachedArrayMetadata_, 
-             isCachedArrayMetadata_ + numArrayElements_, false);
-#endif
   }
 
   ~ArrayMeshStreamer() {