Merge branch 'charm' of charmgit:charm into charm
[charm.git] / examples / charm++ / pupDisk / pupDisk.C
1 ////////////////////////////////////
2 //
3 //  pupDisk.C
4 //
5 //  Definition of chares in pupDisk
6 //
7 //  Author: Eric Bohm
8 //  Date: 2012/01/23
9 //
10 ////////////////////////////////////
11
12 #include "pupDisk.h"
13 CkCallback icb, rcb, wcb, vcb;
14 CProxy_userData userDataProxy;
15 CProxy_pupDisk pupDiskProxy;
16 int numElementsPer;
17 main::main(CkArgMsg *m)
18 {
19
20   int numElements=10;
21   int size=20;
22   bool skipToRead=false;
23   int maxFiles=CkNumPes();
24   if(m->argc>1)
25     numElements=atoi(m->argv[1]);
26   if(m->argc>2)
27     size=atoi(m->argv[2]);
28   if(m->argc>3)
29     maxFiles=atoi(m->argv[3]);
30   if(m->argc>4)
31     skipToRead=(m->argv[4][0]=='r');
32   delete m;
33   if(numElements/maxFiles<=0)
34     CkAbort("This works better with more elements than files");
35   //rejigger their choices, possibly reducing the number of files below max
36   numElementsPer=numElements/maxFiles;
37   if(numElements%maxFiles>0) ++numElementsPer;
38   maxFiles=numElements/numElementsPer;
39   if(numElements%numElementsPer) ++maxFiles;
40   CkPrintf("pupDisk numElements %d howBig %d maxFiles %d skip %d elements per file %d\n", numElements, size, maxFiles, skipToRead, numElementsPer);
41   icb = CkCallback(CkIndex_main::initialized(NULL),  thishandle);
42   wcb = CkCallback(CkIndex_main::written(NULL),  thishandle);
43   rcb = CkCallback(CkIndex_main::read(NULL),  thishandle);
44   vcb = CkCallback(CkIndex_main::done(NULL),  thishandle);
45   CProxy_pupDiskMap diskMapProxy = CProxy_pupDiskMap::ckNew(maxFiles);
46   CkArrayOptions mapOptions(maxFiles);
47   mapOptions.setMap(diskMapProxy);
48   pupDiskProxy= CProxy_pupDisk::ckNew(size,numElements,maxFiles,mapOptions);
49   pupDiskProxy.doneInserting();
50   userDataProxy= CProxy_userData::ckNew(size,numElements,maxFiles, numElements);
51   userDataProxy.doneInserting();
52   if(skipToRead)
53     {
54       CkPrintf("reading data\n");
55       userDataProxy.read();
56     }
57   else
58     {
59       userDataProxy.init();
60     }
61 }
62
63
64
65 void main::initialized(CkReductionMsg *m)
66   {
67     CkPrintf("writing data\n");
68     userDataProxy.write();
69   }
70 void main::written(CkReductionMsg *m)
71   {
72     CkPrintf("reading data\n");
73     userDataProxy.read();
74   }
75 void main::read(CkReductionMsg *m)
76   {
77     CkPrintf("verifying data\n");
78     userDataProxy.verify();
79   }
80
81 void userData::init(){
82   CkAssert(myData); 
83   for (int i=0;i<howBig;++i) myData->data[i]=thisIndex;
84   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, icb);
85 }
86
87 void userData::verify(){
88   CkAssert(myData); 
89   for (int i=0;i<howBig;++i) 
90     if(myData->data[i]!=thisIndex){
91       CkPrintf("[%d] element %d corrupt as %d\n", 
92                thisIndex, i, myData->data[i]);
93       CkAbort("corrupt element");
94     }
95   CkPrintf("[%d] verified\n",thisIndex);
96   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, vcb);
97 }
98
99 void userData::write()
100 {
101   
102   int fileNum = thisIndex/numElementsPer;
103   //  CkPrintf("[%d] userData write to file %d\n",thisIndex,fileNum);
104   pupDiskProxy[fileNum].write(thisIndex, *myData);
105 }
106
107 void userData::read()
108 {
109   int fileNum = thisIndex/numElementsPer;
110   pupDiskProxy[fileNum].read(thisIndex);
111 }
112
113 void userData::acceptData(someData &inData){
114   for(int i=0; i<howBig; ++i) myData->data[i]=inData.data[i];
115   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, rcb);
116 }
117
118 pupDisk::pupDisk(int _howbig, int _numElements, int _maxFiles): howBig(_howbig), numElements(_numElements), maxFiles(_maxFiles)
119   { elementsToWrite=numElementsPer; 
120     if(thisIndex==maxFiles-1 && numElements%numElementsPer>0) elementsToWrite=numElements%numElementsPer; 
121     dataCache=new someData[elementsToWrite]; 
122     count=0; 
123     nextSlot=0; 
124     //    CkPrintf("[%d] pupDisk constructed expecting elementsToWrite %d for / %d and %% %d\n",thisIndex, elementsToWrite, numElements/maxFiles, numElements%maxFiles);
125   }
126
127
128
129 void pupDisk::read(int sender)
130 {
131   if(diskRead(sender))
132     {
133       // the ugly verbose syntax for extracting what you want from an STL map
134       // never fails to annoy me.
135       int offset=(*lookupIdx.find(sender)).second;
136       userDataProxy[sender].acceptData(dataCache[offset]);
137     }
138 }
139
140 bool pupDisk::diskRead(int sender)
141 {
142   if(!doneRead)
143     {
144       // get stuff from disk
145
146       // a more complicated caching scheme could pull less than the
147       // entire file and use a per entry flag system to track what is
148       // in cache.
149       doneRead=true;      
150       //      CkPrintf("[%d] reading from file for %d\n",thisIndex, sender);
151       char *d = new char[512];
152       sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
153       FILE *f = fopen(d,"r");
154       if (f == NULL) {
155         CkPrintf("[%d] Open failed with %s. \n", CkMyPe(), d);
156         CkAbort("\n");
157       }
158       // A simple scheme would require the user be consistent in their
159       // parameter choices across executions.  A more elaborate scheme
160       // codifies them in a block so the reader can do a lookup for
161       // the parameters used during writing.
162       PUP::fromDisk pd(f);
163       PUP::machineInfo machInfor;
164       pd((char *)&machInfor, sizeof(machInfor));       // machine info
165       if (!machInfor.valid()) {
166         CkPrintf("Invalid machineInfo on disk file when reading %d!\n", thisIndex);
167         CkAbort("");
168       }
169       PUP::xlater p(machInfor, pd);
170       int elementsToWriteFile;
171       p|elementsToWriteFile;
172       // safety check, for some formats you might be able to adjust
173       // properly if the file's parameters disagree from your instance's.
174       // This implementation is not that smart.
175       if(elementsToWriteFile==elementsToWrite)
176         {
177           p|lookupIdx;
178           someData input;
179           for(int i=0;i<elementsToWrite;++i)
180             {
181               dataCache[i].pup(p);
182             }
183         }
184       else
185         {
186           CkAbort("a pox upon your file format");
187         }
188       fclose(f);
189       delete [] d;
190     }
191   return doneRead;
192 }
193
194 void pupDisk::write(int sender, someData &inData)
195 {
196   //  CkPrintf("[%d] pupDisk write for sender %d with count %d of elementsToWrite %d\n",thisIndex, sender, count, elementsToWrite);
197   lookupIdx[sender]=nextSlot;
198   dataCache[nextSlot++]=inData;
199   if(++count==elementsToWrite) 
200     diskWrite();
201
202 }
203
204 void pupDisk::diskWrite()
205 {
206   //  CkPrintf("[%d] writing to file\n",thisIndex);
207   char *d = new char[512];
208   sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
209   FILE *f;
210   struct stat sb;
211   // a+ will force appending, which is not what we want
212   if(stat(d,&sb)==-1){
213       f = fopen(d,"w");  
214   }
215   else
216     {
217       f = fopen(d,"r+");
218     }
219   if (f == NULL) {
220     CkPrintf("[%d] Open for writing failed with %s \n", CkMyPe(), d);
221     CkAbort("\n");
222   }
223   PUP::toDisk p(f);
224   const PUP::machineInfo &machInfow = PUP::machineInfo::current();
225   //  CkPrintf("[%d] writing machineInfo %d bytes\n",thisIndex,sizeof(machInfow));
226   p((char *)&machInfow, sizeof(machInfow));       // machine info
227   if(!machInfow.valid())
228     {
229       CkPrintf("Invalid machineInfo on disk file when writing %d!\n", thisIndex);
230       CkAbort("");
231     }
232   p|elementsToWrite;
233   p|lookupIdx;
234   for(int i=0; i<elementsToWrite;i++)
235     dataCache[i].pup(p);
236   fflush(f);
237   fclose(f);
238   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, wcb);
239   delete [] d;
240 }
241
242
243
244 #include "pupDisk.def.h"