NDMeshStreamer: code cleanup
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 5 May 2012 00:37:23 +0000 (19:37 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 5 May 2012 00:37:23 +0000 (19:37 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index ba0d786290615f3fb1867da61b6390f9bbcb1441..767d2749f59168c1f6a589ab7b528c94db0d3438 100644 (file)
@@ -6,12 +6,10 @@ module NDMeshStreamer {
   template<class dtype> 
   message MeshStreamerMessage {
     int destinationPes[];
-    dtype data[]; 
+    dtype dataItems[]; 
   };
 
-  template<class dtype> group MeshStreamerGroupClient {
-    entry void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
-  };
+  template<class dtype> group MeshStreamerGroupClient {};
 
   template<class dtype> array [Max] MeshStreamerArrayClient {
     entry void receiveRedeliveredItem(dtype data);
@@ -27,12 +25,13 @@ module NDMeshStreamer {
                                 CkCallback startCb, CkCallback endCb, 
                                 CProxy_CompletionDetector detector,
                                 int prio);
+    entry void receiveAtDestination(MeshStreamerMessage<dtype> *msg);
   };
 
   template<class dtype>
   group GroupMeshStreamer : MeshStreamer<dtype> {
     entry GroupMeshStreamer(
-         int totalBufferCapacity, int numDimensions, 
+         int maxNumDataItemsBuffered, int numDimensions, 
          int dimensionSizes[numDimensions], 
          const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
@@ -41,13 +40,10 @@ module NDMeshStreamer {
   template<class dtype, class itype>
     group ArrayMeshStreamer : MeshStreamer<ArrayDataItem<dtype, itype> > {
     entry ArrayMeshStreamer(
-         int totalBufferCapacity, int numDimensions, 
+         int maxNumDataItemsBuffered, int numDimensions, 
          int dimensionSizes[numDimensions],
          const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
-
-    entry void receiveArrayData(
-              MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg); 
   };
 
 };
index d24d3fe8661ba03bb983c1663b0cc8b7e7fcef04..d70276e8a9770dba025e781d55d71301d9db5ac0 100644 (file)
@@ -7,9 +7,11 @@
 #include "completion.h"
 #include "ckarray.h"
 
-// allocate more total buffer space than the maximum buffering limit but flush 
-//   upon reaching totalBufferCapacity_
-#define BUFFER_SIZE_FACTOR 4
+// limit total number of data items in system to
+// maxNumDataItemsBuffered_ (flush when limit is reached) but allow
+// allocation of up to a factor of OVERALLOCATION_FACTOR more space to
+// take advantage of nonuniform filling of buffers
+#define OVERALLOCATION_FACTOR 4
 
 // #define DEBUG_STREAMER
 // #define CACHE_LOCATIONS
@@ -26,14 +28,15 @@ struct MeshLocation {
 
 template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
+
 public:
+
 #ifdef STAGED_COMPLETION
   int finalMsgCount; 
-  int dimension;
 #endif
-    int numDataItems;
-    int *destinationPes;
-    dtype *data;
+  int numDataItems;
+  int *destinationPes;
+  dtype *dataItems;
 
   MeshStreamerMessage(): numDataItems(0) {
 #ifdef STAGED_COMPLETION
@@ -41,36 +44,26 @@ public:
 #endif
   }
 
-    int addDataItem(const dtype &dataItem) {
-        data[numDataItems] = dataItem;
-        return ++numDataItems; 
-    }
+  int addDataItem(const dtype &dataItem) {
+    dataItems[numDataItems] = dataItem;
+    return ++numDataItems; 
+  }
 
-    void markDestination(const int index, const int destinationPe) {
-       destinationPes[index] = destinationPe;
-    }
+  void markDestination(const int index, const int destinationPe) {
+    destinationPes[index] = destinationPe;
+  }
 
-    dtype &getDataItem(const int index) {
-        return data[index];
-    }
-};
+  dtype &getDataItem(const int index) {
+    return dataItems[index];
+  }
 
-/*
-template<class dtype>
-class StagedCompletionMessage : public CMessage_StagedCompletionMessage<dtype> {
-  int finalMsgCount_; 
-  int dimension_;
-  CompletionMessage() {}
-  Completionmessage(int finalMsgCount, int dimension)
-   : finalMsgCount_{finalMsgCount), dimension_(dimension) {}
 };
-*/
 
 template <class dtype>
 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
- private:
+private:
   CompletionDetector *detectorLocalObj_;
- public:
+public:
   MeshStreamerArrayClient(){}
   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
   // would like to make it pure virtual but charm will try to
@@ -97,61 +90,46 @@ class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
 template <class dtype>
 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
 
- private:
-  CompletionDetector *detectorLocalObj_;
-
- public:
+public:
   virtual void process(dtype &data) = 0;
-  void setDetector(CompletionDetector *detectorLocalObj) {
-    detectorLocalObj_ = detectorLocalObj;
-  }
-  virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
-    for (int i = 0; i < msg->numDataItems; i++) {
-      dtype &data = msg->getDataItem(i);
-      process(data);
-    }
-#ifndef STAGED_COMPLETION
-    detectorLocalObj_->consume(msg->numDataItems);
-#endif
-    delete msg;
-  }
+
 };
 
 template <class dtype>
 class MeshStreamer : public CBase_MeshStreamer<dtype> {
 
 private:
-    int bufferSize_; 
-    int totalBufferCapacity_;
-    int numDataItemsBuffered_;
+  int bufferSize_; 
+  int maxNumDataItemsBuffered_;
+  int numDataItemsBuffered_;
 
-    int numMembers_; 
-    int numDimensions_;
-    int *individualDimensionSizes_;
-    int *combinedDimensionSizes_;
+  int numMembers_; 
+  int numDimensions_;
+  int *individualDimensionSizes_;
+  int *combinedDimensionSizes_;
 
-    int myIndex_;
-    int *myLocationIndex_;
+  int myIndex_;
+  int *myLocationIndex_;
 
-    CkCallback   userCallback_;
-    bool yieldFlag_;
+  CkCallback   userCallback_;
+  bool yieldFlag_;
 
-    double progressPeriodInMs_; 
-    bool isPeriodicFlushEnabled_; 
-    bool hasSentRecently_;
+  double progressPeriodInMs_; 
+  bool isPeriodicFlushEnabled_; 
+  bool hasSentRecently_;
 #ifdef STREAMER_EXPERIMENTAL
-    bool hasSentPreviously_;
-    bool immediateMode_; 
+  bool hasSentPreviously_;
+  bool immediateMode_; 
 #endif
-    MeshStreamerMessage<dtype> ***dataBuffers_;
+  MeshStreamerMessage<dtype> ***dataBuffers_;
 
-    CProxy_CompletionDetector detector_;
-    int prio_;
-    int yieldCount_;
+  CProxy_CompletionDetector detector_;
+  int prio_;
+  int yieldCount_;
 
 #ifdef CACHE_LOCATIONS
-    MeshLocation *cachedLocations_;
-    bool *isCached_; 
+  MeshLocation *cachedLocations_;
+  bool *isCached_; 
 #endif
 
 
@@ -164,113 +142,81 @@ private:
   int numLocalDone_; 
 
 
-    MeshLocation determineLocation(int destinationPe);
-
-    void storeMessage(int destinationPe, 
-                     const MeshLocation &destinationCoordinates, 
-                     void *dataItem, bool copyIndirectly = false);
+  void storeMessage(int destinationPe, 
+                    const MeshLocation &destinationCoordinates, 
+                    void *dataItem, bool copyIndirectly = false);
+  virtual void localDeliver(dtype &dataItem) = 0; 
 
-    virtual void deliverToDestination(
-                 int destinationPe, 
-                 MeshStreamerMessage<dtype> *destinationBuffer) = 0;
-    virtual void localDeliver(dtype &dataItem) = 0; 
+  virtual int numElementsInClient() = 0;
+  virtual int numLocalElementsInClient() = 0; 
 
-    virtual int numElementsInClient() = 0;
-    virtual int numLocalElementsInClient() = 0; 
+  virtual void initLocalClients() = 0;
 
-    virtual void initLocalClients() = 0;
-
-    void sendLargestBuffer();
-    void flushToIntermediateDestinations();
-    void flushDimension(int dimension, bool sendMsgCounts = false); 
+  void sendLargestBuffer();
+  void flushToIntermediateDestinations();
+  void flushDimension(int dimension, bool sendMsgCounts = false); 
 
 protected:
 
-    CompletionDetector *detectorLocalObj_;
-    virtual int copyDataItemIntoMessage(
-               MeshStreamerMessage<dtype> *destinationBuffer, 
-               void *dataItemHandle, bool copyIndirectly = false);
-
+  CompletionDetector *detectorLocalObj_;
+  virtual int copyDataItemIntoMessage(
+              MeshStreamerMessage<dtype> *destinationBuffer, 
+              void *dataItemHandle, bool copyIndirectly = false);
+  MeshLocation determineLocation(int destinationPe);
 public:
 
-    MeshStreamer(int totalBufferCapacity, int numDimensions, 
-                int *dimensionSies,
-                 bool yieldFlag = 0, double progressPeriodInMs = -1.0);
-    ~MeshStreamer();
+  MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
+               int *dimensionSizes,
+               bool yieldFlag = 0, double progressPeriodInMs = -1.0);
+  ~MeshStreamer();
 
-      // entry
-    void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
-    void flushDirect();
-    void finish();
+  // entry
+  void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
+  virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
+  void flushDirect();
+  void finish();
 
-    // non entry
-    bool isPeriodicFlushEnabled() {
-      return isPeriodicFlushEnabled_;
-    }
-    virtual void insertData(dtype &dataItem, int destinationPe); 
-    void insertData(void *dataItemHandle, int destinationPe);
-    void associateCallback(int numContributors, 
-                          CkCallback startCb, CkCallback endCb, 
-                          CProxy_CompletionDetector detector,
-                          int prio);
-    void flushAllBuffers();
-    void registerPeriodicProgressFunction();
-
-    // flushing begins only after enablePeriodicFlushing has been invoked
-
-    void enablePeriodicFlushing(){
-      isPeriodicFlushEnabled_ = true; 
-      registerPeriodicProgressFunction();
-    }
+  // non entry
+  bool isPeriodicFlushEnabled() {
+    return isPeriodicFlushEnabled_;
+  }
+  virtual void insertData(dtype &dataItem, int destinationPe); 
+  void insertData(void *dataItemHandle, int destinationPe);
+  void associateCallback(int numContributors, 
+                         CkCallback startCb, CkCallback endCb, 
+                         CProxy_CompletionDetector detector,
+                         int prio);
+  void flushAllBuffers();
+  void registerPeriodicProgressFunction();
+
+  // flushing begins only after enablePeriodicFlushing has been invoked
+
+  void enablePeriodicFlushing(){
+    isPeriodicFlushEnabled_ = true; 
+    registerPeriodicProgressFunction();
+  }
 
-    void done(int numContributorsFinished = 1) {
+  void done(int numContributorsFinished = 1) {
 #ifdef STAGED_COMPLETION
-      numLocalDone_ += numContributorsFinished; 
-      if (numLocalDone_ == numLocalElementsInClient()) {
-        startStagedCompletion();
-      }
+    numLocalDone_ += numContributorsFinished; 
+    if (numLocalDone_ == numLocalElementsInClient()) {
+      startStagedCompletion();
+    }
 #else
-      detectorLocalObj_->done(numContributorsFinished);
+    detectorLocalObj_->done(numContributorsFinished);
 #endif
-    }
+  }
 
-    void init(CkCallback startCb, CkCallback endCb, int prio);
+  void init(CkCallback startCb, CkCallback endCb, int prio);
   
-    void startStagedCompletion() {    
-      /*
-  int dimension = msg->dimension;
-  cntMsgReceived_[dimension]++;
-  if (msg->finalMsgCount != -1) {
-    cntFinished_[dimension]++; 
-    cntMsgExpected_[dimension] += msg->finalMsgCount; 
-  }  
-  if (cntFinished_[dimension] == individualDimensionSizes_[dimension] - 1) {
-    if (cntMsgExpected_[dimension] == cntMsgReceived_[dimension]) {
-      flushDimension(dimension - 1); 
+  void startStagedCompletion() {          
+    if (individualDimensionSizes_[dimensionToFlush_] != 1) {
+      flushDimension(dimensionToFlush_, true);
     }
-  }
-      */
-      
-      if (individualDimensionSizes_[dimensionToFlush_] != 1) {
-        flushDimension(dimensionToFlush_, true);
-      }
-      dimensionToFlush_--;
+    dimensionToFlush_--;
 
-      checkForCompletedStages();
-      /*
-      do {
-        if (individualDimensionSizes_[dimensionToFlush_] != 1) {
-          flushDimension(dimensionToFlush_, true);
-        }
-        dimensionToFlush_--;
-      } while (dimensionToFlush_ >= 0 
-               && cntFinished_[dimensionToFlush_ + 1] 
-                    == individualDimensionSizes_[dimensionToFlush_ + 1] - 1
-               && cntMsgExpected_[dimensionToFlush_ + 1]
-                    == cntMsgReceived_[dimensionToFlush_ + 1]);
-      // sendStagedCompletion(numDimensions_ - 1); 
-      */
-    }
+    checkForCompletedStages();
+  }
 
   void markMessageReceived(int dimension, int finalCount) {
     cntMsgReceived_[dimension]++;
@@ -289,55 +235,29 @@ public:
            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
            cntMsgExpected_[dimensionToFlush_ + 1] == 
            cntMsgReceived_[dimensionToFlush_ + 1]) {
-        if (dimensionToFlush_ == -1) {
-          this->contribute(userCallback_);
-        }
-        else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
-          flushDimension(dimensionToFlush_, true); 
-        }
-        dimensionToFlush_--; 
+      if (dimensionToFlush_ == -1) {
+        this->contribute(userCallback_);
       }
-  }
-
-  /*
-  void sendStagedCompletion(int dimension) {
-    
-    for (int i = 0; i < dimensionSizes_[dimension]; i++) {
-      int destinationIndex = myIndex_ + 
-        (i - myLocationIndex_[dimension]) *
-        combinedDimensionSizes_[dimension] ;
-
-      StagedCompletionMsg *msg = new (sizeof(int)) StagedCompletionMsg();
-      this->thisProxy[destinationIndex_].receiveStagedCompletion(msg);
-    }
-  }
-  
-  void receiveStagedCompletion(StagedCompletionMsg *msg) {
-    msgCntExpected[msg->dimension_][msg->bufferIndex_] = msg->finalMsgCount_; 
-    cntFinished_[msg->dimension_]++; 
-    if (cntFinished[msg->dimension_] == 
-        individualDimensionSizes_[msg->dimension_]) {
-      sendstagedCompletion(msg->dimension_ - 1);
+      else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
+        flushDimension(dimensionToFlush_, true); 
+      }
+      dimensionToFlush_--; 
     }
-    delete msg; 
   }
-  */
+
 };
 
 template <class dtype>
 MeshStreamer<dtype>::MeshStreamer(
-                    int totalBufferCapacity, int numDimensions, 
+                    int maxNumDataItemsBuffered, int numDimensions, 
                     int *dimensionSizes, 
                     bool yieldFlag, 
                      double progressPeriodInMs)
  :numDimensions_(numDimensions), 
-  totalBufferCapacity_(totalBufferCapacity), 
+  maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
   yieldFlag_(yieldFlag), 
   progressPeriodInMs_(progressPeriodInMs)
 {
-  // limit total number of messages in system to totalBufferCapacity
-  //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
-  //   advantage of nonuniform filling of buffers
 
   int sumAlongAllDimensions = 0;   
   individualDimensionSizes_ = new int[numDimensions_];
@@ -354,14 +274,13 @@ MeshStreamer<dtype>::MeshStreamer(
 
   // except for personalized messages, the buffers for dimensions with the 
   //   same index as the sender's are not used
-  bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
+  bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
     / (sumAlongAllDimensions - numDimensions_ + 1); 
   if (bufferSize_ <= 0) {
     bufferSize_ = 1; 
-    CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
+    CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
             "is invalid. Defaulting to a single buffer per destination.\n");
   }
-  totalBufferCapacity_ = totalBufferCapacity;
   numDataItemsBuffered_ = 0; 
   numMembers_ = CkNumPes(); 
 
@@ -491,8 +410,6 @@ void MeshStreamer<dtype>::storeMessage(
   int bufferIndex = destinationLocation.bufferIndex; 
   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
 
-
-
   // allocate new message if necessary
   if (messageBuffers[bufferIndex] == NULL) {
     if (dimension == 0) {
@@ -506,9 +423,6 @@ void MeshStreamer<dtype>::storeMessage(
     }
     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
-#ifdef STAGED_COMPLETION
-      messageBuffers[bufferIndex]->dimension = dimension; 
-#endif
 #ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bufferIndex] != NULL);
 #endif
@@ -532,7 +446,7 @@ void MeshStreamer<dtype>::storeMessage(
       combinedDimensionSizes_[dimension];
 
     if (dimension == 0) {
-      deliverToDestination(destinationIndex, destinationBuffer);
+      this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
     }
     else {
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
@@ -548,7 +462,7 @@ void MeshStreamer<dtype>::storeMessage(
 
   }
   // send if total buffering capacity has been reached
-  else if (numDataItemsBuffered_ == totalBufferCapacity_) {
+  else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
     sendLargestBuffer();
     hasSentRecently_ = true; 
   }
@@ -689,7 +603,9 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
 #endif
 
 #ifdef STAGED_COMPLETION
-  markMessageReceived(msg->dimension, msg->finalMsgCount); 
+  envelope *env = UsrToEnv(msg);
+  MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
+  markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
 #endif
 
   delete msg;
@@ -736,7 +652,8 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
       if (flushDimension == 0) {
-        deliverToDestination(destinationIndex, destinationBuffer);
+        this->thisProxy[destinationIndex].
+          receiveAtDestination(destinationBuffer);
       }
       else {
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
@@ -774,7 +691,7 @@ void MeshStreamer<dtype>::flushAllBuffers() {
 
       if (i == 0) {
        int destinationPe = myIndex_ + j - myLocationIndex_[i];
-        deliverToDestination(destinationPe, messageBuffers[j]);
+        this->thisProxy[destinationPe].receiveAtDestination(messageBuffers[j]);
       }         
       else {
 
@@ -791,7 +708,7 @@ void MeshStreamer<dtype>::flushAllBuffers() {
          int destinationPe = messageBuffers[j]->destinationPes[k]; 
          dtype &dataItem = messageBuffers[j]->getDataItem(k);   
          directMsg->addDataItem(dataItem);
-          deliverToDestination(destinationPe,directMsg);
+          this->thisProxy[destinationPe].receiveAtDestination(directMsg);
        }
        delete messageBuffers[j];
       }
@@ -850,12 +767,11 @@ void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
 #ifdef STAGED_COMPLETION
-      destinationBuffer->dimension = dimension; 
       destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
 #endif
 
     if (dimension == 0) {
-      deliverToDestination(destinationIndex, destinationBuffer);
+      this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
     }
     else {
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
@@ -887,7 +803,7 @@ void MeshStreamer<dtype>::flushDirect(){
   // number of items buffered is small; avoid doing the switch 
   // at the beginning before any sending has taken place
   if (hasSentPreviously_ && 
-      (numDataItemsBuffered_ < .1 * totalBufferCapacity_)) {
+      (numDataItemsBuffered_ < .1 * maxNumDataItemsBuffered_)) {
     immediateMode_ = true; 
   } 
 
@@ -926,9 +842,19 @@ private:
   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
   MeshStreamerGroupClient<dtype> *clientObj_;
 
-  void deliverToDestination(int destinationPe, 
-                            MeshStreamerMessage<dtype> *destinationBuffer) {
-    clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
+  void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
+    for (int i = 0; i < msg->numDataItems; i++) {
+      dtype &data = msg->getDataItem(i);
+      clientObj_->process(data);
+    }
+#ifdef STAGED_COMPLETION
+    envelope *env = UsrToEnv(msg);
+    MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
+    markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
+#else 
+    this->detectorLocalObj_->consume(msg->numDataItems);    
+#endif
+    delete msg;
   }
 
   void localDeliver(dtype &dataItem) {
@@ -948,18 +874,16 @@ private:
   }
 
   void initLocalClients() {
-#ifndef STAGED_COMPLETION
-    clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
-#endif
+    // no action required
   }
 
 public:
 
-  GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
+  GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
                    int *dimensionSizes, 
                    const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
-   :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
+   :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
                          yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
@@ -1007,14 +931,6 @@ private:
   bool *isCachedArrayMetadata_;
 #endif
 
-  void deliverToDestination(
-       int destinationPe, 
-       MeshStreamerMessage<ArrayDataItem<dtype, itype> > *destinationBuffer) {
-
-    CProxy_ArrayMeshStreamer<dtype, itype> myProxy(this->thisProxy); 
-    myProxy[destinationPe].receiveArrayData(destinationBuffer);
-  }
-
   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
     itype arrayId = packedDataItem.arrayIndex; 
 
@@ -1080,12 +996,12 @@ public:
     dtype *dataItem;
   };
 
-  ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
+  ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
                    int *dimensionSizes, 
                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
     :MeshStreamer<ArrayDataItem<dtype, itype> >(
-      totalBufferCapacity, numDimensions, dimensionSizes, yieldFlag, 
+      maxNumDataItemsBuffered, numDimensions, dimensionSizes, yieldFlag, 
       progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
@@ -1109,7 +1025,7 @@ public:
 #endif
   }
 
-  void receiveArrayData(
+  void receiveAtDestination(
        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
 
     for (int i = 0; i < msg->numDataItems; i++) {
@@ -1117,7 +1033,9 @@ public:
       localDeliver(packedData);
     }
 #ifdef STAGED_COMPLETION
-    markMessageReceived(msg->dimension, msg->finalMsgCount);
+    envelope *env = UsrToEnv(msg);
+    MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
+    markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
 #endif
 
     delete msg;
@@ -1172,9 +1090,9 @@ public:
       // newly inserted items are passed through a handle to avoid copying
       int numDataItems = destinationBuffer->numDataItems;
       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
-      (destinationBuffer->data)[numDataItems].dataItem = 
+      (destinationBuffer->dataItems)[numDataItems].dataItem = 
        *(tempHandle->dataItem);
-      (destinationBuffer->data)[numDataItems].arrayIndex = 
+      (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
        tempHandle->arrayIndex;
       return ++destinationBuffer->numDataItems;
     }