tests/charm++/pingpong: added pipelined test for groups
[charm.git] / tests / charm++ / pingpong / pingpong.C
1 #include <string.h> // for strlen, and strcmp
2 #include <charm++.h>
3
4 #define NITER 1000
5 #define PAYLOAD 100
6
7 #if ! CMK_SMP            /* only test RDMA when non-SMP */
8
9 #if defined(CMK_DIRECT) || defined(CMK_USE_IBVERBS)
10 #define USE_RDMA 1
11 #endif
12
13 #endif
14
15 #ifdef USE_RDMA
16 extern "C" {
17 #include "cmidirect.h"
18 }
19 #endif
20
21 class Fancy
22 {
23   char _str[12];
24   public:
25     Fancy() { _str[0] = '\0'; }
26     Fancy(const char *str) {
27       strncpy(_str, str, 12);
28     }
29     int equals(const char *str) const { return !strcmp(str, _str); }
30 };
31
32 class CkArrayIndexFancy : public CkArrayIndex {
33   Fancy *f;
34   public:
35     CkArrayIndexFancy(const char *str) 
36     {
37         /// Use placement new to ensure that the custom index object is placed in the memory reserved for it in the base class
38         f = new (index) Fancy(str);
39         nInts=3; 
40     }
41 };
42
43 #include "pingpong.decl.h"
44 class PingMsg : public CMessage_PingMsg
45 {
46   public:
47     char *x;
48
49 };
50
51 class FragMsg : public CMessage_FragMsg
52 {
53   public:
54     char *x; 
55     int fragmentId; 
56     int numFragments; 
57     int pipeSize; 
58     bool copy;
59
60   FragMsg(int sequenceNumber, int total, int size, bool copyFragments) 
61     : fragmentId(sequenceNumber), numFragments(total), pipeSize(size), 
62       copy(copyFragments) {}
63   
64 };
65
66 class IdMsg : public CMessage_IdMsg
67 {
68   public:
69     CkChareID cid;
70     IdMsg(CkChareID _cid) : cid(_cid) {}
71 };
72
73 CProxy_main mainProxy;
74 int iterations;
75 int payload;
76
77 #define P1 0
78 #define P2 1%CkNumPes()
79
80 class main : public CBase_main
81 {
82   int phase;
83   int pipeSize;
84   CProxy_Ping1 arr1;
85   CProxy_Ping2 arr2;
86   CProxy_Ping3 arr3;
87   CProxy_PingF arrF;
88   CProxy_PingC cid;
89   CProxy_PingG gid;
90   CProxy_PingN ngid;
91 public:
92   main(CkMigrateMessage *m) {}
93   main(CkArgMsg* m)
94   {
95     if(CkNumPes()>2) {
96       CkAbort("Run this program on 1 or 2 processors only.\n");
97     }
98
99     pipeSize = 8192;
100     iterations=NITER;
101     payload=PAYLOAD;
102     if(m->argc>1)
103       payload=atoi(m->argv[1]);
104     if(m->argc>2)
105       iterations=atoi(m->argv[2]);
106     if(m->argc>3)
107       CkPrintf("Usage: pgm +pN [payload] [iterations]\n Where N [1-2], payload (default %d) is integer >0 iterations (default %d) is integer >0 ", PAYLOAD, NITER);
108     CkPrintf("Pingpong with payload: %d iterations: %d\n", payload,iterations);
109     mainProxy = thishandle;
110     phase = 0;
111     gid = CProxy_PingG::ckNew();
112     ngid = CProxy_PingN::ckNew();
113     cid=CProxy_PingC::ckNew(1%CkNumPes());
114     cid=CProxy_PingC::ckNew(new IdMsg(cid.ckGetChareID()),0);
115     arr1 = CProxy_Ping1::ckNew(2);
116     arr2 = CProxy_Ping2::ckNew();
117     arr3 = CProxy_Ping3::ckNew();
118     arrF = CProxy_PingF::ckNew();
119     arr2(0,0).insert(P1);
120     arr2(0,1).insert(P2);
121     arr2.doneInserting();
122     arr3(0,0,0).insert(P1);
123     arr3(0,0,1).insert(P2);
124     arr3.doneInserting();
125     arrF[CkArrayIndexFancy("first")].insert(P1);
126     arrF[CkArrayIndexFancy("second")].insert(P2);
127     arrF.doneInserting();
128     phase=0;
129     CkStartQD(CkCallback(CkIndex_main::maindone(), mainProxy));
130     delete m;
131   };
132
133   void maindone(void)
134   {
135     bool isPipelined, copyFragments;
136     switch(phase) {
137       case 0:
138         arr1[0].start();
139         break;
140       case 1:
141         arr2(0,0).start();
142         break;
143       case 2:
144         arr3(0,0,0).start();
145         break;
146       case 3:
147         arrF[CkArrayIndexFancy("first")].start();
148         break;
149       case 4:
150         cid.start();
151         break;
152       case 5:       
153         isPipelined = false; 
154         copyFragments = false;
155         gid[0].start(isPipelined, copyFragments, 0);
156         break;
157       case 6: 
158         isPipelined = true; 
159         copyFragments = false;
160         gid[0].start(isPipelined, copyFragments, pipeSize);           
161         break;
162       case 7:
163         isPipelined = true; 
164         copyFragments = true; 
165         gid[0].start(isPipelined, copyFragments, pipeSize);           
166         // repeat pipelined test for different fragment sizes 
167         if (pipeSize < .5 * payload) {
168           pipeSize *= 2; 
169           phase = 5; 
170         } 
171         break;
172 #ifndef USE_RDMA
173       case 8:
174         ngid[0].start();
175         break;
176 #else
177       case 8:
178           ngid[0].startRDMA();
179           break;
180 #endif
181       default:
182         CkExit();
183     }
184     phase++; 
185   };
186 };
187
188 class PingG : public CBase_PingG
189 {
190   CProxyElement_PingG *pp;
191   int niter;
192   int me, nbr;
193   double start_time, end_time;
194   PingMsg *collectedMsg; 
195   int numFragmentsReceived; 
196   int numFragmentsTotal; 
197   bool copyFragments; 
198   int pipeSize; 
199 public:
200   PingG()
201   {
202     me = CkMyPe();    
203     nbr = (me+1)%CkNumPes();
204     pp = new CProxyElement_PingG(thisgroup,nbr);
205     niter = 0;
206     numFragmentsReceived = 0; 
207     numFragmentsTotal = -1; 
208   }
209   PingG(CkMigrateMessage *m) {}
210   void start(bool isPipelined, bool copy, int fragSize)
211   {
212     pipeSize = fragSize;     
213     copyFragments = copy;
214     PingMsg *msg = new (payload) PingMsg;
215     start_time = CkWallTimer();
216     if (isPipelined) {
217       collectedMsg = msg;
218       pipelinedSend(); 
219     }
220     else {
221       (*pp).recv(msg);
222     }
223   }
224
225   void recv(PingMsg *msg)
226   {
227     if(me==0) {
228       niter++;
229       if(niter==iterations) {
230         niter = 0;
231         end_time = CkWallTimer();
232         int titer = (CkNumPes()==1)?(iterations/2) : iterations;
233         CkPrintf("Roundtrip time for Groups is %lf us\n",
234                  1.0e6*(end_time-start_time)/titer);
235         delete msg;
236         mainProxy.maindone();
237       } else {
238         (*pp).recv(msg);
239       }
240     } else {
241       (*pp).recv(msg);
242     }
243   }
244
245   void pipelinedSend() {
246     int numFragments = (payload + pipeSize - 1) / pipeSize; 
247     int fragmentSize = pipeSize; 
248     FragMsg *fragMsg; 
249     for (int i = 0; i < numFragments; i++) {      
250       if (i == numFragments - 1) {
251         fragmentSize = payload - i * fragmentSize;  
252       }      
253       fragMsg = new (fragmentSize) FragMsg(i, numFragments, fragmentSize, copyFragments); 
254       if (copyFragments) {
255         memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
256       }
257       (*pp).pipelinedRecv(fragMsg);
258     }
259   }
260
261   void pipelinedRecv(FragMsg *msg) {
262     //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
263     //       msg->numFragments);
264     if (me == 1 && numFragmentsReceived == 0) {
265       pipeSize = msg->pipeSize; 
266       copyFragments = msg->copy; 
267       if (copyFragments) {
268         collectedMsg = new (payload) PingMsg();
269       }
270       else {
271         collectedMsg = NULL;
272       }
273     }
274     numFragmentsReceived++; 
275     numFragmentsTotal = msg->numFragments; 
276     if (copyFragments) {
277       memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
278     }
279     if (numFragmentsReceived == numFragmentsTotal) {
280       niter++;
281       numFragmentsReceived = 0; 
282
283       if (niter == iterations) {
284         niter = 0;
285         if (me == 0) {
286           end_time = CkWallTimer();
287           int titer = (CkNumPes()==1)?(iterations/2) : iterations;
288           CkPrintf("Roundtrip time for Groups "
289                    "(pipe size %d KB, %s memcpy) is %lf us\n",
290                    pipeSize / 1024, copyFragments ? "with" : "without", 
291                    1.0e6*(end_time-start_time)/titer);
292           mainProxy.maindone();
293         }
294         else {
295           pipelinedSend(); 
296         }
297         delete collectedMsg;
298       }
299       else {
300         pipelinedSend(); 
301       }
302     }
303   }
304 };
305
306
307 class PingN : public CBase_PingN
308 {
309   int niter;
310   int me, nbr;
311 #ifdef USE_RDMA 
312   struct infiDirectUserHandle shandle,rhandle;
313   char *rbuff;
314   char *sbuff;
315 #endif
316   double start_time, end_time;
317 public:
318   PingN()
319   {
320     me = CkMyNode();    
321     nbr = (me+1)%CkNumNodes();
322
323     // note: for RMDA in ping you can only have 1 nbr who is both your
324     // upstream and downstream which makes this an artificially simple
325     // calculation.
326
327     niter = 0;
328 #ifdef USE_RDMA 
329     rbuff=(char *) malloc(payload*sizeof(char));
330     sbuff=(char *) malloc(payload*sizeof(char));
331     bzero(sbuff,payload);
332     // setup persistent comm sender and receiver side
333     double OOB=9999999999.0;
334     rhandle=CmiDirect_createHandle(nbr,rbuff,payload*sizeof(char),PingN::Wrapper_To_CallBack,(void *) this,OOB);
335     thisProxy[nbr].recvHandle((char*) &rhandle,sizeof(struct infiDirectUserHandle));
336 #endif
337   }
338   PingN(CkMigrateMessage *m) {}
339   void recvHandle(char *ptr,int size)
340   {
341
342 #ifdef USE_RDMA 
343     struct infiDirectUserHandle *_shandle=(struct infiDirectUserHandle *) ptr;
344     shandle=*_shandle;
345     CmiDirect_assocLocalBuffer(&shandle,sbuff,payload);
346 #endif
347   }
348   void start(void)
349   {
350     start_time = CkWallTimer();
351     thisProxy[nbr].recv(new (payload) PingMsg);
352   }
353   void startRDMA(void)
354   {
355     niter=0;
356     start_time = CkWallTimer();
357 #ifdef USE_RDMA 
358     CmiDirect_put(&shandle);
359 #else
360     CkAbort("do not call startRDMA if you don't actually have RDMA");
361 #endif
362   }
363
364   void recv(PingMsg *msg)
365   {
366     if(me==0) {
367       niter++;
368       if(niter==iterations) {
369         end_time = CkWallTimer();
370         int titer = (CkNumNodes()==1)?(iterations/2) : iterations;
371         CkPrintf("Roundtrip time for NodeGroups is %lf us\n",
372                  1.0e6*(end_time-start_time)/titer);
373         delete msg;
374         mainProxy.maindone();
375       } else {
376         thisProxy[nbr].recv(msg);
377       }
378     } else {
379       thisProxy[nbr].recv(msg);
380     }
381   }
382   static void Wrapper_To_CallBack(void* pt2Object){
383     // explicitly cast to a pointer to PingN
384     PingN* mySelf = (PingN*) pt2Object;
385
386     // call member
387     if(CkNumNodes() == 0){
388       mySelf->recvRDMA();
389     }else{
390       (mySelf->thisProxy)[CkMyNode()].recvRDMA();   
391     }
392   }
393   // not an entry method, called via Wrapper_To_Callback
394   void recvRDMA()
395   {
396 #ifdef USE_RDMA 
397     CmiDirect_ready(&rhandle);
398 #endif
399     if(me==0) {
400       niter++;
401       if(niter==iterations) {
402         end_time = CkWallTimer();
403         int titer = (CkNumNodes()==1)?(iterations/2) : iterations;
404         CkPrintf("Roundtrip time for NodeGroups RDMA is %lf us\n",
405                  1.0e6*(end_time-start_time)/titer);
406         mainProxy.maindone();
407       } else {
408 #ifdef USE_RDMA 
409         CmiDirect_put(&shandle);
410 #else
411         CkAbort("do not call startRDMA if you don't actually have RDMA");
412 #endif
413       }
414     } else {
415 #ifdef USE_RDMA 
416       CmiDirect_put(&shandle);
417 #else
418       CkAbort("do not call startRDMA if you don't actually have RDMA");
419 #endif
420     }
421   }
422
423 };
424
425
426 class Ping1 : public CBase_Ping1
427 {
428   CProxy_Ping1 *pp;
429   int niter;
430   double start_time, end_time;
431 public:
432   Ping1()
433   {
434     pp = new CProxy_Ping1(thisArrayID);
435     niter = 0;
436   }
437   Ping1(CkMigrateMessage *m) {}
438   void start(void)
439   {
440     (*pp)[1].recv(new (payload) PingMsg);
441     start_time = CkWallTimer();
442   }
443   void recv(PingMsg *msg)
444   {
445     if(thisIndex==0) {
446       niter++;
447       if(niter==iterations) {
448         end_time = CkWallTimer();
449         CkPrintf("Roundtrip time for 1D Arrays is %lf us\n",
450                  1.0e6*(end_time-start_time)/iterations);
451         //mainProxy.maindone();
452         niter=0;
453         start_time = CkWallTimer();
454         (*pp)[0].trecv(msg);
455       } else {
456         (*pp)[1].recv(msg);
457       }
458     } else {
459       (*pp)[0].recv(msg);
460     }
461   }
462   void trecv(PingMsg *msg)
463   {
464     if(thisIndex==0) {
465       niter++;
466       if(niter==iterations) {
467         end_time = CkWallTimer();
468         CkPrintf("Roundtrip time for 1D threaded Arrays is %lf us\n",
469                  1.0e6*(end_time-start_time)/iterations);
470         mainProxy.maindone();
471       } else {
472         (*pp)[1].trecv(msg);
473       }
474     } else {
475       (*pp)[0].trecv(msg);
476     }
477   }
478 };
479
480 class Ping2 : public CBase_Ping2
481 {
482   CProxy_Ping2 *pp;
483   int niter;
484   double start_time, end_time;
485 public:
486   Ping2()
487   {
488     pp = new CProxy_Ping2(thisArrayID);
489     niter = 0;
490   }
491   Ping2(CkMigrateMessage *m) {}
492   void start(void)
493   {
494     (*pp)(0,1).recv(new (payload) PingMsg);
495     start_time = CkWallTimer();
496   }
497   void recv(PingMsg *msg)
498   {
499     if(thisIndex.y==0) {
500       niter++;
501       if(niter==iterations) {
502         end_time = CkWallTimer();
503         CkPrintf("Roundtrip time for 2D Arrays is %lf us\n",
504                  1.0e6*(end_time-start_time)/iterations);
505         mainProxy.maindone();
506       } else {
507         (*pp)(0,1).recv(msg);
508       }
509     } else {
510       (*pp)(0,0).recv(msg);
511     }
512   }
513 };
514
515 class Ping3 : public CBase_Ping3
516 {
517   CProxy_Ping3 *pp;
518   int niter;
519   double start_time, end_time;
520 public:
521   Ping3()
522   {
523     pp = new CProxy_Ping3(thisArrayID);
524     niter = 0;
525   }
526   Ping3(CkMigrateMessage *m) {}
527   void start(void)
528   {
529     (*pp)(0,0,1).recv(new (payload) PingMsg);
530     start_time = CkWallTimer();
531   }
532   void recv(PingMsg *msg)
533   {
534     if(thisIndex.z==0) {
535       niter++;
536       if(niter==iterations) {
537         end_time = CkWallTimer();
538         CkPrintf("Roundtrip time for 3D Arrays is %lf us\n",
539                  1.0e6*(end_time-start_time)/iterations);
540         mainProxy.maindone();
541       } else {
542         (*pp)(0,0,1).recv(msg);
543       }
544     } else {
545       (*pp)(0,0,0).recv(msg);
546     }
547   }
548 };
549
550 class PingF : public CBase_PingF
551 {
552   CProxy_PingF *pp;
553   int niter;
554   double start_time, end_time;
555   int first;
556 public:
557   PingF()
558   {
559     pp = new CProxy_PingF(thisArrayID);
560     niter = 0;
561     first = thisIndex.equals("first") ? 1 : 0;
562   }
563   PingF(CkMigrateMessage *m) {}
564   void start(void)
565   {
566     (*pp)[CkArrayIndexFancy("second")].recv(new (payload) PingMsg);
567     start_time = CkWallTimer();
568   }
569   void recv(PingMsg *msg)
570   {
571     CkArrayIndexFancy partner((char *)(first?"second" : "first"));
572     if(first) {
573       niter++;
574       if(niter==iterations) {
575         end_time = CkWallTimer();
576         CkPrintf("Roundtrip time for Fancy Arrays is %lf us\n",
577                  1.0e6*(end_time-start_time)/iterations);
578         delete msg;
579         mainProxy.maindone();
580       } else {
581         (*pp)[partner].recv(msg);
582       }
583     } else {
584       (*pp)[partner].recv(msg);
585     }
586   }
587 };
588
589 class PingC : public CBase_PingC
590 {
591   CProxy_PingC *pp;
592   int niter;
593   double start_time, end_time;
594   int first;
595  public:
596   PingC(void)
597   {
598     first = 0;
599   }
600   PingC(IdMsg *msg)
601   {
602     first = 1;
603     CProxy_PingC pc(msg->cid);
604     msg->cid = thishandle;
605     pc.exchange(msg);
606   }
607   PingC(CkMigrateMessage *m) {}
608   void start(void)
609   {
610     niter = 0;
611     pp->recvReuse(new (payload) PingMsg);
612     start_time = CkWallTimer();
613   }
614   void exchange(IdMsg *msg)
615   {
616     if(first) {
617       pp = new CProxy_PingC(msg->cid);
618       delete msg;
619     } else {
620       pp = new CProxy_PingC(msg->cid);
621       msg->cid = thishandle;
622       pp->exchange(msg);
623     }
624   }
625   void recvReuse(PingMsg *msg)
626   {
627     if(first) {
628       niter++;
629       if(niter==iterations) {
630         end_time = CkWallTimer();
631         CkPrintf("Roundtrip time for Chares (reuse msgs) is %lf us\n",
632                  1.0e6*(end_time-start_time)/iterations);
633         niter = 0;
634         delete msg;
635         pp->recv(new (payload) PingMsg);
636         start_time = CkWallTimer();
637       } else {
638         pp->recvReuse(msg);
639       }
640     } else {
641       pp->recvReuse(msg);
642     }
643   }
644   void recv(PingMsg *msg)
645   {
646     delete msg;
647     if(first) {
648       niter++;
649       if(niter==iterations) {
650         end_time = CkWallTimer();
651         CkPrintf("Roundtrip time for Chares (new/del msgs) is %lf us\n",
652                  1.0e6*(end_time-start_time)/iterations);
653         niter = 0;
654         pp->trecv(new (payload) PingMsg);
655         start_time = CkWallTimer();
656       } else {
657         pp->recv(new (payload) PingMsg);
658       }
659     } else {
660       pp->recv(new (payload) PingMsg);
661     }
662   }
663   void trecv(PingMsg *msg)
664   {
665     if(first) {
666       niter++;
667       if(niter==iterations) {
668         end_time = CkWallTimer();
669         CkPrintf("Roundtrip time for threaded Chares (reuse) is %lf us\n",
670                  1.0e6*(end_time-start_time)/iterations);
671         delete msg;
672         mainProxy.maindone();
673       } else {
674         pp->trecv(msg);
675       }
676     } else {
677       pp->trecv(msg);
678     }
679   }
680 };
681 #include "pingpong.def.h"