Clean up the codes so that only the best implementation is kept. The old
authorChao Mei <chaomei2@illinois.edu>
Wed, 22 Feb 2012 20:28:43 +0000 (14:28 -0600)
committerChao Mei <chaomei2@illinois.edu>
Wed, 22 Feb 2012 20:28:43 +0000 (14:28 -0600)
one is moved to "alloptions" directory for performance testing reference.

NodeHelper.C
NodeHelper.ci
NodeHelper.h
NodeHelperAPI.h
example/simpleLoopBench/hello.C

index 454a325c71cfc2091338caed621ce2c73e75454e..6f821b6ae5ec5965075d20411a49d3b2ac27d6b9 100644 (file)
 #include "NodeHelper.h"
 
-//=======Beginning of pthread version of scheduling which is used in non-SMP =======//
-#if !CMK_SMP
-NodeQueue Q;
-
-//vars local to spawned threads
-//Note: __thread is not portable, but works pretty much anywhere pthreads work.
-// after C++11 this should be thread_local
-__thread pthread_mutex_t lock;
-__thread pthread_cond_t condition;
-
-//vars to the main flow (master thread)
-pthread_t * threads;
-pthread_mutex_t **allLocks;
-pthread_cond_t **allConds;
-
-//global barrier
-pthread_mutex_t gLock;
-pthread_cond_t gCond;
-pthread_barrier_t barr;
-//testing counter
-volatile int finishedCnt;
-
-void * threadWork(void * id) {
-    long my_id =(long) id;
-    //printf("thread :%ld\n",my_id);
-    CmiSetCPUAffinity(my_id+1);
-
-    pthread_mutex_init(&lock, NULL);
-    pthread_cond_init(&condition, NULL);
-
-    allLocks[my_id] = &lock;
-    allConds[my_id] = &condition;
-
-    while (1) {
-        pthread_mutex_lock(&lock);
-        pthread_cond_wait(&condition,&lock);
-        pthread_mutex_unlock(&lock);
-        void * r;
-        Task * one;
-        CmiLock(Q->lock);
-        CqsDequeue(Q->nodeQ,&r);
-        CmiUnlock(Q->lock);
-        one=(Task *)r;
-
-        while (one) {
-            //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
-            (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
-            pthread_barrier_wait(&barr);
-            //one->setFlag();
-            //printf
-            //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
-
-            //Testing
-            //AtomicIncrement(finishedCnt);
-            if (my_id==0)
-                finishedCnt=4;
-            //printf("finishedCnt = %d\n", finishedCnt);
-
-            CmiLock((Q->lock));
-            CqsDequeue(Q->nodeQ,&r);
-            CmiUnlock((Q->lock));
-            one=(Task *)r;
-
-        }
-    }
-
-}
-
-void FuncNodeHelper::createThread() {
-    int threadNum = numThds;
-    pthread_attr_t attr;
-    finishedCnt=0;
-    pthread_barrier_init(&barr,NULL,threadNum);
-    allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
-    allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
-    memset(allLocks, 0, sizeof(void *)*threadNum);
-    memset(allConds, 0, sizeof(void *)*threadNum);
-
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
-    Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
-    Q->nodeQ=CqsCreate();
-    Q->lock=CmiCreateLock();
-    /*for(int i=0;i<threadNum;i++){
-       //Q[i]=
-       Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
-       Q[i]->nodeQ=CqsCreate();
-       Q[i]->lock=CmiCreateLock();
-    }*/
-    threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
-
-
-    //create queue;
-    for (int i=0; i<threadNum; i++)
-        pthread_create(&threads[i],&attr,threadWork,(void *)i);
-}
-#endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
-
-//=======End of pthread version of static scheduling=======//
-
-FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
-    mode(mode_), numThds(numThds_)
-{   
-    
+FuncNodeHelper::FuncNodeHelper()
+{  
+#if CMK_SMP    
     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
          
-    traceRegisterUserEvent("assign work",20);
-    traceRegisterUserEvent("finish signal",21);
+    traceRegisterUserEvent("nodehelper total work",20);
+    traceRegisterUserEvent("nodehlelper finish signal",21);
     
-#if CMK_SMP
-    if (mode==NODEHELPER_DYNAMIC || 
-        mode==NODEHELPER_STATIC
-        || mode==NODEHELPER_CHARE_DYNAMIC) {
-        numHelpers = CkMyNodeSize();
-        helperArr = new CkChareID[numHelpers];
-        helperPtr = new FuncSingleHelper *[numHelpers];
-        
-        notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
-        
-        int pestart = CkNodeFirst(CkMyNode());
-        for (int i=0; i<numHelpers; i++) {
-            CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
-            helperPtr[i] = NULL;
-        }
-        for (int i=0; i<numHelpers; i++) {
-            CProxy_FuncSingleHelper helpProxy(helperArr[i]);
-            helpProxy.reportCreated();
-        }
-    }
-#else
-    CmiAssert(mode==NODEHELPER_PTHREAD);
-    createThread();    
+       numHelpers = CkMyNodeSize();
+       helperPtr = new FuncSingleHelper *[numHelpers];
+       
+       notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
+       for(int i=0; i<numHelpers; i++) notifyMsgs[i].srcRank = -1;
+       
+       useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
+       
+       int pestart = CkNodeFirst(CkMyNode());
+       
+       CkChareID *helperArr = new CkChareID[numHelpers];
+       for (int i=0; i<numHelpers; i++) {
+               CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
+               helperPtr[i] = NULL;
+       }
+       for (int i=0; i<numHelpers; i++) {
+               CProxy_FuncSingleHelper helpProxy(helperArr[i]);
+               helpProxy.reportCreated();
+       }
+       delete [] helperArr;
 #endif
 }
 
-/* Used for dynamic scheduling as it's a node-level msg */
-/* So this function will be executed on any PE of this node */
-void FuncNodeHelper::send(Task * msg) {
-    (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
-    CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
-    CmiLock(lock);
-    helperPtr[msg->originRank]->counter++;
-    CmiUnlock(lock);
-}
-
 int FuncNodeHelper::MAX_CHUNKS = 64;
 
 #if CMK_TRACE_ENABLED
@@ -165,9 +51,7 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
         numChunks = MAX_CHUNKS;
     }
-        
-    Task **task = helperPtr[CkMyRank()]->getTasksMem();
-    
+       
     /* "stride" determines the number of loop iterations to be done in each chunk
      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
      * for chunk indexed at remainder to numChunks-1, stride is "unit"
@@ -175,164 +59,49 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
      int stride;
     
     //for using nodequeue
-#if CMK_SMP
-    if (mode==NODEHELPER_DYNAMIC) {
-        int first = lowerRange;
-        int unit = (upperRange-lowerRange+1)/numChunks;
-        int remainder = (upperRange-lowerRange+1)-unit*numChunks;        
-        CProxy_FuncNodeHelper fh(thisgroup);
-
-        TRACE_START(20);        
-        stride = unit+1;
-        for (int i=0; i<remainder; i++, first+=stride) {          
-            task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
-            *((int *)CkPriorityPtr(task[i]))=msgPriority;
-            CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
-            fh[CkMyNode()].send(task[i]);
-        }
-        
-        stride = unit;
-        for(int i=remainder; i<numChunks; i++, first+=stride) {
-            task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
-            *((int *)CkPriorityPtr(task[i]))=msgPriority;
-            CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
-            fh[CkMyNode()].send(task[i]);
-        }
-        TRACE_BRACKET(20);
-        
-        TRACE_START(21);
-        FuncSingleHelper *fs = helperPtr[CmiMyRank()];
-        while (fs->counter!=numChunks)
-            CsdScheduleNodePoll();
-        //CkPrintf("counter:%d,master:%d\n",counter[master],master);
-        fs->counter = 0;
-        TRACE_BRACKET(21);        
-    } else if (mode==NODEHELPER_STATIC) {
-        int first = lowerRange;
-        int unit = (upperRange-lowerRange+1)/numChunks;
-        int remainder = (upperRange-lowerRange+1)-unit*numChunks;
-
-        TRACE_START(20);
-                
-        stride = unit+1;
-        for (int i=0; i<remainder; i++, first+=stride) {
-            task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
-            helperPtr[i%numHelpers]->enqueueWork(task[i]);
-        }
-        
-        stride = unit;
-        for (int i=remainder; i<numChunks; i++, first+=stride) {
-            task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
-            helperPtr[i%numHelpers]->enqueueWork(task[i]);
-        }
-        
-#if USE_CONVERSE_MSG
-        
-        for (int i=0; i<numHelpers; i++) {
-            if (i!=CkMyRank()) {
-                CmiPushPE(i, (void *)(notifyMsgs+i));
-            }
-        }
-#else
-        CkEntryOptions entOpts;
-        entOpts.setPriority(msgPriority);
-
-        for (int i=0; i<numHelpers; i++) {
-            if (i!=CkMyRank()) {
-                CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
-                helpProxy.processWork(0, &entOpts);
-            }
-        }    
-#endif        
-        helperPtr[CkMyRank()]->processWork(0);
-        
-        TRACE_BRACKET(20);
-        
-        TRACE_START(21);
-                
-        while(!__sync_bool_compare_and_swap(&(helperPtr[CkMyRank()]->counter), numChunks, 0));
-        //waitDone(task,numChunks);
-        
-        TRACE_BRACKET(21);
-    }else if(mode == NODEHELPER_CHARE_DYNAMIC){
-        TRACE_START(20);
-        
-        FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
-        CurLoopInfo *curLoop = thisHelper->curLoop;
-        curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
-        ConverseNotifyMsg *notifyMsg = &(notifyMsgs[CmiMyRank()]);
-        notifyMsg->ptr = (void *)curLoop;
-#if USE_TREE_BROADCAST        
-        notifyMsg->srcRank = CmiMyRank();
-        int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
-        //just implicit binary tree
-        int pe = CmiMyRank()+1;        
-        for(int i=0; i<loopTimes; i++, pe++){
-            if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
-            CmiPushPE(pe, (void *)(notifyMsg));    
-        }
-#else        
-        for (int i=0; i<numHelpers; i++) {
-            if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
-        }
-#endif            
-        curLoop->stealWork();
-        TRACE_BRACKET(20);
-        
-        TRACE_START(21);                
-        curLoop->waitLoopDone();
-        TRACE_BRACKET(21);        
-    }
-#else
-//non-SMP case
-/* Only works in the non-SMP case */
-    CmiAssert(mode == NODEHELPER_PTHREAD);
-    
-    TRACE_START(20);
-    stride = unit+1;
-    for (int i=0; i<remainder; i++, first+=stride) {
-        task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
-        CmiLock((Q->lock));
-        unsigned int t=(int)(CmiWallTimer()*1000);
-        CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
-        CmiUnlock((Q->lock));
-    }
-    
-    stride = unit;
-    for (int i=remainder; i<numChunks; i++, first+=stride) {
-        task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
-        CmiLock((Q->lock));
-        unsigned int t=(int)(CmiWallTimer()*1000);
-        CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
-        CmiUnlock((Q->lock));
-    }    
-    //signal the thread
-    for (int i=0; i<threadNum; i++) {
-        pthread_mutex_lock(allLocks[i]);
-        pthread_cond_signal(allConds[i]);
-        pthread_mutex_unlock(allLocks[i]);
-    }
-    TRACE_BRACKET(20);
-    
-    TRACE_START(21);
-    //wait for the result
-    waitThreadDone(numChunks);
-    TRACE_BRACKET(21);
-#endif
+       TRACE_START(20);
+       
+       FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
+       CurLoopInfo *curLoop = thisHelper->curLoop;
+       curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+       ConverseNotifyMsg *notifyMsg = &(notifyMsgs[CmiMyRank()]);
+       notifyMsg->ptr = (void *)curLoop;
+       
+       if(useTreeBcast){
+               notifyMsg->srcRank = CmiMyRank();
+               int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
+               //just implicit binary tree
+               int pe = CmiMyRank()+1;        
+               for(int i=0; i<loopTimes; i++, pe++){
+                       if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
+                       CmiPushPE(pe, (void *)(notifyMsg));    
+               }
+       }else{
+               for (int i=0; i<numHelpers; i++) {
+                       if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
+               }
+       }
+
+       curLoop->stealWork();
+       TRACE_BRACKET(20);
+       
+       TRACE_START(21);                
+       curLoop->waitLoopDone();
+       TRACE_BRACKET(21);        
 
     if (type!=NODEHELPER_NONE)
-        reduce(task, redResult, type, numChunks);            
+        reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
     return;
 }
 
 #define COMPUTE_REDUCTION(T) {\
     for(int i=0; i<numChunks; i++) {\
-     result += *((T *)(thisReq[i]->redBuf)); \
+     result += *((T *)(redBufs[i])); \
      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
     }\
 }
 
-void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
+void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
     switch(type){
         case NODEHELPER_INT_SUM:
         {
@@ -360,95 +129,30 @@ void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type,
     }
 }
 
-#if CMK_SMP
-void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
-    int flag = 1,i;
-    while (1) {
-        for (i=0; i<chunck; i++)
-            flag = flag & thisReq[i]->isFlagSet();
-        if (flag) break;
-        flag = 1;
-    }
-}
-#else
-void FuncNodeHelper::waitThreadDone(int chunck) {
-    while (finishedCnt!=chunck);
-    finishedCnt=0;
-}
-#endif
-
-void FuncNodeHelper::printMode(int mode) {
-    switch(mode){
-        case NODEHELPER_PTHREAD:
-            CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
-            break;
-        case NODEHELPER_DYNAMIC:
-            CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
-            break;
-        case NODEHELPER_STATIC:
-            CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
-            break;
-        case NODEHELPER_CHARE_DYNAMIC:
-            CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
-            break;
-        default:
-            CkPrintf("ERROR: NodeHelperLib is used in unknown mode\n");
-    }
-}
-
-void NotifySingleHelper(ConverseNotifyMsg *msg){
-    FuncSingleHelper *h = (FuncSingleHelper *)msg->ptr;
-    h->processWork(0);
-}
-
 void SingleHelperStealWork(ConverseNotifyMsg *msg){
-#if USE_TREE_BROADCAST
-    //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
-    int relPE = CmiMyRank()-msg->srcRank;
-    if(relPE<0) relPE += CmiMyNodeSize();
-    
-    //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
-    relPE=relPE*TREE_BCAST_BRANCH+1;
-    for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
-        if(relPE >= CmiMyNodeSize()) break;
-        int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
-        //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
-        CmiPushPE(pe, (void *)msg);
-    }
-#endif
+       
+       int srcRank = msg->srcRank;
+       
+       if(srcRank >= 0){
+               //means using tree-broadcast to send the notification msg
+               
+               //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
+               int relPE = CmiMyRank()-msg->srcRank;
+               if(relPE<0) relPE += CmiMyNodeSize();
+               
+               //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
+               relPE=relPE*TREE_BCAST_BRANCH+1;
+               for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
+                       if(relPE >= CmiMyNodeSize()) break;
+                       int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
+                       //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
+                       CmiPushPE(pe, (void *)msg);
+               }
+       }
     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
     loop->stealWork();
 }
 
-//======================================================================//
-// Functions regarding helpers that parallelize a single function on a  //
-// single node (like OpenMP)                                            // 
-//======================================================================//
-void FuncSingleHelper::processWork(int filler) {
-    Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
-    void *tmp;
-    
-    CmiLock(reqLock);
-    CqsDequeue(reqQ, &tmp);
-    CmiUnlock(reqLock);    
-
-    one = (Task *)tmp;
-    while (one) {
-        (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
-        //int *partial = (int *)(one->redBuf);
-        //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
-        
-        //one->setFlag();
-        __sync_add_and_fetch(&(thisNodeHelper->helperPtr[one->originRank]->counter), 1);
-        
-        
-        CmiLock(reqLock);
-        CqsDequeue(reqQ, &tmp);
-        one = (Task *)tmp;
-        CmiUnlock(reqLock);
-    }
-}
-
 void CurLoopInfo::stealWork(){
     //indicate the current work hasn't been initialized
     //or the old work has finished.
@@ -480,9 +184,9 @@ void CurLoopInfo::stealWork(){
 //   End of functions related with FuncSingleHelper                     //
 //======================================================================//
 
-CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
-    FuncNodeHelper::printMode(mode);
-    return CProxy_FuncNodeHelper::ckNew(mode, numThds);
+CProxy_FuncNodeHelper NodeHelper_Init(){
+    CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
+    return CProxy_FuncNodeHelper::ckNew();
 }
 
 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
index fdc512da71899701ad097205f63a6f6b149a5913..1c3549a8fc2dd8227b86189d7ca708f3738166df 100644 (file)
@@ -1,14 +1,9 @@
 module NodeHelper{
-
-       message Task;
        nodegroup FuncNodeHelper{
-               entry FuncNodeHelper(int mode, int numThds);
-               entry void send(Task * msg);
+               entry FuncNodeHelper();
        };
        chare FuncSingleHelper{
                entry FuncSingleHelper(CkGroupID);
-               entry void processWork(int);
                entry void reportCreated();
-               
        };
 };
index c27988a70743969ff6626a9f6d853fdbc9790d14..c18c7c61205c61affdc67952f1e4cf3e95314651 100644 (file)
@@ -1,74 +1,13 @@
 #ifndef _NODEHELPER_H
 #define _NODEHELPER_H
-
-#include <pthread.h>
 #include <assert.h>
 
 #include "charm++.h"
 #include "NodeHelperAPI.h"
-#include "queueing.h"
 
-#define USE_CONVERSE_MSG 1
-#define USE_TREE_BROADCAST 0
+#define USE_TREE_BROADCAST_THRESHOLD 8
 #define TREE_BCAST_BRANCH (4)
-
-/* The following only works on X86_64 platform */
-#define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
-
-
-typedef struct SimpleQueue {
-    Queue nodeQ;
-    pthread_mutex_t * lock;
-}* NodeQueue;
-
-class Task:public CMessage_Task {
-public:
-    HelperFn fnPtr;
-    int first;
-    int last;
-    int originRank;
-    int flag;    
-    int paramNum;
-    void *param;
-    
-    //limitation: only allow single variable reduction!!!
-    char redBuf[sizeof(double)];
-    
-    //make sure 32-byte aligned so that each task doesn't cross cache lines
-    //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
-    
-    Task():fnPtr(NULL), param(NULL), paramNum(0) {}
-
-    void init(HelperFn fn,int first_,int last_,int rank){
-        fnPtr = fn;
-        first = first_;
-        last = last_;
-        originRank = rank;
-    }
-
-    void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
-        init(fn, first_, last_, rank);
-        flag=flag_;
-    }
-    
-    void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
-        init(fn, first_, last_, rank); 
-        paramNum=paramNum_;
-        param=param_;
-    }
-    
-    void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
-        init(fn, first_, last_, rank, paramNum_, param_);
-        flag=flag_;
-    }
-    
-    void setFlag() {
-        flag=1;
-    }
-    int isFlagSet() {
-        return flag;
-    }
-};
+#define CACHE_LINE_SIZE 64
 
 class FuncSingleHelper;
 
@@ -85,7 +24,8 @@ private:
     void *param;
     //limitation: only allow single variable reduction of size numChunks!!!
     void **redBufs;
-    
+       char *bufSpace;
+
     volatile int finishFlag;
     
     //a tag to indicate whether the task for this new loop has been inited
@@ -93,10 +33,18 @@ private:
     int inited;
     
 public:    
-    CurLoopInfo():numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
-    paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL), inited(0) {}
-    
-    ~CurLoopInfo() { delete [] redBufs; }
+    CurLoopInfo(int maxChunks):numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
+    paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL), bufSpace(NULL), inited(0) 
+       {
+               redBufs = new void *[maxChunks];
+               bufSpace = new char[maxChunks * CACHE_LINE_SIZE];
+        for(int i=0; i<maxChunks; i++) redBufs[i] = (void *)(bufSpace+i*CACHE_LINE_SIZE);
+       }
+    
+    ~CurLoopInfo() { 
+               delete [] redBufs; 
+               delete [] bufSpace;
+       }
     
     void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p){        
         numChunks = nc;
@@ -122,6 +70,8 @@ public:
         __sync_add_and_fetch(&finishFlag, 1);
     }
     
+       void **getRedBufs() { return redBufs; }
+       
     void stealWork();
 };
 
@@ -129,57 +79,33 @@ public:
 
 typedef struct converseNotifyMsg{
     char core[CmiMsgHeaderSizeBytes];
-#if USE_TREE_BROADCAST
     int srcRank;
-#endif    
     void *ptr;
 }ConverseNotifyMsg;
 
 class FuncNodeHelper : public CBase_FuncNodeHelper {
     friend class FuncSingleHelper;
-private:
-    ConverseNotifyMsg *notifyMsgs;
-
+       
 public:
     static int MAX_CHUNKS;
-    static void printMode(int mode);
-
-public:
-    int numHelpers;
-    int mode; /* determine whether using dynamic or static scheduling */
-    
-    int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
-    
-    CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
+private:
+    ConverseNotifyMsg *notifyMsgs;
+    int numHelpers;    
     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
+       int useTreeBcast;
     
+public:
+       FuncNodeHelper();
     ~FuncNodeHelper() {
-        delete [] helperArr;
         delete [] helperPtr;
-        delete [] notifyMsgs;        
+        delete [] notifyMsgs;
     }
 
     /* handler is only useful when converse msg is used to initiate tasks on the pseudo-thread */
-    void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr, int handler=0) {
-        helperArr[hid] = cid;
+    void oneHelperCreated(int hid, FuncSingleHelper* cptr, int handler) {
         helperPtr[hid] = cptr;
-
         CmiSetHandler(&(notifyMsgs[hid]), handler);
-        if(mode == NODEHELPER_STATIC) {
-            notifyMsgs[hid].ptr = (void *)cptr;         
-        }
     }
-
-#if CMK_SMP
-    void  waitDone(Task ** thisReq,int chunck);
-#else  
-    void waitThreadDone(int chunck);
-    void createThread();
-#endif
-       
-       /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
-       /* numThds: the expected number of pthreads to be spawned */
-    FuncNodeHelper(int mode_, int numThds_);
     
     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
                         int paramNum, void * param, /* the input parameters for the above func */
@@ -188,82 +114,37 @@ public:
                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
                         );
-    void send(Task *);
-    void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
+    void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
 };
 
-void NotifySingleHelper(ConverseNotifyMsg *msg);
 void SingleHelperStealWork(ConverseNotifyMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
 class FuncSingleHelper: public CBase_FuncSingleHelper {
        friend class FuncNodeHelper;
 private: 
-    /* BE CAREFUL ABOUT THE FILEDS LAYOUT CONSIDERING CACHE EFFECTS */
-    volatile int counter;
-    int notifyHandler;
     int stealWorkHandler;
-    CkGroupID nodeHelperID;
     FuncNodeHelper *thisNodeHelper;
-    Queue reqQ; /* The queue may be simplified for better performance */
-    
-    /* The following two vars are for usage of detecting completion in dynamic scheduling */
-    CmiNodeLock reqLock;
-    /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
-     * process one node-level parallelization at a time */
-    Task **tasks; /* Note the Task type is a message */
-    
     CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
     
 public:
-    FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
-        reqQ = CqsCreate();
-
-        reqLock = CmiCreateLock();
-        counter = 0;
-        
-        tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
-        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
-        
-        CProxy_FuncNodeHelper fh(nodeHelperID);
+    FuncSingleHelper(CkGroupID nid) {        
+        CProxy_FuncNodeHelper fh(nid);
         thisNodeHelper = fh[CkMyNode()].ckLocalBranch();
-        CmiAssert(thisNodeHelper!=NULL);
-        
-        notifyHandler = CmiRegisterHandler((CmiHandler)NotifySingleHelper);
+        CmiAssert(thisNodeHelper!=NULL);        
         stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
-            
-        curLoop = new CurLoopInfo();
-        curLoop->redBufs = new void *[FuncNodeHelper::MAX_CHUNKS];
-        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) curLoop->redBufs[i] = (void *)(tasks[i]->redBuf);
+        curLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
     }
 
     ~FuncSingleHelper() {
-        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
-        delete [] tasks;        
-               CmiDestroyLock(reqLock);
         delete curLoop;
     }
     
     FuncSingleHelper(CkMigrateMessage *m) {}
-    
-    Task **getTasksMem() { return tasks; }
-    
-    void enqueueWork(Task *one) {
-               unsigned int t = 0; /* default priority */
-        CmiLock(reqLock);
-        CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
-        //SimpleQueuePush(reqQ, (char *)one);
-        CmiUnlock(reqLock);
-    }
-    void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
+               
     void reportCreated() {
         //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
-        if(thisNodeHelper->mode == NODEHELPER_DYNAMIC)
-            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, -1);
-        else if(thisNodeHelper->mode == NODEHELPER_STATIC)
-            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler);
-        else if(thisNodeHelper->mode == NODEHELPER_CHARE_DYNAMIC)
-            thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, stealWorkHandler);
+               thisNodeHelper->oneHelperCreated(CkMyRank(), this, stealWorkHandler);
     }    
 };
 
index 948b73bb2e9861ddf9a41efbdc35480b00313ecb..915a675d7b9de7fb1f99af8d98a5c604cc6c04df 100644 (file)
@@ -6,11 +6,6 @@
 /* "result" is the buffer for reduction result on a single simple-type variable */
 typedef void (*HelperFn)(int first,int last, void *result, int paramNum, void *param);
 
-#define NODEHELPER_PTHREAD 0
-#define NODEHELPER_DYNAMIC  1
-#define NODEHELPER_STATIC 2
-#define NODEHELPER_CHARE_DYNAMIC 3
-
 typedef enum REDUCTION_TYPE{
     NODEHELPER_NONE=0,
     NODEHELPER_INT_SUM,
@@ -19,8 +14,8 @@ typedef enum REDUCTION_TYPE{
 }REDUCTION_TYPE;
 
 class CProxy_FuncNodeHelper;
-extern CProxy_FuncNodeHelper NodeHelper_Init(int mode, /* indicates the nodehelper running mode, pthread of non-SMP, dynamic/static of SMP */
-                                            int numThds /* only valid in non-SMP mode, indicating how many pthreads are going to be created*/);
+/* currently only thinking of SMP mode */
+extern CProxy_FuncNodeHelper NodeHelper_Init();
 extern void NodeHelper_Parallelize(
                                                CProxy_FuncNodeHelper nodeHelper, /* the proxy to the FuncNodeHelper instance */
                                                HelperFn func, /* the function that finishes a partial work on another thread */
index 9ec19f2069dbb6acd8ff0322c6f3c03e0a55acce..18751e7004431f01dbd6a4772ad21720fb4e7458 100644 (file)
@@ -65,7 +65,8 @@ Main::Main(CkArgMsg* m) {
     totalElems = 1;
        numChunks = CkMyNodeSize();
        loopTimes = 1000;
-       runningMode = NODEHELPER_STATIC;
+       //runningMode = NODEHELPER_STATIC;
+       runningMode = 3; 
        
     mainStep = 0;
        numElemFinished = 0;
@@ -89,7 +90,8 @@ Main::Main(CkArgMsg* m) {
        CkPrintf("Using NodeHelper Lib with mode: %d, nodesize=%d\n", runningMode, CkMyNodeSize());
        CkPrintf("Testcase info: %d test instances where the loop iterates %d times, each work is partitioned into %d tasks\n", totalElems, loopTimes, numChunks);
        
-       nodeHelperProxy = NodeHelper_Init(runningMode, threadNum);
+       //nodeHelperProxy = NodeHelper_Init(runningMode, threadNum);
+       nodeHelperProxy = NodeHelper_Init();
     mainProxy = thishandle;
     
        //create test instances