NDMeshStreamer.h: adding a function to flush to intermediate destinations
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 27 Apr 2012 21:04:13 +0000 (16:04 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 27 Apr 2012 21:04:13 +0000 (16:04 -0500)
instead of directly to the final destinations

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 6613e616ea867345ae5da88aaefafdafc9edd35f..10d3330476044d8cae3426b93bc25a545e9edf4b 100644 (file)
@@ -61,6 +61,7 @@ class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
     detectorLocalObj_ = detectorLocalObj;
   }
   void receiveRedeliveredItem(dtype data) {
     detectorLocalObj_ = detectorLocalObj;
   }
   void receiveRedeliveredItem(dtype data) {
+    //    CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
     detectorLocalObj_->consume();
     process(data);
   }
     detectorLocalObj_->consume();
     process(data);
   }
@@ -143,6 +144,7 @@ private:
     virtual void initLocalClients() = 0;
 
     void flushLargestBuffer();
     virtual void initLocalClients() = 0;
 
     void flushLargestBuffer();
+    void flushToIntermediateDestinations();
 
 protected:
 
 
 protected:
 
@@ -592,6 +594,52 @@ void MeshStreamer<dtype>::flushAllBuffers() {
   }
 }
 
   }
 }
 
+template <class dtype>
+void MeshStreamer<dtype>::flushToIntermediateDestinations() {
+
+  MeshStreamerMessage<dtype> **messageBuffers; 
+  MeshStreamerMessage<dtype> *destinationBuffer; 
+  int destinationIndex, numBuffers; 
+
+  for (int i = 0; i < numDimensions_; i++) {
+
+    messageBuffers = dataBuffers_[i]; 
+    numBuffers = individualDimensionSizes_[i]; 
+
+    for (int j = 0; j < numBuffers; j++) {
+
+      if(messageBuffers[j] == NULL) {
+       continue;
+      }
+
+      messageBuffers = dataBuffers_[i]; 
+      destinationBuffer = messageBuffers[j];
+      destinationIndex = myIndex_ + 
+       (j - myLocationIndex_[i]) * 
+       combinedDimensionSizes_[i] ;
+
+      if (destinationBuffer->numDataItems < bufferSize_) {
+       // not sending the full buffer, shrink the message size
+       envelope *env = UsrToEnv(destinationBuffer);
+       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
+                         (bufferSize_ - destinationBuffer->numDataItems));
+       *((int *) env->getPrioPtr()) = prio_;
+      }
+      numDataItemsBuffered_ -= destinationBuffer->numDataItems;
+
+      if (i == 0) {
+        deliverToDestination(destinationIndex, destinationBuffer);
+      }
+      else {
+       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
+      }
+      messageBuffers[j] = NULL;
+    }
+  }
+}
+
+
+
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){
 
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){