NDMeshStreamer: make data items const in interface functions
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 15 Sep 2012 06:25:31 +0000 (01:25 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 15 Sep 2012 06:25:31 +0000 (01:25 -0500)
src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 00cd1af2926f1b6691a51ab6b28f722d6b7741d6..73d77a21f0a5d9d10dc417d1d4da49e37dab6210 100644 (file)
@@ -6,6 +6,8 @@ class ArrayDataItem{
  public:
   itype arrayIndex;
   dtype dataItem;
+
+  ArrayDataItem(itype i, const dtype d) : arrayIndex(i), dataItem(d) {}
 };
 
 #endif
index 0ca33f41ec383cdc5ab8d9268a4dd9408d774e83..360aca7b81f5411e2106c53a89d4230b22d838fe 100644 (file)
@@ -40,7 +40,7 @@ public:
     finalMsgCount = -1; 
   }
 
-  int addDataItem(const dtype &dataItem) {
+  int addDataItem(const dtypedataItem) {
     dataItems[numDataItems] = dataItem;
     return ++numDataItems; 
   }
@@ -49,7 +49,7 @@ public:
     destinationPes[index] = destinationPe;
   }
 
-  dtype &getDataItem(const int index) {
+  const dtype& getDataItem(const int index) {
     return dataItems[index];
   }
 
@@ -66,7 +66,7 @@ public:
   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
   // would like to make it pure virtual but charm will try to
   // instantiate the abstract class, leading to errors
-  virtual void process(dtype &data) {
+  virtual void process(const dtype& data) {
     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
             "This virtual function should have been defined by the user.\n");
   };     
@@ -84,7 +84,7 @@ public:
     process(data);
   }
 
-  void pup(PUP::er &p) {
+  void pup(PUP::erp) {
     CBase_MeshStreamerArrayClient<dtype>::pup(p);
    }  
 
@@ -94,7 +94,7 @@ template <class dtype>
 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
 
 public:
-  virtual void process(dtype &data) = 0;
+  virtual void process(const dtype& data) = 0;
 
 };
 
@@ -143,10 +143,10 @@ private:
   int numLocalContributors_;
 
   void storeMessage(int destinationPe, 
-                    const MeshLocation &destinationCoordinates, 
-                    void *dataItem, bool copyIndirectly = false);
-  virtual void localDeliver(dtype &dataItem) = 0; 
-  virtual void localBroadcast(dtype &dataItem) = 0; 
+                    const MeshLocationdestinationCoordinates, 
+                    const void *dataItem, bool copyIndirectly = false);
+  virtual void localDeliver(const dtype& dataItem) = 0; 
+  virtual void localBroadcast(const dtype& dataItem) = 0; 
   virtual int numElementsInClient() = 0;
   virtual int numLocalElementsInClient() = 0; 
 
@@ -165,9 +165,10 @@ protected:
   CompletionDetector *detectorLocalObj_;
   virtual int copyDataItemIntoMessage(
               MeshStreamerMessage<dtype> *destinationBuffer, 
-              void *dataItemHandle, bool copyIndirectly = false);
-  void insertData(void *dataItemHandle, int destinationPe);
-  void broadcast(void *dataItemHandle, int dimension, bool copyIndirectly);
+              const void *dataItemHandle, bool copyIndirectly = false);
+  void insertData(const void *dataItemHandle, int destinationPe);
+  void broadcast(const void *dataItemHandle, int dimension, 
+                 bool copyIndirectly);
 
 public:
 
@@ -192,8 +193,8 @@ public:
   bool isPeriodicFlushEnabled() {
     return isPeriodicFlushEnabled_;
   }
-  virtual void insertData(dtype &dataItem, int destinationPe); 
-  virtual void broadcast(dtype &dataItem); 
+  virtual void insertData(const dtype& dataItem, int destinationPe); 
+  virtual void broadcast(const dtype& dataItem); 
   void registerPeriodicProgressFunction();
 
   // flushing begins only after enablePeriodicFlushing has been invoked
@@ -301,6 +302,8 @@ MeshStreamer<dtype>::MeshStreamer(
       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
   }
 
+  CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes()); 
+
   // a bufferSize input of 0 indicates it should be calculated by the library
   if (bufferSize_ == 0) {
     CkAssert(maxNumDataItemsBuffered_ > 0);
@@ -433,8 +436,8 @@ template <class dtype>
 inline 
 int MeshStreamer<dtype>::copyDataItemIntoMessage(
                         MeshStreamerMessage<dtype> *destinationBuffer,
-                        void *dataItemHandle, bool copyIndirectly) {
-  return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
+                        const void *dataItemHandle, bool copyIndirectly) {
+  return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle)); 
 }
 
 template <class dtype>
@@ -442,7 +445,7 @@ inline
 void MeshStreamer<dtype>::storeMessage(
                          int destinationPe, 
                          const MeshLocation& destinationLocation,
-                         void *dataItem, bool copyIndirectly) {
+                         const void *dataItem, bool copyIndirectly) {
 
   int dimension = destinationLocation.dimension;
   int bufferIndex = destinationLocation.bufferIndex; 
@@ -514,7 +517,7 @@ void MeshStreamer<dtype>::storeMessage(
 }
 template <class dtype>
 inline
-void MeshStreamer<dtype>::broadcast(dtype &dataItem) {
+void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
   const static bool copyIndirectly = true;
 
   // no data items should be submitted after all local contributors call done 
@@ -527,15 +530,14 @@ void MeshStreamer<dtype>::broadcast(dtype &dataItem) {
   }
 
   // deliver locally
-  dtype dataItemCopy = dataItem;
-  localBroadcast(dataItemCopy);
+  localBroadcast(dataItem);
 
   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
 }
 
 template <class dtype>
 inline
-void MeshStreamer<dtype>::broadcast(void *dataItemHandle, int dimension, 
+void MeshStreamer<dtype>::broadcast(const void *dataItemHandle, int dimension, 
                                     bool copyIndirectly) {
 
   MeshLocation destinationLocation;
@@ -566,7 +568,8 @@ void MeshStreamer<dtype>::broadcast(void *dataItemHandle, int dimension,
 
 template <class dtype>
 inline
-void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
+void MeshStreamer<dtype>::insertData(const void *dataItemHandle, 
+                                     int destinationPe) {
   const static bool copyIndirectly = true;
 
   // treat newly inserted items as if they were received along
@@ -586,7 +589,7 @@ void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
 
 template <class dtype>
 inline
-void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
+void MeshStreamer<dtype>::insertData(const dtype& dataItem, int destinationPe) {
 
   // no data items should be submitted after all local contributors call done 
   // and staged completion has begun
@@ -596,14 +599,11 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
     detectorLocalObj_->produce();
   }
   if (destinationPe == CkMyPe()) {
-    // copying here is necessary - user code should not be 
-    // passed back a reference to the original item
-    dtype dataItemCopy = dataItem;
-    localDeliver(dataItemCopy);
+    localDeliver(dataItem);
     return;
   }
 
-  insertData((void *) &dataItem, destinationPe);
+  insertData((const void *) &dataItem, destinationPe);
 }
 
 template <class dtype>
@@ -709,7 +709,7 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
   lastDestinationPe = -1;
   for (int i = 0; i < msg->numDataItems; i++) {
     destinationPe = msg->destinationPes[i];
-    dtype &dataItem = msg->getDataItem(i);
+    const dtype& dataItem = msg->getDataItem(i);
     if (destinationPe == CkMyPe()) {
       localDeliver(dataItem);
     }
@@ -921,7 +921,7 @@ private:
 
   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
     for (int i = 0; i < msg->numDataItems; i++) {
-      dtype &data = msg->getDataItem(i);
+      const dtype& data = msg->getDataItem(i);
       clientObj_->process(data);
     }
 
@@ -941,14 +941,14 @@ private:
     delete msg;
   }
 
-  void localDeliver(dtype &dataItem) {
+  void localDeliver(const dtype& dataItem) {
     clientObj_->process(dataItem);
     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
       MeshStreamer<dtype>::detectorLocalObj_->consume();
     }
   }
 
-  void localBroadcast(dtype &dataItem) {
+  void localBroadcast(const dtype& dataItem) {
     localDeliver(dataItem); 
   }
 
@@ -969,7 +969,7 @@ public:
 
   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
                    int *dimensionSizes, 
-                   const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+                   const CProxy_MeshStreamerGroupClient<dtype>clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
                         0, yieldFlag, progressPeriodInMs) 
@@ -980,7 +980,7 @@ public:
   }
 
   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
-                   const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+                   const CProxy_MeshStreamerGroupClient<dtype>clientProxy,
                    int bufferSize, bool yieldFlag = 0, 
                     double progressPeriodInMs = -1.0)
    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
@@ -1005,7 +1005,7 @@ public:
     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
 
   // CkLocMgr::iterate will call addLocation on all elements local to this PE
-  void addLocation(CkLocation &loc) {
+  void addLocation(CkLocationloc) {
 
     MeshStreamerArrayClient<dtype> *clientObj = 
       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
@@ -1021,19 +1021,18 @@ class LocalBroadcaster : public CkLocIterator {
 
 public:
   CkArray *clientArrMgr_;
-  dtype *dataItem_; 
+  const dtype *dataItem_; 
 
-  LocalBroadcaster(CkArray *clientArrMgr, dtype *dataItem) 
+  LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem) 
    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
 
-  void addLocation(CkLocation &loc) {
+  void addLocation(CkLocationloc) {
     MeshStreamerArrayClient<dtype> *clientObj = 
       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
 
     CkAssert(clientObj != NULL); 
 
-    dtype dataItemCopy = *dataItem_; 
-    clientObj->process(dataItemCopy); 
+    clientObj->process(*dataItem_); 
   }
 
 };
@@ -1053,7 +1052,7 @@ private:
   bool *isCachedArrayMetadata_;
 #endif
 
-  void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
+  void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
     itype arrayId = packedDataItem.arrayIndex; 
     if (arrayId == itype(TRAM_BROADCAST)) {
       localBroadcast(packedDataItem);
@@ -1080,7 +1079,7 @@ private:
     }
   }
 
-  void localBroadcast(ArrayDataItem<dtype, itype> &packedDataItem) {
+  void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
 
     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
                                            &packedDataItem.dataItem);
@@ -1146,12 +1145,12 @@ public:
 
   struct DataItemHandle {
     itype arrayIndex; 
-    dtype *dataItem;
+    const dtype *dataItem;
   };
 
   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
                    int *dimensionSizes, 
-                    const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                    const CProxy_MeshStreamerArrayClient<dtype>clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
     :MeshStreamer<ArrayDataItem<dtype, itype> >(
                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
@@ -1163,7 +1162,7 @@ public:
   }
 
   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
-                   const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                   const CProxy_MeshStreamerArrayClient<dtype>clientProxy,
                    int bufferSize, bool yieldFlag = 0, 
                     double progressPeriodInMs = -1.0)
     :MeshStreamer<ArrayDataItem<dtype,itype> >(
@@ -1197,7 +1196,7 @@ public:
        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
 
     for (int i = 0; i < msg->numDataItems; i++) {
-      ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
+      const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
       localDeliver(packedData);
     }
     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
@@ -1207,7 +1206,7 @@ public:
     delete msg;
   }
 
-  void broadcast(dtype &dataItem) {
+  void broadcast(const dtype& dataItem) {
     const static bool copyIndirectly = true;
 
     // no data items should be submitted after all local contributors call done
@@ -1222,9 +1221,7 @@ public:
     }
 
     // deliver locally
-    ArrayDataItem<dtype, itype> packedDataItem;
-    packedDataItem.arrayIndex = TRAM_BROADCAST; 
-    packedDataItem.dataItem = dataItem;
+    ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, dataItem);
     localBroadcast(packedDataItem);
 
     DataItemHandle tempHandle; 
@@ -1237,7 +1234,7 @@ public:
       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
   }
 
-  void insertData(dtype &dataItem, itype arrayIndex) {
+  void insertData(const dtype& dataItem, itype arrayIndex) {
 
     // no data items should be submitted after all local contributors call done
     // and staged completion has begun
@@ -1264,18 +1261,13 @@ public:
     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
 #endif
 
-  ArrayDataItem<dtype, itype> packedDataItem;
     if (destinationPe == CkMyPe()) {
-      // copying here is necessary - user code should not be 
-      // passed back a reference to the original item
-      packedDataItem.arrayIndex = arrayIndex; 
-      packedDataItem.dataItem = dataItem;
+      ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, dataItem);
       localDeliver(packedDataItem);
       return;
     }
 
     // this implementation avoids copying an item before transfer into message
-
     DataItemHandle tempHandle; 
     tempHandle.arrayIndex = arrayIndex; 
     tempHandle.dataItem = &dataItem;
@@ -1287,12 +1279,13 @@ public:
 
   int copyDataItemIntoMessage(
       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
-      void *dataItemHandle, bool copyIndirectly) {
+      const void *dataItemHandle, bool copyIndirectly) {
 
     if (copyIndirectly == true) {
       // newly inserted items are passed through a handle to avoid copying
       int numDataItems = destinationBuffer->numDataItems;
-      DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
+      const DataItemHandle *tempHandle = 
+        (const DataItemHandle *) dataItemHandle;
       (destinationBuffer->dataItems)[numDataItems].dataItem = 
        *(tempHandle->dataItem);
       (destinationBuffer->dataItems)[numDataItems].arrayIndex =