tests/charm++/pingpong: (1) removed a memory leak in pipelined test
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 7 Jul 2012 05:58:02 +0000 (00:58 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sat, 7 Jul 2012 05:58:02 +0000 (00:58 -0500)
(2) modified the pipeline scheme w/ memcpy to include
the time for allocating a buffer on the receive side.

how memory allocations are done in the pipelined test so

tests/charm++/pingpong/pingpong.C

index 258e9b862289c8043f9397ed4011138e26017b0b..7e48b1e23d16f33388d1f0af1d2fe0890f65c448 100644 (file)
@@ -226,8 +226,10 @@ public:
     allocateMsgs = allocate; 
     PingMsg *msg = new (payload) PingMsg;
     if (isPipelined) {
+      // CkPrintf("[%d] allocating collected msg\n", CkMyPe()); 
       collectedMsg = msg;
       numFragmentsTotal = (payload + pipeSize - 1) / pipeSize; 
+      // CkPrintf("[%d] allocating fragments \n", CkMyPe()); 
       fragments = new FragMsg*[numFragmentsTotal]; 
       int fragmentSize = pipeSize; 
       if (!allocateMsgs) {        
@@ -236,6 +238,7 @@ public:
           if (i == numFragmentsTotal - 1) {
             fragmentSize = payload - i * fragmentSize;  
           }
+          // CkPrintf("[%d] allocating %d\n", CkMyPe(), i); 
           fragments[i] = new (fragmentSize) 
             FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, 
                     allocateMsgs); 
@@ -277,6 +280,7 @@ public:
         fragmentSize = payload - i * fragmentSize;  
       }
       if (allocateMsgs) {
+        // CkPrintf("[%d] allocating %d\n", CkMyPe(), i); 
         fragMsg = new (fragmentSize) 
           FragMsg(i, numFragmentsTotal, fragmentSize, copyFragments, allocateMsgs); 
       }
@@ -284,34 +288,88 @@ public:
         fragMsg = fragments[i]; 
       }
       if (copyFragments) {
+        // CkPrintf("[%d] copying %d\n", CkMyPe(), i); 
         memcpy(fragMsg->x, ((char *) collectedMsg ) + i * pipeSize, fragmentSize); 
       }
+      // CkPrintf("[%d] sending %d\n", CkMyPe(), i); 
       (*pp).pipelinedRecv(fragMsg);
     }
+    if (copyFragments) {
+      // CkPrintf("[%d] deleting collectedMsg \n", CkMyPe()); 
+      delete collectedMsg; 
+      collectedMsg = NULL; 
+    }
   }
 
-  void pipelinedRecv(FragMsg *msg) {
-    //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
-    //       msg->numFragments);
-    if (me == 1 && numFragmentsReceived == 0) {
-      numFragmentsTotal = msg->numFragments; 
-      fragments = new FragMsg*[numFragmentsTotal]; 
-      pipeSize = msg->pipeSize; 
-      copyFragments = msg->copy; 
-      allocateMsgs = msg->allocate;
+  // local function
+  void setupPipelinedRecv(FragMsg *msg) {
+      if (me == 1) {
+        numFragmentsTotal = msg->numFragments; 
+        pipeSize = msg->pipeSize; 
+        if (niter == 0) {
+          // CkPrintf("[%d] allocating fragments\n", CkMyPe()); 
+          fragments = new FragMsg*[numFragmentsTotal]; 
+          copyFragments = msg->copy; 
+          allocateMsgs = msg->allocate;
+        }
+      }
       if (copyFragments) {
+        // CkPrintf("[%d] allocating collectedMsg\n", CkMyPe()); 
         collectedMsg = new (payload) PingMsg();
       }
       else {
         collectedMsg = NULL;
       }
+  }
+
+  void finishPipelinedTest() {
+    niter = 0;
+    if (me == 0) {
+      end_time = CkWallTimer();
+      int titer = (CkNumPes()==1)?(iterations/2) : iterations;
+      CkPrintf("Roundtrip time for Groups "
+               "(%d KB pipe, %s memcpy, "
+               "%s allocs) is %lf us\n",
+               pipeSize / 1024, 
+               copyFragments ? "w/" : "no",
+               allocateMsgs  ? "w/" : "no",
+               1.0e6*(end_time-start_time)/titer);
+      // if fragments were being kept for resending, delete them here
+      if (!allocateMsgs) {
+        for (int i = 0; i < numFragmentsTotal; i++) {
+          // CkPrintf("[%d] deleting fragments %d\n", CkMyPe(), i); 
+          delete fragments[i]; 
+          fragments[i] = NULL; 
+        }
+      }
+      // CkPrintf("[%d] deleting collectedMsg \n", CkMyPe()); 
+      delete collectedMsg; 
+      mainProxy.maindone();
+    }
+    else {
+      // reply for last iteration
+      pipelinedSend(); 
+    }
+    // CkPrintf("[%d] deleting fragments \n", CkMyPe()); 
+    delete [] fragments; 
+    fragments = NULL; 
+  }
+
+  void pipelinedRecv(FragMsg *msg) {
+    //    CkPrintf("[%d] receiving fragment %d of %d\n", CkMyPe(), msg->fragmentId + 1, 
+    //       msg->numFragments);
+    if (numFragmentsReceived == 0) {
+      setupPipelinedRecv(msg);
     }
     numFragmentsReceived++; 
     if (copyFragments) {
+      // CkPrintf("[%d] copying received %d\n", CkMyPe(), msg->fragmentId); 
       memcpy(&collectedMsg->x[msg->fragmentId * pipeSize], msg->x, msg->pipeSize); 
     }
     if (allocateMsgs) {
+      // CkPrintf("[%d] deleting %d\n", CkMyPe(), msg->fragmentId); 
       delete msg; 
+      msg = NULL; 
     }
     else {
       fragments[msg->fragmentId] = msg; 
@@ -326,30 +384,7 @@ public:
       }
 
       if (niter == iterations + 1) {
-        niter = 0;
-        if (me == 0) {
-          end_time = CkWallTimer();
-          int titer = (CkNumPes()==1)?(iterations/2) : iterations;
-          CkPrintf("Roundtrip time for Groups "
-                   "(%d KB pipe, %s memcpy, "
-                   "%s allocs) is %lf us\n",
-                   pipeSize / 1024, 
-                   copyFragments ? "\b" : "no",
-                   allocateMsgs  ? "\b" : "no",
-                   1.0e6*(end_time-start_time)/titer);
-          // if fragments were being kept for resending, delete them here
-          if (!allocateMsgs) {
-            for (int i = 0; i < numFragmentsTotal; i++) {
-              delete fragments[i]; 
-            }
-          }
-          mainProxy.maindone();
-        }
-        else {
-          pipelinedSend(); 
-        }
-        delete collectedMsg;
-        delete [] fragments; 
+        finishPipelinedTest();
       }
       else {
         pipelinedSend();