1. added output if using charm-level notification
[charm.git] / NodeHelper.C
index a9210f1b7738d0deef7d02c07c4efd19ca58c37e..62b728a708accfe9696c8f762b94162ab7b21868 100644 (file)
@@ -1,6 +1,10 @@
 #include "NodeHelper.h"
 #include <pthread.h>
 
+#if !USE_CONVERSE_NOTIFICATION
+#include "qd.h"
+#endif
+
 /*====Beginning of pthread-related variables and implementation====*/
 //__thread is not portable, but it works almost everywhere if pthread works
 //After C++11, this should be thread_local
@@ -39,21 +43,21 @@ static int HelperOnCore(){
 
 static void *ndhThreadWork(void *id) {
        size_t myId = (size_t) id;
-       
+
        //further improvement of this affinity setting!!
        int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
        //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
        myPhyRank = myId;
        CmiSetCPUAffinity(myPhyRank);
-       
+
        pthread_mutex_init(&thdLock, NULL);
        pthread_cond_init(&thdCondition, NULL);
-       
+
        allLocks[myId-1] = &thdLock;
        allConds[myId-1] = &thdCondition;
-       
+
        __sync_add_and_fetch(&gCrtCnt, 1);
-       
+
        while(1){
                //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
                if(exitFlag) break;
@@ -82,7 +86,7 @@ void FuncNodeHelper::createPThreads() {
        allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
        memset(allLocks, 0, sizeof(void *)*numThreads);
        memset(allConds, 0, sizeof(void *)*numThreads);
-       
+
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
@@ -98,7 +102,7 @@ void FuncNodeHelper::createPThreads() {
 
 void FuncNodeHelper::exit(){
        if(mode == NODEHELPER_PTHREAD){
-               exitFlag = 1;             
+               exitFlag = 1;
                for(int i=0; i<numHelpers-1; i++)
                        pthread_join(ndhThreads[i], NULL);
                delete [] ndhThreads;
@@ -118,30 +122,30 @@ void FuncNodeHelper::exit(){
 static FuncNodeHelper *globalNodeHelper = NULL;
 
 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
-{  
+{
     traceRegisterUserEvent("nodehelper total work",NDH_TOTAL_WORK_EVENTID);
     traceRegisterUserEvent("nodehlelper finish signal",NDH_FINISH_SIGNAL_EVENTID);
 
     mode = mode_;
-    
+
     CmiAssert(globalNodeHelper==NULL);
     globalNodeHelper = this;
-    
+
     if(mode == NODEHELPER_USECHARM){
-        //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
+        //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
         numHelpers = CkMyNodeSize();
         helperPtr = new FuncSingleHelper *[numHelpers];
         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
-        
+
         int pestart = CkNodeFirst(CkMyNode());
-            
+
         for (int i=0; i<numHelpers; i++) {
             CkChareID helper;
             CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
         }
     }else if(mode == NODEHELPER_PTHREAD){
                helperPtr = NULL;
-               
+
                numHelpers = numThreads_;
                useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
                pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
@@ -161,30 +165,30 @@ int FuncNodeHelper::MAX_CHUNKS = 64;
 #endif
 
 #define ALLOW_MULTIPLE_UNSYNC 1
-void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
-                                    int numChunks, int lowerRange, 
+void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
+                                    int numChunks, int lowerRange,
                                                    int upperRange, int sync,
                                     void *redResult, REDUCTION_TYPE type) {
-                                        
+
     double _start; //may be used for tracing
-    
-    if(numChunks > MAX_CHUNKS){ 
+
+    if(numChunks > MAX_CHUNKS){
         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
         numChunks = MAX_CHUNKS;
     }
-       
+
     /* "stride" determines the number of loop iterations to be done in each chunk
      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
      * for chunk indexed at remainder to numChunks-1, stride is "unit"
      */
      int stride;
         CurLoopInfo *curLoop = NULL;
-    
+
     //for using nodequeue
        TRACE_START(NDH_TOTAL_WORK_EVENTID);
-       if(mode == NODEHELPER_USECHARM){        
+       if(mode == NODEHELPER_USECHARM){
                FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
-#if USE_CONVERSE_NOTIFICATION        
+#if USE_CONVERSE_NOTIFICATION
        #if ALLOW_MULTIPLE_UNSYNC
                ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
        #else
@@ -195,10 +199,10 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                if(useTreeBcast){
                        int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
                        //just implicit binary tree
-                       int pe = CmiMyRank()+1;        
+                       int pe = CmiMyRank()+1;
                        for(int i=0; i<loopTimes; i++, pe++){
                                if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
-                               CmiPushPE(pe, (void *)(notifyMsg));    
+                               CmiPushPE(pe, (void *)(notifyMsg));
                        }
                }else{
                        for (int i=CmiMyRank()+1; i<numHelpers; i++) {
@@ -215,6 +219,7 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
         curLoop = thisHelper->taskBuffer[0];
     #endif
         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+       CpvAccess(_qd)->create(numHelpers-1);
         if(useTreeBcast){
                        int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
                        //just implicit binary tree
@@ -225,7 +230,7 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                 one->ptr = (void *)curLoop;
                 envelope *env = UsrToEnv(one);
                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
-                               CmiPushPE(pe, (void *)(env));    
+                               CmiPushPE(pe, (void *)(env));
                        }
                }else{
                        for (int i=CmiMyRank()+1; i<numHelpers; i++) {
@@ -245,7 +250,7 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                                CmiPushPE(i, (void *)(env));
                        }
                }
-#endif        
+#endif
        }else if(mode == NODEHELPER_PTHREAD){
                int numThreads = numHelpers-1;
                curLoop = pthdLoop;
@@ -262,16 +267,20 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                //in this mode, it's always synced
                sync = 1;
        }
-       
+
        curLoop->stealWork();
        TRACE_BRACKET(NDH_TOTAL_WORK_EVENTID);
-       
+
+    //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+
        TRACE_START(NDH_FINISH_SIGNAL_EVENTID);
        curLoop->waitLoopDone(sync);
        TRACE_BRACKET(NDH_FINISH_SIGNAL_EVENTID);
 
+    //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
+
     if (type!=NODEHELPER_NONE)
-        reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
+        reduce(curLoop->getRedBufs(), redResult, type, numChunks);
     return;
 }
 
@@ -319,7 +328,7 @@ static void RegisterNodeHelperHdlrs(){
 extern int _charmHandlerIdx;
 FuncSingleHelper::FuncSingleHelper(int numHelpers) {
     totalHelpers = numHelpers;
-#if USE_CONVERSE_NOTIFICATION    
+#if USE_CONVERSE_NOTIFICATION
     notifyMsgBufSize = TASK_BUFFER_SIZE;
 #else
     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
@@ -327,9 +336,9 @@ FuncSingleHelper::FuncSingleHelper(int numHelpers) {
 
     CmiAssert(globalNodeHelper!=NULL);
     thisNodeHelper = globalNodeHelper;
-            
+
        nextFreeNotifyMsg = 0;
-#if USE_CONVERSE_NOTIFICATION    
+#if USE_CONVERSE_NOTIFICATION
     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
     for(int i=0; i<notifyMsgBufSize; i++){
         ConverseNotifyMsg *tmp = notifyMsg+i;
@@ -337,7 +346,7 @@ FuncSingleHelper::FuncSingleHelper(int numHelpers) {
             tmp->srcRank = CmiMyRank();
         }else{
             tmp->srcRank = -1;
-        }            
+        }
         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
     }
@@ -364,19 +373,19 @@ FuncSingleHelper::FuncSingleHelper(int numHelpers) {
     for(int i=0; i<TASK_BUFFER_SIZE; i++){
         taskBuffer[i] = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
     }
-#endif    
+#endif
     globalNodeHelper->helperPtr[CkMyRank()] = this;
 }
 
 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg){
-#if !USE_CONVERSE_NOTIFICATION    
+#if !USE_CONVERSE_NOTIFICATION
     int srcRank = msg->srcRank;
     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
     if(srcRank >= 0){
         //means using tree-broadcast to send the notification msg
         int relPE = CmiMyRank()-msg->srcRank;
                if(relPE<0) relPE += CmiMyNodeSize();
-               
+
                //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
                relPE=relPE*TREE_BCAST_BRANCH+1;
                for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
@@ -391,19 +400,19 @@ void FuncSingleHelper::stealWork(CharmNotifyMsg *msg){
                }
     }
     loop->stealWork();
-#endif    
+#endif
 }
 
 void SingleHelperStealWork(ConverseNotifyMsg *msg){
        int srcRank = msg->srcRank;
-       
+
        if(srcRank >= 0){
                //means using tree-broadcast to send the notification msg
-               
+
                //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
                int relPE = CmiMyRank()-msg->srcRank;
                if(relPE<0) relPE += CmiMyNodeSize();
-               
+
                //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
                relPE=relPE*TREE_BCAST_BRANCH+1;
                for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
@@ -421,12 +430,12 @@ void CurLoopInfo::stealWork(){
     //indicate the current work hasn't been initialized
     //or the old work has finished.
     if(inited == 0) return;
-    
+
     int first, last;
     int unit = (upperIndex-lowerIndex+1)/numChunks;
     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
     int markIdx = remainder*(unit+1);
-    
+
     int nextChunkId = getNextChunkIdx();
     int execTimes = 0;
     while(nextChunkId < numChunks){
@@ -437,7 +446,7 @@ void CurLoopInfo::stealWork(){
             first = (nextChunkId - remainder)*unit + markIdx;
             last = first+unit-1;
         }
-                
+
         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
         execTimes++;
         nextChunkId = getNextChunkIdx();
@@ -451,7 +460,11 @@ void CurLoopInfo::stealWork(){
 
 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
     if(mode == NODEHELPER_USECHARM){
-        CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
+#if USE_CONVERSE_NOTIFICATION
+        CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling (converse-level notification) but not using node-level queue\n");
+#else
+        CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling (charm-level notifiation) but not using node-level queue\n");
+#endif
     }else if(mode==NODEHELPER_PTHREAD){
         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
         CmiAssert(numThreads>0);
@@ -463,8 +476,8 @@ void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
        nodeHelper.exit();
 }
 
-void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
-                        int paramNum, void * param, 
+void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func,
+                        int paramNum, void * param,
                         int numChunks, int lowerRange, int upperRange,
                                    int sync,
                         void *redResult, REDUCTION_TYPE type)
@@ -472,8 +485,8 @@ void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func,
     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
 }
 
-void NodeHelper_Parallelize(HelperFn func, 
-                        int paramNum, void * param, 
+void NodeHelper_Parallelize(HelperFn func,
+                        int paramNum, void * param,
                         int numChunks, int lowerRange, int upperRange,
                                    int sync,
                         void *redResult, REDUCTION_TYPE type)