NDMeshStreamer performance optimizations and bug fixes:
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 10 Feb 2012 02:00:53 +0000 (20:00 -0600)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 10 Feb 2012 02:00:53 +0000 (20:00 -0600)
(1) only compute the index information necessary for sending
in the current step
(2) process items immediately at intermediate steps when intermediate
node turns out to be the final destination
(3) fixed a bug in the location caching scheme
(4) code cleanup

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 162b3c3f6150214ef33b426d8f0d23e853fcfb26..c403723b637d56e31bc3fd67865a764e8c564ded 100644 (file)
@@ -6,28 +6,16 @@
 
 // allocate more total buffer space than the maximum buffering limit but flush 
 //   upon reaching totalBufferCapacity_
-#define BUCKET_SIZE_FACTOR 4
+#define BUFFER_SIZE_FACTOR 4
 
 // #define DEBUG_STREAMER
 // #define CACHE_LOCATIONS
 // #define SUPPORT_INCOMPLETE_MESH
 
-class MeshLocation {
- public:
-
-  int *locationIndex; 
-  int msgType;
-
-  MeshLocation() {
-    // empty
-  }
-  MeshLocation(int numDimensions) {
-    locationIndex = new int[numDimensions]; 
-  }
-  ~MeshLocation() {
-    delete[] locationIndex; 
-  }
-};
+typedef struct {
+  int dimension; 
+  int bufferIndex; 
+} MeshLocation;
 
 template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
@@ -63,7 +51,7 @@ template <class dtype>
 class MeshStreamer : public CBase_MeshStreamer<dtype> {
 
 private:
-    int bucketSize_; 
+    int bufferSize_; 
     int totalBufferCapacity_;
     int numDataItemsBuffered_;
 
@@ -101,14 +89,13 @@ private:
 #endif
     */
 
-    void determineLocation(const int destinationPe, 
-                          MeshLocation &destinationCoordinates);
+    MeshLocation determineLocation(int destinationPe);
 
-    void storeMessage(const int destinationPe, 
+    void storeMessage(int destinationPe, 
                      const MeshLocation &destinationCoordinates, 
                      const dtype &dataItem);
 
-    void flushLargestBucket();
+    void flushLargestBuffer();
 
 public:
 
@@ -127,7 +114,7 @@ public:
     bool isPeriodicFlushEnabled() {
       return isPeriodicFlushEnabled_;
     }
-    void insertData(dtype &dataItem, const int destinationPe); 
+    void insertData(dtype &dataItem, int destinationPe); 
     void doneInserting();
     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
       userCallback_ = cb;
@@ -136,7 +123,7 @@ public:
                             this->thisProxy));
       }
     }
-    void flushAllBuckets();
+    void flushAllBuffers();
     void registerPeriodicProgressFunction();
 
     // flushing begins only after enablePeriodicFlushing has been invoked
@@ -170,8 +157,8 @@ MeshStreamer<dtype>::MeshStreamer(
   progressPeriodInMs_(progressPeriodInMs)
 {
   // limit total number of messages in system to totalBufferCapacity
-  //   but allocate a factor BUCKET_SIZE_FACTOR more space to take
-  //   advantage of nonuniform filling of buckets
+  //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
+  //   advantage of nonuniform filling of buffers
 
   int sumAlongAllDimensions = 0;   
   individualDimensionSizes_ = new int[numDimensions_];
@@ -188,10 +175,10 @@ MeshStreamer<dtype>::MeshStreamer(
 
   // except for personalized messages, the buffers for dimensions with the 
   //   same index as the sender's are not used
-  bucketSize_ = BUCKET_SIZE_FACTOR * totalBufferCapacity 
+  bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
     / (sumAlongAllDimensions - numDimensions_ + 1); 
-  if (bucketSize_ <= 0) {
-    bucketSize_ = 1; 
+  if (bufferSize_ <= 0) {
+    bufferSize_ = 1; 
     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
             "is invalid. Defaulting to a single buffer per destination.\n");
   }
@@ -253,11 +240,7 @@ MeshStreamer<dtype>::~MeshStreamer() {
   delete[] myLocationIndex_;
 
 #ifdef CACHE_LOCATIONS
-  for (int i = 0; i < numNodes_; i++) {
-    if (cachedLocations[i] != NULL) {
-      delete cachedLocations[i]; 
-    }
-  }
+  delete[] cachedLocations;
   delete[] isCached; 
 #endif
 
@@ -265,103 +248,92 @@ MeshStreamer<dtype>::~MeshStreamer() {
 
 
 template <class dtype>
-void MeshStreamer<dtype>::determineLocation(
-                         const int destinationPe, 
-                         MeshLocation &destinationCoordinates) { 
-
-  int nodeIndex;
+inline 
+MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
 
 #ifdef CACHE_LOCATIONS
-  if (isCached[destinationPe] == true) {
-    destinationCoordinates = cachedLocations[destinationPe]; 
-    return;
+  if (isCached[destinationPe]) {    
+    return cachedLocations[destinationPe]; 
   }
 #endif
 
-  nodeIndex = destinationPe;
-
-  int remainder = nodeIndex;
-  bool isMsgTypeSet = false;
-  for (int i = numDimensions_ - 1; i >= 0; i--) {    
-    destinationCoordinates.locationIndex[i] = 
-      remainder / combinedDimensionSizes_[i];
+  MeshLocation destinationLocation;
+  int remainder = destinationPe;
+  int dimensionIndex; 
+  for (int i = numDimensions_ - 1; i >= 0; i--) {        
+    dimensionIndex = remainder / combinedDimensionSizes_[i];
     
-    if (!isMsgTypeSet && 
-       destinationCoordinates.locationIndex[i] != myLocationIndex_[i]) {
-      destinationCoordinates.msgType = i; 
-      isMsgTypeSet = true; 
+    if (dimensionIndex != myLocationIndex_[i]) {
+      destinationLocation.dimension = i; 
+      destinationLocation.bufferIndex = dimensionIndex; 
+#ifdef CACHE_LOCATIONS
+      cachedLocations[destinationPe] = destinationLocation;
+      isCached[destinationPe] = true; 
+#endif
+      return destinationLocation;
     }
 
-    remainder -= 
-      combinedDimensionSizes_[i] * destinationCoordinates.locationIndex[i];
-  }
-
-  // personalized message for oneself
-  if (!isMsgTypeSet) {
-    destinationCoordinates.msgType = 0;
+    remainder -= combinedDimensionSizes_[i] * dimensionIndex;
   }
 
-#ifdef CACHE_LOCATIONS
-  cachedLocations[destinationPe].locationIndex = new int[numDimensions_];
-  memcpy(cachedLocations[destinationPe].locationIndex, 
-        destinationCoordinates.locationIndex, numDimensions_ * sizeof(int));
-  cachedLocations[destinationPe].msgType = destinationCoordinates.msgType;
-#endif
-
+  // all indices agree - message to oneself
+  destinationLocation.dimension = 0; 
+  destinationLocation.bufferIndex = myLocationIndex_[0];
+  return destinationLocation; 
 }
 
 template <class dtype>
+inline
 void MeshStreamer<dtype>::storeMessage(
-                         const int destinationPe, 
-                         const MeshLocation& destinationCoordinates,
+                         int destinationPe, 
+                         const MeshLocation& destinationLocation,
                          const dtype &dataItem) {
 
-  int *locationIndex = destinationCoordinates.locationIndex; 
-  int msgType = destinationCoordinates.msgType;
-  MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[msgType]; 
-  int bucketIndex = destinationCoordinates.locationIndex[msgType]; 
+  int dimension = destinationLocation.dimension;
+  int bufferIndex = destinationLocation.bufferIndex; 
+  MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
 
   // allocate new message if necessary
-  if (messageBuffers[bucketIndex] == NULL) {
-    if (msgType == 0) {
+  if (messageBuffers[bufferIndex] == NULL) {
+    if (dimension == 0) {
       // personalized messages do not require destination indices
-      messageBuffers[bucketIndex] = 
-        new (0, bucketSize_) MeshStreamerMessage<dtype>();
+      messageBuffers[bufferIndex] = 
+        new (0, bufferSize_) MeshStreamerMessage<dtype>();
     }
     else {
-      messageBuffers[bucketIndex] = 
-        new (bucketSize_, bucketSize_) MeshStreamerMessage<dtype>();
+      messageBuffers[bufferIndex] = 
+        new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
     }
 #ifdef DEBUG_STREAMER
-    CkAssert(messageBuffers[bucketIndex] != NULL);
+    CkAssert(messageBuffers[bufferIndex] != NULL);
 #endif
   }
   
-  MeshStreamerMessage<dtype> *destinationBucket = messageBuffers[bucketIndex];
+  MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
   
-  int numBuffered = destinationBucket->addDataItem(dataItem); 
-  if (msgType != 0) {
-    destinationBucket->markDestination(numBuffered-1, destinationPe);
+  int numBuffered = destinationBuffer->addDataItem(dataItem); 
+  if (dimension != 0) {
+    destinationBuffer->markDestination(numBuffered-1, destinationPe);
   }
   numDataItemsBuffered_++;
 
   // copy data into message and send if buffer is full
-  if (numBuffered == bucketSize_) {
+  if (numBuffered == bufferSize_) {
 
     int destinationIndex;
 
     destinationIndex = myNodeIndex_ + 
-      (locationIndex[msgType] - myLocationIndex_[msgType]) * 
-      combinedDimensionSizes_[msgType];
+      (bufferIndex - myLocationIndex_[dimension]) * 
+      combinedDimensionSizes_[dimension];
 
-    if (msgType == 0) {
-      clientProxy_[destinationIndex].receiveCombinedData(destinationBucket);      
+    if (dimension == 0) {
+      clientProxy_[destinationIndex].receiveCombinedData(destinationBuffer);      
     }
     else {
-      this->thisProxy[destinationIndex].receiveAlongRoute(destinationBucket);
+      this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
 
-    messageBuffers[bucketIndex] = NULL;
+    messageBuffers[bufferIndex] = NULL;
     numDataItemsBuffered_ -= numBuffered; 
 
     if (isPeriodicFlushEnabled_) {
@@ -371,7 +343,7 @@ void MeshStreamer<dtype>::storeMessage(
   }
 
   if (numDataItemsBuffered_ == totalBufferCapacity_) {
-    flushLargestBucket();
+    flushLargestBuffer();
     if (isPeriodicFlushEnabled_) {
       timeOfLastSend_ = CkWallTimer();
     }
@@ -380,8 +352,7 @@ void MeshStreamer<dtype>::storeMessage(
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::insertData(dtype &dataItem, 
-                                    const int destinationPe) {
+void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
   static int count = 0;
 
   if (destinationPe == CkMyPe()) {
@@ -389,12 +360,8 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem,
     return;
   }
 
-  static MeshLocation destinationCoordinates(numDimensions_);
-
-  determineLocation(destinationPe, destinationCoordinates);
-
-  int msgType = destinationCoordinates.msgType; 
-  storeMessage(destinationPe, destinationCoordinates, dataItem); 
+  MeshLocation destinationLocation = determineLocation(destinationPe);
+  storeMessage(destinationPe, destinationLocation, dataItem); 
 
   // release control to scheduler if requested by the user, 
   //   assume caller is threaded entry
@@ -420,7 +387,7 @@ void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
     userCallback_ = CkCallback();      // nullify the current callback
   }
 
-  // TODO: TEST IF THIS DELETE STILL CAUSES UNEXPLAINED HANGS
+  // TODO: TEST IF THIS DELETE STILL CAUSES UNEXPLAINED CRASHES
   //  delete msg; 
 }
 
@@ -428,23 +395,18 @@ template <class dtype>
 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
 
   int destinationPe; 
-  int msgType; 
-  static MeshLocation destinationCoordinates(numDimensions_);
+  MeshLocation destinationLocation;
 
   for (int i = 0; i < msg->numDataItems; i++) {
     destinationPe = msg->destinationPes[i];
     dtype &dataItem = msg->getDataItem(i);
-    determineLocation(destinationPe, destinationCoordinates);
-    msgType = destinationCoordinates.msgType;
-
-#ifdef DEBUG_STREAMER
-    for (int j = numDimensions_ - 1; j >= msgType; j--) {
-      CkAssert(destinationCoordinates.locationIndex[j] == myLocationIndex_[j]);
+    destinationLocation = determineLocation(destinationPe);
+    if (destinationPe == CkMyPe()) {
+      clientObj_->process(dataItem);
+    }
+    else {
+      storeMessage(destinationPe, destinationLocation, dataItem);   
     }
-#endif    
-
-    storeMessage(destinationPe, destinationCoordinates, dataItem);
-    
   }
 
   delete msg;
@@ -452,11 +414,11 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::flushLargestBucket() {
+void MeshStreamer<dtype>::flushLargestBuffer() {
 
   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
   MeshStreamerMessage<dtype> ** messageBuffers; 
-  MeshStreamerMessage<dtype> *destinationBucket
+  MeshStreamerMessage<dtype> *destinationBuffer
 
   maxSize = 0;    
   for (int i = 0; i < numDimensions_; i++) {
@@ -478,24 +440,24 @@ void MeshStreamer<dtype>::flushLargestBucket() {
   if (maxSize > 0) {
 
     messageBuffers = dataBuffers_[flushDimension]; 
-    destinationBucket = messageBuffers[flushIndex];
+    destinationBuffer = messageBuffers[flushIndex];
     destinationIndex = myNodeIndex_ + 
       (flushIndex - myLocationIndex_[flushDimension]) * 
       combinedDimensionSizes_[flushDimension] ;
 
-    if (destinationBucket->numDataItems < bucketSize_) {
+    if (destinationBuffer->numDataItems < bufferSize_) {
       // not sending the full buffer, shrink the message size
-      envelope *env = UsrToEnv(destinationBucket);
+      envelope *env = UsrToEnv(destinationBuffer);
       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
-                       (bucketSize_ - destinationBucket->numDataItems));
+                       (bufferSize_ - destinationBuffer->numDataItems));
     }
-    numDataItemsBuffered_ -= destinationBucket->numDataItems;
+    numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
     if (flushDimension == 0) {
-      clientProxy_[destinationIndex].receiveCombinedData(destinationBucket);
+      clientProxy_[destinationIndex].receiveCombinedData(destinationBuffer);
     }
     else {
-      this->thisProxy[destinationIndex].receiveAlongRoute(destinationBucket);
+      this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
     }
     messageBuffers[flushIndex] = NULL;
 
@@ -504,7 +466,7 @@ void MeshStreamer<dtype>::flushLargestBucket() {
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::flushAllBuckets() {
+void MeshStreamer<dtype>::flushAllBuffers() {
 
   MeshStreamerMessage<dtype> **messageBuffers; 
   int numBuffers; 
@@ -520,7 +482,6 @@ void MeshStreamer<dtype>::flushAllBuckets() {
        continue;
       }
 
-      //flush all messages in j bucket
       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
 
       if (i == 0) {
@@ -553,7 +514,7 @@ void MeshStreamer<dtype>::flushDirect(){
 
     if (!isPeriodicFlushEnabled_ || 
        1000 * (CkWallTimer() - timeOfLastSend_) >= progressPeriodInMs_) {
-      flushAllBuckets();
+      flushAllBuffers();
     }
 
     if (isPeriodicFlushEnabled_) {