Restructuring mostly done
authorPhil Miller <mille121@illinois.edu>
Tue, 9 Jul 2013 21:13:08 +0000 (16:13 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:25 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci
src/libs/ck-libs/io/ckio.h

index 5711ff06e8a1e00b2857e6b87cc658b4f55a3694..36988c6c31257156019f1e6ff0ff152f238f4cc5 100644 (file)
@@ -4,6 +4,7 @@
 #include <errno.h>
 #include <algorithm>
 #include <sys/stat.h>
+#include <fcntl.h>
 
 #if defined(_WIN32)
 #include <io.h>
@@ -35,30 +36,20 @@ namespace Ck { namespace IO {
 
     namespace impl {
       CProxy_Director director;
+      Manager *manager;
     }
 
-    void open(std::string name, CkCallback opened, Options opts) {
-      impl::director.openFile(name, opened, opts);
-    }
-
-    void startSession(FileToken token, size_t bytes, size_t offset,
-                      CkCallback ready, CkCallback complete) {
-      impl::director.prepareWriteSession(token, bytes, offset, ready, complete);
-    }
-
-    void write(SessionToken token, const char *data, size_t bytes, size_t offset) {
-      Options &opts = files[token].opts;
-      while (bytes > 0) {
-        size_t stripeIndex = offset / opts.peStripe;
-        int peIndex = stripeIndex % opts.activePEs;
-        int pe = opts.basePE + peIndex * opts.skipPEs;
-        size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
-       token.proxy[pe].forwardData(token, data, bytesToSend, offset);
-       data += bytesToSend;
-       offset += bytesToSend;
-       bytes -= bytesToSend;
-      }
-    }
+    class SessionReadyMsg : public CMessage_SessionReadyMsg {
+      FileToken file;
+      size_t bytes, offset;
+      impl::CProxy_WriteSession proxy;
+      friend class impl::Manager;
+    public:
+      SessionReadyMsg(FileToken file_, size_t bytes_, size_t offset_,
+                      CkArrayID sessionID)
+        : file(file_), bytes(bytes_), offset(offset_), proxy(sessionID)
+      { }
+    };
 
     namespace impl {
 
@@ -68,9 +59,10 @@ namespace Ck { namespace IO {
         CProxy_Manager managers;
 
       public:
-        Director()
+        Director(CkArgMsg *m)
           : filesOpened(0)
         {
+          delete m;
           director = thisProxy;
           managers = CProxy_Manager::ckNew();
         }
@@ -80,36 +72,36 @@ namespace Ck { namespace IO {
             opts.peStripe = 16 * 1024 * 1024;
           if (-1 == opts.writeStripe)
             opts.writeStripe = 4 * 1024 * 1024;
-
-          if (-1 == opts.activePEs) {
-            size_t numStripes = (bytes + opts.peStripe - 1) / opts.peStripe;
-            opts.activePEs = std::min((size_t)CkNumNodes(), numStripes);
-          }
+          if (-1 == opts.activePEs)
+            opts.activePEs = std::min(CkNumPes(), 32);
           if (-1 == opts.basePE)
             opts.basePE = 0;
           if (-1 == opts.skipPEs)
             opts.skipPEs = CkMyNodeSize();
 
-          CkAssert(lastActivePE(opts) < CkNumPes());
-          CkAssert(opts.writeStripe <= opts.peStripe);
-
           files[filesOpened] = FileInfo(name, opts);
           managers.openFile(filesOpened++, name, opened, opts);
         }
 
         void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
                                  CkCallback ready, CkCallback complete) {
-          int numElements = files[file].activePEs;
-          CkArrayOpts opts(numElements);
-          opts.setMap();
+          Options &opts = files[file].opts;
+
+          // XXX: Replace with a direct calculation
+          int numStripes = 0, o = offset;
+          while (o < offset + bytes) {
+            numStripes++;
+            o += opts.peStripe - o % opts.peStripe;
+          }
+
+          CkArrayOptions sessionOpts(numStripes);
+          sessionOpts.setMap(managers);
           CProxy_WriteSession session =
-            CProxy_WriteSession::ckNew(file, bytes, offset, complete, opts);
-          ready.send(new SessionReadyMessage(session));
+            CProxy_WriteSession::ckNew(file, bytes, offset, complete, sessionOpts);
+          ready.send(new SessionReadyMsg(file, bytes, offset, session));
         }
       };
 
-      Manager *manager;
-
       class Manager : public CBase_Manager {
 
       public:
@@ -121,6 +113,8 @@ namespace Ck { namespace IO {
         void openFile(FileToken token, std::string name,
                       CkCallback opened, Options opts) {
           CkAssert(files.end() == files.find(token));
+          CkAssert(lastActivePE(opts) < CkNumPes());
+          CkAssert(opts.writeStripe <= opts.peStripe);
           files[token] = impl::FileInfo(name, opts);
 
           // Open file if we're one of the active PEs
@@ -134,24 +128,49 @@ namespace Ck { namespace IO {
           contribute(sizeof(FileToken), token, CkReduction::max_int, opened);
         }
 
+        void write(SessionReadyMsg *session,
+                   const char *data, size_t bytes, size_t offset) {
+          Options &opts = files[session->file].opts;
+          while (bytes > 0) {
+            size_t stripeIndex = offset / opts.peStripe;
+            int peIndex = stripeIndex % opts.activePEs;
+            int pe = opts.basePE + peIndex * opts.skipPEs;
+            size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
+            session->proxy[pe].forwardData(data, bytesToSend, offset);
+            data += bytesToSend;
+            offset += bytesToSend;
+            bytes -= bytesToSend;
+          }
+        }
+
+        int procNum(int arrayHdl,const CkArrayIndex &element)
+        {
+          return 0;
+        }
+
       private:
         std::map<FileToken, impl::FileInfo> files;
+        friend class WriteSession;
 
         int doOpenFile(const std::string& name) {
           int fd;
 #if defined(_WIN32)
           fd = _open(name.c_str(), _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE);
 #else
-          fd = open(name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
+          fd = ::open(name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
 #endif
           if (-1 == fd)
             CkAbort("Failed to open a file for parallel output");
           return fd;
         }
+
+        int lastActivePE(const Options &opts) {
+          return opts.basePE + (opts.activePEs-1)*opts.skipPEs;
+        }
       };
 
       class WriteSession : public CBase_WriteSession {
-        FileToken file;
+        const FileInfo *file;
         size_t sessionOffset, myOffset;
         size_t sessionBytes, myBytes, myBytesWritten;
         CkCallback complete;
@@ -183,19 +202,29 @@ namespace Ck { namespace IO {
 
       public:
         WriteSession(FileToken file_, size_t offset_, size_t bytes_, CkCallback complete_)
-          : file(file_), offset(offset_), bytes(bytes_), complete(complete_)
+          : file(&manager->files[file_])
+          , sessionOffset(offset_)
+          , myOffset()
+          , sessionBytes(bytes_)
+          , myBytes()
+          , myBytesWritten(0)
+          , complete(complete_)
         { }
 
+        WriteSession(CkMigrateMessage *m) { }
+
         void forwardData(const char *data, size_t bytes, size_t offset) {
           //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
-          CkAssert(offset + bytes <= files[token].bytes);
-          // XXX: CkAssert(this is the right processor to receive this data)
 
-          size_t stripeSize = files[token].opts.peStripe;   
-          while(bytes > 0) {
+          CkAssert(offset >= myOffset);
+          CkAssert(offset + bytes < myOffset + myBytes);
+
+          size_t stripeSize = file->opts.peStripe;
+
+          while (bytes > 0) {
             size_t stripeOffset = (offset/stripeSize)*stripeSize;
-            size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
-            buffer& currentBuffer = files[token].bufferMap[stripeOffset];
+            size_t expectedBufferSize = std::min(file->bytes - stripeOffset, stripeSize);
+            buffer& currentBuffer = bufferMap[stripeOffset];
             size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
 
             //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
@@ -204,14 +233,14 @@ namespace Ck { namespace IO {
             currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
 
             // Ready to flush?
-            if(currentBuffer.isFull()) {
+            if (currentBuffer.isFull()) {
               //initializa params
               int l = currentBuffer.bytes_filled_so_far;
               char *d = &(currentBuffer.array[0]);
               size_t bufferOffset = stripeOffset;
               //write to file loop
               while (l > 0) {
-                CmiInt8 ret = pwrite(files[token].fd, d, l, bufferOffset);
+                CmiInt8 ret = pwrite(file->fd, d, l, bufferOffset);
                 if (ret < 0) {
                   if (errno == EINTR) {
                     continue;
@@ -225,8 +254,8 @@ namespace Ck { namespace IO {
                 bufferOffset += ret;
               }
               //write complete - remove this element from bufferMap and call dataWritten
-              thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
-              files[token].bufferMap.erase(stripeOffset);
+              myBytesWritten += currentBuffer.bytes_filled_so_far;
+              bufferMap.erase(stripeOffset);
             }
 
             bytes -= bytesInCurrentStripe;
@@ -236,6 +265,24 @@ namespace Ck { namespace IO {
         }
       };
     }
+
+    void open(std::string name, CkCallback opened, Options opts) {
+      impl::director.openFile(name, opened, opts);
+    }
+
+    void startSession(FileToken token, size_t bytes, size_t offset,
+                      CkCallback ready, CkCallback complete) {
+      impl::director.prepareWriteSession(token, bytes, offset, ready, complete);
+    }
+
+    void write(SessionReadyMsg *session,
+               const char *data, size_t bytes, size_t offset) {
+      impl::manager->write(session, data, bytes, offset);
+    }
+
+    class SessionCommitMsg : public CMessage_SessionCommitMsg {
+
+    };
   }
 }
 
index ccb818f35d6e11a8df6c1632e581239e79275f5e..03849132ca91f7857f37ab1f3dacea2c6a0f61b0 100644 (file)
@@ -25,7 +25,7 @@ module CkIO {
                                          CkCallback ready, CkCallback complete);
         }
 
-        group Manager
+        group Manager : CkArrayMap
         {
           entry Manager();
           entry void openFile(FileToken token, std::string name,
@@ -36,8 +36,7 @@ module CkIO {
         {
           entry WriteSession(FileToken file, size_t offset, size_t bytes,
                              CkCallback complete);
-          entry void forwardData(size_t offset, size_t bytes,
-                                 const char data[bytes]);
+          entry void forwardData(const char data[bytes], size_t bytes, size_t offset);
         };
       }
     }
index 2ad675802237198edc358a84a314c421a397465b..04cebdc9c96b13e773f6cdc018b6426d0d722412 100644 (file)
@@ -1,17 +1,13 @@
 #ifndef CK_IO_H
 #define CK_IO_H
 
-#include <cstring>
 #include <string>
-//#include <algorithm>
-#include <utility>
-#include <fcntl.h>
 #include <pup_stl.h>
 
 namespace Ck { namespace IO {
   /// Identifier for a file to be accessed
   typedef int FileToken;
-  typedef int SessionToken;
+  class SessionReadyMessage;
 
   struct Options {
     Options()
@@ -43,7 +39,7 @@ namespace Ck { namespace IO {
   void open(std::string name, CkCallback opened, Options opts);
   void startSession(FileToken token, size_t bytes, size_t offset,
                     CkCallback ready, CkCallback complete);
-  void write(SessionToken token, const char *data, size_t bytes, size_t offset);
+  void write(SessionReadyMessage *session, const char *data, size_t bytes, size_t offset);
   }
 }
 
@@ -114,13 +110,7 @@ namespace Ck { namespace IO {
     void write_dataWritten(SessionToken token, size_t bytes);
 
   private:
-    int filesOpened, sessionsOpened;
-    FileToken nextToken;
     std::map<SessionToken, impl::SessionInfo> sessions;
-
-    int lastActivePE(const Options &opts) {
-      return opts.basePE + (opts.activePEs-1)*opts.skipPEs;
-    }
   };
 #endif