MeshStreamer: added hashing of coordinate information and refactored the code
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 5 Nov 2011 02:45:21 +0000 (21:45 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 5 Nov 2011 02:48:24 +0000 (21:48 -0500)
to use coordinate objects which encapsulate row, column, plane, msgType

src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index 9d3c0a88bb386e9c5e4b2dfcb61940441430d572..1f8e310881821c863eb428091fe70eca91b092dd 100644 (file)
@@ -2,15 +2,28 @@
 #define _MESH_STREAMER_H_
 
 #include "MeshStreamer.decl.h"
-
 // allocate more total buffer space than the maximum buffering limit but flush upon
 // reaching totalBufferCapacity_
 #define BUCKET_SIZE_FACTOR 4
 
-//#define DEBUG_STREAMER 1
+//#define DEBUG_STREAMER
 
 enum MeshStreamerMessageType {PlaneMessage, ColumnMessage, PersonalizedMessage};
 
+class MeshLocation {
+ public:
+  int rowIndex;
+  int columnIndex;
+  int planeIndex; 
+  MeshStreamerMessageType msgType;
+};
+
+#define HASH_LOCATIONS
+
+#ifdef HASH_LOCATIONS
+#include <map>
+#endif
+
 /*
 class LocalMessage : public CMessage_LocalMessage {
 public:
@@ -98,14 +111,16 @@ private:
     MeshStreamerMessage<dtype> **columnBuffers_; 
     MeshStreamerMessage<dtype> **planeBuffers_;
 
-    void determineLocation(const int destinationPe, int &row, int &column, 
-        int &plane, MeshStreamerMessageType &msgType);
+#ifdef HASH_LOCATIONS
+    std::map<int, MeshLocation> hashedLocations; 
+#endif
+
+    void determineLocation(const int destinationPe, 
+                          MeshLocation &destinationCoordinates);
 
     void storeMessage(MeshStreamerMessage<dtype> ** const messageBuffers, 
-        const int bucketIndex, const int destinationPe, 
-        const int rowIndex, const int columnIndex, 
-        const int planeIndex,
-        const MeshStreamerMessageType msgType, const dtype &dataItem);
+                     const int bucketIndex, const int destinationPe, 
+                     const MeshLocation &destinationCoordinates, const dtype &dataItem);
 
     void flushLargestBucket(MeshStreamerMessage<dtype> ** const messageBuffers,
                            const int numBuffers, const int myIndex, 
@@ -224,42 +239,53 @@ MeshStreamer<dtype>::~MeshStreamer() {
 }
 
 template <class dtype>
-void MeshStreamer<dtype>::determineLocation(const int destinationPe, int &rowIndex, 
-                                     int &columnIndex, int &planeIndex, 
-                                     MeshStreamerMessageType &msgType) {
-  
+void MeshStreamer<dtype>::determineLocation(const int destinationPe, 
+                                           MeshLocation &destinationCoordinates) { 
+
   int nodeIndex, indexWithinPlane; 
 
+#ifdef HASH_LOCATIONS
+  std::map<int, MeshLocation>::iterator it;
+  if ((it = hashedLocations.find(destinationPe)) != hashedLocations.end()) {
+    destinationCoordinates = it->second;
+    return;
+  }
+#endif
+
   nodeIndex = destinationPe;
-  planeIndex = nodeIndex / planeSize_; 
-  if (planeIndex != myPlaneIndex_) {
-    msgType = PlaneMessage;     
+  destinationCoordinates.planeIndex = nodeIndex / planeSize_; 
+  if (destinationCoordinates.planeIndex != myPlaneIndex_) {
+    destinationCoordinates.msgType = PlaneMessage;     
   }
   else {
-    indexWithinPlane = nodeIndex - planeIndex * planeSize_;
-    rowIndex = indexWithinPlane / numColumns_;
-    columnIndex = indexWithinPlane - rowIndex * numColumns_; 
-    if (columnIndex != myColumnIndex_) {
-      msgType = ColumnMessage; 
+    indexWithinPlane = 
+      nodeIndex - destinationCoordinates.planeIndex * planeSize_;
+    destinationCoordinates.rowIndex = indexWithinPlane / numColumns_;
+    destinationCoordinates.columnIndex = 
+      indexWithinPlane - destinationCoordinates.rowIndex * numColumns_; 
+    if (destinationCoordinates.columnIndex != myColumnIndex_) {
+      destinationCoordinates.msgType = ColumnMessage; 
     }
     else {
-      msgType = PersonalizedMessage;
+      destinationCoordinates.msgType = PersonalizedMessage;
     }
   }
 
+#ifdef HASH_LOCATIONS
+  hashedLocations[destinationPe] = destinationCoordinates;
+#endif
+
 }
 
 template <class dtype>
 void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messageBuffers, 
-                                const int bucketIndex, const int destinationPe, 
-                                const int rowIndex, const int columnIndex, 
-                                const int planeIndex, 
-                                const MeshStreamerMessageType msgType, 
-                                const dtype &dataItem) {
+                                      const int bucketIndex, const int destinationPe, 
+                                      const MeshLocation& destinationCoordinates,
+                                      const dtype &dataItem) {
 
   // allocate new message if necessary
   if (messageBuffers[bucketIndex] == NULL) {
-    if (msgType == PersonalizedMessage) {
+    if (destinationCoordinates.msgType == PersonalizedMessage) {
       messageBuffers[bucketIndex] = 
         new (0, bucketSize_) MeshStreamerMessage<dtype>();
     }
@@ -275,7 +301,7 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messa
   MeshStreamerMessage<dtype> *destinationBucket = messageBuffers[bucketIndex];
   
   int numBuffered = destinationBucket->addDataItem(dataItem); 
-  if (msgType != PersonalizedMessage) {
+  if (destinationCoordinates.msgType != PersonalizedMessage) {
     destinationBucket->markDestination(numBuffered-1, destinationPe);
   }
   numDataItemsBuffered_++;
@@ -283,19 +309,21 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messa
   if (numBuffered == bucketSize_) {
     int destinationIndex;
     CProxy_MeshStreamer<dtype> thisProxy(thisgroup);
-    switch (msgType) {
+    switch (destinationCoordinates.msgType) {
 
     case PlaneMessage:
-      destinationIndex = 
-        myNodeIndex_ + (planeIndex - myPlaneIndex_) * planeSize_;      
+      destinationIndex = myNodeIndex_ + 
+       (destinationCoordinates.planeIndex - myPlaneIndex_) * planeSize_;  
       thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
       break;
     case ColumnMessage:
-      destinationIndex = myNodeIndex_ + (columnIndex - myColumnIndex_);
+      destinationIndex = myNodeIndex_ + 
+       (destinationCoordinates.columnIndex - myColumnIndex_);
       thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
       break;
     case PersonalizedMessage:
-      destinationIndex = myNodeIndex_ + (rowIndex - myRowIndex_) * numColumns_;
+      destinationIndex = myNodeIndex_ + 
+       (destinationCoordinates.rowIndex - myRowIndex_) * numColumns_;
       clientProxy_[destinationIndex].receiveCombinedData(destinationBucket);      
       //      thisProxy[destinationIndex].receivePersonalizedData(destinationBucket);
       break;
@@ -335,36 +363,35 @@ void MeshStreamer<dtype>::insertData(const dtype &dataItem, const int destinatio
     return;
   }
 
-  int planeIndex, columnIndex, rowIndex; // location of destination
   int indexWithinPlane; 
-  MeshStreamerMessageType msgType; 
+  MeshLocation destinationCoordinates;
 
-  determineLocation(destinationPe, rowIndex, columnIndex, planeIndex, msgType);
+  determineLocation(destinationPe, destinationCoordinates);
 
   // determine which array of buffers is appropriate for this message
   MeshStreamerMessage<dtype> **messageBuffers;
   int bucketIndex; 
 
-  switch (msgType) {
+  switch (destinationCoordinates.msgType) {
   case PlaneMessage:
     messageBuffers = planeBuffers_; 
-    bucketIndex = planeIndex; 
+    bucketIndex = destinationCoordinates.planeIndex; 
     break;
   case ColumnMessage:
     messageBuffers = columnBuffers_; 
-    bucketIndex = columnIndex; 
+    bucketIndex = destinationCoordinates.columnIndex; 
     break;
   case PersonalizedMessage:
     messageBuffers = personalizedBuffers_; 
-    bucketIndex = rowIndex; 
+    bucketIndex = destinationCoordinates.rowIndex; 
     break;
   default: 
     CkError("Unrecognized MeshStreamer message type\n");
     break;
   }
 
-  storeMessage(messageBuffers, bucketIndex, destinationPe, rowIndex
-               columnIndex, planeIndex, msgType, dataItem);
+  storeMessage(messageBuffers, bucketIndex, destinationPe, destinationCoordinates
+              dataItem);
 
     // release control to scheduler if requested by the user, 
     //   assume caller is threaded entry
@@ -394,41 +421,41 @@ void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
 template <class dtype>
 void MeshStreamer<dtype>::receiveAggregateData(MeshStreamerMessage<dtype> *msg) {
 
-  int rowIndex, columnIndex, planeIndex, destinationPe; 
+  int destinationPe; 
   MeshStreamerMessageType msgType;   
+  MeshLocation destinationCoordinates;
 
   for (int i = 0; i < msg->numDataItems; i++) {
     destinationPe = msg->destinationPes[i];
     dtype dataItem = msg->getDataItem(i);
-    determineLocation(destinationPe, rowIndex, columnIndex, 
-                      planeIndex, msgType);
+    determineLocation(destinationPe, destinationCoordinates);
 #ifdef DEBUG_STREAMER
-    CkAssert(planeIndex == myPlaneIndex_);
+    CkAssert(destinationCoordinates.planeIndex == myPlaneIndex_);
 
-    if (msgType == PersonalizedMessage) {
-      CkAssert(columnIndex == myColumnIndex_);
+    if (destinationCoordinates.msgType == PersonalizedMessage) {
+      CkAssert(destinationCoordinates.columnIndex == myColumnIndex_);
     }
 #endif    
 
     MeshStreamerMessage<dtype> **messageBuffers;
     int bucketIndex; 
 
-    switch (msgType) {
+    switch (destinationCoordinates.msgType) {
     case ColumnMessage:
       messageBuffers = columnBuffers_; 
-      bucketIndex = columnIndex; 
+      bucketIndex = destinationCoordinates.columnIndex; 
       break;
     case PersonalizedMessage:
       messageBuffers = personalizedBuffers_; 
-      bucketIndex = rowIndex; 
+      bucketIndex = destinationCoordinates.rowIndex; 
       break;
     default: 
       CkError("Incorrect MeshStreamer message type\n");
       break;
     }
 
-    storeMessage(messageBuffers, bucketIndex, destinationPe, rowIndex, 
-                 columnIndex, planeIndex, msgType, dataItem);
+    storeMessage(messageBuffers, bucketIndex, destinationPe, 
+                destinationCoordinates, dataItem);
     
   }