Adding performance benchmarks for CmiReduce and Broadcast in commbench
[charm.git] / tests / converse / commbench / reduction.c
1 /*****************************************************************************
2  *
3  *  Benchmarks to measure performance of CmiReduce
4  *
 *  Clocks are synchronized first, followed by singleton CmiReduce calls,
 *  after which performance is measured by collecting timing messages at
 *  a central point.
8  *
9  *
10  *  Author- Nikhil Jain
11  *
12  *****************************************************************************/
13
14
15 #include "converse.h"
16 #include "commbench.h"
17
18 typedef double* pdouble;
19
20 CpvStaticDeclare(int, numiter);
21 CpvStaticDeclare(int, nextidx);
22 CpvStaticDeclare(int, reduction_starter);
23 CpvStaticDeclare(int, reduction_handler);
24 CpvStaticDeclare(int, reduction_central);
25 CpvStaticDeclare(int, sync_starter);
26 CpvStaticDeclare(int, sync_reply);
27 CpvStaticDeclare(int, flip);
28 CpvStaticDeclare(double, starttime);
29 CpvStaticDeclare(double, endtime);
30 CpvStaticDeclare(double, lasttime);
31 CpvStaticDeclare(pdouble, timediff);
32 CpvStaticDeclare(int, currentPe);
33
/* Largest contribution size listed in sizes[]. struct _varmsg reserves this
 * many payload bytes, so raise it if a bigger entry is added to the table. */
#define MAXSIZE (1 << 20)

/* Contribution sizes to benchmark. Each entry runs `numiter` reductions and
 * accumulates their latencies in `time`; an entry with size == -1 ends the
 * table. */
static struct testdata {
  int size;     /* payload bytes contributed by each PE */
  int numiter;  /* number of timed reductions for this size */
  double time;  /* sum of per-iteration latencies */
} sizes[] = {
  { .size = 4,       .numiter = 1024, .time = 0.0 },
  { .size = 16,      .numiter = 1024, .time = 0.0 },
  { .size = 64,      .numiter = 1024, .time = 0.0 },
  { .size = 256,     .numiter = 1024, .time = 0.0 },
  { .size = 1024,    .numiter = 1024, .time = 0.0 },
  { .size = 4096,    .numiter = 1024, .time = 0.0 },
  { .size = 16384,   .numiter = 1024, .time = 0.0 },
  { .size = 65536,   .numiter = 1024, .time = 0.0 },
  { .size = 262144,  .numiter = 1024, .time = 0.0 },
  { .size = 1048576, .numiter = 1024, .time = 0.0 },
  { .size = -1,      .numiter = -1,   .time = 0.0 },  /* sentinel */
};
54
55 typedef struct _varmsg {
56     char head[CmiMsgHeaderSizeBytes];
57     char contribution[MAXSIZE];
58 } *varmsg;
59
60 typedef struct _timemsg {
61       char head[CmiMsgHeaderSizeBytes];
62       double time;
63       int srcpe;
64 } *ptimemsg;
65
66 typedef struct _timemsg timemsg;
67
68
69
/* printf-style format for one result line: benchmark label, average seconds
 * per reduction, and contribution size in bytes. The tag identifies this as
 * the reduction benchmark (it previously read "[broadcast]", which mislabelled
 * CmiReduce results as broadcast results). */
static const char sync_outstr[] =
"[reduction] (%s) %le seconds per %d bytes\n"
;
73
/* CmiReduce merge function that performs no combining: the local message is
 * passed up unchanged and *size is left as is, so only the cost of moving
 * contributions is measured.
 * NOTE(review): the remote messages are not freed here; this assumes the
 * Converse reduction code releases them after merging — confirm. */
static void * reduceMessage(int *size, void *data, void **remote, int count)
{
  (void)size;
  (void)remote;
  (void)count;
  return data;
}
78
79 static void print_results(char *func)
80 {
81   int i=0;
82
83   while(sizes[i].size != (-1)) {
84     CmiPrintf(sync_outstr, func, sizes[i].time/sizes[i].numiter, sizes[i].size);
85     i++;
86   }
87 }
88
89 static void reduction_starter(void *msg)
90 {
91   int idx = CpvAccess(nextidx);
92   varmsg red_msg;
93   ptimemsg tmsg;
94   CmiFree(msg);
95
96   if(CpvAccess(flip)) {
97     tmsg = (ptimemsg)CmiAlloc(sizeof(timemsg));
98     tmsg->time = CpvAccess(starttime);;
99     tmsg->srcpe = CmiMyPe();
100     CmiSetHandler(tmsg, CpvAccess(reduction_central));
101     CmiSyncSend(0, sizeof(timemsg), tmsg);
102     CmiFree(tmsg);
103     CpvAccess(flip) = 0;
104   } else {
105     red_msg = (varmsg)CmiAlloc(sizeof(struct _varmsg));
106     CmiSetHandler(red_msg, CpvAccess(reduction_handler));
107     CpvAccess(starttime) = CmiWallTimer();
108     CmiReduce(red_msg, CmiMsgHeaderSizeBytes+sizes[idx].size, reduceMessage);
109     CpvAccess(flip) = 1;
110     if(CmiMyPe() != 0) {
111       CpvAccess(numiter)++;
112       if(CpvAccess(numiter) == sizes[idx].numiter) {
113         CpvAccess(nextidx) = idx + 1;
114         CpvAccess(numiter) = 0;
115       }
116     }
117   }
118 }
119
120 static void reduction_handler(void *msg) 
121 {
122   CpvAccess(endtime) = CmiWallTimer();
123   EmptyMsg emsg;
124
125   CmiFree(msg);
126   CmiSetHandler(&emsg, CpvAccess(reduction_starter));
127   CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
128 }
129
130 static void reduction_central(void *msg)
131 {
132   EmptyMsg emsg;
133   ptimemsg tmsg = (ptimemsg)msg;
134   if(CpvAccess(currentPe) == 0) {
135     CpvAccess(lasttime) = CpvAccess(endtime) - tmsg->time -
136                           CpvAccess(timediff)[tmsg->srcpe];
137   } else if((CpvAccess(endtime) - tmsg->time - 
138     CpvAccess(timediff)[tmsg->srcpe]) > CpvAccess(lasttime)) {
139     CpvAccess(lasttime) = CpvAccess(endtime) - tmsg->time -
140                           CpvAccess(timediff)[tmsg->srcpe];
141   }
142   CmiFree(msg);
143   CpvAccess(currentPe)++;
144   if(CpvAccess(currentPe) == CmiNumPes()) {
145     sizes[CpvAccess(nextidx)].time += CpvAccess(lasttime);
146     CpvAccess(numiter)++;
147     if(CpvAccess(numiter)<sizes[CpvAccess(nextidx)].numiter) {
148       CpvAccess(currentPe) = 0;
149       CmiSetHandler(&emsg, CpvAccess(reduction_starter));
150       CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
151     } else {
152       CpvAccess(numiter) = 0;
153       CpvAccess(nextidx)++;
154       if(sizes[CpvAccess(nextidx)].size == (-1)) {
155         print_results("CmiReduce");
156         CmiSetHandler(&emsg, CpvAccess(ack_handler));
157         CmiSyncSend(0, sizeof(EmptyMsg), &emsg);
158         return;
159       } else {
160         CpvAccess(currentPe) = 0;
161         CmiSetHandler(&emsg, CpvAccess(reduction_starter));
162         CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
163       }
164     }
165   }
166 }
167
168 static void sync_starter(void *msg) 
169 {
170   EmptyMsg emsg;    
171   ptimemsg tmsg = (ptimemsg)msg;
172
173   double midTime = (CmiWallTimer() + CpvAccess(lasttime))/2;
174   CpvAccess(timediff)[CpvAccess(currentPe)] = midTime - tmsg->time;
175   CmiFree(msg);
176
177   CpvAccess(currentPe)++;
178   if(CpvAccess(currentPe) < CmiNumPes()) {
179     CmiSetHandler(&emsg, CpvAccess(sync_reply));
180     CpvAccess(lasttime) = CmiWallTimer(); 
181     CmiSyncSend(CpvAccess(currentPe), sizeof(EmptyMsg), &emsg);
182   } else {
183     CmiSetHandler(&emsg, CpvAccess(reduction_starter));
184     CpvAccess(currentPe) = 0;
185     CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
186   }
187 }
188
189 static void sync_reply(void *msg) 
190 {
191   ptimemsg tmsg = (ptimemsg)CmiAlloc(sizeof(timemsg));
192   tmsg->time = CmiWallTimer();
193
194   CmiFree(msg);
195   CmiSetHandler(tmsg, CpvAccess(sync_starter));
196   CmiSyncSend(0, sizeof(timemsg), tmsg);
197   CmiFree(tmsg);
198 }
199
200 void reduction_init(void)
201 {
202   EmptyMsg emsg;
203
204   CmiSetHandler(&emsg, CpvAccess(sync_reply));
205   CpvAccess(lasttime) = CmiWallTimer();
206   CmiSyncSend(CpvAccess(currentPe),sizeof(EmptyMsg), &emsg);
207 }
208
209 void reduction_moduleinit(void)
210 {
211   CpvInitialize(int, numiter);
212   CpvInitialize(int, nextidx);
213   CpvInitialize(int, flip);
214   CpvInitialize(int, currentPe);
215   CpvInitialize(double, starttime);
216   CpvInitialize(doube, lasttime);
217   CpvInitialize(doube, endtime);
218   CpvInitialize(pdoube, timediff);
219   CpvInitialize(int, sync_starter);
220   CpvInitialize(int, sync_reply);
221   CpvInitialize(int, reduction_starter);
222   CpvInitialize(int, reduction_handler);
223   CpvInitialize(int, reduction_central);
224   CpvAccess(numiter) = 0;
225   CpvAccess(nextidx) = 0;
226   CpvAccess(currentPe) = 0;
227   CpvAccess(flip) = 0;
228   CpvAccess(timediff) = (pdouble)malloc(CmiNumPes()*sizeof(double));
229   CpvAccess(reduction_starter) = CmiRegisterHandler((CmiHandler)reduction_starter);
230   CpvAccess(reduction_handler) = CmiRegisterHandler((CmiHandler)reduction_handler);
231   CpvAccess(reduction_central) = CmiRegisterHandler((CmiHandler)reduction_central);
232   CpvAccess(sync_starter) = CmiRegisterHandler((CmiHandler)sync_starter);
233   CpvAccess(sync_reply) = CmiRegisterHandler((CmiHandler)sync_reply);
234 }