Added a new scheme (work-stealing) which significantly reduced the overhead. E.g...
authorChao Mei <chaomei2@illinois.edu>
Fri, 27 Jan 2012 04:22:14 +0000 (22:22 -0600)
committerChao Mei <chaomei2@illinois.edu>
Fri, 27 Jan 2012 04:22:14 +0000 (22:22 -0600)
NodeHelper.C
NodeHelper.h
NodeHelperAPI.h

index 4ca49ac0cc8bf61bf5f3ba4204d2ff3c29c895c8..25ef04198d40ea9d533f585109edf9f0de966bbd 100644 (file)
@@ -111,14 +111,13 @@ FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
     
 #if CMK_SMP
     if (mode==NODEHELPER_DYNAMIC || 
-        mode==NODEHELPER_STATIC) {
+        mode==NODEHELPER_STATIC
+        || mode==NODEHELPER_CHARE_DYNAMIC) {
         numHelpers = CkMyNodeSize();
         helperArr = new CkChareID[numHelpers];
         helperPtr = new FuncSingleHelper *[numHelpers];
         
-#if USE_CONVERSE_MSG
         notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
-#endif        
         
         int pestart = CkNodeFirst(CkMyNode());
         for (int i=0; i<numHelpers; i++) {
@@ -168,10 +167,6 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
     }
         
     Task **task = helperPtr[CkMyRank()]->getTasksMem();
-
-    int first = lowerRange;
-    int unit = (upperRange-lowerRange+1)/numChunks;
-    int remainder = (upperRange-lowerRange+1)-unit*numChunks;
     
     /* "stride" determines the number of loop iterations to be done in each chunk
      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
@@ -182,6 +177,9 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
     //for using nodequeue
 #if CMK_SMP
     if (mode==NODEHELPER_DYNAMIC) {
+        int first = lowerRange;
+        int unit = (upperRange-lowerRange+1)/numChunks;
+        int remainder = (upperRange-lowerRange+1)-unit*numChunks;        
         CProxy_FuncNodeHelper fh(thisgroup);
 
         TRACE_START(20);        
@@ -210,6 +208,10 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
         fs->counter = 0;
         TRACE_BRACKET(21);        
     } else if (mode==NODEHELPER_STATIC) {
+        int first = lowerRange;
+        int unit = (upperRange-lowerRange+1)/numChunks;
+        int remainder = (upperRange-lowerRange+1)-unit*numChunks;
+
         TRACE_START(20);
                 
         stride = unit+1;
@@ -252,6 +254,25 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
         //waitDone(task,numChunks);
         
         TRACE_BRACKET(21);
+    }else if(mode == NODEHELPER_CHARE_DYNAMIC){
+        TRACE_START(20);
+        
+        FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
+        CurLoopInfo *curLoop = thisHelper->curLoop;
+        curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+        
+        for (int i=0; i<numHelpers; i++) {
+            if (i!=CkMyRank()) {
+                notifyMsgs[i].ptr = (void *)curLoop;
+                CmiPushPE(i, (void *)(notifyMsgs+i));
+            }
+        }        
+        curLoop->stealWork();
+        TRACE_BRACKET(20);
+        
+        TRACE_START(21);                
+        curLoop->waitLoopDone();
+        TRACE_BRACKET(21);        
     }
 #else
 //non-SMP case
@@ -348,21 +369,33 @@ void FuncNodeHelper::waitThreadDone(int chunck) {
 #endif
 
 void FuncNodeHelper::printMode(int mode) { // Prints one line via CkPrintf describing the scheduling mode passed in.
-    if(mode == NODEHELPER_PTHREAD){
-        CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
-    }else if(mode == NODEHELPER_DYNAMIC){
-        CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
-    }else if(mode == NODEHELPER_STATIC){
-        CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
+    switch(mode){ // one case per NODEHELPER_* mode constant
+        case NODEHELPER_PTHREAD:
+            CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
+            break;
+        case NODEHELPER_DYNAMIC:
+            CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
+            break;
+        case NODEHELPER_STATIC:
+            CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
+            break;
+        case NODEHELPER_CHARE_DYNAMIC:
+            CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
+            break;
+        default: // any value that is not one of the four mode constants above
+            CkPrintf("ERROR: NodeHelperLib is used in unknown mode\n"); // last label of the switch, so no break follows
     }
 }
 
-#if USE_CONVERSE_MSG
 void NotifySingleHelper(ConverseNotifyMsg *msg){ // Converse handler: runs processWork on the helper carried in msg->ptr.
-    FuncSingleHelper *h = msg->ptr;
+    FuncSingleHelper *h = (FuncSingleHelper *)msg->ptr; // ptr is an untyped void* in ConverseNotifyMsg, so cast it back
     h->processWork(0); // 0 is the unused "filler" argument of processWork
 }
-#endif
+
+// Converse handler for the work-stealing mode: the message carries a pointer
+// to the loop descriptor, and the receiving PE joins in by stealing chunks.
+void SingleHelperStealWork(ConverseNotifyMsg *msg){
+    // msg->ptr is stored as void*; here it refers to a CurLoopInfo.
+    CurLoopInfo *const sharedLoop = static_cast<CurLoopInfo *>(msg->ptr);
+    sharedLoop->stealWork();
+}
 
 //======================================================================//
 // Functions regarding helpers that parallelize a single function on a  //
@@ -392,6 +425,30 @@ void FuncSingleHelper::processWork(int filler) {
         CmiUnlock(reqLock);
     }
 }
+
+void CurLoopInfo::stealWork(){
+    int first, last;
+    int unit = (upperIndex-lowerIndex+1)/numChunks;
+    int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
+    int markIdx = remainder*(unit+1);
+    
+    int nextChunkId = getNextChunkIdx();
+    while(nextChunkId < numChunks){
+        if(nextChunkId < remainder){
+            first = (unit+1)*nextChunkId;
+            last = first+unit;
+        }else{
+            first = (nextChunkId - remainder)*unit + markIdx;
+            last = first+unit-1;
+        }
+                
+        fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
+        
+        nextChunkId = getNextChunkIdx();
+    }
+    reportFinished();    
+}
+
 //======================================================================//
 //   End of functions related with FuncSingleHelper                     //
 //======================================================================//
index 04f0c3f95117348c962110ae161bf31aa5fc0e9c..e5836ac09eb7cd8281d4ec89b0738f4c693e30c5 100644 (file)
@@ -68,22 +68,65 @@ public:
     }
 };
 
+class FuncSingleHelper;
+
+class CurLoopInfo{
+    friend class FuncSingleHelper;
+    
+private:
+    volatile int curChunkIdx;
+    int numChunks;
+    HelperFn fnPtr;
+    int lowerIndex;
+    int upperIndex;
+    int paramNum;
+    void *param;
+    //limitation: only allow single variable reduction of size numChunks!!!
+    void **redBufs;
+    
+    volatile int finishFlag;
+    
+public:    
+    CurLoopInfo():numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
+    paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL) {}
+    
+    ~CurLoopInfo() { delete [] redBufs; }
+    
+    void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p){        
+        numChunks = nc;
+        fnPtr = f;
+        lowerIndex = lIdx;
+        upperIndex = uIdx;
+        paramNum = numParams;
+        param = p;
+        curChunkIdx = -1;
+        finishFlag = 0;
+    }
+      
+    void waitLoopDone(){
+        while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
+    }
+    int getNextChunkIdx(){
+        return __sync_add_and_fetch(&curChunkIdx, 1);
+    }
+    void reportFinished(){
+        __sync_add_and_fetch(&finishFlag, 1);
+    }
+    
+    void stealWork();
+};
 
 /* FuncNodeHelper is a nodegroup object */
-class FuncSingleHelper;
-#if USE_CONVERSE_MSG
+
 typedef struct converseNotifyMsg{ /* preallocated message pushed to a helper's PE with CmiPushPE */
     char core[CmiMsgHeaderSizeBytes]; /* presumably the space reserved for the Converse message header -- confirm */
-    FuncSingleHelper *ptr;
+    void *ptr; /* untyped payload: the handlers cast it back to FuncSingleHelper* or CurLoopInfo* */
 }ConverseNotifyMsg;
-#endif
 
 class FuncNodeHelper : public CBase_FuncNodeHelper {
     friend class FuncSingleHelper;
 private:
-#if USE_CONVERSE_MSG
-    ConverseNotifyMsg *notifyMsgs;         
-#endif
+    ConverseNotifyMsg *notifyMsgs;
 
 public:
     static int MAX_CHUNKS;
@@ -101,19 +144,18 @@ public:
     ~FuncNodeHelper() {
         delete [] helperArr;
         delete [] helperPtr;
-    #if USE_CONVERSE_MSG
-        delete [] notifyMsgs;
-    #endif
+        delete [] notifyMsgs;        
     }
 
     /* handler is only useful when converse msg is used to initiate tasks on the pseudo-thread */
     void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr, int handler=0) { /* Records helper hid's chare ID and object pointer and prepares its notify message. */
         helperArr[hid] = cid;
         helperPtr[hid] = cptr;
-#if USE_CONVERSE_MSG        
-        notifyMsgs[hid].ptr = cptr;
+
         CmiSetHandler(&(notifyMsgs[hid]), handler); /* stamp the preallocated message with the handler passed in by the helper */
-#endif
+        if(mode == NODEHELPER_STATIC) { /* only the static mode fixes the payload here; the work-stealing mode sets ptr before each push */
+            notifyMsgs[hid].ptr = (void *)cptr;         
+        }
     }
 
 #if CMK_SMP
@@ -138,9 +180,8 @@ public:
     void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
 };
 
-#if USE_CONVERSE_MSG
 void NotifySingleHelper(ConverseNotifyMsg *msg);
-#endif
+void SingleHelperStealWork(ConverseNotifyMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
 class FuncSingleHelper: public CBase_FuncSingleHelper {
@@ -149,6 +190,7 @@ private:
     /* BE CAREFUL ABOUT THE FIELDS LAYOUT CONSIDERING CACHE EFFECTS */
     volatile int counter;
     int notifyHandler;
+    int stealWorkHandler;
     CkGroupID nodeHelperID;
     FuncNodeHelper *thisNodeHelper;
     Queue reqQ; /* The queue may be simplified for better performance */
@@ -158,6 +200,9 @@ private:
     /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
      * process one node-level parallelization at a time */
     Task **tasks; /* Note the Task type is a message */
+    
+    CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
+    
 public:
     FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
         reqQ = CqsCreate();
@@ -172,16 +217,19 @@ public:
         thisNodeHelper = fh[CkMyNode()].ckLocalBranch();
         CmiAssert(thisNodeHelper!=NULL);
         
-        notifyHandler = 0;
-    #if USE_CONVERSE_MSG
         notifyHandler = CmiRegisterHandler((CmiHandler)NotifySingleHelper);
-    #endif
+        stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
+            
+        curLoop = new CurLoopInfo();
+        curLoop->redBufs = new void *[FuncNodeHelper::MAX_CHUNKS];
+        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) curLoop->redBufs[i] = (void *)(tasks[i]->redBuf);
     }
 
     ~FuncSingleHelper() { /* Frees the Task objects, the request lock and the loop descriptor. */
         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
         delete [] tasks;        
                CmiDestroyLock(reqLock);
+        delete curLoop; /* CurLoopInfo's destructor releases only its redBufs pointer array, not the buffers themselves */
     }
     
     FuncSingleHelper(CkMigrateMessage *m) {} /* constructor taking a CkMigrateMessage; empty body, no member is initialized here */
@@ -197,9 +245,14 @@ public:
     }
     void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
     void reportCreated() { /* Registers this helper with the node-level FuncNodeHelper, passing the handler that matches the scheduling mode. */
-        //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());        
-        thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler);
-    }
+        //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
+        if(thisNodeHelper->mode == NODEHELPER_DYNAMIC)
+            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, -1); /* -1 is passed as the handler; presumably this mode does not use the notify messages -- confirm */
+        else if(thisNodeHelper->mode == NODEHELPER_STATIC)
+            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler); /* handler registered for NotifySingleHelper */
+        else if(thisNodeHelper->mode == NODEHELPER_CHARE_DYNAMIC)
+            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, stealWorkHandler); /* handler registered for SingleHelperStealWork; for any other mode oneHelperCreated is not called */
+    }    
 };
 };
 
 #endif
index 0b26195ff674652f0cf39730183afc59d696d183..948b73bb2e9861ddf9a41efbdc35480b00313ecb 100644 (file)
@@ -9,6 +9,7 @@ typedef void (*HelperFn)(int first,int last, void *result, int paramNum, void *p
 #define NODEHELPER_PTHREAD 0
 #define NODEHELPER_DYNAMIC  1
 #define NODEHELPER_STATIC 2
+#define NODEHELPER_CHARE_DYNAMIC 3
 
 typedef enum REDUCTION_TYPE{
     NODEHELPER_NONE=0,