Rearchitect to separate Director chare, Manager group, and WriteSession array
authorPhil Miller <mille121@illinois.edu>
Sat, 6 Jul 2013 00:27:45 +0000 (19:27 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:18 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.ci

index 38f5fb222f86cc0bd857047689d127758e01afe9..5329d0e501d7606662f53371459796daf7709ccc 100644 (file)
@@ -2,92 +2,79 @@ module CkIO {
   namespace Ck { namespace IO {
       message FileReadyMsg;
 
-      group Manager {
-       entry Manager();
-
-       entry void prepareOutput_central(std::string name, CkCallback opened,
-                                        Options opts);
-       entry void prepareOutput_distrib(FileToken token, std::string name,
-                                        Options opts);
-       entry void prepareOutput_opened(CkReductionMsg *m);
+      readonly CProxy_Director director;
 
-       /// Serialize setting up each file, so that all PEs have the same sequence
-       entry void run() {
-         for (filesOpened = 0; true; filesOpened++) {
-           if (CkMyPe() == 0)
-             when prepareOutput_central(std::string name, CkCallback opened,
-                                         Options opts) atomic {
-               if (-1 == opts.peStripe)
-                 opts.peStripe = 16 * 1024 * 1024;
-               if (-1 == opts.writeStripe)
-                 opts.writeStripe = 4 * 1024 * 1024;
+      mainchare Director
+      {
+        entry Director();
 
-               if (-1 == opts.activePEs) {
-                 size_t numStripes = (bytes + opts.peStripe - 1) / opts.peStripe;
-                 opts.activePEs = std::min((size_t)CkNumNodes(), numStripes);
-               }
-               if (-1 == opts.basePE)
-                 opts.basePE = 0;
-               if (-1 == opts.skipPEs)
-                 opts.skipPEs = CkMyNodeSize();
-
-               CkAssert(lastActivePE(opts) < CkNumPes());
-               CkAssert(opts.writeStripe <= opts.peStripe);
-               nextReady = opened;
+       /// Serialize setting up each file through this chare, so that all PEs
+       /// have the same sequence
+       entry void openFile(std::string name, CkCallback opened, Options opts) {
+          atomic {
+            if (-1 == opts.peStripe)
+              opts.peStripe = 16 * 1024 * 1024;
+            if (-1 == opts.writeStripe)
+              opts.writeStripe = 4 * 1024 * 1024;
 
-               thisProxy.prepareOutput_distrib(nextToken, name, opts);
+            if (-1 == opts.activePEs) {
+              size_t numStripes = (bytes + opts.peStripe - 1) / opts.peStripe;
+              opts.activePEs = std::min((size_t)CkNumNodes(), numStripes);
+            }
+            if (-1 == opts.basePE)
+              opts.basePE = 0;
+            if (-1 == opts.skipPEs)
+              opts.skipPEs = CkMyNodeSize();
 
-               files[nextToken] = impl::FileInfo(name, opts);
-             }
+            CkAssert(lastActivePE(opts) < CkNumPes());
+            CkAssert(opts.writeStripe <= opts.peStripe);
 
-           when prepareOutput_distrib[filesOpened](FileToken token, std::string name,
-                                                   Options opts) atomic {
-             if (CkMyPe() != 0) {
-               CkAssert(files.end() == files.find(token));
-               files[token] = impl::FileInfo(name, opts);
-             }
+            files[filesOpened] = impl::FileInfo(name, opts);
+            managers.openFile(filesOpened++, name, opened, opts);
+          }
+        };
 
-             // Open file if we're one of the active PEs
-             // XXX: Or maybe wait until the first write-out, to smooth the metadata load?
-             if (((CkMyPe() - opts.basePE) % opts.skipPEs == 0 &&
-                  CkMyPe() < lastActivePE(opts)) ||
-                 true) {
-               files[token].fd = openFile(name);
-             }
+        entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
+                                       CkCallback ready, CkCallback complete) {
+          atomic {
+            int numElements = files[file].activePEs;
+            CkArrayOpts opts(numElements);
+            opts.setMap();
+            CProxy_WriteSession::ckNew(file, bytes, offset, ready, complete, opts);
+          }
+        };
+      }
 
-             contribute(CkCallback(CkIndex_Manager::prepareOutput_opened(0),
-                                   thisProxy[0]),
-                        filesOpened);
-           }
+      group Manager
+      {
+       entry Manager();
 
-           if (CkMyPe() == 0)
-             when prepareOutput_opened[filesOpened](CkReductionMsg *m) atomic {
-               delete m;
-               FileReadyMsg *f = new FileReadyMsg(nextToken++);
-               nextReady.send(f);
-             }
-         }
+        entry void openFile(FileToken token, std::string name,
+                            CkCallback opened, Options opts) {
+          atomic {
+            CkAssert(files.end() == files.find(token));
+            files[token] = impl::FileInfo(name, opts);
 
-       };
+            // Open file if we're one of the active PEs
+            // XXX: Or maybe wait until the first write-out, to smooth the metadata load?
+            if (((CkMyPe() - opts.basePE) % opts.skipPEs == 0 &&
+                 CkMyPe() < lastActivePE(opts)) ||
+                true) {
+              files[token].fd = doOpenFile(name);
+            }
 
-        entry void write_prepareSession(FileToken file, size_t bytes, size_t offset,
-                                        CkCallback ready, CkCallback complete) {
-          atomic {
-            CkAssert(CkMyPe() == 0);
-            thisProxy.write_announceSession(file, sessionsOpened++, bytes, offset);
+            contribute(sizeof(FileToken), token, CkReduction::max_int, opened);
           }
-          // 
-          when sessionReady[sessionsOpened - 1](CkReductionMsg *m) { delete m; }
-          atomic { ready.send(new SessionReadyMsg(/*session ID */)); }
         };
+      };
 
-        entry void write_announceSession(FileToken file, SessionToken session,
-                                         size_t bytes, size_t offset);
-       entry void write_forwardData(SessionToken token, const char data[bytes],
-                                    size_t bytes, size_t offset);
-       entry void write_dataWritten(SessionToken token, size_t bytes);
+      array [1D] WriteSession
+      {
+        entry WriteSession(FileToken file, size_t offset, size_t bytes,
+                           CkCallback ready, CkCallback complete);
+       entry void forwardData(size_t offset, size_t bytes,
+                               const char data[bytes]);
       };
     }
-
   }
 }