tests/charm++/pingpong: for group pinpong, (1) added pipelined run
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 5 Jul 2012 22:54:50 +0000 (17:54 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Thu, 5 Jul 2012 22:54:50 +0000 (17:54 -0500)
without allocation of fragment messages, (2) added warm up iteration
that does not get timed

tests/charm++/pingpong/pingpong.C
tests/charm++/pingpong/pingpong.ci

index 817a0a1054d48fc6dab364a7944f0803ee957a8a..258e9b862289c8043f9397ed4011138e26017b0b 100644 (file)
@@ -56,10 +56,12 @@ class FragMsg : public CMessage_FragMsg
     int numFragments; 
     int pipeSize; 
     bool copy;
     int numFragments; 
     int pipeSize; 
     bool copy;
+    bool allocate; 
 
 
-  FragMsg(int sequenceNumber, int total, int size, bool copyFragments) 
+  FragMsg(int sequenceNumber, int total, int size, bool copyFragments, 
+          bool allocMsgs) 
     : fragmentId(sequenceNumber), numFragments(total), pipeSize(size), 
     : fragmentId(sequenceNumber), numFragments(total), pipeSize(size), 
-      copy(copyFragments) {}
+      copy(copyFragments), allocate(allocMsgs) {}
   
 };
 
   
 };
 
@@ -96,7 +98,7 @@ public:
       CkAbort("Run this program on 1 or 2 processors only.\n");
     }
 
       CkAbort("Run this program on 1 or 2 processors only.\n");
     }
 
-    pipeSize = 8192;
+    pipeSize = 1024;
     iterations=NITER;
     payload=PAYLOAD;
     if(m->argc>1)
     iterations=NITER;
     payload=PAYLOAD;
     if(m->argc>1)
@@ -107,7 +109,6 @@ public:
       CkPrintf("Usage: pgm +pN [payload] [iterations]\n Where N [1-2], payload (default %d) is integer >0 iterations (default %d) is integer >0 ", PAYLOAD, NITER);
     CkPrintf("Pingpong with payload: %d iterations: %d\n", payload,iterations);
     mainProxy = thishandle;
       CkPrintf("Usage: pgm +pN [payload] [iterations]\n Where N [1-2], payload (default %d) is integer >0 iterations (default %d) is integer >0 ", PAYLOAD, NITER);
     CkPrintf("Pingpong with payload: %d iterations: %d\n", payload,iterations);
     mainProxy = thishandle;
-    phase = 0;
     gid = CProxy_PingG::ckNew();
     ngid = CProxy_PingN::ckNew();
     cid=CProxy_PingC::ckNew(1%CkNumPes());
     gid = CProxy_PingG::ckNew();
     ngid = CProxy_PingN::ckNew();
     cid=CProxy_PingC::ckNew(1%CkNumPes());
@@ -132,7 +133,7 @@ public:
 
   void maindone(void)
   {
 
   void maindone(void)
   {
-    bool isPipelined, copyFragments;
+    bool isPipelined, allocMsgs, copyFragments;
     switch(phase) {
       case 0:
        arr1[0].start();
     switch(phase) {
       case 0:
        arr1[0].start();
@@ -152,17 +153,26 @@ public:
       case 5:       
         isPipelined = false; 
         copyFragments = false;
       case 5:       
         isPipelined = false; 
         copyFragments = false;
-        gid[0].start(isPipelined, copyFragments, 0);
+        allocMsgs = false; 
+        gid[0].start(isPipelined, copyFragments, allocMsgs, 0);
         break;
       case 6: 
         isPipelined = true; 
         copyFragments = false;
         break;
       case 6: 
         isPipelined = true; 
         copyFragments = false;
-        gid[0].start(isPipelined, copyFragments, pipeSize);           
+        allocMsgs = false;
+        gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
         break;
       case 7:
         break;
       case 7:
+        isPipelined = true; 
+        copyFragments = false; 
+        allocMsgs = true;
+        gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
+        break;
+      case 8:
         isPipelined = true; 
         copyFragments = true; 
         isPipelined = true; 
         copyFragments = true; 
-        gid[0].start(isPipelined, copyFragments, pipeSize);           
+        allocMsgs = true;
+        gid[0].start(isPipelined, copyFragments, allocMsgs, pipeSize);           
         // repeat pipelined test for different fragment sizes 
         if (pipeSize < .5 * payload) {
           pipeSize *= 2; 
         // repeat pipelined test for different fragment sizes 
         if (pipeSize < .5 * payload) {
           pipeSize *= 2; 
@@ -170,11 +180,11 @@ public:
         } 
         break;
 #ifndef USE_RDMA
         } 
         break;
 #ifndef USE_RDMA
-      case 8:
+      case 9:
         ngid[0].start();
         break;
 #else
         ngid[0].start();
         break;
 #else
-      case 8:
+      case 9:
          ngid[0].startRDMA();
          break;
 #endif
          ngid[0].startRDMA();
          break;
 #endif
@@ -191,10 +201,12 @@ class PingG : public CBase_PingG
   int niter;
   int me, nbr;
   double start_time, end_time;
   int niter;
   int me, nbr;
   double start_time, end_time;
-  PingMsg *collectedMsg; 
+  PingMsg *collectedMsg;
+  FragMsg **fragments;
   int numFragmentsReceived; 
   int numFragmentsTotal; 
   bool copyFragments; 
   int numFragmentsReceived; 
   int numFragmentsTotal; 
   bool copyFragments; 
+  bool allocateMsgs; 
   int pipeSize; 
 public:
   PingG()
   int pipeSize; 
 public:
   PingG()
@@ -207,17 +219,32 @@ public:
     numFragmentsTotal = -1; 
   }
   PingG(CkMigrateMessage *m) {}
     numFragmentsTotal = -1; 
   }
   PingG(CkMigrateMessage *m) {}
-  void start(bool isPipelined, bool copy, int fragSize)
+  void start(bool isPipelined, bool copy, bool allocate, int fragSize)
   {
     pipeSize = fragSize;     
     copyFragments = copy;
   {
     pipeSize = fragSize;     
     copyFragments = copy;
+    allocateMsgs = allocate; 
     PingMsg *msg = new (payload) PingMsg;
     PingMsg *msg = new (payload) PingMsg;
-    start_time = CkWallTimer();
     if (isPipelined) {
       collectedMsg = msg;
     if (isPipelined) {
       collectedMsg = msg;
+      numFragmentsTotal = (payload + pipeSize - 1) / pipeSize; 
+      fragments = new FragMsg*[numFragmentsTotal]; 
+      int fragmentSize = pipeSize; 
+      if (!allocateMsgs) {        
+        // allocate once and reuse
+        for (int i = 0; i < numFragmentsTotal; i++) {
+          if (i == numFragmentsTotal - 1) {
+            fragmentSize = payload - i * fragmentSize;  
+          }
+          fragments[i] = new (fragmentSize) 
+            FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, 
+                    allocateMsgs); 
+        }
+      }
       pipelinedSend(); 
     }
     else {
       pipelinedSend(); 
     }
     else {
+      start_time = CkWallTimer();
       (*pp).recv(msg);
     }
   }
       (*pp).recv(msg);
     }
   }
@@ -243,14 +270,19 @@ public:
   }
 
   void pipelinedSend() {
   }
 
   void pipelinedSend() {
-    int numFragments = (payload + pipeSize - 1) / pipeSize; 
     int fragmentSize = pipeSize; 
     FragMsg *fragMsg; 
     int fragmentSize = pipeSize; 
     FragMsg *fragMsg; 
-    for (int i = 0; i < numFragments; i++) {      
-      if (i == numFragments - 1) {
+    for (int i = 0; i < numFragmentsTotal; i++) {      
+      if (i == numFragmentsTotal - 1) {
         fragmentSize = payload - i * fragmentSize;  
         fragmentSize = payload - i * fragmentSize;  
-      }      
-      fragMsg = new (fragmentSize) FragMsg(i, numFragments, fragmentSize, copyFragments); 
+      }
+      if (allocateMsgs) {
+        fragMsg = new (fragmentSize) 
+          FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, allocateMsgs); 
+      }
+      else {
+        fragMsg = fragments[i]; 
+      }
       if (copyFragments) {
         memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
       }
       if (copyFragments) {
         memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
       }
@@ -262,8 +294,11 @@ public:
     //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
     //       msg->numFragments);
     if (me == 1 && numFragmentsReceived == 0) {
     //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
     //       msg->numFragments);
     if (me == 1 && numFragmentsReceived == 0) {
+      numFragmentsTotal = msg->numFragments; 
+      fragments = new FragMsg*[numFragmentsTotal]; 
       pipeSize = msg->pipeSize; 
       copyFragments = msg->copy; 
       pipeSize = msg->pipeSize; 
       copyFragments = msg->copy; 
+      allocateMsgs = msg->allocate;
       if (copyFragments) {
         collectedMsg = new (payload) PingMsg();
       }
       if (copyFragments) {
         collectedMsg = new (payload) PingMsg();
       }
@@ -272,29 +307,49 @@ public:
       }
     }
     numFragmentsReceived++; 
       }
     }
     numFragmentsReceived++; 
-    numFragmentsTotal = msg->numFragments; 
     if (copyFragments) {
       memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
     }
     if (copyFragments) {
       memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
     }
+    if (allocateMsgs) {
+      delete msg; 
+    }
+    else {
+      fragments[msg->fragmentId] = msg; 
+    }
     if (numFragmentsReceived == numFragmentsTotal) {
       niter++;
     if (numFragmentsReceived == numFragmentsTotal) {
       niter++;
-      numFragmentsReceived = 0; 
+      numFragmentsReceived = 0;
+
+      // start timing after the warm-up iteration
+      if (me == 0 && niter == 1) {
+        start_time = CkWallTimer();
+      }
 
 
-      if (niter == iterations) {
+      if (niter == iterations + 1) {
         niter = 0;
         if (me == 0) {
           end_time = CkWallTimer();
           int titer = (CkNumPes()==1)?(iterations/2) : iterations;
           CkPrintf("Roundtrip time for Groups "
         niter = 0;
         if (me == 0) {
           end_time = CkWallTimer();
           int titer = (CkNumPes()==1)?(iterations/2) : iterations;
           CkPrintf("Roundtrip time for Groups "
-                   "(pipe size %d KB, %s memcpy) is %lf us\n",
-                   pipeSize / 1024, copyFragments ? "with" : "without", 
+                   "(%d KB pipe, %s memcpy, "
+                   "%s allocs) is %lf us\n",
+                   pipeSize / 1024, 
+                   copyFragments ? "\b" : "no",
+                   allocateMsgs  ? "\b" : "no",
                    1.0e6*(end_time-start_time)/titer);
                    1.0e6*(end_time-start_time)/titer);
+          // if fragments were being kept for resending, delete them here
+          if (!allocateMsgs) {
+            for (int i = 0; i < numFragmentsTotal; i++) {
+              delete fragments[i]; 
+            }
+          }
           mainProxy.maindone();
         }
         else {
           pipelinedSend(); 
         }
         delete collectedMsg;
           mainProxy.maindone();
         }
         else {
           pipelinedSend(); 
         }
         delete collectedMsg;
+        delete [] fragments; 
       }
       else {
         pipelinedSend(); 
       }
       else {
         pipelinedSend(); 
index 2d72a36f32c26dddbe1e21b80125c9f12be0a292..655bc63e58c41effc7566a3bd1bf91c3898f0734 100644 (file)
@@ -39,7 +39,7 @@ mainmodule pingpong {
   };
   group PingG {
     entry PingG(void);
   };
   group PingG {
     entry PingG(void);
-    entry void start(bool isPipelined, bool copy, int fragSize);
+    entry void start(bool isPipelined, bool copy, bool allocate, int fragSize);
     entry void recv(PingMsg *);
     entry void pipelinedRecv(FragMsg *); 
   }
     entry void recv(PingMsg *);
     entry void pipelinedRecv(FragMsg *); 
   }