changes to kNEighbor
[charm.git] / examples / charm++ / load_balancing / kNeighbor / kNeighbor.C
1 /** \file kNeighbor.C
2  *  Author: Chao Mei (chaomei2@illinois.edu)
3  *
4  *  Heavily modified by Abhinav Bhatele (bhatele@illinois.edu) 2011/02/13
5  */
6
7 #include "kNeighbor.decl.h"
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #define STRIDEK         1
12 #define CALCPERSTEP     100
13
14 #define DEBUG           0
15
16
17 /* readonly */ CProxy_Main mainProxy;
18 /* readonly */ int num_chares;
19 /* readonly */ int gMsgSize;
20 /* readonly */ int gLBFreq;
21 /* readonly */ int numSteps;
22
23 int cmpFunc(const void *a, const void *b) {
24   if(*(double *)a < *(double *)b) return -1;
25   if(*(double *)a > *(double *)b) return 1;
26   return 0;
27 }
28
29 class toNeighborMsg: public CMessage_toNeighborMsg {
30   public:
31     int *data;
32     int size;
33     int fromX;
34     int nID;
35
36   public:
37     toNeighborMsg() {};
38     toNeighborMsg(int s): size(s) {  
39     }
40
41     void setMsgSrc(int X, int id) {
42       fromX = X;
43       nID = id;
44     }
45
46 };
47
48 class Main: public CBase_Main {
49   public:
50     CProxy_Block array;
51
52     int currentStep;
53     int currentMsgSize;
54
55     int numElemsRcvd;
56     double totalTime;
57     double maxTime;
58     double minTime;
59     double *timeRec;
60     double startTime;
61
62     double gStarttime;
63
64   public:
65     Main(CkArgMsg *m) {
66       mainProxy = thisProxy;
67       CkPrintf("\nStarting kNeighbor ...\n");
68
69       if (m->argc!=4 && m->argc!=5) {
70         CkPrintf("Usage: %s <#elements> <#iterations> <msg size> [ldb freq]\n", m->argv[0]);
71         delete m;
72         CkExit();
73       }
74
75       num_chares = atoi(m->argv[1]);
76       if(num_chares < CkNumPes()) {
77         printf("Warning: #elements is forced to be equal to #pes\n");
78         num_chares = CkNumPes();
79       }
80
81       numSteps = atoi(m->argv[2]);
82       currentMsgSize = atoi(m->argv[3]);
83
84       gLBFreq = 100000;
85       if(m->argc==5) {
86         gLBFreq = atoi(m->argv[4]);
87       }
88
89 #if TURN_ON_LDB
90       printf("Setting load-balancing freq to every %d steps\n", gLBFreq);
91 #endif
92       gMsgSize = currentMsgSize;
93
94       currentStep = -1;
95       timeRec = new double[numSteps];
96
97       array = CProxy_Block::ckNew(num_chares);
98       CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
99       array.ckSetReductionClient(cb);
100       startTime = CmiWallTimer();
101
102       beginIteration();
103       for (int i=0; i<num_chares; i++)
104         array[i].commWithNeighbors();
105
106     }
107
108     void beginIteration() {
109       currentStep++;
110       if (currentStep == numSteps) {
111         CkPrintf("kNeighbor program finished!\n\n");
112         //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
113         //array.ckSetReductionClient(cb);
114         CkPrintf("Total time %lf\n", CmiWallTimer() - startTime);
115         terminate(NULL);
116         return;
117       }
118
119       numElemsRcvd = 0;
120       totalTime = 0.0;
121       maxTime = 0.0;
122       minTime = 3600.0;
123
124       //currentMsgSize = msgSize;
125       if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
126      //   array.pauseForLB();
127         return;
128       }
129
130       gStarttime = CmiWallTimer();
131      // for (int i=0; i<num_chares; i++)
132      //   array[i].commWithNeighbors();
133     }
134
135     void resumeIter() {
136 #if DEBUG
137       CkPrintf("Resume iteration at step %d\n", currentStep);
138 #endif
139       gStarttime = CmiWallTimer();
140       for (int i=0; i<num_chares; i++)
141         array[i].commWithNeighbors();
142     }
143
144     void terminate(CkReductionMsg *msg) {
145       delete msg;
146       double total = 0.0;
147
148       for (int i=0; i<numSteps; i++)
149         timeRec[i] = timeRec[i]*1e6;
150
151       qsort(timeRec, numSteps, sizeof(double), cmpFunc);
152       printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
153
154       int samples = 100;
155       if(numSteps<=samples) samples = numSteps-1;
156       for (int i=0; i<samples; i++)
157         total += timeRec[i];
158       total /= samples;
159
160       CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
161       CkExit();
162     }
163
164     void nextStep(CkReductionMsg  *msg) {
165       maxTime = *((double *)msg->getData());
166       delete msg;
167       double wholeStepTime = CmiWallTimer() - gStarttime;
168       timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
169       if(currentStep % 10 == 0)
170         CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
171       beginIteration();
172     }
173
174 };
175
176
177 //no wrap around for sending messages to neighbors
178 class Block: public CBase_Block {
179   public:
180     int numNeighbors;
181     int numNborsRcvd;
182     int *neighbors;
183     double *recvTimes;
184     double startTime;
185
186     int random;
187     int curIterMsgSize;
188     int internalStepCnt;
189     int sum;
190     int currentStep;
191
192     toNeighborMsg **iterMsg;
193
194   public:
195     Block() {
196       //srand(thisIndex.x+thisIndex.y);
197       usesAtSync = CmiTrue;
198
199       numNeighbors = 2*STRIDEK;
200       neighbors = new int[numNeighbors];
201       recvTimes = new double[numNeighbors];
202       int nidx=0;
203       //setting left neighbors
204       for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
205         int tmpnei = i;
206         while (tmpnei<0) tmpnei += num_chares;
207         neighbors[nidx] = tmpnei;
208       }
209       //setting right neighbors
210       for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
211         int tmpnei = i;
212         while (tmpnei>=num_chares) tmpnei -= num_chares;
213         neighbors[nidx] = tmpnei;
214       }
215
216       for (int i=0; i<numNeighbors; i++)
217         recvTimes[i] = 0.0;
218
219       iterMsg = new toNeighborMsg *[numNeighbors];
220       for (int i=0; i<numNeighbors; i++)
221         iterMsg[i] = NULL;
222
223 #if DEBUG
224       CkPrintf("Neighbors of %d: ", thisIndex);
225       for (int i=0; i<numNeighbors; i++)
226         CkPrintf("%d ", neighbors[i]);
227       CkPrintf("\n");
228 #endif
229
230       random = thisIndex*31+73;
231     }
232
233     ~Block() {
234       delete [] neighbors;
235       delete [] recvTimes;
236       delete [] iterMsg;
237     }
238
239     void pup(PUP::er &p){
240       ArrayElement1D::pup(p); //pack our superclass
241       p(numNeighbors);
242       p(numNborsRcvd);
243
244       if(p.isUnpacking()) {
245         neighbors = new int[numNeighbors];
246         recvTimes = new double[numNeighbors];
247       }
248       PUParray(p, neighbors, numNeighbors);
249       PUParray(p, recvTimes, numNeighbors);
250       p(startTime);
251       p(random);
252       p(curIterMsgSize);
253       p(internalStepCnt);
254       p(sum);
255       p(currentStep);
256       if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
257       for(int i=0; i<numNeighbors; i++){
258         CkPupMessage(p, (void **)&iterMsg[i]);
259       }
260     }
261
262     Block(CkMigrateMessage *m) {}
263
264     void pauseForLB(){
265 #if DEBUG
266       CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
267 #endif
268       AtSync();
269     }
270
271     void ResumeFromSync(){ //Called by load-balancing framework
272      // CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
273      // contribute(0, NULL, CkReduction::sum_int, cb);
274      commWithNeighbors();
275     }
276
277     void startInternalIteration() {
278 #if DEBUG
279       CkPrintf("[%d]: Start internal iteration \n", thisIndex);
280 #endif
281
282       numNborsRcvd = 0;
283       /* 1: pick a work size and do some computation */
284       //int N = (thisIndex * thisIndex / num_chares) * 100;
285       int N = 2;
286       if (currentStep < numSteps/4) {
287         if (thisIndex >= num_chares/4 && thisIndex < num_chares/2) {
288           N = 500;
289         }
290       } else if(currentStep < numSteps/2) {
291         if (thisIndex >= num_chares/num_chares && thisIndex < num_chares/4) {
292           N = 500;
293         }
294       } else if (currentStep < 3*numSteps/2) {
295         if (thisIndex >= num_chares/2 && thisIndex < 3*(num_chares/4)) {
296           N = 500;
297         }
298       } else {
299         if (thisIndex >= 3*(num_chares/4) && thisIndex < num_chares) {
300           N = 500;
301         }
302       }
303
304       for (int i=0; i<N; i++)
305         for (int j=0; j<N; j++) {
306           sum += (thisIndex * i + j);
307         }
308
309       /* 2. send msg to K neighbors */
310       int msgSize = curIterMsgSize;
311
312       // Send msgs to neighbors
313       for (int i=0; i<numNeighbors; i++) {
314         //double memtimer = CmiWallTimer();
315
316         toNeighborMsg *msg = iterMsg[i];
317
318 #if DEBUG
319         CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
320 #endif
321         msg->setMsgSrc(thisIndex, i);
322         //double entrytimer = CmiWallTimer();
323         thisProxy(neighbors[i]).recvMsgs(msg);
324         //double entrylasttimer = CmiWallTimer();
325         //if(thisIndex==0){
326         //      CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
327         //}
328       }
329     }
330
331     void commWithNeighbors() {
332       currentStep++;
333       internalStepCnt = 0;
334       curIterMsgSize = gMsgSize;
335       //currently the work size is only changed every big steps (which
336       //are initiated by the main proxy
337       random++;
338
339       if(iterMsg[0]==NULL) { //indicating the messages have not been created
340         for(int i=0; i<numNeighbors; i++)
341           iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
342       }
343
344       startTime = CmiWallTimer();
345       startInternalIteration();
346     }
347
348     void recvReplies(toNeighborMsg *m) {
349       int fromNID = m->nID;
350
351 #if DEBUG
352       CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
353 #endif
354
355       iterMsg[fromNID] = m;
356       //recvTimes[fromNID] += (CmiWallTimer() - startTime);
357
358       //get one step time and send it back to mainProxy
359       numNborsRcvd++;
360       if (numNborsRcvd == numNeighbors) {
361         internalStepCnt++;
362         if (internalStepCnt==CALCPERSTEP) {
363           double iterCommTime = CmiWallTimer() - startTime;
364           contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
365
366           if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
367             pauseForLB();
368             return; 
369           }
370           if (currentStep != numSteps ) {
371             commWithNeighbors();
372           }
373           /*if(thisIndex==0){
374             for(int i=0; i<numNeighbors; i++){
375             CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
376             }
377             }*/
378         } else {
379           startInternalIteration();
380         }
381       }
382     }
383
384     void recvMsgs(toNeighborMsg *m) {
385 #if DEBUG
386       CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
387 #endif
388
389       thisProxy(m->fromX).recvReplies(m);
390     }
391
392     inline int MAX(int a, int b) {
393       return (a>b)?a:b;
394     }
395     inline int MIN(int a, int b) {
396       return (a<b)?a:b;
397     }
398 };
399
400 #include "kNeighbor.def.h"