Merge branch 'charm' of charmgit:charm into charm
authorYanhua Sun <yanhuas@jyc1.(none)>
Wed, 25 Jan 2012 03:28:40 +0000 (21:28 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Wed, 25 Jan 2012 03:28:40 +0000 (21:28 -0600)
13 files changed:
examples/multiphaseSharedArrays/Makefile_common
examples/multiphaseSharedArrays/histogram/Makefile [new file with mode: 0644]
examples/multiphaseSharedArrays/histogram/headers [new file with mode: 0644]
examples/multiphaseSharedArrays/histogram/histogram [new file with mode: 0755]
examples/multiphaseSharedArrays/histogram/histogram.C [new file with mode: 0644]
examples/multiphaseSharedArrays/histogram/histogram.ci [new file with mode: 0644]
examples/multiphaseSharedArrays/histogram/run.sh [new file with mode: 0755]
src/arch/net/machine-ibverbs.c
src/ck-core/ckarray.C
src/ck-core/ckarray.h
src/ck-core/init.C
src/ck-core/middle-blue.h
src/libs/ck-libs/MeshStreamer/MeshStreamer.h

index 95c46b0bc69195b9f75d4ab3ca957abce525c140..a55f5b817f15fd0a4d0fd3e714cf3ebbeb2819db 100644 (file)
@@ -2,7 +2,7 @@
 # needs $(PGM)
 
 OPTS=
-CDIR=../../../..
+CDIR=../../..
 CHARMC=$(CDIR)/bin/charmc -language charm++ $(OPTS)
 
 # Rules to convert .ci to .decl.h and .def.h
diff --git a/examples/multiphaseSharedArrays/histogram/Makefile b/examples/multiphaseSharedArrays/histogram/Makefile
new file mode 100644 (file)
index 0000000..5c7f5c4
--- /dev/null
@@ -0,0 +1,3 @@
+
+PGM=histogram
+include ../Makefile_common
diff --git a/examples/multiphaseSharedArrays/histogram/headers b/examples/multiphaseSharedArrays/histogram/headers
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/examples/multiphaseSharedArrays/histogram/histogram b/examples/multiphaseSharedArrays/histogram/histogram
new file mode 100755 (executable)
index 0000000..b914917
Binary files /dev/null and b/examples/multiphaseSharedArrays/histogram/histogram differ
diff --git a/examples/multiphaseSharedArrays/histogram/histogram.C b/examples/multiphaseSharedArrays/histogram/histogram.C
new file mode 100644 (file)
index 0000000..000038b
--- /dev/null
@@ -0,0 +1,125 @@
+// -*- mode: c++; tab-width: 4 -*-
+//
+#include "msa/msa.h"
+
+typedef MSA::MSA2D<int, DefaultEntry<int>,
+        MSA_DEFAULT_ENTRIES_PER_PAGE, MSA_ROW_MAJOR> MSA2D;
+typedef MSA::MSA1D<int, DefaultEntry<int>, MSA_DEFAULT_ENTRIES_PER_PAGE> MSA1D;
+
+#include "histogram.decl.h"
+
+
+const unsigned int ROWS = 2000;
+const unsigned int COLS = 2000;
+const unsigned int BINS = 10;
+const unsigned int MAX_ENTRY = 1000;
+unsigned int WORKERS = 10;
+
+
+class Driver : public CBase_Driver
+{
+public:
+    Driver(CkArgMsg* m)
+    {
+        // Usage: histogram [number_of_worker_threads]
+        if (m->argc > 1) WORKERS=atoi(m->argv[1]);
+        delete m;
+
+        // Actually build the shared arrays: a 2d array to hold arbitrary
+        // data, and a 1d histogram array.
+        MSA2D data(ROWS, COLS, WORKERS);
+        MSA1D bins(BINS, WORKERS);
+        // Create worker threads and start them off.
+        workers = CProxy_Histogram::ckNew(data, bins, WORKERS);
+        workers.ckSetReductionClient(
+            new CkCallback(CkIndex_Driver::done(NULL), thisProxy));
+        workers.start();
+    }
+
+    void done(CkReductionMsg* m)
+    {
+        // When the reduction is complete, everything is ready to exit.
+        CkExit();
+    }
+};
+
+
+class Histogram: public CBase_Histogram
+{
+public:
+    MSA2D data;
+    MSA1D bins;
+
+    Histogram(const MSA2D& data_, const MSA1D& bins_)
+    : data(data_), bins(bins_)
+    {}
+
+    Histogram(CkMigrateMessage* m)
+    {}
+
+    ~Histogram()
+    {}
+
+    // Note: it's important that start is a threaded entry method
+    // so that the blocking MSA calls work as intended.
+    void start()
+    {
+        data.enroll(WORKERS);
+        bins.enroll(WORKERS);
+        
+        // Fill the data array with random numbers.
+               MSA2D::Write wd = data.getInitialWrite();
+        if (thisIndex == 0) fill_array(wd);
+
+        // Fill the histogram bins: read from the data array and
+        // accumulate to the histogram array.
+               MSA2D::Read rd = wd.syncToRead();
+        MSA1D::Accum ab = bins.getInitialAccum();
+        fill_bins(ab, rd);
+
+        // Print the histogram.
+        MSA1D::Read rb = ab.syncToRead();
+        if (thisIndex == 0) print_array(rb);
+
+        // Contribute to Driver::done to terminate the program.
+        contribute();
+    }
+
+    void fill_array(MSA2D::Write& w)
+    {
+        // Just let one thread fill the whole data array
+        // with random entries to be histogrammed.
+        for (unsigned int r = 0; r < data.getRows(); r++) {
+            for (unsigned int c = 0; c < data.getCols(); c++) {
+                w.set(r, c) = random() % MAX_ENTRY;
+            }
+        }
+    }
+
+    void fill_bins(MSA1D::Accum& b, MSA2D::Read& d)
+    {
+        // Determine the range of the data array that this
+        // worker should read from.
+        unsigned int range = ROWS / WORKERS;
+        unsigned int min_row = thisIndex * range;
+        unsigned int max_row = (thisIndex + 1) * range;
+        
+        // Count the entries that belong to each bin.
+        for (unsigned int r = 0; r < data.getRows(); r++) {
+            for (unsigned int c = 0; c < data.getCols(); c++) {
+                unsigned int bin = d.get(r, c) / (MAX_ENTRY / BINS);
+                b(bin) += 1;
+            }
+        }
+    }
+
+    void print_array(MSA1D::Read& b)
+    {
+        for (unsigned int i=0; i<BINS; ++i) {
+            CkPrintf("%d ", b.get(i)); 
+        }
+    }
+};
+
+#include "histogram.def.h"
diff --git a/examples/multiphaseSharedArrays/histogram/histogram.ci b/examples/multiphaseSharedArrays/histogram/histogram.ci
new file mode 100644 (file)
index 0000000..e0dd682
--- /dev/null
@@ -0,0 +1,23 @@
+// -*- mode: c++; tab-width: 4 -*-
+mainmodule histogram
+{
+    mainchare Driver
+    {
+        entry void Driver(CkArgMsg*);
+        entry void done(CkReductionMsg*);
+    };
+
+    array[1D] Histogram
+    {
+        entry void Histogram(MSA2D data_, MSA1D bins_);
+        entry [threaded] void start();
+    };
+
+    
+    /* Currently, you must explicitly instantiate any
+       MSA templates that you use. */
+    group MSA_CacheGroup<int, DefaultEntry<int>,
+                         MSA_DEFAULT_ENTRIES_PER_PAGE>;
+    array [1D] MSA_PageArray<int, DefaultEntry<int>,
+                             MSA_DEFAULT_ENTRIES_PER_PAGE>;
+};
diff --git a/examples/multiphaseSharedArrays/histogram/run.sh b/examples/multiphaseSharedArrays/histogram/run.sh
new file mode 100755 (executable)
index 0000000..3cfcad3
--- /dev/null
@@ -0,0 +1,29 @@
+#!/bin/sh
+# Shell script to test for multiple test cases
+
+touch outputs
+for rows1 in 1000 5000 10000; do
+  for cols1 in 500 750 1000; do
+    for cols2 in 1000 5000 10000; do
+      for mbytes in 128 64 32 16 8 4 2 1; do
+       for num_workers in 1 2 4 8 16 32; do
+          rm -rf params.h
+          printf "const unsigned int bytes = %d*1024*1024;\n" $mbytes >> params.h
+          printf "const unsigned int ROWS1 = %d;\n" $rows1 >> params.h
+          printf "const unsigned int COLS1 = %d;\n" $cols1 >> params.h
+          printf "const unsigned int COLS2 = %d;\n" $cols2 >> params.h
+          printf "const unsigned int ROWS2 = COLS1;\n" >> params.h
+          printf "const unsigned int NUM_WORKERS = %d;\n" $num_workers >> params.h
+          printf "\n" >> params.h
+  
+          rm -f t3
+          make OPTS=-O3 -s
+          for num_pes in 4 8 16 32; do
+            ./charmrun transpose +p$num_pes >> outputs
+          done
+        done
+      done
+    done
+  done
+done
index 3d12779590375267e827c6cd60f66352d49651fb..0cb4b15e89d1bd7aee7956ebc395dd8452071714 100644 (file)
@@ -540,6 +540,7 @@ static void CmiMachineInit(char **argv){
                createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
        }
        
+        if (Cmi_charmrun_fd == -1) return;
        
        //TURN ON RDMA
        rdma=1;
@@ -2503,6 +2504,7 @@ void * infi_CmiAlloc(int size){
 #if CMK_IBVERBS_STATS
        numAlloc++;
 #endif
+        if (Cmi_charmrun_fd == -1) return malloc(size);
 #if THREAD_MULTI_POOL
        res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
        res -= sizeof(CmiChunkHeader);
@@ -2648,6 +2650,7 @@ void infi_CmiFree(void *ptr){
        numFree++;
 #endif
        
+        if (Cmi_charmrun_fd == -1) return free(ptr);
 #if CMK_SMP    
        CmiMemLock();
 #endif
index 7ec5613e12d4f77a685db5eb7dfe320e975041f2..595984b79e3cd26a67e3a88391e8371e4702c4d0 100644 (file)
@@ -777,6 +777,7 @@ void CkArray::pup(PUP::er &p){
        p|locMgrID;
        p|listeners;
        p|listenerDataOffset;
+        p|stableLocations;
        testPup(p,1234);
        if(p.isUnpacking()){
                thisProxy=thisgroup;
index a951572b9c7be6df4301761d2168539d77553dec..bc1d35125e0eeca33d93e05965a751ea638ed20f 100644 (file)
@@ -635,6 +635,7 @@ class CkArray : public CkReductionMgr, public CkArrMgr {
   CProxy_CkArray thisProxy;
   typedef CkMigratableListT<ArrayElement> ArrayElementList;
   ArrayElementList *elements;
+private:
   bool stableLocations;
 
 public:
index 50d746e66a9f8108e4f476f247360e57b648bcde..5fa514a4840c0eabd1786cdf1096c1725a86aac9 100644 (file)
@@ -1161,9 +1161,12 @@ void _initCharm(int unused_argc, char **argv)
        CkpvAccess(_msgPool) = new MsgPool();
 
        CmiNodeAllBarrier();
+
+#if ! CMK_MEM_CHECKPOINT
        CmiBarrier();
        CmiBarrier();
        CmiBarrier();
+#endif
 #if CMK_SMP_TRACE_COMMTHREAD
        _TRACE_BEGIN_COMPUTATION();     
 #else
index e70d2105490ea0d6d5c62edddae4948cab9a142f..b702ac445216312bff78a27c74941efff00d9d24 100644 (file)
@@ -195,6 +195,9 @@ void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn);
 #undef CmiNodeAllBarrier
 #define CmiNodeAllBarrier()
 
+#undef CmiBarrier
+#define CmiBarrier()
+
 /** common functions for two versions */
 namespace BGConverse {
 
index f97109f449c9a68231818876758576b391c0dc7a..71278b721d75129d982bb861477c9687028825ec 100644 (file)
@@ -7,7 +7,9 @@
 // reaching totalBufferCapacity_
 #define BUCKET_SIZE_FACTOR 4
 
-//#define DEBUG_STREAMER
+// #define DEBUG_STREAMER
+// #define CACHE_LOCATIONS
+// #define SUPPORT_INCOMPLETE_MESH
 
 enum MeshStreamerMessageType {PlaneMessage, ColumnMessage, PersonalizedMessage};
 
@@ -19,7 +21,6 @@ class MeshLocation {
   MeshStreamerMessageType msgType;
 };
 
-// #define CACHE_LOCATIONS
 
 /*
 class LocalMessage : public CMessage_LocalMessage {
@@ -113,6 +114,12 @@ private:
     bool *isCached; 
 #endif
 
+#ifdef SUPPORT_INCOMPLETE_MESH
+    int numNodesInLastPlane_;
+    int numFullRowsInLastPlane_;
+    int numColumnsInLastRow_;
+#endif
+
     void determineLocation(const int destinationPe, 
                           MeshLocation &destinationCoordinates);
 
@@ -224,6 +231,11 @@ MeshStreamer<dtype>::MeshStreamer(int totalBufferCapacity, int numRows,
   std::fill(isCached, isCached + numNodes_, false);
 #endif
 
+#ifdef SUPPORT_INCOMPLETE_MESH
+  numNodesInLastPlane_ = numNodes_ % planeSize_; 
+  numFullRowsInLastPlane_ = numNodesInLastPlane_ / numColumns_;
+  numColumnsInLastRow_ = numNodesInLastPlane_ - numFullRowsInLastPlane_ * numColumns_;  
+#endif
 }
 
 template <class dtype>
@@ -318,11 +330,28 @@ void MeshStreamer<dtype>::storeMessage(MeshStreamerMessage<dtype> ** const messa
     case PlaneMessage:
       destinationIndex = myNodeIndex_ + 
        (destinationCoordinates.planeIndex - myPlaneIndex_) * planeSize_;  
+#ifdef SUPPORT_INCOMPLETE_MESH
+      if (destinationIndex >= numNodes_) {
+       int numValidRows = numFullRowsInLastPlane_; 
+       if (numColumnsInLastRow_ > myColumnIndex_) {
+         numValidRows++; 
+       }
+       destinationIndex = destinationCoordinates.planeIndex * planeSize_ + 
+         myColumnIndex_ + (myRowIndex_ % numValidRows) * numColumns_; 
+      }
+#endif      
       this->thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
       break;
     case ColumnMessage:
       destinationIndex = myNodeIndex_ + 
        (destinationCoordinates.columnIndex - myColumnIndex_);
+#ifdef SUPPORT_INCOMPLETE_MESH
+      if (destinationIndex >= numNodes_) {
+       destinationIndex = destinationCoordinates.planeIndex * planeSize_ + 
+         destinationCoordinates.columnIndex + 
+         (myColumnIndex_ % numFullRowsInLastPlane_) * numColumns_; 
+      }
+#endif      
       this->thisProxy[destinationIndex].receiveAggregateData(destinationBucket);
       break;
     case PersonalizedMessage:
@@ -367,7 +396,6 @@ void MeshStreamer<dtype>::insertData(dtype &dataItem, const int destinationPe) {
     return;
   }
 
-  int indexWithinPlane; 
   MeshLocation destinationCoordinates;
 
   determineLocation(destinationPe, destinationCoordinates);