add a start(callback) function to omit the need for a second QD from user code to...
authorGengbin Zheng <gzheng@illinois.edu>
Tue, 1 Nov 2011 04:12:51 +0000 (23:12 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Tue, 1 Nov 2011 04:12:51 +0000 (23:12 -0500)
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index 82d47cb714da0814bb3c3604f2e71167f4182abb..352e92c4dc2c2a37429d0fd042ca29233139389a 100644 (file)
@@ -87,6 +87,8 @@ private:
     int myColumnIndex_; 
     int myRowIndex_;
 
+    CkCallback   user_cb;
+
     MeshStreamerMessage<dtype> **personalizedBuffers_; 
     MeshStreamerMessage<dtype> **columnBuffers_; 
     MeshStreamerMessage<dtype> **planeBuffers_;
@@ -108,15 +110,21 @@ public:
     MeshStreamer(int totalBufferCapacity, int numRows, 
                 int numColumns, int numPlanes, 
                 const CProxy_MeshStreamerClient<dtype> &clientProxy);
-
     ~MeshStreamer();
 
+      // entry
     void insertData(const dtype &dataItem, const int destinationPe); 
     void receiveAggregateData(MeshStreamerMessage<dtype> *msg);
     void receivePersonalizedData(MeshStreamerMessage<dtype> *msg);
 
     void flushBuckets(MeshStreamerMessage<dtype> **messageBuffers, const int numBuffers);
     void flushDirect();
+
+      // non entry
+    void start(CkCallback &cb) { 
+              user_cb = cb;
+              CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::flushDirect(), thisProxy));
+         }
 };
 
 template <class dtype>
@@ -477,6 +485,10 @@ void MeshStreamer<dtype>::flushDirect(){
     CkAssert(numDataItemsBuffered_ == 0); 
 #endif
 
+    if (!user_cb.isInvalid()) {
+        CkStartQD(user_cb);
+        user_cb = CkCallback();      // nullify the current callback
+    }
 }
 
 #define CK_TEMPLATES_ONLY