Various bug fixes in the new chunk mesh streamer.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 23 Oct 2012 23:32:45 +0000 (18:32 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Wed, 24 Oct 2012 18:31:40 +0000 (13:31 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 7e020d0dc8d0ec41231ebf0ba919f9c1f1ea3bce..fda6a96476fbc0c75fcddd16ae7cb4f6a7283dc9 100644 (file)
@@ -64,7 +64,7 @@ module NDMeshStreamer {
   };
 
   template<class dtype>
-    group GroupChunkMeshStreamer : GroupMeshStreamer<ChunkDataItem> {
+    group GroupChunkMeshStreamer : MeshStreamer<ChunkDataItem> {
     entry GroupChunkMeshStreamer(
          int maxNumDataItemsBuffered, int numDimensions, 
          int dimensionSizes[numDimensions], 
index 8a0d5b33b58f42128ae2d6e79d1898218b108b9a..1736a179713a05d438f48a358e6a9bda4660491c 100644 (file)
@@ -16,7 +16,7 @@
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
-// #define STREAMER_VERBOSE_OUTPUT
+#define STREAMER_VERBOSE_OUTPUT
 
 #define TRAM_BROADCAST (-100)
 
@@ -955,27 +955,27 @@ private:
     delete msg;
   }
 
-  void localDeliver(const dtype& dataItem) {
+  inline void localDeliver(const dtype& dataItem) {
     clientObj_->process(dataItem);
     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
       MeshStreamer<dtype>::detectorLocalObj_->consume();
     }
   }
 
-  void localBroadcast(const dtype& dataItem) {
+  inline void localBroadcast(const dtype& dataItem) {
     localDeliver(dataItem); 
   }
 
-  int numElementsInClient() {
+  inline int numElementsInClient() {
     // client is a group - there is one element per PE
     return CkNumPes();
   }
 
-  int numLocalElementsInClient() {
+  inline int numLocalElementsInClient() {
     return 1; 
   }
 
-  void initLocalClients() {
+  inline void initLocalClients() {
     // no action required
   }
 
@@ -1309,11 +1309,14 @@ public:
 
 template <class dtype>
 class GroupChunkMeshStreamer : 
-  public GroupMeshStreamer<ChunkDataItem> {
+  public MeshStreamer<ChunkDataItem> {
 
 private:
-  ChunkDataItem **receiveBuffers;
+  dtype **receiveBuffers;
   int *receivedChunks;
+  CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
+  MeshStreamerGroupClient<dtype> *clientObj_;
+
 
 public:
 
@@ -1322,14 +1325,19 @@ public:
        int *dimensionSizes, 
        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
        bool yieldFlag = 0, double progressPeriodInMs = -1.0)
-    :GroupMeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
-                                      numDimensions, dimensionSizes,
-                                      0, yieldFlag, progressPeriodInMs) {
+    :MeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
+                                 numDimensions, dimensionSizes,
+                                 0, yieldFlag, progressPeriodInMs) {
     
-    receiveBuffers = new ChunkDataItem*[CkNumPes()];
+    receiveBuffers = new dtype*[CkNumPes()];
     receivedChunks = new int[CkNumPes()]; 
     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
-    memset(receiveBuffers, 0, CkNumPes() * sizeof(ChunkDataItem*)); 
+    memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
+
+    clientProxy_ = clientProxy; 
+    clientObj_ = 
+      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
+
   }
 
   GroupChunkMeshStreamer(
@@ -1337,9 +1345,20 @@ public:
        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
        int bufferSize, bool yieldFlag = 0, 
        double progressPeriodInMs = -1.0)
-    :GroupMeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
-                                      bufferSize, yieldFlag, 
-                                      progressPeriodInMs) {}
+    :MeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
+                                 bufferSize, yieldFlag, 
+                                 progressPeriodInMs) {
+
+    receiveBuffers = new dtype*[CkNumPes()];
+    receivedChunks = new int[CkNumPes()]; 
+    memset(receivedChunks, 0, CkNumPes() * sizeof(int));
+    memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
+
+    clientProxy_ = clientProxy; 
+    clientObj_ = 
+      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
+
+  }
 
   inline void insertData(dtype *dataArray, int numElements, int destinationPe) {
 
@@ -1350,14 +1369,14 @@ public:
     chunk.sourcePe = CkMyPe();
     chunk.chunkNumber = 0; 
     chunk.chunkSize = CHUNK_SIZE;
-    chunk.numChunks = ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
+    chunk.numChunks = (int) ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
     chunk.numItems = numElements; 
     for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
 
       if (offset + CHUNK_SIZE > arraySizeInBytes) {
         chunk.chunkSize = arraySizeInBytes - offset; 
         memset(chunk.rawData, 0, CHUNK_SIZE);
-        memcpy(chunk.rawData + offset, inputData + offset, chunk.chunkSize); 
+        memcpy(chunk.rawData, inputData + offset, chunk.chunkSize); 
       }
       else {
         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
@@ -1365,7 +1384,7 @@ public:
 
     }
 
-    insertData(chunk, destinationPe); 
+    MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe); 
 
   }
 
@@ -1375,7 +1394,7 @@ public:
       receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
     }      
 
-    char *receiveBuffer = &receiveBuffers[chunk.sourcePe];
+    char *receiveBuffer = (char *) &receiveBuffers[chunk.sourcePe];
 
     memcpy(receiveBuffer + chunk.chunkNumber * sizeof(dtype), 
            chunk.rawData, chunk.chunkSize);
@@ -1390,8 +1409,8 @@ public:
 
   inline void localDeliver(const ChunkDataItem& chunk) {
     processChunk(chunk);
-    if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
-      MeshStreamer<dtype>::detectorLocalObj_->consume();
+    if (MeshStreamer<ChunkDataItem>::useStagedCompletion_ == false) {
+      MeshStreamer<ChunkDataItem>::detectorLocalObj_->consume();
     }
   }
 
@@ -1403,7 +1422,7 @@ public:
       processChunk(chunk);             
     }
 
-    if (MeshStreamer<dtype>::useStagedCompletion_) {
+    if (MeshStreamer<ChunkDataItem>::useStagedCompletion_) {
 #ifdef STREAMER_VERBOSE_OUTPUT
       envelope *env = UsrToEnv(msg);
       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
@@ -1420,6 +1439,23 @@ public:
     
   }
 
+  inline void localBroadcast(const ChunkDataItem& dataItem) {
+    localDeliver(dataItem); 
+  }
+
+  inline int numElementsInClient() {
+    // client is a group - there is one element per PE
+    return CkNumPes();
+  }
+
+  inline int numLocalElementsInClient() {
+    return 1; 
+  }
+
+  inline void initLocalClients() {
+    // no action required
+  }
+
 };