Merge branch 'charm' of charmgit:charm into charm
authorYanhua Sun <sun51@hopper11.(none)>
Fri, 7 Oct 2011 21:15:26 +0000 (14:15 -0700)
committerYanhua Sun <sun51@hopper11.(none)>
Fri, 7 Oct 2011 21:15:26 +0000 (14:15 -0700)
15 files changed:
src/arch/gemini_gni/machine.c
src/arch/mpi-linux-x86_64/cc-mpicxx.h
src/arch/mpi-linux-x86_64/cc-mpicxx.sh
src/arch/util/mempool.c
src/arch/util/mempool.h
src/ck-com/ComlibSectionInfo.C
src/ck-com/ComlibStrategy.h
src/ck-com/EachToManyMulticastStrategy.C
src/ck-com/EachToManyMulticastStrategy.h
src/conv-com/StreamingStrategy.C
src/conv-com/convcomlibmanager.C
src/conv-core/cputopology.C
tests/charm++/commtest/comlib_stream_performance/Makefile [new file with mode: 0644]
tests/charm++/commtest/comlib_stream_performance/streaming.C [new file with mode: 0644]
tests/charm++/commtest/comlib_stream_performance/streaming.ci [new file with mode: 0644]

index a98814252326ec233a010008b19238b86223086a..e39ef3de839f370f65eeff370bc23e36eae8a679 100644 (file)
@@ -40,7 +40,7 @@ static void sleep(int secs) {
 #define USE_LRTS_MEMPOOL     1
 
 #if USE_LRTS_MEMPOOL
-static CmiInt8 _mempool_size = 1024ll*1024*4;
+static CmiInt8 _mempool_size = 1024ll*1024*32;
 #endif
 
 #define PRINT_SYH  0
@@ -1653,10 +1653,10 @@ static void _init_DMA_buffer()
     allgather(&DMA_buffer_base_mdh_addr, DMA_buffer_base_mdh_addr_vec, sizeof(mdh_addr_t) );
 }
 
-void *alloc_mempool_block(int size, gni_mem_handle_t *mem_hndl)
+void *alloc_mempool_block(int *size, gni_mem_handle_t *mem_hndl)
 {
-    void *pool = memalign(ALIGNBUF, size);
-    gni_return_t status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, size,  mem_hndl, &omdh);
+    void *pool = memalign(ALIGNBUF, *size);
+    gni_return_t status = MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size,  mem_hndl, &omdh);
     GNI_RC_CHECK("Mempool register", status);
     return pool;
 }
@@ -1852,7 +1852,7 @@ static void LrtsExit()
 {
     /* free memory ? */
 #if     USE_LRTS_MEMPOOL
-    mempool_destory(mempool);
+    mempool_destroy(mempool);
 #endif
     PMI_Finalize();
     exit(0);
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..7c1ee09e330a9ecacdd5c8695f433863e4c3ca3b 100644 (file)
@@ -0,0 +1,4 @@
+#ifdef __FCC_VERSION 
+  #undef CMK_THREADS_BUILD_TLS
+  #define CMK_THREADS_BUILD_TLS 0
+#endif
index 1ecaa02539ab9a35e4c4037bb42d298baaca4430..5aae5474a55b8c88fe74409f8ed725040711ff58 100644 (file)
@@ -19,9 +19,12 @@ case "$CMK_REAL_COMPILER" in
 g++)   CMK_AMD64="-m64 -fPIC" ;;
 icpc)  CMK_AMD64="-m64";;
 pgCC)  CMK_AMD64="-DCMK_CC_PGCC=1" ;;
+FCC)   CMK_AMD64="-Kfast -DCMK_CC_PGCC=1 --variadic_macros";;
 esac
 CMK_REAL_C_COMPILER=`$MPICC -show 2>/dev/null | cut -d' ' -f1 `
 
+
+
 CMK_CPP_CHARM="/lib/cpp -P"
 CMK_CPP_C="$MPICC -E"
 CMK_CC="$MPICC $CMK_AMD64 "
@@ -36,6 +39,12 @@ CMK_NATIVE_CXX="$CMK_REAL_COMPILER $CMK_AMD64 "
 CMK_NATIVE_LDXX="$CMK_REAL_COMPILER $CMK_AMD64 "
 CMK_NATIVE_LIBS=""
 
+case "$CMK_REAL_COMPILER" in
+FCC) CMK_NATIVE_LD="$CMK_REAL_C_COMPILER";;
+esac
+
+
+
 # fortran compiler 
 # for Intel Fortran compiler 8.0 and higher which is renamed to ifort from ifc
 # does not work for ifc 7.0
index a277d080762fe86d342c29e4a2c7ae0c2d158b52..722671f9fd303102173b0d1db7dce312ba300aaf 100644 (file)
@@ -49,7 +49,7 @@ mempool_type *mempool_init(size_t pool_size, mempool_newblockfn allocfn, mempool
     return mptr;
 }
 
-void mempool_destory(mempool_type *mptr)
+void mempool_destroy(mempool_type *mptr)
 {
     mempool_block *current, *mempools_head;
     mempool_freeblock   freefn = mptr->freeblockfn;
@@ -125,7 +125,7 @@ void*  mempool_malloc(mempool_type *mptr, int size, int expand)
         if (!expand) return NULL;
 
         expand_size = expand_mem>size ? expand_mem:2*size; 
-        pool = mptr->newblockfn(expand_size, &mem_hndl);
+        pool = mptr->newblockfn(&expand_size, &mem_hndl);
         expand_pool = (mempool_block*)pool;
         expand_pool->mempool_ptr = pool;
         expand_pool->mem_hndl = mem_hndl;
@@ -140,7 +140,7 @@ void*  mempool_malloc(mempool_type *mptr, int size, int expand)
         bestfit->size = expand_size-sizeof(mempool_block);
         bestfit->mem_hndl = expand_pool->mem_hndl;
         bestfit->next_free = 0;
-        bestfit_size = expand_size;
+        bestfit_size = expand_size-sizeof(mempool_block);
 #if 0
         current = freelist_head;
         while(current!= NULL && current < bestfit )
@@ -152,14 +152,16 @@ void*  mempool_malloc(mempool_type *mptr, int size, int expand)
         CmiAssert(bestfit > previous);
 #endif
         bestfit_previous = previous;
-        if (previous == NULL)
+        if (previous == NULL) {
            *freelist_head = (char*)bestfit - (char*)mptr;
+           freelist_head_ptr =  bestfit;
+        }
         else
            previous->next_free = (char*)bestfit-(char*)mptr;
     }
 
     bestfit->size = size;
-    if(bestfit_size > size) //deduct this entry 
+    if(bestfit_size > size + sizeof(mempool_header)) //deduct this entry 
     {
         mempool_header *ptr = (mempool_header *)((char*)bestfit + size);
         ptr->size = bestfit_size - size;
@@ -172,6 +174,9 @@ void*  mempool_malloc(mempool_type *mptr, int size, int expand)
     }
     else {  
           //delete this free entry
+        if (bestfit_size > size) {
+           bestfit->size = bestfit_size;
+        }
         if(bestfit == freelist_head_ptr)
             *freelist_head = freelist_head_ptr->next_free;
         else
index c396d80630b6f6e26ccb901a18a353af104784ec..7f42f373cda7f9e90e39aee9dcc4067acf4ee98a 100644 (file)
@@ -30,7 +30,7 @@ typedef struct mempool_header
   size_t            next_free;
 } mempool_header;
 
-typedef void * (* mempool_newblockfn)(int size, gni_mem_handle_t *mem_hndl);
+typedef void * (* mempool_newblockfn)(int *size, gni_mem_handle_t *mem_hndl);
 typedef void (* mempool_freeblock)(void *ptr, gni_mem_handle_t mem_hndl);
 
 // only at beginning of first block of mempool
@@ -43,7 +43,7 @@ typedef struct mempool_type
 } mempool_type;
 
 mempool_type *mempool_init(size_t pool_size, mempool_newblockfn newfn, mempool_freeblock freefn);
-void  mempool_destory(mempool_type *mptr);
+void  mempool_destroy(mempool_type *mptr);
 void*  mempool_malloc(mempool_type *mptr, int size, int expand);
 void mempool_free(mempool_type *mptr, void *ptr_free);
 
index a03c9126ff86fbdcc79aee1719841f293a1d15ba..abdd8a30bc2faebd4a59924ec855c711ff918565 100644 (file)
@@ -30,7 +30,7 @@
 */
 ComlibMulticastMsg * ComlibSectionInfo::getNewMulticastMessage(CharmMessageHolder *cmsg, int needSort, int instanceID){
     
-  cmsg->checkme();
+  //  cmsg->checkme();
 
     if(cmsg->sec_id == NULL || cmsg->sec_id->_nElems == 0)
         return NULL;
@@ -190,7 +190,7 @@ void ComlibSectionInfo::unpack(envelope *cb_env,
 
 
 void ComlibSectionInfo::processOldSectionMessage(CharmMessageHolder *cmsg) {
-  cmsg->checkme();
+  //  cmsg->checkme();
 
     ComlibPrintf("Process Old Section Message \n");
 
index 227c791aaf9631e2f111fc2685a66b28aaf84b79..59ebb18aa4a524c326ab880244658104c03f8df9 100644 (file)
@@ -33,9 +33,9 @@ class CharmMessageHolder : public MessageHolder{
  public:
     /// An unused, and probably unnecessary array that was used to avoid a memory corruption that clobbers members in this class. The bug has likely been fixed.
 
-#define BUGTRAPSIZE 5000
+  //#define BUGTRAPSIZE 5000
 
-    int bug_trap[BUGTRAPSIZE];
+   //    int bug_trap[BUGTRAPSIZE];
 
     /// The section information for an enqueued multicast message
     CkSectionID *sec_id;
@@ -53,14 +53,17 @@ class CharmMessageHolder : public MessageHolder{
       type = t;
       sec_id = NULL;
       copy_of_sec_id = NULL;
+      /*
       for(int i=0;i<BUGTRAPSIZE;i++){
        bug_trap[i] = 0;
       }
       
       checkme();
+      */
     }
     
     /// Verfiy that the bug_trap array has not been corrupted. Noone should ever write to that array.
+    /*
     void checkme() {
        for(int i=0;i<BUGTRAPSIZE;i++){
          if(bug_trap[i] != 0){
@@ -69,19 +72,21 @@ class CharmMessageHolder : public MessageHolder{
          }
        }
     }
-
+    */
 
     CharmMessageHolder(CkMigrateMessage *m) : MessageHolder(m) {
+      /*
       for(int i=0;i<BUGTRAPSIZE;i++){
        bug_trap[i] = 0;
       }
       checkme();
+      */
     }
     
 
 
     ~CharmMessageHolder(){
-      checkme();
+      //      checkme();
     }
 
     inline char * getCharmMessage() {
@@ -92,7 +97,7 @@ class CharmMessageHolder : public MessageHolder{
     inline void saveCopyOf_sec_id(){
       //      ComlibPrintf("[%d] saveCopyOf_sec_id sec_id=%p NULL=%d\n", CkMyPe(), sec_id, NULL);
 
-      checkme();
+      //      checkme();
 
       if(sec_id!=NULL){
 
@@ -122,7 +127,7 @@ class CharmMessageHolder : public MessageHolder{
        //      ComlibPrintf("saving copy of sec_id into %p\n", copy_of_sec_id);
       }
 
-      checkme();
+      //      checkme();
 
     }
 
@@ -135,7 +140,7 @@ class CharmMessageHolder : public MessageHolder{
 /*     copy_of_sec_id = NULL; */
 /*       } */
 
-      checkme();
+//      checkme();
 
     }
 
index 822bd002ff83c7e046549299942b83f6aa2ee532..1d1b67a2ae36ca6e7e0c238c37f0574fe2f58e93 100644 (file)
@@ -96,7 +96,7 @@ EachToManyMulticastStrategy::~EachToManyMulticastStrategy() {
 
 void EachToManyMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
 
-  cmsg -> checkme();
+  //  cmsg -> checkme();
 
        ComlibPrintf("[%d] EachToManyMulticast: insertMessage\n", CkMyPe());
 
index ff30a8cffa63a2e0eac835db329944fec291df31..90895d20ac9e504ce678d17b0ad273d8378c1929 100644 (file)
@@ -53,7 +53,7 @@ class EachToManyMulticastStrategy : public RouterStrategy, public CharmStrategy
     ~EachToManyMulticastStrategy();
 
     void insertMessage(MessageHolder *msg) {
-      ((CharmMessageHolder*)msg) -> checkme();
+      //      ((CharmMessageHolder*)msg) -> checkme();
       insertMessage((CharmMessageHolder*)msg);
     }
 
index 001ea4d256e8fa8be2b9475e2039b484f1a83f56..b9e09d3c95b1071d4e9fec9e6cd6dd3a666fd84e 100644 (file)
@@ -78,7 +78,7 @@ void StreamingStrategy::insertMessage(MessageHolder *cmsg) {
     streamingMsgBuf[pe].enq(cmsg);
     streamingMsgCount[pe]++;
     bufSize[pe]+=size;
-    if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax) flushPE(pe);
+    if (streamingMsgCount[pe] >= bufferMax || bufSize[pe] >= bufSizeMax) flushPE(pe);
 }
 
 void StreamingStrategy::doneInserting() {
index 6f4682cd410e2800e2915054265e648195922879..86465b782b436d7158a0a0cd0334ac2a2b4bac4d 100644 (file)
@@ -285,7 +285,7 @@ void ConvComlibManager::enableStrategy(int i) {
   MessageHolder *mh;
   while ((mh=strategyTable[i].tmplist.deq()) != NULL) {
     CharmMessageHolder*cmh = (CharmMessageHolder*)mh;
-    cmh->checkme();
+    //    cmh->checkme();
     cmh->sec_id = cmh->copy_of_sec_id;
 
 #if DEBUG_MULTICAST
index 0ed2b573b4008bd25df0d0e091293f8ec1de74d7..807cd99b652b07ee6e99f1ee3b45c3a1f26ae849 100644 (file)
@@ -385,11 +385,13 @@ extern "C" void LrtsInitCpuTopo(char **argv)
   hostnameMsg  *msg;
   double startT;
  
+  int obtain_flag = 1;              // default on
+  int show_flag = 0;                // default not show topology
+
   if (CmiMyRank() ==0) {
      topoLock = CmiCreateLock();
   }
 
-  int obtain_flag = 1;              // default on
 #if __FAULT_|| CMK_BLUEGENEQ
   obtain_flag = 0;
 #endif
@@ -399,6 +401,9 @@ extern "C" void LrtsInitCpuTopo(char **argv)
   if (CmiGetArgFlagDesc(argv,"+skip_cpu_topology",
                                "skip the processof getting cpu topology info"))
     obtain_flag = 0;
+  if(CmiGetArgFlagDesc(argv,"+show_cpu_topology",
+                                          "Show cpu topology info"))
+    show_flag = 1;
 
 #if CMK_BIGSIM_CHARM
   if (BgNodeRank() == 0)
@@ -558,6 +563,7 @@ extern "C" void LrtsInitCpuTopo(char **argv)
 
   // now every one should have the node info
   CcdRaiseCondition(CcdTOPOLOGY_AVAIL);      // call callbacks
+  if (CmiMyPe() == 0 && show_flag) cpuTopo.print();
 }
 
 #else           /* not supporting cpu topology */
@@ -569,6 +575,8 @@ extern "C" void LrtsInitCpuTopo(char **argv)
                                                "obtain cpu topology info");
   CmiGetArgFlagDesc(argv,"+skip_cpu_topology",
                                "skip the processof getting cpu topology info");
+  CmiGetArgFlagDesc(argv,"+show_cpu_topology",
+                                          "Show cpu topology info");
 }
 
 #endif
diff --git a/tests/charm++/commtest/comlib_stream_performance/Makefile b/tests/charm++/commtest/comlib_stream_performance/Makefile
new file mode 100644 (file)
index 0000000..def5020
--- /dev/null
@@ -0,0 +1,23 @@
+CHARMC=../../../../bin/charmc $(OPTS)
+
+OBJS = streaming.o
+
+all: streaming
+
+streaming: $(OBJS)
+       $(CHARMC) -language charm++ -module comlib -tracemode projections -o streaming $(OBJS) 
+
+streaming.decl.h: streaming.ci
+       $(CHARMC)  streaming.ci
+
+clean:
+       rm -f *.decl.h *.def.h conv-host *.o streaming charmrun *.log *.sum *.sts *~
+
+streaming.o: streaming.C streaming.decl.h
+       $(CHARMC) -c -O3 streaming.C
+
+test: all
+       ./charmrun ./streaming +p2 
+
+debug: all
+       ./charmrun ./streaming +p2 ++debug
\ No newline at end of file
diff --git a/tests/charm++/commtest/comlib_stream_performance/streaming.C b/tests/charm++/commtest/comlib_stream_performance/streaming.C
new file mode 100644 (file)
index 0000000..1a7aedd
--- /dev/null
@@ -0,0 +1,171 @@
+#include "comlib.h"
+#include <cassert>
+
+#include "streaming.decl.h"
+CProxy_Main mainProxy;
+int nElements;
+
+CProxy_WorkerArray basicArrayProxy; 
+CProxy_WorkerArray streamingArrayProxy;
+ComlibInstanceHandle stratStreaming;
+
+
+#define PERIOD_IN_MS 10
+#define NMSGS 16
+#define MAX_MESSAGE_SIZE 10000
+#define MAX_BUFFER_SIZE  100000
+#define ENVELOPE_OVERHEAD_ESTIMATE 100
+#define MIN_TEST_SIZE 16
+#define MAX_TEST_SIZE 1024
+
+class TestMessage : public CMessage_TestMessage {
+public:
+  int length;
+  char* msg;
+};
+
+// mainchare
+
+class Main : public CBase_Main{
+private:
+  int nDone;
+
+public:
+
+  Main(CkArgMsg *m) {    
+    nDone = 0;
+
+    //    com_debug = 1;
+
+    nElements = 2; 
+    if(m->argc >1) nElements=atoi(m->argv[1]);
+    delete m;
+
+    mainProxy = thishandle;
+       
+    // create streaming strategy
+    StreamingStrategy *strategy = new StreamingStrategy(PERIOD_IN_MS, NMSGS,
+                                                        MAX_MESSAGE_SIZE, MAX_BUFFER_SIZE);
+    stratStreaming = ComlibRegister(strategy);
+    streamingArrayProxy = CProxy_WorkerArray::ckNew(nElements, nElements);
+    basicArrayProxy = streamingArrayProxy; 
+
+    ComlibAssociateProxy(stratStreaming, streamingArrayProxy);
+    
+    // initiate using non-delegated proxy because broadcasts do not
+    // work with streaming
+    basicArrayProxy.prepareTest();
+    CkCallback syncWorkers(CkIndex_Main::runWithoutStreaming(), mainProxy); 
+    CkStartQD(syncWorkers);   
+  }
+
+  void runWithStreaming() {
+    CkPrintf("Running Streaming Test...\n"); 
+    basicArrayProxy.initiateSends(streamingArrayProxy);
+    CkCallback syncWorkers(CkIndex_Main::done(), mainProxy); 
+    CkStartQD(syncWorkers);
+  }
+
+  void runWithoutStreaming() {
+    CkPrintf("Running Without Streaming Enabled ... \n");
+    basicArrayProxy.initiateSends(basicArrayProxy);
+    CkCallback syncWorkers(CkIndex_Main::runWithStreaming(), mainProxy); 
+    CkStartQD(syncWorkers);
+  }
+
+  void done() {
+    CkPrintf("Finished test\n");
+    CkExit();
+  }
+
+};
+
+class WorkerArray : public CBase_WorkerArray {
+private:
+  CProxy_WorkerArray localProxy;
+  TestMessage **msgs; 
+  TestMessage **newMsgs; 
+  int msgSize;
+  int nElements; 
+  int neighbor; 
+  double mystartTime; 
+  double myendTime;
+  int receivedMsgs; 
+
+public:
+
+  WorkerArray(int nChares) {
+    nElements = nChares; 
+    msgs = new TestMessage*[NMSGS];
+    newMsgs = new TestMessage*[NMSGS];
+    msgSize = MIN_TEST_SIZE; 
+    // partition into pairs of ranks
+    if (thisIndex % 2 == 0) {
+      neighbor = (thisIndex + nElements/2) % nElements; 
+    }
+    else {
+      neighbor = (thisIndex - nElements/2) % nElements; 
+    }
+    receivedMsgs = 0; 
+  }
+
+  WorkerArray(CkMigrateMessage *m) {}
+
+  void prepareTest() {
+    
+    for (int i = 0; i < NMSGS; i++) {
+      msgs[i] = new(msgSize) TestMessage; 
+    } 
+        
+  }
+
+
+  void initiateSends(CProxy_WorkerArray workerProxy) {
+    double startTime = CkWallTimer();
+    localProxy = workerProxy; 
+    if (thisIndex % 2 == 0) {
+      mystartTime = CkWallTimer();    
+      for (int i = 0; i < NMSGS; i++) {
+        localProxy[neighbor].receiveSends(msgs[i]);
+      }
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] initiateSends took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+
+  void receiveSends(TestMessage *msg) {
+    double startTime = CkWallTimer();
+    // recycle received messages
+    newMsgs[receivedMsgs] = msg; 
+    receivedMsgs++; 
+    if (receivedMsgs == NMSGS) {
+      for (int i=0; i < NMSGS; i++) {
+        localProxy[neighbor].receiveReplies(msgs[i]); 
+      }
+      receivedMsgs = 0;
+      msgs = newMsgs; 
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] receiveSends took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+  
+  void receiveReplies(TestMessage *msg) {
+    double startTime = CkWallTimer();
+    // recycle received messages
+    newMsgs[receivedMsgs] = msg;
+    receivedMsgs++;
+    if (receivedMsgs == NMSGS) {
+      myendTime = CkWallTimer();
+      CkPrintf("[%d] round trip time for sending %d messages: %f us\n", 
+               thisIndex, NMSGS, 1000000 * (myendTime - mystartTime));
+      receivedMsgs = 0;
+      msgs = newMsgs; 
+    }
+    double endTime = CkWallTimer(); 
+    //    CkPrintf("[%d] receiveReplies took %f us\n", thisIndex, (endTime-startTime) * 1000000);
+  }
+
+};
+
+
+#include "streaming.def.h"
diff --git a/tests/charm++/commtest/comlib_stream_performance/streaming.ci b/tests/charm++/commtest/comlib_stream_performance/streaming.ci
new file mode 100644 (file)
index 0000000..4fa7737
--- /dev/null
@@ -0,0 +1,30 @@
+mainmodule streaming {
+  extern module comlib; 
+                
+  readonly CProxy_Main mainProxy;      
+  readonly int nElements;
+
+  readonly CProxy_WorkerArray basicArrayProxy; 
+  readonly CProxy_WorkerArray streamingArrayProxy;
+  readonly ComlibInstanceHandle stratStreaming;                                
+
+  message TestMessage {
+    char msg[];
+  };
+
+  mainchare Main {
+    entry Main(CkArgMsg *m);
+    entry void runWithStreaming();
+    entry void runWithoutStreaming();
+    entry void done();
+  };
+
+  array[1D] WorkerArray {
+    entry WorkerArray(int nChares);
+    entry void prepareTest();
+    entry void initiateSends(CProxy_WorkerArray workerProxy);
+    entry void receiveSends(TestMessage *msg);
+    entry void receiveReplies(TestMessage *msg);
+  };
+
+};