Distributed Completion Detection: Apps don't need to use QD
author Phil Miller <mille121@illinois.edu>
Wed, 23 Feb 2011 22:51:32 +0000 (16:51 -0600)
committer Phil Miller <mille121@illinois.edu>
Thu, 24 Feb 2011 02:18:29 +0000 (20:18 -0600)
A mechanism to detect the completion of distributed processes, based
on counts of event generators and of events produced and consumed. From
an application's standpoint, this accomplishes the same thing as QD,
but on a more modular basis.

Ideally, this should be able to prioritize itself below the process
it's tracking. That's not implemented yet.

src/libs/ck-libs/Makefile
src/libs/ck-libs/completion/Make.depends [new file with mode: 0644]
src/libs/ck-libs/completion/Makefile [new file with mode: 0644]
src/libs/ck-libs/completion/completion.C [new file with mode: 0644]
src/libs/ck-libs/completion/completion.ci [new file with mode: 0644]
src/libs/ck-libs/completion/completion.h [new file with mode: 0644]
tests/charm++/megatest/Make.depends
tests/charm++/megatest/Makefile
tests/charm++/megatest/completion_test.C [new file with mode: 0644]
tests/charm++/megatest/completion_test.ci [new file with mode: 0644]

index a190efa3d4d2a49dbd12085cce28e8af8529df8f..a166459885274f2161777f2b568202e4e6b6c709 100644 (file)
@@ -1,7 +1,7 @@
 CHARMC=../../bin/charmc $(OPTS)
 CHARMINC=.
 
-SIMPLE_DIRS = cache sparseContiguousReducer tcharm ampi idxl \
+SIMPLE_DIRS = completion cache sparseContiguousReducer tcharm ampi idxl \
               parmetis multiphaseSharedArrays fem ParFUM \
               ifem armci collide mblock barrier irecv netfem liveViz \
               taskGraph search datatransfer pose
diff --git a/src/libs/ck-libs/completion/Make.depends b/src/libs/ck-libs/completion/Make.depends
new file mode 100644 (file)
index 0000000..a3017bb
--- /dev/null
@@ -0,0 +1,2 @@
+#generated by make depends
+       $(CHARMC) -I../../../../tmp -o completion.o completion.C
diff --git a/src/libs/ck-libs/completion/Makefile b/src/libs/ck-libs/completion/Makefile
new file mode 100644 (file)
index 0000000..af484be
--- /dev/null
@@ -0,0 +1,51 @@
+CDIR =../../../..
+INCDIR=$(CDIR)/include
+CHARMC=$(CDIR)/bin/charmc $(OPTS)
+
+LIB = libmodulecompletion.a
+LIBOBJ = completion.o
+
+HEADERS = $(INCDIR)/completion.decl.h \
+          $(INCDIR)/completion.def.h \
+          $(INCDIR)/completion.h
+LIBDEST =  $(CDIR)/lib/$(LIB)
+
+CIFILES = completion.ci
+
+all: $(LIBDEST) $(HEADERS) # default target: build the library and copy its headers into $(INCDIR)
+
+$(INCDIR)/completion.decl.h: completion.decl.h # install the generated declarations header
+       /bin/cp completion.decl.h $(CDIR)/include
+$(INCDIR)/completion.def.h: completion.def.h # install the generated definitions header
+       /bin/cp completion.def.h $(CDIR)/include
+$(INCDIR)/completion.h: completion.h # install the hand-written public header
+       /bin/cp completion.h $(CDIR)/include
+
+$(LIBDEST): $(LIBOBJ) # build the library file from the object file, directly in $(CDIR)/lib
+       $(CHARMC) -o $(LIBDEST) $(LIBOBJ) 
+
+completion.def.h: completion.decl.h # no recipe: presumably produced by the same charmc run as the .decl.h - confirm
+
+completion.decl.h : completion.ci $(CDIR)/bin/charmxi # regenerate when the interface file or the translator changes
+       $(CHARMC) -c completion.ci
+
+clean: # remove build products from this directory only
+       rm -f conv-host *.o *.decl.h *.def.h core  $(LIB)
+
+include Make.depends # per-object rules written by the depends target below
+
+DEPENDFILE = Make.depends
+
+depends:  $(CIFILES) # rewrite Make.depends, keeping the previous copy as Make.depends.old
+       echo "Creating " $(DEPENDFILE) " ...";  \
+       if [ -f $(DEPENDFILE) ]; then \
+           /bin/cp -f $(DEPENDFILE) $(DEPENDFILE).old; \
+        fi; \
+       echo '#generated by make depends' > $(DEPENDFILE); \
+        for i in $(LIBOBJ) ; do \
+             SRCFILE=`basename $$i .o`.C ; \
+              echo "checking dependencies for $$i : $$SRCFILE" ; \
+              g++ -MM -Wno-deprecated -I$(CDIR)/tmp $$SRCFILE >> $(DEPENDFILE); \
+              echo '   $$(CHARMC) -I$(CDIR)/tmp -o '$$i $$SRCFILE >> $(DEPENDFILE) ; \
+        done; 
+
diff --git a/src/libs/ck-libs/completion/completion.C b/src/libs/ck-libs/completion/completion.C
new file mode 100644 (file)
index 0000000..df4fcd7
--- /dev/null
@@ -0,0 +1,30 @@
+#include "completion.h"
+
+CompletionDetector::CompletionDetector() // Creates a detector that is idle: counters zeroed, not running.
+{
+    __sdag_init(); // presumably prepares the generated SDAG state used by start_detection -- confirm
+    init(); // zero the counters and clear the running flag
+}
+
+void CompletionDetector::init() { // Resets the counters and the running flag; called at construction and after each detection.
+    producers_total = 0;
+    producers_done_local = producers_done_global = 0;
+    produced = 0;
+    consumed = 0;
+    running = false; // start_detection asserts !running before it begins
+    unconsumed = 1; // Placeholder > 0 so the "while (unconsumed > 0)" loop in start_detection (completion.ci) is entered
+}
+
+void CompletionDetector::produce(int events_produced) { // Adds events_produced to this object's produced-event count.
+    produced += events_produced; // summed with the other members' counts by the reduction in start_detection
+}
+
+void CompletionDetector::consume(int events_consumed) { // Adds events_consumed to this object's consumed-event count.
+    consumed += events_consumed; // summed with the other members' counts by the reduction in start_detection
+}
+
+void CompletionDetector::done(int producers_done) { // Records that producers_done more producers have finished producing.
+    producers_done_local += producers_done; // start_detection sums this across the group and compares it with producers_total
+}
+
+#include "completion.def.h"
diff --git a/src/libs/ck-libs/completion/completion.ci b/src/libs/ck-libs/completion/completion.ci
new file mode 100644 (file)
index 0000000..d230e7a
--- /dev/null
@@ -0,0 +1,58 @@
+module completion {
+  //include "ckcallback.h";
+
+  group CompletionDetector {
+    entry CompletionDetector();
+    // Call as a broadcast to the group
+
+    // How many producer objects must signal completion, and who
+    // should we tell when consumption matches production?
+    entry void start_detection(int num_producers, CkCallback start, CkCallback finish,
+                              int prio_) {
+      atomic {
+       CkAssert(!running); // a detection must not already be in progress
+       running = true;
+       prio = prio_; // stored only; nothing in the code shown reads it yet
+       producers_total = num_producers;
+       cb = finish;
+       if (!start.isInvalid()) // the start callback is optional
+         contribute(start); // presumably fires start once every group member has reached this point -- confirm
+      }
+
+      while (producers_done_global < producers_total) {
+       atomic {
+           contribute(sizeof(int), &producers_done_local, CkReduction::sum_int, // phase 1: sum the local done-producer counts across the group
+                    CkCallback(CkReductionTarget(CompletionDetector, producers_done),
+                               thisgroup));
+       }
+       when producers_done(int producers_done_global_) atomic {
+         producers_done_global = producers_done_global_; // the loop ends once this reaches producers_total
+       }
+      }
+
+      while (unconsumed > 0) {
+       atomic {
+         int counts[2];
+         counts[0] = produced; // Move up to first reduction loop
+         counts[1] = consumed;
+         contribute(2*sizeof(int), counts, CkReduction::sum_int, // phase 2: sum the produced and consumed counts across the group
+                    CkCallback(CkReductionTarget(CompletionDetector, count_consumed),
+                               thisgroup));
+       }
+       when count_consumed(int produced_global, int consumed_global) atomic {
+         unconsumed = produced_global - consumed_global; // the loop ends once this is no longer positive
+       }
+      }
+
+      atomic "completion finished" {
+       init(); // reset the counters and clear running so another detection can start
+       CkAssert(!cb.isInvalid());
+       contribute(cb); // presumably invokes the finish callback once all members have contributed -- confirm
+       cb = CkCallback(); // forget the callback of the finished detection
+      }
+    };
+
+    entry [reductiontarget] void producers_done(int producers_done_global_);
+    entry [reductiontarget] void count_consumed(int produced_global, int consumed_global);
+  };
+};
diff --git a/src/libs/ck-libs/completion/completion.h b/src/libs/ck-libs/completion/completion.h
new file mode 100644 (file)
index 0000000..2940760
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef COMPLETION_H
+#define COMPLETION_H
+
+#include "completion.decl.h"
+
+class CompletionDetector : public CBase_CompletionDetector { // Tracks producer and event counts for one completion detection at a time.
+public:
+    CompletionDetector();
+
+    // Local methods
+    void produce(int events_produced = 1); // add to this object's produced-event count
+    void consume(int events_consumed = 1); // add to this object's consumed-event count
+    void done(int producers_done = 1); // add to this object's count of finished producers
+
+    CompletionDetector_SDAG_CODE // macro from completion.decl.h; presumably declares the SDAG-generated members -- confirm
+
+private:
+    int produced, consumed, unconsumed; // local event counts; unconsumed holds global produced minus global consumed from the last reduction
+    int producers_total, producers_done_local, producers_done_global; // expected producers, locally finished, and the reduced global total
+    int prio; // set by start_detection; not read anywhere in the code shown
+    CkCallback cb; // the finish callback given to start_detection
+    bool running; // true from start_detection until init() runs at completion
+
+    void init(); // reset the counters and the running flag
+};
+
+#endif
index bf4d85dd1ef1bd7ebd284d97d4ce48284e2f0ff7..48c0ff72c5f992f2524ee7e631039baf9d76f58f 100644 (file)
@@ -205,3 +205,8 @@ inlineem.o: \
        megatest.h \
        inlineem.def.h
        $(CHARMC) -o inlineem.o inlineem.C
+completion_test.o: \
+       completion_test.C \
+       completion_test.decl.h \
+       completion_test.def.h
+       $(CHARMC) -o completion_test.o completion_test.C
index c5eb6ae295264f3a30246184244eca2f8d75a5c4..e86442914a77937f46bad823d676c16baf8551ac 100644 (file)
@@ -32,6 +32,7 @@ OBJS=megatest.o \
      immediatering.o \
      callback.o \
      inlineem.o \
+     completion_test.o
 
 
 #     priolongtest.o \
@@ -77,7 +78,7 @@ CIFILES =  \
       groupcast.def.h  migration.def.h      queens.def.h     varraystest.def.h\
       groupring.def.h  nodecast.def.h       reduction.def.h  varsizetest.def.h\
       groupsectiontest.def.h multisectiontest.def.h inlineem.def.h varsizetest2.def.h\
-      groupmulti.def.h #priolongtest.def.h
+      groupmulti.def.h completion_test.def.h #priolongtest.def.h
 
 .SUFFIXES:
 .SUFFIXES: .o .C .def.h .decl.h .ci .h
@@ -85,7 +86,7 @@ CIFILES =  \
 all: pgm
 
 pgm: $(OBJS)
-       $(CHARMC) -o pgm  $(OBJS) -language charm++
+       $(CHARMC) -o pgm  $(OBJS) -language charm++ -module completion
 
 .ci.decl.h:
        $(CHARMC) -c $<
diff --git a/tests/charm++/megatest/completion_test.C b/tests/charm++/megatest/completion_test.C
new file mode 100644 (file)
index 0000000..1c9b0dc
--- /dev/null
@@ -0,0 +1,31 @@
+#include "completion_test.decl.h"
+#include "completion.decl.h"
+#include "completion.h"
+#include "megatest.h"
+
+void completion_test_init(void){ // Starts the test by creating a completion_tester chare, whose constructor triggers run_test.
+  CProxy_completion_tester::ckNew();
+}
+
+void completion_test_moduleinit(void){} // Intentionally empty: this test needs no module-level setup.
+
+
+struct completion_tester : public CBase_completion_tester { // Driver chare; the test sequence itself is run_test in completion_test.ci.
+    CProxy_CompletionDetector detector; // assigned in run_test from CProxy_CompletionDetector::ckNew()
+    completion_tester() { __sdag_init(); thisProxy.run_test(); } // invokes run_test on itself through its own proxy
+
+    completion_tester_SDAG_CODE // macro from completion_test.decl.h; presumably declares the SDAG-generated members -- confirm
+};
+
+struct completion_array : public CBase_completion_array { // Array element that acts as one producer and one consumer in the fourth test.
+    completion_array(CProxy_CompletionDetector det, int n) {
+       det.ckLocalBranch()->produce(thisIndex+1); // element i produces i+1 events
+       det.ckLocalBranch()->done(); // report one finished producer
+       det.ckLocalBranch()->consume(n - thisIndex); // element i consumes n-i events; assuming indices 0..n-1, the totals match those produced
+    }
+    completion_array(CkMigrateMessage *) {} // constructor taking a CkMigrateMessage; the struct has no members to restore
+};
+
+MEGATEST_REGISTER_TEST(completion_test,"phil",0)
+
+#include "completion_test.def.h"
diff --git a/tests/charm++/megatest/completion_test.ci b/tests/charm++/megatest/completion_test.ci
new file mode 100644 (file)
index 0000000..9f41cdf
--- /dev/null
@@ -0,0 +1,64 @@
+module completion_test {
+
+  extern module completion;
+  
+  chare completion_tester {
+    entry completion_tester();
+
+    entry void run_test(void) {
+      //First test: no producers, nothing produced
+      atomic {
+         CkPrintf("Starting test\n");
+       detector = CProxy_CompletionDetector::ckNew(); // create the detector group reused by every test below
+         CkPrintf("Created detector, starting first detection\n");
+       detector.start_detection(0, // zero producers, so nothing has to call done
+                                CkCallback(CkIndex_completion_tester::start_test(0),
+                                           thisProxy),
+                                CkCallback(CkIndex_completion_tester::finish_test(0),
+                                           thisProxy), 0);
+      }
+      when start_test(CkReductionMsg *m) atomic { CkPrintf("Started first test\n"); }
+      when finish_test(CkReductionMsg *m) atomic {
+       // Just ourself as a producer, produce nothing
+       detector.start_detection(1, // second test: one producer; the tester calls done from the start callback
+                                CkCallback(CkIndex_completion_tester::start_test(0),
+                                           thisProxy),
+                                CkCallback(CkIndex_completion_tester::finish_test(0),
+                                           thisProxy), 0);
+      }
+      when start_test(CkReductionMsg *m) atomic { detector.ckLocalBranch()->done(); }
+      when finish_test(CkReductionMsg *m) atomic {
+         CkPrintf("Finished second test\n");
+       detector.start_detection(1, // third test: one producer that produces and then consumes one event
+                                CkCallback(CkIndex_completion_tester::start_test(0),
+                                           thisProxy),
+                                CkCallback(CkIndex_completion_tester::finish_test(0),
+                                           thisProxy), 0);
+       detector.ckLocalBranch()->produce(); // one event produced right after detection is requested
+      }
+      when start_test(CkReductionMsg *m) atomic {
+         CkPrintf("Started third test\n");
+         detector.ckLocalBranch()->done();
+         detector.ckLocalBranch()->consume();
+      }
+      when finish_test(CkReductionMsg *m) atomic {
+         int num = 10; // fourth test: number of array elements, one producer each
+         CProxy_completion_array::ckNew(detector, num, num); // presumably the constructor arguments detector and n, then the element count -- confirm
+         detector.start_detection(num, CkCallback(), // default-constructed start callback; presumably invalid, so no start notification -- confirm
+                                  CkCallback(CkIndex_completion_tester::finish_test(0),
+                                             thisProxy), 0);
+      }
+      when finish_test(CkReductionMsg *m) atomic {
+       megatest_finish(); // reached only after the fourth finish callback
+      }
+    };
+
+    // SDAG Entry methods
+    entry void start_test(CkReductionMsg *m); // NOTE(review): the when-blocks above never free m -- confirm message ownership
+    entry void finish_test(CkReductionMsg *m);
+  };
+
+  array [1D] completion_array {
+      entry completion_array(CProxy_CompletionDetector detector, int n);
+  };
+};