examples/charm++: Always use CkWallTimer, not CmiWallTimer (tested)
[charm.git] / examples / charm++ / ckdirect / matmul3d / matmul3d.C
1 /** \file matmul3d.C
2  *  Author: Abhinav S Bhatele
3  *  Date Created: April 01st, 2008
4  *
5  */
6
7 #include <stdlib.h>
8 #include "ckdirect.h"
9 #include "matmul3d.decl.h"
10 #include "matmul3d.h"
11 #include "TopoManager.h"
12 #if CMK_BLUEGENEP || CMK_VERSION_BLUEGENE
13 #include "essl.h"
14 #endif
15
16 Main::Main(CkArgMsg* m) {
17   if ( (m->argc != 3) && (m->argc != 7) && (m->argc != 10) ) {
18     CkPrintf("%s [array_size] [block_size]\n", m->argv[0]);
19     CkPrintf("OR %s [array_size_X] [array_size_Y] [array_size_Z] [block_size_X] [block_size_Y] [block_size_Z]\n", m->argv[0]);
20     CkPrintf("OR %s [array_size_X] [array_size_Y] [array_size_Z] [block_size_X] [block_size_Y] [block_size_Z] [torus_dim_X] [torus_dim_Y] [torus_dim_Z] \n", m->argv[0]);
21     CkAbort("Abort");
22   }
23
24   // store the main proxy
25   mainProxy = thisProxy;
26
27   // get the size of the global array, size of each chare and size of the torus [optional]
28   if(m->argc == 3) {
29     arrayDimX = arrayDimY = arrayDimZ = atoi(m->argv[1]);
30     blockDimX = blockDimY = blockDimZ = atoi(m->argv[2]);
31   }
32   else if (m->argc == 7) {
33     arrayDimX = atoi(m->argv[1]);
34     arrayDimY = atoi(m->argv[2]);
35     arrayDimZ = atoi(m->argv[3]);
36     blockDimX = atoi(m->argv[4]);
37     blockDimY = atoi(m->argv[5]);
38     blockDimZ = atoi(m->argv[6]);
39   } else {
40     arrayDimX = atoi(m->argv[1]);
41     arrayDimY = atoi(m->argv[2]);
42     arrayDimZ = atoi(m->argv[3]);
43     blockDimX = atoi(m->argv[4]);
44     blockDimY = atoi(m->argv[5]);
45     blockDimZ = atoi(m->argv[6]);
46     torusDimX = atoi(m->argv[7]);
47     torusDimY = atoi(m->argv[8]);
48     torusDimZ = atoi(m->argv[9]);
49   }
50
51   if (arrayDimX < blockDimX || arrayDimX % blockDimX != 0)
52     CkAbort("array_size_X % block_size_X != 0!");
53   if (arrayDimY < blockDimY || arrayDimY % blockDimY != 0)
54     CkAbort("array_size_Y % block_size_Y != 0!");
55   if (arrayDimZ < blockDimZ || arrayDimZ % blockDimZ != 0)
56     CkAbort("array_size_Z % block_size_Z != 0!");
57
58   num_chare_x = arrayDimX / blockDimX;
59   num_chare_y = arrayDimY / blockDimY;
60   num_chare_z = arrayDimZ / blockDimZ;
61   num_chares = num_chare_x * num_chare_y * num_chare_z;
62
63   subBlockDimXz = blockDimX/num_chare_z;
64   subBlockDimYx = blockDimY/num_chare_x;
65   subBlockDimXy = blockDimX/num_chare_y;
66
67   // print info
68   CkPrintf("Running Matrix Multiplication on %d processors with (%d, %d, %d) chares\n", CkNumPes(), num_chare_x, num_chare_y, num_chare_z);
69   CkPrintf("Array Dimensions: %d %d %d\n", arrayDimX, arrayDimY, arrayDimZ);
70   CkPrintf("Block Dimensions: %d %d %d\n", blockDimX, blockDimY, blockDimZ);
71
72   // Create new array of worker chares
73 #if USE_TOPOMAP
74   CkPrintf("Topology Mapping is being done ...\n");
75   CProxy_ComputeMap map = CProxy_ComputeMap::ckNew(num_chare_x, num_chare_y, num_chare_z, 1, 1, 1);
76   CkArrayOptions opts(num_chare_x, num_chare_y, num_chare_z);
77   opts.setMap(map);
78   compute = CProxy_Compute::ckNew(opts);
79 #elif USE_BLOCKMAP
80   CkPrintf("Block Mapping is being done ...\n");
81   CProxy_ComputeMap map = CProxy_ComputeMap::ckNew(num_chare_x, num_chare_y, num_chare_z, torusDimX, torusDimY, torusDimZ);
82   CkArrayOptions opts(num_chare_x, num_chare_y, num_chare_z);
83   opts.setMap(map);
84   compute = CProxy_Compute::ckNew(opts);
85 #else
86   compute = CProxy_Compute::ckNew(num_chare_x, num_chare_y, num_chare_z);
87 #endif
88
89   // CkPrintf("Total Hops: %d\n", hops);
90
91   // Start the computation
92   numIterations = 0;
93   startTime = CkWallTimer();
94 #if USE_CKDIRECT
95   compute.setupChannels();
96 #else
97   compute.beginCopying();
98 #endif
99 }
100
101 void Main::done() {
102   numIterations++;
103   if(numIterations == 1) {
104     firstTime = CkWallTimer();
105 #if USE_CKDIRECT
106     CkPrintf("FIRST ITER TIME %f secs\n", firstTime - setupTime);
107 #else
108     CkPrintf("FIRST ITER TIME %f secs\n", firstTime - startTime);
109 #endif
110     compute.resetArrays();
111   } else {
112     if(numIterations == NUM_ITER) {
113       endTime[numIterations-2] = CkWallTimer() - firstTime;
114       double sum = 0;
115       for(int i=0; i<NUM_ITER-1; i++)
116         sum += endTime[i];
117 #if USE_CKDIRECT
118       CkPrintf("AVG TIME %f secs\n", sum/(NUM_ITER-1));
119 #else
120       CkPrintf("AVG TIME %f secs\n", sum/(NUM_ITER-1));
121 #endif
122       CkExit();
123     } else {
124       endTime[numIterations-2] = CkWallTimer() - firstTime;
125       compute.resetArrays();
126     }
127   }
128 }
129
130 void Main::resetDone() {
131   firstTime = CkWallTimer();
132   compute.beginCopying();
133 }
134
135 void Main::setupDone() {
136   setupTime = CkWallTimer();
137   CkPrintf("SETUP TIME %f secs\n", setupTime - startTime);
138   compute.beginCopying();
139 }
140
141 // Constructor, initialize values
142 Compute::Compute() {
143   // each compute will only hold blockDimX/num_chare_z no. of rows to begin with
144   A = new float[blockDimX*blockDimY];
145   B = new float[blockDimY*blockDimZ];
146   C = new float[blockDimX*blockDimZ];
147 #if USE_CKDIRECT
148   tmpC = new float[blockDimX*blockDimZ];
149 #endif
150
151   int indexX = thisIndex.x;
152   int indexY = thisIndex.y;
153   int indexZ = thisIndex.z;
154
155   float tmp;
156
157   for(int i=indexZ*subBlockDimXz; i<(indexZ+1)*subBlockDimXz; i++)
158     for(int j=0; j<blockDimY; j++) {
159       tmp = (float)drand48();
160       while(tmp > MAX_LIMIT || tmp < (-1)*MAX_LIMIT)
161         tmp = (float)drand48();
162
163       A[i*blockDimY + j] = tmp;
164   }
165
166   for(int j=indexX*subBlockDimYx; j<(indexX+1)*subBlockDimYx; j++)
167     for(int k=0; k<blockDimZ; k++) {
168       tmp = (float)drand48();
169       while(tmp > MAX_LIMIT || tmp < (-1)*MAX_LIMIT)
170         tmp = (float)drand48();
171
172       B[j*blockDimZ + k] = tmp;
173   }
174
175   for(int i=0; i<blockDimX; i++)
176     for(int k=0; k<blockDimZ; k++) {
177       C[i*blockDimZ + k] = 0.0;
178 #if USE_CKDIRECT
179       tmpC[i*blockDimZ + k] = 0.0;
180 #endif
181     }
182
183   // counters to keep track of how many messages have been received
184   countA = 0;
185   countB = 0;
186   countC = 0;
187
188 #if USE_CKDIRECT
189   sHandles = new infiDirectUserHandle[num_chare_x + num_chare_y + num_chare_z];
190   rHandles = new infiDirectUserHandle[num_chare_x + num_chare_y + num_chare_z];
191 #endif
192 }
193
194 Compute::Compute(CkMigrateMessage* m) {
195
196 }
197
198 Compute::~Compute() {
199   delete [] A;
200   delete [] B;
201   delete [] C;
202 #if USE_CKDIRECT
203   delete [] tmpC;
204 #endif
205 }
206
207 void Compute::beginCopying() {
208   sendA();
209   sendB();
210 }
211
212 void Compute::resetArrays() {
213   int indexX = thisIndex.x;
214   int indexY = thisIndex.y;
215   int indexZ = thisIndex.z;
216   
217   float tmp;
218   
219   for(int i=indexZ*subBlockDimXz; i<(indexZ+1)*subBlockDimXz; i++)
220     for(int j=0; j<blockDimY; j++) {
221       tmp = (float)drand48(); 
222       while(tmp > MAX_LIMIT || tmp < (-1)*MAX_LIMIT)
223         tmp = (float)drand48();
224
225       A[i*blockDimY + j] = tmp;
226   }
227
228   for(int j=indexX*subBlockDimYx; j<(indexX+1)*subBlockDimYx; j++)
229     for(int k=0; k<blockDimZ; k++) {
230       tmp = (float)drand48();
231       while(tmp > MAX_LIMIT || tmp < (-1)*MAX_LIMIT)
232         tmp = (float)drand48();
233
234       B[j*blockDimZ + k] = tmp;
235   }
236
237   for(int i=0; i<blockDimX; i++)
238     for(int k=0; k<blockDimZ; k++) {
239       C[i*blockDimZ + k] = 0.0;
240 #if USE_CKDIRECT
241       tmpC[i*blockDimZ + k] = 0.0;
242 #endif
243     }
244
245   contribute(0, 0, CkReduction::concat, CkCallback(CkIndex_Main::resetDone(), mainProxy));
246 }
247
248 void Compute::sendA() {
249   int indexZ = thisIndex.z;
250
251   for(int k=0; k<num_chare_z; k++)
252     if(k != indexZ) {
253 #if USE_CKDIRECT
254       CkDirect_put(&sHandles[num_chare_x + num_chare_y + k]);
255 #else
256       // use a local pointer for chares on the same processor
257       Compute* c = compute(thisIndex.x, thisIndex.y, k).ckLocal();
258       if(c != NULL)
259         c->receiveA(indexZ, &A[indexZ*subBlockDimXz*blockDimY], subBlockDimXz * blockDimY);
260       else
261         compute(thisIndex.x, thisIndex.y, k).receiveA(indexZ, &A[indexZ*subBlockDimXz*blockDimY], subBlockDimXz * blockDimY);
262 #endif
263     }
264 }
265
266 void Compute::sendB() {
267   int indexX = thisIndex.x;
268
269   for(int i=0; i<num_chare_x; i++)
270     if(i != indexX) {
271 #if USE_CKDIRECT
272       CkDirect_put(&sHandles[i]);
273 #else
274       // use a local pointer for chares on the same processor
275       Compute* c = compute(i, thisIndex.y, thisIndex.z).ckLocal();
276       if(c != NULL)
277         c->receiveB(indexX, &B[indexX*subBlockDimYx*blockDimZ], subBlockDimYx * blockDimZ);
278       else
279         compute(i, thisIndex.y, thisIndex.z).receiveB(indexX, &B[indexX*subBlockDimYx*blockDimZ], subBlockDimYx * blockDimZ);
280 #endif
281     }
282 }
283
284 void Compute::sendC() {
285   int indexY = thisIndex.y;
286   for(int j=0; j<num_chare_y; j++) {
287     if(j != indexY) {
288 #if USE_CKDIRECT
289       CkDirect_put(&sHandles[num_chare_x + j]);
290 #else
291       // use a local pointer for chares on the same processor
292       Compute *c = compute(thisIndex.x, j, thisIndex.z).ckLocal();
293       if(c != NULL)
294         c->receiveC(&C[j*subBlockDimXy*blockDimZ], subBlockDimXy * blockDimZ, 1);
295       else
296         compute(thisIndex.x, j, thisIndex.z).receiveC(&C[j*subBlockDimXy*blockDimZ], subBlockDimXy * blockDimZ, 1);
297 #endif
298     }
299   }
300 }
301
302 void Compute::receiveA(int indexZ, float *data, int size) {
303   for(int i=0; i<subBlockDimXz; i++)
304     for(int j=0; j<blockDimY; j++)
305       A[indexZ*subBlockDimXz*blockDimY + i*blockDimY + j] = data[i*blockDimY + j];
306   countA++;
307   if(countA == num_chare_z-1 && countB == num_chare_x-1)
308     doWork();
309 }
310
311 void Compute::receiveB(int indexX, float *data, int size) {
312   for(int j=0; j<subBlockDimYx; j++)
313     for(int k=0; k<blockDimZ; k++)
314       B[indexX*subBlockDimYx*blockDimZ + j*blockDimZ + k] = data[j*blockDimZ + k];
315   countB++;
316   if(countA == num_chare_z-1 && countB == num_chare_x-1)
317     doWork();
318 }
319
320 void Compute::receiveC(float *data, int size, int who) {
321   int indexY = thisIndex.y;
322   if(who) {
323     for(int i=0; i<subBlockDimXy; i++)
324       for(int k=0; k<blockDimZ; k++)
325         C[indexY*subBlockDimXy*blockDimZ + i*blockDimZ + k] += data[i*blockDimZ + k];
326   }
327   countC++;
328   if(countC == num_chare_y) {
329     /*char name[30];
330     sprintf(name, "%s_%d_%d_%d", "C", thisIndex.x, thisIndex.y, thisIndex.z);
331     FILE *fp = fopen(name, "w");
332     for(int i=0; i<subBlockDimXy; i++) {
333       for(int k=0; k<blockDimZ; k++)
334         fprintf(fp, "%f ", C[indexY*subBlockDimXy*blockDimZ + i*blockDimZ + k]);
335       fprintf(fp, "\n");
336     }
337     fclose(fp);*/
338
339     // counters to keep track of how many messages have been received
340     countA = 0;
341     countB = 0;
342     countC = 0;
343
344     contribute(0, 0, CkReduction::concat, CkCallback(CkIndex_Main::done(), mainProxy));
345     // mainProxy.done();
346   }
347 }
348
349 void Compute::receiveC() {
350   int indexX = thisIndex.x;
351   int indexY = thisIndex.y;
352   int indexZ = thisIndex.z;
353
354   // copy C from tmpC to the correct location
355   for(int j=0; j<num_chare_y; j++) {
356     if( j != indexY) {
357       for(int i=0; i<subBlockDimXy; i++)
358         for(int k=0; k<blockDimZ; k++)
359           C[indexY*subBlockDimXy*blockDimZ + i*blockDimZ + k] += tmpC[j*subBlockDimXy*blockDimZ + i*blockDimZ + k];
360     }
361   }
362   /*char name[30];
363   sprintf(name, "%s_%d_%d_%d", "C", thisIndex.x, thisIndex.y, thisIndex.z);
364   FILE *fp = fopen(name, "w");
365   for(int i=0; i<subBlockDimXy; i++) {
366     for(int k=0; k<blockDimZ; k++)
367       fprintf(fp, "%f ", C[indexY*subBlockDimXy*blockDimZ + i*blockDimZ + k]);
368     fprintf(fp, "\n");
369   }
370   fclose(fp);
371   CkPrintf("%d_%d_%d\n", thisIndex.x, thisIndex.y, thisIndex.z);
372   for(int i=0; i<subBlockDimXy; i++) {
373     for(int k=0; k<blockDimZ; k++)
374       CkPrintf("%f ", C[indexY*subBlockDimXy*blockDimZ + i*blockDimZ + k]);
375     CkPrintf("\n");
376   }*/
377
378   // call ready for the buffers
379   for(int i=0; i<num_chare_x; i++)
380     if(i != indexX)
381       CkDirect_ready(&rHandles[i]);
382
383   for(int j=0; j<num_chare_y; j++)
384     if(j != indexY)
385       CkDirect_ready(&rHandles[num_chare_x + j]);
386   
387   for(int k=0; k<num_chare_z; k++)
388     if(k != indexZ)
389       CkDirect_ready(&rHandles[num_chare_x + num_chare_y + k]);
390
391   // counters to keep track of how many messages have been received
392   countA = 0;
393   countB = 0;
394   countC = 0;
395
396   contribute(0, 0, CkReduction::concat, CkCallback(CkIndex_Main::done(), mainProxy));
397   // mainProxy.done();
398 }
399
400 void Compute::doWork() {
401   if(countA == num_chare_z-1 && countB == num_chare_x-1) {
402
403 #if CMK_BLUEGENEP || CMK_VERSION_BLUEGENE
404     const char trans = 'N';
405     const double alpha = 1.0;
406     const double beta = 0.0;
407     
408     sgemm(&trans, &trans, blockDimX, blockDimZ, blockDimY, alpha, A, blockDimX, B, blockDimY, beta, C, blockDimX);
409 #else
410     for(int i=0; i<blockDimX; i++)
411       for(int j=0; j<blockDimY; j++)
412         for(int k=0; k<blockDimZ; k++)
413           C[i*blockDimZ+k] += A[i*blockDimY+j] * B[j*blockDimZ+k];
414 #endif
415
416 #if USE_CKDIRECT
417     callBackRcvdC((void *)this);
418 #else
419     receiveC(&C[(thisIndex.y)*subBlockDimXy*blockDimZ], subBlockDimXy*blockDimZ, 0);
420 #endif
421     sendC();
422   }
423 }
424
425 void Compute::setupChannels() {
426   int mype = CkMyPe();
427   int x = thisIndex.x;
428   int y = thisIndex.y;
429   int z = thisIndex.z;
430
431   for(int k=0; k<num_chare_z; k++)
432     if(k != z)
433       thisProxy(x, y, k).notifyReceiver(mype, thisIndex, SENDA);
434
435   for(int i=0; i<num_chare_x; i++)
436     if(i != x)
437       thisProxy(i, y, z).notifyReceiver(mype, thisIndex, SENDB);
438
439   for(int j=0; j<num_chare_y; j++)
440     if(j != y)
441       thisProxy(x, j, z).notifyReceiver(mype, thisIndex, SENDC);
442
443 }
444
445 void Compute::notifyReceiver(int pe, CkIndex3D index, int arr) {
446   // --- B --- | --- C --- | --- A ---
447   if(arr == SENDA) {
448     rHandles[num_chare_x + num_chare_y + index.z] = CkDirect_createHandle(pe, &A[(index.z)*subBlockDimXz*blockDimY], sizeof(float)*subBlockDimXz*blockDimY, Compute::callBackRcvdA, (void *)this, OOB);
449     thisProxy(index.x, index.y, index.z).recvHandle(rHandles[num_chare_x + num_chare_y + index.z], thisIndex.z, arr);
450   }
451
452   if(arr == SENDB) {
453     rHandles[index.x] = CkDirect_createHandle(pe, &B[(index.x)*subBlockDimYx*blockDimZ], sizeof(float)*subBlockDimYx*blockDimZ, Compute::callBackRcvdB, (void *)this, OOB);
454     thisProxy(index.x, index.y, index.z).recvHandle(rHandles[index.x], thisIndex.x, arr);
455   }
456
457   if(arr == SENDC) {
458     rHandles[num_chare_x + index.y] = CkDirect_createHandle(pe, &tmpC[(index.y)*subBlockDimXy*blockDimZ], sizeof(float)*subBlockDimXy*blockDimZ, Compute::callBackRcvdC, (void *)this, OOB);
459     thisProxy(index.x, index.y, index.z).recvHandle(rHandles[num_chare_x + index.y], thisIndex.y, arr);
460   }
461 }
462
463 void Compute::recvHandle(infiDirectUserHandle shdl, int index, int arr) {
464   // --- B --- | --- C --- | --- A ---
465   if(arr == SENDA) {
466     sHandles[num_chare_x + num_chare_y + index] = shdl;
467     CkDirect_assocLocalBuffer(&sHandles[num_chare_x + num_chare_y + index], &A[thisIndex.z*subBlockDimXz*blockDimY], sizeof(float)*subBlockDimXz*blockDimY);
468     countA++;
469   }
470
471   if(arr == SENDB) {
472     sHandles[index] = shdl;
473     CkDirect_assocLocalBuffer(&sHandles[index], &B[thisIndex.x*subBlockDimYx*blockDimZ], sizeof(float)*subBlockDimYx*blockDimZ);
474     countB++;
475   }
476
477   if(arr == SENDC) {
478     sHandles[num_chare_x + index] = shdl;
479     CkDirect_assocLocalBuffer(&sHandles[num_chare_x + index], &C[index*subBlockDimXy*blockDimZ], sizeof(float)*subBlockDimXy*blockDimZ);
480     countC++;
481   }
482
483   if(countA == num_chare_z-1 && countB == num_chare_x-1 && countC == num_chare_y-1) {
484     countA = 0;
485     countB = 0;
486     countC = 0;
487     contribute(0, 0, CkReduction::concat, CkCallback(CkIndex_Main::setupDone(), mainProxy));
488     // mainProxy.setupDone();
489   }
490 }
491
492
493 void Compute::callBackRcvdA(void *arg) {
494   Compute *obj = (Compute *)arg;
495   obj->countA++;
496   if(obj->countA == num_chare_z-1 && obj->countB == num_chare_x-1)
497     obj->thisProxy(obj->thisIndex.x, obj->thisIndex.y, obj->thisIndex.z).doWork();
498 }
499
500 void Compute::callBackRcvdB(void *arg) {
501   Compute *obj = (Compute *)arg;
502   obj->countB++;
503   if(obj->countA == num_chare_z-1 && obj->countB == num_chare_x-1)
504     obj->thisProxy(obj->thisIndex.x, obj->thisIndex.y, obj->thisIndex.z).doWork();
505 }
506
507 void Compute::callBackRcvdC(void *arg) {
508   Compute *obj = (Compute *)arg;
509   obj->countC++;
510   if(obj->countC == num_chare_y)
511     obj->thisProxy(obj->thisIndex.x, obj->thisIndex.y, obj->thisIndex.z).receiveC();
512 }
513
514 ComputeMap::ComputeMap(int x, int y, int z, int tx, int ty, int tz) {
515   X = x;
516   Y = y;
517   Z = z;
518   mapping = new int[X*Y*Z];
519
520   TopoManager tmgr;
521   int dimX, dimY, dimZ, dimT;
522
523 #if USE_TOPOMAP
524   dimX = tmgr.getDimNX();
525   dimY = tmgr.getDimNY();
526   dimZ = tmgr.getDimNZ();
527   dimT = tmgr.getDimNT();
528 #elif USE_BLOCKMAP
529   dimX = tx;
530   dimY = ty;
531   dimZ = tz;
532   dimT = 1;
533 #endif
534
535   // we are assuming that the no. of chares in each dimension is a 
536   // multiple of the torus dimension
537   int numCharesPerPe = X*Y*Z/CkNumPes();
538
539   int numCharesPerPeX = X / dimX;
540   int numCharesPerPeY = Y / dimY;
541   int numCharesPerPeZ = Z / dimZ;
542
543   if(dimT < 2) {    // one core per node
544     if(CkMyPe()==0) CkPrintf("DATA: %d %d %d %d : %d %d %d\n", dimX, dimY, dimZ, dimT, numCharesPerPeX, numCharesPerPeY, numCharesPerPeZ);
545     for(int i=0; i<dimX; i++)
546       for(int j=0; j<dimY; j++)
547         for(int k=0; k<dimZ; k++)
548           for(int ci=i*numCharesPerPeX; ci<(i+1)*numCharesPerPeX; ci++)
549             for(int cj=j*numCharesPerPeY; cj<(j+1)*numCharesPerPeY; cj++)
550               for(int ck=k*numCharesPerPeZ; ck<(k+1)*numCharesPerPeZ; ck++) {
551 #if USE_TOPOMAP
552                 mapping[ci*Y*Z + cj*Z + ck] = tmgr.coordinatesToRank(i, j, k);
553 #elif USE_BLOCKMAP
554                 mapping[ci*Y*Z + cj*Z + ck] = i + j*dimX + k*dimX*dimY;
555 #endif
556               }
557   } else {          // multiple cores per node
558     // In this case, we split the chares in the X dimension among the
559     // cores on the same node.
560     numCharesPerPeX /= dimT;
561       if(CkMyPe()==0) CkPrintf("%d %d %d : %d %d %d %d : %d %d %d \n", x, y, z, dimX, dimY, dimZ, dimT, numCharesPerPeX, numCharesPerPeY, numCharesPerPeZ);
562       for(int i=0; i<dimX; i++)
563         for(int j=0; j<dimY; j++)
564           for(int k=0; k<dimZ; k++)
565             for(int l=0; l<dimT; l++)
566               for(int ci=(dimT*i+l)*numCharesPerPeX; ci<(dimT*i+l+1)*numCharesPerPeX; ci++)
567                 for(int cj=j*numCharesPerPeY; cj<(j+1)*numCharesPerPeY; cj++)
568                   for(int ck=k*numCharesPerPeZ; ck<(k+1)*numCharesPerPeZ; ck++) {
569                     mapping[ci*Y*Z + cj*Z + ck] = tmgr.coordinatesToRank(i, j, k, l);
570                   }
571   }
572 }
573
574 ComputeMap::~ComputeMap() {
575   delete [] mapping;
576 }
577
578 int ComputeMap::procNum(int, const CkArrayIndex &idx) {
579   int *index = (int *)idx.data();
580   return mapping[index[0]*Y*Z + index[1]*Z + index[2]];
581 }
582