MeshStreamer: Added the option to let users specify when each group member
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 Nov 2011 23:04:14 +0000 (18:04 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 Nov 2011 23:08:40 +0000 (18:08 -0500)
is done inserting, instead of using quiescence. When using periodic flushing,
added an optimization to prevent unnecessary flushing if recent progress
has been made.

src/libs/ck-libs/MeshStreamer/MeshStreamer.ci
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index d021bbe81b4bf4adceb8ccdc9301273b7f821642..b1e527bb29c3091c2d961787bd64da391069f993 100644 (file)
@@ -26,6 +26,7 @@ module MeshStreamer {
     entry void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // entry void receivePersonalizedData(MeshStreamerMessage *msg);
     entry void flushDirect();
     entry void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // entry void receivePersonalizedData(MeshStreamerMessage *msg);
     entry void flushDirect();
+    entry void finish(CkReductionMsg *msg);
   };
 
 };
   };
 
 };
index 5650fd392771c3059da62f23a0c646e118daf647..9d3c0a88bb386e9c5e4b2dfcb61940441430d572 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "MeshStreamer.decl.h"
 
 
 #include "MeshStreamer.decl.h"
 
-// allocate more total buffer space then the maximum buffering limit but flush upon
+// allocate more total buffer space than the maximum buffering limit but flush upon
 // reaching totalBufferCapacity_
 #define BUCKET_SIZE_FACTOR 4
 
 // reaching totalBufferCapacity_
 #define BUCKET_SIZE_FACTOR 4
 
@@ -92,6 +92,7 @@ private:
 
     int progressPeriodInMs_; 
     bool isPeriodicFlushEnabled_; 
 
     int progressPeriodInMs_; 
     bool isPeriodicFlushEnabled_; 
+    double timeOfLastSend_; 
 
     MeshStreamerMessage<dtype> **personalizedBuffers_; 
     MeshStreamerMessage<dtype> **columnBuffers_; 
 
     MeshStreamerMessage<dtype> **personalizedBuffers_; 
     MeshStreamerMessage<dtype> **columnBuffers_; 
@@ -120,7 +121,7 @@ public:
 
       // entry
     void insertData(const dtype &dataItem, const int destinationPe); 
 
       // entry
     void insertData(const dtype &dataItem, const int destinationPe); 
-    void doneInserting(); 
+    void doneInserting();
     void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // void receivePersonalizedData(MeshStreamerMessage<dtype> *msg);
 
     void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // void receivePersonalizedData(MeshStreamerMessage<dtype> *msg);
 
@@ -131,13 +132,15 @@ public:
       return isPeriodicFlushEnabled_;
     }
       // non entry
       return isPeriodicFlushEnabled_;
     }
       // non entry
-    void associateCallback(CkCallback &cb) { 
-              userCallback_ = cb;
-              CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::flushDirect(), thisProxy));
-         }
+    void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
+      userCallback_ = cb;
+      if (automaticFinish) {
+       CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), thisProxy));
+      }
+    }
 
     void registerPeriodicProgressFunction();
 
     void registerPeriodicProgressFunction();
-
+    void finish(CkReductionMsg *msg);
 };
 
 template <class dtype>
 };
 
 template <class dtype>
@@ -302,6 +305,11 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messa
     }
     messageBuffers[bucketIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
     }
     messageBuffers[bucketIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
+
+    if (isPeriodicFlushEnabled_) {
+      timeOfLastSend_ = CkWallTimer();
+    }
+
   }
 
   if (numDataItemsBuffered_ == totalBufferCapacity_) {
   }
 
   if (numDataItemsBuffered_ == totalBufferCapacity_) {
@@ -310,6 +318,10 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messa
     flushLargestBucket(columnBuffers_, numColumns_, myColumnIndex_, 1);
     flushLargestBucket(planeBuffers_, numPlanes_, myPlaneIndex_, planeSize_);
 
     flushLargestBucket(columnBuffers_, numColumns_, myColumnIndex_, 1);
     flushLargestBucket(planeBuffers_, numPlanes_, myPlaneIndex_, planeSize_);
 
+    if (isPeriodicFlushEnabled_) {
+      timeOfLastSend_ = CkWallTimer();
+    }
+
   }
 
 }
   }
 
 }
@@ -354,16 +366,31 @@ void MeshStreamer<dtype>::insertData(const dtype &dataItem, const int destinatio
   storeMessage(messageBuffers, bucketIndex, destinationPe, rowIndex, 
                columnIndex, planeIndex, msgType, dataItem);
 
   storeMessage(messageBuffers, bucketIndex, destinationPe, rowIndex, 
                columnIndex, planeIndex, msgType, dataItem);
 
-    // release control to scheduler, assume caller is threaded entry
+    // release control to scheduler if requested by the user, 
+    //   assume caller is threaded entry
   if (yieldFlag_ && ++count % 1024 == 0) CthYield();
 }
 
 template <class dtype>
 void MeshStreamer<dtype>::doneInserting() {
   if (yieldFlag_ && ++count % 1024 == 0) CthYield();
 }
 
 template <class dtype>
 void MeshStreamer<dtype>::doneInserting() {
-  // disable periodic flushing
+  contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL)), thisProxy);
+}
+
+template <class dtype>
+void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
+
   isPeriodicFlushEnabled_ = false; 
   isPeriodicFlushEnabled_ = false; 
+  flushDirect();
+
+  if (!userCallback_.isInvalid()) {
+    CkStartQD(userCallback_);
+    userCallback_ = CkCallback();      // nullify the current callback
+  }
+
+  delete msg; 
 }
 
 }
 
+
 template <class dtype>
 void MeshStreamer<dtype>::receiveAggregateData(MeshStreamerMessage<dtype> *msg) {
 
 template <class dtype>
 void MeshStreamer<dtype>::receiveAggregateData(MeshStreamerMessage<dtype> *msg) {
 
@@ -514,29 +541,33 @@ void MeshStreamer<dtype>::flushBuckets(MeshStreamerMessage<dtype> **messageBuffe
 
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){
 
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){
-    flushBuckets(planeBuffers_, numPlanes_);
-    flushBuckets(columnBuffers_, numColumns_);
-    flushBuckets(personalizedBuffers_, numRows_);
+
+    if (!isPeriodicFlushEnabled_ || 
+       CkWallTimer() - timeOfLastSend_ >= progressPeriodInMs_) {
+      flushBuckets(planeBuffers_, numPlanes_);
+      flushBuckets(columnBuffers_, numColumns_);
+      flushBuckets(personalizedBuffers_, numRows_);
+    }
+
+    if (isPeriodicFlushEnabled_) {
+      timeOfLastSend_ = CkWallTimer();
+    }
 
 #ifdef DEBUG_STREAMER
     //CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
     CkAssert(numDataItemsBuffered_ == 0); 
 #endif
 
 
 #ifdef DEBUG_STREAMER
     //CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
     CkAssert(numDataItemsBuffered_ == 0); 
 #endif
 
-    if (!userCallback_.isInvalid()) {
-        CkStartQD(userCallback_);
-        userCallback_ = CkCallback();      // nullify the current callback
-    }
 }
 
 template <class dtype>
 void periodicProgressFunction(void *MeshStreamerObj, double time) {
 
 }
 
 template <class dtype>
 void periodicProgressFunction(void *MeshStreamerObj, double time) {
 
-  MeshStreamer<dtype> *properObj = static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
-
-  properObj->flushDirect();
+  MeshStreamer<dtype> *properObj = 
+    static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
 
   if (properObj->isPeriodicFlushEnabled()) {
 
   if (properObj->isPeriodicFlushEnabled()) {
+    properObj->flushDirect();
     properObj->registerPeriodicProgressFunction();
   }
 }
     properObj->registerPeriodicProgressFunction();
   }
 }