ckio: return right bits to indicate a file has been opened
authorPhil Miller <mille121@illinois.edu>
Tue, 23 Jul 2013 22:06:55 +0000 (17:06 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:55 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci

index 0ad2b2ad7fe1e3fc27ce112550783fff9dc1b655..2da7b56f56abd9a4a22328d40b3ff67fb6296617 100644 (file)
@@ -53,11 +53,15 @@ namespace Ck { namespace IO {
     namespace impl {
       struct FileInfo {
         string name;
+        CkCallback opened;
         Options opts;
         int fd;
 
+        FileInfo(string name_, CkCallback opened_, Options opts_)
+          : name(name_), opened(opened_), opts(opts_), fd(-1)
+        { }
         FileInfo(string name_, Options opts_)
-          : name(name_), opts(opts_), fd(-1)
+          : name(name_), opened(), opts(opts_), fd(-1)
         { }
         FileInfo()
           : fd(-1)
@@ -98,8 +102,12 @@ namespace Ck { namespace IO {
           if (-1 == opts.skipPEs)
             opts.skipPEs = CkMyNodeSize();
 
-          files[filesOpened] = FileInfo(name, opts);
-          managers.openFile(filesOpened++, name, opened, opts);
+          files[filesOpened] = FileInfo(name, opened, opts);
+          managers.openFile(filesOpened++, name, opts);
+        }
+
+        void fileOpened(FileToken file) {
+          files[file].opened.send(new FileReadyMsg(file));
         }
 
         void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
@@ -134,8 +142,7 @@ namespace Ck { namespace IO {
           manager = this;
         }
 
-        void openFile(FileToken token, string name,
-                      CkCallback opened, Options opts) {
+        void openFile(FileToken token, string name, Options opts) {
           CkAssert(files.end() == files.find(token));
           CkAssert(lastActivePE(opts) < CkNumPes());
           CkAssert(opts.writeStripe <= opts.peStripe);
@@ -146,10 +153,12 @@ namespace Ck { namespace IO {
           if (((CkMyPe() - opts.basePE) % opts.skipPEs == 0 &&
                CkMyPe() < lastActivePE(opts)) ||
               true) {
-            files[token].fd = doOpenFile(name);
+            int fd = doOpenFile(name);
+            files[token].fd = fd;
           }
 
-          contribute(sizeof(FileToken), &token, CkReduction::max_int, opened);
+          contribute(sizeof(FileToken), &token, CkReduction::max_int,
+                     CkCallback(CkReductionTarget(Director, fileOpened), director));
         }
 
         void write(Session session, const char *data, size_t bytes, size_t offset) {
@@ -253,7 +262,9 @@ namespace Ck { namespace IO {
           , myBytes(min(file->opts.peStripe, sessionOffset + sessionBytes - myOffset))
           , myBytesWritten(0)
           , complete(complete_)
-        { }
+        {
+          CkAssert(file->fd != -1);
+        }
 
         WriteSession(CkMigrateMessage *m) { }
 
index 0fdea8d39d8d6ef38b84e7dea93e4441d99d67a4..efc4b49f8456f38acc7053098a02e7dbee2489b1 100644 (file)
@@ -24,6 +24,7 @@ module CkIO_impl {
           /// Serialize setting up each file through this chare, so that all PEs
           /// have the same sequence
           entry void openFile(std::string name, CkCallback opened, Options opts);
+          entry [reductiontarget] void fileOpened(FileToken file);
 
           entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
                                          CkCallback ready, CkCallback complete);
@@ -33,8 +34,7 @@ module CkIO_impl {
         group Manager
         {
           entry Manager();
-          entry void openFile(FileToken token, std::string name,
-                              CkCallback opened, Options opts);
+          entry void openFile(FileToken token, std::string name, Options opts);
           entry void close(FileToken token, CkCallback closed);
         };