MeshStreamer: reintroduced a periodic flush function to help with
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 Nov 2011 05:02:06 +0000 (00:02 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 Nov 2011 23:08:39 +0000 (18:08 -0500)
the use case of applications which generate additional messages
in response to streamed messages.

src/libs/ck-libs/MeshStreamer/MeshStreamer.ci
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index 09e5f10368630014cf1376decce70772a3aae262..d021bbe81b4bf4adceb8ccdc9301273b7f821642 100644 (file)
@@ -21,7 +21,7 @@ module MeshStreamer {
     entry MeshStreamer(int totalBufferCapacity, int numRows, 
                       int numColumns, int numPlanes, 
                       const CProxy_MeshStreamerClient<dtype> &clientProxy, 
-                       int yield_flag=0 );   
+                       int yieldFlag = 0, int progressPeriodInMs = -1);   
     // entry void insertData(CmiUInt8, int); 
     entry void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // entry void receivePersonalizedData(MeshStreamerMessage *msg);
index 329072be25d12147f7e0a83ebe7b61ad326658c0..5650fd392771c3059da62f23a0c646e118daf647 100644 (file)
@@ -88,7 +88,10 @@ private:
     int myRowIndex_;
 
     CkCallback   userCallback_;
-    int yield_flag_;
+    int yieldFlag_;
+
+    int progressPeriodInMs_; 
+    bool isPeriodicFlushEnabled_; 
 
     MeshStreamerMessage<dtype> **personalizedBuffers_; 
     MeshStreamerMessage<dtype> **columnBuffers_; 
@@ -106,27 +109,35 @@ private:
     void flushLargestBucket(MeshStreamerMessage<dtype> ** const messageBuffers,
                            const int numBuffers, const int myIndex, 
                            const int dimensionFactor);
+
 public:
 
     MeshStreamer(int totalBufferCapacity, int numRows, 
                 int numColumns, int numPlanes, 
                 const CProxy_MeshStreamerClient<dtype> &clientProxy,
-                 int yield_flag = 0);
+                 int yieldFlag = 0, int progressPeriodInMs = -1);
     ~MeshStreamer();
 
       // entry
     void insertData(const dtype &dataItem, const int destinationPe); 
+    void doneInserting(); 
     void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // void receivePersonalizedData(MeshStreamerMessage<dtype> *msg);
 
     void flushBuckets(MeshStreamerMessage<dtype> **messageBuffers, const int numBuffers);
     void flushDirect();
 
+    bool isPeriodicFlushEnabled() {
+      return isPeriodicFlushEnabled_;
+    }
       // non entry
     void associateCallback(CkCallback &cb) { 
               userCallback_ = cb;
               CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::flushDirect(), thisProxy));
          }
+
+    void registerPeriodicProgressFunction();
+
 };
 
 template <class dtype>
@@ -142,7 +153,7 @@ template <class dtype>
 MeshStreamer<dtype>::MeshStreamer(int totalBufferCapacity, int numRows, 
                            int numColumns, int numPlanes, 
                            const CProxy_MeshStreamerClient<dtype> &clientProxy,
-                           int yield_flag): yield_flag_(yield_flag) {
+                          int yieldFlag, int progressPeriodInMs): yieldFlag_(yieldFlag) {
   // limit total number of messages in system to totalBufferCapacity
   //   but allocate a factor BUCKET_SIZE_FACTOR more space to take
   //   advantage of nonuniform filling of buckets
@@ -156,6 +167,7 @@ MeshStreamer<dtype>::MeshStreamer(int totalBufferCapacity, int numRows,
   numNodes_ = CkNumPes(); 
   clientProxy_ = clientProxy; 
   clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
+  progressPeriodInMs_ = progressPeriodInMs; 
 
   personalizedBuffers_ = new MeshStreamerMessage<dtype> *[numRows];
   for (int i = 0; i < numRows; i++) {
@@ -180,6 +192,14 @@ MeshStreamer<dtype>::MeshStreamer(int totalBufferCapacity, int numRows,
   myRowIndex_ = indexWithinPlane / numColumns_;
   myColumnIndex_ = indexWithinPlane - myRowIndex_ * numColumns_; 
 
+  if (progressPeriodInMs_ > 0) {
+    isPeriodicFlushEnabled_ = true; 
+    registerPeriodicProgressFunction();
+  }
+  else {
+    isPeriodicFlushEnabled_ = false; 
+  }
+
 }
 
 template <class dtype>
@@ -335,7 +355,13 @@ void MeshStreamer<dtype>::insertData(const dtype &dataItem, const int destinatio
                columnIndex, planeIndex, msgType, dataItem);
 
     // release control to scheduler, assume caller is threaded entry
-  if (yield_flag_ && ++count % 1024 == 0) CthYield();
+  if (yieldFlag_ && ++count % 1024 == 0) CthYield();
+}
+
+template <class dtype>
+void MeshStreamer<dtype>::doneInserting() {
+  // disable periodic flushing
+  isPeriodicFlushEnabled_ = false; 
 }
 
 template <class dtype>
@@ -503,6 +529,24 @@ void MeshStreamer<dtype>::flushDirect(){
     }
 }
 
+template <class dtype>
+void periodicProgressFunction(void *MeshStreamerObj, double time) {
+
+  MeshStreamer<dtype> *properObj = static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
+
+  properObj->flushDirect();
+
+  if (properObj->isPeriodicFlushEnabled()) {
+    properObj->registerPeriodicProgressFunction();
+  }
+}
+
+template <class dtype>
+void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
+  CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, progressPeriodInMs_); 
+}
+
+
 #define CK_TEMPLATES_ONLY
 #include "MeshStreamer.def.h"
 #undef CK_TEMPLATES_ONLY