CkIO: wrap write stripes around the set of PEs involved
[charm.git] / src / libs / ck-libs / io / ckio.C
1 #include <ckio.h>
2 #include <errno.h>
3 #include <algorithm>
4
5
6 namespace Ck { namespace IO {
7     Manager::Manager() : nextToken(0) {
8       run();
9     }
10
11     void Manager::prepareOutput(const char *name, size_t bytes,
12                                 CkCallback ready, CkCallback complete,
13                                 Options opts) {
14       thisProxy[0].prepareOutput_central(name, bytes, ready, complete, opts);
15     }
16
17     void Manager::write(Token token, const char *data, size_t bytes, size_t offset) {
18       Options &opts = files[token].opts;
19       do {
20         size_t stripeIndex = offset / opts.peStripe;
21         int peIndex = stripeIndex % opts.activePEs;
22         int pe = opts.basePE + peIndex * opts.skipPEs;
23         size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
24         thisProxy[pe].write_forwardData(token, data, bytesToSend, offset);
25         data += bytesToSend;
26         offset += bytesToSend;
27         bytes -= bytesToSend;
28       } while (bytes > 0);
29     }
30
31     void Manager::write_forwardData(Token token, const char *data, size_t bytes,
32                                     size_t offset) {
33       //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
34       CkAssert(offset + bytes <= files[token].bytes);
35       // XXX: CkAssert(this is the right processor to receive this data)
36
37       size_t stripeSize = files[token].opts.peStripe;   
38       while(bytes > 0)
39       {
40         size_t stripeOffset = (offset/stripeSize)*stripeSize;
41         size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
42         struct buffer & currentBuffer = files[token].bufferMap[stripeOffset];
43         size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
44
45         //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
46         currentBuffer.expect(expectedBufferSize);
47
48         currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
49
50         // Ready to flush?
51         if(currentBuffer.isFull()) {
52           //initializa params
53           int l = currentBuffer.bytes_filled_so_far;
54           char *d = &(currentBuffer.array[0]);
55           size_t bufferOffset = stripeOffset;
56           //write to file loop
57           while (l > 0) {
58             ssize_t ret = pwrite(files[token].fd, d, l, bufferOffset);
59             if (ret < 0)
60               if (errno == EINTR)
61                 continue;
62               else {
63                 CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
64                 CkAbort("Giving up");
65               }
66             l -= ret;
67             d += ret;
68             bufferOffset += ret;
69           }
70           //write complete - remove this element from bufferMap and call dataWritten
71           thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
72           files[token].bufferMap.erase(stripeOffset);
73         }
74
75         bytes -= bytesInCurrentStripe;
76         data += bytesInCurrentStripe;
77         offset += bytesInCurrentStripe;
78       }
79     }
80
81     void Manager::write_dataWritten(Token token, size_t bytes) {
82       CkAssert(CkMyPe() == 0);
83
84       files[token].total_written += bytes;
85
86       if (files[token].total_written == files[token].bytes)
87         files[token].complete.send();
88     }
89
90     void Manager::prepareInput(const char *name, CkCallback ready, Options opts) {
91       CkAbort("not yet implemented");
92     }
93
94     void Manager::read(Token token, void *data, size_t bytes, size_t offset,
95                        CkCallback complete) {
96       CkAbort("not yet implemented");
97     }
98   }
99 }
100
101 #include "CkIO.def.h"