NDMeshStreamer: switch into immediate sending mode when volume of data is small
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 1 May 2012 19:22:37 +0000 (14:22 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 1 May 2012 19:22:37 +0000 (14:22 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index a801ef95afe6879090dd83757af2fc43497be62a..e34183952fc56cf673ae39e288688a0b60dbd720 100644 (file)
@@ -115,6 +115,7 @@ private:
     double progressPeriodInMs_; 
     bool isPeriodicFlushEnabled_; 
     bool hasSentRecently_;
+    bool immediateMode_; 
 
     MeshStreamerMessage<dtype> ***dataBuffers_;
 
@@ -251,6 +252,7 @@ MeshStreamer<dtype>::MeshStreamer(
 
   isPeriodicFlushEnabled_ = false; 
   detectorLocalObj_ = NULL;
+  immediateMode_ = false; 
 
 #ifdef CACHE_LOCATIONS
   cachedLocations_ = new MeshLocation[numMembers_];
@@ -432,6 +434,7 @@ void MeshStreamer<dtype>::associateCallback(
                          CkCallback startCb, CkCallback endCb, 
                          CProxy_CompletionDetector detector, 
                          int prio) {
+  immediateMode_ = false;
   yieldCount_ = 0; 
   prio_ = prio;
   userCallback_ = endCb; 
@@ -491,6 +494,10 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
     lastDestinationPe = destinationPe; 
   }
 
+  if (immediateMode_) {
+    flushToIntermediateDestinations();
+  }
+
   delete msg;
 
 }
@@ -642,7 +649,6 @@ void MeshStreamer<dtype>::flushToIntermediateDestinations() {
 
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){
-
   // flush if (1) this is not a periodic call or 
   //          (2) this is a periodic call and no sending took place
   //              since the last time the function was invoked
@@ -657,6 +663,14 @@ void MeshStreamer<dtype>::flushDirect(){
     
   }
 
+  // switch into immediate sending mode when 
+  // number of items buffered is small; avoid doing the switch 
+  // at the beginning before any sending has taken place
+  if (hasSentRecently_ && 
+      (numDataItemsBuffered_ < .1 * totalBufferCapacity_)) {
+    immediateMode_ = true; 
+  } 
+
   hasSentRecently_ = false; 
 
 }