when flush message buffer which are below the full capacity, set message size to... hpcc-2011 hpcc-2011
authorGengbin Zheng <gzheng@illinois.edu>
Tue, 25 Oct 2011 07:10:50 +0000 (02:10 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Tue, 25 Oct 2011 07:10:50 +0000 (02:10 -0500)
src/libs/ck-libs/MeshStreamer/MeshStreamer.ci
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index c268d478fc1527eead4f0c163fd063f5b8233882..6ddacafcce1220214c61a0919c9fb08eaa9be24a 100644 (file)
@@ -1,25 +1,27 @@
 module MeshStreamer {
 
-  template<class dtype> message MeshStreamerMessage {
+  template<class dtype> 
+  message MeshStreamerMessage {
     int destinationPes[];
     dtype data[]; 
   };
+
   /*
   message LocalMessage {
     char data[]; 
   };
   */
-  template<class dtype> group MeshStreamerClient {
 
+  template<class dtype> group MeshStreamerClient {
     entry MeshStreamerClient();
     entry void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
   };
 
-  template<class dtype> group MeshStreamer {
+  template<class dtype> 
+  group MeshStreamer {
     entry MeshStreamer(int totalBufferCapacity, int numRows, 
                       int numColumns, int numPlanes, 
                       const CProxy_MeshStreamerClient<dtype> &clientProxy );   
-
     // entry void insertData(CmiUInt8, int); 
     entry void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     // entry void receivePersonalizedData(MeshStreamerMessage *msg);
index 3f60e7a00b39df60ae5b291b9e1e873dc8b91387..9f3cc9c4bf0a8f27ce6d8c7e3bce22ae0fc88f45 100644 (file)
@@ -7,7 +7,6 @@
 // reaching totalBufferCapacity_
 #define BUCKET_SIZE_FACTOR 4
 
-
 //#define DEBUG_STREAMER 1
 
 enum MeshStreamerMessageType {PlaneMessage, ColumnMessage, PersonalizedMessage};
@@ -40,11 +39,11 @@ template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
 public:
     int numDataItems;
-    int dataItemSize;
+    int capacity;
     int *destinationPes;
     dtype *data;
 
-    MeshStreamerMessage(): numDataItems(0) {}   
+    MeshStreamerMessage(int c): numDataItems(0), capacity(c) {}   
 
     int addDataItem(dtype &dataItem) {
         data[numDataItems] = dataItem;
@@ -229,11 +228,11 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> **messageBuffe
     int dataSize = bucketSize_;  
     if (msgType == PersonalizedMessage) {
       messageBuffers[bucketIndex] = 
-        new (0, dataSize) MeshStreamerMessage<dtype>;
+        new (0, dataSize) MeshStreamerMessage<dtype>(dataSize);
     }
     else {
       messageBuffers[bucketIndex] = 
-        new (bucketSize_, dataSize) MeshStreamerMessage<dtype>;
+        new (bucketSize_, dataSize) MeshStreamerMessage<dtype>(dataSize);
     }
 #ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bucketIndex] != NULL);
@@ -419,6 +418,11 @@ void MeshStreamer<dtype>::flushLargestBucket(MeshStreamerMessage<dtype> **messag
     destinationBucket = messageBuffers[flushIndex];
     destinationIndex = myNodeIndex_ + (flushIndex - myIndex) * dimensionFactor;
     if (destinationBucket != NULL) {
+      if (destinationBucket->capacity > destinationBucket->numDataItems) {
+          // not sending the full buffer, shrink the message size
+        envelope *env = UsrToEnv(destinationBucket);
+        env->setTotalsize(env->getTotalsize() - (destinationBucket->capacity - destinationBucket->numDataItems) * sizeof(dtype));
+      }
       numDataItemsBuffered_ -= destinationBucket->numDataItems;
     }
     if (messageBuffers == personalizedBuffers_) {
@@ -448,7 +452,7 @@ void MeshStreamer<dtype>::flushBuckets(MeshStreamerMessage<dtype> **messageBuffe
        else {
          for (int j = 0; j < messageBuffers[i]->numDataItems; j++) {
            MeshStreamerMessage<dtype> *directMsg = 
-             new (0, 1) MeshStreamerMessage<dtype>;
+             new (0, 1) MeshStreamerMessage<dtype>(1);
 #ifdef DEBUG_STREAMER
            CkAssert(directMsg != NULL);
 #endif