CkIO: Don't rely on group broadcasts being delivered in order
authorPhil Miller <mille121@illinois.edu>
Thu, 25 Jul 2013 00:05:47 +0000 (19:05 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:29:02 +0000 (18:29 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci
tests/charm++/io/iotest.C
tests/charm++/io/iotest.ci

index 32a966c29193131cee3eda66643c871a6dbb04e9..84ec209de2a49e724831c30e4ae77436e35986dd 100644 (file)
@@ -80,10 +80,11 @@ namespace Ck { namespace IO {
         int filesOpened;
         map<FileToken, impl::FileInfo> files;
         CProxy_Manager managers;
+        int opnum;
 
       public:
         Director(CkArgMsg *m)
-          : filesOpened(0)
+          : filesOpened(0), opnum(0)
         {
           delete m;
           director = thisProxy;
@@ -103,7 +104,7 @@ namespace Ck { namespace IO {
             opts.skipPEs = CkMyNodeSize();
 
           files[filesOpened] = FileInfo(name, opened, opts);
-          managers.openFile(filesOpened++, name, opts);
+          managers.openFile(opnum++, filesOpened++, name, opts);
         }
 
         void fileOpened(FileToken file) {
@@ -129,20 +130,24 @@ namespace Ck { namespace IO {
         }
 
         void close(FileToken token, CkCallback closed) {
-          managers.close(token, closed);
+          managers.close(opnum++, token, closed);
           files.erase(token);
         }
       };
 
       class Manager : public CBase_Manager {
+        Manager_SDAG_CODE
+        int opnum;
 
       public:
         Manager()
+          : opnum(0)
         {
           manager = this;
+          thisProxy[CkMyPe()].run();
         }
 
-        void openFile(FileToken token, string name, Options opts) {
+        void doOpenFile(FileToken token, string name, Options opts) {
           CkAssert(files.end() == files.find(token));
           CkAssert(lastActivePE(opts) < CkNumPes());
           CkAssert(opts.writeStripe <= opts.peStripe);
@@ -187,7 +192,7 @@ namespace Ck { namespace IO {
           }
         }
 
-        void close(FileToken token, CkCallback closed) {
+        void doClose(FileToken token, CkCallback closed) {
           int ret;
           do {
             ret = ::close(files[token].fd);
index efc4b49f8456f38acc7053098a02e7dbee2489b1..86bb3b66a7b599f9459c3da00ed47312b7afbd76 100644 (file)
@@ -34,8 +34,21 @@ module CkIO_impl {
         group Manager
         {
           entry Manager();
-          entry void openFile(FileToken token, std::string name, Options opts);
-          entry void close(FileToken token, CkCallback closed);
+          entry void run() {
+            while (true) {
+              case {
+                when openFile[opnum](unsigned int opnum_,
+                                     FileToken token, std::string name, Options opts)
+                  serial { doOpenFile(token, name, opts); }
+                when close[opnum](unsigned int opnum_, FileToken token, CkCallback closed)
+                  serial { doClose(token, closed); }
+              }
+              serial { opnum++; }
+            }
+          };
+          entry void openFile(unsigned int opnum,
+                              FileToken token, std::string name, Options opts);
+          entry void close(unsigned int opnum, FileToken token, CkCallback closed);
         };
 
         array [1D] WriteSession
index 6eb76ca1dadef29ca82e6d4d5729e7f30cd644cc..78286dbe6cbaa7f2e927240bdf5ea66aca017021 100644 (file)
@@ -1,27 +1,29 @@
 #include "iotest.decl.h"
-
-/* readonly */ CkGroupID mgr;
+#include <vector>
 
 class Main : public CBase_Main {
   Main_SDAG_CODE
 
   CProxy_test testers;
-  int n;
-  Ck::IO::File f;
+  int n, numdone;
+  std::vector<Ck::IO::File> f;
 public:
   Main(CkArgMsg *m) {
+    numdone = 0;
     n = atoi(m->argv[1]);
-    Ck::IO::Options opts;
-    opts.peStripe = 200;
-    opts.writeStripe = 1;
-    CkCallback opened(CkIndex_Main::ready(NULL), thisProxy);
-    opened.setRefnum(5);
-    Ck::IO::open("test", opened, opts);
+
+    f.resize(6);
+    for (int i = 0; i < f.size(); ++i)
+      thisProxy.run(4*i);
 
     CkPrintf("Main ran\n");
-    thisProxy.run();
   }
 
+  void iterDone() {
+    numdone++;
+    if (numdone == f.size())
+      CkExit();
+  }
 };
 
 struct test : public CBase_test {
index 1be757835e2bf897acca89f761c26d9e4f4c05e9..7b69626eed114d7bebd640c748160b57d570d051 100644 (file)
@@ -1,42 +1,52 @@
 mainmodule iotest {
-  readonly CkGroupID mgr;
   include "ckio.h";
 
   mainchare Main {
     entry Main(CkArgMsg *m);
     entry void ready(Ck::IO::FileReadyMsg *m);
 
-    entry void run() {
-      when ready[5](Ck::IO::FileReadyMsg *m) serial {
-        f = m->file;
+    entry void run(int iter) {
+      serial {
+        Ck::IO::Options opts;
+        opts.peStripe = 200;
+        opts.writeStripe = 1;
+        CkCallback opened(CkIndex_Main::ready(NULL), thisProxy);
+        opened.setRefnum(iter + 0);
+        char name[20];
+        sprintf(name, "test%d", iter);
+        Ck::IO::open(name, opened, opts);
+      }
+      when ready[iter + 0](Ck::IO::FileReadyMsg *m) serial {
+        f.at(iter/4) = m->file;
         CkCallback sessionStart(CkIndex_Main::start_write(0), thisProxy);
-        sessionStart.setRefnum(13);
+        sessionStart.setRefnum(iter + 1);
         CkCallback sessionEnd(CkIndex_Main::test_written(0), thisProxy);
-        sessionEnd.setRefnum(13);
-        Ck::IO::startSession(f, 10*n, 0, sessionStart, sessionEnd);
+        sessionEnd.setRefnum(iter + 2);
+        Ck::IO::startSession(f.at(iter/4), 10*n, 0, sessionStart, sessionEnd);
         delete m;
         CkPrintf("Main saw file ready\n");
       }
-      when start_write[13](Ck::IO::SessionReadyMsg *m) serial {
+      when start_write[iter + 1](Ck::IO::SessionReadyMsg *m) serial {
         testers = CProxy_test::ckNew(m->session, n);
         CkPrintf("Main saw session ready\n");
       }
-      when test_written[13](CkReductionMsg *m) serial {
+      when test_written[iter + 2](CkReductionMsg *m) serial {
         CkPrintf("Main saw write done\n");
         // Read file and validate contents
         CkCallback cb(CkIndex_Main::closed(0), thisProxy);
-        cb.setRefnum(17);
-        Ck::IO::close(f, cb);
+        cb.setRefnum(iter + 3);
+        Ck::IO::close(f.at(iter/4), cb);
       }
-      when closed[17](CkReductionMsg *m) serial {
+      when closed[iter + 3](CkReductionMsg *m) serial {
         CkPrintf("Main saw close done\n");
-        CkExit();
+        thisProxy.iterDone();
       }
     };
 
     entry void start_write(Ck::IO::SessionReadyMsg *m);
     entry void test_written(CkReductionMsg *m);
     entry void closed(CkReductionMsg *m);
+    entry void iterDone();
   };
 
   array [1D] test {