NDMeshStreamer: checking in new termination scheme based on stage by stage flushing.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 May 2012 23:30:30 +0000 (18:30 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 3 May 2012 23:30:30 +0000 (18:30 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index bc2a5cccea1746aefe8a43f2f67e5c5a4c9b7d2c..ba0d786290615f3fb1867da61b6390f9bbcb1441 100644 (file)
@@ -22,6 +22,7 @@ module NDMeshStreamer {
     entry void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
     entry void flushDirect();
     entry void finish();
+    entry void init(CkCallback startCb, CkCallback endCb, int prio);
     entry void associateCallback(int numContributors, 
                                 CkCallback startCb, CkCallback endCb, 
                                 CProxy_CompletionDetector detector,
index b3a8c30e374bfb67728a33a75c2a74a342afe497..d24d3fe8661ba03bb983c1663b0cc8b7e7fcef04 100644 (file)
@@ -17,6 +17,8 @@
 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
 // #define STREAMER_EXPERIMENTAL
 
+// #define STAGED_COMPLETION
+
 struct MeshLocation {
   int dimension; 
   int bufferIndex; 
@@ -25,11 +27,19 @@ struct MeshLocation {
 template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
 public:
+#ifdef STAGED_COMPLETION
+  int finalMsgCount; 
+  int dimension;
+#endif
     int numDataItems;
     int *destinationPes;
     dtype *data;
 
-    MeshStreamerMessage(): numDataItems(0) {}   
+  MeshStreamerMessage(): numDataItems(0) {
+#ifdef STAGED_COMPLETION
+    finalMsgCount = -1; 
+#endif
+  }
 
     int addDataItem(const dtype &dataItem) {
         data[numDataItems] = dataItem;
@@ -45,6 +55,17 @@ public:
     }
 };
 
+/*
+template<class dtype>
+class StagedCompletionMessage : public CMessage_StagedCompletionMessage<dtype> {
+  int finalMsgCount_; 
+  int dimension_;
+  CompletionMessage() {}
+  Completionmessage(int finalMsgCount, int dimension)
+   : finalMsgCount_{finalMsgCount), dimension_(dimension) {}
+};
+*/
+
 template <class dtype>
 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
  private:
@@ -89,7 +110,9 @@ class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
       dtype &data = msg->getDataItem(i);
       process(data);
     }
+#ifndef STAGED_COMPLETION
     detectorLocalObj_->consume(msg->numDataItems);
+#endif
     delete msg;
   }
 };
@@ -131,6 +154,16 @@ private:
     bool *isCached_; 
 #endif
 
+
+  // only used for staged completion
+  int **cntMsgSent_;
+  int *cntMsgReceived_;
+  int *cntMsgExpected_;
+  int *cntFinished_; 
+  int dimensionToFlush_;
+  int numLocalDone_; 
+
+
     MeshLocation determineLocation(int destinationPe);
 
     void storeMessage(int destinationPe, 
@@ -140,15 +173,16 @@ private:
     virtual void deliverToDestination(
                  int destinationPe, 
                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
-
     virtual void localDeliver(dtype &dataItem) = 0; 
 
     virtual int numElementsInClient() = 0;
+    virtual int numLocalElementsInClient() = 0; 
 
     virtual void initLocalClients() = 0;
 
-    void flushLargestBuffer();
+    void sendLargestBuffer();
     void flushToIntermediateDestinations();
+    void flushDimension(int dimension, bool sendMsgCounts = false); 
 
 protected:
 
@@ -190,9 +224,104 @@ public:
     }
 
     void done(int numContributorsFinished = 1) {
+#ifdef STAGED_COMPLETION
+      numLocalDone_ += numContributorsFinished; 
+      if (numLocalDone_ == numLocalElementsInClient()) {
+        startStagedCompletion();
+      }
+#else
       detectorLocalObj_->done(numContributorsFinished);
+#endif
     }
 
+    void init(CkCallback startCb, CkCallback endCb, int prio);
+  
+    void startStagedCompletion() {    
+      /*
+  int dimension = msg->dimension;
+  cntMsgReceived_[dimension]++;
+  if (msg->finalMsgCount != -1) {
+    cntFinished_[dimension]++; 
+    cntMsgExpected_[dimension] += msg->finalMsgCount; 
+  }  
+  if (cntFinished_[dimension] == individualDimensionSizes_[dimension] - 1) {
+    if (cntMsgExpected_[dimension] == cntMsgReceived_[dimension]) {
+      flushDimension(dimension - 1); 
+    }
+  }
+      */
+      
+      if (individualDimensionSizes_[dimensionToFlush_] != 1) {
+        flushDimension(dimensionToFlush_, true);
+      }
+      dimensionToFlush_--;
+
+      checkForCompletedStages();
+      /*
+      do {
+        if (individualDimensionSizes_[dimensionToFlush_] != 1) {
+          flushDimension(dimensionToFlush_, true);
+        }
+        dimensionToFlush_--;
+      } while (dimensionToFlush_ >= 0 
+               && cntFinished_[dimensionToFlush_ + 1] 
+                    == individualDimensionSizes_[dimensionToFlush_ + 1] - 1
+               && cntMsgExpected_[dimensionToFlush_ + 1]
+                    == cntMsgReceived_[dimensionToFlush_ + 1]);
+      // sendStagedCompletion(numDimensions_ - 1); 
+      */
+    }
+
+  void markMessageReceived(int dimension, int finalCount) {
+    cntMsgReceived_[dimension]++;
+    if (finalCount != -1) {
+      cntFinished_[dimension]++; 
+      cntMsgExpected_[dimension] += finalCount; 
+    }  
+    if (dimensionToFlush_ != numDimensions_ - 1) {
+      checkForCompletedStages();
+    }
+  }
+
+  void checkForCompletedStages() {
+
+    while (cntFinished_[dimensionToFlush_ + 1] == 
+           individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
+           cntMsgExpected_[dimensionToFlush_ + 1] == 
+           cntMsgReceived_[dimensionToFlush_ + 1]) {
+        if (dimensionToFlush_ == -1) {
+          this->contribute(userCallback_);
+        }
+        else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
+          flushDimension(dimensionToFlush_, true); 
+        }
+        dimensionToFlush_--; 
+      }
+  }
+
+  /*
+  void sendStagedCompletion(int dimension) {
+    
+    for (int i = 0; i < dimensionSizes_[dimension]; i++) {
+      int destinationIndex = myIndex_ + 
+        (i - myLocationIndex_[dimension]) *
+        combinedDimensionSizes_[dimension] ;
+
+      StagedCompletionMsg *msg = new (sizeof(int)) StagedCompletionMsg();
+      this->thisProxy[destinationIndex_].receiveStagedCompletion(msg);
+    }
+  }
+  
+  void receiveStagedCompletion(StagedCompletionMsg *msg) {
+    msgCntExpected[msg->dimension_][msg->bufferIndex_] = msg->finalMsgCount_; 
+    cntFinished_[msg->dimension_]++; 
+    if (cntFinished[msg->dimension_] == 
+        individualDimensionSizes_[msg->dimension_]) {
+      sendstagedCompletion(msg->dimension_ - 1);
+    }
+    delete msg; 
+  }
+  */
 };
 
 template <class dtype>
@@ -265,6 +394,15 @@ MeshStreamer<dtype>::MeshStreamer(
   std::fill(isCached_, isCached_ + numMembers_, false);
 #endif
 
+#ifdef STAGED_COMPLETION
+
+  cntMsgSent_ = new int*[numDimensions_]; 
+  cntMsgReceived_ = new int[numDimensions_];
+  cntMsgExpected_ = new int[numDimensions_];
+  cntFinished_ = new int[numDimensions_]; 
+
+#endif
+
 }
 
 template <class dtype>
@@ -286,6 +424,16 @@ MeshStreamer<dtype>::~MeshStreamer() {
   delete[] isCached_; 
 #endif
 
+#ifdef STAGED_COMPLETION
+  for (int i = 0; i < numDimensions_; i++) {
+    delete[] cntMsgSent_[i]; 
+  }
+  delete[] cntMsgSent_; 
+  delete[] cntMsgReceived_; 
+  delete[] cntMsgExpected_; 
+  delete[] cntFinished_;
+#endif
+
 }
 
 
@@ -358,6 +506,9 @@ void MeshStreamer<dtype>::storeMessage(
     }
     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
+#ifdef STAGED_COMPLETION
+      messageBuffers[bufferIndex]->dimension = dimension; 
+#endif
 #ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bufferIndex] != NULL);
 #endif
@@ -387,6 +538,10 @@ void MeshStreamer<dtype>::storeMessage(
       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
 
+#ifdef STAGED_COMPLETION
+    cntMsgSent_[dimension][destinationIndex]++; 
+#endif
+
     messageBuffers[bufferIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
     hasSentRecently_ = true; 
@@ -394,7 +549,7 @@ void MeshStreamer<dtype>::storeMessage(
   }
   // send if total buffering capacity has been reached
   else if (numDataItemsBuffered_ == totalBufferCapacity_) {
-    flushLargestBuffer();
+    sendLargestBuffer();
     hasSentRecently_ = true; 
   }
 
@@ -420,8 +575,9 @@ void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
 template <class dtype>
 inline
 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
-
+#ifndef STAGED_COMPLETION
   detectorLocalObj_->produce();
+#endif
   if (destinationPe == CkMyPe()) {
     // copying here is necessary - user code should not be 
     // passed back a reference to the original item
@@ -433,12 +589,36 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
   insertData((void *) &dataItem, destinationPe);
 }
 
+template <class dtype>
+void MeshStreamer<dtype>::init(CkCallback startCb, CkCallback endCb, 
+                               int prio) {
+
+  for (int i = 0; i < numDimensions_; i++) {
+    cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
+    std::fill(cntMsgSent_[i], 
+              cntMsgSent_[i] + individualDimensionSizes_[i], 0);
+    cntMsgReceived_[i] = 0;
+    cntMsgExpected_[i] = 0; 
+    cntFinished_[i] = 0; 
+  }
+  dimensionToFlush_ = numDimensions_ - 1;
+
+  yieldCount_ = 0; 
+  userCallback_ = endCb; 
+  prio_ = prio;
+
+  numLocalDone_ = 0; 
+  initLocalClients();
+  this->contribute(startCb);
+}
+
 template <class dtype>
 void MeshStreamer<dtype>::associateCallback(
                          int numContributors,
                          CkCallback startCb, CkCallback endCb, 
                          CProxy_CompletionDetector detector, 
                          int prio) {
+
 #ifdef STREAMER_EXPERIMENTAL
   immediateMode_ = false;
   hasSentPreviously_ = false; 
@@ -502,18 +682,22 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
     lastDestinationPe = destinationPe; 
   }
 
-#ifdef STRAMER_EXPERIMENTAL
+#ifdef STREAMER_EXPERIMENTAL
   if (immediateMode_) {
     flushToIntermediateDestinations();
   }
 #endif
 
+#ifdef STAGED_COMPLETION
+  markMessageReceived(msg->dimension, msg->finalMsgCount); 
+#endif
+
   delete msg;
 
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::flushLargestBuffer() {
+void MeshStreamer<dtype>::sendLargestBuffer() {
 
   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
   MeshStreamerMessage<dtype> ** messageBuffers; 
@@ -557,6 +741,11 @@ void MeshStreamer<dtype>::flushLargestBuffer() {
       else {
        this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
       }
+
+#ifdef STAGED_COMPLETION
+      cntMsgSent_[i][flushIndex]++; 
+#endif
+
       messageBuffers[flushIndex] = NULL;
 
     }
@@ -613,50 +802,70 @@ void MeshStreamer<dtype>::flushAllBuffers() {
 
 template <class dtype>
 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
+  
+  for (int i = 0; i < numDimensions_; i++) {
+    flushDimension(i); 
+  }
+}
 
+template <class dtype>
+void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
   MeshStreamerMessage<dtype> **messageBuffers; 
   MeshStreamerMessage<dtype> *destinationBuffer; 
   int destinationIndex, numBuffers; 
 
-  for (int i = 0; i < numDimensions_; i++) {
-
-    messageBuffers = dataBuffers_[i]; 
-    numBuffers = individualDimensionSizes_[i]; 
+  messageBuffers = dataBuffers_[dimension]; 
+  numBuffers = individualDimensionSizes_[dimension]; 
 
-    for (int j = 0; j < numBuffers; j++) {
+  for (int j = 0; j < numBuffers; j++) {
 
-      if(messageBuffers[j] == NULL) {
-       continue;
+    if(messageBuffers[j] == NULL) {      
+      if (sendMsgCounts && j != myLocationIndex_[dimension]) {
+        messageBuffers[j] = 
+          new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>();
       }
+      else {
+        continue; 
+      } 
+    }
 
-      messageBuffers = dataBuffers_[i]; 
-      destinationBuffer = messageBuffers[j];
-      destinationIndex = myIndex_ + 
-       (j - myLocationIndex_[i]) * 
-       combinedDimensionSizes_[i] ;
+    destinationBuffer = messageBuffers[j];
+    destinationIndex = myIndex_ + 
+      (j - myLocationIndex_[dimension]) * 
+      combinedDimensionSizes_[dimension] ;
 
-      if (destinationBuffer->numDataItems < bufferSize_) {
-       // not sending the full buffer, shrink the message size
-       envelope *env = UsrToEnv(destinationBuffer);
-       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
-                         (bufferSize_ - destinationBuffer->numDataItems));
-       *((int *) env->getPrioPtr()) = prio_;
+    if (destinationBuffer->numDataItems < bufferSize_) {
+#ifdef STAGED_COMPLETION
+      if (destinationBuffer->numDataItems != 0) {
+#endif
+      // not sending the full buffer, shrink the message size
+      envelope *env = UsrToEnv(destinationBuffer);
+      env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
+                        (bufferSize_ - destinationBuffer->numDataItems));
+      *((int *) env->getPrioPtr()) = prio_;
+#ifdef STAGED_COMPLETION
       }
-      numDataItemsBuffered_ -= destinationBuffer->numDataItems;
+#endif
+    }
+    numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
-      if (i == 0) {
-        deliverToDestination(destinationIndex, destinationBuffer);
-      }
-      else {
-       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
-      }
-      messageBuffers[j] = NULL;
+#ifdef STAGED_COMPLETION
+      destinationBuffer->dimension = dimension; 
+      destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
+#endif
+
+    if (dimension == 0) {
+      deliverToDestination(destinationIndex, destinationBuffer);
     }
+    else {
+      this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
+    }
+    messageBuffers[j] = NULL;
   }
+  
 }
 
 
-
 template <class dtype>
 void MeshStreamer<dtype>::flushDirect(){
   // flush if (1) this is not a periodic call or 
@@ -724,7 +933,9 @@ private:
 
   void localDeliver(dtype &dataItem) {
     clientObj_->process(dataItem);
+#ifndef STAGED_COMPLETION
     MeshStreamer<dtype>::detectorLocalObj_->consume();
+#endif
   }
 
   int numElementsInClient() {
@@ -732,8 +943,14 @@ private:
     return CkNumPes();
   }
 
+  int numLocalElementsInClient() {
+    return 1; 
+  }
+
   void initLocalClients() {
+#ifndef STAGED_COMPLETION
     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
+#endif
   }
 
 public:
@@ -783,6 +1000,7 @@ private:
   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
   CkArray *clientArrayMgr_;
   int numArrayElements_;
+  int numLocalArrayElements_;
 #ifdef CACHE_ARRAY_METADATA
   MeshStreamerArrayClient<dtype> **clientObjs_;
   int *destinationPes_;
@@ -809,7 +1027,9 @@ private:
 
     if (clientObj != NULL) {
       clientObj->process(packedDataItem.dataItem);
+#ifndef STAGED_COMPLETION
       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
+#endif
     }
     else { 
       // array element is no longer present locally - redeliver using proxy
@@ -821,9 +1041,14 @@ private:
     return numArrayElements_;
   }
 
+  int numLocalElementsInClient() {
+    return numLocalArrayElements_;
+  }
+
   void initLocalClients() {
+#ifndef STAGED_COMPLETION
 
-#ifdef CACHE_ARRAY_METADATA
+  #ifdef CACHE_ARRAY_METADATA
     std::fill(isCachedArrayMetadata_, 
              isCachedArrayMetadata_ + numArrayElements_, false);
 
@@ -834,15 +1059,18 @@ private:
         MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
       }
     }
-#else
+  #else
     // set completion detector in local elements of the client
     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
     MeshStreamerClientIterator<dtype> clientIterator(
      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
      clientProxy_.ckLocalBranch());
     clientLocMgr->iterate(clientIterator);
-#endif    
+  #endif    
 
+#else 
+    numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
+#endif
   }
 
 public:
@@ -863,9 +1091,8 @@ public:
     clientProxy_ = clientProxy; 
     clientArrayMgr_ = clientProxy_.ckLocalBranch();
 
-    numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
-
 #ifdef CACHE_ARRAY_METADATA
+    numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
     destinationPes_ = new int[numArrayElements_];
     isCachedArrayMetadata_ = new bool[numArrayElements_];
@@ -884,16 +1111,22 @@ public:
 
   void receiveArrayData(
        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
+
     for (int i = 0; i < msg->numDataItems; i++) {
       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
       localDeliver(packedData);
     }
+#ifdef STAGED_COMPLETION
+    markMessageReceived(msg->dimension, msg->finalMsgCount);
+#endif
+
     delete msg;
   }
 
   void insertData(dtype &dataItem, itype arrayIndex) {
-
+#ifndef STAGED_COMPLETION
     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
+#endif
     int destinationPe; 
 #ifdef CACHE_ARRAY_METADATA
   if (isCachedArrayMetadata_[arrayIndex]) {