3a4d28cb9c69fa32792fbc3a4b004339ca0c3bbf
[charm.git] / examples / charm++ / load_balancing / kNeighbor / kNeighbor.C
1 /** \file kNeighbor.C
2  *  Author: Chao Mei (chaomei2@illinois.edu)
3  *
4  *  Heavily modified by Abhinav Bhatele (bhatele@illinois.edu) 2011/02/13
5  */
6
7 #include "kNeighbor.decl.h"
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #define STRIDEK         1
12 #define CALCPERSTEP     100
13
14 #define DEBUG           0
15
16
17 /* readonly */ CProxy_Main mainProxy;
18 /* readonly */ int num_chares;
19 /* readonly */ int gMsgSize;
20 /* readonly */ int gLBFreq;
21
22 int cmpFunc(const void *a, const void *b) {
23   if(*(double *)a < *(double *)b) return -1;
24   if(*(double *)a > *(double *)b) return 1;
25   return 0;
26 }
27
28 class toNeighborMsg: public CMessage_toNeighborMsg {
29   public:
30     int *data;
31     int size;
32     int fromX;
33     int nID;
34
35   public:
36     toNeighborMsg() {};
37     toNeighborMsg(int s): size(s) {  
38     }
39
40     void setMsgSrc(int X, int id) {
41       fromX = X;
42       nID = id;
43     }
44
45 };
46
47 class Main: public CBase_Main {
48   public:
49     CProxy_Block array;
50
51     int numSteps;
52     int currentStep;
53     int currentMsgSize;
54
55     int numElemsRcvd;
56     double totalTime;
57     double maxTime;
58     double minTime;
59     double *timeRec;
60
61     double gStarttime;
62
63   public:
64     Main(CkArgMsg *m) {
65       mainProxy = thisProxy;
66       CkPrintf("\nStarting kNeighbor ...\n");
67
68       if (m->argc!=4 && m->argc!=5) {
69         CkPrintf("Usage: %s <#elements> <#iterations> <msg size> [ldb freq]\n", m->argv[0]);
70         delete m;
71         CkExit();
72       }
73
74       num_chares = atoi(m->argv[1]);
75       if(num_chares < CkNumPes()) {
76         printf("Warning: #elements is forced to be equal to #pes\n");
77         num_chares = CkNumPes();
78       }
79
80       numSteps = atoi(m->argv[2]);
81       currentMsgSize = atoi(m->argv[3]);
82
83       gLBFreq = 100000;
84       if(m->argc==5) {
85         gLBFreq = atoi(m->argv[4]);
86       }
87
88 #if TURN_ON_LDB
89       printf("Setting load-balancing freq to every %d steps\n", gLBFreq);
90 #endif
91       gMsgSize = currentMsgSize;
92
93       currentStep = -1;
94       timeRec = new double[numSteps];
95
96       array = CProxy_Block::ckNew(num_chares);
97       CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
98       array.ckSetReductionClient(cb);
99
100       beginIteration();
101     }
102
103     void beginIteration() {
104       currentStep++;
105       if (currentStep == numSteps) {
106         CkPrintf("kNeighbor program finished!\n\n");
107         //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
108         //array.ckSetReductionClient(cb);
109         terminate(NULL);
110         return;
111       }
112
113       numElemsRcvd = 0;
114       totalTime = 0.0;
115       maxTime = 0.0;
116       minTime = 3600.0;
117
118       //currentMsgSize = msgSize;
119       if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
120         array.pauseForLB();
121         return;
122       }
123
124       gStarttime = CmiWallTimer();
125       for (int i=0; i<num_chares; i++)
126         array[i].commWithNeighbors();
127     }
128
129     void resumeIter() {
130 #if DEBUG
131       CkPrintf("Resume iteration at step %d\n", currentStep);
132 #endif
133       gStarttime = CmiWallTimer();
134       for (int i=0; i<num_chares; i++)
135         array[i].commWithNeighbors();
136     }
137
138     void terminate(CkReductionMsg *msg) {
139       delete msg;
140       double total = 0.0;
141
142       for (int i=0; i<numSteps; i++)
143         timeRec[i] = timeRec[i]*1e6;
144
145       qsort(timeRec, numSteps, sizeof(double), cmpFunc);
146       printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
147
148       int samples = 100;
149       if(numSteps<=samples) samples = numSteps-1;
150       for (int i=0; i<samples; i++)
151         total += timeRec[i];
152       total /= samples;
153
154       CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
155       CkExit();
156     }
157
158     void nextStep(CkReductionMsg  *msg) {
159       maxTime = *((double *)msg->getData());
160       delete msg;
161       double wholeStepTime = CmiWallTimer() - gStarttime;
162       timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
163       if(currentStep % 10 == 0)
164         CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
165       beginIteration();
166     }
167
168 };
169
170
171 //no wrap around for sending messages to neighbors
172 class Block: public CBase_Block {
173   public:
174     int numNeighbors;
175     int numNborsRcvd;
176     int *neighbors;
177     double *recvTimes;
178     double startTime;
179
180     int random;
181     int curIterMsgSize;
182     int internalStepCnt;
183     int sum;
184
185     toNeighborMsg **iterMsg;
186
187   public:
188     Block() {
189       //srand(thisIndex.x+thisIndex.y);
190       usesAtSync = CmiTrue;
191
192       numNeighbors = 2*STRIDEK;
193       neighbors = new int[numNeighbors];
194       recvTimes = new double[numNeighbors];
195       int nidx=0;
196       //setting left neighbors
197       for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
198         int tmpnei = i;
199         while (tmpnei<0) tmpnei += num_chares;
200         neighbors[nidx] = tmpnei;
201       }
202       //setting right neighbors
203       for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
204         int tmpnei = i;
205         while (tmpnei>=num_chares) tmpnei -= num_chares;
206         neighbors[nidx] = tmpnei;
207       }
208
209       for (int i=0; i<numNeighbors; i++)
210         recvTimes[i] = 0.0;
211
212       iterMsg = new toNeighborMsg *[numNeighbors];
213       for (int i=0; i<numNeighbors; i++)
214         iterMsg[i] = NULL;
215
216 #if DEBUG
217       CkPrintf("Neighbors of %d: ", thisIndex);
218       for (int i=0; i<numNeighbors; i++)
219         CkPrintf("%d ", neighbors[i]);
220       CkPrintf("\n");
221 #endif
222
223       random = thisIndex*31+73;
224     }
225
226     ~Block() {
227       delete [] neighbors;
228       delete [] recvTimes;
229       delete [] iterMsg;
230     }
231
232     void pup(PUP::er &p){
233       ArrayElement1D::pup(p); //pack our superclass
234       p(numNeighbors);
235       p(numNborsRcvd);
236
237       if(p.isUnpacking()) {
238         neighbors = new int[numNeighbors];
239         recvTimes = new double[numNeighbors];
240       }
241       PUParray(p, neighbors, numNeighbors);
242       PUParray(p, recvTimes, numNeighbors);
243       p(startTime);
244       p(random);
245       p(curIterMsgSize);
246       p(internalStepCnt);
247       p(sum);
248       if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
249       for(int i=0; i<numNeighbors; i++){
250         if(p.isUnpacking()) iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
251         CkPupMessage(p, (void **)&iterMsg[i]);
252       }
253     }
254
255     Block(CkMigrateMessage *m) {}
256
257     void pauseForLB(){
258 #if DEBUG
259       CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
260 #endif
261       AtSync();
262     }
263
264     void ResumeFromSync(){ //Called by load-balancing framework
265       CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
266       contribute(0, NULL, CkReduction::sum_int, cb);
267     }
268
269     void startInternalIteration() {
270 #if DEBUG
271       CkPrintf("[%d]: Start internal iteration \n", thisIndex);
272 #endif
273
274       numNborsRcvd = 0;
275       /* 1: pick a work size and do some computation */
276       int N = (thisIndex * thisIndex / num_chares) * 100;
277       for (int i=0; i<N; i++)
278         for (int j=0; j<N; j++) {
279           sum += (thisIndex * i + j);
280         }
281
282       /* 2. send msg to K neighbors */
283       int msgSize = curIterMsgSize;
284
285       // Send msgs to neighbors
286       for (int i=0; i<numNeighbors; i++) {
287         //double memtimer = CmiWallTimer();
288
289         toNeighborMsg *msg = iterMsg[i];
290
291 #if DEBUG
292         CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
293 #endif
294         msg->setMsgSrc(thisIndex, i);
295         //double entrytimer = CmiWallTimer();
296         thisProxy(neighbors[i]).recvMsgs(msg);
297         //double entrylasttimer = CmiWallTimer();
298         //if(thisIndex==0){
299         //      CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
300         //}
301       }
302     }
303
304     void commWithNeighbors() {
305       internalStepCnt = 0;
306       curIterMsgSize = gMsgSize;
307       //currently the work size is only changed every big steps (which
308       //are initiated by the main proxy
309       random++;
310
311       if(iterMsg[0]==NULL) { //indicating the messages have not been created
312         for(int i=0; i<numNeighbors; i++)
313           iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
314       }
315
316       startTime = CmiWallTimer();
317       startInternalIteration();
318     }
319
320     void recvReplies(toNeighborMsg *m) {
321       int fromNID = m->nID;
322
323 #if DEBUG
324       CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
325 #endif
326
327       iterMsg[fromNID] = m;
328       //recvTimes[fromNID] += (CmiWallTimer() - startTime);
329
330       //get one step time and send it back to mainProxy
331       numNborsRcvd++;
332       if (numNborsRcvd == numNeighbors) {
333         internalStepCnt++;
334         if (internalStepCnt==CALCPERSTEP) {
335           double iterCommTime = CmiWallTimer() - startTime;
336           contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
337           /*if(thisIndex==0){
338             for(int i=0; i<numNeighbors; i++){
339             CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
340             }
341             }*/
342         } else {
343           startInternalIteration();
344         }
345       }
346     }
347
348     void recvMsgs(toNeighborMsg *m) {
349 #if DEBUG
350       CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
351 #endif
352
353       thisProxy(m->fromX).recvReplies(m);
354     }
355
356     inline int MAX(int a, int b) {
357       return (a>b)?a:b;
358     }
359     inline int MIN(int a, int b) {
360       return (a<b)?a:b;
361     }
362 };
363
364 #include "kNeighbor.def.h"