NDMeshStreamer: checking in a working version of the array interface
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
index 32cf73b46d3bc11b70a0ec22270d38f597208ddf..4cf955163eda1293fa61b4cda2bcd4fa4fcea2d4 100644 (file)
@@ -1,8 +1,9 @@
-#ifndef _NDMESH_STREAMER_H_
-#define _NDMESH_STREAMER_H_
+#ifndef NDMESH_STREAMER_H_
+#define NDMESH_STREAMER_H_
 
 #include <algorithm>
 #include "NDMeshStreamer.decl.h"
+#include "DataItemTypes.h"
 
 // allocate more total buffer space than the maximum buffering limit but flush 
 //   upon reaching totalBufferCapacity_
@@ -41,12 +42,23 @@ public:
 };
 
 template <class dtype>
-class MeshStreamerClient : public CBase_MeshStreamerClient<dtype> {
+class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype> {
  public:
      virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
      virtual void process(dtype &data)=0; 
 };
 
+template <class dtype>
+class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype> {
+ public:
+     // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
+  // would like to make it pure virtual but charm will try to
+  // instantiate the abstract class, leading to errors
+  virtual void process(dtype &data) {} //=0; 
+  MeshStreamerArrayClient() {}
+  MeshStreamerArrayClient(CkMigrateMessage *msg) {}
+};
+
 template <class dtype>
 class MeshStreamer : public CBase_MeshStreamer<dtype> {
 
@@ -129,7 +141,7 @@ public:
 };
 
 template <class dtype>
-void MeshStreamerClient<dtype>::receiveCombinedData(
+void MeshStreamerGroupClient<dtype>::receiveCombinedData(
                                 MeshStreamerMessage<dtype> *msg) {
   for (int i = 0; i < msg->numDataItems; i++) {
     dtype data = msg->getDataItem(i);
@@ -182,8 +194,8 @@ MeshStreamer<dtype>::MeshStreamer(
   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
   for (int i = 0; i < numDimensions; i++) {
     int numMembersAlongDimension = individualDimensionSizes_[i]; 
-    dataBuffers_[i] = new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
-
+    dataBuffers_[i] = 
+      new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
     for (int j = 0; j < numMembersAlongDimension; j++) {
       dataBuffers_[i][j] = NULL;
     }
@@ -354,7 +366,8 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
 
 template <class dtype>
 void MeshStreamer<dtype>::doneInserting() {
-  this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), this->thisProxy));
+  this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
+                             this->thisProxy));
 }
 
 template <class dtype>
@@ -522,7 +535,8 @@ void periodicProgressFunction(void *MeshStreamerObj, double time) {
 
 template <class dtype>
 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
-  CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, progressPeriodInMs_); 
+  CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
+                progressPeriodInMs_); 
 }
 
 
@@ -530,8 +544,8 @@ template <class dtype>
 class GroupMeshStreamer : public MeshStreamer<dtype> {
 private:
 
-  CProxy_MeshStreamerClient<dtype> clientProxy_;
-  MeshStreamerClient<dtype> *clientObj_;
+  CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
+  MeshStreamerGroupClient<dtype> *clientObj_;
 
   void deliverToDestination(int destinationPe, 
                             MeshStreamerMessage<dtype> *destinationBuffer) {
@@ -546,37 +560,79 @@ public:
 
   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
                    int *dimensionSizes, 
-                   const CProxy_MeshStreamerClient<dtype> &clientProxy,
+                   const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
                    int yieldFlag = 0, double progressPeriodInMs = -1.0)
    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
                          yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
-    clientObj_ = ((MeshStreamerClient<dtype> *)CkLocalBranch(clientProxy_));
+    clientObj_ = 
+      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
   }
 
 
 };
 
-/*
-
-template<class dtype>
-struct ArrayDataItem{
-  int arrayIndex;
-  dtype dataItem;
-};
-
 template <class dtype>
-class ArrayMeshStreamer : public CBase_ArrayMeshStreamer<dtype>,
-                          public MeshStreamer<ArrayDataItem<dtype> > {
+class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
 private:
 
+  CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
+  CkArray *clientArrayMgr;
+  MeshStreamerArrayClient<dtype> *clientObj_;
+
+
+  void deliverToDestination(
+       int destinationPe, 
+       MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
+    ( (CProxy_ArrayMeshStreamer<dtype>) this->thisProxy[destinationPe]).receiveArrayData(destinationBuffer);
+  }
+
+  void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
+    int arrayId = packedDataItem.arrayIndex; 
+    MeshStreamerArrayClient<dtype> *clientObj = 
+      clientProxy_[arrayId].ckLocal();
+
+    if (clientObj != NULL) {
+      clientObj->process(packedDataItem.dataItem);
+    }
+    else {
+      // array element is no longer present locally - redeliver using proxy
+      clientProxy_[arrayId].process(packedDataItem.dataItem);
+    }
+  }
+
 public:
 
-  void insertData(dtype &dataItem, int arrayIndex);
+  ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
+                   int *dimensionSizes, 
+                   const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                   int yieldFlag = 0, double progressPeriodInMs = -1.0)
+    :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
+                                       dimensionSizes, yieldFlag, 
+                                       progressPeriodInMs) 
+  {
+    clientProxy_ = clientProxy; 
+    clientArrayMgr = clientProxy_.ckLocalBranch();
+  }
+
+  void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
+    for (int i = 0; i < msg->numDataItems; i++) {
+      ArrayDataItem<dtype> &data = msg->getDataItem(i);
+      localDeliver(data);
+    }
+    delete msg;
+  }
 
+  void insertData(dtype &dataItem, int arrayIndex) {
+    // simple implementation to test functionality
+    // TODO - reimplement to avoid copying item before transfer into message
+    ArrayDataItem<dtype> packedDataItem;
+    packedDataItem.arrayIndex = arrayIndex; 
+    packedDataItem.dataItem = dataItem;
+    MeshStreamer<ArrayDataItem<dtype> >::insertData(packedDataItem, clientArrayMgr->lastKnown(clientProxy_[arrayIndex].ckGetIndex()));
+  }
 };
-*/
 
 #define CK_TEMPLATES_ONLY
 #include "NDMeshStreamer.def.h"