completion: give a callback when all producers are done
authorPhil Miller <mille121@illinois.edu>
Wed, 18 Apr 2012 23:20:42 +0000 (18:20 -0500)
committerPhil Miller <mille121@illinois.edu>
Wed, 18 Apr 2012 23:20:42 +0000 (18:20 -0500)
NDMeshStreamer can use this to drive a flush as soon as all of the
input data has been deposited, rather than potentially waiting for the
next periodic flush or forcing the client code to reduce separately.

src/libs/ck-libs/NDMeshStreamer/NDMeshStreamer.h
src/libs/ck-libs/completion/completion.ci
tests/charm++/megatest/completion_test.ci

index 518a15a75484fd0e7a09a5ee5051e6ea4c322815..fa1224ebf8f8335d52b5e742dcc4738bb60679cb 100644 (file)
@@ -432,13 +432,14 @@ void MeshStreamer<dtype>::associateCallback(
                          int prio) {
   prio_ = prio;
   userCallback_ = endCb; 
+  CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(), this->thisProxy);
   static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
                           this->thisProxy);
   detector_ = detector;      
   detectorLocalObj_ = detector_.ckLocalBranch();
   initLocalClients();
 
-  detectorLocalObj_->start_detection(numContributors, startCb, finish , 0);
+  detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
   
   if (progressPeriodInMs_ <= 0) {
     CkPrintf("Using completion detection in NDMeshStreamer requires"
index ae4d6ffdae6d2df359bb4387566a4e0e98c6753d..9f7f74908953c3bdfab06b4a581f486be2f0be74 100644 (file)
@@ -7,7 +7,8 @@ module completion {
 
     // How many producer objects must signal completion, and who
     // should we tell when consumption matches production?
-    entry void start_detection(int num_producers, CkCallback start, CkCallback finish,
+    entry void start_detection(int num_producers,
+                               CkCallback start, CkCallback allProduced, CkCallback finish,
                                int prio_) {
       atomic {
         CkAssert(!running);
@@ -29,6 +30,11 @@ module completion {
         }
       }
 
+      atomic {
+        if (CkMyPe() == 0 && !allProduced.isInvalid())
+          allProduced.send();
+      }
+
       while (unconsumed > 0) {
         atomic {
           int counts[2];
index 9f41cdf8395ce618d50680d223702613c269e18c..556da38d198511892b715440649b1ef89f772416 100644 (file)
@@ -14,6 +14,7 @@ module completion_test {
        detector.start_detection(0,
                                 CkCallback(CkIndex_completion_tester::start_test(0),
                                            thisProxy),
+                                 CkCallback(),
                                 CkCallback(CkIndex_completion_tester::finish_test(0),
                                            thisProxy), 0);
       }
@@ -23,6 +24,7 @@ module completion_test {
        detector.start_detection(1,
                                 CkCallback(CkIndex_completion_tester::start_test(0),
                                            thisProxy),
+                                 CkCallback(),
                                 CkCallback(CkIndex_completion_tester::finish_test(0),
                                            thisProxy), 0);
       }
@@ -32,6 +34,7 @@ module completion_test {
        detector.start_detection(1,
                                 CkCallback(CkIndex_completion_tester::start_test(0),
                                            thisProxy),
+                                 CkCallback(),
                                 CkCallback(CkIndex_completion_tester::finish_test(0),
                                            thisProxy), 0);
        detector.ckLocalBranch()->produce();
@@ -44,7 +47,7 @@ module completion_test {
       when finish_test(CkReductionMsg *m) atomic {
          int num = 10;
          CProxy_completion_array::ckNew(detector, num, num);
-         detector.start_detection(num, CkCallback(),
+         detector.start_detection(num, CkCallback(), CkCallback(),
                                   CkCallback(CkIndex_completion_tester::finish_test(0),
                                              thisProxy), 0);
       }