Added the initial support of using pthreads for parallelization (a special SMP mode...
authorChao Mei <chaomei2@illinois.edu>
Wed, 29 Feb 2012 05:11:55 +0000 (23:11 -0600)
committerChao Mei <chaomei2@illinois.edu>
Wed, 29 Feb 2012 05:11:55 +0000 (23:11 -0600)
One problem remains: the CPU affinity setting for those pthreads

NodeHelper.C
NodeHelper.ci
NodeHelper.h
NodeHelperAPI.h
example/simpleLoopBench/hello.C

index 12cd51c2f72f2cb56eb931d4473af2b890a2d9bb..73e8d4ca53ce02b2e3e18cdfbc9aebb04f70d4fc 100644 (file)
 #include "NodeHelper.h"
+#include <pthread.h>
 
-FuncNodeHelper::FuncNodeHelper()
+/*====Beginning of pthread-related variables and implementation====*/
+//__thread is not portable, but it works almost everywhere if pthread works
+//After C++11, this should be thread_local
+static __thread pthread_cond_t thdCondition; //per-thread condition variable a worker waits on; initialised in ndhThreadWork and published through allConds
+static __thread pthread_mutex_t thdLock; //per-thread mutex paired with thdCondition; initialised in ndhThreadWork and published through allLocks
+
+static FuncNodeHelper *mainHelper = NULL; //the FuncNodeHelper built in pthread mode; workers query it for getNumHelpers()/needTreeBcast()
+static int mainHelperPhyRank = 0; //CmiPhysicalRank(CmiMyPe()) of the creating PE, recorded in createPThreads
+static int numPhysicalPEs = 0; //CmiNumPesOnPhysicalNode(...) for the creating PE's physical node, recorded in createPThreads
+static CurLoopInfo *pthdLoop = NULL; //the single loop descriptor shared by all pthreads (the pthread version is always synchronized); allocated in the constructor, deleted in exit()
+static pthread_mutex_t **allLocks = NULL; //numHelpers-1 pointers to the workers' thdLock; slot index is worker id - 1
+static pthread_cond_t **allConds = NULL; //numHelpers-1 pointers to the workers' thdCondition; slot index is worker id - 1
+static pthread_t *ndhThreads = NULL; //handles of the worker pthreads, allocated in createPThreads and joined in exit()
+static volatile int gCrtCnt = 0; //number of workers that have finished their setup; createPThreads spins until it reaches numHelpers-1
+static volatile int exitFlag = 0; //set to 1 by exit(); workers leave their loop once they observe it
+
+/**
+ * Entry point of every helper pthread created by createPThreads().
+ *
+ * @param id  the worker id (1 .. numHelpers-1) smuggled through the void*
+ *            argument of pthread_create; id 0 is implicitly the main thread.
+ * @return    always NULL (the value is not used by the joiner).
+ *
+ * The worker publishes its thread-local mutex/condition pair in
+ * allLocks/allConds, bumps gCrtCnt so the creator knows it is ready, and then
+ * sleeps on its condition variable. Each wakeup means "a loop is available":
+ * in tree-broadcast mode it first forwards the wakeup to its children, then
+ * calls pthdLoop->stealWork(). A wakeup with exitFlag set means "terminate".
+ */
+static void *ndhThreadWork(void *id) {
+       size_t myId = (size_t) id;
+       
+       //further improvement of this affinity setting!!
+       //NOTE(review): the rank computed below is immediately overridden by
+       //myId, so the worker is currently pinned to "core == worker id".
+       int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
+       //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
+       myPhyRank = myId;
+       CmiSetCPUAffinity(myPhyRank);
+       
+       pthread_mutex_init(&thdLock, NULL);
+       pthread_cond_init(&thdCondition, NULL);
+       
+       //all locks and conditions exclude the main thread, hence the "-1"
+       allLocks[myId-1] = &thdLock;
+       allConds[myId-1] = &thdCondition;
+       
+       //tell createPThreads() that this worker's lock/cond are published
+       __sync_add_and_fetch(&gCrtCnt, 1);
+       
+       while(1){
+               //exitFlag must be tested while holding thdLock: a notifier
+               //signals under the same lock, so a shutdown request raised
+               //between the test and the wait can no longer be missed.
+               pthread_mutex_lock(&thdLock);
+               if(!exitFlag) pthread_cond_wait(&thdCondition, &thdLock);
+               pthread_mutex_unlock(&thdLock);
+               
+               //a wakeup issued for shutdown must not touch the loop data
+               if(exitFlag) break;
+               
+               /* kids ID range: [1 ~ numHelpers-1] */
+               if(mainHelper->needTreeBcast()){
+                       //notify my children
+                       int myKid = myId*TREE_BCAST_BRANCH+1;
+                       for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
+                               if(myKid >= mainHelper->getNumHelpers()) break;
+                               //all locks and conditions exclude the main thread, so index needs to be subtracted by one
+                               pthread_mutex_lock(allLocks[myKid-1]);
+                               pthread_cond_signal(allConds[myKid-1]);
+                               pthread_mutex_unlock(allLocks[myKid-1]);
+                       }
+               }
+               pthdLoop->stealWork();
+       }
+       //a pthread start routine must return a value; falling off the end of
+       //a non-void function is undefined behaviour
+       return NULL;
+}
+
+/**
+ * Creates the numHelpers-1 worker pthreads used in NODEHELPER_PTHREAD mode
+ * and blocks until every one of them has published its lock/condition pair.
+ *
+ * Worker ids run from 1 to numHelpers-1; the handle, lock and condition of
+ * worker id are stored at index id-1 of ndhThreads, allLocks and allConds.
+ */
+void FuncNodeHelper::createPThreads() {
+       int numThreads = numHelpers - 1;
+       //zero-initialised tables, filled in by the workers themselves
+       allLocks = (pthread_mutex_t **)calloc(numThreads, sizeof(void *));
+       allConds = (pthread_cond_t **)calloc(numThreads, sizeof(void *));
+       
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       ndhThreads = new pthread_t[numThreads];
+       mainHelperPhyRank = CmiPhysicalRank(CmiMyPe());
+       numPhysicalPEs = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(CmiMyPe()));
+       for(int i=1; i<=numThreads; i++){
+               //handle of worker i lives in slot i-1: the array has exactly
+               //numThreads entries and exit() joins slots 0..numThreads-1.
+               //The id goes through size_t so the int is widened before it
+               //is turned into a pointer.
+               int rc = pthread_create(&ndhThreads[i-1], &attr, ndhThreadWork, (void *)(size_t)i);
+               //without this check the wait below would spin forever
+               if(rc != 0) CmiAbort("NodeHelper: pthread_create failed\n");
+       }
+       pthread_attr_destroy(&attr);
+       while(gCrtCnt != numThreads); //wait for all threads to finish creation
+}
+
+/**
+ * Shuts down the worker pthreads and releases everything allocated for the
+ * NODEHELPER_PTHREAD mode. It does nothing in any other mode, or when the
+ * pthread resources have already been released.
+ *
+ * Must not be called while a parallelized loop is still in flight.
+ */
+void FuncNodeHelper::exit(){
+       if(mode != NODEHELPER_PTHREAD || ndhThreads == NULL) return;
+       
+       int numThreads = numHelpers-1;
+       
+       //Raise the flag first, then wake every worker: they are normally
+       //blocked in pthread_cond_wait and would otherwise never observe the
+       //flag, leaving pthread_join below waiting forever. Each signal is
+       //sent under the worker's own lock, like every other notification.
+       exitFlag = 1;
+       for(int i=0; i<numThreads; i++){
+               pthread_mutex_lock(allLocks[i]);
+               pthread_cond_signal(allConds[i]);
+               pthread_mutex_unlock(allLocks[i]);
+       }
+       
+       for(int i=0; i<numThreads; i++)
+               pthread_join(ndhThreads[i], NULL);
+       
+       //reset the pointers so a repeated call becomes a no-op instead of a
+       //double free
+       delete [] ndhThreads;
+       ndhThreads = NULL;
+       free(allLocks);
+       allLocks = NULL;
+       free(allConds);
+       allConds = NULL;
+       delete pthdLoop;
+       pthdLoop = NULL;
+}
+
+/*====End of pthread-related variables and implementation====*/
+
+FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
 {  
-#if CMK_SMP    
-    //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
-         
     traceRegisterUserEvent("nodehelper total work",20);
     traceRegisterUserEvent("nodehlelper finish signal",21);
-    
-       numHelpers = CkMyNodeSize();
-       helperPtr = new FuncSingleHelper *[numHelpers];
-       useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
-       
-       int pestart = CkNodeFirst(CkMyNode());
+
+    mode = mode_;
+
+    if(mode == NODEHELPER_USECHARM){
+        //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
+        numHelpers = CkMyNodeSize();
+        helperPtr = new FuncSingleHelper *[numHelpers];
+        useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
+        
+        int pestart = CkNodeFirst(CkMyNode());
+            
+        for (int i=0; i<numHelpers; i++) {
+            CkChareID helper;
+            CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
+        }
+    }else if(mode == NODEHELPER_PTHREAD){
+               helperPtr = NULL;
                
-       for (int i=0; i<numHelpers; i++) {
-        CkChareID helper;
-        CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
-       }       
-#endif
+               numHelpers = numThreads_;
+               useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
+               pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
+               mainHelper = this;
+        createPThreads();
+    }
 }
 
 int FuncNodeHelper::MAX_CHUNKS = 64;
@@ -49,32 +146,49 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
      * for chunk indexed at remainder to numChunks-1, stride is "unit"
      */
      int stride;
+        CurLoopInfo *curLoop = NULL;
     
     //for using nodequeue
        TRACE_START(20);
-       
-       FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
-#if ALLOW_MULTIPLE_UNSYNC
-    ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
-#else
-    ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
-#endif
-    CurLoopInfo *curLoop = (CurLoopInfo *)(notifyMsg->ptr);
-       curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
-       if(useTreeBcast){               
-               int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
-               //just implicit binary tree
-               int pe = CmiMyRank()+1;        
-               for(int i=0; i<loopTimes; i++, pe++){
-                       if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
-                       CmiPushPE(pe, (void *)(notifyMsg));    
+       if(mode == NODEHELPER_USECHARM){        
+               FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
+       #if ALLOW_MULTIPLE_UNSYNC
+               ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
+       #else
+               ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
+       #endif
+               curLoop = (CurLoopInfo *)(notifyMsg->ptr);
+               curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
+               if(useTreeBcast){               
+                       int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
+                       //just implicit binary tree
+                       int pe = CmiMyRank()+1;        
+                       for(int i=0; i<loopTimes; i++, pe++){
+                               if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
+                               CmiPushPE(pe, (void *)(notifyMsg));    
+                       }
+               }else{
+                       for (int i=0; i<numHelpers; i++) {
+                               if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
+                       }
                }
-       }else{
-               for (int i=0; i<numHelpers; i++) {
-                       if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
+       }else if(mode == NODEHELPER_PTHREAD){
+               int numThreads = numHelpers-1;
+               curLoop = pthdLoop;
+               curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+               int numNotices = numThreads;
+               if(useTreeBcast){
+                       numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
                }
+               for(int i=0; i<numNotices; i++){
+                       pthread_mutex_lock(allLocks[i]);
+                       pthread_cond_signal(allConds[i]);
+                       pthread_mutex_unlock(allLocks[i]);
+               }
+               //in this mode, it's always synced
+               sync = 1;
        }
-    
+       
        curLoop->stealWork();
        TRACE_BRACKET(20);
        
@@ -204,9 +318,18 @@ void CurLoopInfo::stealWork(){
 //   End of functions related with FuncSingleHelper                     //
 //======================================================================//
 
-CProxy_FuncNodeHelper NodeHelper_Init(){
-    CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
-    return CProxy_FuncNodeHelper::ckNew();
+CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
+    if(mode == NODEHELPER_USECHARM){
+        CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
+    }else if(mode==NODEHELPER_PTHREAD){
+        CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
+        CmiAssert(numThreads>0);
+    }
+    return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
+}
+
+void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
+       nodeHelper.exit();
 }
 
 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
index b8ce748d8c8fd97271a699bd6f42592abfc394c1..8e2ad8b9ead6525f1a37116d4d5634f14f4cb81d 100644 (file)
@@ -1,7 +1,8 @@
 module NodeHelper{
     initproc void RegisterNodeHelperHdlrs(void);
        nodegroup FuncNodeHelper{
-               entry FuncNodeHelper();
+               entry FuncNodeHelper(int mode, int numThreads);
+               entry void exit();
        };
        chare FuncSingleHelper{
                entry FuncSingleHelper(size_t);
index 3ddbecc4386595eee6bc5abc096215c739125493..6d652a7f3347b431bc0fd73da2cf5a95e30d1147 100644 (file)
@@ -101,16 +101,24 @@ class FuncNodeHelper : public CBase_FuncNodeHelper {
        
 public:
     static int MAX_CHUNKS;
-private:    
-    int numHelpers;    
+private:
+    int mode;
+        
+    int numHelpers; //in pthread mode, the counter includes itself    
     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
        int useTreeBcast;
     
 public:
-       FuncNodeHelper();
+       FuncNodeHelper(int mode_, int numThreads_);
     ~FuncNodeHelper() {
         delete [] helperPtr;
     }
+       
+       void createPThreads();
+       void exit();
+       
+       int getNumHelpers() { return numHelpers; }
+       int needTreeBcast() { return useTreeBcast; }
     
     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
                         int paramNum, void * param, /* the input parameters for the above func */
index cf9bc795c964ea209e5d9811c2277fd0c17b9e5f..bf29a77f51b52947166801b9779ae96e57b6ece8 100644 (file)
@@ -13,9 +13,19 @@ typedef enum REDUCTION_TYPE{
     NODEHELPER_DOUBLE_SUM
 }REDUCTION_TYPE;
 
+#define NODEHELPER_USECHARM 1
+#define NODEHELPER_PTHREAD 2
+
 class CProxy_FuncNodeHelper;
-/* currently only thinking of SMP mode */
-extern CProxy_FuncNodeHelper NodeHelper_Init();
+/* 
+ * The default mode is intended to be used in SMP mode 
+ * The next mode that uses pthread is intended to be used in a restricted mode where
+ * a node just have one charm PE!
+ **/
+extern CProxy_FuncNodeHelper NodeHelper_Init(int mode=NODEHELPER_USECHARM, int numThreads=0);
+
+extern void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper); /* used to free resources if using pthread mode. It should be called on just one PE, say PE 0 */                                                                                
+
 extern void NodeHelper_Parallelize(
                                                CProxy_FuncNodeHelper nodeHelper, /* the proxy to the FuncNodeHelper instance */
                                                HelperFn func, /* the function that finishes a partial work on another thread */
index 16bcff934f65ace7d7c85647e30a31b0b071b154..d4e796e8b2f9a958edfd4d9d84a44c9e46656169 100644 (file)
@@ -65,8 +65,7 @@ Main::Main(CkArgMsg* m) {
     totalElems = 1;
        numChunks = CkMyNodeSize();
        loopTimes = 1000;
-       //runningMode = NODEHELPER_STATIC;
-       runningMode = 3; 
+       runningMode = NODEHELPER_USECHARM; 
        
     mainStep = 0;
        numElemFinished = 0;
@@ -78,7 +77,7 @@ Main::Main(CkArgMsg* m) {
         processCommandLine(m->argc,m->argv);
        }
     else{              
-               CkPrintf("Usage: -t(#iterations) -c(#chunks) -a(#test instances) -m(running mode, 0 (non-SMP) or 1|2 (SMP))  -p(#threads)\n");
+               CkPrintf("Usage: -t(#iterations) -c(#chunks) -a(#test instances) -m(running mode, 1 for use Charm threads; 2 for use pthreads )  -p(#threads)\n");
        }
     delete m;
        
@@ -90,8 +89,8 @@ Main::Main(CkArgMsg* m) {
        CkPrintf("Using NodeHelper Lib with mode: %d, nodesize=%d\n", runningMode, CkMyNodeSize());
        CkPrintf("Testcase info: %d test instances where the loop iterates %d times, each work is partitioned into %d tasks\n", totalElems, loopTimes, numChunks);
        
-       //nodeHelperProxy = NodeHelper_Init(runningMode, threadNum);
-       nodeHelperProxy = NodeHelper_Init();
+       nodeHelperProxy = NodeHelper_Init(runningMode, threadNum);
+       //nodeHelperProxy = NodeHelper_Init();
     mainProxy = thishandle;
     
        //create test instances