NDMeshStreamer: leave asserts in the code by default (they get eliminated
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 23 Aug 2012 22:58:33 +0000 (17:58 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 23 Aug 2012 23:01:45 +0000 (18:01 -0500)
in production build anayway); add assert which ensures data items are not
submitted after all local contributors have indicated they are done

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index a2d3311f26b36dd5429798198f2f28461070548e..37ca015bcc21814ce8fe094d8e01b60f19d332e5 100644 (file)
@@ -13,7 +13,6 @@
 // take advantage of nonuniform filling of buffers
 #define OVERALLOCATION_FACTOR 4
 
-// #define DEBUG_STREAMER
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
@@ -208,6 +207,10 @@ public:
   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
             int prio, bool usePeriodicFlushing);
   
+  bool stagedCompletionStarted() {    
+    return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
+  }
+
   void startStagedCompletion() {          
     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
       flushDimension(dimensionToFlush_, true);
@@ -244,9 +247,7 @@ public:
 #ifdef STREAMER_VERBOSE_OUTPUT
         CkPrintf("[%d] contribute\n", CkMyPe()); 
 #endif
-#ifdef DEBUG_STREAMER
         CkAssert(numDataItemsBuffered_ == 0); 
-#endif
         this->contribute(userCallback_);
         return; 
       }
@@ -446,9 +447,7 @@ void MeshStreamer<dtype>::storeMessage(
     }
     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
-#ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bufferIndex] != NULL);
-#endif
   }
   
   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
@@ -522,6 +521,10 @@ template <class dtype>
 inline
 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 
+  // no data items should be submitted after all local contributors call done 
+  // and staged completion has begun
+  CkAssert(stagedCompletionStarted() == false);
+
   if (!useStagedCompletion_) {
     detectorLocalObj_->produce();
   }
@@ -807,9 +810,7 @@ void MeshStreamer<dtype>::flushIfIdle(){
     if (numDataItemsBuffered_ != 0) {
       flushToIntermediateDestinations();
     }    
-#ifdef DEBUG_STREAMER
     CkAssert(numDataItemsBuffered_ == 0); 
-#endif
     
   }
 
@@ -928,9 +929,7 @@ public:
     MeshStreamerArrayClient<dtype> *clientObj = 
       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
 
-#ifdef DEBUG_STREAMER
     CkAssert(clientObj != NULL); 
-#endif
     clientObj->setDetector(detectorLocalObj_); 
   }
 
@@ -1088,6 +1087,12 @@ public:
   }
 
   void insertData(dtype &dataItem, itype arrayIndex) {
+
+    // no data items should be submitted after all local contributors call done
+    // and staged completion has begun
+    CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
+               ::stagedCompletionStarted()) == false);
+
     if (MeshStreamer<ArrayDataItem<dtype, itype> >
          ::useStagedCompletion_ == false) {
       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();