fix in infi_CmiFree the case in standalone mode for SMP.
[charm.git] / tests / charm++ / pingpong / pingpong.C
1 #include <string.h> // for strlen, and strcmp
2 #include <charm++.h>
3
4 #define NITER 1000
5 #define PAYLOAD 100
6
7 #if ! CMK_SMP            /* only test RDMA when non-SMP */
8
9 #if defined(CMK_DIRECT) || defined(CMK_USE_IBVERBS)
10 #define USE_RDMA 1
11 #endif
12
13 #endif
14
15 #ifdef USE_RDMA
16 extern "C" {
17 #include "cmidirect.h"
18 }
19 #endif
20
21 class Fancy
22 {
23   char _str[12];
24   public:
25     Fancy() { _str[0] = '\0'; }
26     Fancy(const char *str) {
27       strncpy(_str, str, 12);
28     }
29     int equals(const char *str) const { return !strcmp(str, _str); }
30 };
31
32 class CkArrayIndexFancy : public CkArrayIndex {
33   Fancy *f;
34   public:
35     CkArrayIndexFancy(const char *str) 
36     {
37         /// Use placement new to ensure that the custom index object is placed in the memory reserved for it in the base class
38         f = new (index) Fancy(str);
39         nInts=3; 
40     }
41 };
42
43 #include "pingpong.decl.h"
44 class PingMsg : public CMessage_PingMsg
45 {
46   public:
47     char *x;
48
49 };
50
51 class FragMsg : public CMessage_FragMsg
52 {
53   public:
54     char *x; 
55     int fragmentId; 
56     int numFragments; 
57     int pipeSize; 
58     bool copy;
59     bool allocate; 
60
61   FragMsg(int sequenceNumber, int total, int size, bool copyFragments, 
62           bool allocMsgs) 
63     : fragmentId(sequenceNumber), numFragments(total), pipeSize(size), 
64       copy(copyFragments), allocate(allocMsgs) {}
65   
66 };
67
68 class IdMsg : public CMessage_IdMsg
69 {
70   public:
71     CkChareID cid;
72     IdMsg(CkChareID _cid) : cid(_cid) {}
73 };
74
75 CProxy_main mainProxy;
76 int iterations;
77 int payload;
78
79 #define P1 0
80 #define P2 1%CkNumPes()
81
82 class main : public CBase_main
83 {
84   int phase;
85   int pipeSize;
86   CProxy_Ping1 arr1;
87   CProxy_Ping2 arr2;
88   CProxy_Ping3 arr3;
89   CProxy_PingF arrF;
90   CProxy_PingC cid;
91   CProxy_PingG gid;
92   CProxy_PingN ngid;
93 public:
94   main(CkMigrateMessage *m) {}
95   main(CkArgMsg* m)
96   {
97     if(CkNumPes()>2) {
98       CkAbort("Run this program on 1 or 2 processors only.\n");
99     }
100
101     pipeSize = 1024;
102     iterations=NITER;
103     payload=PAYLOAD;
104     if(m->argc>1)
105       payload=atoi(m->argv[1]);
106     if(m->argc>2)
107       iterations=atoi(m->argv[2]);
108     if(m->argc>3)
109       CkPrintf("Usage: pgm +pN [payload] [iterations]\n Where N [1-2], payload (default %d) is integer >0 iterations (default %d) is integer >0 ", PAYLOAD, NITER);
110     CkPrintf("Pingpong with payload: %d iterations: %d\n", payload,iterations);
111     mainProxy = thishandle;
112     gid = CProxy_PingG::ckNew();
113     ngid = CProxy_PingN::ckNew();
114     cid=CProxy_PingC::ckNew(1%CkNumPes());
115     cid=CProxy_PingC::ckNew(new IdMsg(cid.ckGetChareID()),0);
116     arr1 = CProxy_Ping1::ckNew(2);
117     arr2 = CProxy_Ping2::ckNew();
118     arr3 = CProxy_Ping3::ckNew();
119     arrF = CProxy_PingF::ckNew();
120     arr2(0,0).insert(P1);
121     arr2(0,1).insert(P2);
122     arr2.doneInserting();
123     arr3(0,0,0).insert(P1);
124     arr3(0,0,1).insert(P2);
125     arr3.doneInserting();
126     arrF[CkArrayIndexFancy("first")].insert(P1);
127     arrF[CkArrayIndexFancy("second")].insert(P2);
128     arrF.doneInserting();
129     phase=0;
130     CkStartQD(CkCallback(CkIndex_main::maindone(), mainProxy));
131     delete m;
132   };
133
134   void maindone(void)
135   {
136     bool isPipelined, allocMsgs, copyFragments;
137     switch(phase) {
138       case 0:
139         arr1[0].start();
140         break;
141       case 1:
142         arr2(0,0).start();
143         break;
144       case 2:
145         arr3(0,0,0).start();
146         break;
147       case 3:
148         arrF[CkArrayIndexFancy("first")].start();
149         break;
150       case 4:
151         cid.start();
152         break;
153       case 5:       
154         isPipelined = false; 
155         copyFragments = false;
156         allocMsgs = false; 
157         gid[0].start(isPipelined, copyFragments, allocMsgs, 0);
158         break;
159       case 6: 
160         isPipelined = true; 
161         copyFragments = false;
162         allocMsgs = false;
163         gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
164         break;
165       case 7:
166         isPipelined = true; 
167         copyFragments = false; 
168         allocMsgs = true;
169         gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
170         break;
171       case 8:
172         isPipelined = true; 
173         copyFragments = true; 
174         allocMsgs = true;
175         gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
176         // repeat pipelined test for different fragment sizes 
177         if (pipeSize < .5 * payload) {
178           pipeSize *= 2; 
179           phase = 5; 
180         } 
181         break;
182 #ifndef USE_RDMA
183       case 9:
184         ngid[0].start();
185         break;
186 #else
187       case 9:
188           ngid[0].startRDMA();
189           break;
190 #endif
191       default:
192         CkExit();
193     }
194     phase++; 
195   };
196 };
197
198 class PingG : public CBase_PingG
199 {
200   CProxyElement_PingG *pp;
201   int niter;
202   int me, nbr;
203   double start_time, end_time;
204   PingMsg *collectedMsg;
205   FragMsg **fragments;
206   int numFragmentsReceived; 
207   int numFragmentsTotal; 
208   bool copyFragments; 
209   bool allocateMsgs; 
210   int pipeSize; 
211 public:
212   PingG()
213   {
214     me = CkMyPe();    
215     nbr = (me+1)%CkNumPes();
216     pp = new CProxyElement_PingG(thisgroup,nbr);
217     niter = 0;
218     numFragmentsReceived = 0; 
219     numFragmentsTotal = -1; 
220   }
221   PingG(CkMigrateMessage *m) {}
222   void start(bool isPipelined, bool copy, bool allocate, int fragSize)
223   {
224     pipeSize = fragSize;     
225     copyFragments = copy;
226     allocateMsgs = allocate; 
227     PingMsg *msg = new (payload) PingMsg;
228     if (isPipelined) {
229       // CkPrintf("[%d] allocating collected msg\n", CkMyPe()); 
230       collectedMsg = msg;
231       numFragmentsTotal = (payload + pipeSize - 1) / pipeSize; 
232       // CkPrintf("[%d] allocating fragments \n", CkMyPe()); 
233       fragments = new FragMsg*[numFragmentsTotal]; 
234       int fragmentSize = pipeSize; 
235       if (!allocateMsgs) {        
236         // allocate once and reuse
237         for (int i = 0; i < numFragmentsTotal; i++) {
238           if (i == numFragmentsTotal - 1) {
239             fragmentSize = payload - i * fragmentSize;  
240           }
241           // CkPrintf("[%d] allocating %d\n", CkMyPe(), i); 
242           fragments[i] = new (fragmentSize) 
243             FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, 
244                     allocateMsgs); 
245         }
246       }
247       pipelinedSend(); 
248     }
249     else {
250       start_time = CkWallTimer();
251       (*pp).recv(msg);
252     }
253   }
254
255   void recv(PingMsg *msg)
256   {
257     if(me==0) {
258       niter++;
259       if(niter==iterations) {
260         niter = 0;
261         end_time = CkWallTimer();
262         int titer = (CkNumPes()==1)?(iterations/2) : iterations;
263         CkPrintf("Roundtrip time for Groups is %lf us\n",
264                  1.0e6*(end_time-start_time)/titer);
265         delete msg;
266         mainProxy.maindone();
267       } else {
268         (*pp).recv(msg);
269       }
270     } else {
271       (*pp).recv(msg);
272     }
273   }
274
275   void pipelinedSend() {
276     int fragmentSize = pipeSize; 
277     FragMsg *fragMsg; 
278     for (int i = 0; i < numFragmentsTotal; i++) {      
279       if (i == numFragmentsTotal - 1) {
280         fragmentSize = payload - i * fragmentSize;  
281       }
282       if (allocateMsgs) {
283         // CkPrintf("[%d] allocating %d\n", CkMyPe(), i); 
284         fragMsg = new (fragmentSize) 
285           FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, allocateMsgs); 
286       }
287       else {
288         fragMsg = fragments[i]; 
289       }
290       if (copyFragments) {
291         // CkPrintf("[%d] copying %d\n", CkMyPe(), i); 
292         memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
293       }
294       // CkPrintf("[%d] sending %d\n", CkMyPe(), i); 
295       (*pp).pipelinedRecv(fragMsg);
296     }
297     if (copyFragments) {
298       // CkPrintf("[%d] deleting collectedMsg \n", CkMyPe()); 
299       delete collectedMsg; 
300       collectedMsg = NULL; 
301     }
302   }
303
304   // local function
305   void setupPipelinedRecv(FragMsg *msg) {
306       if (me == 1) {
307         numFragmentsTotal = msg->numFragments; 
308         pipeSize = msg->pipeSize; 
309         if (niter == 0) {
310           // CkPrintf("[%d] allocating fragments\n", CkMyPe()); 
311           fragments = new FragMsg*[numFragmentsTotal]; 
312           copyFragments = msg->copy; 
313           allocateMsgs = msg->allocate;
314         }
315       }
316       if (copyFragments) {
317         // CkPrintf("[%d] allocating collectedMsg\n", CkMyPe()); 
318         collectedMsg = new (payload) PingMsg();
319       }
320       else {
321         collectedMsg = NULL;
322       }
323   }
324
325   void finishPipelinedTest() {
326     niter = 0;
327     if (me == 0) {
328       end_time = CkWallTimer();
329       int titer = (CkNumPes()==1)?(iterations/2) : iterations;
330       CkPrintf("Roundtrip time for Groups "
331                "(%d KB pipe, %s memcpy, "
332                "%s allocs) is %lf us\n",
333                pipeSize / 1024, 
334                copyFragments ? "w/" : "no",
335                allocateMsgs  ? "w/" : "no",
336                1.0e6*(end_time-start_time)/titer);
337       // if fragments were being kept for resending, delete them here
338       if (!allocateMsgs) {
339         for (int i = 0; i < numFragmentsTotal; i++) {
340           // CkPrintf("[%d] deleting fragments %d\n", CkMyPe(), i); 
341           delete fragments[i]; 
342           fragments[i] = NULL; 
343         }
344       }
345       // CkPrintf("[%d] deleting collectedMsg \n", CkMyPe()); 
346       delete collectedMsg; 
347       mainProxy.maindone();
348     }
349     else {
350       // reply for last iteration
351       pipelinedSend(); 
352     }
353     // CkPrintf("[%d] deleting fragments \n", CkMyPe()); 
354     delete [] fragments; 
355     fragments = NULL; 
356   }
357
358   void pipelinedRecv(FragMsg *msg) {
359     //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
360     //       msg->numFragments);
361     if (numFragmentsReceived == 0) {
362       setupPipelinedRecv(msg);
363     }
364     numFragmentsReceived++; 
365     if (copyFragments) {
366       // CkPrintf("[%d] copying received %d\n", CkMyPe(), msg->fragmentId); 
367       memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
368     }
369     if (allocateMsgs) {
370       // CkPrintf("[%d] deleting %d\n", CkMyPe(), msg->fragmentId); 
371       delete msg; 
372       msg = NULL; 
373     }
374     else {
375       fragments[msg->fragmentId] = msg; 
376     }
377     if (numFragmentsReceived == numFragmentsTotal) {
378       niter++;
379       numFragmentsReceived = 0;
380
381       // start timing after the warm-up iteration
382       if (me == 0 && niter == 1) {
383         start_time = CkWallTimer();
384       }
385
386       if (niter == iterations + 1) {
387         finishPipelinedTest();
388       }
389       else {
390         pipelinedSend(); 
391       }
392     }
393   }
394 };
395
396
397 class PingN : public CBase_PingN
398 {
399   int niter;
400   int me, nbr;
401 #ifdef USE_RDMA 
402   struct infiDirectUserHandle shandle,rhandle;
403   char *rbuff;
404   char *sbuff;
405 #endif
406   double start_time, end_time;
407 public:
408   PingN()
409   {
410     me = CkMyNode();    
411     nbr = (me+1)%CkNumNodes();
412
413     // note: for RMDA in ping you can only have 1 nbr who is both your
414     // upstream and downstream which makes this an artificially simple
415     // calculation.
416
417     niter = 0;
418 #ifdef USE_RDMA 
419     rbuff=(char *) malloc(payload*sizeof(char));
420     sbuff=(char *) malloc(payload*sizeof(char));
421     bzero(sbuff,payload);
422     // setup persistent comm sender and receiver side
423     double OOB=9999999999.0;
424     rhandle=CmiDirect_createHandle(nbr,rbuff,payload*sizeof(char),PingN::Wrapper_To_CallBack,(void *) this,OOB);
425     thisProxy[nbr].recvHandle((char*) &rhandle,sizeof(struct infiDirectUserHandle));
426 #endif
427   }
428   PingN(CkMigrateMessage *m) {}
429   void recvHandle(char *ptr,int size)
430   {
431
432 #ifdef USE_RDMA 
433     struct infiDirectUserHandle *_shandle=(struct infiDirectUserHandle *) ptr;
434     shandle=*_shandle;
435     CmiDirect_assocLocalBuffer(&shandle,sbuff,payload);
436 #endif
437   }
438   void start(void)
439   {
440     start_time = CkWallTimer();
441     thisProxy[nbr].recv(new (payload) PingMsg);
442   }
443   void startRDMA(void)
444   {
445     niter=0;
446     start_time = CkWallTimer();
447 #ifdef USE_RDMA 
448     CmiDirect_put(&shandle);
449 #else
450     CkAbort("do not call startRDMA if you don't actually have RDMA");
451 #endif
452   }
453
454   void recv(PingMsg *msg)
455   {
456     if(me==0) {
457       niter++;
458       if(niter==iterations) {
459         end_time = CkWallTimer();
460         int titer = (CkNumNodes()==1)?(iterations/2) : iterations;
461         CkPrintf("Roundtrip time for NodeGroups is %lf us\n",
462                  1.0e6*(end_time-start_time)/titer);
463         delete msg;
464         mainProxy.maindone();
465       } else {
466         thisProxy[nbr].recv(msg);
467       }
468     } else {
469       thisProxy[nbr].recv(msg);
470     }
471   }
472   static void Wrapper_To_CallBack(void* pt2Object){
473     // explicitly cast to a pointer to PingN
474     PingN* mySelf = (PingN*) pt2Object;
475
476     // call member
477     if(CkNumNodes() == 0){
478       mySelf->recvRDMA();
479     }else{
480       (mySelf->thisProxy)[CkMyNode()].recvRDMA();   
481     }
482   }
483   // not an entry method, called via Wrapper_To_Callback
484   void recvRDMA()
485   {
486 #ifdef USE_RDMA 
487     CmiDirect_ready(&rhandle);
488 #endif
489     if(me==0) {
490       niter++;
491       if(niter==iterations) {
492         end_time = CkWallTimer();
493         int titer = (CkNumNodes()==1)?(iterations/2) : iterations;
494         CkPrintf("Roundtrip time for NodeGroups RDMA is %lf us\n",
495                  1.0e6*(end_time-start_time)/titer);
496         mainProxy.maindone();
497       } else {
498 #ifdef USE_RDMA 
499         CmiDirect_put(&shandle);
500 #else
501         CkAbort("do not call startRDMA if you don't actually have RDMA");
502 #endif
503       }
504     } else {
505 #ifdef USE_RDMA 
506       CmiDirect_put(&shandle);
507 #else
508       CkAbort("do not call startRDMA if you don't actually have RDMA");
509 #endif
510     }
511   }
512
513 };
514
515
516 class Ping1 : public CBase_Ping1
517 {
518   CProxy_Ping1 *pp;
519   int niter;
520   double start_time, end_time;
521 public:
522   Ping1()
523   {
524     pp = new CProxy_Ping1(thisArrayID);
525     niter = 0;
526   }
527   Ping1(CkMigrateMessage *m) {}
528   void start(void)
529   {
530     (*pp)[1].recv(new (payload) PingMsg);
531     start_time = CkWallTimer();
532   }
533   void recv(PingMsg *msg)
534   {
535     if(thisIndex==0) {
536       niter++;
537       if(niter==iterations) {
538         end_time = CkWallTimer();
539         CkPrintf("Roundtrip time for 1D Arrays is %lf us\n",
540                  1.0e6*(end_time-start_time)/iterations);
541         //mainProxy.maindone();
542         niter=0;
543         start_time = CkWallTimer();
544         (*pp)[0].trecv(msg);
545       } else {
546         (*pp)[1].recv(msg);
547       }
548     } else {
549       (*pp)[0].recv(msg);
550     }
551   }
552   void trecv(PingMsg *msg)
553   {
554     if(thisIndex==0) {
555       niter++;
556       if(niter==iterations) {
557         end_time = CkWallTimer();
558         CkPrintf("Roundtrip time for 1D threaded Arrays is %lf us\n",
559                  1.0e6*(end_time-start_time)/iterations);
560         mainProxy.maindone();
561       } else {
562         (*pp)[1].trecv(msg);
563       }
564     } else {
565       (*pp)[0].trecv(msg);
566     }
567   }
568 };
569
570 class Ping2 : public CBase_Ping2
571 {
572   CProxy_Ping2 *pp;
573   int niter;
574   double start_time, end_time;
575 public:
576   Ping2()
577   {
578     pp = new CProxy_Ping2(thisArrayID);
579     niter = 0;
580   }
581   Ping2(CkMigrateMessage *m) {}
582   void start(void)
583   {
584     (*pp)(0,1).recv(new (payload) PingMsg);
585     start_time = CkWallTimer();
586   }
587   void recv(PingMsg *msg)
588   {
589     if(thisIndex.y==0) {
590       niter++;
591       if(niter==iterations) {
592         end_time = CkWallTimer();
593         CkPrintf("Roundtrip time for 2D Arrays is %lf us\n",
594                  1.0e6*(end_time-start_time)/iterations);
595         mainProxy.maindone();
596       } else {
597         (*pp)(0,1).recv(msg);
598       }
599     } else {
600       (*pp)(0,0).recv(msg);
601     }
602   }
603 };
604
605 class Ping3 : public CBase_Ping3
606 {
607   CProxy_Ping3 *pp;
608   int niter;
609   double start_time, end_time;
610 public:
611   Ping3()
612   {
613     pp = new CProxy_Ping3(thisArrayID);
614     niter = 0;
615   }
616   Ping3(CkMigrateMessage *m) {}
617   void start(void)
618   {
619     (*pp)(0,0,1).recv(new (payload) PingMsg);
620     start_time = CkWallTimer();
621   }
622   void recv(PingMsg *msg)
623   {
624     if(thisIndex.z==0) {
625       niter++;
626       if(niter==iterations) {
627         end_time = CkWallTimer();
628         CkPrintf("Roundtrip time for 3D Arrays is %lf us\n",
629                  1.0e6*(end_time-start_time)/iterations);
630         mainProxy.maindone();
631       } else {
632         (*pp)(0,0,1).recv(msg);
633       }
634     } else {
635       (*pp)(0,0,0).recv(msg);
636     }
637   }
638 };
639
640 class PingF : public CBase_PingF
641 {
642   CProxy_PingF *pp;
643   int niter;
644   double start_time, end_time;
645   int first;
646 public:
647   PingF()
648   {
649     pp = new CProxy_PingF(thisArrayID);
650     niter = 0;
651     first = thisIndex.equals("first") ? 1 : 0;
652   }
653   PingF(CkMigrateMessage *m) {}
654   void start(void)
655   {
656     (*pp)[CkArrayIndexFancy("second")].recv(new (payload) PingMsg);
657     start_time = CkWallTimer();
658   }
659   void recv(PingMsg *msg)
660   {
661     CkArrayIndexFancy partner((char *)(first?"second" : "first"));
662     if(first) {
663       niter++;
664       if(niter==iterations) {
665         end_time = CkWallTimer();
666         CkPrintf("Roundtrip time for Fancy Arrays is %lf us\n",
667                  1.0e6*(end_time-start_time)/iterations);
668         delete msg;
669         mainProxy.maindone();
670       } else {
671         (*pp)[partner].recv(msg);
672       }
673     } else {
674       (*pp)[partner].recv(msg);
675     }
676   }
677 };
678
679 class PingC : public CBase_PingC
680 {
681   CProxy_PingC *pp;
682   int niter;
683   double start_time, end_time;
684   int first;
685  public:
686   PingC(void)
687   {
688     first = 0;
689   }
690   PingC(IdMsg *msg)
691   {
692     first = 1;
693     CProxy_PingC pc(msg->cid);
694     msg->cid = thishandle;
695     pc.exchange(msg);
696   }
697   PingC(CkMigrateMessage *m) {}
698   void start(void)
699   {
700     niter = 0;
701     pp->recvReuse(new (payload) PingMsg);
702     start_time = CkWallTimer();
703   }
704   void exchange(IdMsg *msg)
705   {
706     if(first) {
707       pp = new CProxy_PingC(msg->cid);
708       delete msg;
709     } else {
710       pp = new CProxy_PingC(msg->cid);
711       msg->cid = thishandle;
712       pp->exchange(msg);
713     }
714   }
715   void recvReuse(PingMsg *msg)
716   {
717     if(first) {
718       niter++;
719       if(niter==iterations) {
720         end_time = CkWallTimer();
721         CkPrintf("Roundtrip time for Chares (reuse msgs) is %lf us\n",
722                  1.0e6*(end_time-start_time)/iterations);
723         niter = 0;
724         delete msg;
725         pp->recv(new (payload) PingMsg);
726         start_time = CkWallTimer();
727       } else {
728         pp->recvReuse(msg);
729       }
730     } else {
731       pp->recvReuse(msg);
732     }
733   }
734   void recv(PingMsg *msg)
735   {
736     delete msg;
737     if(first) {
738       niter++;
739       if(niter==iterations) {
740         end_time = CkWallTimer();
741         CkPrintf("Roundtrip time for Chares (new/del msgs) is %lf us\n",
742                  1.0e6*(end_time-start_time)/iterations);
743         niter = 0;
744         pp->trecv(new (payload) PingMsg);
745         start_time = CkWallTimer();
746       } else {
747         pp->recv(new (payload) PingMsg);
748       }
749     } else {
750       pp->recv(new (payload) PingMsg);
751     }
752   }
753   void trecv(PingMsg *msg)
754   {
755     if(first) {
756       niter++;
757       if(niter==iterations) {
758         end_time = CkWallTimer();
759         CkPrintf("Roundtrip time for threaded Chares (reuse) is %lf us\n",
760                  1.0e6*(end_time-start_time)/iterations);
761         delete msg;
762         mainProxy.maindone();
763       } else {
764         pp->trecv(msg);
765       }
766     } else {
767       pp->trecv(msg);
768     }
769   }
770 };
771 #include "pingpong.def.h"