MeshStreamer: Removed capacity from MeshStreamerMessage to decrease
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 28 Oct 2011 21:56:48 +0000 (16:56 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 28 Oct 2011 22:01:16 +0000 (17:01 -0500)
message size. Capacity can be obtained directly from the library.
Moved assert which checks that no messages remain after flush.
The check now occurs after all three flush phases are finished.

src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index 291903a4014694cf8ddff96026deaba5859dd3db..d9c2517ec899fee7f60e18d6647b40fa4bc84c8c 100644 (file)
@@ -39,11 +39,10 @@ template<class dtype>
 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
 public:
     int numDataItems;
-    int capacity;
     int *destinationPes;
     dtype *data;
 
-    MeshStreamerMessage(int c): numDataItems(0), capacity(c) {}   
+    MeshStreamerMessage(): numDataItems(0) {}   
 
     int addDataItem(dtype &dataItem) {
         data[numDataItems] = dataItem;
@@ -225,14 +224,13 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> **messageBuffe
 
   // allocate new message if necessary
   if (messageBuffers[bucketIndex] == NULL) {
-    int dataSize = bucketSize_;  
     if (msgType == PersonalizedMessage) {
       messageBuffers[bucketIndex] = 
-        new (0, dataSize) MeshStreamerMessage<dtype>(dataSize);
+        new (0, bucketSize_) MeshStreamerMessage<dtype>();
     }
     else {
       messageBuffers[bucketIndex] = 
-        new (bucketSize_, dataSize) MeshStreamerMessage<dtype>(dataSize);
+        new (bucketSize_, bucketSize_) MeshStreamerMessage<dtype>();
     }
 #ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bucketIndex] != NULL);
@@ -418,10 +416,10 @@ void MeshStreamer<dtype>::flushLargestBucket(MeshStreamerMessage<dtype> **messag
     destinationBucket = messageBuffers[flushIndex];
     destinationIndex = myNodeIndex_ + (flushIndex - myIndex) * dimensionFactor;
 
-    if (destinationBucket->capacity > destinationBucket->numDataItems) {
+    if (destinationBucket->numDataItems < bucketSize_) {
       // not sending the full buffer, shrink the message size
       envelope *env = UsrToEnv(destinationBucket);
-      env->setTotalsize(env->getTotalsize() - (destinationBucket->capacity - destinationBucket->numDataItems) * sizeof(dtype));
+      env->setTotalsize(env->getTotalsize() - (bucketSize_ - destinationBucket->numDataItems) * sizeof(dtype));
     }
     numDataItemsBuffered_ -= destinationBucket->numDataItems;
 
@@ -452,7 +450,7 @@ void MeshStreamer<dtype>::flushBuckets(MeshStreamerMessage<dtype> **messageBuffe
        else {
          for (int j = 0; j < messageBuffers[i]->numDataItems; j++) {
            MeshStreamerMessage<dtype> *directMsg = 
-             new (0, 1) MeshStreamerMessage<dtype>(1);
+             new (0, 1) MeshStreamerMessage<dtype>();
 #ifdef DEBUG_STREAMER
            CkAssert(directMsg != NULL);
 #endif
@@ -466,10 +464,6 @@ void MeshStreamer<dtype>::flushBuckets(MeshStreamerMessage<dtype> **messageBuffe
        messageBuffers[i] = NULL;
     }
 
-#ifdef DEBUG_STREAMER
-    CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
-    CkAssert(numDataItemsBuffered_ == 0); 
-#endif
 }
 
 template <class dtype>
@@ -477,6 +471,12 @@ void MeshStreamer<dtype>::flushDirect(){
     flushBuckets(planeBuffers_, numPlanes_);
     flushBuckets(columnBuffers_, numColumns_);
     flushBuckets(personalizedBuffers_, numRows_);
+
+#ifdef DEBUG_STREAMER
+    //CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
+    CkAssert(numDataItemsBuffered_ == 0); 
+#endif
+
 }
 
 #define CK_TEMPLATES_ONLY