NDMeshStreamer: checking in a working version of the array interface
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 27 Feb 2012 01:18:56 +0000 (19:18 -0600)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 27 Feb 2012 01:20:37 +0000 (19:20 -0600)
src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h [new file with mode: 0644]
src/libs/ck-libs/NDMeshStreamer/Makefile
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

diff --git a/src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h b/src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h
new file mode 100644 (file)
index 0000000..c5b81fc
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef DATA_ITEM_TYPES_H
+#define DATA_ITEM_TYPES_H
+
+template<class dtype>
+class ArrayDataItem{
+ public:
+  int arrayIndex;
+  dtype dataItem;
+};
+
+#endif
index 39e8203d0e4dc62645c8c8acbf323f36e2f808d9..0c4229563272ec1461614d1188ec6acb8edc1804 100644 (file)
@@ -1,7 +1,7 @@
 CDIR=../../../..
 CHARMC=$(CDIR)/bin/charmc $(OPTS)
 
-HEADERS=NDMeshStreamer.h NDMeshStreamer.decl.h NDMeshStreamer.def.h
+HEADERS=NDMeshStreamer.h DataItemTypes.h NDMeshStreamer.decl.h NDMeshStreamer.def.h
 OBJS=NDMeshStreamer.o
 LIB=libmoduleNDMeshStreamer
 
index 4043b7ae88a48774120ffd529b659d8b81063602..629d992c75c2d59189b02933ead7bdab52a1d391 100644 (file)
@@ -1,15 +1,22 @@
 module NDMeshStreamer {
 
+  include "DataItemTypes.h";
+
   template<class dtype> 
   message MeshStreamerMessage {
     int destinationPes[];
     dtype data[]; 
   };
 
-  template<class dtype> group MeshStreamerClient {
+  template<class dtype> group MeshStreamerGroupClient {
     entry void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
   };
 
+  template<class dtype> array [1D] MeshStreamerArrayClient {
+    // entry void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
+    entry void process(dtype &data);
+  };
+
   template<class dtype> 
   group MeshStreamer {
     entry void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
@@ -19,11 +26,25 @@ module NDMeshStreamer {
 
   template<class dtype>
   group GroupMeshStreamer : MeshStreamer<dtype> {
-    entry GroupMeshStreamer(int totalBufferCapacity, int numDimensions, 
-                           int dimensionSizes[numDimensions]
-                           const CProxy_MeshStreamerClient<dtype> &clientProxy,
-                           int yieldFlag = 0, 
-                           double progressPeriodInMs = -1.0);
+    entry GroupMeshStreamer(
+         int totalBufferCapacity, int numDimensions
+         int dimensionSizes[numDimensions], 
+         const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
+         int yieldFlag = 0, double progressPeriodInMs = -1.0);
   };
+
+
+  template<class dtype>
+    group ArrayMeshStreamer : MeshStreamer<ArrayDataItem<dtype> > {
+    entry ArrayMeshStreamer(
+         int totalBufferCapacity, int numDimensions, 
+         int dimensionSizes[numDimensions],
+         const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+         int yieldFlag = 0, double progressPeriodInMs = -1.0);
+
+    entry void receiveArrayData(
+              MeshStreamerMessage<ArrayDataItem<dtype> > *msg); 
+  };
+
 };
 
index 32cf73b46d3bc11b70a0ec22270d38f597208ddf..4cf955163eda1293fa61b4cda2bcd4fa4fcea2d4 100644 (file)
@@ -1,8 +1,9 @@
-#ifndef _NDMESH_STREAMER_H_
-#define _NDMESH_STREAMER_H_
+#ifndef NDMESH_STREAMER_H_
+#define NDMESH_STREAMER_H_
 
 #include <algorithm>
 #include "NDMeshStreamer.decl.h"
+#include "DataItemTypes.h"
 
 // allocate more total buffer space than the maximum buffering limit but flush 
 //   upon reaching totalBufferCapacity_
@@ -41,12 +42,23 @@ public:
 };
 
 template <class dtype>
-class MeshStreamerClient : public CBase_MeshStreamerClient<dtype> {
+class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype> {
  public:
      virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
      virtual void process(dtype &data)=0; 
 };
 
+template <class dtype>
+class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype> {
+ public:
+     // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
+  // would like to make it pure virtual but charm will try to
+  // instantiate the abstract class, leading to errors
+  virtual void process(dtype &data) {} //=0; 
+  MeshStreamerArrayClient() {}
+  MeshStreamerArrayClient(CkMigrateMessage *msg) {}
+};
+
 template <class dtype>
 class MeshStreamer : public CBase_MeshStreamer<dtype> {
 
@@ -129,7 +141,7 @@ public:
 };
 
 template <class dtype>
-void MeshStreamerClient<dtype>::receiveCombinedData(
+void MeshStreamerGroupClient<dtype>::receiveCombinedData(
                                 MeshStreamerMessage<dtype> *msg) {
   for (int i = 0; i < msg->numDataItems; i++) {
     dtype data = msg->getDataItem(i);
@@ -182,8 +194,8 @@ MeshStreamer<dtype>::MeshStreamer(
   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
   for (int i = 0; i < numDimensions; i++) {
     int numMembersAlongDimension = individualDimensionSizes_[i]; 
-    dataBuffers_[i] = new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
-
+    dataBuffers_[i] = 
+      new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
     for (int j = 0; j < numMembersAlongDimension; j++) {
       dataBuffers_[i][j] = NULL;
     }
@@ -354,7 +366,8 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 
 template <class dtype>
 void MeshStreamer<dtype>::doneInserting() {
-  this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), this->thisProxy));
+  this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
+                             this->thisProxy));
 }
 
 template <class dtype>
@@ -522,7 +535,8 @@ void periodicProgressFunction(void *MeshStreamerObj, double time) {
 
 template <class dtype>
 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
-  CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, progressPeriodInMs_); 
+  CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
+                progressPeriodInMs_); 
 }
 
 
@@ -530,8 +544,8 @@ template <class dtype>
 class GroupMeshStreamer : public MeshStreamer<dtype> {
 private:
 
-  CProxy_MeshStreamerClient<dtype> clientProxy_;
-  MeshStreamerClient<dtype> *clientObj_;
+  CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
+  MeshStreamerGroupClient<dtype> *clientObj_;
 
   void deliverToDestination(int destinationPe, 
                             MeshStreamerMessage<dtype> *destinationBuffer) {
@@ -546,37 +560,79 @@ public:
 
   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
                    int *dimensionSizes, 
-                   const CProxy_MeshStreamerClient<dtype> &clientProxy,
+                   const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
                    int yieldFlag = 0, double progressPeriodInMs = -1.0)
    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
                          yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
-    clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
+    clientObj_ = 
+      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
   }
 
 
 };
 
-/*
-
-template<class dtype>
-struct ArrayDataItem{
-  int arrayIndex;
-  dtype dataItem;
-};
-
 template <class dtype>
-class ArrayMeshStreamer : public CBase_ArrayMeshStreamer<dtype>,
-                          public MeshStreamer<ArrayDataItem<dtype> > {
+class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
 private:
 
+  CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
+  CkArray *clientArrayMgr;
+  MeshStreamerArrayClient<dtype> *clientObj_;
+
+
+  void deliverToDestination(
+       int destinationPe, 
+       MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
+    ( (CProxy_ArrayMeshStreamer<dtype>) this->thisProxy[destinationPe]).receiveArrayData(destinationBuffer);
+  }
+
+  void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
+    int arrayId = packedDataItem.arrayIndex; 
+    MeshStreamerArrayClient<dtype> *clientObj = 
+      clientProxy_[arrayId].ckLocal();
+
+    if (clientObj != NULL) {
+      clientObj->process(packedDataItem.dataItem);
+    }
+    else {
+      // array element is no longer present locally - redeliver using proxy
+      clientProxy_[arrayId].process(packedDataItem.dataItem);
+    }
+  }
+
 public:
 
-  void insertData(dtype &dataItem, int arrayIndex);
+  ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
+                   int *dimensionSizes, 
+                   const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                   int yieldFlag = 0, double progressPeriodInMs = -1.0)
+    :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
+                                       dimensionSizes, yieldFlag, 
+                                       progressPeriodInMs) 
+  {
+    clientProxy_ = clientProxy; 
+    clientArrayMgr = clientProxy_.ckLocalBranch();
+  }
+
+  void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
+    for (int i = 0; i < msg->numDataItems; i++) {
+      ArrayDataItem<dtype> &data = msg->getDataItem(i);
+      localDeliver(data);
+    }
+    delete msg;
+  }
 
+  void insertData(dtype &dataItem, int arrayIndex) {
+    // simple implementation to test functionality
+    // TODO - reimplement to avoid copying item before transfer into message
+    ArrayDataItem<dtype> packedDataItem;
+    packedDataItem.arrayIndex = arrayIndex; 
+    packedDataItem.dataItem = dataItem;
+    MeshStreamer<ArrayDataItem<dtype> >::insertData(packedDataItem, clientArrayMgr->lastKnown(clientProxy_[arrayIndex].ckGetIndex()));
+  }
 };
-*/
 
 #define CK_TEMPLATES_ONLY
 #include "NDMeshStreamer.def.h"