working version. needs small amount of placing tweaking to put
[charm.git] / examples / charm++ / pupDisk / pupDisk.C
1 ////////////////////////////////////
2 //
3 //  pupDisk.C
4 //
5 //  Definition of chares in pupDisk
6 //
7 //  Author: Eric Bohm
8 //  Date: 2012/01/23
9 //
10 ////////////////////////////////////
11
12 #include "pupDisk.h"
13 CkCallback icb, rcb, wcb, vcb;
14 CProxy_userData userDataProxy;
15 CProxy_pupDisk pupDiskProxy;
16 int numElementsPer;
17 main::main(CkArgMsg *m)
18 {
19
20   int numElements=10;
21   int size=20;
22   bool skipToRead=false;
23   int maxFiles=CkNumPes();
24   if(m->argc>1)
25     numElements=atoi(m->argv[1]);
26   if(m->argc>2)
27     size=atoi(m->argv[2]);
28   if(m->argc>3)
29     maxFiles=atoi(m->argv[3]);
30   if(m->argc>4)
31     skipToRead=(m->argv[4][0]=='r');
32   delete m;
33   if(numElements/maxFiles<=0)
34     CkAbort("This works better with more elements than files");
35   //rejigger their choices, possibly reducing the number of files below max
36   numElementsPer=numElements/maxFiles;
37   if(numElements%maxFiles>0) ++numElementsPer;
38   maxFiles=numElements/numElementsPer;
39   if(numElements%numElementsPer) ++maxFiles;
40   CkPrintf("pupDisk numElements %d howBig %d maxFiles %d skip %d elements per file %d\n", numElements, size, maxFiles, skipToRead, numElementsPer);
41   icb = CkCallback(CkIndex_main::initialized(NULL),  thishandle);
42   wcb = CkCallback(CkIndex_main::written(NULL),  thishandle);
43   rcb = CkCallback(CkIndex_main::read(NULL),  thishandle);
44   vcb = CkCallback(CkIndex_main::done(NULL),  thishandle);
45   pupDiskProxy= CProxy_pupDisk::ckNew(size,numElements,maxFiles, maxFiles);
46   pupDiskProxy.doneInserting();
47   userDataProxy= CProxy_userData::ckNew(size,numElements,maxFiles, numElements);
48   userDataProxy.doneInserting();
49   if(skipToRead)
50     {
51       CkPrintf("reading data\n");
52       userDataProxy.read();
53     }
54   else
55     {
56       userDataProxy.init();
57     }
58 }
59
60
61
62 void main::initialized(CkReductionMsg *m)
63   {
64     CkPrintf("writing data\n");
65     userDataProxy.write();
66   }
67 void main::written(CkReductionMsg *m)
68   {
69     CkPrintf("reading data\n");
70     userDataProxy.read();
71   }
72 void main::read(CkReductionMsg *m)
73   {
74     CkPrintf("verifying data\n");
75     userDataProxy.verify();
76   }
77
78 void userData::init(){
79   CkAssert(myData); 
80   for (int i=0;i<howBig;++i) myData->data[i]=thisIndex;
81   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, icb);
82 }
83
84 void userData::verify(){
85   CkAssert(myData); 
86   for (int i=0;i<howBig;++i) 
87     if(myData->data[i]!=thisIndex){
88       CkPrintf("[%d] element %d corrupt as %d\n", 
89                thisIndex, i, myData->data[i]);
90       CkAbort("corrupt element");
91     }
92   CkPrintf("[%d] verified\n",thisIndex);
93   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, vcb);
94 }
95
96 void userData::write()
97 {
98   
99   int fileNum = thisIndex/numElementsPer;
100   //  CkPrintf("[%d] userData write to file %d\n",thisIndex,fileNum);
101   pupDiskProxy[fileNum].write(thisIndex, *myData);
102 }
103
104 void userData::read()
105 {
106   int fileNum = thisIndex/numElementsPer;
107   pupDiskProxy[fileNum].read(thisIndex);
108 }
109
110 void userData::acceptData(someData &inData){
111   for(int i=0; i<howBig; ++i) myData->data[i]=inData.data[i];
112   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, rcb);
113 }
114
115 pupDisk::pupDisk(int _howbig, int _numElements, int _maxFiles): howBig(_howbig), numElements(_numElements), maxFiles(_maxFiles)
116   { elementsToWrite=numElementsPer; 
117     if(thisIndex==maxFiles-1 && numElements%numElementsPer>0) elementsToWrite=numElements%numElementsPer; 
118     dataCache=new someData[elementsToWrite]; 
119     count=0; 
120     nextSlot=0; 
121     //    CkPrintf("[%d] pupDisk constructed expecting elementsToWrite %d for / %d and %% %d\n",thisIndex, elementsToWrite, numElements/maxFiles, numElements%maxFiles);
122   }
123
124
125
126 void pupDisk::read(int sender)
127 {
128   if(diskRead(sender))
129     {
130       // the ugly verbose syntax for extracting what you want from an STL map
131       // never fails to annoy me.
132       int offset=(*lookupIdx.find(sender)).second;
133       userDataProxy[sender].acceptData(dataCache[offset]);
134     }
135 }
136
137 bool pupDisk::diskRead(int sender)
138 {
139   if(!doneRead)
140     {
141       // get stuff from disk
142
143       // a more complicated caching scheme could pull less than the
144       // entire file and use a per entry flag system to track what is
145       // in cache.
146       doneRead=true;      
147       //      CkPrintf("[%d] reading from file for %d\n",thisIndex, sender);
148       char *d = new char[512];
149       sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
150       FILE *f = fopen(d,"r");
151       if (f == NULL) {
152         CkPrintf("[%d] Open failed with %s. \n", CkMyPe(), d);
153         CkAbort("\n");
154       }
155       // A simple scheme would require the user be consistent in their
156       // parameter choices across executions.  A more elaborate scheme
157       // codifies them in a block so the reader can do a lookup for
158       // the parameters used during writing.
159       PUP::fromDisk pd(f);
160       PUP::machineInfo machInfor;
161       pd((char *)&machInfor, sizeof(machInfor));       // machine info
162       if (!machInfor.valid()) {
163         CkPrintf("Invalid machineInfo on disk file when reading %d!\n", thisIndex);
164         CkAbort("");
165       }
166       PUP::xlater p(machInfor, pd);
167       int elementsToWriteFile;
168       p|elementsToWriteFile;
169       // safety check, for some formats you might be able to adjust
170       // properly if the file's parameters disagree from your instance's.
171       // This implementation is not that smart.
172       if(elementsToWriteFile==elementsToWrite)
173         {
174           p|lookupIdx;
175           someData input;
176           for(int i=0;i<elementsToWrite;++i)
177             {
178               dataCache[i].pup(p);
179             }
180         }
181       else
182         {
183           CkAbort("a pox upon your file format");
184         }
185       fclose(f);
186       delete [] d;
187     }
188   return doneRead;
189 }
190
191 void pupDisk::write(int sender, someData &inData)
192 {
193   //  CkPrintf("[%d] pupDisk write for sender %d with count %d of elementsToWrite %d\n",thisIndex, sender, count, elementsToWrite);
194   lookupIdx[sender]=nextSlot;
195   dataCache[nextSlot++]=inData;
196   if(++count==elementsToWrite) 
197     diskWrite();
198
199 }
200
201 void pupDisk::diskWrite()
202 {
203   //  CkPrintf("[%d] writing to file\n",thisIndex);
204   char *d = new char[512];
205   sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
206   FILE *f;
207   struct stat sb;
208   // a+ will force appending, which is not what we want
209   if(stat(d,&sb)==-1){
210       f = fopen(d,"w");  
211   }
212   else
213     {
214       f = fopen(d,"r+");
215     }
216   if (f == NULL) {
217     CkPrintf("[%d] Open for writing failed with %s \n", CkMyPe(), d);
218     CkAbort("\n");
219   }
220   PUP::toDisk p(f);
221   const PUP::machineInfo &machInfow = PUP::machineInfo::current();
222   //  CkPrintf("[%d] writing machineInfo %d bytes\n",thisIndex,sizeof(machInfow));
223   p((char *)&machInfow, sizeof(machInfow));       // machine info
224   if(!machInfow.valid())
225     {
226       CkPrintf("Invalid machineInfo on disk file when writing %d!\n", thisIndex);
227       CkAbort("");
228     }
229   p|elementsToWrite;
230   p|lookupIdx;
231   for(int i=0; i<elementsToWrite;i++)
232     dataCache[i].pup(p);
233   fflush(f);
234   fclose(f);
235   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, wcb);
236   delete [] d;
237 }
238
239
240
241 #include "pupDisk.def.h"