Bug #1938: Fix race condition in converse tests and examples 05/4305/4
authorNitin Bhat <nbhat4@illinois.edu>
Thu, 21 Jun 2018 17:00:58 +0000 (12:00 -0500)
committerNitin Bhat <nbhat4@illinois.edu>
Wed, 27 Jun 2018 19:14:08 +0000 (14:14 -0500)
This fixes the hang in converse multiping on netlrts-linux-x86_64-tcp.
For converse tests and examples that use application arguments, it
is required to process the application user-specified arguments after
calling CmiInitCPUAffinity and CmiInitCPUTopology. Since only PE 0
waits for other PEs to complete the call to CmiInitCPUTopology, there
are potential race conditions that can occur when the non-0 PEs run
faster and send user messages to PE 0. Since PE 0 is the slower PE,
it receives user messages before initializing some application
variables using the user specified arguments, as PE 0 does not exit
from CmiInitCPUTopology. For this reason, the race condition can be
fixed by holding off application message sending on all non-0 PEs
until they receive a message from PE 0 signaling that all PEs have
finished processing their topology information. For within-process
synchronization, a barrier is used to enforce topo initialization
before application messaging.

Change-Id: I102169e5b7c5f8298452cc0d9b3a08b73e02a7e3

examples/converse/pingpong/pingpong.C
examples/converse/pingpong/pingpong_multipairs.C
examples/converse/pingpong_multi/pingpong.C
tests/converse/commbench/commbench.c
tests/converse/machinetest/multiping.C
tests/converse/machinetest/pingall.C
tests/converse/megacon/megacon.c

index f64d9eb049d8593bf701d07eb0898a88c163c7ed..0e089419bee8c8be411e0d89e960713b3e86f062 100644 (file)
@@ -20,6 +20,7 @@ CpvDeclare(int,warmUpDoneHandler);
 CpvDeclare(int,exitHandler);
 CpvDeclare(int,node0Handler);
 CpvDeclare(int,node1Handler);
+CpvDeclare(int,startOperationHandler);
 CpvStaticDeclare(double,startTime);
 CpvStaticDeclare(double,endTime);
 
@@ -146,6 +147,18 @@ CmiHandler node1HandlerFunc(char *msg)
   return 0;
 }
 
+// Converse handler for beginning operation
+CmiHandler startOperationHandlerFunc(char *msg)
+{
+#if USE_PERSISTENT
+  if (CmiMyPe() < CmiNumPes())
+    h = CmiCreateCompressPersistent(otherPe, CpvAccess(maxMsgSize)+1024, 200, CMI_FLOATING);
+#endif
+
+  if (CmiMyPe() == 0)
+    startWarmUp();
+  return 0;
+}
 
 //Converse main. Initialize variables and register handlers
 CmiStartFn mymain(int argc, char *argv[])
@@ -168,6 +181,8 @@ CmiStartFn mymain(int argc, char *argv[])
   CpvAccess(node0Handler) = CmiRegisterHandler((CmiHandler) node0HandlerFunc);
   CpvInitialize(int,node1Handler);
   CpvAccess(node1Handler) = CmiRegisterHandler((CmiHandler) node1HandlerFunc);
+  CpvInitialize(int,startOperationHandler);
+  CpvAccess(startOperationHandler) = CmiRegisterHandler((CmiHandler) startOperationHandlerFunc);
 
   //set warmup run
   CpvAccess(warmUp) = true;
@@ -183,6 +198,9 @@ CmiStartFn mymain(int argc, char *argv[])
   // Initialize CPU topology
   CmiInitCPUTopology(argv);
 
+  // Wait for all PEs of the node to complete topology init
+  CmiNodeAllBarrier();
+
   // Update the argc after runtime parameters are extracted out
   argc = CmiGetArgc(argv);
   if(argc == 5){
@@ -207,14 +225,16 @@ CmiStartFn mymain(int argc, char *argv[])
 
   CpvAccess(msgSize)= CpvAccess(minMsgSize) + CmiMsgHeaderSizeBytes;
 
-#if USE_PERSISTENT
-  if (CmiMyPe() < CmiNumPes())
-    h = CmiCreateCompressPersistent(otherPe, CpvAccess(maxMsgSize)+1024, 200, CMI_FLOATING);
-#endif
-
-  if (CmiMyPe() == 0)
-    startWarmUp();
+  // Node 0 waits till all processors finish their topology processing
+  if(CmiMyPe() == 0) {
+    // Signal all PEs to begin computing
+    char *startOperationMsg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+    CmiSetHandler((char *)startOperationMsg, CpvAccess(startOperationHandler));
+    CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, startOperationMsg);
 
+    // start operation locally on PE 0
+    startOperationHandlerFunc(NULL);
+  }
   return 0;
 }
 
index d331ac08fcf0c7e7f936265b37e4f5cfbd436099..8d13ac6360f0e9e691aad4f8822ac28b1b932bdf 100644 (file)
@@ -22,6 +22,7 @@ CpvDeclare(int,reduceHandler);
 CpvDeclare(int,startRingHandler);
 CpvDeclare(int,node0Handler);
 CpvDeclare(int,node1Handler);
+CpvDeclare(int,startOperationHandler);
 CpvStaticDeclare(double,startTime);
 CpvStaticDeclare(double,endTime);
 
@@ -145,6 +146,24 @@ CmiHandler node1HandlerFunc(char *msg)
     return 0;
 }
 
+// Converse handler for beginning operation
+CmiHandler startOperationHandlerFunc(char *msg)
+{
+#if USE_PERSISTENT
+    h = CmiCreatePersistent(otherPe, maxMsgSize+1024);
+#endif
+    if(CmiMyPe() == 0)
+    {
+        CmiPrintf("Multiple pair send/recv\n bytes \t\t latency(us)\t bandwidth(MBytes/sec)\n");
+    }
+    if (CmiMyPe() <CmiNumPes()/2)
+    {
+        void *sendmsg = CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(sendmsg,CpvAccess(startRingHandler));
+        CmiSyncSendAndFree(CmiMyPe(), CmiMsgHeaderSizeBytes, sendmsg);
+    }
+    return 0;
+}
 
 //Converse main. Initialize variables and register handlers
 CmiStartFn mymain(int argc, char *argv[])
@@ -167,6 +186,8 @@ CmiStartFn mymain(int argc, char *argv[])
     CpvAccess(node0Handler) = CmiRegisterHandler((CmiHandler) node0HandlerFunc);
     CpvInitialize(int,node1Handler);
     CpvAccess(node1Handler) = CmiRegisterHandler((CmiHandler) node1HandlerFunc);
+    CpvInitialize(int,startOperationHandler);
+    CpvAccess(startOperationHandler) = CmiRegisterHandler((CmiHandler) startOperationHandlerFunc);
     
     CpvInitialize(double,startTime);
     CpvInitialize(double,endTime);
@@ -179,20 +200,19 @@ CmiStartFn mymain(int argc, char *argv[])
     // Initialize CPU topology
     CmiInitCPUTopology(argv);
 
-    
-#if USE_PERSISTENT
-    h = CmiCreatePersistent(otherPe, maxMsgSize+1024);
-#endif
-    if(CmiMyPe() == 0)
-    {
-        CmiPrintf("Multiple pair send/recv\n bytes \t\t latency(us)\t bandwidth(MBytes/sec)\n");
+    // Wait for all PEs of the node to complete topology init
+    CmiNodeAllBarrier();
+
+    // Node 0 waits till all processors finish their topology processing
+    if(CmiMyPe() == 0) {
+        // Signal all PEs to begin computing
+        char *startOperationMsg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler((char *)startOperationMsg, CpvAccess(startOperationHandler));
+        CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, startOperationMsg);
+
+        // start operation locally on PE 0
+        startOperationHandlerFunc(NULL);
     }
-    if (CmiMyPe() <CmiNumPes()/2)
-    {
-        void *sendmsg = CmiAlloc(CmiMsgHeaderSizeBytes);
-        CmiSetHandler(sendmsg,CpvAccess(startRingHandler));
-        CmiSyncSendAndFree(CmiMyPe(), CmiMsgHeaderSizeBytes, sendmsg);
-   }    
     return 0;
 }
 
index 211bd6a3026615fcb4a93cc08a439526e0b465a6..28538d8b40d9f27808181ce4ec837e7666c3389f 100644 (file)
@@ -18,6 +18,7 @@ CpvDeclare(int,recvNum);
 CpvDeclare(int,exitHandler);
 CpvDeclare(int,node0Handler);
 CpvDeclare(int,node1Handler);
+CpvDeclare(int,startOperationHandler);
 CpvStaticDeclare(double,startTime);
 CpvStaticDeclare(double,endTime);
 
@@ -151,6 +152,21 @@ CmiHandler node1HandlerFunc(Message *msg)
     return 0;
 }
 
+// Converse handler for beginning operation
+CmiHandler startOperationHandlerFunc(char *msg)
+{
+#if USE_PERSISTENT
+    h = CmiCreatePersistent(otherPe, maxMsgSize+1024);
+#endif
+    if (CmiMyPe() == 0)
+    {
+#if REUSE_MSG
+        recvMsgs = (Message**) malloc(sizeof(Message*)*(CmiNumPes()-1));
+#endif
+        startRing();
+    }
+    return 0;
+}
 
 //Converse main. Initialize variables and register handlers
 CmiStartFn mymain(int argc, char *argv[])
@@ -167,6 +183,9 @@ CmiStartFn mymain(int argc, char *argv[])
     CpvAccess(node0Handler) = CmiRegisterHandler((CmiHandler) node0HandlerFunc);
     CpvInitialize(int,node1Handler);
     CpvAccess(node1Handler) = CmiRegisterHandler((CmiHandler) node1HandlerFunc);
+    CpvInitialize(int,startOperationHandler);
+    CpvAccess(startOperationHandler) = CmiRegisterHandler((CmiHandler) startOperationHandlerFunc);
+
     
     CpvInitialize(double,startTime);
     CpvInitialize(double,endTime);
@@ -179,18 +198,19 @@ CmiStartFn mymain(int argc, char *argv[])
     // Initialize CPU topology
     CmiInitCPUTopology(argv);
 
-#if USE_PERSISTENT
-    h = CmiCreatePersistent(otherPe, maxMsgSize+1024);
-#endif
-    
-    if (CmiMyPe() == 0)
-    {
-#if REUSE_MSG
-        recvMsgs = (Message**) malloc(sizeof(Message*)*(CmiNumPes()-1));
-#endif
-        startRing();
+    // Wait for all PEs of the node to complete topology init
+    CmiNodeAllBarrier();
+
+    // Node 0 waits till all processors finish their topology processing
+    if(CmiMyPe() == 0) {
+      // Signal all PEs to begin computing
+      char *startOperationMsg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+      CmiSetHandler((char *)startOperationMsg, CpvAccess(startOperationHandler));
+      CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, startOperationMsg);
+
+      // start operation locally on PE 0
+      startOperationHandlerFunc(NULL);
     }
-    
     return 0;
 }
 
index b241aa970973316fd21dfc453d94d5053e7a41f9..72346233bfa608aef0247e518961accda23bd8e9 100644 (file)
@@ -146,6 +146,9 @@ void commbench_init(int argc, char** argv) {
   // Initialize CPU topology
   CmiInitCPUTopology(argv);
 
+  // Wait for all PEs of the node to complete topology init
+  CmiNodeAllBarrier();
+
   // Update the argc after runtime parameters are extracted out
   argc = CmiGetArgc(argv);
 
index 79f22521dbdab707eeb68cbaf73f00567a3522b7..1b47f9123f2886cc82aad1fbb3309196aa7f556a 100644 (file)
@@ -31,6 +31,7 @@ CpvDeclare(int,exitHandler);
 CpvDeclare(int,node0Handler);
 CpvDeclare(int,node1Handler);
 CpvDeclare(int,ackHandler);
+CpvDeclare(int,startOperationHandler);
 CpvStaticDeclare(double,startTime);
 CpvStaticDeclare(double,endTime);
 
@@ -124,6 +125,17 @@ CmiHandler exitHandlerFunc(char *msg)
     return 0;
 }
 
+// Converse handler for beginning operation
+CmiHandler startOperationHandlerFunc(char *msg)
+{
+    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, ApplIdleStart, NULL);
+    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE, ApplIdleEnd, NULL);
+
+    if ((CmiMyPe() < CmiNumPes()/2) || CpvAccess(twoway))
+        startOperation();
+    return 0;
+}
+
 //The handler that sends out K_FACTOR messages
 CmiHandler node0HandlerFunc(char *msg)
 {
@@ -186,6 +198,8 @@ CmiStartFn mymain(int argc, char **argv)
     CpvAccess(node1Handler) = CmiRegisterHandler((CmiHandler) node1HandlerFunc);
     CpvInitialize(int,ackHandler);
     CpvAccess(ackHandler) = CmiRegisterHandler((CmiHandler) ackHandlerFunc);
+    CpvInitialize(int,startOperationHandler);
+    CpvAccess(startOperationHandler) = CmiRegisterHandler((CmiHandler) startOperationHandlerFunc);
     
     CpvInitialize(double,startTime);
     CpvInitialize(double,endTime);
@@ -205,6 +219,9 @@ CmiStartFn mymain(int argc, char **argv)
     // Initialize CPU topology
     CmiInitCPUTopology(argv);
 
+    // Wait for all PEs of the node to complete topology init
+    CmiNodeAllBarrier();
+
     // Update the argc after runtime parameters are extracted out
     argc = CmiGetArgc(argv);
 
@@ -216,9 +233,6 @@ CmiStartFn mymain(int argc, char **argv)
 
     int otherPe = CmiMyPe() ^ 1;
     
-    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, ApplIdleStart, NULL);
-    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE, ApplIdleEnd, NULL);
-
     if(CmiMyRank() == CmiMyNodeSize()) return 0;
 
     if(CmiMyPe() == 0) {
@@ -230,9 +244,17 @@ CmiStartFn mymain(int argc, char **argv)
                       CpvAccess(kFactor));
     }
 
-    if ((CmiMyPe() < CmiNumPes()/2) || CpvAccess(twoway))
-        startOperation();
-    
+
+    // Node 0 waits till all processors finish their topology processing
+    if(CmiMyPe() == 0) {
+        // Signal all PEs to begin computing
+        char *startOperationMsg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler((char *)startOperationMsg, CpvAccess(startOperationHandler));
+        CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, startOperationMsg);
+
+        // start operation locally on PE 0
+        startOperationHandlerFunc(NULL);
+    }
     return 0;
 }
 
index efd6100b21e5f334be640e1e80317729000bcad9..9df83080b5e302f2870f912c49f86b8ec03101da 100644 (file)
@@ -27,6 +27,7 @@ CpvDeclare(int,exitHandler);
 CpvDeclare(int,node0Handler);
 CpvDeclare(int,node1Handler);
 CpvDeclare(int,ackHandler);
+CpvDeclare(int,startOperationHandler);
 CpvStaticDeclare(double,startTime);
 CpvStaticDeclare(double,endTime);
 
@@ -113,6 +114,18 @@ CmiHandler exitHandlerFunc(char *msg)
     return 0;
 }
 
+// Converse handler for beginning operation
+CmiHandler startOperationHandlerFunc(char *msg)
+{
+    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, ApplIdleStart, NULL);
+    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE, ApplIdleEnd, NULL);
+
+    if ((CmiMyPe() < CmiNumPes()/2) || CpvAccess(twoway))
+      startPingpong();
+
+    return 0;
+}
+
 CmiHandler node0HandlerFunc(char *msg)
 {
     CpvAccess(cycleNum)++;
@@ -158,6 +171,8 @@ CmiStartFn mymain(int argc, char** argv)
     CpvAccess(node1Handler) = CmiRegisterHandler((CmiHandler) node1HandlerFunc);
     CpvInitialize(int,ackHandler);
     CpvAccess(ackHandler) = CmiRegisterHandler((CmiHandler) ackHandlerFunc);
+    CpvInitialize(int,startOperationHandler);
+    CpvAccess(startOperationHandler) = CmiRegisterHandler((CmiHandler) startOperationHandlerFunc);
     
     CpvInitialize(double,startTime);
     CpvInitialize(double,endTime);
@@ -171,18 +186,18 @@ CmiStartFn mymain(int argc, char** argv)
     CpvInitialize(int,twoway);
     CpvAccess(twoway) = 0;
 
-    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, ApplIdleStart, NULL);
-    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE, ApplIdleEnd, NULL);
-
     // Set runtime cpuaffinity
     CmiInitCPUAffinity(argv);
 
     // Initialize CPU topology
     CmiInitCPUTopology(argv);
 
+    // Wait for all PEs of the node to complete topology init
+    CmiNodeAllBarrier();
+
     // Update the argc after runtime parameters are extracted out
     argc = CmiGetArgc(argv);
+
     if(CmiMyRank() == CmiMyNodeSize()) return 0;
     
     if(argc > 1)
@@ -195,9 +210,16 @@ CmiStartFn mymain(int argc, char** argv)
         CmiPrintf("Starting Pingpong with twoway traffic\n");
     }
 
-    if ((CmiMyPe() < CmiNumPes()/2) || CpvAccess(twoway))
-      startPingpong();
+    // Node 0 waits till all processors finish their topology processing
+    if(CmiMyPe() == 0) {
+        // Signal all PEs to begin computing
+        char *startOperationMsg = (char *)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler((char *)startOperationMsg, CpvAccess(startOperationHandler));
+        CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, startOperationMsg);
 
+        // start operation locally on PE 0
+        startOperationHandlerFunc(NULL);
+    }
     return 0;
 }
 
index 016056fef07e772fbdcc7c8805e14eb171ab293d..ea93797d5b66f28f88eea893fd2a19740f1943f8 100644 (file)
@@ -245,6 +245,9 @@ void megacon_init(int argc, char **argv)
   // Initialize CPU topology
   CmiInitCPUTopology(argv);
 
+  // Wait for all PEs of the node to complete topology init
+  CmiNodeAllBarrier();
+
   // Update the argc after runtime parameters are extracted out
   argc = CmiGetArgc(argv);