NDMeshStreamer: added support for 3D chare arrays
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 2 Apr 2012 19:39:25 +0000 (14:39 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 2 Apr 2012 19:39:25 +0000 (14:39 -0500)
src/libs/ck-libs/NDMeshStreamer/DataItemTypes.h
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index c5b81fca05bf82ea22d7ad0ec45419b03ec93a09..00cd1af2926f1b6691a51ab6b28f722d6b7741d6 100644 (file)
@@ -1,10 +1,10 @@
 #ifndef DATA_ITEM_TYPES_H
 #define DATA_ITEM_TYPES_H
 
-template<class dtype>
+template<class dtype, class itype>
 class ArrayDataItem{
  public:
-  int arrayIndex;
+  itype arrayIndex;
   dtype dataItem;
 };
 
index 6cc789bbce13e800fa774646eb410635ad510f9d..9b3841a95c8badb00382c2a47c4eb6dad870b029 100644 (file)
@@ -18,6 +18,10 @@ module NDMeshStreamer {
     entry void receiveRedeliveredItem(dtype data);
   };
 
+  template<class dtype> array [3D] MeshStreamerArray3DClient {
+    entry void receiveRedeliveredItem(dtype data);
+  };
+
   template<class dtype> 
   group MeshStreamer {
     entry void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
@@ -37,17 +41,16 @@ module NDMeshStreamer {
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
   };
 
-
-  template<class dtype>
-    group ArrayMeshStreamer : MeshStreamer<ArrayDataItem<dtype> > {
+  template<class dtype, class ctype, class itype>
+    group ArrayMeshStreamer : MeshStreamer<ArrayDataItem<dtype, itype> > {
     entry ArrayMeshStreamer(
          int totalBufferCapacity, int numDimensions, 
          int dimensionSizes[numDimensions],
-         const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+         const ctype &clientProxy,
          bool yieldFlag = 0, double progressPeriodInMs = -1.0);
 
     entry void receiveArrayData(
-              MeshStreamerMessage<ArrayDataItem<dtype> > *msg); 
+              MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg); 
   };
 
 };
index 8d0c8b47decb45068206e33b08fd9dabb74abe06..51bcb18c94a0b95043c4023ab549cdab3c0d495f 100644 (file)
@@ -5,6 +5,7 @@
 #include "NDMeshStreamer.decl.h"
 #include "DataItemTypes.h"
 #include "completion.h"
+#include "ckarray.h"
 
 // allocate more total buffer space than the maximum buffering limit but flush 
 //   upon reaching totalBufferCapacity_
@@ -13,7 +14,7 @@
 // #define DEBUG_STREAMER
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
-#define CACHE_ARRAY_METADATA
+#define CACHE_ARRAY_METADATA // only works for 1D array clients
 
 struct MeshLocation {
   int dimension; 
@@ -74,8 +75,9 @@ class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>,
 template <class dtype>
 class MeshStreamerArrayClient :  public CBase_MeshStreamerArrayClient<dtype>, 
   public MeshStreamerClient<dtype>
-   {
- public:
+{
+
+public:
 
   // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
   MeshStreamerArrayClient() {}
@@ -84,12 +86,32 @@ class MeshStreamerArrayClient :  public CBase_MeshStreamerArrayClient<dtype>,
     MeshStreamerClient<dtype>::detectorLocalObj_->consume();
     process(data);
   }
+
   void pup(PUP::er &p) {
     CBase_MeshStreamerArrayClient<dtype>::pup(p);
   }
 
 };
 
+template <class dtype>
+class MeshStreamerArray3DClient : 
+  public CBase_MeshStreamerArray3DClient<dtype>, 
+  public MeshStreamerClient<dtype> 
+{
+
+public:
+  MeshStreamerArray3DClient() {}
+  MeshStreamerArray3DClient(CkMigrateMessage *msg) {}
+  void receiveRedeliveredItem(dtype data) {
+    MeshStreamerClient<dtype>::detectorLocalObj_->consume();
+    process(data);
+  }
+  void pup(PUP::er &p) {
+    CBase_MeshStreamerArray3DClient<dtype>::pup(p);
+  }
+
+};
+
 template <class dtype>
 class MeshStreamer : public CBase_MeshStreamer<dtype> {
 
@@ -150,7 +172,7 @@ protected:
 public:
 
     MeshStreamer(int totalBufferCapacity, int numDimensions, 
-                int *dimensionSizes,
+                int *dimensionSies,
                  bool yieldFlag = 0, double progressPeriodInMs = -1.0);
     ~MeshStreamer();
 
@@ -654,32 +676,62 @@ public:
 
 };
 
-template <class dtype>
-class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
-private:
+template <class dtype, class ctype>
+class MeshStreamerClientIterator : public CkLocIterator {
 
-  CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
+public:
+  
+  CompletionDetector *detectorLocalObj_;
+  ctype clientProxy_;
+  MeshStreamerClientIterator(CompletionDetector *detectorObj, 
+                            ctype clientProxy) 
+   : detectorLocalObj_(detectorObj), clientProxy_(clientProxy) {}
+
+  // CkLocMgr::iterate will call addLocation on all elements local to this PE
+  void addLocation(CkLocation &loc) {
+
+    ((MeshStreamerClient<dtype> *) 
+     (clientProxy_.ckLocalBranch()->lookup(loc.getIndex())))
+      ->setDetector(detectorLocalObj_); 
+
+  }
+
+};
+
+template <class dtype, class ctype, class itype>
+class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
+  
+private:
+  
+  ctype clientProxy_;
   CkArray *clientArrayMgr_;
   int numArrayElements_;
-  MeshStreamerArrayClient<dtype> **clientObjs_;
 #ifdef CACHE_ARRAY_METADATA
+  MeshStreamerArrayClient<dtype> **clientObjs_;
   int *destinationPes_;
   bool *isCachedArrayMetadata_;
 #endif
 
   void deliverToDestination(
        int destinationPe, 
-       MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
-    ( (CProxy_ArrayMeshStreamer<dtype>) 
+       MeshStreamerMessage<ArrayDataItem<dtype, itype> > *destinationBuffer) {
+    ( (CProxy_ArrayMeshStreamer<dtype, ctype, itype>) 
       this->thisProxy )[destinationPe].receiveArrayData(destinationBuffer);
   }
 
-  void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
-    int arrayId = packedDataItem.arrayIndex; 
+  void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
+    itype arrayId = packedDataItem.arrayIndex; 
+
+    MeshStreamerArrayClient<dtype> *clientObj;
+#ifdef CACHE_ARRAY_METADATA
+    clientObj = clientObjs_[arrayId];
+#else
+    clientObj = clientProxy_[arrayId].ckLocal();
+#endif
 
-    if (clientObjs_[arrayId] != NULL) {
-      clientObjs_[arrayId]->process(packedDataItem.dataItem);
-      MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_->consume();
+    if (clientObj != NULL) {
+      clientObj->process(packedDataItem.dataItem);
+      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
     }
     else { 
       // array element is no longer present locally - redeliver using proxy
@@ -696,39 +748,46 @@ private:
 #ifdef CACHE_ARRAY_METADATA
     std::fill(isCachedArrayMetadata_, 
              isCachedArrayMetadata_ + numArrayElements_, false);
-#endif
 
     for (int i = 0; i < numArrayElements_; i++) {
       clientObjs_[i] = clientProxy_[i].ckLocal();
       if (clientObjs_[i] != NULL) {
        clientObjs_[i]->setDetector(
-                        MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_);
+        MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
       }
     }
+#else
+    // set completion detector in local elements of the client
+    CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
+    MeshStreamerClientIterator<dtype, ctype> clientIterator(
+     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
+     clientProxy_);
+    clientLocMgr->iterate(clientIterator);
+#endif    
+
   }
 
 public:
 
   struct DataItemHandle {
-    int arrayIndex; 
+    itype arrayIndex; 
     dtype *dataItem;
   };
 
   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
-                   int *dimensionSizes, 
-                   const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
+                   int *dimensionSizes, const ctype &clientProxy,
                    bool yieldFlag = 0, double progressPeriodInMs = -1.0)
-    :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
-                                       dimensionSizes, yieldFlag, 
-                                       progressPeriodInMs) 
+    :MeshStreamer<ArrayDataItem<dtype, itype> >(
+      totalBufferCapacity, numDimensions, dimensionSizes, yieldFlag, 
+      progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
     clientArrayMgr_ = clientProxy_.ckLocalBranch();
 
     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
 
-    clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
 #ifdef CACHE_ARRAY_METADATA
+    clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
     destinationPes_ = new int[numArrayElements_];
     isCachedArrayMetadata_ = new bool[numArrayElements_];
     std::fill(isCachedArrayMetadata_, 
@@ -737,24 +796,25 @@ public:
   }
 
   ~ArrayMeshStreamer() {
-    delete [] clientObjs_;
 #ifdef CACHE_ARRAY_METADATA
+    delete [] clientObjs_;
     delete [] destinationPes_;
     delete [] isCachedArrayMetadata_; 
 #endif
   }
 
-  void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
+  void receiveArrayData(
+       MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
     for (int i = 0; i < msg->numDataItems; i++) {
-      ArrayDataItem<dtype> &packedData = msg->getDataItem(i);
+      ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
       localDeliver(packedData);
     }
     delete msg;
   }
 
-  void insertData(dtype &dataItem, int arrayIndex) {
+  void insertData(dtype &dataItem, itype arrayIndex) {
 
-    MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_->produce();
+    MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
     int destinationPe; 
 #ifdef CACHE_ARRAY_METADATA
   if (isCachedArrayMetadata_[arrayIndex]) {    
@@ -771,7 +831,7 @@ public:
     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
 #endif
 
-    static ArrayDataItem<dtype> packedDataItem;
+  static ArrayDataItem<dtype, itype> packedDataItem;
     if (destinationPe == CkMyPe()) {
       // copying here is necessary - user code should not be 
       // passed back a reference to the original item
@@ -787,12 +847,13 @@ public:
     tempHandle.arrayIndex = arrayIndex; 
     tempHandle.dataItem = &dataItem;
 
-    MeshStreamer<ArrayDataItem<dtype> >::insertData(&tempHandle, destinationPe);
+    MeshStreamer<ArrayDataItem<dtype, itype> >::
+     insertData(&tempHandle, destinationPe);
 
   }
 
   int copyDataItemIntoMessage(
-      MeshStreamerMessage<ArrayDataItem <dtype> > *destinationBuffer, 
+      MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
       void *dataItemHandle, bool copyIndirectly) {
 
     if (copyIndirectly == true) {
@@ -808,7 +869,8 @@ public:
     else {
       // this is an item received along the route to destination
       // we can copy it from the received message
-      return MeshStreamer<ArrayDataItem<dtype> >::copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
+      return MeshStreamer<ArrayDataItem<dtype, itype> >::
+             copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
     }
   }