NDMeshStreamer: bug fixes to staged completion scheme
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 15 May 2012 22:11:17 +0000 (17:11 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 15 May 2012 22:11:17 +0000 (17:11 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index a47a3cb41522ea3e8a988188b93df579c8130f08..3da57443534174cdc3e21fb8bb6a138947f07b7e 100644 (file)
@@ -18,7 +18,7 @@
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
 // #define STREAMER_EXPERIMENTAL
-
+// #define STREAMER_VERBOSE_OUTPUT
 // #define STAGED_COMPLETION
 
 struct MeshLocation {
@@ -76,7 +76,9 @@ public:
     detectorLocalObj_ = detectorLocalObj;
   }
   void receiveRedeliveredItem(dtype data) {
+#ifdef STREAMER_VERBOSE_OUTPUT
     CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
+#endif
     detectorLocalObj_->consume();
     process(data);
   }
@@ -223,6 +225,12 @@ public:
     if (finalCount != -1) {
       cntFinished_[dimension]++; 
       cntMsgExpected_[dimension] += finalCount; 
+#ifdef STREAMER_VERBOSE_OUTPUT
+      CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
+               "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
+               finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
+               cntMsgReceived_[dimension]); 
+#endif
     }  
     if (dimensionToFlush_ != numDimensions_ - 1) {
       checkForCompletedStages();
@@ -236,7 +244,14 @@ public:
            cntMsgExpected_[dimensionToFlush_ + 1] == 
            cntMsgReceived_[dimensionToFlush_ + 1]) {
       if (dimensionToFlush_ == -1) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+        CkPrintf("[%d] contribute\n", CkMyPe()); 
+#endif
+#ifdef DEBUG_STREAMER
+        CkAssert(numDataItemsBuffered_ == 0); 
+#endif
         this->contribute(userCallback_);
+        return; 
       }
       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
         flushDimension(dimensionToFlush_, true); 
@@ -450,14 +465,20 @@ void MeshStreamer<dtype>::storeMessage(
       combinedDimensionSizes_[dimension];
 
     if (dimension == 0) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+      CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
+#endif
       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
     }
     else {
+#ifdef STREAMER_VERBOSE_OUTPUT
+      CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
+#endif
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
 
 #ifdef STAGED_COMPLETION
-    cntMsgSent_[dimension][destinationIndex]++; 
+    cntMsgSent_[dimension][bufferIndex]++; 
 #endif
 
     messageBuffers[bufferIndex] = NULL;
@@ -655,10 +676,16 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
       if (flushDimension == 0) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+        CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
+#endif
         this->thisProxy[destinationIndex].
           receiveAtDestination(destinationBuffer);
       }
       else {
+#ifdef STREAMER_VERBOSE_OUTPUT
+        CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
+#endif
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
       }
 
@@ -730,6 +757,9 @@ void MeshStreamer<dtype>::flushToIntermediateDestinations() {
 
 template <class dtype>
 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+  CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
+#endif
   MeshStreamerMessage<dtype> **messageBuffers; 
   MeshStreamerMessage<dtype> *destinationBuffer; 
   int destinationIndex, numBuffers; 
@@ -743,6 +773,8 @@ void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
         messageBuffers[j] = 
           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>();
+        *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
+        CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
       }
       else {
         continue; 
@@ -770,13 +802,19 @@ void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
 #ifdef STAGED_COMPLETION
-      destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
+    destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
 #endif
 
     if (dimension == 0) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+      CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
+#endif
       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
     }
     else {
+#ifdef STREAMER_VERBOSE_OUTPUT
+      CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
+#endif
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
     messageBuffers[j] = NULL;
@@ -853,6 +891,12 @@ private:
 #ifdef STAGED_COMPLETION
     envelope *env = UsrToEnv(msg);
     MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
+#ifdef DEBUG_STREAMER
+    CkAssert(env->getSrcPe() >= 0 && env->getSrcPe() < CkNumPes()); 
+#endif
+#ifdef STREAMER_VERBOSE_OUTPUT
+    CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
+#endif
     markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
 #else 
     this->detectorLocalObj_->consume(msg->numDataItems);    
@@ -913,7 +957,9 @@ public:
     MeshStreamerArrayClient<dtype> *clientObj = 
       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
 
+#ifdef DEBUG_STREAMER
     CkAssert(clientObj != NULL); 
+#endif
     clientObj->setDetector(detectorLocalObj_); 
   }