Fix a bug in the init
[charm.git] / examples / charm++ / load_balancing / kNeighbor / kNeighbor.C
1 /** \file kNeighbor.C
2  *  Author: Chao Mei (chaomei2@illinois.edu)
3  *
4  *  Heavily modified by Abhinav Bhatele (bhatele@illinois.edu) 2011/02/13
5  */
6
7 #include "kNeighbor.decl.h"
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #define STRIDEK         1
12 #define CALCPERSTEP     100
13
14 #define DEBUG           0
15
16
17 /* readonly */ CProxy_Main mainProxy;
18 /* readonly */ int num_chares;
19 /* readonly */ int gMsgSize;
20 /* readonly */ int gLBFreq;
21
22 int cmpFunc(const void *a, const void *b) {
23   if(*(double *)a < *(double *)b) return -1;
24   if(*(double *)a > *(double *)b) return 1;
25   return 0;
26 }
27
28 class toNeighborMsg: public CMessage_toNeighborMsg {
29   public:
30     int *data;
31     int size;
32     int fromX;
33     int nID;
34
35   public:
36     toNeighborMsg() {};
37     toNeighborMsg(int s): size(s) {  
38     }
39
40     void setMsgSrc(int X, int id) {
41       fromX = X;
42       nID = id;
43     }
44
45 };
46
47 class Main: public CBase_Main {
48   public:
49     CProxy_Block array;
50
51     int numSteps;
52     int currentStep;
53     int currentMsgSize;
54
55     int numElemsRcvd;
56     double totalTime;
57     double maxTime;
58     double minTime;
59     double *timeRec;
60
61     double gStarttime;
62
63   public:
64     Main(CkArgMsg *m) {
65       mainProxy = thisProxy;
66       CkPrintf("\nStarting kNeighbor ...\n");
67
68       if (m->argc!=4 && m->argc!=5) {
69         CkPrintf("Usage: %s <#elements> <#iterations> <msg size> [ldb freq]\n", m->argv[0]);
70         delete m;
71         CkExit();
72       }
73
74       num_chares = atoi(m->argv[1]);
75       if(num_chares < CkNumPes()) {
76         printf("Warning: #elements is forced to be equal to #pes\n");
77         num_chares = CkNumPes();
78       }
79
80       numSteps = atoi(m->argv[2]);
81       currentMsgSize = atoi(m->argv[3]);
82
83       gLBFreq = 100000;
84       if(m->argc==5) {
85         gLBFreq = atoi(m->argv[4]);
86       }
87
88 #if TURN_ON_LDB
89       printf("Setting load-balancing freq to every %d steps\n", gLBFreq);
90 #endif
91       gMsgSize = currentMsgSize;
92
93       currentStep = -1;
94       timeRec = new double[numSteps];
95
96       array = CProxy_Block::ckNew(num_chares);
97       CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
98       array.ckSetReductionClient(cb);
99
100       beginIteration();
101     }
102
103     void beginIteration() {
104       currentStep++;
105       if (currentStep == numSteps) {
106         CkPrintf("kNeighbor program finished!\n\n");
107         //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
108         //array.ckSetReductionClient(cb);
109         terminate(NULL);
110         return;
111       }
112
113       numElemsRcvd = 0;
114       totalTime = 0.0;
115       maxTime = 0.0;
116       minTime = 3600.0;
117
118       //currentMsgSize = msgSize;
119       if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
120         array.pauseForLB();
121         return;
122       }
123
124       gStarttime = CkWallTimer();
125         array.commWithNeighbors();
126     }
127
128     void resumeIter() {
129 #if DEBUG
130       CkPrintf("Resume iteration at step %d\n", currentStep);
131 #endif
132       gStarttime = CkWallTimer();
133         array.commWithNeighbors();
134     }
135
136     void terminate(CkReductionMsg *msg) {
137       delete msg;
138       double total = 0.0;
139
140       for (int i=0; i<numSteps; i++)
141         timeRec[i] = timeRec[i]*1e6;
142
143       qsort(timeRec, numSteps, sizeof(double), cmpFunc);
144       printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
145
146       int samples = 100;
147       if(numSteps<=samples) samples = numSteps-1;
148       for (int i=0; i<samples; i++)
149         total += timeRec[i];
150       total /= samples;
151
152       CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
153       CkExit();
154     }
155
156     void nextStep(CkReductionMsg  *msg) {
157       maxTime = *((double *)msg->getData());
158       delete msg;
159       double wholeStepTime = CkWallTimer() - gStarttime;
160       timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
161       if(currentStep % 10 == 0)
162         CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
163       beginIteration();
164     }
165
166 };
167
168
169 //no wrap around for sending messages to neighbors
170 class Block: public CBase_Block {
171   public:
172     int numNeighbors;
173     int numNborsRcvd;
174     int *neighbors;
175     double *recvTimes;
176     double startTime;
177
178     int random;
179     int curIterMsgSize;
180     int internalStepCnt;
181     int sum;
182
183     toNeighborMsg **iterMsg;
184
185   public:
186     Block() {
187       //srand(thisIndex.x+thisIndex.y);
188       usesAtSync = CmiTrue;
189
190       numNeighbors = 2*STRIDEK;
191       neighbors = new int[numNeighbors];
192       recvTimes = new double[numNeighbors];
193       int nidx=0;
194       //setting left neighbors
195       for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
196         int tmpnei = i;
197         while (tmpnei<0) tmpnei += num_chares;
198         neighbors[nidx] = tmpnei;
199       }
200       //setting right neighbors
201       for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
202         int tmpnei = i;
203         while (tmpnei>=num_chares) tmpnei -= num_chares;
204         neighbors[nidx] = tmpnei;
205       }
206
207       for (int i=0; i<numNeighbors; i++)
208         recvTimes[i] = 0.0;
209
210       iterMsg = new toNeighborMsg *[numNeighbors];
211       for (int i=0; i<numNeighbors; i++)
212         iterMsg[i] = NULL;
213
214 #if DEBUG
215       CkPrintf("Neighbors of %d: ", thisIndex);
216       for (int i=0; i<numNeighbors; i++)
217         CkPrintf("%d ", neighbors[i]);
218       CkPrintf("\n");
219 #endif
220
221       random = thisIndex*31+73;
222     }
223
224     ~Block() {
225       delete [] neighbors;
226       delete [] recvTimes;
227       delete [] iterMsg;
228     }
229
230     void pup(PUP::er &p){
231       CBase_Block::pup(p);
232       p(numNeighbors);
233       p(numNborsRcvd);
234
235       if(p.isUnpacking()) {
236         neighbors = new int[numNeighbors];
237         recvTimes = new double[numNeighbors];
238       }
239       PUParray(p, neighbors, numNeighbors);
240       PUParray(p, recvTimes, numNeighbors);
241       p(startTime);
242       p(random);
243       p(curIterMsgSize);
244       p(internalStepCnt);
245       p(sum);
246       if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
247       for(int i=0; i<numNeighbors; i++){
248         CkPupMessage(p, (void **)&iterMsg[i]);
249       }
250     }
251
252     Block(CkMigrateMessage *m) {}
253
254     void pauseForLB(){
255 #if DEBUG
256       CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
257 #endif
258       AtSync();
259     }
260
261     void ResumeFromSync(){ //Called by load-balancing framework
262       CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
263       contribute(0, NULL, CkReduction::sum_int, cb);
264     }
265
266     void startInternalIteration() {
267 #if DEBUG
268       CkPrintf("[%d]: Start internal iteration \n", thisIndex);
269 #endif
270
271       numNborsRcvd = 0;
272       /* 1: pick a work size and do some computation */
273       int N = (thisIndex * thisIndex / num_chares) * 100;
274       for (int i=0; i<N; i++)
275         for (int j=0; j<N; j++) {
276           sum += (thisIndex * i + j);
277         }
278
279       /* 2. send msg to K neighbors */
280       int msgSize = curIterMsgSize;
281
282       // Send msgs to neighbors
283       for (int i=0; i<numNeighbors; i++) {
284         //double memtimer = CkWallTimer();
285
286         toNeighborMsg *msg = iterMsg[i];
287
288 #if DEBUG
289         CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
290 #endif
291         msg->setMsgSrc(thisIndex, i);
292         //double entrytimer = CkWallTimer();
293         thisProxy(neighbors[i]).recvMsgs(msg);
294         //double entrylasttimer = CkWallTimer();
295         //if(thisIndex==0){
296         //      CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
297         //}
298       }
299     }
300
301     void commWithNeighbors() {
302       internalStepCnt = 0;
303       curIterMsgSize = gMsgSize;
304       //currently the work size is only changed every big steps (which
305       //are initiated by the main proxy
306       random++;
307
308       if(iterMsg[0]==NULL) { //indicating the messages have not been created
309         for(int i=0; i<numNeighbors; i++)
310           iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
311       }
312
313       startTime = CkWallTimer();
314       startInternalIteration();
315     }
316
317     void recvReplies(toNeighborMsg *m) {
318       int fromNID = m->nID;
319
320 #if DEBUG
321       CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
322 #endif
323
324       iterMsg[fromNID] = m;
325       //recvTimes[fromNID] += (CkWallTimer() - startTime);
326
327       //get one step time and send it back to mainProxy
328       numNborsRcvd++;
329       if (numNborsRcvd == numNeighbors) {
330         internalStepCnt++;
331         if (internalStepCnt==CALCPERSTEP) {
332           double iterCommTime = CkWallTimer() - startTime;
333           contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
334           /*if(thisIndex==0){
335             for(int i=0; i<numNeighbors; i++){
336             CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
337             }
338             }*/
339         } else {
340           startInternalIteration();
341         }
342       }
343     }
344
345     void recvMsgs(toNeighborMsg *m) {
346 #if DEBUG
347       CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
348 #endif
349
350       thisProxy(m->fromX).recvReplies(m);
351     }
352
353     inline int MAX(int a, int b) {
354       return (a>b)?a:b;
355     }
356     inline int MIN(int a, int b) {
357       return (a<b)?a:b;
358     }
359 };
360
361 #include "kNeighbor.def.h"