Adapt forwarding and flushing logic to new structure
authorPhil Miller <mille121@illinois.edu>
Tue, 9 Jul 2013 21:56:09 +0000 (16:56 -0500)
committerPhil Miller <mille121@illinois.edu>
Tue, 20 Aug 2013 23:28:28 +0000 (18:28 -0500)
src/libs/ck-libs/io/ckio.C
src/libs/ck-libs/io/ckio.ci

index 10f5b707908308fe0dc368da5a4eb53d693e3f8e..4f1f2acd324a4279b55895d1eb9c61cea1bc8cc7 100644 (file)
@@ -131,12 +131,16 @@ namespace Ck { namespace IO {
         void write(SessionReadyMsg *session,
                    const char *data, size_t bytes, size_t offset) {
           Options &opts = files[session->file].opts;
+          size_t stripe = opts.peStripe;
+
+          size_t sessionStripeBase = (session->offset / stripe) * stripe;
+
           while (bytes > 0) {
-            size_t stripeIndex = offset / opts.peStripe;
-            int peIndex = stripeIndex % opts.activePEs;
-            int pe = opts.basePE + peIndex * opts.skipPEs;
-            size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
-            session->proxy[pe].forwardData(data, bytesToSend, offset);
+            size_t stripeIndex = (offset - sessionStripeBase) / stripe;
+            size_t bytesToSend = std::min(bytes, stripe - offset % stripe);
+
+            session->proxy[stripeIndex].forwardData(data, bytesToSend, offset);
+
             data += bytesToSend;
             offset += bytesToSend;
             bytes -= bytesToSend;
@@ -145,6 +149,10 @@ namespace Ck { namespace IO {
 
         int procNum(int arrayHdl,const CkArrayIndex &element)
         {
+#if 0
+          int peIndex = stripeIndex % opts.activePEs;
+          int pe = opts.basePE + peIndex * opts.skipPEs;
+#endif
           return 0;
         }
 
@@ -219,42 +227,22 @@ namespace Ck { namespace IO {
           CkAssert(offset >= myOffset);
           CkAssert(offset + bytes < myOffset + myBytes);
 
-          size_t stripeSize = file->opts.peStripe;
+          size_t stripeSize = file->opts.writeStripe;
 
           while (bytes > 0) {
-            size_t stripeOffset = (offset/stripeSize)*stripeSize;
-            size_t expectedBufferSize = std::min(file->bytes - stripeOffset, stripeSize);
-            buffer& currentBuffer = bufferMap[stripeOffset];
-            size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
+            size_t stripeBase = (offset/stripeSize)*stripeSize;
+            size_t stripeOffset = std::max(stripeBase, myOffset);
+            size_t nextStripe = stripeBase + stripeSize;
+            size_t expectedBufferSize = std::min(nextStripe, myOffset + myBytes) - stripeOffset;
+            size_t bytesInCurrentStripe = std::min(nextStripe - offset, bytes);
 
-            //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
+            buffer& currentBuffer = bufferMap[stripeOffset];
             currentBuffer.expect(expectedBufferSize);
 
-            currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
+            currentBuffer.insertData(data, bytesInCurrentStripe, offset - stripeOffset);
 
-            // Ready to flush?
             if (currentBuffer.isFull()) {
-              //initializa params
-              int l = currentBuffer.bytes_filled_so_far;
-              char *d = &(currentBuffer.array[0]);
-              size_t bufferOffset = stripeOffset;
-              //write to file loop
-              while (l > 0) {
-                CmiInt8 ret = pwrite(file->fd, d, l, bufferOffset);
-                if (ret < 0) {
-                  if (errno == EINTR) {
-                    continue;
-                  } else {
-                    CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
-                    CkAbort("Giving up");
-                  }
-                }
-                l -= ret;
-                d += ret;
-                bufferOffset += ret;
-              }
-              //write complete - remove this element from bufferMap and call dataWritten
-              myBytesWritten += currentBuffer.bytes_filled_so_far;
+              flushBuffer(currentBuffer, stripeOffset);
               bufferMap.erase(stripeOffset);
             }
 
@@ -263,6 +251,27 @@ namespace Ck { namespace IO {
             offset += bytesInCurrentStripe;
           }
         }
+
+        void flushBuffer(buffer& buf, size_t bufferOffset) {
+          int l = buf.bytes_filled_so_far;
+          char *d = &(buf.array[0]);
+
+          while (l > 0) {
+            CmiInt8 ret = pwrite(file->fd, d, l, bufferOffset);
+            if (ret < 0) {
+              if (errno == EINTR) {
+                continue;
+              } else {
+                CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
+                CkAbort("Giving up");
+              }
+            }
+            l -= ret;
+            d += ret;
+            bufferOffset += ret;
+          }
+          myBytesWritten += buf.bytes_filled_so_far;
+        }
       };
     }
 
index 03849132ca91f7857f37ab1f3dacea2c6a0f61b0..5b0a9a39a0442624edd776bdd19227ed0c5e3945 100644 (file)
@@ -25,7 +25,7 @@ module CkIO {
                                          CkCallback ready, CkCallback complete);
         }
 
-        group Manager : CkArrayMap
+        group Manager // : CkArrayMap
         {
           entry Manager();
           entry void openFile(FileToken token, std::string name,