441c9e431c1b9d2ded1c6e6b26a9e81f69426518
[charm.git] / src / libs / ck-libs / io / ckio.C
1 #include <ckio.h>
2 #include <errno.h>
3 #include <algorithm>
4
5
6 namespace Ck { namespace IO {
7     Manager::Manager() : nextToken(0) {
8       run();
9     }
10
11     void Manager::prepareOutput(const char *name, size_t bytes,
12                                 CkCallback ready, CkCallback complete,
13                                 Options opts) {
14       thisProxy[0].prepareOutput_central(name, bytes, ready, complete, opts);
15     }
16
17     void Manager::write(Token token, const char *data, size_t bytes, size_t offset) {
18       Options &opts = files[token].opts;
19       do {
20         size_t stripe = offset / opts.peStripe;
21         int pe = opts.basePE + stripe * opts.skipPEs;
22         size_t bytesToSend = std::min(bytes, opts.peStripe - offset % opts.peStripe);
23         thisProxy[pe].write_forwardData(token, data, bytesToSend, offset);
24         data += bytesToSend;
25         offset += bytesToSend;
26         bytes -= bytesToSend;
27       } while (bytes > 0);
28     }
29
30     void Manager::write_forwardData(Token token, const char *data, size_t bytes,
31                                     size_t offset) {
32       //files[token].bufferMap[(offset/stripeSize)*stripeSize] is the buffer to which this char should write to.
33       CkAssert(offset + bytes <= files[token].bytes);
34       // XXX: CkAssert(this is the right processor to receive this data)
35
36       size_t stripeSize = files[token].opts.peStripe;   
37       while(bytes > 0)
38       {
39         size_t stripeOffset = (offset/stripeSize)*stripeSize;
40         size_t expectedBufferSize = std::min(files[token].bytes - stripeOffset, stripeSize);
41         struct buffer & currentBuffer = files[token].bufferMap[stripeOffset];
42         size_t bytesInCurrentStripe = std::min(expectedBufferSize - offset%stripeSize, bytes);
43
44         //check if buffer this element already exists in map. If not, insert and resize buffer to stripe size
45         currentBuffer.expect(expectedBufferSize);
46
47         currentBuffer.insertData(data, bytesInCurrentStripe, offset % stripeSize);
48
49         // Ready to flush?
50         if(currentBuffer.isFull()) {
51           //initializa params
52           int l = currentBuffer.bytes_filled_so_far;
53           char *d = &(currentBuffer.array[0]);
54           size_t bufferOffset = stripeOffset;
55           //write to file loop
56           while (l > 0) {
57             ssize_t ret = pwrite(files[token].fd, d, l, bufferOffset);
58             if (ret < 0)
59               if (errno == EINTR)
60                 continue;
61               else {
62                 CkPrintf("Output failed on PE %d: %s", CkMyPe(), strerror(errno));
63                 CkAbort("Giving up");
64               }
65             l -= ret;
66             d += ret;
67             bufferOffset += ret;
68           }
69           //write complete - remove this element from bufferMap and call dataWritten
70           thisProxy[0].write_dataWritten(token, currentBuffer.bytes_filled_so_far);
71           files[token].bufferMap.erase(stripeOffset);
72         }
73
74         bytes -= bytesInCurrentStripe;
75         data += bytesInCurrentStripe;
76         offset += bytesInCurrentStripe;
77       }
78     }
79
80     void Manager::write_dataWritten(Token token, size_t bytes) {
81       CkAssert(CkMyPe() == 0);
82
83       files[token].total_written += bytes;
84
85       if (files[token].total_written == files[token].bytes)
86         files[token].complete.send();
87     }
88
89     void Manager::prepareInput(const char *name, CkCallback ready, Options opts) {
90       CkAbort("not yet implemented");
91     }
92
93     void Manager::read(Token token, void *data, size_t bytes, size_t offset,
94                        CkCallback complete) {
95       CkAbort("not yet implemented");
96     }
97   }
98 }
99
100 #include "CkIO.def.h"