CkIO: Revise interface so that files and sessions can be communicated more easily
authorPhil Miller <mille121@illinois.edu>
Tue, 23 Jul 2013 21:09:11 +0000 (16:09 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:52 +0000 (18:28 -0500)
src/libs/ck-libs/io/Makefile
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci
src/libs/ck-libs/io/ckio.h

index abad49cd3c9a156d6d93c82eae0efa04c0b05be0..42dc1dbc672e22ef482c548ba8545c8e08d66eb1 100644 (file)
@@ -5,7 +5,7 @@ MODULE=CkIO
 LIB = $(CDIR)/lib/libmodule$(MODULE).a
 LIBOBJ = ckio.o
 
-HEADERS = ckio.h $(MODULE).decl.h $(MODULE).def.h
+HEADERS = ckio.h $(MODULE).decl.h $(MODULE).def.h $(MODULE)_impl.decl.h $(MODULE)_impl.def.h
 
 all: $(LIBDEST)$(LIB)
 
index 58e3b5e8700e08e98e152de601cc2bf3141c2ed8..8ce32cd89e01417cbd3c934789af22d2eec1461a 100644 (file)
@@ -5,6 +5,7 @@
 
 typedef int FileToken;
 #include "CkIO.decl.h"
+#include "CkIO_impl.decl.h"
 
 #include <sys/stat.h>
 #include <fcntl.h>
@@ -48,23 +49,6 @@ namespace Ck { namespace IO {
       Manager *manager;
     }
 
-    class SessionReadyMsg : public CMessage_SessionReadyMsg {
-      FileToken file;
-      size_t bytes, offset;
-      impl::CProxy_WriteSession proxy;
-      friend class impl::Manager;
-    public:
-      SessionReadyMsg(FileToken file_, size_t bytes_, size_t offset_,
-                      CkArrayID sessionID)
-        : file(file_), bytes(bytes_), offset(offset_), proxy(sessionID)
-      { }
-    };
-
-    class FileReadyMsg : public CMessage_FileReadyMsg {
-    public:
-      FileToken token;
-      FileReadyMsg(const FileToken &tok) : token(tok) {}
-    };
 
     namespace impl {
       struct FileInfo {
@@ -133,7 +117,7 @@ namespace Ck { namespace IO {
           sessionOpts.setMap(managers);
           CProxy_WriteSession session =
             CProxy_WriteSession::ckNew(file, bytes, offset, complete, sessionOpts);
-          ready.send(new SessionReadyMsg(file, bytes, offset, session));
+          ready.send(new SessionReadyMsg(Session(file, bytes, offset, session)));
         }
 
         void close(FileToken token, CkCallback closed) {
@@ -168,18 +152,18 @@ namespace Ck { namespace IO {
           contribute(sizeof(FileToken), &token, CkReduction::max_int, opened);
         }
 
-        void write(SessionReadyMsg *session,
-                   const char *data, size_t bytes, size_t offset) {
-          Options &opts = files[session->file].opts;
+        void write(Session session, const char *data, size_t bytes, size_t offset) {
+          Options &opts = files[session.file].opts;
           size_t stripe = opts.peStripe;
 
-          size_t sessionStripeBase = (session->offset / stripe) * stripe;
+          size_t sessionStripeBase = (session.offset / stripe) * stripe;
 
           while (bytes > 0) {
             size_t stripeIndex = (offset - sessionStripeBase) / stripe;
             size_t bytesToSend = min(bytes, stripe - offset % stripe);
 
-            session->proxy[stripeIndex].forwardData(data, bytesToSend, offset);
+            CProxy_WriteSession(session.sessionID)[stripeIndex]
+              .forwardData(data, bytesToSend, offset);
 
             data += bytesToSend;
             offset += bytesToSend;
@@ -349,18 +333,17 @@ namespace Ck { namespace IO {
       impl::director.openFile(name, opened, opts);
     }
 
-    void startSession(FileReadyMsg *file, size_t bytes, size_t offset,
+    void startSession(File file, size_t bytes, size_t offset,
                       CkCallback ready, CkCallback complete) {
-      impl::director.prepareWriteSession(file->token, bytes, offset, ready, complete);
+      impl::director.prepareWriteSession(file.token, bytes, offset, ready, complete);
     }
 
-    void write(SessionReadyMsg *session,
-               const char *data, size_t bytes, size_t offset) {
+    void write(Session session, const char *data, size_t bytes, size_t offset) {
       impl::manager->write(session, data, bytes, offset);
     }
 
-    void close(FileReadyMsg *file, CkCallback closed) {
-      impl::director.close(file->token, closed);
+    void close(File file, CkCallback closed) {
+      impl::director.close(file.token, closed);
     }
 
     class SessionCommitMsg : public CMessage_SessionCommitMsg {
@@ -370,3 +353,4 @@ namespace Ck { namespace IO {
 }
 
 #include "CkIO.def.h"
+#include "CkIO_impl.def.h"
index a248aae848ed2fa2490de3fe7f31eabf0a07d06e..0fdea8d39d8d6ef38b84e7dea93e4441d99d67a4 100644 (file)
@@ -7,6 +7,10 @@ module CkIO {
     }
   }
 
+  initnode _registerCkIO_impl();
+};
+
+module CkIO_impl {
   include "ckio.h";
 
   namespace Ck { namespace IO {
index 2b2ae774b9ab68895547abd08b5a8c7f5c897923..cf6d36ed57ef7b4ee10fedc62303e58ba0c7cda2 100644 (file)
@@ -5,10 +5,9 @@
 #include <pup.h>
 #include <ckcallback.h>
 
-namespace Ck { namespace IO {
-  class FileReadyMsg;
-  class SessionReadyMsg;
+#include <CkIO.decl.h>
 
+namespace Ck { namespace IO {
   struct Options {
     Options()
       : peStripe(-1), writeStripe(-1), activePEs(-1), basePE(-1), skipPEs(-1)
@@ -34,6 +33,9 @@ namespace Ck { namespace IO {
     }
   };
 
+  class File;
+  class Session;
+
   /// Open the named file on the selected subset of PEs, and send a
   /// FileReadyMsg to the opened callback when the system is ready to accept
   /// session requests on that file.
@@ -44,15 +46,62 @@ namespace Ck { namespace IO {
   /// SessionReadyMsg will be sent to the ready callback. When all of the data
   /// has been written and synced, a message will be sent to the complete
   /// callback.
-  void startSession(FileReadyMsg *file, size_t bytes, size_t offset,
+  void startSession(File file, size_t bytes, size_t offset,
                     CkCallback ready, CkCallback complete);
 
   /// Write the given data into the file to which session is attached. The
   /// offset is relative to the file as a whole, not to the session's offset.
-  void write(SessionReadyMsg *session, const char *data, size_t bytes, size_t offset);
+  void write(Session session, const char *data, size_t bytes, size_t offset);
 
   /// Close a previously-opened file. All sessions on that file must have
   /// already signalled that they are complete.
-  void close(FileReadyMsg *file, CkCallback closed);
+  void close(File file, CkCallback closed);
+
+  class File {
+    int token;
+    friend void startSession(File file, size_t bytes, size_t offset,
+                             CkCallback ready, CkCallback complete);
+    friend void close(File file, CkCallback closed);
+    friend class FileReadyMsg;
+
+  public:
+    File(int token_) : token(token_) { }
+    File() : token(-1) { }
+    void pup(PUP::er &p) { p|token; }
+  };
+
+  class FileReadyMsg : public CMessage_FileReadyMsg {
+  public:
+    File file;
+    FileReadyMsg(const File &tok) : file(tok) {}
+  };
+
+  namespace impl { class Manager; }
+
+  class Session {
+    int file;
+    size_t bytes, offset;
+    CkArrayID sessionID;
+    friend class Ck::IO::impl::Manager;
+  public:
+    Session(int file_, size_t bytes_, size_t offset_,
+            CkArrayID sessionID_)
+      : file(file_), bytes(bytes_), offset(offset_), sessionID(sessionID_)
+      { }
+    Session() { }
+    void pup(PUP::er &p) {
+      p|file;
+      p|bytes;
+      p|offset;
+      p|sessionID;
+    }
+  };
+
+  class SessionReadyMsg : public CMessage_SessionReadyMsg {
+  public:
+    Session session;
+    SessionReadyMsg(Session session_) : session(session_) { }
+  };
+
 }}
 #endif