WIP
authorPhil Miller <mille121@illinois.edu>
Tue, 2 Jul 2013 22:38:02 +0000 (17:38 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:16 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci
src/libs/ck-libs/io/ckio.h

index 918b8d808316661e13d2ff74649041b21a8e011e..2668a39f09bce9e15cd7e6524e3cf65a95dd6da8 100644 (file)
@@ -30,17 +30,20 @@ ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
 #endif
 
 namespace Ck { namespace IO {
-    Manager::Manager() : nextToken(0) {
+    Manager::Manager() : nextToken(0), sessionsOpened(0) {
       run();
     }
 
-    void Manager::prepareOutput(const char *name, size_t bytes,
-                               CkCallback ready, CkCallback complete,
-                               Options opts) {
-      thisProxy[0].prepareOutput_central(name, bytes, ready, complete, opts);
+    void Manager::openWrite(std::string name, CkCallback opened, Options opts = Options()) {
+      thisProxy[0].prepareOutput_central(name, opened, opts);
     }
 
-    void Manager::write(Token token, const char *data, size_t bytes, size_t offset) {
+    void Manager::prepareWrite(FileToken file, size_t bytes, size_t offset
+                               CkCallback ready, CkCallback complete) {
+      thisProxy[0].write_prepareSession(file, bytes, offset, ready, complete);
+    }
+
+    void Manager::write(SessionToken token, const char *data, size_t bytes, size_t offset) {
       Options &opts = files[token].opts;
       while (bytes > 0) {
         size_t stripeIndex = offset / opts.peStripe;
@@ -54,7 +57,7 @@ namespace Ck { namespace IO {
       }
     }
 
-    void Manager::write_forwardData(Token token, const char *data, size_t bytes,
+    void Manager::write_forwardData(SessionToken token, const char *data, size_t bytes,
                                    size_t offset) {
       //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
       CkAssert(offset + bytes <= files[token].bytes);
index efd8cc37269073e0119f12142450c99624150612..38f5fb222f86cc0bd857047689d127758e01afe9 100644 (file)
@@ -5,20 +5,18 @@ module CkIO {
       group Manager {
        entry Manager();
 
-       entry void prepareOutput_central(std::string name, size_t bytes,
-                                        CkCallback ready, CkCallback complete,
+       entry void prepareOutput_central(std::string name, CkCallback opened,
                                         Options opts);
-       entry void prepareOutput_distrib(Token token, std::string name, size_t bytes,
+       entry void prepareOutput_distrib(FileToken token, std::string name,
                                         Options opts);
-       entry void prepareOutput_readied(CkReductionMsg *m);
+       entry void prepareOutput_opened(CkReductionMsg *m);
 
        /// Serialize setting up each file, so that all PEs have the same sequence
        entry void run() {
          for (filesOpened = 0; true; filesOpened++) {
            if (CkMyPe() == 0)
-             when prepareOutput_central(std::string name, size_t bytes,
-                                        CkCallback ready, CkCallback complete,
-                                        Options opts) atomic {
+             when prepareOutput_central(std::string name, CkCallback opened,
+                                         Options opts) atomic {
                if (-1 == opts.peStripe)
                  opts.peStripe = 16 * 1024 * 1024;
                if (-1 == opts.writeStripe)
@@ -35,19 +33,18 @@ module CkIO {
 
                CkAssert(lastActivePE(opts) < CkNumPes());
                CkAssert(opts.writeStripe <= opts.peStripe);
-               nextReady = ready;
+               nextReady = opened;
 
-               thisProxy.prepareOutput_distrib(nextToken, name, bytes, opts);
+               thisProxy.prepareOutput_distrib(nextToken, name, opts);
 
-               files[nextToken] = impl::FileInfo(name, bytes, opts);
-               files[nextToken].complete = complete;
+               files[nextToken] = impl::FileInfo(name, opts);
              }
 
-           when prepareOutput_distrib[filesOpened](Token token, std::string name, size_t bytes,
+           when prepareOutput_distrib[filesOpened](FileToken token, std::string name,
                                                    Options opts) atomic {
              if (CkMyPe() != 0) {
                CkAssert(files.end() == files.find(token));
-               files[token] = impl::FileInfo(name, bytes, opts);
+               files[token] = impl::FileInfo(name, opts);
              }
 
              // Open file if we're one of the active PEs
@@ -58,13 +55,13 @@ module CkIO {
                files[token].fd = openFile(name);
              }
 
-             contribute(CkCallback(CkIndex_Manager::prepareOutput_readied(0),
+             contribute(CkCallback(CkIndex_Manager::prepareOutput_opened(0),
                                    thisProxy[0]),
                         filesOpened);
            }
 
            if (CkMyPe() == 0)
-             when prepareOutput_readied[filesOpened](CkReductionMsg *m) atomic {
+             when prepareOutput_opened[filesOpened](CkReductionMsg *m) atomic {
                delete m;
                FileReadyMsg *f = new FileReadyMsg(nextToken++);
                nextReady.send(f);
@@ -73,9 +70,22 @@ module CkIO {
 
        };
 
-       entry void write_forwardData(Token token, const char data[bytes],
+        entry void write_prepareSession(FileToken file, size_t bytes, size_t offset,
+                                        CkCallback ready, CkCallback complete) {
+          atomic {
+            CkAssert(CkMyPe() == 0);
+            thisProxy.write_announceSession(file, sessionsOpened++, bytes, offset);
+          }
+          // 
+          when sessionReady[sessionsOpened - 1](CkReductionMsg *m) { delete m; }
+          atomic { ready.send(new SessionReadyMsg(/*session ID */)); }
+        };
+
+        entry void write_announceSession(FileToken file, SessionToken session,
+                                         size_t bytes, size_t offset);
+       entry void write_forwardData(SessionToken token, const char data[bytes],
                                     size_t bytes, size_t offset);
-       entry void write_dataWritten(Token token, size_t bytes);
+       entry void write_dataWritten(SessionToken token, size_t bytes);
       };
     }
 
index 93cd8f7bbc282d6f6d5570934984449e20904e8a..f21e13c94b38dd4ad3ed375c22510dd945232231 100644 (file)
@@ -10,7 +10,8 @@
 
 namespace Ck { namespace IO {
   /// Identifier for a file to be accessed
-  typedef int Token;
+  typedef int FileToken;
+  typedef int SessionToken;
 
   struct Options {
     Options()
@@ -47,8 +48,8 @@ namespace Ck { namespace IO {
 
 namespace Ck { namespace IO {
   struct FileReadyMsg : public CMessage_FileReadyMsg {
-    Token token;
-    FileReadyMsg(const Token &tok) : token(tok) {}
+    FileToken token;
+    FileReadyMsg(const FileToken &tok) : token(tok) {}
   };
 
   namespace impl {  
@@ -81,21 +82,31 @@ namespace Ck { namespace IO {
       }
     };
     
+    struct SessionInfo {
+      FileToken file;
+      size_t bytes, offset, total_written;
+      int pesReady;
+      CkCallback complete;
+      std::map<size_t, struct buffer> bufferMap;
+
+      SessionInfo(FileToken file_, size_t bytes_, size_t offset_, CkCallback complete_)
+        : file(file_), bytes(bytes_), offset(offset_), complete(complete_)
+        { }
+      SessionInfo()
+        : file(-1)
+    };
 
     struct FileInfo {
       std::string name;
       Options opts;
-      size_t bytes, total_written;
       int fd;
-      CkCallback complete;
-      std::map<size_t, struct buffer> bufferMap;
 
-    FileInfo(std::string name_, size_t bytes_, Options opts_)
-    : name(name_), opts(opts_), bytes(bytes_), total_written(0), fd(-1)
-      { }
-    FileInfo()
-    : bytes(-1), total_written(-1), fd(-1)
-      { }
+      FileInfo(std::string name_, Options opts_)
+        : name(name_), opts(opts_), fd(-1)
+        { }
+      FileInfo()
+        : fd(-1)
+        { }
     };
   }
 
@@ -111,25 +122,28 @@ namespace Ck { namespace IO {
     Manager_SDAG_CODE;
 
     /// Application-facing methods, invoked locally on the calling PE
-    void prepareOutput(const char *name, size_t bytes,
-                      CkCallback ready, CkCallback complete,
-                      Options opts = Options());
-    void write(Token token, const char *data, size_t bytes, size_t offset);
+    void openWrite(std::string name, CkCallback opened, Options opts = Options());
+    void prepareWrite(size_t bytes, size_t offset, CkCallback ready, CkCallback complete);
+    void write(FileToken file, SessionToken session,
+               const char *data, size_t bytes, size_t offset);
 
+#if 0
     void prepareInput(const char *name, CkCallback ready,
                      Options opts = Options());
     void read(Token token, void *data, size_t bytes, size_t offset,
              CkCallback complete);
-
+#endif
 
     /// Internal methods, used for interaction among IO managers across the system
-    void write_forwardData(Token token, const char *data, size_t bytes, size_t offset);
-    void write_dataWritten(Token token, size_t bytes);
+    void write_forwardData(SessionToken token, const char *data, size_t bytes, size_t offset);
+    void write_dataWritten(SessionToken token, size_t bytes);
 
   private:
-    int filesOpened;
-    Token nextToken;
-    std::map<Token, impl::FileInfo> files;
+    int filesOpened, sessionsOpened;
+    FileToken nextToken;
+    std::map<FileToken, impl::FileInfo> files;
+    std::map<SessionToken, impl::SessionInfo> sessions;
+
     CkCallback nextReady;
 
     int lastActivePE(const Options &opts) {