init maxfiles to number of physnodes
[charm.git] / examples / charm++ / pupDisk / pupDisk.C
1 ////////////////////////////////////
2 //
3 //  pupDisk.C
4 //
5 //  Definition of chares in pupDisk
6 //
7 //  Author: Eric Bohm
8 //  Date: 2012/01/23
9 //
10 ////////////////////////////////////
11
12 #include "pupDisk.h"
13 CkCallback icb, rcb, wcb, vcb;
14 CProxy_userData userDataProxy;
15 CProxy_pupDisk pupDiskProxy;
16 int numElementsPer;
17 main::main(CkArgMsg *m)
18 {
19
20   int numElements=10;
21   int size=20;
22   bool skipToRead=false;
23   int maxFiles=CkNumPes();
24   if(CmiCpuTopologyEnabled())
25     {
26       maxFiles=CmiNumPhysicalNodes();
27     }
28   if(m->argc>1)
29     numElements=atoi(m->argv[1]);
30   if(m->argc>2)
31     size=atoi(m->argv[2]);
32   if(m->argc>3)
33     maxFiles=atoi(m->argv[3]);
34   if(m->argc>4)
35     skipToRead=(m->argv[4][0]=='r');
36   delete m;
37   if(numElements/maxFiles<=0)
38     CkAbort("This works better with more elements than files");
39   //rejigger their choices, possibly reducing the number of files below max
40   numElementsPer=numElements/maxFiles;
41   if(numElements%maxFiles>0) ++numElementsPer;
42   maxFiles=numElements/numElementsPer;
43   if(numElements%numElementsPer) ++maxFiles;
44   CkPrintf("pupDisk numElements %d howBig %d maxFiles %d skip %d elements per file %d\n", numElements, size, maxFiles, skipToRead, numElementsPer);
45   icb = CkCallback(CkIndex_main::initialized(NULL),  thishandle);
46   wcb = CkCallback(CkIndex_main::written(NULL),  thishandle);
47   rcb = CkCallback(CkIndex_main::read(NULL),  thishandle);
48   vcb = CkCallback(CkIndex_main::done(NULL),  thishandle);
49   CProxy_pupDiskMap diskMapProxy = CProxy_pupDiskMap::ckNew(maxFiles);
50   CkArrayOptions mapOptions(maxFiles);
51   mapOptions.setMap(diskMapProxy);
52   pupDiskProxy= CProxy_pupDisk::ckNew(size,numElements,maxFiles,mapOptions);
53   pupDiskProxy.doneInserting();
54   userDataProxy= CProxy_userData::ckNew(size,numElements,maxFiles, numElements);
55   userDataProxy.doneInserting();
56   if(skipToRead)
57     {
58       CkPrintf("reading data\n");
59       userDataProxy.read();
60     }
61   else
62     {
63       userDataProxy.init();
64     }
65 }
66
67
68
69 void main::initialized(CkReductionMsg *m)
70   {
71     CkPrintf("writing data\n");
72     userDataProxy.write();
73   }
74 void main::written(CkReductionMsg *m)
75   {
76     CkPrintf("reading data\n");
77     userDataProxy.read();
78   }
79 void main::read(CkReductionMsg *m)
80   {
81     CkPrintf("verifying data\n");
82     userDataProxy.verify();
83   }
84
85 void userData::init(){
86   CkAssert(myData); 
87   for (int i=0;i<howBig;++i) myData->data[i]=thisIndex;
88   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, icb);
89 }
90
91 void userData::verify(){
92   CkAssert(myData); 
93   for (int i=0;i<howBig;++i) 
94     if(myData->data[i]!=thisIndex){
95       CkPrintf("[%d] element %d corrupt as %d\n", 
96                thisIndex, i, myData->data[i]);
97       CkAbort("corrupt element");
98     }
99   CkPrintf("[%d] verified\n",thisIndex);
100   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, vcb);
101 }
102
103 void userData::write()
104 {
105   
106   int fileNum = thisIndex/numElementsPer;
107   //  CkPrintf("[%d] userData write to file %d\n",thisIndex,fileNum);
108   pupDiskProxy[fileNum].write(thisIndex, *myData);
109 }
110
111 void userData::read()
112 {
113   int fileNum = thisIndex/numElementsPer;
114   pupDiskProxy[fileNum].read(thisIndex);
115 }
116
117 void userData::acceptData(someData &inData){
118   for(int i=0; i<howBig; ++i) myData->data[i]=inData.data[i];
119   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, rcb);
120 }
121
122 pupDisk::pupDisk(int _howbig, int _numElements, int _maxFiles): howBig(_howbig), numElements(_numElements), maxFiles(_maxFiles)
123   { elementsToWrite=numElementsPer; 
124     if(thisIndex==maxFiles-1 && numElements%numElementsPer>0) elementsToWrite=numElements%numElementsPer; 
125     dataCache=new someData[elementsToWrite]; 
126     count=0; 
127     nextSlot=0; 
128     //    CkPrintf("[%d] pupDisk constructed expecting elementsToWrite %d for / %d and %% %d\n",thisIndex, elementsToWrite, numElements/maxFiles, numElements%maxFiles);
129   }
130
131
132
133 void pupDisk::read(int sender)
134 {
135   if(diskRead(sender))
136     {
137       // the ugly verbose syntax for extracting what you want from an STL map
138       // never fails to annoy me.
139       int offset=(*lookupIdx.find(sender)).second;
140       userDataProxy[sender].acceptData(dataCache[offset]);
141     }
142 }
143
144 bool pupDisk::diskRead(int sender)
145 {
146   if(!doneRead)
147     {
148       // get stuff from disk
149
150       // a more complicated caching scheme could pull less than the
151       // entire file and use a per entry flag system to track what is
152       // in cache.
153       doneRead=true;      
154       //      CkPrintf("[%d] reading from file for %d\n",thisIndex, sender);
155       char *d = new char[512];
156       sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
157       FILE *f = fopen(d,"r");
158       if (f == NULL) {
159         CkPrintf("[%d] Open failed with %s. \n", CkMyPe(), d);
160         CkAbort("\n");
161       }
162       // A simple scheme would require the user be consistent in their
163       // parameter choices across executions.  A more elaborate scheme
164       // codifies them in a block so the reader can do a lookup for
165       // the parameters used during writing.
166       PUP::fromDisk pd(f);
167       PUP::machineInfo machInfor;
168       pd((char *)&machInfor, sizeof(machInfor));       // machine info
169       if (!machInfor.valid()) {
170         CkPrintf("Invalid machineInfo on disk file when reading %d!\n", thisIndex);
171         CkAbort("");
172       }
173       PUP::xlater p(machInfor, pd);
174       int elementsToWriteFile;
175       p|elementsToWriteFile;
176       // safety check, for some formats you might be able to adjust
177       // properly if the file's parameters disagree from your instance's.
178       // This implementation is not that smart.
179       if(elementsToWriteFile==elementsToWrite)
180         {
181           p|lookupIdx;
182           someData input;
183           for(int i=0;i<elementsToWrite;++i)
184             {
185               dataCache[i].pup(p);
186             }
187         }
188       else
189         {
190           CkAbort("a pox upon your file format");
191         }
192       fclose(f);
193       delete [] d;
194     }
195   return doneRead;
196 }
197
198 void pupDisk::write(int sender, someData &inData)
199 {
200   //  CkPrintf("[%d] pupDisk write for sender %d with count %d of elementsToWrite %d\n",thisIndex, sender, count, elementsToWrite);
201   lookupIdx[sender]=nextSlot;
202   dataCache[nextSlot++]=inData;
203   if(++count==elementsToWrite) 
204     diskWrite();
205
206 }
207
208 void pupDisk::diskWrite()
209 {
210   //  CkPrintf("[%d] writing to file\n",thisIndex);
211   char *d = new char[512];
212   sprintf(d, "%s.%d.%d.%d", "diskfile", numElements, howBig, thisIndex);
213   FILE *f;
214   struct stat sb;
215   // a+ will force appending, which is not what we want
216   if(stat(d,&sb)==-1){
217       f = fopen(d,"w");  
218   }
219   else
220     {
221       f = fopen(d,"r+");
222     }
223   if (f == NULL) {
224     CkPrintf("[%d] Open for writing failed with %s \n", CkMyPe(), d);
225     CkAbort("\n");
226   }
227   PUP::toDisk p(f);
228   const PUP::machineInfo &machInfow = PUP::machineInfo::current();
229   //  CkPrintf("[%d] writing machineInfo %d bytes\n",thisIndex,sizeof(machInfow));
230   p((char *)&machInfow, sizeof(machInfow));       // machine info
231   if(!machInfow.valid())
232     {
233       CkPrintf("Invalid machineInfo on disk file when writing %d!\n", thisIndex);
234       CkAbort("");
235     }
236   p|elementsToWrite;
237   p|lookupIdx;
238   for(int i=0; i<elementsToWrite;i++)
239     dataCache[i].pup(p);
240   fflush(f);
241   fclose(f);
242   contribute(sizeof(int), &thisIndex, CkReduction::sum_int, wcb);
243   delete [] d;
244 }
245
246
247
248 #include "pupDisk.def.h"