Added a sub-test to bypass the startup phase when a strategy has not
authorLukasz Wesolowski <wesolwsk@talent.cs.illinois.edu>
Thu, 4 Nov 2010 22:05:28 +0000 (17:05 -0500)
committerLukasz Wesolowski <wesolwsk@talent.cs.illinois.edu>
Thu, 4 Nov 2010 22:05:28 +0000 (17:05 -0500)
been set up yet.

tests/charm++/commtest/commlib_stream_extended/streaming.C
tests/charm++/commtest/commlib_stream_extended/streaming.ci

index 462070a0ef7cd9387c6b5d77cb318a11777cfdb9..0ba339e91e42443bef65a11fcf6d4271f292a75f 100644 (file)
@@ -47,7 +47,7 @@ public:
     nDone = 0;
     numArrayStreaming=0;
 
-    comm_debug = 1;
+    // com_debug = 1;
     nElements = 2;
     //if(m->argc >1 ) nElements=atoi(m->argv[1]);
     delete m;
@@ -55,17 +55,25 @@ public:
     mainProxy = thishandle;
        
     // create streaming strategy
-    CharmStrategy *strategy = new StreamingStrategy(PERIOD_IN_MS, NMSGS,
+    StreamingStrategy *strategy = new StreamingStrategy(PERIOD_IN_MS, NMSGS,
                                               MAX_MESSAGE_SIZE, MAX_BUFFER_SIZE);
     stratStreaming = ComlibRegister(strategy);
 
     streamingArrayProxy = CProxy_StreamingArray::ckNew(nElements);
 
-    streamingArrayProxy.testStreamingTimeout();
+    streamingArrayProxy.simpleTest();
       
   }
 
-  void finishTimeoutTest(void) {
+  void finishStartup() {
+    nDone++; 
+    if (nDone == CkNumPes()) {
+      nDone = 0; 
+      streamingArrayProxy.testStreamingTimeout(); 
+    }
+  }
+
+  void finishTimeoutTest() {
     nDone++;
     if (nDone == CkNumPes()) {
       nDone = 0;
@@ -73,7 +81,7 @@ public:
     }
   }
 
-  void finishMaxCountTest(void) {
+  void finishMaxCountTest() {
     nDone++;
     if (nDone==CkNumPes()) {
       nDone = 0;
@@ -81,7 +89,7 @@ public:
     }
   }
 
-  void finishMaxMsgSizeTest(void) {
+  void finishMaxMsgSizeTest() {
     nDone++;
     if (nDone==CkNumPes()) {
       nDone=0;
@@ -89,7 +97,7 @@ public:
     }
   }
 
-  void finishArrayStreaming(void) {
+  void finishArrayStreaming() {
     nDone++;
     if (nDone == nElements) {
       nDone = 0;
@@ -100,11 +108,11 @@ public:
        CkExit();
     }
   }
-  void finishMigrate(void) {
+  void finishMigrate() {
     nDone++;
     if (nDone == nElements) {
       nDone = 0;
-      streamingArrayProxy.testStreamingTimeout();
+      streamingArrayProxy.simpleTest();
     }
   }
 
@@ -119,17 +127,36 @@ private:
 
 public:
 
-  StreamingArray(void) {
+  StreamingArray() {
     msgCount=0;
     msgCountAfterTimeout=0;
   }
 
   StreamingArray(CkMigrateMessage *m) {}
+
+  void simpleTest() {
+    CkPrintf("[%d] ****** Test 0 - Startup phase ******\n", CkMyPe()); 
+    localProxy = thisProxy; 
+    ComlibAssociateProxy(stratStreaming, localProxy);
+    char msg[] = "|This is a short streaming message|";
+    for (int i=0; i<CkNumPes(); i++) {
+      if (i==CkMyPe()) continue;
+      streamingMessage* b = new(strlen(msg)+1,0) streamingMessage;     
+      memcpy(b->msg, msg, strlen(msg)+1);
+      b->length = strlen(msg);
+      localProxy[i].simpleReceive(b);
+    }    
+  }
+
+  void simpleReceive(streamingMessage *m) {
+    CkPrintf("[%d] Startup message arrived: %s\n", CkMyPe(), m->msg); 
+    delete m; 
+    mainProxy.finishStartup(); 
+  }
     
-  void testStreamingTimeout(void) {
+  void testStreamingTimeout() {
     CkPrintf("[%d] ****** Test 1 - Timeout ******\n", CkMyPe());
-    localProxy = thisProxy;
-    ComlibAssociateProxy(&stratStreaming, localProxy);
+
     char msg[] = "|This is a short streaming message|";
     lastSavedTime= CkWallTimer();
     for (int i=0; i<CkNumPes(); i++) {
@@ -154,7 +181,7 @@ public:
     mainProxy.finishTimeoutTest();
   }
 
-  void testStreamingMaxCount(void) {
+  void testStreamingMaxCount() {
     CkPrintf("[%d] ****** Test 2 - Flush on max message count ******\n", CkMyPe());
     char msg[] = "|This is a short streaming message|";
     lastSavedTime=CkWallTimer();
@@ -179,7 +206,7 @@ public:
     msgCount++;;
     if (msgCount==NMSGS) {
       msgCount=0;
-      assert(CkWallTimer()-lastSavedTime < PERIOD_IN_MS/1000);
+      assert(CkWallTimer()-lastSavedTime < .25 * PERIOD_IN_MS/1000);
       CkPrintf("[%d] %d messages arrived after %f seconds - timeout was not incurred\n", 
               CkMyPe(), NMSGS, CkWallTimer()-lastSavedTime);
       assert(strcmp(m->msg,"|This is a short streaming message|") == 0);
@@ -199,7 +226,7 @@ public:
     delete m;
   }
 
-  void testStreamingMaxMsgSize(void) {
+  void testStreamingMaxMsgSize() {
     CkPrintf("[%d] ****** Test 3 - Flush on max message size ******\n", CkMyPe());
     lastSavedTime=CkWallTimer();
     for (int i=0; i<CkNumPes(); i++) {
@@ -210,14 +237,14 @@ public:
   }
 
   void receiveLargeMessage(streamingMessage* m) {
-    assert(CkWallTimer()-lastSavedTime < PERIOD_IN_MS/1000);
+    assert(CkWallTimer()-lastSavedTime < .25 * PERIOD_IN_MS/1000);
     CkPrintf("[%d] large message received after %f seconds with no "
             "timeout incurred\n", CkMyPe(), CkWallTimer()-lastSavedTime);
     delete m;
     mainProxy.finishMaxMsgSizeTest();
   }
 
-  void testStreamingMaxBufSize(void) {
+  void testStreamingMaxBufSize() {
     CkPrintf("[%d] ****** Test 4 - Flush on max buffer size reached ******\n", 
             CkMyPe());
     int msgSize = MAX_MESSAGE_SIZE - ENVELOPE_OVERHEAD_ESTIMATE;
@@ -243,7 +270,7 @@ public:
   }
 
   void receiveAfterMaxBufSizeReached(streamingMessage *m) {
-    assert(CkWallTimer()-lastSavedTime < PERIOD_IN_MS/1000);
+    assert(CkWallTimer()-lastSavedTime < .25 * PERIOD_IN_MS/1000);
     CkPrintf("[%d] Received group of large messages after maximum "
             "buffer size was reached on sending processor (%f seconds)\n", 
             CkMyPe(), CkWallTimer()-lastSavedTime);
@@ -255,7 +282,7 @@ public:
     delete m;
   }
 
-  void migrate(void) {
+  void migrate() {
     int migrateTo = 0;
     if (CkMyPe() != (CkNumPes() - 1))
       migrateTo = CkMyPe() + 1;
index 7dc79d84be7387e5ab1a317d80a23bb7b5cd27c5..8373e3898b9e4ad81ccdb1451e54fcbab779ec39 100644 (file)
@@ -12,26 +12,29 @@ mainmodule streaming {
 
   mainchare Main {
     entry Main(CkArgMsg *m);
-    entry void finishTimeoutTest(void);
-    entry void finishMaxCountTest(void);
-    entry void finishMaxMsgSizeTest(void);
-    entry void finishArrayStreaming(void);
-    entry void finishMigrate(void);            
+    entry void finishStartup();
+    entry void finishTimeoutTest();
+    entry void finishMaxCountTest();
+    entry void finishMaxMsgSizeTest();
+    entry void finishArrayStreaming();
+    entry void finishMigrate();                
   };
 
   array[1D] StreamingArray {
-    entry StreamingArray(void);                        
-    entry void testStreamingTimeout(void);
+    entry StreamingArray();                    
+    entry void simpleTest(); 
+    entry void simpleReceive(streamingMessage *m); 
+    entry void testStreamingTimeout();
     entry void receiveAfterTimeout(streamingMessage *m);
-    entry void testStreamingMaxCount(void);
+    entry void testStreamingMaxCount();
     entry void receiveWithoutTimeout(streamingMessage *m);
     entry void receiveGroupAfterTimeout(streamingMessage *m);
-    entry void testStreamingMaxMsgSize(void);
+    entry void testStreamingMaxMsgSize();
     entry void receiveLargeMessage(streamingMessage *m);
-    entry void testStreamingMaxBufSize(void);
+    entry void testStreamingMaxBufSize();
     entry void receiveAfterMaxBufSizeReached(streamingMessage *m);
     entry void receiveAndIgnore(streamingMessage *m);
-    entry void migrate(void);
+    entry void migrate();
   };
 
 };