/** \file kNeighbor.C
 *  kNeighbor load-balancing (ldb) example.
 *  Author: Chao Mei
 *
 *  Location in charm.git:
 *  examples / charm++ / load_balancing / kNeighbor / kNeighbor.C
 */
5
6 #include "kNeighbor.decl.h"
7 #include <stdio.h>
8 #include <stdlib.h>
9
/* ---- Compile-time configuration of the benchmark ---- */

/* Neighbors on each side of an element (2*STRIDEK in total when wrapping). */
#define STRIDEK 1
/* Internal send/reply rounds each element performs per timed step. */
#define CALCPERSTEP 100
/* Non-zero: neighbor indices wrap around the ends of the array. */
#define WRAPAROUND 1
/* Only consulted when WRAPAROUND is 0: pick neighbors at node-size strides. */
#define ALLCROSSNODE 0

/* Non-zero: print per-element tracing via CkPrintf. */
#define DEBUG 0
/* Non-zero: keep one message per neighbor and resend it instead of allocating. */
#define REUSE_ITER_MSG 0
/* Non-zero: fill the payload when a message is built and sum it on receipt. */
#define TOUCH_MSGDATA 0

/* Non-zero: elements use AtSync and pause for load balancing every gLBFreq steps. */
#define TURN_ON_LDB 1

/* Number of entries in Block::workSizeArr. */
#define WORKSIZECNT 1
/* Only referenced from commented-out msgSizeArr code in this file. */
#define MSGSIZECNT 1
23
24 /* readonly */ CProxy_Main mainProxy;
25 /* readonly */ int num_chares;
26 /* readonly */ int gMsgSize;
27 /* readonly */ int gLBFreq;
28
29 class toNeighborMsg: public CMessage_toNeighborMsg {
30 public:
31     int *data;
32     int size;
33     int fromX;
34     int nID;
35
36 public:
37     toNeighborMsg() {};
38     toNeighborMsg(int s): size(s) {  
39 #if TOUCH_MSGDATA
40         init(); 
41 #endif
42     }
43
44     void setMsgSrc(int X, int id) {
45         fromX = X;
46         nID = id;
47     }
48     void init() {
49         for (int i=0; i<size; i++)
50           data[i] = i;
51     }
52     int sum() {
53         int s=0;
54         for (int i=0; i<size; i++)
55           s += data[i];
56         return s;
57     }        
58 };
59
60
/**
 * qsort() comparator that orders two doubles ascending.
 *
 * @param a  pointer to the first double
 * @param b  pointer to the second double
 * @return   -1 if *a < *b, 1 if *a > *b, 0 otherwise
 */
int cmpFunc(const void *a, const void *b){
    const double lhs = *static_cast<const double *>(a);
    const double rhs = *static_cast<const double *>(b);
    // Each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1.
    return (lhs > rhs) - (lhs < rhs);
}
67
68 class Main: public CBase_Main {
69 public:
70     CProxy_Block array;
71
72     //static int msgSizeArr[MSGSIZECNT];
73
74     int numSteps;
75     int currentStep;
76     int currentMsgSize;
77
78     int elementsRecved;
79     double totalTime;
80     double maxTime;
81     double minTime;
82     double *timeRec;
83
84     double gStarttime;
85
86 public:
87     Main(CkArgMsg *m) {
88         mainProxy = thisProxy;
89         CkPrintf("\nStarting kNeighbor ...\n");
90
91         if (m->argc!=4 && m->argc!=5) {
92             CkPrintf("Usage: %s <#elements> <#iterations> <msg size> [ldb freq]\n", m->argv[0]);
93             delete m;
94             CkExit();
95         }
96
97         num_chares = atoi(m->argv[1]);
98         if(num_chares < CkNumPes()){
99                 printf("Warning: #elements is forced to be euqal to #pes\n");
100                 num_chares = CkNumPes();
101         }
102
103         numSteps = atoi(m->argv[2]);
104
105         currentMsgSize = atoi(m->argv[3]);
106         
107         gLBFreq = 100000;        
108         if(m->argc==5){
109                 gLBFreq = atoi(m->argv[4]);
110         }
111 #if TURN_ON_LDB
112         printf("Setting load-balancing freq to every %d steps\n", gLBFreq);  
113 #endif
114         #if REUSE_ITER_MSG
115         gMsgSize = currentMsgSize;
116         #endif
117
118         currentStep = -1;
119
120         timeRec = new double[numSteps];
121
122         array = CProxy_Block::ckNew(num_chares);
123
124         CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
125         array.ckSetReductionClient(cb);
126
127         beginIteration();
128     }
129
130     void beginIteration() {
131         currentStep++;
132         if (currentStep==numSteps) {
133             CkPrintf("kNeighbor program finished!\n");
134             //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
135             //array.ckSetReductionClient(cb);
136             //array.printSts(numSteps);
137             terminate(NULL);
138             return;
139             //CkExit();
140         }
141
142         elementsRecved = 0;
143         totalTime = 0.0;
144         maxTime = 0.0;
145         minTime = 3600.0;
146
147         //int msgSize = msgSizeArr[currentStep%MSGSIZECNT];
148         //int msgSize = msgSizeArr[rand()%MSGSIZECNT];
149         //currentMsgSize = msgSize;
150 #if TURN_ON_LDB
151                                 if(currentStep!=0 && (currentStep % gLBFreq == 0)){
152                                         array.pauseForLB();
153                                         return;
154                                 }
155 #endif
156
157         gStarttime = CmiWallTimer();
158 #if REUSE_ITER_MSG
159         for (int i=0; i<num_chares; i++)
160             array[i].commWithNeighbors();
161 #else
162         for (int i=0; i<num_chares; i++)
163             array[i].commWithNeighbors(currentMsgSize);
164 #endif  
165         //array.commWithNeighbors(currentMsgSize);
166     }
167     
168     void resumeIter() {
169 #if DEBUG
170                         CkPrintf("Resume iteration at step %d\n", currentStep);
171 #endif
172                                 gStarttime = CmiWallTimer();
173 #if REUSE_ITER_MSG
174         for (int i=0; i<num_chares; i++)
175             array[i].commWithNeighbors();
176 #else
177         for (int i=0; i<num_chares; i++)
178             array[i].commWithNeighbors(currentMsgSize);
179 #endif                                  
180     }
181
182     void terminate(CkReductionMsg  *msg){
183         delete msg;
184         double total = 0.0;
185         for (int i=0; i<numSteps; i++) timeRec[i] = timeRec[i]*1e6;
186         qsort(timeRec, numSteps, sizeof(double), cmpFunc);
187         printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
188         int samples = 100;
189         if(numSteps<=samples) samples = numSteps-1;
190         for (int i=0; i<samples; i++) total += timeRec[i];
191         total /= samples;
192         CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
193         CkExit();
194     }
195
196     void nextStep_plain(double iterTime) {
197         elementsRecved++;
198         totalTime += iterTime;
199         maxTime = maxTime>iterTime?maxTime:iterTime;
200         minTime = minTime<iterTime?minTime:iterTime;
201
202         if (elementsRecved == num_chares) {
203             double wholeStepTime = CmiWallTimer() - gStarttime;
204             timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
205             //CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
206
207             beginIteration();
208         }
209     }
210
211     void nextStep(CkReductionMsg  *msg) {
212         maxTime = *((double *)msg->getData());
213         delete msg;
214         double wholeStepTime = CmiWallTimer() - gStarttime;
215         timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
216         //CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
217         beginIteration();
218     }
219
220 };
221
222 //int Main::msgSizeArr[MSGSIZECNT] = {16, 32, 128, 256, 512, 1024, 2048, 4096};
223 //int Main::msgSizeArr[MSGSIZECNT] = {10000};
224
//Array element that exchanges messages with its neighbors; whether neighbor
//indices wrap around the ends of the array is selected by WRAPAROUND
226 class Block: public CBase_Block {
227 public:
228     /** actual work size is of workSize^3 */
229     static int workSizeArr[WORKSIZECNT];
230
231     int numNeighbors;
232     int neighborsRecved;
233     int *neighbors;
234     double *recvTimes;
235
236     double startTime;
237
238     int random;
239
240     int curIterMsgSize;
241     int curIterWorkSize;
242     int internalStepCnt;
243
244     int sum;
245
246 #if REUSE_ITER_MSG
247     toNeighborMsg **iterMsg;
248 #endif
249
250 public:
251     Block() {
252         //srand(thisIndex.x+thisIndex.y);
253
254 #if TURN_ON_LDB
255                                 usesAtSync = CmiTrue;
256 #endif
257
258 #if WRAPAROUND
259         numNeighbors = 2*STRIDEK;
260         neighbors = new int[numNeighbors];
261         recvTimes = new double[numNeighbors];
262         int nidx=0;
263         //setting left neighbors
264         for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
265             int tmpnei = i;
266             while (tmpnei<0) tmpnei += num_chares;
267             neighbors[nidx] = tmpnei;
268         }
269         //setting right neighbors
270         for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
271             int tmpnei = i;
272             while (tmpnei>=num_chares) tmpnei -= num_chares;
273             neighbors[nidx] = tmpnei;
274         }
275 #elif ALLCROSSNODE
276         if(CkNumNodes()==1){
277             if(thisIndex==0){
278                 CkPrintf("This version has to run with more than 2 nodes!\n");
279                 CkExit();
280             }
281             return;
282         }
283         numNeighbors = CkNumNodes()-1;
284         neighbors = new int[numNeighbors];
285         recvTimes = new double[numNeighbors];
286         for(int i=0; i<numNeighbors; i++){
287             neighbors[i] = (thisIndex+(i+1)*CmiMyNodeSize())%CkNumPes();
288         }
289 #else
290         //calculate the neighbors this element has
291         numNeighbors = 0;
292         numNeighbors += thisIndex - MAX(0, thisIndex-STRIDEK); //left
293         numNeighbors += MIN(num_chares-1, thisIndex+STRIDEK)-thisIndex; //right
294         neighbors = new int[numNeighbors];
295         recvTimes = new double[numNeighbors];
296         int nidx=0;
297         for (int i=MAX(0, thisIndex-STRIDEK); i<thisIndex; i++, nidx++) neighbors[nidx]=i;
298         for (int i=thisIndex+1; i<=MIN(num_chares-1, thisIndex+STRIDEK); i++, nidx++) neighbors[nidx] = i;
299 #endif
300
301         for (int i=0; i<numNeighbors; i++)
302             recvTimes[i] = 0.0;
303
304 #if REUSE_ITER_MSG
305         iterMsg = new toNeighborMsg *[numNeighbors];
306         for (int i=0; i<numNeighbors; i++)
307             iterMsg[i] = NULL;  
308 #endif
309
310 #if DEBUG
311         CkPrintf("Neighbors of %d: ", thisIndex);
312         for (int i=0; i<numNeighbors; i++)
313             CkPrintf("%d ", neighbors[i]);
314         CkPrintf("\n");
315 #endif
316
317         random = thisIndex*31+73;
318     }
319
320     ~Block() {
321         delete [] neighbors;
322         delete [] recvTimes;
323 #if REUSE_ITER_MSG
324         delete [] iterMsg;
325 #endif
326     }
327     
328     void pup(PUP::er &p){
329                         ArrayElement1D::pup(p); //pack our superclass
330                         p(workSizeArr, WORKSIZECNT);
331                         p(numNeighbors);
332                         p(neighborsRecved);
333                         if(p.isUnpacking()){
334                                 neighbors = new int[numNeighbors];
335                                 recvTimes = new double[numNeighbors];
336                         }
337                         PUParray(p, neighbors, numNeighbors);
338                         PUParray(p, recvTimes, numNeighbors);
339                         p(startTime);
340                         p(random);
341                         p(curIterMsgSize);
342                         p(curIterWorkSize);
343                         p(internalStepCnt);
344                         p(sum);
345 #if REUSE_ITER_MSG
346                         if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
347                         for(int i=0; i<numNeighbors; i++){
348                                 if(p.isUnpacking()) iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
349                                 CkPupMessage(p, (void **)&iterMsg[i]);
350                         }
351 #endif                  
352     }
353
354     Block(CkMigrateMessage *m) {}
355
356                 void pauseForLB(){
357 #if DEBUG
358                         CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
359 #endif
360                         AtSync();
361                 }
362                 
363                 void ResumeFromSync(){ //Called by load-balancing framework
364                         CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
365                         contribute(0, NULL, CkReduction::sum_int, cb);
366                 }
367
368     void printSts(int totalSteps){
369         /*for(int i=0; i<numNeighbors; i++){
370                 CkPrintf("Elem[%d]: avg RTT from neighbor %d (actual elem id %d): %lf\n", thisIndex, i, neighbors[i], recvTimes[i]/totalSteps);
371         }*/
372         contribute(0,0,CkReduction::max_int);
373     }
374
375     void startInternalIteration() {
376 #if DEBUG
377         CkPrintf("[%d]: Start internal iteration \n", thisIndex);
378 #endif
379
380         neighborsRecved = 0;
381         //1: pick a work size and do some computation
382         int sum=0;
383         int N=curIterWorkSize;
384         for (int i=0; i<N; i++)
385             for (int j=0; j<N; j++)
386                 for (int k=0; k<N; k++)
387                     sum += (thisIndex*i+thisIndex*j+k)%WORKSIZECNT;
388         //2. send msg to K neighbors
389         int msgSize = curIterMsgSize;
390
391         //Send msgs to neighbors
392         for (int i=0; i<numNeighbors; i++) {
393             //double memtimer = CmiWallTimer();
394
395 #if REUSE_ITER_MSG
396             toNeighborMsg *msg = iterMsg[i];
397 #else
398             toNeighborMsg *msg = new(msgSize/4, 0) toNeighborMsg(msgSize/4);
399 #endif
400
401 #if DEBUG
402             CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
403 #endif
404             msg->setMsgSrc(thisIndex, i);
405             //double entrytimer = CmiWallTimer();
406             thisProxy(neighbors[i]).recvMsgs(msg);
407             //double entrylasttimer = CmiWallTimer();
408             //if(thisIndex==0){
409             //  CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
410             //}
411         }
412     }
413
414     void commWithNeighbors(int msgSize) {
415         internalStepCnt = 0;
416         curIterMsgSize = msgSize;
417         //currently the work size is only changed every big steps (which
418         //are initiated by the main proxy
419         curIterWorkSize = workSizeArr[random%WORKSIZECNT];
420         random++;
421
422         startTime = CmiWallTimer();
423         startInternalIteration();
424     }
425
426     void commWithNeighbors() {
427         internalStepCnt = 0;
428         curIterMsgSize = gMsgSize;
429         //currently the work size is only changed every big steps (which
430         //are initiated by the main proxy
431         curIterWorkSize = workSizeArr[random%WORKSIZECNT];
432         random++;
433         
434 #if REUSE_ITER_MSG
435         if(iterMsg[0]==NULL){ //indicating the messages have not been created
436             for(int i=0; i<numNeighbors; i++)
437                 iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
438         }
439 #endif
440         
441         startTime = CmiWallTimer();
442         startInternalIteration();
443     }
444
445     void recvReplies(toNeighborMsg *m) {
446         int fromNID = m->nID;
447
448 #if DEBUG
449         CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
450 #endif
451
452 #if REUSE_ITER_MSG
453         iterMsg[fromNID] = m;
454 #else
455         delete m;
456 #endif
457         //recvTimes[fromNID] += (CmiWallTimer() - startTime);
458
459         //get one step time and send it back to mainProxy
460         neighborsRecved++;
461         if (neighborsRecved == numNeighbors) {
462             internalStepCnt++;
463             if (internalStepCnt==CALCPERSTEP) {
464                 double iterCommTime = CmiWallTimer() - startTime;
465                 contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
466                 /*if(thisIndex==0){
467                         for(int i=0; i<numNeighbors; i++){
468                                 CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
469                         }
470                 }*/
471             } else {
472                 startInternalIteration();
473             }
474         }
475     }
476
477     void recvMsgs(toNeighborMsg *m) {
478 #if DEBUG
479         CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
480 #endif
481
482 #if TOUCH_MSGDATA
483         sum = m->sum();
484 #endif
485         thisProxy(m->fromX).recvReplies(m);
486     }
487
488     inline int MAX(int a, int b) {
489         return (a>b)?a:b;
490     }
491     inline int MIN(int a, int b) {
492         return (a<b)?a:b;
493     }
494 };
495
496 //int Block::workSizeArr[WORKSIZECNT] = {20, 60, 120, 180, 240};
497 int Block::workSizeArr[WORKSIZECNT] = {20};
498
499 #include "kNeighbor.def.h"