NDMeshStreamer: bug fix for ArrayMeshStreamer - items received
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 6 Mar 2012 22:25:33 +0000 (16:25 -0600)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 6 Mar 2012 22:29:57 +0000 (16:29 -0600)
along the route to the destination were being treated as handles
instead of actual data

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 6b01994848cc99534129d3fcee3d86bd4e445cf9..1d35bf4b1d08cf1999344526fcd57fd0ff22aded 100644 (file)
@@ -94,11 +94,7 @@ private:
 
     void storeMessage(int destinationPe, 
                      const MeshLocation &destinationCoordinates, 
-                     void *dataItem);
-
-    virtual int copyDataItemIntoMessage(
-               MeshStreamerMessage<dtype> *destinationBuffer, 
-               void *dataItemHandle);
+                     void *dataItem, bool copyIndirectly = false);
 
     virtual void deliverToDestination(
                  int destinationPe, 
@@ -108,6 +104,12 @@ private:
 
     void flushLargestBuffer();
 
+protected:
+
+    virtual int copyDataItemIntoMessage(
+               MeshStreamerMessage<dtype> *destinationBuffer, 
+               void *dataItemHandle, bool copyIndirectly = false);
+
 public:
 
     MeshStreamer(int totalBufferCapacity, int numDimensions, 
@@ -125,7 +127,7 @@ public:
       return isPeriodicFlushEnabled_;
     }
     virtual void insertData(dtype &dataItem, int destinationPe); 
-    void insertData(void *dataItem, int destinationPe);
+    void insertData(void *dataItemHandle, int destinationPe);
     void doneInserting();
     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
       userCallback_ = cb;
@@ -284,7 +286,7 @@ template <class dtype>
 inline 
 int MeshStreamer<dtype>::copyDataItemIntoMessage(
                         MeshStreamerMessage<dtype> *destinationBuffer,
-                        void *dataItemHandle) {
+                        void *dataItemHandle, bool copyIndirectly) {
   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
 }
 
@@ -293,7 +295,7 @@ inline
 void MeshStreamer<dtype>::storeMessage(
                          int destinationPe, 
                          const MeshLocation& destinationLocation,
-                         void *dataItem) {
+                         void *dataItem, bool copyIndirectly) {
 
   int dimension = destinationLocation.dimension;
   int bufferIndex = destinationLocation.bufferIndex; 
@@ -319,7 +321,7 @@ void MeshStreamer<dtype>::storeMessage(
   
   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
   int numBuffered = 
-    copyDataItemIntoMessage(destinationBuffer, dataItem);
+    copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
   if (dimension != 0) {
     destinationBuffer->markDestination(numBuffered-1, destinationPe);
   }  
@@ -361,11 +363,13 @@ void MeshStreamer<dtype>::storeMessage(
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::insertData(void *dataItem, int destinationPe) {
+void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
   static int count = 0;
+  const static bool copyIndirectly = true;
 
   MeshLocation destinationLocation = determineLocation(destinationPe);
-  storeMessage(destinationPe, destinationLocation, dataItem); 
+  storeMessage(destinationPe, destinationLocation, dataItemHandle, 
+              copyIndirectly); 
 
   // release control to scheduler if requested by the user, 
   //   assume caller is threaded entry
@@ -680,15 +684,23 @@ public:
 
   int copyDataItemIntoMessage(
       MeshStreamerMessage<ArrayDataItem <dtype> > *destinationBuffer, 
-      void *dataItemHandle) {
-
-    int numDataItems = destinationBuffer->numDataItems;
-    DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
-    (destinationBuffer->data)[numDataItems].dataItem = 
-      *(tempHandle->dataItem);
-    (destinationBuffer->data)[numDataItems].arrayIndex = 
-      tempHandle->arrayIndex;
-    return ++destinationBuffer->numDataItems;
+      void *dataItemHandle, bool copyIndirectly) {
+
+    if (copyIndirectly == true) {
+      // newly inserted items are passed through a handle to avoid copying
+      int numDataItems = destinationBuffer->numDataItems;
+      DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
+      (destinationBuffer->data)[numDataItems].dataItem = 
+       *(tempHandle->dataItem);
+      (destinationBuffer->data)[numDataItems].arrayIndex = 
+       tempHandle->arrayIndex;
+      return ++destinationBuffer->numDataItems;
+    }
+    else {
+      // this is an item received along the route to destination
+      // we can copy it from the received message
+      return MeshStreamer<ArrayDataItem<dtype> >::copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
+    }
   }
 
 };