Minor change in mempool - saving size as power of two in slots
[charm.git] / tests / converse / commbench / reduction.c
1 /*****************************************************************************
2  *
3  *  Benchmarks to measure performance of CmiReduce
4  *
5  *  Clocks are synchronized first up, followed by singleton CmiReduce
6  *  after which the performance is measured by message collection are
7  *  a central point.
8  *
9  *
10  *  Author- Nikhil Jain
11  *
12  *****************************************************************************/
13
14
15 #include "converse.h"
16 #include "commbench.h"
17
18 typedef double* pdouble;
19
20 CpvStaticDeclare(int, numiter);
21 CpvStaticDeclare(int, nextidx);
22 CpvStaticDeclare(int, reduction_starter);
23 CpvStaticDeclare(int, reduction_handler);
24 CpvStaticDeclare(int, reduction_central);
25 CpvStaticDeclare(int, sync_starter);
26 CpvStaticDeclare(int, sync_reply);
27 CpvStaticDeclare(int, flip);
28 CpvStaticDeclare(double, starttime);
29 CpvStaticDeclare(double, endtime);
30 CpvStaticDeclare(double, lasttime);
31 CpvStaticDeclare(pdouble, timediff);
32 CpvStaticDeclare(int, currentPe);
33
34 //change it if adding values to sizes
35 #define MAXSIZE 1048576
36
37 static struct testdata {
38   int size;
39   int numiter;
40   double time;
41 } sizes[] = {
42   {4,       1024,      0.0},
43   {16,      1024,      0.0},
44   {64,      1024,      0.0},
45   {256,     1024,      0.0},
46   {1024,    1024,      0.0},
47   {4096,    1024,      0.0},
48   {16384,   1024,      0.0},
49   {65536,   1024,      0.0},
50   {262144,  1024,      0.0},
51   {1048576, 1024,      0.0},
52   {-1,      -1,        0.0},
53 };
54
55 typedef struct _varmsg {
56     char head[CmiMsgHeaderSizeBytes];
57     char contribution[MAXSIZE];
58 } *varmsg;
59
60 typedef struct _timemsg {
61       char head[CmiMsgHeaderSizeBytes];
62       double time;
63       int srcpe;
64 } *ptimemsg;
65
66 typedef struct _timemsg timemsg;
67
68 static char *sync_outstr =
69 "[broadcast] (%s) %le seconds per %d bytes\n"
70 ;
71
72 static void * reduceMessage(int *size, void *data, void **remote, int count) 
73 {
74   return data;
75 }
76
77 static void print_results(char *func)
78 {
79   int i=0;
80
81   while(sizes[i].size != (-1)) {
82     CmiPrintf(sync_outstr, func, sizes[i].time/sizes[i].numiter, sizes[i].size);
83     i++;
84   }
85 }
86
87 static void reduction_starter(void *msg)
88 {
89   int idx = CpvAccess(nextidx);
90   varmsg red_msg;
91   ptimemsg tmsg;
92   CmiFree(msg);
93
94   if(CpvAccess(flip)) {
95     tmsg = (ptimemsg)CmiAlloc(sizeof(timemsg));
96     tmsg->time = CpvAccess(starttime);;
97     tmsg->srcpe = CmiMyPe();
98     CmiSetHandler(tmsg, CpvAccess(reduction_central));
99     CmiSyncSend(0, sizeof(timemsg), tmsg);
100     CmiFree(tmsg);
101     CpvAccess(flip) = 0;
102   } else {
103     red_msg = (varmsg)CmiAlloc(sizeof(struct _varmsg));
104     CmiSetHandler(red_msg, CpvAccess(reduction_handler));
105     CpvAccess(starttime) = CmiWallTimer();
106     CmiReduce(red_msg, CmiMsgHeaderSizeBytes+sizes[idx].size, reduceMessage);
107     CpvAccess(flip) = 1;
108     if(CmiMyPe() != 0) {
109       CpvAccess(numiter)++;
110       if(CpvAccess(numiter) == sizes[idx].numiter) {
111         CpvAccess(nextidx) = idx + 1;
112         CpvAccess(numiter) = 0;
113       }
114     }
115   }
116 }
117
118 static void reduction_handler(void *msg) 
119 {
120   EmptyMsg emsg;
121   CpvAccess(endtime) = CmiWallTimer();
122
123   CmiFree(msg);
124   CmiSetHandler(&emsg, CpvAccess(reduction_starter));
125   CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
126 }
127
128 static void reduction_central(void *msg)
129 {
130   EmptyMsg emsg;
131   ptimemsg tmsg = (ptimemsg)msg;
132   if(CpvAccess(currentPe) == 0) {
133     CpvAccess(lasttime) = CpvAccess(endtime) - tmsg->time -
134                           CpvAccess(timediff)[tmsg->srcpe];
135   } else if((CpvAccess(endtime) - tmsg->time - 
136     CpvAccess(timediff)[tmsg->srcpe]) > CpvAccess(lasttime)) {
137     CpvAccess(lasttime) = CpvAccess(endtime) - tmsg->time -
138                           CpvAccess(timediff)[tmsg->srcpe];
139   }
140   CmiFree(msg);
141   CpvAccess(currentPe)++;
142   if(CpvAccess(currentPe) == CmiNumPes()) {
143     sizes[CpvAccess(nextidx)].time += CpvAccess(lasttime);
144     CpvAccess(numiter)++;
145     if(CpvAccess(numiter)<sizes[CpvAccess(nextidx)].numiter) {
146       CpvAccess(currentPe) = 0;
147       CmiSetHandler(&emsg, CpvAccess(reduction_starter));
148       CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
149     } else {
150       CpvAccess(numiter) = 0;
151       CpvAccess(nextidx)++;
152       if(sizes[CpvAccess(nextidx)].size == (-1)) {
153         print_results("CmiReduce");
154         CmiSetHandler(&emsg, CpvAccess(ack_handler));
155         CmiSyncSend(0, sizeof(EmptyMsg), &emsg);
156         return;
157       } else {
158         CpvAccess(currentPe) = 0;
159         CmiSetHandler(&emsg, CpvAccess(reduction_starter));
160         CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
161       }
162     }
163   }
164 }
165
166 static void sync_starter(void *msg) 
167 {
168   EmptyMsg emsg;    
169   ptimemsg tmsg = (ptimemsg)msg;
170
171   double midTime = (CmiWallTimer() + CpvAccess(lasttime))/2;
172   CpvAccess(timediff)[CpvAccess(currentPe)] = midTime - tmsg->time;
173   CmiFree(msg);
174
175   CpvAccess(currentPe)++;
176   if(CpvAccess(currentPe) < CmiNumPes()) {
177     CmiSetHandler(&emsg, CpvAccess(sync_reply));
178     CpvAccess(lasttime) = CmiWallTimer(); 
179     CmiSyncSend(CpvAccess(currentPe), sizeof(EmptyMsg), &emsg);
180   } else {
181     CmiSetHandler(&emsg, CpvAccess(reduction_starter));
182     CpvAccess(currentPe) = 0;
183     CmiSyncBroadcastAll(sizeof(EmptyMsg), &emsg);
184   }
185 }
186
187 static void sync_reply(void *msg) 
188 {
189   ptimemsg tmsg = (ptimemsg)CmiAlloc(sizeof(timemsg));
190   tmsg->time = CmiWallTimer();
191
192   CmiFree(msg);
193   CmiSetHandler(tmsg, CpvAccess(sync_starter));
194   CmiSyncSend(0, sizeof(timemsg), tmsg);
195   CmiFree(tmsg);
196 }
197
198 void reduction_init(void)
199 {
200   EmptyMsg emsg;
201
202   CmiSetHandler(&emsg, CpvAccess(sync_reply));
203   CpvAccess(lasttime) = CmiWallTimer();
204   CmiSyncSend(CpvAccess(currentPe),sizeof(EmptyMsg), &emsg);
205 }
206
207 void reduction_moduleinit(void)
208 {
209   CpvInitialize(int, numiter);
210   CpvInitialize(int, nextidx);
211   CpvInitialize(int, flip);
212   CpvInitialize(int, currentPe);
213   CpvInitialize(double, starttime);
214   CpvInitialize(double, lasttime);
215   CpvInitialize(double, endtime);
216   CpvInitialize(pdouble, timediff);
217   CpvInitialize(int, sync_starter);
218   CpvInitialize(int, sync_reply);
219   CpvInitialize(int, reduction_starter);
220   CpvInitialize(int, reduction_handler);
221   CpvInitialize(int, reduction_central);
222   CpvAccess(numiter) = 0;
223   CpvAccess(nextidx) = 0;
224   CpvAccess(currentPe) = 0;
225   CpvAccess(flip) = 0;
226   CpvAccess(timediff) = (pdouble)malloc(CmiNumPes()*sizeof(double));
227   CpvAccess(reduction_starter) = CmiRegisterHandler((CmiHandler)reduction_starter);
228   CpvAccess(reduction_handler) = CmiRegisterHandler((CmiHandler)reduction_handler);
229   CpvAccess(reduction_central) = CmiRegisterHandler((CmiHandler)reduction_central);
230   CpvAccess(sync_starter) = CmiRegisterHandler((CmiHandler)sync_starter);
231   CpvAccess(sync_reply) = CmiRegisterHandler((CmiHandler)sync_reply);
232 }