NDMeshStreamer: adding support for specifying priority of streamed messages
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 2 Apr 2012 22:51:22 +0000 (17:51 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Mon, 2 Apr 2012 22:52:22 +0000 (17:52 -0500)
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.ci
src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h

index 9b3841a95c8badb00382c2a47c4eb6dad870b029..763ea1360663305dd0d2753a32d04257b7315603 100644 (file)
@@ -29,7 +29,8 @@ module NDMeshStreamer {
     entry void finish();
     entry void associateCallback(int numContributors, 
                                 CkCallback startCb, CkCallback endCb, 
-                                CProxy_CompletionDetector detector);
+                                CProxy_CompletionDetector detector,
+                                int prio);
   };
 
   template<class dtype>
index 16eb9d8daaa254ac516d65a2451f3d3f15638911..c7b5566e823dd762ef851a2cef498c3980170c9d 100644 (file)
@@ -138,6 +138,7 @@ private:
     MeshStreamerMessage<dtype> ***dataBuffers_;
 
     CProxy_CompletionDetector detector_;
+    int prio_;
 
 #ifdef CACHE_LOCATIONS
     MeshLocation *cachedLocations_;
@@ -189,7 +190,8 @@ public:
     void insertData(void *dataItemHandle, int destinationPe);
     void associateCallback(int numContributors, 
                           CkCallback startCb, CkCallback endCb, 
-                          CProxy_CompletionDetector detector);
+                          CProxy_CompletionDetector detector,
+                          int prio);
     void flushAllBuffers();
     void registerPeriodicProgressFunction();
 
@@ -358,12 +360,14 @@ void MeshStreamer<dtype>::storeMessage(
     if (dimension == 0) {
       // personalized messages do not require destination indices
       messageBuffers[bufferIndex] = 
-        new (0, bufferSize_) MeshStreamerMessage<dtype>();
+        new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
     }
     else {
       messageBuffers[bufferIndex] = 
-        new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
+        new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
     }
+    *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
+    CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
 #ifdef DEBUG_STREAMER
     CkAssert(messageBuffers[bufferIndex] != NULL);
 #endif
@@ -444,7 +448,9 @@ template <class dtype>
 void MeshStreamer<dtype>::associateCallback(
                          int numContributors,
                          CkCallback startCb, CkCallback endCb, 
-                         CProxy_CompletionDetector detector) {
+                         CProxy_CompletionDetector detector, 
+                         int prio) {
+  prio_ = prio;
   userCallback_ = endCb; 
   static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
                           this->thisProxy);
@@ -534,6 +540,7 @@ void MeshStreamer<dtype>::flushLargestBuffer() {
        envelope *env = UsrToEnv(destinationBuffer);
        env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
                          (bufferSize_ - destinationBuffer->numDataItems));
+       *((int *) env->getPrioPtr()) = prio_;
       }
       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
 
@@ -578,7 +585,10 @@ void MeshStreamer<dtype>::flushAllBuffers() {
        for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
 
          MeshStreamerMessage<dtype> *directMsg = 
-           new (0, 1) MeshStreamerMessage<dtype>();
+           new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
+         *(int *) CkPriorityPtr(directMsg) = prio_;
+         CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
+
 #ifdef DEBUG_STREAMER
          CkAssert(directMsg != NULL);
 #endif