Polished/Updated the library and changed the example codes accordingly.
authorChao Mei <chaomei2@illinois.edu>
Sat, 21 Jan 2012 00:13:20 +0000 (18:13 -0600)
committerChao Mei <chaomei2@illinois.edu>
Sat, 21 Jan 2012 00:13:20 +0000 (18:13 -0600)
12 files changed:
Makefile
NodeHelper.C
NodeHelper.ci
NodeHelper.h
NodeHelperAPI.h [new file with mode: 0644]
example/fft-trans/fft1d.C
example/hello.C [deleted file]
example/hello.h [deleted file]
example/simpleLoopBench/Makefile [moved from example/Makefile with 78% similarity]
example/simpleLoopBench/hello.C [new file with mode: 0644]
example/simpleLoopBench/hello.ci [moved from example/hello.ci with 65% similarity]
example/simpleLoopBench/hello.h [new file with mode: 0644]

index f2de6a098a682553cc7cbe1700fa46380520a9c1..4a4c3a65a3e60c9f212e00e473a94f01911dac51 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,10 @@
-OPTS=-I../ -O3 -g -lpthread
-CHARMC=$(HOME)/charm/net-linux-x86_64-smp-prod/bin/charmc $(OPTS)
-CHARMLIB=$(HOME)/charm/net-linux-x86_64-smp-prod/lib
-CHARMINC=$(HOME)/charm/net-linux-x86_64-smp-prod/include
+OPTS=-I../ -O0 -g -lpthread
+
+CHARMDIR=$(HOME)/curcvs/charm/net-linux-x86_64-smp-dbg
+CHARMC=$(CHARMDIR)/bin/charmc $(OPTS)
+CHARMLIB=$(CHARMDIR)/lib
+CHARMINC=$(CHARMDIR)/include
+
 all: module
 
 clean:
@@ -23,9 +26,9 @@ NodeHelper.decl.h: NodeHelper.ci
        $(CHARMC)  NodeHelper.ci
 
 
-NodeHelper.o: NodeHelper.C NodeHelper.decl.h
+NodeHelper.o: NodeHelper.C NodeHelper.decl.h NodeHelper.h NodeHelperAPI.h
        $(CHARMC) -c NodeHelper.C
 
 install: $(CHARMLIB)/libmoduleNodeHelper.a
-       cp NodeHelper.h NodeHelper.decl.h NodeHelper.def.h $(CHARMINC)/
+       cp NodeHelperAPI.h NodeHelper.h NodeHelper.decl.h NodeHelper.def.h $(CHARMINC)/
 
index 3be214d6be4d0abe6bce5268f0a3c452dd962d83..29ccaa8ca5aa4f4c715ca09a35924e2c25f146ac 100644 (file)
@@ -1,8 +1,7 @@
 #include "NodeHelper.h"
-#define THRESHOLD 100
-#define WPSTHRESHOLD 400
-#define SMP_SUM 1
 
+//=======Beginning of pthread version of scheduling which is used in non-SMP =======//
+#if !CMK_SMP
 NodeQueue Q;
 
 //vars local to spawned threads
@@ -13,7 +12,7 @@ __thread pthread_cond_t condition;
 
 //vars to the main flow (master thread)
 pthread_t * threads;
-pthread_mutex_t **allLocks;    
+pthread_mutex_t **allLocks;
 pthread_cond_t **allConds;
 
 //global barrier
@@ -23,290 +22,359 @@ pthread_barrier_t barr;
 //testing counter
 volatile int finishedCnt;
 
-void * threadWork(void * id){
-       long my_id =(long) id;
-       //printf("thread :%ld\n",my_id);
-       CmiSetCPUAffinity(my_id+1);
-       
-       pthread_mutex_init(&lock, NULL);
-       pthread_cond_init(&condition, NULL);
-       
-       allLocks[my_id] = &lock;
-       allConds[my_id] = &condition;
-       
-       while(1){
-               pthread_mutex_lock(&lock);
-               pthread_cond_wait(&condition,&lock);
-               pthread_mutex_unlock(&lock);
-               void * r;
-               Task * one;
-               CmiLock(Q->lock);
-               CqsDequeue(Q->nodeQ,&r);
-               CmiUnlock(Q->lock);
-               one=(Task *)r;
-           
-               while(one) {
-                       //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
-                       (one->fnPtr)(one->first,one->last,one->result,one->paramNum, one->param);
-                       pthread_barrier_wait(&barr);
-                       //one->setFlag();
-                       //printf
-                       //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
-                       
-                       //Testing
-                       //AtomicIncrement(finishedCnt);
-                       if(my_id==0)
-                               finishedCnt=4;
-                       //printf("finishedCnt = %d\n", finishedCnt);
-                       
-                       CmiLock((Q->lock));
-                       CqsDequeue(Q->nodeQ,&r);
-                       CmiUnlock((Q->lock));
-                       one=(Task *)r;
-                       
-               }       
-       }
-       
+void * threadWork(void * id) {
+    long my_id =(long) id;
+    //printf("thread :%ld\n",my_id);
+    CmiSetCPUAffinity(my_id+1);
+
+    pthread_mutex_init(&lock, NULL);
+    pthread_cond_init(&condition, NULL);
+
+    allLocks[my_id] = &lock;
+    allConds[my_id] = &condition;
+
+    while (1) {
+        pthread_mutex_lock(&lock);
+        pthread_cond_wait(&condition,&lock);
+        pthread_mutex_unlock(&lock);
+        void * r;
+        Task * one;
+        CmiLock(Q->lock);
+        CqsDequeue(Q->nodeQ,&r);
+        CmiUnlock(Q->lock);
+        one=(Task *)r;
+
+        while (one) {
+            //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
+            (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
+            pthread_barrier_wait(&barr);
+            //one->setFlag();
+            //printf
+            //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
+
+            //Testing
+            //AtomicIncrement(finishedCnt);
+            if (my_id==0)
+                finishedCnt=4;
+            //printf("finishedCnt = %d\n", finishedCnt);
+
+            CmiLock((Q->lock));
+            CqsDequeue(Q->nodeQ,&r);
+            CmiUnlock((Q->lock));
+            one=(Task *)r;
+
+        }
+    }
+
 }
 
-FuncNodeHelper::FuncNodeHelper(int mode_o,int nElements, int threadNum_o){
-       mode=mode_o;
-       threadNum=threadNum_o;
-       numHelpers = CkMyNodeSize();
-               traceRegisterUserEvent("assign work",20);       
-               traceRegisterUserEvent("finish signal",21);     
+void FuncNodeHelper::createThread() {
+    int threadNum = numThds;
+    pthread_attr_t attr;
+    finishedCnt=0;
+    pthread_barrier_init(&barr,NULL,threadNum);
+    allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
+    allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
+    memset(allLocks, 0, sizeof(void *)*threadNum);
+    memset(allConds, 0, sizeof(void *)*threadNum);
+
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
+    Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
+    Q->nodeQ=CqsCreate();
+    Q->lock=CmiCreateLock();
+    /*for(int i=0;i<threadNum;i++){
+       //Q[i]=
+       Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
+       Q[i]->nodeQ=CqsCreate();
+       Q[i]->lock=CmiCreateLock();
+    }*/
+    threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
+
+
+    //create queue;
+    for (int i=0; i<threadNum; i++)
+        pthread_create(&threads[i],&attr,threadWork,(void *)i);
+}
+#endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
+
+//=======End of pthread version of static scheduling=======//
+
+FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
+    mode(mode_), numThds(numThds_)
+{   
+    
+    //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
+         
+    traceRegisterUserEvent("assign work",20);
+    traceRegisterUserEvent("finish signal",21);
+    
 #if CMK_SMP
-       if(mode==1){
-               counter=new int[nElements];
-               reqLock=new  pthread_mutex_t *[nElements];
-               for(int i=0;i<nElements;i++){
-                       counter[i]=0;
-                       reqLock[i] = CmiCreateLock();
-               }
-       }else if(mode==2){
-               helperArr = new CkChareID[numHelpers];
-               helperPtr = new FuncSingleHelper *[numHelpers];
-               int pestart = CkNodeFirst(CkMyNode());
-               for(int i=0; i<numHelpers; i++) {
-                       CProxy_FuncSingleHelper::ckNew(i, thisgroup, &helperArr[i], pestart+i);    
-                       helperPtr[i] = NULL;
-               }
-               for(int i=0; i<numHelpers; i++) {
-                       CProxy_FuncSingleHelper helpProxy(helperArr[i]);
-                       helpProxy.reportCreated();
-               }
-       }
+    if (mode==NODEHELPER_DYNAMIC || 
+        mode==NODEHELPER_STATIC) {
+        numHelpers = CkMyNodeSize();
+        helperArr = new CkChareID[numHelpers];
+        helperPtr = new FuncSingleHelper *[numHelpers];
+        int pestart = CkNodeFirst(CkMyNode());
+        for (int i=0; i<numHelpers; i++) {
+            CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
+            helperPtr[i] = NULL;
+        }
+        for (int i=0; i<numHelpers; i++) {
+            CProxy_FuncSingleHelper helpProxy(helperArr[i]);
+            helpProxy.reportCreated();
+        }
+    }
+#else
+    CmiAssert(mode==NODEHELPER_PTHREAD);
+    createThread();    
 #endif
 }
-void FuncNodeHelper::createThread(){
-
-               pthread_attr_t attr;
-               finishedCnt=0;
-               pthread_barrier_init(&barr,NULL,threadNum);     
-               allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
-               allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
-               memset(allLocks, 0, sizeof(void *)*threadNum);  
-               memset(allConds, 0, sizeof(void *)*threadNum);  
-               
-               pthread_attr_init(&attr);
-               pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
-               Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
-                       Q->nodeQ=CqsCreate();
-                       Q->lock=CmiCreateLock();
-               /*for(int i=0;i<threadNum;i++){
-                       //Q[i]=
-                       Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
-                       Q[i]->nodeQ=CqsCreate();
-                       Q[i]->lock=CmiCreateLock();
-               }*/
-               threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
-               
+
+/* Used for dynamic scheduling as it's a node-level msg */
+/* So this function will be executed on any PE of this node */
+void FuncNodeHelper::send(Task * msg) {
+    (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
+    CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
+    CmiLock(lock);
+    helperPtr[msg->originRank]->counter++;
+    CmiUnlock(lock);
+}
+
+int FuncNodeHelper::MAX_CHUNKS = 64;
+
+#if CMK_TRACE_ENABLED
+#define TRACE_START(id) _start = CmiWallTimer()
+#define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
+#else
+#define TRACE_START(id)
+#define TRACE_BRACKET(id)
+#endif
+
+void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
+                                    int msgPriority, int numChunks, int lowerRange, int upperRange, 
+                                    void *redResult, REDUCTION_TYPE type) {
+                                        
+    double _start; //may be used for tracing
+    
+    if(numChunks > MAX_CHUNKS){ 
+        CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
+        numChunks = MAX_CHUNKS;
+    }
+    
+    Task **task = helperPtr[CkMyRank()]->getTasksMem();
+
+    int first = lowerRange;
+    int unit = (upperRange-lowerRange+1)/numChunks;
+    int remainder = (upperRange-lowerRange+1)-unit*numChunks;
+    
+    /* "stride" determines the number of loop iterations to be done in each chunk
+     * for chunk indexed at 0 to remainder-1, stride is "unit+1";
+     * for chunk indexed at remainder to numChunks-1, stride is "unit"
+     */
+     int stride;
+    
+    //for using nodequeue
+#if CMK_SMP
+    if (mode==NODEHELPER_DYNAMIC) {
+        CProxy_FuncNodeHelper fh(thisgroup);
+
+        TRACE_START(20);        
+        stride = unit+1;
+        for (int i=0; i<remainder; i++, first+=stride) {          
+            task[i]->init(func, first, first+stride, CkMyRank(), paramNum, param);
+            *((int *)CkPriorityPtr(task[i]))=msgPriority;
+            CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
+            fh[CkMyNode()].send(task[i]);
+        }
+        
+        stride = unit;
+        for(int i=remainder; i<numChunks; i++, first+=stride) {
+            task[i]->init(func, first, first+stride, CkMyRank(), paramNum, param);
+            *((int *)CkPriorityPtr(task[i]))=msgPriority;
+            CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
+            fh[CkMyNode()].send(task[i]);
+        }
+        TRACE_BRACKET(20);
         
-               //create queue;
-               for(int i=0;i<threadNum;i++)
-                       pthread_create(&threads[i],&attr,threadWork,(void *)i);
-               
+        TRACE_START(21);
+        FuncSingleHelper *fs = helperPtr[CmiMyRank()];
+        while (fs->counter!=numChunks)
+            CsdScheduleNodePoll();
+        //CkPrintf("counter:%d,master:%d\n",counter[master],master);
+        fs->counter = 0;
+        TRACE_BRACKET(21);        
+    } else if (mode==NODEHELPER_STATIC) {
+        TRACE_START(20);
+                
+        stride = unit+1;
+        for (int i=0; i<remainder; i++, first+=stride) {
+            task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
+            helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i]);
+        }
+        
+        stride = unit;
+        for (int i=remainder; i<numChunks; i++, first+=stride) {
+            task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
+            helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i]);
+        }
+        
+        CkEntryOptions entOpts;
+        entOpts.setPriority(msgPriority);
+
+        for (int i=0; i<numHelpers; i++) {
+            if (i!=CkMyRank()) {
+                CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
+                helpProxy.processWork(0, &entOpts);
+            }
+        }
+        helperPtr[CkMyRank()]->processWork(0);
+        
+        TRACE_BRACKET(20);
+        
+        TRACE_START(21);
+        waitDone(task,numChunks);
+        TRACE_BRACKET(21);
+    }
+#else
+//non-SMP case
+/* Only works in the non-SMP case */
+    CmiAssert(mode == NODEHELPER_PTHREAD);
+    
+    TRACE_START(20);
+    stride = unit+1;
+    for (int i=0; i<remainder; i++, first+=stride) {
+        task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
+        CmiLock((Q->lock));
+        unsigned int t=(int)(CmiWallTimer()*1000);
+        CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
+        CmiUnlock((Q->lock));
+    }
+    
+    stride = unit;
+    for (int i=remainder; i<numChunks; i++, first+=stride) {
+        task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
+        CmiLock((Q->lock));
+        unsigned int t=(int)(CmiWallTimer()*1000);
+        CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
+        CmiUnlock((Q->lock));
+    }    
+    //signal the thread
+    for (int i=0; i<threadNum; i++) {
+        pthread_mutex_lock(allLocks[i]);
+        pthread_cond_signal(allConds[i]);
+        pthread_mutex_unlock(allLocks[i]);
+    }
+    TRACE_BRACKET(20);
+    
+    TRACE_START(21);
+    //wait for the result
+    waitThreadDone(numChunks);
+    TRACE_BRACKET(21);
+#endif
+
+    if (type!=NODEHELPER_NONE)
+        reduce(task, redResult, type, numChunks);            
+    return;
 }
-void FuncNodeHelper::send(Task * msg){
-       (msg->fnPtr)(msg->first,msg->last,msg->result,msg->paramNum, msg->param);
-       CmiLock(reqLock[msg->master]);
-       counter[msg->master]++;
-       CmiUnlock(reqLock[msg->master]);
+
+#define COMPUTE_REDUCTION(T) {\
+    for(int i=0; i<numChunks; i++) {\
+     result += *((T *)(thisReq[i]->redBuf)); \
+     /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
+    }\
+}
+
+void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
+    switch(type){
+        case NODEHELPER_INT_SUM:
+        {
+            int result=0;
+            COMPUTE_REDUCTION(int)
+            *((int *)redBuf) = result;
+            break;
+        }
+        case NODEHELPER_FLOAT_SUM:
+        {
+            float result=0;
+            COMPUTE_REDUCTION(float)
+            *((float *)redBuf) = result;
+            break;
+        }
+        case NODEHELPER_DOUBLE_SUM:
+        {
+            double result=0;
+            COMPUTE_REDUCTION(double)
+            *((double *)redBuf) = result;
+            break;
+        }
+        default:
+        break;
+    }
 }
 
-/**
-"func": the function that executes one partition of the task.
-
-"wps": the number of executions needed on this func (it is there for the purpose of testing.)
-"time": the time to finish this func, it is a user defined argument for testing, wps is calculated based on time.
-(the above two parameters could be ignored for real usage)
-
-"t": the priority bit for the nodegroup msg, it is the time initiating the nodehelper.
-
-"master": the arrayIndex of the array element that initiate the nodehelper, or the pe index if invoked on a group object. The master element is used to identify the corresponding lock and counter to find out whether all the partitioned tasks have been finished.
-
-"chunck": the number of tasks that the nodehelper will distribute among the nodegroup. If it not available through the user argument, it will calculate like:
-if(chunck==0){
-         if(time!=0)
-             chunck=(double)(time/THRESHOLD)+0.5;
-         else
-             chunck=(double)(wps/WPSTHRESHOLD)+0.5;
-  }
-
-"reduction": whether it is an reduction operation, it will return the reduction result if it is.
-"type": the reduction type.
-
-"MODE" means whether the scheduling is static or dynamic. If static, then the partitioned tasks will be assigned to threads evenly within the SMP nodes. If dynamic, then the threads will execute the task if they are idle at the time of the loop job. 
- * 
- */
-int FuncNodeHelper::parallelizeFunc(HelperFn func, int wps,unsigned int t, int master,int chunck,int time,int paramNum, void * param, int reduction, int type){
-       int result=0;
-       if(chunck==0){
-               if(time!=0)
-                       chunck=(double)(time/THRESHOLD)+0.5;
-               else
-                       chunck=(double)(wps/WPSTHRESHOLD)+0.5;
-       }
-       int unit=((double)wps)/(double)chunck+0.5;
-       //printf("use %d chuncks for testcase %d\n",chunck,master);
-       Task **task=new Task *[chunck];
-    //for using nodequeue 
 #if CMK_SMP
-       if(mode==1){
-#if 0
-//Note: CsdScheduleNodePoll has not been in the charm yet, so currently disable it
-               CProxy_FuncNodeHelper fh(thisgroup);
-               double _start=CmiWallTimer();
-               for(int i=0; i<chunck; i++) {
-                       int first=unit*i;
-                       int last= (i==chunck-1)?wps:(i+1)*unit-1;
-                       task[i]=new (8*sizeof(int)) Task(func,first,last,master, paramNum, param);
-                       *((int *)CkPriorityPtr(task[i]))=t;
-                       CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
-                       fh[CkMyNode()].send(task[i]);
-                       //send(task[i]);
-               }
-               traceUserBracketEvent(20,_start,CmiWallTimer());
-               _start=CmiWallTimer();
-               while(counter[master]!=chunck)
-                       CsdScheduleNodePoll();
-               //CkPrintf("counter:%d,master:%d\n",counter[master],master);
-               traceUserBracketEvent(21,_start,CmiWallTimer());
-               counter[master]=0;
-               /*for(int i=0;i<chunck;i++){
-                       result+=task[i]->result;
-               }*/
-#endif
-       }
-       else if(mode==2){
-       // for not using node queue
-               for(int i=0; i<chunck; i++) {
-                       int first=unit*i;
-                       int last= (i==chunck-1)?wps:(i+1)*unit-1;
-                       task[i] = new Task(func,first,last,0,master,paramNum, param);
-                       //task[i]->UnsetFlag();
-                       helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i],t);
-               }
-               for(int i=0; i<numHelpers; i++) {        
-                       if(i!=CkMyRank()){
-                               CProxy_FuncSingleHelper helpProxy(helperArr[i]);
-                               helpProxy.processWork();
-                       }
-               }
-               helperPtr[CkMyRank()]->processWork();
-  
-               waitDone(task,chunck);
-               result=0;
-               
-               /*for(int i=0;i<chunck;i++){
-               result+=(task[i]->result);
-               }*/
-       }
+void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
+    int flag = 1,i;
+    while (1) {
+        for (i=0; i<chunck; i++)
+            flag = flag & thisReq[i]->isFlagSet();
+        if (flag) break;
+        flag = 1;
+    }
+}
 #else
-       if(mode==0){
-               for(int i=0;i<chunck;i++)
-               {
-                       int first = unit*i;
-                       int last=(i==chunck-1)?wps:(i+1)*unit-1;
-                       task[i]=new Task(func,first,last,0,master, paramNum, param);
-                       CmiLock((Q->lock));
-                       unsigned int t=(int)(CmiWallTimer()*1000);
-                       CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
-                       CmiUnlock((Q->lock));                           
-               }
-               //signal the thread
-               for(int i=0;i<threadNum;i++){
-                       pthread_mutex_lock(allLocks[i]);
-                       pthread_cond_signal(allConds[i]);
-                       pthread_mutex_unlock(allLocks[i]);
-               }
-               //wait for the result
-               waitThreadDone(chunck);
-               //for(int i=0;i<threadNum;i++)
-               //      pthread_join(threads[i],NULL);
-               /*result=0;
-               for(int i=0;i<chunck;i++)
-                       result+=task[i]->result;
-               */
-       }
-#endif 
-       if(reduction==1)
-               result=reduce(task, type,chunck);
-       delete task;
-       return result;
+void FuncNodeHelper::waitThreadDone(int chunck) {
+    while (finishedCnt!=chunck);
+    finishedCnt=0;
 }
+#endif
 
-//======================================================================
-// Functions regarding helpers that parallelize a single function on a
-// single node (like OpenMP)
-void FuncSingleHelper::processWork(){
-       //CmiLock(reqLock);
-       void *r;
-    Task *one; // = (WorkReqEntry *)SimpleQueuePop(reqQ);
-       CmiLock(reqLock);       
-       CqsDequeue(reqQ,&r);
-       CmiUnlock(reqLock);
-       one=(Task *)r;
+//======================================================================//
+// Functions regarding helpers that parallelize a single function on a  //
+// single node (like OpenMP)                                            // 
+//======================================================================//
+void FuncSingleHelper::processWork(int filler) {
+    Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
+    void *tmp;
     
-    while(one) {
-        (one->fnPtr)(one->first,one->last,one->result, one->paramNum, one->param);
+    CmiLock(reqLock);
+    CqsDequeue(reqQ, &tmp);
+    CmiUnlock(reqLock);    
+
+    one = (Task *)tmp;
+    while (one) {
+        (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
+        int *partial = (int *)(one->redBuf);
+        //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
         one->setFlag();
-               CmiLock(reqLock);
-               CqsDequeue(reqQ,&r);
-               CmiUnlock(reqLock);
-               one=(Task *)r;
-               
-    }    
+        CmiLock(reqLock);
+        CqsDequeue(reqQ, &tmp);
+        one = (Task *)tmp;
+        CmiUnlock(reqLock);
+    }
 }
 
-void FuncSingleHelper::reportCreated(){
+void FuncSingleHelper::reportCreated() {
+    //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
     CProxy_FuncNodeHelper fh(nodeHelperID);
     CProxy_FuncSingleHelper thisproxy(thishandle);
-    fh[CkMyNode()].ckLocalBranch()->oneHelperCreated(id, thishandle, this);
+    fh[CkMyNode()].ckLocalBranch()->oneHelperCreated(CkMyRank(), thishandle, this);
 }
+//======================================================================//
+//   End of functions related with FuncSingleHelper                     //
+//======================================================================//
 
-void FuncNodeHelper::waitDone(Task ** thisReq,int chunck){
-    int flag = 1,i;
-    while(1) {
-        for(i=0; i<chunck; i++) 
-            flag = flag & thisReq[i]->isFlagSet();
-        if(flag) break;
-        flag = 1;
-    }
+CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
+    return CProxy_FuncNodeHelper::ckNew(mode, numThds);
 }
-int FuncNodeHelper::reduce(Task ** thisReq, int type,int chunck){
-       int result=0,i;
-       if(type==SMP_SUM){
-               for(i=0;i<chunck;i++){
-                       result+=thisReq[i]->result;
-               //CkPrintf("result:%d\n",result);
-               }
-       }
-       return result;
-}
-void FuncNodeHelper::waitThreadDone(int chunck){
-       while(finishedCnt!=chunck);
-       finishedCnt=0;
+
+void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
+                        int paramNum, void * param, int msgPriority,
+                        int numChunks, int lowerRange, int upperRange, 
+                        void *redResult, REDUCTION_TYPE type)
+{
+    nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
 }
 
 #include "NodeHelper.def.h"
index 990f19d845cc268b67b68750720576fc4859ce48..fdc512da71899701ad097205f63a6f6b149a5913 100644 (file)
@@ -2,12 +2,12 @@ module NodeHelper{
 
        message Task;
        nodegroup FuncNodeHelper{
-               entry FuncNodeHelper(int mode, int nElements, int threadNum);
+               entry FuncNodeHelper(int mode, int numThds);
                entry void send(Task * msg);
        };
        chare FuncSingleHelper{
-               entry FuncSingleHelper(int, CkGroupID);
-               entry void processWork();
+               entry FuncSingleHelper(CkGroupID);
+               entry void processWork(int);
                entry void reportCreated();
                
        };
index fe356d5efd46c3e444deca38a1d04c19c711af1d..931e59ca918a813000e30cc068a4a23ef989ebb2 100644 (file)
 #define _NODEHELPER_H
 
 #include <pthread.h>
+#include <assert.h>
 
 #include "charm++.h"
-#include "NodeHelper.decl.h"
-#include <assert.h>
+#include "NodeHelperAPI.h"
 #include "queueing.h"
-#include <converse.h>
+
 #define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
-#define SMP_SUM 1
-typedef void (*HelperFn)(int first,int last, int &result, int paramNum, void * param_o);
 
-typedef struct SimpleQueue
-{
-       Queue nodeQ;
-       pthread_mutex_t * lock;
+
+typedef struct SimpleQueue {
+    Queue nodeQ;
+    pthread_mutex_t * lock;
 }* NodeQueue;
-class Task:public CMessage_Task{
+
+class Task:public CMessage_Task {
 public:
-       HelperFn fnPtr;
-       int first;
-       int last;
-       int result;
-       int master;
-       int flag;
-       int reduction;
-       int paramNum;
-       void * param;
-       Task(HelperFn fn,int first_o,int last_o,int master_o){
-               fnPtr=fn;
-               first=first_o;
-               last=last_o;
-               master=master_o;
-       }
-       
-       Task(HelperFn fn,int first_o,int last_o,int flag_o,int master_o){
-               fnPtr=fn;
-               first=first_o;
-               last=last_o;
-               flag=flag_o;
-               master=master_o;
-       }
-       Task(HelperFn fn,int first_o,int last_o,int master_o, int paramNum_o, void * param_o){
-               fnPtr=fn;
-               first=first_o;
-               last=last_o;
-               master=master_o;
-               //reduction=reduction_o;
-               paramNum=paramNum_o;
-               param=param_o;
-       }
-       Task(HelperFn fn,int first_o,int last_o,int flag_o,int master_o, int paramNum_o, void * param_o){
-               fnPtr=fn;
-               first=first_o;
-               last=last_o;
-               master=master_o;
-               flag=flag_o;
-               //reduction=reduction_o;
-               paramNum=paramNum_o;
-               param=param_o;
-       }
-       void setFlag(){
-               flag=1;
-       }
-       int isFlagSet(){
-               return flag;
-       }
+    HelperFn fnPtr;
+    int first;
+    int last;
+    int originRank;
+    int flag;    
+    int paramNum;
+    void *param;
+    
+    //limitation: only allow single variable reduction!!!
+    char redBuf[sizeof(double)];
+    
+    //make sure 32-byte aligned so that each task doesn't cross cache lines
+    //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
+    
+    Task():fnPtr(NULL), param(NULL), paramNum(0) {}
+
+    void init(HelperFn fn,int first_,int last_,int rank){
+        fnPtr = fn;
+        first = first_;
+        last = last_;
+        originRank = rank;
+    }
+
+    void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
+        init(fn, first_, last_, rank);
+        flag=flag_;
+    }
+    
+    void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
+        init(fn, first_, last_, rank); 
+        paramNum=paramNum_;
+        param=param_;
+    }
+    
+    void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
+        init(fn, first_, last_, rank, paramNum_, param_);
+        flag=flag_;
+    }
+    
+    void setFlag() {
+        flag=1;
+    }
+    int isFlagSet() {
+        return flag;
+    }
 };
 
 
+/* FuncNodeHelper is a nodegroup object */
+class FuncSingleHelper;
+
+class FuncNodeHelper : public CBase_FuncNodeHelper {
+public:
+    static int MAX_CHUNKS;
 
-class FuncSingleHelper: public CBase_FuncSingleHelper{
+public:
+    int numHelpers;
+    int mode; /* determine whether using dynamic or static scheduling */
+    
+    int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
+    
+    CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
+    FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
+    
+    ~FuncNodeHelper() {
+        delete [] helperArr;
+        delete [] helperPtr;        
+    }
+
+    void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr) {
+        helperArr[hid] = cid;
+        helperPtr[hid] = cptr;
+    }
+
+#if CMK_SMP
+    void  waitDone(Task ** thisReq,int chunck);
+#else  
+    void waitThreadDone(int chunck);
+    void createThread();
+#endif
+       
+       /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
+       /* numThds: the expected number of pthreads to be spawned */
+    FuncNodeHelper(int mode_, int numThds_);
+    
+    void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
+                        int paramNum, void * param, /* the input parameters for the above func */
+                        int msgPriority, /* the priority of the intra-node msg, and node-level msg */
+                        int numChunks, /* number of chunks to be partitioned */
+                        int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
+                        void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+                        );
+    void send(Task *);
+    void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
+
+};
+
+/* FuncSingleHelper is a chare located on every core of a node */
+class FuncSingleHelper: public CBase_FuncSingleHelper {
+       friend class FuncNodeHelper;
 private:
     CkGroupID nodeHelperID;
-    int id;
-    Queue reqQ;
-    pthread_mutex_t* reqLock;
+    Queue reqQ; /* The queue may be simplified for better performance */
+    
+    /* The following two vars are for usage of detecting completion in dynamic scheduling */
+    CmiNodeLock reqLock;
+    int counter; 
+    
+    /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
+     * process one node-level parallelization at a time */
+    Task **tasks; /* Note the Task type is a message */
 
 public:
-    FuncSingleHelper(int myid, CkGroupID nid):id(myid),nodeHelperID(nid){
-       //CkPrintf("Single helper %d is created on rank %d\n", myid, CkMyRank());
+    FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
         reqQ = CqsCreate();
+
         reqLock = CmiCreateLock();
+        counter = 0;
+        
+        tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
+        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
     }
 
-    ~FuncSingleHelper(){}
-    FuncSingleHelper(CkMigrateMessage *m){}
-    void enqueueWork(Task *one,unsigned int t){
-        //CmiLock(reqLock);
-               //unsigned int t;
-               //t=(int)(CmiWallTimer()*1000);
-               CmiLock(reqLock);
-               CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
+    ~FuncSingleHelper() {
+        for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
+        delete [] tasks;        
+               CmiDestroyLock(reqLock);
+    }
+    
+    FuncSingleHelper(CkMigrateMessage *m) {}
+    
+    Task **getTasksMem() { return tasks; }
+    
+    void enqueueWork(Task *one) {
+               unsigned int t = 0; /* default priority */
+        CmiLock(reqLock);
+        CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
         //SimpleQueuePush(reqQ, (char *)one);
-               CmiUnlock(reqLock);
+        CmiUnlock(reqLock);
     }
-    void processWork();
+    void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
     void reportCreated();
 };
-class FuncNodeHelper : public CBase_FuncNodeHelper{  
 
-public:
-       int numHelpers;
-       int mode;
-       int * counter;
-       int threadNum;
-       pthread_mutex_t** reqLock;
-       CkChareID *helperArr;
-       FuncSingleHelper **helperPtr;
-       ~FuncNodeHelper(){
-               delete [] helperArr;
-               delete [] helperPtr;
-       }
-    
-       void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr){
-               helperArr[hid] = cid;
-               helperPtr[hid] = cptr;
-       }
-       
-       void  waitDone(Task ** thisReq,int chunck);
-       void waitThreadDone(int chunck);
-       void createThread();
-       FuncNodeHelper(int mode,int elements, int threadNum);
-       int parallelizeFunc(HelperFn func, int wps,unsigned int t, int master,int chunck,int time, int paramNum, void * param, int reduction, int type);
-       void send(Task *);
-       int reduce(Task ** thisReq, int type, int chunck);
-
-};
-
-       
 #endif
diff --git a/NodeHelperAPI.h b/NodeHelperAPI.h
new file mode 100644 (file)
index 0000000..0b26195
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef _NODEHELPERAPI_H
+#define _NODEHELPERAPI_H
+
+#include "NodeHelper.decl.h"
+
+/* "result" is the buffer for reduction result on a single simple-type variable */
+typedef void (*HelperFn)(int first,int last, void *result, int paramNum, void *param);
+
+#define NODEHELPER_PTHREAD 0
+#define NODEHELPER_DYNAMIC  1
+#define NODEHELPER_STATIC 2
+
+typedef enum REDUCTION_TYPE{
+    NODEHELPER_NONE=0,
+    NODEHELPER_INT_SUM,
+    NODEHELPER_FLOAT_SUM,
+    NODEHELPER_DOUBLE_SUM
+}REDUCTION_TYPE;
+
+class CProxy_FuncNodeHelper;
+extern CProxy_FuncNodeHelper NodeHelper_Init(int mode, /* indicates the nodehelper running mode, pthread of non-SMP, dynamic/static of SMP */
+                                            int numThds /* only valid in non-SMP mode, indicating how many pthreads are going to be created*/);
+extern void NodeHelper_Parallelize(
+                                               CProxy_FuncNodeHelper nodeHelper, /* the proxy to the FuncNodeHelper instance */
+                                               HelperFn func, /* the function that finishes a partial work on another thread */
+                        int paramNum, void * param, /* the input parameters for the above func */
+                        int msgPriority, /* the priority of the intra-node msg, and node-level msg */
+                        int numChunks, /* number of chunks to be partitioned */
+                        int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
+                        void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+                        );
+#endif
index 6565ad7f992723c677ec0a549a9befa7b4d7d194..526c12389c3c196965cf3638ab0b5170677fd197 100644 (file)
@@ -131,6 +131,7 @@ struct fft : public CBase_fft {
     plan= new fft_plan[numThreads];
     for(int i=0; i < numThreads; i++,offset+=nPerThread)
       {
+       /* ??? should the dist be nPerThread as the fft is performed as 1d of length nPerThread?? */
        plan[i] = fft_plan_many_dft(1, length, N/numChunks/numThreads, out+offset, length, 1, N/numThreads,
                             out+offset, length, 1, N/numThreads, FFTW_FORWARD, FFTW_ESTIMATE);
       }
diff --git a/example/hello.C b/example/hello.C
deleted file mode 100644 (file)
index 216e340..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-#include <stdio.h>
-#include <assert.h>
-#include <stdlib.h>
-#include "hello.h"
-
-#include "hello.decl.h"
-
-
-CProxy_Main mainProxy;
-CProxy_TestInstance allTestsProxy;
-CProxy_FuncNodeHelper nodeHelperProxy;
-
-int iteration;
-int flag;// ith iteration
-#define MODE 1
-int threadNum;
-int useNodeQueue;
-
-#define THRESHOLD 100
-extern "C" void doCalc(int first,int last, int & result, int paramNum, void * param);
-
-       void work(int start, int end, int &result){
-               result=0;
-               for(int i=start;i<=end;i++){
-                       result+=(int)(sqrt(1+cos(i*1.57)));
-               }
-               //return result;
-       }
-
-extern "C" void doCalc(int first, int last, int &result, int paramNum, void * param) {
-       result=0;
-       double tstart = CmiWallTimer();
-       work(first, last, result);
-       tstart = CmiWallTimer() - tstart;
-    //printf("start=%d, end=%d, time: %f,result: %d on proc %d\n",first, last, tstart,result,CkMyPe());
-}
-/*mainchare*/
-
-
-Main::Main(CkArgMsg* m) {
-       //number of elements
-        totalElems = 2;
-       flag=0;
-               for(int i=0;i<4;i++){
-                       time[i]=0;
-
-                       chunck[i]=0;
-               }
-       //process command line
-        if (m->argc >1 ) 
-                       processCommandLine(m->argc,m->argv);
-               else 
-                       CkPrintf("usage -t(time) -c(chunk) -n(nodequeue) -a (num of tests) -p (thread)\n");
-        delete m;
-       /*choose mode and initialization
-        mode=0, use pthread;
-        mode=1, use NODEQUEUE;
-        mode=2, not use NODEQUEUE*/
-                       nodeHelperProxy = CProxy_FuncNodeHelper::ckNew(MODE,totalElems,threadNum);
-                       
-       if(MODE==0){
-               
-               FuncNodeHelper *nth = nodeHelperProxy[CkMyNode()].ckLocalBranch();
-               nth->createThread();
-       }
-               int result;
-               wps=0;
-               calTime=0.05;
-
-        mainProxy = thishandle;
-        nodesFinished = 0;
-       CkPrintf("useNodeQueue:%d\n",useNodeQueue);
-        CProxy_cyclicMap myMap = CProxy_cyclicMap::ckNew();
-        CkArrayOptions opts(totalElems);
-        opts.setMap(myMap);
-        allTestsProxy = CProxy_TestInstance::ckNew(opts);
-
-
-        //Start the computation
-        CkPrintf("Running Hello on %d processors for %d test elements\n",
-                 CkNumPes(),totalElems);
-
-        //serial version
-               
-               calibrated=-1;
-
-               const double end_time = CmiWallTimer()+calTime;
-               wps = 0;
-
-               while(CmiWallTimer() < end_time) {
-                       work(0,100,result);
-                       wps+=100;
-               }
-               
-
-               for(int i=0; i < 2; i++) {
-                       const double start_time = CmiWallTimer();
-                       work(0,(int)wps,result);
-                       const double end_time = CmiWallTimer();
-                       const double correction = calTime / (end_time-start_time);
-                       wps *= correction;
-               }
-               calibrated = (int)(wps/calTime);
-               printf("calibrated 1: %d\n", calibrated);
-               double starttime=CmiWallTimer();
-               work(0,calibrated,result);
-               double endtime = CmiWallTimer();
-               printf("Serial time = %lf,result:%d\n",endtime-starttime,result);
-               starttime=CmiWallTimer();
-               work(0,calibrated/2500,result);
-               endtime = CmiWallTimer();
-               printf("Serial time for 400 micro second= %lf,result:%d\n",endtime-starttime,result);
-
-               
-               CmiSetCPUAffinity(0);
-               
-               //use node initialization
-                       
-               CkStartQD(CkIndex_Main::doTests((CkQdMsg *)0), &thishandle);
-    };
-void Main::done(void) {
-    nodesFinished++;
-    if (nodesFinished < totalElems){
-               return;
-       }
-       else{
-               flag++;
-               nodesFinished=0;
-               if(flag<iteration){
-                       
-                       for(int i=0;i<totalElems;i++){
-                               //CkPrintf("time:%d\n",time[i]);
-                               allTestsProxy[i].doTest(flag,(calibrated/(1e6/time[i])),chunck[i],time[i]);
-                       }
-                       return;
-               }
-       }
-        CkPrintf("All done\n");
-        CkExit();
-};
-    
-    void Main:: doTests(CkQdMsg *msg){
-
-               delete msg;
-               int wps=calibrated;
-               for(int i=0;i<totalElems;i++){
-                       allTestsProxy[i].doTest(0,(wps/(1e6/time[i])),chunck[i],time[i]);
-                       //allTestsProxy[8].doTest(0,(wps/(1e6/time[0])),chunck[0],time[0]);
-               }
-    };
-    
-    void Main::processCommandLine(int argc,char ** argv){
-               int i;
-               int j=0;
-               int f=0;
-               for(i=0;i<argc;i++){
-                       if(argv[i][0]=='-'){
-                               switch(argv[i][1]){
-                                       case 't': time[j++]=atoi(argv[++i]);
-                                                         break;
-                                       case 'c': chunck[f++]=atoi(argv[++i]);
-                                                         break;
-                                       case 'n': iteration=atoi(argv[++i]);
-                                                         break;
-                                       case 'a': totalElems=atoi(argv[++i]);
-                                                         break;
-                                       case 'p': threadNum=atoi(argv[++i]);
-                                                         break;
-                               }
-                       }
-               }
-       }
-
-
-
-int cmpDFunc(const void *a, const void *b){
-        double n1 = *(double *)a;
-        double n2 = *(double *)b;
-        if(n1<n2) return -1;
-        if(n1>n2) return 1;
-        return 0;
-}
-
-TestInstance::TestInstance() { 
-       CkPrintf("test case %d is created on proc %d node %d\n", thisIndex, CkMyPe(),CkMyNode());
-               result=0;
-               flag=0;
-       allTimes=(double *)malloc(sizeof(double)*iteration);
-       fflush(stdout);
-    }
-void TestInstance::doTest(int flag,int wps,int chunck,int time){
-      //printf("On proc %d node %d, begin parallel execution for test case %d %dth iteration\n", CkMyPe(), CkMyNode(), thisIndex,flag);    
-       //CkPrintf("wps :%d, flag:%d\n",wps,flag); 
-               timerec = CmiWallTimer();
-                int result;
-                if(chunck==0&&time<=THRESHOLD){
-                        work(0,wps,result);
-                        //printf("On proc %d node %d, Parallel time with %d helpers: %lf for test case %d, result: %d\n", CkMyPe(), CkMyNode(), 0,(CmiWallTimer()-timerec)*1e6, thisIndex,result);
-                        
-                }
-                else{
-
-                       FuncNodeHelper *nth = nodeHelperProxy[CkMyNode()].ckLocalBranch();
-                               unsigned int t;
-                       t=(int)(CmiWallTimer()*1000);
-                       result=nth->parallelizeFunc(doCalc,wps,t,thisIndex,chunck,time,0,NULL, 1,SMP_SUM);
-                       //printf("On proc %d node %d, Parallel time with %d helpers: %lf for test case %d, result: %d\n", CkMyPe(), CkMyNode(), nth->numHelpers,(CmiWallTimer()-timerec)*1e6, thisIndex,result);
-                       
-                }
-               allTimes[flag]=(CmiWallTimer()-timerec)*1e6;
-               if(flag==iteration-1){
-                       qsort(allTimes,iteration,sizeof(double),cmpDFunc);
-                       double sumTimes=0.0;
-                       for(int i=0; i<iteration-3; i++) sumTimes += allTimes[i];
-                       CkPrintf("result:%d,avg iteration time: %.6f [%.6f, %.6f, %.6f] (us)\n",result, sumTimes/(iteration-3), allTimes[0], allTimes[iteration/2], allTimes[iteration-1]);
-               }
-               mainProxy.done();
-                 
-
-}
-
-#include "hello.def.h"
-
diff --git a/example/hello.h b/example/hello.h
deleted file mode 100644 (file)
index f1641cd..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef _HELLO_H
-#define _HELLO_H
-
-#include "charm++.h"
-#include "NodeHelper.h"
-#include "hello.decl.h"
-#include <assert.h>
-//#include "queueing.h"
-#include <converse.h>
-
-class Main : public Chare{
-private:
-    int nodesFinished ;
-    int totalElems;
-       double wps;
-       double calTime; 
-       int calibrated;
-       int time[4];
-       int chunck[4];
-       
-public:
-        Main(CkArgMsg* m) ;
-       void done(void);
-       void doTests(CkQdMsg *msg);
-       void processCommandLine(int argc,char ** argv);
-
-};
-class TestInstance : public CBase_TestInstance {
-       double wps;
-       int result;
-       int flag;
-       int helpers;
-       double timerec;
-       double * allTimes;
-public:
-    TestInstance();
-    ~TestInstance() {}
-    TestInstance(CkMigrateMessage *m) {}
-    void doTest(int flag,int wps,int chunck, int time);
-};
-
-class cyclicMap : public CkArrayMap{
-public:
-  int procNum(int, const CkArrayIndex &idx){
-    int index = *(int *)idx.data();
-    int nid = (index/CkMyNodeSize())%CkNumNodes();
-    int rid = index%CkMyNodeSize();
-    return CkNodeFirst(nid)+rid;
-  }
-};
-
-#endif
similarity index 78%
rename from example/Makefile
rename to example/simpleLoopBench/Makefile
index b6c7451b85b7ab5011d9a3287852fa0938ab0a95..f6c492c0067059b64b02b5e435c8f5e6fbafa227 100644 (file)
@@ -1,8 +1,12 @@
 #NOTE: to compile the example, the NodeHelper.decl/def.h should exist
-OPTS=-I../ -O3 -g -lpthread
-CHARMC=$(HOME)/curcvs/charm/net-linux-x86_64-smp-production/bin/charmc $(OPTS)
-CHARMLIB=$(HOME)/curcvs/charm/net-linux-x86_64-smp-production/lib
-#CHARMLIB=$(HOME)/workspace/charm/net-linux-x86_64/lib
+USEROPTS=-O0 -g -lpthread
+CHARMDIR=$(HOME)/curcvs/charm/net-linux-x86_64-smp-dbg/
+CHARMINC=$(CHARMDIR)/include
+OPTS=-I$(CHARMINC) $(USEROPTS)
+CHARMC=$(CHARMDIR)/bin/charmc $(OPTS)
+CHARMLIB=$(CHARMDIR)/lib
+
+
 all: hello
 
 hello: hello.o
diff --git a/example/simpleLoopBench/hello.C b/example/simpleLoopBench/hello.C
new file mode 100644 (file)
index 0000000..8c35aab
--- /dev/null
@@ -0,0 +1,206 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include "hello.h"
+
+#include "hello.decl.h"
+
+#define TEST_REPEAT_TIMES 10
+
+CProxy_Main mainProxy;
+CProxy_TestInstance allTestsProxy;
+CProxy_FuncNodeHelper nodeHelperProxy;
+int totalElems; //the number of test instances
+int loopTimes;
+int numChunks;
+int runningMode;
+
+int threadNum; //number of threads to be used in non-SMP
+
+int cmpDFunc(const void *a, const void *b) {
+    double n1 = *(double *)a;
+    double n2 = *(double *)b;
+    if (n1<n2) return -1;
+    if (n1>n2) return 1;
+    return 0;
+}
+
+void work(int start, int end, void *result) {
+    int tmp=0;
+    for (int i=start; i<=end; i++) {
+        tmp+=(int)(sqrt(1+cos(i*1.57)));
+    }
+    *(int *)result = tmp;
+}
+extern "C" void doCalc(int first, int last, void *result, int paramNum, void * param) {    
+    //double tstart = CmiWallTimer();
+    
+       work(first, last, result);
+    
+       //tstart = CmiWallTimer() - tstart;
+    //printf("start=%d, end=%d, time: %f,result: %d on proc %d\n",first, last, tstart,result,CkMyPe());
+}
+
+/*mainchare*/
+Main::Main(CkArgMsg* m) {
+    
+       //default values        
+    totalElems = 1;
+       numChunks = CkMyNodeSize();
+       loopTimes = 1000;
+       runningMode = NODEHELPER_STATIC;
+       
+    mainStep = 0;
+       numElemFinished = 0;
+       
+    //process command line
+    if (m->argc >1 ){
+        processCommandLine(m->argc,m->argv);
+       }
+    else{              
+               CkPrintf("Usage: -t(#iterations) -c(#chunks) -a(#test instances) -m(running mode, 0 (non-SMP) or 1|2 (SMP))  -p(#threads)\n");
+       }
+    delete m;
+       
+       mainTimes = new double[loopTimes];
+       memset(mainTimes, 0, sizeof(double)*loopTimes);
+       
+       CkPrintf("Using NodeHelper Lib with mode: %d, nodesize=%d\n", runningMode, CkMyNodeSize());
+       CkPrintf("Testcase info: %d test instances where the loop iterates %d times, each work is partitioned into %d tasks\n", totalElems, loopTimes, numChunks);
+       
+       nodeHelperProxy = NodeHelper_Init(runningMode, threadNum);
+    mainProxy = thishandle;
+    
+       //create test instances
+    CProxy_cyclicMap myMap = CProxy_cyclicMap::ckNew();
+    CkArrayOptions opts(totalElems);
+    opts.setMap(myMap);
+    allTestsProxy = CProxy_TestInstance::ckNew(opts);
+
+    //serial version
+       int result;
+       double starttime, endtime;
+       for(int i=0; i<3; i++){
+               starttime = CmiWallTimer();
+               work(0, loopTimes, &result);            
+               endtime = CmiWallTimer();
+               CkPrintf("Calibration %d: the loop takes %.3f us with result %d\n", i+1,  (endtime-starttime)*1e6, result);
+       }
+       starttime = CmiWallTimer();
+       for(int i=0; i<5; i++) work(0, loopTimes, &result);
+       endtime = CmiWallTimer();
+       double avgtime = (endtime-starttime)*1e6/5; //in the unit of us
+       CkPrintf("Calibration: avg time %.3f us of 5 consecutive runs, so a 100us-loop will iterate %d times\n", avgtime, (int)(loopTimes*100.0/avgtime));
+               
+    CmiSetCPUAffinity(0);
+    CkStartQD(CkIndex_Main::doTests((CkQdMsg *)0), &thishandle);
+};
+
+void Main::done(void) {
+    numElemFinished++;
+    if (numElemFinished < totalElems) {
+        return;
+    } else {
+               mainTimes[mainStep] = (CmiWallTimer() - timestamp)*1e6;
+        mainStep++;
+        numElemFinished=0;
+        if (mainStep < TEST_REPEAT_TIMES) {
+                       doTests(NULL);
+            return;
+        }
+    }  
+       //do some final output
+       allTestsProxy[0].reportSts();
+};
+
+void Main::exitTest(){
+       //do some final output
+       qsort(mainTimes, TEST_REPEAT_TIMES, sizeof(double), cmpDFunc);
+       double sum = 0.0;
+       for(int i=0; i<TEST_REPEAT_TIMES; i++) sum += mainTimes[i];
+       int maxi = TEST_REPEAT_TIMES;
+       CkPrintf("Global timestep info: avg time: %.3f [%.3f, %.3f, %.3f] (us)\n", sum/(maxi-3), mainTimes[0], mainTimes[maxi/2], mainTimes[maxi-1]);
+       
+       CkPrintf("All done\n");
+       CkExit();
+}
+
+void Main::doTests(CkQdMsg *msg) {
+    delete msg;
+    
+       timestamp = CmiWallTimer(); //record the start time of the whole test
+    for (int i=0; i<totalElems; i++) {
+        allTestsProxy[i].doTest(mainStep);
+        //allTestsProxy[8].doTest(mainStep);
+    }
+};
+
+void Main::processCommandLine(int argc,char ** argv) {
+    for (int i=0; i<argc; i++) {
+        if (argv[i][0]=='-') {
+            switch (argv[i][1]) {
+            case 't':
+                loopTimes = atoi(argv[++i]);
+                break;
+            case 'c':
+                numChunks = atoi(argv[++i]);
+                break;
+            case 'm':
+                runningMode = atoi(argv[++i]);
+                break;
+            case 'a':
+                totalElems = atoi(argv[++i]);
+                break;
+            case 'p':
+                threadNum = atoi(argv[++i]);
+                break;
+            }
+        }
+    }
+}
+
+
+TestInstance::TestInstance() {
+    CkPrintf("test case %d is created on proc %d node %d\n", thisIndex, CkMyPe(),CkMyNode());
+    
+       hasTest = 0; 
+       allTimes = new double[loopTimes];
+       allResults = new int[loopTimes];
+       memset(allTimes, 0, sizeof(double)*loopTimes);
+       memset(allResults, 0, sizeof(int)*loopTimes);
+}
+
+void TestInstance::doTest(int curstep) {
+    //printf("On proc %d node %d, begin parallel execution for test case %d %dth iteration\n", CkMyPe(), CkMyNode(), thisIndex,flag);
+       hasTest = 1;
+    double timerec = CmiWallTimer();
+       int result;
+       
+       NodeHelper_Parallelize(nodeHelperProxy, doCalc, 0, NULL, 0, numChunks, 0, loopTimes-1, &result, NODEHELPER_INT_SUM);
+    allTimes[curstep]=(CmiWallTimer()-timerec)*1e6;
+       allResults[curstep] = result;
+       
+    mainProxy.done();
+}
+
+void TestInstance::reportSts(){
+       if(hasTest){
+               //do sts output
+               qsort(allTimes, TEST_REPEAT_TIMES, sizeof(double), cmpDFunc);
+               double sum = 0.0;
+               for(int i=0; i<TEST_REPEAT_TIMES; i++) sum += allTimes[i];
+               
+               double avgResult = 0.0;
+               for(int i=0; i<TEST_REPEAT_TIMES; i++) avgResult += allResults[i];
+               avgResult /= TEST_REPEAT_TIMES;
+               
+               int maxi = TEST_REPEAT_TIMES;
+               CkPrintf("Test instance[%d]: result:%.3f, avg time: %.3f [%.3f, %.3f, %.3f] (us)\n",thisIndex, avgResult, sum/(maxi-3), allTimes[0], allTimes[maxi/2], allTimes[maxi-1]);
+       }
+       
+       if(thisIndex == totalElems-1) mainProxy.exitTest();
+       else thisProxy[thisIndex+1].reportSts();
+}
+
+#include "hello.def.h"
+
similarity index 65%
rename from example/hello.ci
rename to example/simpleLoopBench/hello.ci
index 77b631a8f9e1945aebcf6f6ef2ee1a4f01e06a92..e69799c2772c6b704fed17ea0d624c0ca2f79e60 100644 (file)
@@ -2,20 +2,25 @@ mainmodule hello {
   readonly CProxy_Main mainProxy;
   readonly CProxy_TestInstance allTestsProxy;
   readonly CProxy_FuncNodeHelper nodeHelperProxy;
-  readonly int iteration;
+  readonly int totalElems;
+  readonly int loopTimes;
+  readonly int numChunks;
+  readonly int runningMode;
+  
   mainchare Main {
     entry Main(CkArgMsg *m);
     entry void done(void);
     entry void doTests(CkQdMsg *msg);
+       entry void exitTest();
   };
 
   array [1D] TestInstance{
     entry TestInstance();
-    entry void doTest(int flag,int result,int chunck, int time);
+    entry void doTest(int curstep);
+       entry void reportSts();
    };
 
   group cyclicMap : CkArrayMap{
     entry cyclicMap();
-  };
-  //extern module NodeHelper;  
+  };  
 };
diff --git a/example/simpleLoopBench/hello.h b/example/simpleLoopBench/hello.h
new file mode 100644 (file)
index 0000000..661d477
--- /dev/null
@@ -0,0 +1,52 @@
+#ifndef _HELLO_H
+#define _HELLO_H
+
+#include "charm++.h"
+#include "NodeHelperAPI.h"
+#include "hello.decl.h"
+#include <assert.h>
+
+class Main : public Chare {
+private:
+       int numElemFinished; //record the number of test instances finished in a timestep
+       double timestamp;
+       int mainStep; //the global counter of timesteps
+       double *mainTimes; //record each timestep from test initiation to test finish (i.e. from the point of main)
+
+public:
+    Main(CkArgMsg* m) ;
+    void done(void);
+       void exitTest();
+    void doTests(CkQdMsg *msg);
+    void processCommandLine(int argc,char ** argv);
+
+};
+
+class TestInstance : public CBase_TestInstance {
+       int hasTest; //used for reporting statistics
+       
+    double *allTimes; //record time taken for each timestep
+       int *allResults; //record the result of each timestep
+       
+public:
+    TestInstance();
+    ~TestInstance() {
+               delete [] allTimes;
+               delete [] allResults;
+       }
+    TestInstance(CkMigrateMessage *m) {}
+    void doTest(int curstep);
+       void reportSts();
+};
+
+class cyclicMap : public CkArrayMap {
+public:
+    int procNum(int, const CkArrayIndex &idx) {
+        int index = *(int *)idx.data();
+        int nid = (index/CkMyNodeSize())%CkNumNodes();
+        int rid = index%CkMyNodeSize();
+        return CkNodeFirst(nid)+rid;
+    }
+};
+
+#endif