NDMeshStreamer: adding broadcast functionality
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 4 Sep 2012 04:41:05 +0000 (23:41 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Tue, 4 Sep 2012 04:41:51 +0000 (23:41 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 7fd963a9d9c393e61105d780e3fa2ddd88b9d35b..0ca33f41ec383cdc5ab8d9268a4dd9408d774e83 100644 (file)
@@ -18,6 +18,8 @@
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
 // #define STREAMER_VERBOSE_OUTPUT
 
+#define TRAM_BROADCAST (-100)
+
 struct MeshLocation {
   int dimension; 
   int bufferIndex; 
@@ -73,7 +75,8 @@ public:
   }
   void receiveRedeliveredItem(dtype data) {
 #ifdef STREAMER_VERBOSE_OUTPUT
-    CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
+    CkPrintf("[%d] redelivered to index %d\n", 
+             CkMyPe(), this->thisIndex.data[0]);
 #endif
     if (detectorLocalObj_ != NULL) {
       detectorLocalObj_->consume();
@@ -104,7 +107,6 @@ private:
   int numDataItemsBuffered_;
 
   int numMembers_; 
-  int numDimensions_;
   int *individualDimensionSizes_;
   int *combinedDimensionSizes_;
 
@@ -144,7 +146,7 @@ private:
                     const MeshLocation &destinationCoordinates, 
                     void *dataItem, bool copyIndirectly = false);
   virtual void localDeliver(dtype &dataItem) = 0; 
-
+  virtual void localBroadcast(dtype &dataItem) = 0; 
   virtual int numElementsInClient() = 0;
   virtual int numLocalElementsInClient() = 0; 
 
@@ -153,16 +155,20 @@ private:
   void sendLargestBuffer();
   void flushToIntermediateDestinations();
   void flushDimension(int dimension, bool sendMsgCounts = false); 
+  MeshLocation determineLocation(int destinationPe, 
+                                 int dimensionReceivedAlong);
 
 protected:
 
+  int numDimensions_;
   bool useStagedCompletion_;
   CompletionDetector *detectorLocalObj_;
   virtual int copyDataItemIntoMessage(
               MeshStreamerMessage<dtype> *destinationBuffer, 
               void *dataItemHandle, bool copyIndirectly = false);
-  MeshLocation determineLocation(int destinationPe, 
-                                 int dimensionReceivedAlong);
+  void insertData(void *dataItemHandle, int destinationPe);
+  void broadcast(void *dataItemHandle, int dimension, bool copyIndirectly);
+
 public:
 
   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
@@ -171,6 +177,12 @@ public:
   ~MeshStreamer();
 
   // entry
+  void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
+            int prio, bool usePeriodicFlushing);
+  void associateCallback(int numContributors, 
+                         CkCallback startCb, CkCallback endCb, 
+                         CProxy_CompletionDetector detector,
+                         int prio, bool usePeriodicFlushing);
   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
   void flushIfIdle();
@@ -181,11 +193,7 @@ public:
     return isPeriodicFlushEnabled_;
   }
   virtual void insertData(dtype &dataItem, int destinationPe); 
-  void insertData(void *dataItemHandle, int destinationPe);
-  void associateCallback(int numContributors, 
-                         CkCallback startCb, CkCallback endCb, 
-                         CProxy_CompletionDetector detector,
-                         int prio, bool usePeriodicFlushing);
+  virtual void broadcast(dtype &dataItem); 
   void registerPeriodicProgressFunction();
 
   // flushing begins only after enablePeriodicFlushing has been invoked
@@ -207,9 +215,6 @@ public:
       detectorLocalObj_->done(numContributorsFinished);
     }
   }
-
-  void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
-            int prio, bool usePeriodicFlushing);
   
   bool stagedCompletionStarted() {    
     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
@@ -252,6 +257,7 @@ public:
         CkPrintf("[%d] contribute\n", CkMyPe()); 
 #endif
         CkAssert(numDataItemsBuffered_ == 0); 
+        isPeriodicFlushEnabled_ = false; 
         if (!userCallback_.isInvalid()) {
           this->contribute(userCallback_);
           userCallback_ = CkCallback();
@@ -333,7 +339,8 @@ MeshStreamer<dtype>::MeshStreamer(
     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
     remainder -= dimensionOffset; 
-    startingIndexAtDimension_[i] = startingIndexAtDimension_[i+1] + dimensionOffset; 
+    startingIndexAtDimension_[i] = 
+      startingIndexAtDimension_[i+1] + dimensionOffset; 
   }
 
   isPeriodicFlushEnabled_ = false; 
@@ -397,7 +404,8 @@ MeshLocation MeshStreamer<dtype>::determineLocation(
 #endif
 
   MeshLocation destinationLocation;
-  int remainder = destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
+  int remainder = 
+    destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
   int dimensionIndex; 
   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
     dimensionIndex = remainder / combinedDimensionSizes_[i];
@@ -462,7 +470,7 @@ void MeshStreamer<dtype>::storeMessage(
     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
   if (dimension != 0) {
     destinationBuffer->markDestination(numBuffered-1, destinationPe);
-  }  
+  }
   numDataItemsBuffered_++;
 
   // send if buffer is full
@@ -482,7 +490,8 @@ void MeshStreamer<dtype>::storeMessage(
     }
     else {
 #ifdef STREAMER_VERBOSE_OUTPUT
-      CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
+      CkPrintf("[%d] sending intermediate to %d\n", 
+               CkMyPe(), destinationIndex); 
 #endif
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
@@ -503,6 +512,57 @@ void MeshStreamer<dtype>::storeMessage(
   }
 
 }
+template <class dtype>
+inline
+void MeshStreamer<dtype>::broadcast(dtype &dataItem) {
+  const static bool copyIndirectly = true;
+
+  // no data items should be submitted after all local contributors call done 
+  // and staged completion has begun
+  CkAssert(stagedCompletionStarted() == false);
+
+  // produce and consume once per PE
+  if (!useStagedCompletion_) {
+    detectorLocalObj_->produce(CkNumPes());
+  }
+
+  // deliver locally
+  dtype dataItemCopy = dataItem;
+  localBroadcast(dataItemCopy);
+
+  broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
+}
+
+template <class dtype>
+inline
+void MeshStreamer<dtype>::broadcast(void *dataItemHandle, int dimension, 
+                                    bool copyIndirectly) {
+
+  MeshLocation destinationLocation;
+  destinationLocation.dimension = dimension; 
+
+  while (destinationLocation.dimension != -1) {
+    for (int i = 0; 
+         i < individualDimensionSizes_[destinationLocation.dimension]; 
+         i++) {
+
+      if (i != myLocationIndex_[destinationLocation.dimension]) {
+        destinationLocation.bufferIndex = i; 
+        storeMessage(TRAM_BROADCAST, destinationLocation, 
+                     dataItemHandle, copyIndirectly); 
+      }
+      // release control to scheduler if requested by the user, 
+      //   assume caller is threaded entry
+      if (yieldFlag_ && ++yieldCount_ == 1024) {
+        yieldCount_ = 0; 
+        CthYield();
+      }
+    }
+    destinationLocation.dimension--; 
+  }
+
+}
+
 
 template <class dtype>
 inline
@@ -653,13 +713,17 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
     if (destinationPe == CkMyPe()) {
       localDeliver(dataItem);
     }
-    else {
+    else if (destinationPe != TRAM_BROADCAST) {
       if (destinationPe != lastDestinationPe) {
         // do this once per sequence of items with the same destination
         destinationLocation = determineLocation(destinationPe, msg->dimension);
       }
       storeMessage(destinationPe, destinationLocation, &dataItem);   
     }
+    else /* if (destinationPe == TRAM_BROADCAST) */ {
+      localBroadcast(dataItem);       
+      broadcast(&dataItem, msg->dimension - 1, false); 
+    }
     lastDestinationPe = destinationPe; 
   }
 
@@ -718,7 +782,8 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
       }
       else {
 #ifdef STREAMER_VERBOSE_OUTPUT
-        CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
+        CkPrintf("[%d] sending intermediate flush to %d\n", 
+                 CkMyPe(), destinationIndex); 
 #endif
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
       }
@@ -745,7 +810,8 @@ void MeshStreamer<dtype>::flushToIntermediateDestinations() {
 template <class dtype>
 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
 #ifdef STREAMER_VERBOSE_OUTPUT
-  CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
+  CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", 
+           CkMyPe(), dimension, sendMsgCounts); 
 #endif
   MeshStreamerMessage<dtype> **messageBuffers; 
   MeshStreamerMessage<dtype> *destinationBuffer; 
@@ -791,13 +857,15 @@ void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
 
     if (dimension == 0) {
 #ifdef STREAMER_VERBOSE_OUTPUT
-      CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
+      CkPrintf("[%d] sending dimension flush to %d\n", 
+               CkMyPe(), destinationIndex); 
 #endif
       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
     }
     else {
 #ifdef STREAMER_VERBOSE_OUTPUT
-      CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
+      CkPrintf("[%d] sending intermediate dimension flush to %d\n", 
+               CkMyPe(), destinationIndex); 
 #endif
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
@@ -860,7 +928,9 @@ private:
     if (MeshStreamer<dtype>::useStagedCompletion_) {
 #ifdef STREAMER_VERBOSE_OUTPUT
       envelope *env = UsrToEnv(msg);
-      CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
+      CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
+               CkMyPe(), env->getSrcPe(), msg->numDataItems, 
+               msg->finalMsgCount);  
 #endif
       markMessageReceived(msg->dimension, msg->finalMsgCount); 
     }
@@ -878,6 +948,10 @@ private:
     }
   }
 
+  void localBroadcast(dtype &dataItem) {
+    localDeliver(dataItem); 
+  }
+
   int numElementsInClient() {
     // client is a group - there is one element per PE
     return CkNumPes();
@@ -920,13 +994,13 @@ public:
 };
 
 template <class dtype>
-class MeshStreamerClientIterator : public CkLocIterator {
+class ClientInitializer : public CkLocIterator {
 
 public:
   
   CompletionDetector *detectorLocalObj_;
   CkArray *clientArrMgr_;
-  MeshStreamerClientIterator(CompletionDetector *detectorObj, 
+  ClientInitializer(CompletionDetector *detectorObj, 
                             CkArray *clientArrMgr) 
     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
 
@@ -942,6 +1016,28 @@ public:
 
 };
 
+template <class dtype>
+class LocalBroadcaster : public CkLocIterator {
+
+public:
+  CkArray *clientArrMgr_;
+  dtype *dataItem_; 
+
+  LocalBroadcaster(CkArray *clientArrMgr, dtype *dataItem) 
+   : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
+
+  void addLocation(CkLocation &loc) {
+    MeshStreamerArrayClient<dtype> *clientObj = 
+      (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
+
+    CkAssert(clientObj != NULL); 
+
+    dtype dataItemCopy = *dataItem_; 
+    clientObj->process(dataItemCopy); 
+  }
+
+};
+
 template <class dtype, class itype>
 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
   
@@ -959,7 +1055,10 @@ private:
 
   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
     itype arrayId = packedDataItem.arrayIndex; 
-
+    if (arrayId == itype(TRAM_BROADCAST)) {
+      localBroadcast(packedDataItem);
+      return;
+    }
     MeshStreamerArrayClient<dtype> *clientObj;
 #ifdef CACHE_ARRAY_METADATA
     clientObj = clientObjs_[arrayId];
@@ -981,6 +1080,21 @@ private:
     }
   }
 
+  void localBroadcast(ArrayDataItem<dtype, itype> &packedDataItem) {
+
+    LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
+                                           &packedDataItem.dataItem);
+    CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
+    clientLocMgr->iterate(clientIterator);
+
+    if (MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::useStagedCompletion_ == false) {
+        MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::detectorLocalObj_->consume();      
+    }
+
+  }
+
   int numElementsInClient() {
     return numArrayElements_;
   }
@@ -1006,7 +1120,7 @@ private:
 #else
       // set completion detector in local elements of the client
       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
-      MeshStreamerClientIterator<dtype> clientIterator(
+      ClientInitializer<dtype> clientIterator(
           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
           clientProxy_.ckLocalBranch());
       clientLocMgr->iterate(clientIterator);
@@ -1093,6 +1207,36 @@ public:
     delete msg;
   }
 
+  void broadcast(dtype &dataItem) {
+    const static bool copyIndirectly = true;
+
+    // no data items should be submitted after all local contributors call done
+    // and staged completion has begun
+    CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
+               ::stagedCompletionStarted()) == false);
+
+    if (MeshStreamer<ArrayDataItem<dtype, itype> >
+         ::useStagedCompletion_ == false) {
+      MeshStreamer<ArrayDataItem<dtype, itype> >
+        ::detectorLocalObj_->produce(CkNumPes());
+    }
+
+    // deliver locally
+    ArrayDataItem<dtype, itype> packedDataItem;
+    packedDataItem.arrayIndex = TRAM_BROADCAST; 
+    packedDataItem.dataItem = dataItem;
+    localBroadcast(packedDataItem);
+
+    DataItemHandle tempHandle; 
+    tempHandle.dataItem = &dataItem;
+    tempHandle.arrayIndex = TRAM_BROADCAST;
+
+    int numDimensions = 
+      MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
+    MeshStreamer<ArrayDataItem<dtype, itype> >::
+      broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
+  }
+
   void insertData(dtype &dataItem, itype arrayIndex) {
 
     // no data items should be submitted after all local contributors call done