Checking in performance benchmark for Comlib streaming.
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 7 Oct 2011 19:27:45 +0000 (14:27 -0500)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Fri, 7 Oct 2011 19:27:45 +0000 (14:27 -0500)
tests/charm++/commtest/comlib_stream_performance/Makefile [new file with mode: 0644]
tests/charm++/commtest/comlib_stream_performance/streaming.C [new file with mode: 0644]
tests/charm++/commtest/comlib_stream_performance/streaming.ci [new file with mode: 0644]

diff --git a/tests/charm++/commtest/comlib_stream_performance/Makefile b/tests/charm++/commtest/comlib_stream_performance/Makefile
new file mode 100644 (file)
index 0000000..def5020
--- /dev/null
@@ -0,0 +1,23 @@
+CHARMC=../../../../bin/charmc $(OPTS)
+
+OBJS = streaming.o
+
+all: streaming
+
+streaming: $(OBJS)
+       $(CHARMC) -language charm++ -module comlib -tracemode projections -o streaming $(OBJS) 
+
+streaming.decl.h: streaming.ci
+       $(CHARMC)  streaming.ci
+
+clean:
+       rm -f *.decl.h *.def.h conv-host *.o streaming charmrun *.log *.sum *.sts *~
+
+streaming.o: streaming.C streaming.decl.h
+       $(CHARMC) -c -O3 streaming.C
+
+test: all
+       ./charmrun ./streaming +p2 
+
+debug: all
+       ./charmrun ./streaming +p2 ++debug
\ No newline at end of file
diff --git a/tests/charm++/commtest/comlib_stream_performance/streaming.C b/tests/charm++/commtest/comlib_stream_performance/streaming.C
new file mode 100644 (file)
index 0000000..1a7aedd
--- /dev/null
@@ -0,0 +1,171 @@
+#include "comlib.h"
+#include <cassert>
+
+#include "streaming.decl.h"
+CProxy_Main mainProxy;
+int nElements;
+
+CProxy_WorkerArray basicArrayProxy; 
+CProxy_WorkerArray streamingArrayProxy;
+ComlibInstanceHandle stratStreaming;
+
+
+#define PERIOD_IN_MS 10
+#define NMSGS 16
+#define MAX_MESSAGE_SIZE 10000
+#define MAX_BUFFER_SIZE  100000
+#define ENVELOPE_OVERHEAD_ESTIMATE 100
+#define MIN_TEST_SIZE 16
+#define MAX_TEST_SIZE 1024
+
+class TestMessage : public CMessage_TestMessage {
+public:
+  int length;
+  char* msg;
+};
+
+// mainchare
+
+class Main : public CBase_Main{
+private:
+  int nDone;
+
+public:
+
+  Main(CkArgMsg *m) {    
+    nDone = 0;
+
+    //    com_debug = 1;
+
+    nElements = 2; 
+    if(m->argc >1) nElements=atoi(m->argv[1]);
+    delete m;
+
+    mainProxy = thishandle;
+       
+    // create streaming strategy
+    StreamingStrategy *strategy = new StreamingStrategy(PERIOD_IN_MS, NMSGS,
+                                                        MAX_MESSAGE_SIZE, MAX_BUFFER_SIZE);
+    stratStreaming = ComlibRegister(strategy);
+    streamingArrayProxy = CProxy_WorkerArray::ckNew(nElements, nElements);
+    basicArrayProxy = streamingArrayProxy; 
+
+    ComlibAssociateProxy(stratStreaming, streamingArrayProxy);
+    
+    // initiate using non-delegated proxy because broadcasts do not
+    // work with streaming
+    basicArrayProxy.prepareTest();
+    CkCallback syncWorkers(CkIndex_Main::runWithoutStreaming(), mainProxy); 
+    CkStartQD(syncWorkers);   
+  }
+
+  void runWithStreaming() {
+    CkPrintf("Running Streaming Test...\n"); 
+    basicArrayProxy.initiateSends(streamingArrayProxy);
+    CkCallback syncWorkers(CkIndex_Main::done(), mainProxy); 
+    CkStartQD(syncWorkers);
+  }
+
+  void runWithoutStreaming() {
+    CkPrintf("Running Without Streaming Enabled ... \n");
+    basicArrayProxy.initiateSends(basicArrayProxy);
+    CkCallback syncWorkers(CkIndex_Main::runWithStreaming(), mainProxy); 
+    CkStartQD(syncWorkers);
+  }
+
+  void done() {
+    CkPrintf("Finished test\n");
+    CkExit();
+  }
+
+};
+
+class WorkerArray : public CBase_WorkerArray {
+private:
+  CProxy_WorkerArray localProxy;
+  TestMessage **msgs; 
+  TestMessage **newMsgs; 
+  int msgSize;
+  int nElements; 
+  int neighbor; 
+  double mystartTime; 
+  double myendTime;
+  int receivedMsgs; 
+
+public:
+
+  WorkerArray(int nChares) {
+    nElements = nChares; 
+    msgs = new TestMessage*[NMSGS];
+    newMsgs = new TestMessage*[NMSGS];
+    msgSize = MIN_TEST_SIZE; 
+    // partition into pairs of ranks
+    if (thisIndex % 2 == 0) {
+      neighbor = (thisIndex + nElements/2) % nElements; 
+    }
+    else {
+      neighbor = (thisIndex - nElements/2) % nElements; 
+    }
+    receivedMsgs = 0; 
+  }
+
+  WorkerArray(CkMigrateMessage *m) {}
+
+  void prepareTest() {
+    
+    for (int i = 0; i < NMSGS; i++) {
+      msgs[i] = new(msgSize) TestMessage; 
+    } 
+        
+  }
+
+
+  void initiateSends(CProxy_WorkerArray workerProxy) {
+    double startTime = CkWallTimer();
+    localProxy = workerProxy; 
+    if (thisIndex % 2 == 0) {
+      mystartTime = CkWallTimer();    
+      for (int i = 0; i < NMSGS; i++) {
+        localProxy[neighbor].receiveSends(msgs[i]);
+      }
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] initiateSends took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+
+  void receiveSends(TestMessage *msg) {
+    double startTime = CkWallTimer();
+    // recycle received messages
+    newMsgs[receivedMsgs] = msg; 
+    receivedMsgs++; 
+    if (receivedMsgs == NMSGS) {
+      for (int i=0; i < NMSGS; i++) {
+        localProxy[neighbor].receiveReplies(msgs[i]); 
+      }
+      receivedMsgs = 0;
+      msgs = newMsgs; 
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] receiveSends took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+  
+  void receiveReplies(TestMessage *msg) {
+    double startTime = CkWallTimer();
+    // recycle received messages
+    newMsgs[receivedMsgs] = msg;
+    receivedMsgs++;
+    if (receivedMsgs == NMSGS) {
+      myendTime = CkWallTimer();
+      CkPrintf("[%d] round trip time for sending %d messages: %f us\n", 
+               thisIndex, NMSGS, 1000000 * (myendTime - mystartTime));
+      receivedMsgs = 0;
+      msgs = newMsgs; 
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] receiveReplies took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+
+};
+
+
+#include "streaming.def.h"
diff --git a/tests/charm++/commtest/comlib_stream_performance/streaming.ci b/tests/charm++/commtest/comlib_stream_performance/streaming.ci
new file mode 100644 (file)
index 0000000..4fa7737
--- /dev/null
@@ -0,0 +1,30 @@
+mainmodule streaming {
+  extern module comlib; 
+                
+  readonly CProxy_Main mainProxy;      
+  readonly int nElements;
+
+  readonly CProxy_WorkerArray basicArrayProxy; 
+  readonly CProxy_WorkerArray streamingArrayProxy;
+  readonly ComlibInstanceHandle stratStreaming;                                
+
+  message TestMessage {
+    char msg[];
+  };
+
+  mainchare Main {
+    entry Main(CkArgMsg *m);
+    entry void runWithStreaming();
+    entry void runWithoutStreaming();
+    entry void done();
+  };
+
+  array[1D] WorkerArray {
+    entry WorkerArray(int nChares);
+    entry void prepareTest();
+    entry void initiateSends(CProxy_WorkerArray workerProxy);
+    entry void receiveSends(TestMessage *msg);
+    entry void receiveReplies(TestMessage *msg);
+  };
+
+};