In the new chunk streamer, process chunks correctly at intermediate destination.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 23 Oct 2012 19:44:13 +0000 (14:44 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Wed, 24 Oct 2012 18:31:40 +0000 (13:31 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 57fb05b3989efd065ee9d64a93332a476b065a8c..f607e05b9a5e542f0fce18c5079a79d387688747 100644 (file)
@@ -95,7 +95,7 @@ class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
 
 public:
   virtual void process(const dtype& data) = 0;
-  virtual void receiveArray(dtype *data, int numItems) {
+  virtual void receiveArray(dtype *data, int numItems, int sourcePe) {
     for (int i = 0; i < numItems; i++) {
       process(data[i]);
     }
@@ -1211,7 +1211,7 @@ public:
     delete msg;
   }
 
-  void broadcast(const dtype& dataItem) {
+  inline void broadcast(const dtype& dataItem) {
     const static bool copyIndirectly = true;
 
     // no data items should be submitted after all local contributors call done
@@ -1239,7 +1239,7 @@ public:
       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
   }
 
-  void insertData(const dtype& dataItem, itype arrayIndex) {
+  inline void insertData(const dtype& dataItem, itype arrayIndex) {
 
     // no data items should be submitted after all local contributors call done
     // and staged completion has begun
@@ -1282,7 +1282,7 @@ public:
 
   }
 
-  int copyDataItemIntoMessage(
+  inline int copyDataItemIntoMessage(
       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
       const void *dataItemHandle, bool copyIndirectly) {
 
@@ -1341,7 +1341,7 @@ public:
                                       bufferSize, yieldFlag, 
                                       progressPeriodInMs) {}
 
-  void insertData(dtype *dataArray, int numElements, int destinationPe) {
+  inline void insertData(dtype *dataArray, int numElements, int destinationPe) {
 
     char *inputData = (char *) dataArray; 
     int arraySizeInBytes = numElements * sizeof(dtype); 
@@ -1369,28 +1369,38 @@ public:
 
   }
 
-  void receiveAtDestination(
-       MeshStreamerMessage<ChunkDataItem> *msg) {
+  inline void processChunk(const ChunkDataItem& chunk) {
 
-    for (int i = 0; i < msg->numDataItems; i++) {
-      const ChunkDataItem& chunk = msg->getDataItem(i);
-      
-      if (receiveBuffers[chunk.sourcePe] == NULL) {
-        receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
-      }      
+    if (receiveBuffers[chunk.sourcePe] == NULL) {
+      receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
+    }      
 
-      char *receiveBuffer = &receiveBuffers[chunk.sourcePe];
+    char *receiveBuffer = &receiveBuffers[chunk.sourcePe];
 
-      memcpy(receiveBuffer + chunk.chunkNumber * sizeof(dtype), 
-             chunk.rawData, chunk.chunkSize);
-      if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
+    memcpy(receiveBuffer + chunk.chunkNumber * sizeof(dtype), 
+           chunk.rawData, chunk.chunkSize);
+    if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
+      clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
+      receivedChunks[chunk.sourcePe] = 0;        
+      delete [] receiveBuffers[chunk.sourcePe]; 
+      receiveBuffers[chunk.sourcePe] = NULL;
+    }
 
-        clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems);
-        receivedChunks[chunk.sourcePe] = 0;        
-        delete [] receiveBuffers[chunk.sourcePe]; 
-        receiveBuffers[chunk.sourcePe] = NULL;
-      }
-      
+  }
+
+  inline void localDeliver(const ChunkDataItem& chunk) {
+    processChunk(chunk);
+    if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
+      MeshStreamer<dtype>::detectorLocalObj_->consume();
+    }
+  }
+
+  inline void receiveAtDestination(
+       MeshStreamerMessage<ChunkDataItem> *msg) {
+
+    for (int i = 0; i < msg->numDataItems; i++) {
+      const ChunkDataItem& chunk = msg->getDataItem(i);
+      processChunk(chunk);             
     }
 
     if (MeshStreamer<dtype>::useStagedCompletion_) {