tests/charm++/pingpong: added pipelined test for groups
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 5 Jul 2012 01:56:52 +0000 (20:56 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 5 Jul 2012 01:56:52 +0000 (20:56 -0500)
tests/charm++/pingpong/pingpong.C
tests/charm++/pingpong/pingpong.ci

index 666ca112698c46c5eed56447ba71f6ee2ef661a7..817a0a1054d48fc6dab364a7944f0803ee957a8a 100644 (file)
@@ -48,6 +48,21 @@ class PingMsg : public CMessage_PingMsg
 
 };
 
 
 };
 
+class FragMsg : public CMessage_FragMsg
+{
+  public:
+    char *x; 
+    int fragmentId; 
+    int numFragments; 
+    int pipeSize; 
+    bool copy;
+
+  FragMsg(int sequenceNumber, int total, int size, bool copyFragments) 
+    : fragmentId(sequenceNumber), numFragments(total), pipeSize(size), 
+      copy(copyFragments) {}
+  
+};
+
 class IdMsg : public CMessage_IdMsg
 {
   public:
 class IdMsg : public CMessage_IdMsg
 {
   public:
@@ -65,6 +80,7 @@ int payload;
 class main : public CBase_main
 {
   int phase;
 class main : public CBase_main
 {
   int phase;
+  int pipeSize;
   CProxy_Ping1 arr1;
   CProxy_Ping2 arr2;
   CProxy_Ping3 arr3;
   CProxy_Ping1 arr1;
   CProxy_Ping2 arr2;
   CProxy_Ping3 arr3;
@@ -80,6 +96,7 @@ public:
       CkAbort("Run this program on 1 or 2 processors only.\n");
     }
 
       CkAbort("Run this program on 1 or 2 processors only.\n");
     }
 
+    pipeSize = 8192;
     iterations=NITER;
     payload=PAYLOAD;
     if(m->argc>1)
     iterations=NITER;
     payload=PAYLOAD;
     if(m->argc>1)
@@ -115,7 +132,8 @@ public:
 
   void maindone(void)
   {
 
   void maindone(void)
   {
-    switch(phase++) {
+    bool isPipelined, copyFragments;
+    switch(phase) {
       case 0:
        arr1[0].start();
        break;
       case 0:
        arr1[0].start();
        break;
@@ -131,21 +149,39 @@ public:
       case 4:
         cid.start();
         break;
       case 4:
         cid.start();
         break;
-      case 5:
-        gid[0].start();
+      case 5:       
+        isPipelined = false; 
+        copyFragments = false;
+        gid[0].start(isPipelined, copyFragments, 0);
+        break;
+      case 6: 
+        isPipelined = true; 
+        copyFragments = false;
+        gid[0].start(isPipelined, copyFragments, pipeSize);           
+        break;
+      case 7:
+        isPipelined = true; 
+        copyFragments = true; 
+        gid[0].start(isPipelined, copyFragments, pipeSize);           
+        // repeat pipelined test for different fragment sizes 
+        if (pipeSize < .5 * payload) {
+          pipeSize *= 2; 
+          phase = 5; 
+        } 
         break;
 #ifndef USE_RDMA
         break;
 #ifndef USE_RDMA
-      case 6:
+      case 8:
         ngid[0].start();
         break;
 #else
         ngid[0].start();
         break;
 #else
-      case 6:
+      case 8:
          ngid[0].startRDMA();
          break;
 #endif
       default:
         CkExit();
     }
          ngid[0].startRDMA();
          break;
 #endif
       default:
         CkExit();
     }
+    phase++; 
   };
 };
 
   };
 };
 
@@ -155,6 +191,11 @@ class PingG : public CBase_PingG
   int niter;
   int me, nbr;
   double start_time, end_time;
   int niter;
   int me, nbr;
   double start_time, end_time;
+  PingMsg *collectedMsg; 
+  int numFragmentsReceived; 
+  int numFragmentsTotal; 
+  bool copyFragments; 
+  int pipeSize; 
 public:
   PingG()
   {
 public:
   PingG()
   {
@@ -162,18 +203,31 @@ public:
     nbr = (me+1)%CkNumPes();
     pp = new CProxyElement_PingG(thisgroup,nbr);
     niter = 0;
     nbr = (me+1)%CkNumPes();
     pp = new CProxyElement_PingG(thisgroup,nbr);
     niter = 0;
+    numFragmentsReceived = 0; 
+    numFragmentsTotal = -1; 
   }
   PingG(CkMigrateMessage *m) {}
   }
   PingG(CkMigrateMessage *m) {}
-  void start(void)
+  void start(bool isPipelined, bool copy, int fragSize)
   {
   {
+    pipeSize = fragSize;     
+    copyFragments = copy;
+    PingMsg *msg = new (payload) PingMsg;
     start_time = CkWallTimer();
     start_time = CkWallTimer();
-    (*pp).recv(new (payload) PingMsg);
+    if (isPipelined) {
+      collectedMsg = msg;
+      pipelinedSend(); 
+    }
+    else {
+      (*pp).recv(msg);
+    }
   }
   }
+
   void recv(PingMsg *msg)
   {
     if(me==0) {
       niter++;
       if(niter==iterations) {
   void recv(PingMsg *msg)
   {
     if(me==0) {
       niter++;
       if(niter==iterations) {
+        niter = 0;
         end_time = CkWallTimer();
         int titer = (CkNumPes()==1)?(iterations/2) : iterations;
         CkPrintf("Roundtrip time for Groups is %lf us\n",
         end_time = CkWallTimer();
         int titer = (CkNumPes()==1)?(iterations/2) : iterations;
         CkPrintf("Roundtrip time for Groups is %lf us\n",
@@ -187,6 +241,66 @@ public:
       (*pp).recv(msg);
     }
   }
       (*pp).recv(msg);
     }
   }
+
+  void pipelinedSend() {
+    int numFragments = (payload + pipeSize - 1) / pipeSize; 
+    int fragmentSize = pipeSize; 
+    FragMsg *fragMsg; 
+    for (int i = 0; i < numFragments; i++) {      
+      if (i == numFragments - 1) {
+        fragmentSize = payload - i * fragmentSize;  
+      }      
+      fragMsg = new (fragmentSize) FragMsg(i, numFragments, fragmentSize, copyFragments); 
+      if (copyFragments) {
+        memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
+      }
+      (*pp).pipelinedRecv(fragMsg);
+    }
+  }
+
+  void pipelinedRecv(FragMsg *msg) {
+    //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
+    //       msg->numFragments);
+    if (me == 1 && numFragmentsReceived == 0) {
+      pipeSize = msg->pipeSize; 
+      copyFragments = msg->copy; 
+      if (copyFragments) {
+        collectedMsg = new (payload) PingMsg();
+      }
+      else {
+        collectedMsg = NULL;
+      }
+    }
+    numFragmentsReceived++; 
+    numFragmentsTotal = msg->numFragments; 
+    if (copyFragments) {
+      memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
+    }
+    if (numFragmentsReceived == numFragmentsTotal) {
+      niter++;
+      numFragmentsReceived = 0; 
+
+      if (niter == iterations) {
+        niter = 0;
+        if (me == 0) {
+          end_time = CkWallTimer();
+          int titer = (CkNumPes()==1)?(iterations/2) : iterations;
+          CkPrintf("Roundtrip time for Groups "
+                   "(pipe size %d KB, %s memcpy) is %lf us\n",
+                   pipeSize / 1024, copyFragments ? "with" : "without", 
+                   1.0e6*(end_time-start_time)/titer);
+          mainProxy.maindone();
+        }
+        else {
+          pipelinedSend(); 
+        }
+        delete collectedMsg;
+      }
+      else {
+        pipelinedSend(); 
+      }
+    }
+  }
 };
 
 
 };
 
 
index e2ca993d535406b52c7cda33be569919bfea3595..2d72a36f32c26dddbe1e21b80125c9f12be0a292 100644 (file)
@@ -7,6 +7,9 @@ mainmodule pingpong {
     entry void maindone(void);
   };
   message PingMsg{char x[];};
     entry void maindone(void);
   };
   message PingMsg{char x[];};
+  message FragMsg{
+    char x[];
+  };
   message IdMsg;
 
   array [1D] Ping1 {
   message IdMsg;
 
   array [1D] Ping1 {
@@ -36,8 +39,9 @@ mainmodule pingpong {
   };
   group PingG {
     entry PingG(void);
   };
   group PingG {
     entry PingG(void);
-    entry void start(void);
+    entry void start(bool isPipelined, bool copy, int fragSize);
     entry void recv(PingMsg *);
     entry void recv(PingMsg *);
+    entry void pipelinedRecv(FragMsg *); 
   }
   nodegroup PingN {
     entry PingN(void);
   }
   nodegroup PingN {
     entry PingN(void);