NDMeshStreamer: code cleanup.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 26 Oct 2012 23:37:04 +0000 (18:37 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 26 Oct 2012 23:37:04 +0000 (18:37 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 0d676a13542c0990eb2a9a269cc999087313da14..84fd0ded0dd619403cde811f23b24828033d9f62 100644 (file)
@@ -40,16 +40,16 @@ public:
     finalMsgCount = -1; 
   }
 
-  int addDataItem(const dtype& dataItem) {
+  inline int addDataItem(const dtype& dataItem) {
     dataItems[numDataItems] = dataItem;
     return ++numDataItems; 
   }
 
-  void markDestination(const int index, const int destinationPe) {
+  inline void markDestination(const int index, const int destinationPe) {
     destinationPes[index] = destinationPe;
   }
 
-  const dtype& getDataItem(const int index) {
+  inline const dtype& getDataItem(const int index) {
     return dataItems[index];
   }
 
@@ -70,7 +70,7 @@ public:
     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
             "This virtual function should have been defined by the user.\n");
   }     
-  void setDetector(CompletionDetector *detectorLocalObj) {
+  inline void setDetector(CompletionDetector *detectorLocalObj) {
     detectorLocalObj_ = detectorLocalObj;
   }
   void receiveRedeliveredItem(dtype data) {
@@ -200,7 +200,7 @@ public:
   void finish();
 
   // non entry
-  bool isPeriodicFlushEnabled() {
+  inline bool isPeriodicFlushEnabled() {
     return isPeriodicFlushEnabled_;
   }
   virtual void insertData(const dtype& dataItem, int destinationPe); 
@@ -209,12 +209,12 @@ public:
 
   // flushing begins only after enablePeriodicFlushing has been invoked
 
-  void enablePeriodicFlushing(){
+  inline void enablePeriodicFlushing(){
     isPeriodicFlushEnabled_ = true; 
     registerPeriodicProgressFunction();
   }
 
-  void done(int numContributorsFinished = 1) {
+  inline void done(int numContributorsFinished = 1) {
 
     if (useStagedCompletion_) {
       numLocalDone_ += numContributorsFinished; 
@@ -227,11 +227,11 @@ public:
     }
   }
   
-  bool stagedCompletionStarted() {    
+  inline bool stagedCompletionStarted() {
     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
   }
 
-  void startStagedCompletion() {          
+  inline void startStagedCompletion() {
     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
       flushDimension(dimensionToFlush_, true);
     }
@@ -240,7 +240,7 @@ public:
     checkForCompletedStages();
   }
 
-  void markMessageReceived(int dimension, int finalCount) {
+  inline void markMessageReceived(int dimension, int finalCount) {
     cntMsgReceived_[dimension]++;
     if (finalCount != -1) {
       cntFinished_[dimension]++; 
@@ -257,7 +257,7 @@ public:
     }
   }
 
-  void checkForCompletedStages() {
+  inline void checkForCompletedStages() {
 
     while (cntFinished_[dimensionToFlush_ + 1] == 
            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
@@ -755,6 +755,7 @@ void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
 }
 
 template <class dtype>
+inline
 void MeshStreamer<dtype>::sendLargestBuffer() {
 
   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
@@ -819,6 +820,7 @@ void MeshStreamer<dtype>::sendLargestBuffer() {
 }
 
 template <class dtype>
+inline
 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
   
   for (int i = 0; i < numDimensions_; i++) {
@@ -994,8 +996,7 @@ public:
                         0, yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
-    clientObj_ = 
-      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
+    clientObj_ = clientProxy_.ckLocalBranch();
   }
 
   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
@@ -1006,8 +1007,7 @@ public:
                         yieldFlag, progressPeriodInMs) 
   {
     clientProxy_ = clientProxy; 
-    clientObj_ = 
-      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
+    clientObj_ = clientProxy_.ckLocalBranch();
   }
 
 };
@@ -1024,7 +1024,7 @@ public:
     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
 
   // CkLocMgr::iterate will call addLocation on all elements local to this PE
-  void addLocation(CkLocation& loc) {
+  inline void addLocation(CkLocation& loc) {
 
     MeshStreamerArrayClient<dtype> *clientObj = 
       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
@@ -1071,6 +1071,7 @@ private:
   bool *isCachedArrayMetadata_;
 #endif
 
+  inline 
   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
     itype arrayId = packedDataItem.arrayIndex; 
     if (arrayId == itype(TRAM_BROADCAST)) {
@@ -1098,6 +1099,7 @@ private:
     }
   }
 
+  inline 
   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
 
     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
@@ -1113,15 +1115,15 @@ private:
 
   }
 
-  int numElementsInClient() {
+  inline int numElementsInClient() {
     return numArrayElements_;
   }
 
-  int numLocalElementsInClient() {
+  inline int numLocalElementsInClient() {
     return numLocalArrayElements_;
   }
 
-  void initLocalClients() {
+  inline void initLocalClients() {
     if (MeshStreamer<ArrayDataItem<dtype, itype> >
          ::useStagedCompletion_ == false) {
 #ifdef CACHE_ARRAY_METADATA
@@ -1149,7 +1151,7 @@ private:
     }
   }
 
-  void commonInit() {
+  inline void commonInit() {
 #ifdef CACHE_ARRAY_METADATA
     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
@@ -1333,16 +1335,10 @@ public:
     :MeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
                                  numDimensions, dimensionSizes,
                                  0, yieldFlag, progressPeriodInMs) {
-    
-    receiveBuffers = new dtype*[CkNumPes()];
-    receivedChunks = new int[CkNumPes()]; 
-    memset(receivedChunks, 0, CkNumPes() * sizeof(int));
-    memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
 
     clientProxy_ = clientProxy; 
-    clientObj_ = 
-      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
-
+    clientObj_ = clientProxy_.ckLocalBranch();    
+    commonInit();
   }
 
   GroupChunkMeshStreamer(
@@ -1354,15 +1350,18 @@ public:
                                  bufferSize, yieldFlag, 
                                  progressPeriodInMs) {
 
+    clientProxy_ = clientProxy; 
+    clientObj_ = clientProxy_.ckLocalBranch();
+    commonInit();
+  }
+
+  inline void commonInit() {
+
     receiveBuffers = new dtype*[CkNumPes()];
     receivedChunks = new int[CkNumPes()]; 
     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
     memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
 
-    clientProxy_ = clientProxy; 
-    clientObj_ = 
-      ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
-
   }
 
   inline void insertData(dtype *dataArray, int numElements, int destinationPe) {
@@ -1374,7 +1373,8 @@ public:
     chunk.sourcePe = CkMyPe();
     chunk.chunkNumber = 0; 
     chunk.chunkSize = CHUNK_SIZE;
-    chunk.numChunks = (int) ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
+    chunk.numChunks = 
+      (int) ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
     chunk.numItems = numElements; 
     for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
 
@@ -1404,7 +1404,8 @@ public:
     memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE, 
            chunk.rawData, chunk.chunkSize);
     if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
-      clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
+      clientObj_->receiveArray(
+                  (dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
       receivedChunks[chunk.sourcePe] = 0;        
       delete [] receiveBuffers[chunk.sourcePe]; 
       receiveBuffers[chunk.sourcePe] = NULL;