c7c9d254dd6930a8b749084ba1bbb72d820c4962
[charm.git] / examples / charm++ / nodeHelper / fft-trans / fft1d.C
1 #include "fft1d.decl.h"
2 #include <fftw3.h>
3 #include <limits>
4 #include "fileio.h"
5 #include "NodeHelperAPI.h"
6
7 #define TWOPI 6.283185307179586
8
9 /*readonly*/ CProxy_Main mainProxy;
10 /*readonly*/ int numChunks;
11 /*readonly*/ int numTasks;
12 /*readonly*/ uint64_t N;
13 static CmiNodeLock fft_plan_lock;
14 #include "fftmacro.h"
15
16 CProxy_FuncNodeHelper nodeHelperProxy;
17 /** called by initnode once per node to support node level locking for
18     fftw plan create/destroy operations */
19 #define NODEHELPER_MODE NODEHELPER_STATIC 
20
21 extern "C" void doCalc(int first,int last, void *result, int paramNum, void * param)
22 {
23   //result=first;
24   for(int i=first; i<=last; i++)
25     fft_execute(((fft_plan*)param)[i]);
26 }
27
28 void initplanlock ()
29
30 {
31   fft_plan_lock=CmiCreateLock();
32 }
33
34 struct fftMsg : public CMessage_fftMsg {
35   int source;
36   fft_complex *data;
37 };
38
39 struct Main : public CBase_Main {
40   double start;
41   CProxy_fft fftProxy;
42
43   Main(CkArgMsg* m) {
44     numChunks = atoi(m->argv[1]); //#1D partitions
45     N = atol(m->argv[2]); //matrix size
46     if(m->argc>=4)
47       numTasks = atol(m->argv[3]); //the number of tasks that 1D partition is splitted into
48     else
49       numTasks = CmiMyNodeSize();  //default to 1/core
50     delete m;
51     
52     mainProxy = thisProxy;
53
54     /* how to nodify this computation? */
55     /* We make one block alloc per chare and divide the work evenly
56        across the number of threads.
57      * cache locality issues... 
58
59      *       The NodeHelper scheme presents a problem in cache
60      *       ignorance.  We push tasks into the queue as the remote
61      *       message dependencies are met, however the execution of
62      *       dequeued tasks performance will have significant cache
63      *       limitations obfuscated to our scheduler.  Our helper
64      *       threads will block while fetching data into cache local
65      *       to the thread.  If we only have 1 thread per core, we
66      *       have no way to self overlap those operations.  This
67      *       implies that there are probably conditions under which
68      *       more than one nodehelper thread per core will result in
69      *       better performance.  A natural sweet spot for these
70      *       should be explored in the SMT case where one thread per
71      *       SMT will allow for natural overlap of execution based on
72      *       cache availability, as controlled by the OS without
73      *       additional pthread context switching overhead.  A further
74      *       runtime based virtualized overthreading may provide
75      *       further benefits depending on thread overhead.
76      */
77     if (N % numChunks != 0)
78       CkAbort("numChunks not a factor of N\n");
79
80     // Construct an array of fft chares to do the calculation
81     fftProxy = CProxy_fft::ckNew(numChunks);
82
83     // Construct a nodehelper to do the calculation
84     //nodeHelperProxy = NodeHelper_Init(NODEHELPER_MODE, numTasks);
85     nodeHelperProxy = NodeHelper_Init();
86     
87     CkStartQD(CkIndex_Main::initDone((CkQdMsg *)0), &thishandle);
88   }
89
90   void initDone(CkQdMsg *msg){
91     delete msg;
92     startFFT();
93   }
94   
95   void startFFT() {
96     start = CkWallTimer();
97     // Broadcast the 'go' signal to the fft chare array
98     fftProxy.doFFT();
99   }
100
101   void doneFFT() {
102     double time = CkWallTimer() - start;
103     double gflops = 5 * (double)N*N * log2((double)N*N) / (time * 1000000000);
104     CkPrintf("chares: %d\ncores: %d\nTasks: %d\nsize: %ld\ntime: %f sec\nrate: %f GFlop/s\n",
105              numChunks, CkNumPes(), numTasks, N*N, time, gflops);
106
107     fftProxy.initValidation();
108   }
109
110   void printResidual(realType r) {
111     CkPrintf("residual = %g\n", r);
112     CkExit();
113   }
114
115 };
116
117 struct fft : public CBase_fft {
118   fft_SDAG_CODE
119
120   int iteration, count;
121   uint64_t n;
122   fft_plan *plan;
123   fft_plan p1;
124   fftMsg **msgs;
125   fft_complex *in, *out;
126   bool validating;
127   int nPerThread;
128   fft() {
129     __sdag_init();
130
131     validating = false;
132
133     n = N*N/numChunks;
134
135     in = (fft_complex*) fft_malloc(sizeof(fft_complex) * n);
136     out = (fft_complex*) fft_malloc(sizeof(fft_complex) * n);
137     nPerThread= n/numTasks;
138     
139     int length[] = {N};
140     
141     /** Basically, we want to parallelize the following fftw function call
142      * which is to do fft on each row of a 2D array with #rows=N/numChunks, #cols=N;
143      * 1. create a plan: singlePlan = fft_plan_many_dft(1, len, N/numChunks, out, len,
144      *                                                  1, N, out, len, 1,
145      *                                                  N, FFTW_FORWARD, FFTW_ESTIMATE)
146      * where len is defined as int len[]={N}
147      * 2. execute the plan: fft_execute(singlePlan).
148      * 
149      * It's not a loop, we transformed it into a loop with N/numTasks plans so that 
150      * each task execute one plan. Each plan has N/numChunks/numTasks rows for fftw
151      * processing.
152      */
153     
154     CmiLock(fft_plan_lock);
155     size_t offset=0;
156     plan= new fft_plan[numTasks];
157     for(int i=0; i < numTasks; i++,offset+=nPerThread)
158       {
159         /* ??? should the dist be nPerThread as the fft is performed as 1d of length nPerThread?? */
160         //plan[i] = fft_plan_many_dft(1, length, N/numChunks/numTasks, out+offset, length, 1, N/numTasks,
161     //                        out+offset, length, 1, N/numTasks, FFTW_FORWARD, FFTW_ESTIMATE);
162     
163     plan[i] = fft_plan_many_dft(1, length, N/numChunks/numTasks, out+offset, length, 1, N,
164                             out+offset, length, 1, N, FFTW_FORWARD, FFTW_ESTIMATE);                        
165       }
166     CmiUnlock(fft_plan_lock);
167     
168     srand48(thisIndex);
169     for(int i = 0; i < n; i++) {
170       in[i][0] = drand48();
171       in[i][1] = drand48();
172     }
173
174     msgs = new fftMsg*[numChunks];
175     for(int i = 0; i < numChunks; i++) {
176       msgs[i] = new (n/numChunks) fftMsg;
177       msgs[i]->source = thisIndex;
178     }
179
180     // Reduction to the mainchare to signal that initialization is complete
181     //contribute(CkCallback(CkIndex_Main::startFFT(), mainProxy));
182   }
183
184   void sendTranspose(fft_complex *src_buf) {
185     // All-to-all transpose by constructing and sending
186     // point-to-point messages to each chare in the array.
187     for(int i = thisIndex; i < thisIndex+numChunks; i++) {
188       //  Stagger communication order to avoid hotspots and the
189       //  associated contention.
190       int k = i % numChunks;
191       for(int j = 0, l = 0; j < N/numChunks; j++)
192         memcpy(msgs[k]->data[(l++)*N/numChunks], src_buf[k*N/numChunks+j*N], sizeof(fft_complex)*N/numChunks);
193
194       // Tag each message with the iteration in which it was
195       // generated, to prevent mis-matched messages from chares that
196       // got all of their input quickly and moved to the next step.
197       CkSetRefNum(msgs[k], iteration);
198       thisProxy[k].getTranspose(msgs[k]);
199       // Runtime system takes ownership of messages once they're sent
200       msgs[k] = NULL;
201     }
202   }
203
204   void applyTranspose(fftMsg *m) {
205     int k = m->source;
206     for(int j = 0, l = 0; j < N/numChunks; j++)
207       for(int i = 0; i < N/numChunks; i++) {
208         out[k*N/numChunks+(i*N+j)][0] = m->data[l][0];
209         out[k*N/numChunks+(i*N+j)][1] = m->data[l++][1];
210       }
211
212     // Save just-received messages to reuse for later sends, to
213     // avoid reallocation
214     delete msgs[k];
215     msgs[k] = m;
216     msgs[k]->source = thisIndex;
217   }
218
219   void twiddle(realType sign) {
220     realType a, c, s, re, im;
221
222     int k = thisIndex;
223     for(int i = 0; i < N/numChunks; i++)
224       for(int j = 0; j < N; j++) {
225         a = sign * (TWOPI*(i+k*N/numChunks)*j)/(N*N);
226         c = cos(a);
227         s = sin(a);
228
229         int idx = i*N+j;
230
231         re = c*out[idx][0] - s*out[idx][1];
232         im = s*out[idx][0] + c*out[idx][1];
233         out[idx][0] = re;
234         out[idx][1] = im;
235       }
236   }
237   void fftHelperLaunch()
238   {
239     //kick off thread computation
240     //FuncNodeHelper *nth = nodeHelperProxy[CkMyNode()].ckLocalBranch();
241     //nth->parallelizeFunc(doCalc, numTasks, numTasks, thisIndex, numTasks, 1, 1, plan, 0, NULL);
242     double ffttime = CkWallTimer();
243     NodeHelper_Parallelize(doCalc, 1, plan, numTasks, 0, numTasks-1);    
244     CkPrintf("FFT time: %.3f (ms)\n", (CkWallTimer()-ffttime)*1e3);
245   }
246
247   void initValidation() {
248     memcpy(in, out, sizeof(fft_complex) * n);
249
250     validating = true;
251     int length[] = {N};
252     CmiLock(fft_plan_lock);
253     size_t offset=0;
254     plan= new fft_plan[numTasks];
255     for(int i=0; i < numTasks; i++,offset+=nPerThread)
256       {
257         //      fft_destroy_plan(plan[i]);
258         //plan[i] = fft_plan_many_dft(1, length, N/numChunks/numTasks, out+offset, length, 1, N/numTasks,
259     //                        out+offset, length, 1, N/numTasks, FFTW_BACKWARD, FFTW_ESTIMATE);
260     plan[i] = fft_plan_many_dft(1, length, N/numChunks/numTasks, out+offset, length, 1, N,
261                             out+offset, length, 1, N, FFTW_BACKWARD, FFTW_ESTIMATE);
262       }
263     CmiUnlock(fft_plan_lock);
264     contribute(CkCallback(CkIndex_Main::startFFT(), mainProxy));
265   }
266
267   void calcResidual() {
268     double infNorm = 0.0;
269
270     srand48(thisIndex);
271     for(int i = 0; i < n; i++) {
272       out[i][0] = out[i][0]/(N*N) - drand48();
273       out[i][1] = out[i][1]/(N*N) - drand48();
274
275       double mag = sqrt(pow(out[i][0], 2) + pow(out[i][1], 2));
276       if(mag > infNorm) infNorm = mag;
277     }
278
279     double r = infNorm / (std::numeric_limits<double>::epsilon() * log((double)N * N));
280
281     CkCallback cb(CkReductionTarget(Main, printResidual), mainProxy);
282     contribute(sizeof(double), &r, CkReduction::max_double, cb);
283   }
284
285   fft(CkMigrateMessage* m) {}
286   ~fft() {}
287 };
288
289 #include "fft1d.def.h"