New API for sending arrays of data items.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 23 Oct 2012 06:40:28 +0000 (01:40 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Wed, 24 Oct 2012 18:31:39 +0000 (13:31 -0500)
src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 73d77a21f0a5d9d10dc417d1d4da49e37dab6210..dc046cf40144f8cdece95286255bb6eb81212487 100644 (file)
@@ -1,13 +1,42 @@
 #ifndef DATA_ITEM_TYPES_H
 #define DATA_ITEM_TYPES_H
 
+#define CHUNK_SIZE 256
+
 template<class dtype, class itype>
-class ArrayDataItem{
- public:
+class ArrayDataItem {
+
+public:
   itype arrayIndex;
   dtype dataItem;
 
   ArrayDataItem(itype i, const dtype d) : arrayIndex(i), dataItem(d) {}
 };
 
+class ChunkDataItem {
+
+public:
+  short chunkSize;
+  int sourcePe; 
+  int chunkNumber; 
+  int numChunks;  
+  int numItems;
+  char rawData[CHUNK_SIZE];
+  
+  ChunkDataItem& operator=(const ChunkDataItem &rhs) {
+    
+    if (this != &rhs) {      
+      chunkSize = rhs.chunkSize; 
+      sourcePe = rhs.sourcePe;
+      chunkNumber = rhs.chunkNumber; 
+      numChunks = rhs.numChunks;
+      numItems = rhs.numItems;
+      memcpy(rawData, rhs.rawData, CHUNK_SIZE);
+    }
+
+    return *this;
+  }
+  
+};
+
 #endif
index 991ce535ec7c2e43b25c258ccde1cc546b23e3ee..7e020d0dc8d0ec41231ebf0ba919f9c1f1ea3bce 100644 (file)
@@ -63,5 +63,21 @@ module NDMeshStreamer {
          double progressPeriodInMs = -1.0);
   };
 
+  template<class dtype>
+    group GroupChunkMeshStreamer : GroupMeshStreamer<ChunkDataItem> {
+    entry GroupChunkMeshStreamer(
+         int maxNumDataItemsBuffered, int numDimensions, 
+         int dimensionSizes[numDimensions], 
+         const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+         bool yieldFlag = 0, double progressPeriodInMs = -1.0);
+
+    entry GroupChunkMeshStreamer(
+          int numDimensions, int dimensionSizes[numDimensions], 
+         const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+         int bufferSize, bool yieldFlag = 0, 
+         double progressPeriodInMs = -1.0);
+
+  };
+
 };
 
index 7dd98e6587f7bdf28b461caadd45cc12fcd39a5c..57fb05b3989efd065ee9d64a93332a476b065a8c 100644 (file)
@@ -95,7 +95,11 @@ class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
 
 public:
   virtual void process(const dtype& data) = 0;
-
+  virtual void receiveArray(dtype *data, int numItems) {
+    for (int i = 0; i < numItems; i++) {
+      process(data[i]);
+    }
+  }
 };
 
 template <class dtype>
@@ -1303,6 +1307,113 @@ public:
 
 };
 
+template <class dtype>
+class GroupChunkMeshStreamer : 
+  public GroupMeshStreamer<ChunkDataItem> {
+
+private:
+  ChunkDataItem **receiveBuffers;
+  int *receivedChunks;
+
+public:
+
+  GroupChunkMeshStreamer(
+       int maxNumDataItemsBuffered, int numDimensions,
+       int *dimensionSizes, 
+       const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
+       bool yieldFlag = 0, double progressPeriodInMs = -1.0)
+    :GroupMeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
+                                      numDimensions, dimensionSizes,
+                                      0, yieldFlag, progressPeriodInMs) {
+    
+    receiveBuffers = new ChunkDataItem*[CkNumPes()];
+    receivedChunks = new int[CkNumPes()]; 
+    memset(receivedChunks, 0, CkNumPes() * sizeof(int));
+    memset(receiveBuffers, 0, CkNumPes() * sizeof(ChunkDataItem*)); 
+  }
+
+  GroupChunkMeshStreamer(
+       int numDimensions, int *dimensionSizes, 
+       const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
+       int bufferSize, bool yieldFlag = 0, 
+       double progressPeriodInMs = -1.0)
+    :GroupMeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
+                                      bufferSize, yieldFlag, 
+                                      progressPeriodInMs) {}
+
+  void insertData(dtype *dataArray, int numElements, int destinationPe) {
+
+    char *inputData = (char *) dataArray; 
+    int arraySizeInBytes = numElements * sizeof(dtype); 
+    ChunkDataItem chunk;
+    int chunkNumber = 0; 
+    chunk.sourcePe = CkMyPe();
+    chunk.chunkNumber = 0; 
+    chunk.chunkSize = CHUNK_SIZE;
+    chunk.numChunks = numElements * sizeof(dtype) / CHUNK_SIZE; 
+    chunk.numItems = numElements; 
+    for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
+
+      if (offset + CHUNK_SIZE > arraySizeInBytes) {
+        chunk.chunkSize = arraySizeInBytes - offset; 
+        memset(chunk.rawData, 0, CHUNK_SIZE);
+        memcpy(chunk.rawData + offset, inputData + offset, chunk.chunkSize); 
+      }
+      else {
+        memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
+      }
+
+    }
+
+    insertData(chunk, destinationPe); 
+
+  }
+
+  void receiveAtDestination(
+       MeshStreamerMessage<ChunkDataItem> *msg) {
+
+    for (int i = 0; i < msg->numDataItems; i++) {
+      const ChunkDataItem& chunk = msg->getDataItem(i);
+      
+      if (receiveBuffers[chunk.sourcePe] == NULL) {
+        receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
+      }      
+
+      char *receiveBuffer = &receiveBuffers[chunk.sourcePe];
+
+      memcpy(receiveBuffer + chunk.chunkNumber * sizeof(dtype), 
+             chunk.rawData, chunk.chunkSize);
+      if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
+
+        clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems);
+        receivedChunks[chunk.sourcePe] = 0;        
+        delete [] receiveBuffers[chunk.sourcePe]; 
+        receiveBuffers[chunk.sourcePe] = NULL;
+      }
+      
+    }
+
+    if (MeshStreamer<dtype>::useStagedCompletion_) {
+#ifdef STREAMER_VERBOSE_OUTPUT
+      envelope *env = UsrToEnv(msg);
+      CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
+               CkMyPe(), env->getSrcPe(), msg->numDataItems, 
+               msg->finalMsgCount);  
+#endif
+      markMessageReceived(msg->dimension, msg->finalMsgCount); 
+    }
+    else {
+      this->detectorLocalObj_->consume(msg->numDataItems);    
+    }
+
+    delete msg;
+    
+  }
+
+};
+
+
+
 #define CK_TEMPLATES_ONLY
 #include "NDMeshStreamer.def.h"
 #undef CK_TEMPLATES_ONLY