2de665f9656c8f2fc721381f01c00346705e4c03
[charm.git] / examples / charm++ / load_balancing / kNeighbor / kNeighbor.C
1 /** \file kNeighbor.C
2  *  Author: Chao Mei
3  *
4  */
5
6 #include "kNeighbor.decl.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9
10 #define STRIDEK         1
11 #define CALCPERSTEP     100
12 #define WRAPAROUND      1
13 #define ALLCROSSNODE    0
14
15 #define DEBUG           0
16 #define REUSE_ITER_MSG  0
17 #define TOUCH_MSGDATA   0
18
19 #define WORKSIZECNT     1
20 #define MSGSIZECNT      1
21
22 /* readonly */ CProxy_Main mainProxy;
23 /* readonly */ int num_chares;
24 /* readonly */ int gMsgSize;
25 /* readonly */ int gLBFreq;
26
27 int cmpFunc(const void *a, const void *b) {
28   if(*(double *)a < *(double *)b) return -1;
29   if(*(double *)a > *(double *)b) return 1;
30   return 0;
31 }
32
33 class toNeighborMsg: public CMessage_toNeighborMsg {
34   public:
35     int *data;
36     int size;
37     int fromX;
38     int nID;
39
40   public:
41     toNeighborMsg() {};
42     toNeighborMsg(int s): size(s) {  
43 #if TOUCH_MSGDATA
44       init();
45 #endif
46     }
47
48     void setMsgSrc(int X, int id) {
49       fromX = X;
50       nID = id;
51     }
52
53     void init() {
54       for (int i=0; i<size; i++)
55         data[i] = i;
56     }
57
58     int sum() {
59       int s=0;
60       for (int i=0; i<size; i++)
61         s += data[i];
62       return s;
63     }
64 };
65
66 class Main: public CBase_Main {
67   public:
68     CProxy_Block array;
69
70     //static int msgSizeArr[MSGSIZECNT];
71
72     int numSteps;
73     int currentStep;
74     int currentMsgSize;
75
76     int elementsRecved;
77     double totalTime;
78     double maxTime;
79     double minTime;
80     double *timeRec;
81
82     double gStarttime;
83
84   public:
85     Main(CkArgMsg *m) {
86       mainProxy = thisProxy;
87       CkPrintf("\nStarting kNeighbor ...\n");
88
89       if (m->argc!=4 && m->argc!=5) {
90         CkPrintf("Usage: %s <#elements> <#iterations> <msg size> [ldb freq]\n", m->argv[0]);
91         delete m;
92         CkExit();
93       }
94
95       num_chares = atoi(m->argv[1]);
96       if(num_chares < CkNumPes()) {
97         printf("Warning: #elements is forced to be equal to #pes\n");
98         num_chares = CkNumPes();
99       }
100
101       numSteps = atoi(m->argv[2]);
102       currentMsgSize = atoi(m->argv[3]);
103
104       gLBFreq = 100000;        
105       if(m->argc==5) {
106         gLBFreq = atoi(m->argv[4]);
107       }
108
109 #if TURN_ON_LDB
110       printf("Setting load-balancing freq to every %d steps\n", gLBFreq);  
111 #endif
112 #if REUSE_ITER_MSG
113       gMsgSize = currentMsgSize;
114 #endif
115
116       currentStep = -1;
117       timeRec = new double[numSteps];
118
119       array = CProxy_Block::ckNew(num_chares);
120       CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
121       array.ckSetReductionClient(cb);
122
123       beginIteration();
124     }
125
126     void beginIteration() {
127       currentStep++;
128       if (currentStep == numSteps) {
129         CkPrintf("kNeighbor program finished!\n");
130         //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
131         //array.ckSetReductionClient(cb);
132         //array.printSts(numSteps);
133         terminate(NULL);
134         return;
135         //CkExit();
136       }
137
138       elementsRecved = 0;
139       totalTime = 0.0;
140       maxTime = 0.0;
141       minTime = 3600.0;
142
143       //int msgSize = msgSizeArr[currentStep%MSGSIZECNT];
144       //int msgSize = msgSizeArr[rand()%MSGSIZECNT];
145       //currentMsgSize = msgSize;
146       if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
147         array.pauseForLB();
148         return;
149       }
150
151       gStarttime = CmiWallTimer();
152 #if REUSE_ITER_MSG
153       for (int i=0; i<num_chares; i++)
154         array[i].commWithNeighbors();
155 #else
156       for (int i=0; i<num_chares; i++)
157         array[i].commWithNeighbors(currentMsgSize);
158 #endif  
159       //array.commWithNeighbors(currentMsgSize);
160     }
161     
162     void resumeIter() {
163 #if DEBUG
164       CkPrintf("Resume iteration at step %d\n", currentStep);
165 #endif
166       gStarttime = CmiWallTimer();
167 #if REUSE_ITER_MSG
168       for (int i=0; i<num_chares; i++)
169         array[i].commWithNeighbors();
170 #else
171       for (int i=0; i<num_chares; i++)
172         array[i].commWithNeighbors(currentMsgSize);
173 #endif
174     }
175
176     void terminate(CkReductionMsg *msg) {
177       delete msg;
178       double total = 0.0;
179
180       for (int i=0; i<numSteps; i++)
181         timeRec[i] = timeRec[i]*1e6;
182
183       qsort(timeRec, numSteps, sizeof(double), cmpFunc);
184       printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
185
186       int samples = 100;
187       if(numSteps<=samples) samples = numSteps-1;
188       for (int i=0; i<samples; i++)
189         total += timeRec[i];
190       total /= samples;
191
192       CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
193       CkExit();
194     }
195
196     void nextStep_plain(double iterTime) {
197       elementsRecved++;
198       totalTime += iterTime;
199       maxTime = maxTime>iterTime?maxTime:iterTime;
200       minTime = minTime<iterTime?minTime:iterTime;
201
202       if (elementsRecved == num_chares) {
203           double wholeStepTime = CmiWallTimer() - gStarttime;
204           timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
205           //CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
206
207           beginIteration();
208       }
209     }
210
211     void nextStep(CkReductionMsg  *msg) {
212       maxTime = *((double *)msg->getData());
213       delete msg;
214       double wholeStepTime = CmiWallTimer() - gStarttime;
215       timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
216       //CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
217       beginIteration();
218     }
219
220 };
221
222 //int Main::msgSizeArr[MSGSIZECNT] = {16, 32, 128, 256, 512, 1024, 2048, 4096};
223 //int Main::msgSizeArr[MSGSIZECNT] = {10000};
224
225 //no wrap around for sending messages to neighbors
226 class Block: public CBase_Block {
227   public:
228     /** actual work size is of workSize^3 */
229     static int workSizeArr[WORKSIZECNT];
230
231     int numNeighbors;
232     int neighborsRecved;
233     int *neighbors;
234     double *recvTimes;
235     double startTime;
236
237     int random;
238     int curIterMsgSize;
239     int curIterWorkSize;
240     int internalStepCnt;
241     int sum;
242
243 #if REUSE_ITER_MSG
244     toNeighborMsg **iterMsg;
245 #endif
246
247   public:
248     Block() {
249       //srand(thisIndex.x+thisIndex.y);
250       usesAtSync = CmiTrue;
251
252 #if WRAPAROUND
253       numNeighbors = 2*STRIDEK;
254       neighbors = new int[numNeighbors];
255       recvTimes = new double[numNeighbors];
256       int nidx=0;
257       //setting left neighbors
258       for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
259         int tmpnei = i;
260         while (tmpnei<0) tmpnei += num_chares;
261         neighbors[nidx] = tmpnei;
262       }
263       //setting right neighbors
264       for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
265         int tmpnei = i;
266         while (tmpnei>=num_chares) tmpnei -= num_chares;
267         neighbors[nidx] = tmpnei;
268       }
269 #elif ALLCROSSNODE
270       if(CkNumNodes()==1){
271         if(thisIndex==0){
272           CkPrintf("This version has to run with more than 2 nodes!\n");
273           CkExit();
274         }
275         return;
276       }
277       numNeighbors = CkNumNodes()-1;
278       neighbors = new int[numNeighbors];
279       recvTimes = new double[numNeighbors];
280       for(int i=0; i<numNeighbors; i++){
281         neighbors[i] = (thisIndex+(i+1)*CmiMyNodeSize())%CkNumPes();
282       }
283 #else
284       //calculate the neighbors this element has
285       numNeighbors = 0;
286       numNeighbors += thisIndex - MAX(0, thisIndex-STRIDEK); //left
287       numNeighbors += MIN(num_chares-1, thisIndex+STRIDEK)-thisIndex; //right
288       neighbors = new int[numNeighbors];
289       recvTimes = new double[numNeighbors];
290       int nidx=0;
291       for (int i=MAX(0, thisIndex-STRIDEK); i<thisIndex; i++, nidx++) neighbors[nidx]=i;
292       for (int i=thisIndex+1; i<=MIN(num_chares-1, thisIndex+STRIDEK); i++, nidx++) neighbors[nidx] = i;
293 #endif
294
295       for (int i=0; i<numNeighbors; i++)
296         recvTimes[i] = 0.0;
297
298 #if REUSE_ITER_MSG
299       iterMsg = new toNeighborMsg *[numNeighbors];
300       for (int i=0; i<numNeighbors; i++)
301         iterMsg[i] = NULL;      
302 #endif
303
304 #if DEBUG
305       CkPrintf("Neighbors of %d: ", thisIndex);
306       for (int i=0; i<numNeighbors; i++)
307         CkPrintf("%d ", neighbors[i]);
308       CkPrintf("\n");
309 #endif
310
311       random = thisIndex*31+73;
312     }
313
314     ~Block() {
315       delete [] neighbors;
316       delete [] recvTimes;
317 #if REUSE_ITER_MSG
318       delete [] iterMsg;
319 #endif
320     }
321     
322     void pup(PUP::er &p){
323       ArrayElement1D::pup(p); //pack our superclass
324       p(workSizeArr, WORKSIZECNT);
325       p(numNeighbors);
326       p(neighborsRecved);
327
328       if(p.isUnpacking()) {
329         neighbors = new int[numNeighbors];
330         recvTimes = new double[numNeighbors];
331       }
332       PUParray(p, neighbors, numNeighbors);
333       PUParray(p, recvTimes, numNeighbors);
334       p(startTime);
335       p(random);
336       p(curIterMsgSize);
337       p(curIterWorkSize);
338       p(internalStepCnt);
339       p(sum);
340 #if REUSE_ITER_MSG
341       if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
342       for(int i=0; i<numNeighbors; i++){
343         if(p.isUnpacking()) iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
344         CkPupMessage(p, (void **)&iterMsg[i]);
345       }
346 #endif                  
347     }
348
349     Block(CkMigrateMessage *m) {}
350
351     void pauseForLB(){
352 #if DEBUG
353       CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
354 #endif
355       AtSync();
356     }
357
358     void ResumeFromSync(){ //Called by load-balancing framework
359       CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
360       contribute(0, NULL, CkReduction::sum_int, cb);
361     }
362
363     void printSts(int totalSteps){
364       /*for(int i=0; i<numNeighbors; i++){
365         CkPrintf("Elem[%d]: avg RTT from neighbor %d (actual elem id %d): %lf\n", thisIndex, i, neighbors[i], recvTimes[i]/totalSteps);
366         }*/
367       contribute(0,0,CkReduction::max_int);
368     }
369
370     void startInternalIteration() {
371 #if DEBUG
372       CkPrintf("[%d]: Start internal iteration \n", thisIndex);
373 #endif
374
375       neighborsRecved = 0;
376       /* 1: pick a work size and do some computation */
377       int sum=0;
378       int N=curIterWorkSize;
379       for (int i=0; i<N; i++)
380         for (int j=0; j<N; j++)
381           for (int k=0; k<N; k++)
382             sum += (thisIndex*i+thisIndex*j+k)%WORKSIZECNT;
383       /* 2. send msg to K neighbors */
384       int msgSize = curIterMsgSize;
385
386       // Send msgs to neighbors
387       for (int i=0; i<numNeighbors; i++) {
388         //double memtimer = CmiWallTimer();
389
390 #if REUSE_ITER_MSG
391         toNeighborMsg *msg = iterMsg[i];
392 #else
393         toNeighborMsg *msg = new(msgSize/4, 0) toNeighborMsg(msgSize/4);
394 #endif
395
396 #if DEBUG
397         CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
398 #endif
399         msg->setMsgSrc(thisIndex, i);
400         //double entrytimer = CmiWallTimer();
401         thisProxy(neighbors[i]).recvMsgs(msg);
402         //double entrylasttimer = CmiWallTimer();
403         //if(thisIndex==0){
404         //      CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
405         //}
406       }
407     }
408
409     void commWithNeighbors(int msgSize) {
410       internalStepCnt = 0;
411       curIterMsgSize = msgSize;
412       //currently the work size is only changed every big steps (which
413       //are initiated by the main proxy
414       curIterWorkSize = workSizeArr[random%WORKSIZECNT];
415       random++;
416
417       startTime = CmiWallTimer();
418       startInternalIteration();
419     }
420
421     void commWithNeighbors() {
422       internalStepCnt = 0;
423       curIterMsgSize = gMsgSize;
424       //currently the work size is only changed every big steps (which
425       //are initiated by the main proxy
426       curIterWorkSize = workSizeArr[random%WORKSIZECNT];
427       random++;
428
429 #if REUSE_ITER_MSG
430       if(iterMsg[0]==NULL) { //indicating the messages have not been created
431         for(int i=0; i<numNeighbors; i++)
432           iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
433       }
434 #endif
435
436       startTime = CmiWallTimer();
437       startInternalIteration();
438     }
439
440     void recvReplies(toNeighborMsg *m) {
441       int fromNID = m->nID;
442
443 #if DEBUG
444       CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
445 #endif
446
447 #if REUSE_ITER_MSG
448       iterMsg[fromNID] = m;
449 #else
450       delete m;
451 #endif
452       //recvTimes[fromNID] += (CmiWallTimer() - startTime);
453
454       //get one step time and send it back to mainProxy
455       neighborsRecved++;
456       if (neighborsRecved == numNeighbors) {
457         internalStepCnt++;
458         if (internalStepCnt==CALCPERSTEP) {
459           double iterCommTime = CmiWallTimer() - startTime;
460           contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
461           /*if(thisIndex==0){
462             for(int i=0; i<numNeighbors; i++){
463             CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
464             }
465             }*/
466         } else {
467           startInternalIteration();
468         }
469       }
470     }
471
472     void recvMsgs(toNeighborMsg *m) {
473 #if DEBUG
474       CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
475 #endif
476
477 #if TOUCH_MSGDATA
478       sum = m->sum();
479 #endif
480       thisProxy(m->fromX).recvReplies(m);
481     }
482
483     inline int MAX(int a, int b) {
484       return (a>b)?a:b;
485     }
486     inline int MIN(int a, int b) {
487       return (a<b)?a:b;
488     }
489 };
490
491 //int Block::workSizeArr[WORKSIZECNT] = {20, 60, 120, 180, 240};
492 int Block::workSizeArr[WORKSIZECNT] = {20};
493
494 #include "kNeighbor.def.h"