NDMeshStreamer: replaced STAGED_COMPLETION macro with a variable
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 23 Jun 2012 00:00:43 +0000 (19:00 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 23 Jun 2012 00:00:43 +0000 (19:00 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 8fc5eb5c7227f3241aad687610cd27b7c0c599ec..a2d3311f26b36dd5429798198f2f28461070548e 100644 (file)
@@ -18,7 +18,6 @@
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
 // #define STREAMER_VERBOSE_OUTPUT
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
 // #define STREAMER_VERBOSE_OUTPUT
-#define STAGED_COMPLETION
 
 struct MeshLocation {
   int dimension; 
 
 struct MeshLocation {
   int dimension; 
@@ -30,18 +29,14 @@ class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
 
 public:
 
 
 public:
 
-#ifdef STAGED_COMPLETION
   int finalMsgCount; 
   int finalMsgCount; 
-#endif
   int dimension; 
   int numDataItems;
   int *destinationPes;
   dtype *dataItems;
 
   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
   int dimension; 
   int numDataItems;
   int *destinationPes;
   dtype *dataItems;
 
   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
-#ifdef STAGED_COMPLETION
     finalMsgCount = -1; 
     finalMsgCount = -1; 
-#endif
   }
 
   int addDataItem(const dtype &dataItem) {
   }
 
   int addDataItem(const dtype &dataItem) {
@@ -158,6 +153,7 @@ private:
 
 protected:
 
 
 protected:
 
+  bool useStagedCompletion_;
   CompletionDetector *detectorLocalObj_;
   virtual int copyDataItemIntoMessage(
               MeshStreamerMessage<dtype> *destinationBuffer, 
   CompletionDetector *detectorLocalObj_;
   virtual int copyDataItemIntoMessage(
               MeshStreamerMessage<dtype> *destinationBuffer, 
@@ -197,14 +193,16 @@ public:
   }
 
   void done(int numContributorsFinished = 1) {
   }
 
   void done(int numContributorsFinished = 1) {
-#ifdef STAGED_COMPLETION
-    numLocalDone_ += numContributorsFinished; 
-    if (numLocalDone_ == numLocalContributors_) {
-      startStagedCompletion();
+
+    if (useStagedCompletion_) {
+      numLocalDone_ += numContributorsFinished; 
+      if (numLocalDone_ == numLocalContributors_) {
+        startStagedCompletion();
+      }
+    }
+    else {
+      detectorLocalObj_->done(numContributorsFinished);
     }
     }
-#else
-    detectorLocalObj_->done(numContributorsFinished);
-#endif
   }
 
   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
   }
 
   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
@@ -339,18 +337,10 @@ MeshStreamer<dtype>::MeshStreamer(
   std::fill(isCached_, isCached_ + numMembers_, false);
 #endif
 
   std::fill(isCached_, isCached_ + numMembers_, false);
 #endif
 
-#ifdef STAGED_COMPLETION
-
-  cntMsgSent_ = new int*[numDimensions_]; 
-  cntMsgReceived_ = new int[numDimensions_];
-  cntMsgExpected_ = new int[numDimensions_];
-  cntFinished_ = new int[numDimensions_]; 
-
-  for (int i = 0; i < numDimensions_; i++) {
-    cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
-  }
-
-#endif
+  cntMsgSent_ = NULL; 
+  cntMsgReceived_ = NULL; 
+  cntMsgExpected_ = NULL; 
+  cntFinished_ = NULL; 
 
 }
 
 
 }
 
@@ -374,15 +364,15 @@ MeshStreamer<dtype>::~MeshStreamer() {
   delete[] isCached_; 
 #endif
 
   delete[] isCached_; 
 #endif
 
-#ifdef STAGED_COMPLETION
-  for (int i = 0; i < numDimensions_; i++) {
-    delete[] cntMsgSent_[i]; 
+  if (cntMsgSent_ != NULL) {
+    for (int i = 0; i < numDimensions_; i++) {
+      delete[] cntMsgSent_[i]; 
+    }
+    delete[] cntMsgSent_; 
+    delete[] cntMsgReceived_; 
+    delete[] cntMsgExpected_; 
+    delete[] cntFinished_;
   }
   }
-  delete[] cntMsgSent_; 
-  delete[] cntMsgReceived_; 
-  delete[] cntMsgExpected_; 
-  delete[] cntFinished_;
-#endif
 
 }
 
 
 }
 
@@ -491,9 +481,9 @@ void MeshStreamer<dtype>::storeMessage(
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
 
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
 
-#ifdef STAGED_COMPLETION
-    cntMsgSent_[dimension][bufferIndex]++; 
-#endif
+    if (useStagedCompletion_) {
+      cntMsgSent_[dimension][bufferIndex]++; 
+    }
 
     messageBuffers[bufferIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
 
     messageBuffers[bufferIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
@@ -531,9 +521,10 @@ void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
 template <class dtype>
 inline
 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 template <class dtype>
 inline
 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
-#ifndef STAGED_COMPLETION
-  detectorLocalObj_->produce();
-#endif
+
+  if (!useStagedCompletion_) {
+    detectorLocalObj_->produce();
+  }
   if (destinationPe == CkMyPe()) {
     // copying here is necessary - user code should not be 
     // passed back a reference to the original item
   if (destinationPe == CkMyPe()) {
     // copying here is necessary - user code should not be 
     // passed back a reference to the original item
@@ -549,6 +540,19 @@ template <class dtype>
 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
                                CkCallback endCb, int prio, 
                                bool usePeriodicFlushing) {
 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
                                CkCallback endCb, int prio, 
                                bool usePeriodicFlushing) {
+  useStagedCompletion_ = true; 
+  // allocate memory on first use
+  if (cntMsgSent_ == NULL) {
+    cntMsgSent_ = new int*[numDimensions_]; 
+    cntMsgReceived_ = new int[numDimensions_];
+    cntMsgExpected_ = new int[numDimensions_];
+    cntFinished_ = new int[numDimensions_]; 
+
+    for (int i = 0; i < numDimensions_; i++) {
+      cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
+    }
+  }
+
 
   for (int i = 0; i < numDimensions_; i++) {
     std::fill(cntMsgSent_[i], 
 
   for (int i = 0; i < numDimensions_; i++) {
     std::fill(cntMsgSent_[i], 
@@ -586,6 +590,7 @@ void MeshStreamer<dtype>::associateCallback(
                          CProxy_CompletionDetector detector, 
                          int prio, bool usePeriodicFlushing) {
 
                          CProxy_CompletionDetector detector, 
                          int prio, bool usePeriodicFlushing) {
 
+  useStagedCompletion_ = false; 
   yieldCount_ = 0; 
   prio_ = prio;
   userCallback_ = endCb; 
   yieldCount_ = 0; 
   prio_ = prio;
   userCallback_ = endCb; 
@@ -597,7 +602,8 @@ void MeshStreamer<dtype>::associateCallback(
   detectorLocalObj_ = detector_.ckLocalBranch();
   initLocalClients();
 
   detectorLocalObj_ = detector_.ckLocalBranch();
   initLocalClients();
 
-  detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
+  detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
+                                     finish , 0);
   
   if (progressPeriodInMs_ <= 0) {
     CkPrintf("Using completion detection in NDMeshStreamer requires"
   
   if (progressPeriodInMs_ <= 0) {
     CkPrintf("Using completion detection in NDMeshStreamer requires"
@@ -610,6 +616,7 @@ void MeshStreamer<dtype>::associateCallback(
   if (usePeriodicFlushing) {
     enablePeriodicFlushing();
   }
   if (usePeriodicFlushing) {
     enablePeriodicFlushing();
   }
+
 }
 
 template <class dtype>
 }
 
 template <class dtype>
@@ -646,9 +653,9 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
     lastDestinationPe = destinationPe; 
   }
 
     lastDestinationPe = destinationPe; 
   }
 
-#ifdef STAGED_COMPLETION
-  markMessageReceived(msg->dimension, msg->finalMsgCount); 
-#endif
+  if (useStagedCompletion_) {
+    markMessageReceived(msg->dimension, msg->finalMsgCount); 
+  }
 
   delete msg;
 
 
   delete msg;
 
@@ -684,13 +691,12 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
        (flushIndex - myLocationIndex_[flushDimension]) * 
        combinedDimensionSizes_[flushDimension] ;
 
        (flushIndex - myLocationIndex_[flushDimension]) * 
        combinedDimensionSizes_[flushDimension] ;
 
-      if (destinationBuffer->numDataItems < bufferSize_) {
-       // not sending the full buffer, shrink the message size
-       envelope *env = UsrToEnv(destinationBuffer);
-       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
-                         (bufferSize_ - destinationBuffer->numDataItems));
-       *((int *) env->getPrioPtr()) = prio_;
-      }
+      // not sending the full buffer, shrink the message size
+      envelope *env = UsrToEnv(destinationBuffer);
+      env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
+                        (bufferSize_ - destinationBuffer->numDataItems));
+      *((int *) env->getPrioPtr()) = prio_;
+
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
       if (flushDimension == 0) {
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
       if (flushDimension == 0) {
@@ -707,9 +713,9 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
       }
 
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
       }
 
-#ifdef STAGED_COMPLETION
-      cntMsgSent_[i][flushIndex]++; 
-#endif
+      if (useStagedCompletion_) {
+        cntMsgSent_[i][flushIndex]++; 
+      }
 
       messageBuffers[flushIndex] = NULL;
 
 
       messageBuffers[flushIndex] = NULL;
 
@@ -757,27 +763,21 @@ void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
       (j - myLocationIndex_[dimension]) * 
       combinedDimensionSizes_[dimension] ;
 
       (j - myLocationIndex_[dimension]) * 
       combinedDimensionSizes_[dimension] ;
 
-    if (destinationBuffer->numDataItems < bufferSize_) {
-#ifdef STAGED_COMPLETION
-      if (destinationBuffer->numDataItems != 0) {
-#endif
+    if (destinationBuffer->numDataItems != 0) {
       // not sending the full buffer, shrink the message size
       envelope *env = UsrToEnv(destinationBuffer);
       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
                         (bufferSize_ - destinationBuffer->numDataItems));
       *((int *) env->getPrioPtr()) = prio_;
       // not sending the full buffer, shrink the message size
       envelope *env = UsrToEnv(destinationBuffer);
       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
                         (bufferSize_ - destinationBuffer->numDataItems));
       *((int *) env->getPrioPtr()) = prio_;
-#ifdef STAGED_COMPLETION
-      }
-#endif
     }
     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
     }
     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
-#ifdef STAGED_COMPLETION
-    cntMsgSent_[dimension][j]++;
-    if (sendMsgCounts) {
-      destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
+    if (useStagedCompletion_) {
+      cntMsgSent_[dimension][j]++;
+      if (sendMsgCounts) {
+        destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
+      }
     }
     }
-#endif
 
     if (dimension == 0) {
 #ifdef STREAMER_VERBOSE_OUTPUT
 
     if (dimension == 0) {
 #ifdef STREAMER_VERBOSE_OUTPUT
@@ -849,23 +849,25 @@ private:
       clientObj_->process(data);
     }
 
       clientObj_->process(data);
     }
 
-#ifdef STAGED_COMPLETION
+    if (MeshStreamer<dtype>::useStagedCompletion_) {
 #ifdef STREAMER_VERBOSE_OUTPUT
 #ifdef STREAMER_VERBOSE_OUTPUT
-    envelope *env = UsrToEnv(msg);
-    CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
-#endif
-    markMessageReceived(msg->dimension, msg->finalMsgCount); 
-#else 
-    this->detectorLocalObj_->consume(msg->numDataItems);    
+      envelope *env = UsrToEnv(msg);
+      CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
 #endif
 #endif
+      markMessageReceived(msg->dimension, msg->finalMsgCount); 
+    }
+    else {
+      this->detectorLocalObj_->consume(msg->numDataItems);    
+    }
+
     delete msg;
   }
 
   void localDeliver(dtype &dataItem) {
     clientObj_->process(dataItem);
     delete msg;
   }
 
   void localDeliver(dtype &dataItem) {
     clientObj_->process(dataItem);
-#ifndef STAGED_COMPLETION
-    MeshStreamer<dtype>::detectorLocalObj_->consume();
-#endif
+    if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
+      MeshStreamer<dtype>::detectorLocalObj_->consume();
+    }
   }
 
   int numElementsInClient() {
   }
 
   int numElementsInClient() {
@@ -961,9 +963,11 @@ private:
 
     if (clientObj != NULL) {
       clientObj->process(packedDataItem.dataItem);
 
     if (clientObj != NULL) {
       clientObj->process(packedDataItem.dataItem);
-#ifndef STAGED_COMPLETION
-      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
-#endif
+      if (MeshStreamer<ArrayDataItem<dtype, itype> >
+           ::useStagedCompletion_ == false) {
+        MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::detectorLocalObj_->consume();
+      }
     }
     else { 
       // array element is no longer present locally - redeliver using proxy
     }
     else { 
       // array element is no longer present locally - redeliver using proxy
@@ -980,31 +984,31 @@ private:
   }
 
   void initLocalClients() {
   }
 
   void initLocalClients() {
-#ifndef STAGED_COMPLETION
-
-  #ifdef CACHE_ARRAY_METADATA
-    std::fill(isCachedArrayMetadata_, 
-             isCachedArrayMetadata_ + numArrayElements_, false);
-
-    for (int i = 0; i < numArrayElements_; i++) {
-      clientObjs_[i] = clientProxy_[i].ckLocal();
-      if (clientObjs_[i] != NULL) {
-       clientObjs_[i]->setDetector(
-        MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
+    if (MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::useStagedCompletion_ == false) {
+#ifdef CACHE_ARRAY_METADATA
+      std::fill(isCachedArrayMetadata_, 
+                isCachedArrayMetadata_ + numArrayElements_, false);
+
+      for (int i = 0; i < numArrayElements_; i++) {
+        clientObjs_[i] = clientProxy_[i].ckLocal();
+        if (clientObjs_[i] != NULL) {
+          clientObjs_[i]->setDetector(
+           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
+        }
       }
       }
+#else
+      // set completion detector in local elements of the client
+      CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
+      MeshStreamerClientIterator<dtype> clientIterator(
+          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
+          clientProxy_.ckLocalBranch());
+      clientLocMgr->iterate(clientIterator);
+#endif    
+    }
+    else {
+      numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
     }
     }
-  #else
-    // set completion detector in local elements of the client
-    CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
-    MeshStreamerClientIterator<dtype> clientIterator(
-     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
-     clientProxy_.ckLocalBranch());
-    clientLocMgr->iterate(clientIterator);
-  #endif    
-
-#else 
-    numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
-#endif
   }
 
   void commonInit() {
   }
 
   void commonInit() {
@@ -1076,17 +1080,18 @@ public:
       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
       localDeliver(packedData);
     }
       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
       localDeliver(packedData);
     }
-#ifdef STAGED_COMPLETION
-    markMessageReceived(msg->dimension, msg->finalMsgCount);
-#endif
+    if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
+      markMessageReceived(msg->dimension, msg->finalMsgCount);
+    }
 
     delete msg;
   }
 
   void insertData(dtype &dataItem, itype arrayIndex) {
 
     delete msg;
   }
 
   void insertData(dtype &dataItem, itype arrayIndex) {
-#ifndef STAGED_COMPLETION
-    MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
-#endif
+    if (MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::useStagedCompletion_ == false) {
+      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
+    }
     int destinationPe; 
 #ifdef CACHE_ARRAY_METADATA
   if (isCachedArrayMetadata_[arrayIndex]) {    
     int destinationPe; 
 #ifdef CACHE_ARRAY_METADATA
   if (isCachedArrayMetadata_[arrayIndex]) {