Continue restructuring
authorPhil Miller <mille121@illinois.edu>
Mon, 8 Jul 2013 19:14:28 +0000 (14:14 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:21 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci
src/libs/ck-libs/io/ckio.h

index 2668a39f09bce9e15cd7e6524e3cf65a95dd6da8..a83e6d9137a07cebbad3a17243ddce4d015b3e73 100644 (file)
@@ -30,112 +30,171 @@ ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
 #endif
 
 namespace Ck { namespace IO {
-    Manager::Manager() : nextToken(0), sessionsOpened(0) {
-      run();
-    }
-
-    void Manager::openWrite(std::string name, CkCallback opened, Options opts = Options()) {
-      thisProxy[0].prepareOutput_central(name, opened, opts);
-    }
-
-    void Manager::prepareWrite(FileToken file, size_t bytes, size_t offset
-                               CkCallback ready, CkCallback complete) {
-      thisProxy[0].write_prepareSession(file, bytes, offset, ready, complete);
-    }
-
-    void Manager::write(SessionToken token, const char *data, size_t bytes, size_t offset) {
+    
+    void write(SessionToken token, const char *data, size_t bytes, size_t offset) {
       Options &opts = files[token].opts;
       while (bytes > 0) {
         size_t stripeIndex = offset / opts.peStripe;
         int peIndex = stripeIndex % opts.activePEs;
         int pe = opts.basePE + peIndex * opts.skipPEs;
         size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
-       thisProxy[pe].write_forwardData(token, data, bytesToSend, offset);
+       token.proxy[pe].forwardData(token, data, bytesToSend, offset);
        data += bytesToSend;
        offset += bytesToSend;
        bytes -= bytesToSend;
       }
     }
 
-    void Manager::write_forwardData(SessionToken token, const char *data, size_t bytes,
-                                   size_t offset) {
-      //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
-      CkAssert(offset + bytes <= files[token].bytes);
-      // XXX: CkAssert(this is the right processor to receive this data)
-
-      size_t stripeSize = files[token].opts.peStripe;   
-      while(bytes > 0)
-      {
-       size_t stripeOffset = (offset/stripeSize)*stripeSize;
-       size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
-       struct impl::buffer & currentBuffer = files[token].bufferMap[stripeOffset];
-       size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
-
-       //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
-       currentBuffer.expect(expectedBufferSize);
-
-       currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
-
-       // Ready to flush?
-       if(currentBuffer.isFull()) {
-         //initializa params
-         int l = currentBuffer.bytes_filled_so_far;
-         char *d = &(currentBuffer.array[0]);
-         size_t bufferOffset = stripeOffset;
-         //write to file loop
-         while (l > 0) {
-           CmiInt8 ret = pwrite(files[token].fd, d, l, bufferOffset);
-           if (ret < 0) {
-             if (errno == EINTR) {
-               continue;
-             } else {
-               CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
-               CkAbort("Giving up");
-             }
-            }
-           l -= ret;
-           d += ret;
-           bufferOffset += ret;
-         }
-         //write complete - remove this element from bufferMap and call dataWritten
-         thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
-         files[token].bufferMap.erase(stripeOffset);
-       }
-
-       bytes -= bytesInCurrentStripe;
-       data += bytesInCurrentStripe;
-       offset += bytesInCurrentStripe;
-      }
-    }
-
-    void Manager::write_dataWritten(Token token, size_t bytes) {
-      CkAssert(CkMyPe() == 0);
-
-      files[token].total_written += bytes;
-
-      if (files[token].total_written == files[token].bytes)
-       files[token].complete.send();
-    }
-
-    void Manager::prepareInput(const char *name, CkCallback ready, Options opts) {
-      CkAbort("not yet implemented");
-    }
-
-    void Manager::read(Token token, void *data, size_t bytes, size_t offset,
-                      CkCallback complete) {
-      CkAbort("not yet implemented");
-    }
-
-    int Manager::openFile(const std::string& name) {
-      int fd;
+    namespace impl {
+      CProxy_Director director;
+
+      class Director : public CBase_Director {
+        int filesOpened;
+        std::map<FileToken, impl::FileInfo> files;
+
+      public:
+        Director()
+          : filesOpened(0)
+        {
+          director = thisProxy;
+        }
+
+        void openFile(std::string name, CkCallback opened, Options opts) {
+          if (-1 == opts.peStripe)
+            opts.peStripe = 16 * 1024 * 1024;
+          if (-1 == opts.writeStripe)
+            opts.writeStripe = 4 * 1024 * 1024;
+
+          if (-1 == opts.activePEs) {
+            size_t numStripes = (bytes + opts.peStripe - 1) / opts.peStripe;
+            opts.activePEs = std::min((size_t)CkNumNodes(), numStripes);
+          }
+          if (-1 == opts.basePE)
+            opts.basePE = 0;
+          if (-1 == opts.skipPEs)
+            opts.skipPEs = CkMyNodeSize();
+
+          CkAssert(lastActivePE(opts) < CkNumPes());
+          CkAssert(opts.writeStripe <= opts.peStripe);
+
+          files[filesOpened] = impl::FileInfo(name, opts);
+          managers.openFile(filesOpened++, name, opened, opts);
+        }
+
+        void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
+                                 CkCallback ready, CkCallback complete) {
+          int numElements = files[file].activePEs;
+          CkArrayOpts opts(numElements);
+          opts.setMap();
+          CProxy_WriteSession session =
+            CProxy_WriteSession::ckNew(file, bytes, offset, complete, opts);
+          ready.send(new SessionReadyMessage(session));
+        }
+      };
+
+      Manager *manager;
+
+      class Manager : public CBase_Manager {
+
+      public:
+        Manager()
+        {
+          manager = this;
+        }
+
+        void openFile(FileToken token, std::string name,
+                      CkCallback opened, Options opts) {
+          CkAssert(files.end() == files.find(token));
+          files[token] = impl::FileInfo(name, opts);
+
+          // Open file if we're one of the active PEs
+          // XXX: Or maybe wait until the first write-out, to smooth the metadata load?
+          if (((CkMyPe() - opts.basePE) % opts.skipPEs == 0 &&
+               CkMyPe() < lastActivePE(opts)) ||
+              true) {
+            files[token].fd = doOpenFile(name);
+          }
+
+          contribute(sizeof(FileToken), token, CkReduction::max_int, opened);
+        }
+
+      private:
+        std::map<FileToken, impl::FileInfo> files;
+
+        int doOpenFile(const std::string& name) {
+          int fd;
 #if defined(_WIN32)
-      fd = _open(name.c_str(), _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE);
+          fd = _open(name.c_str(), _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE);
 #else
-      fd = open(name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
+          fd = open(name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
 #endif
-      if (-1 == fd)
-       CkAbort("Failed to open a file for parallel output");
-      return fd;
+          if (-1 == fd)
+            CkAbort("Failed to open a file for parallel output");
+          return fd;
+        }
+      };
+
+      class WriteSession : public CBase_WriteSession {
+        FileToken file;
+        size_t sessionOffset;
+        size_t sessionBytes, myBytes, myBytesWritten;
+        CkCallback complete;
+
+      public:
+        WriteSession(FileToken file_, size_t offset_, size_t bytes_, CkCallback complete_)
+          : file(file_), offset(offset_), bytes(bytes_), complete(complete_)
+        { }
+
+        void forwardData(SessionToken token, const char *data, size_t bytes,
+                         size_t offset) {
+          //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
+          CkAssert(offset + bytes <= files[token].bytes);
+          // XXX: CkAssert(this is the right processor to receive this data)
+
+          size_t stripeSize = files[token].opts.peStripe;   
+          while(bytes > 0) {
+            size_t stripeOffset = (offset/stripeSize)*stripeSize;
+            size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
+            struct impl::buffer & currentBuffer = files[token].bufferMap[stripeOffset];
+            size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
+
+            //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
+            currentBuffer.expect(expectedBufferSize);
+
+            currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
+
+            // Ready to flush?
+            if(currentBuffer.isFull()) {
+              //initializa params
+              int l = currentBuffer.bytes_filled_so_far;
+              char *d = &(currentBuffer.array[0]);
+              size_t bufferOffset = stripeOffset;
+              //write to file loop
+              while (l > 0) {
+                CmiInt8 ret = pwrite(files[token].fd, d, l, bufferOffset);
+                if (ret < 0) {
+                  if (errno == EINTR) {
+                    continue;
+                  } else {
+                    CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
+                    CkAbort("Giving up");
+                  }
+                }
+                l -= ret;
+                d += ret;
+                bufferOffset += ret;
+              }
+              //write complete - remove this element from bufferMap and call dataWritten
+              thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
+              files[token].bufferMap.erase(stripeOffset);
+            }
+
+            bytes -= bytesInCurrentStripe;
+            data += bytesInCurrentStripe;
+            offset += bytesInCurrentStripe;
+          }
+        }
+      };
     }
   }
 }
index df13cf454a87148dc9a375366dcad2a874a7eaf3..e24de73f334a39dfca3b4cf2013357a3dda14530 100644 (file)
@@ -13,64 +13,17 @@ module CkIO {
 
           /// Serialize setting up each file through this chare, so that all PEs
           /// have the same sequence
-          entry void openFile(std::string name, CkCallback opened, Options opts) {
-            atomic {
-              if (-1 == opts.peStripe)
-                opts.peStripe = 16 * 1024 * 1024;
-              if (-1 == opts.writeStripe)
-                opts.writeStripe = 4 * 1024 * 1024;
-
-              if (-1 == opts.activePEs) {
-                size_t numStripes = (bytes + opts.peStripe - 1) / opts.peStripe;
-                opts.activePEs = std::min((size_t)CkNumNodes(), numStripes);
-              }
-              if (-1 == opts.basePE)
-                opts.basePE = 0;
-              if (-1 == opts.skipPEs)
-                opts.skipPEs = CkMyNodeSize();
-
-              CkAssert(lastActivePE(opts) < CkNumPes());
-              CkAssert(opts.writeStripe <= opts.peStripe);
-
-              files[filesOpened] = impl::FileInfo(name, opts);
-              managers.openFile(filesOpened++, name, opened, opts);
-            }
-          };
+          entry void openFile(std::string name, CkCallback opened, Options opts);
 
           entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
-                                         CkCallback ready, CkCallback complete) {
-            atomic {
-              int numElements = files[file].activePEs;
-              CkArrayOpts opts(numElements);
-              opts.setMap();
-              CProxy_WriteSession session =
-                CProxy_WriteSession::ckNew(file, bytes, offset, complete, opts);
-              ready.send(new SessionReadyMessage(session));
-            }
-          };
+                                         CkCallback ready, CkCallback complete);
         }
 
         group Manager
         {
           entry Manager();
-
           entry void openFile(FileToken token, std::string name,
-                              CkCallback opened, Options opts) {
-            atomic {
-              CkAssert(files.end() == files.find(token));
-              files[token] = impl::FileInfo(name, opts);
-
-              // Open file if we're one of the active PEs
-              // XXX: Or maybe wait until the first write-out, to smooth the metadata load?
-              if (((CkMyPe() - opts.basePE) % opts.skipPEs == 0 &&
-                   CkMyPe() < lastActivePE(opts)) ||
-                  true) {
-                files[token].fd = doOpenFile(name);
-              }
-
-              contribute(sizeof(FileToken), token, CkReduction::max_int, opened);
-            }
-          };
+                              CkCallback opened, Options opts);
         };
 
         array [1D] WriteSession
index f21e13c94b38dd4ad3ed375c22510dd945232231..ca4fac3d6d95d8c8d531622d11f23655a184a05d 100644 (file)
@@ -38,7 +38,11 @@ namespace Ck { namespace IO {
     }
   };
 
-    struct FileReadyMsg;
+  struct FileReadyMsg;
+
+  void open(std::string name, CkCallback opened, Options opts);
+  void startSession(FileToken token, size_t offset, size_t bytes, CkCallback complete);
+  void write(SessionToken token, const char *data, size_t bytes, size_t offset);
   }
 }
 
@@ -119,8 +123,6 @@ namespace Ck { namespace IO {
   public:
     Manager();
 
-    Manager_SDAG_CODE;
-
     /// Application-facing methods, invoked locally on the calling PE
     void openWrite(std::string name, CkCallback opened, Options opts = Options());
     void prepareWrite(size_t bytes, size_t offset, CkCallback ready, CkCallback complete);
@@ -141,15 +143,11 @@ namespace Ck { namespace IO {
   private:
     int filesOpened, sessionsOpened;
     FileToken nextToken;
-    std::map<FileToken, impl::FileInfo> files;
     std::map<SessionToken, impl::SessionInfo> sessions;
 
-    CkCallback nextReady;
-
     int lastActivePE(const Options &opts) {
       return opts.basePE + (opts.activePEs-1)*opts.skipPEs;
     }
-    int openFile(const std::string& name);
   };
 
   }}