1. Simplified the parallelization interface a little bit. The Nodehelper proxy could...
authorChao Mei <chaomei2@illinois.edu>
Sat, 7 Apr 2012 02:47:14 +0000 (21:47 -0500)
committerChao Mei <chaomei2@illinois.edu>
Sat, 7 Apr 2012 02:47:14 +0000 (21:47 -0500)
2. Added optimized support to use charm-level notification scheme so that msg priority could be used if needed. In this optimized scheme, no memory allocation happens for notifying loop parallelization on a node.

NodeHelper.C
NodeHelper.ci
NodeHelper.h
NodeHelperAPI.h

index 5ea90732266eb6bb526515d692422a95df7849b4..a9210f1b7738d0deef7d02c07c4efd19ca58c37e 100644 (file)
@@ -115,13 +115,18 @@ void FuncNodeHelper::exit(){
 #define NDH_TOTAL_WORK_EVENTID  139
 #define NDH_FINISH_SIGNAL_EVENTID 143
 
+static FuncNodeHelper *globalNodeHelper = NULL;
+
 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
 {  
     traceRegisterUserEvent("nodehelper total work",NDH_TOTAL_WORK_EVENTID);
     traceRegisterUserEvent("nodehlelper finish signal",NDH_FINISH_SIGNAL_EVENTID);
 
     mode = mode_;
-
+    
+    CmiAssert(globalNodeHelper==NULL);
+    globalNodeHelper = this;
+    
     if(mode == NODEHELPER_USECHARM){
         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
         numHelpers = CkMyNodeSize();
@@ -132,7 +137,7 @@ FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
             
         for (int i=0; i<numHelpers; i++) {
             CkChareID helper;
-            CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
+            CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
         }
     }else if(mode == NODEHELPER_PTHREAD){
                helperPtr = NULL;
@@ -158,7 +163,7 @@ int FuncNodeHelper::MAX_CHUNKS = 64;
 #define ALLOW_MULTIPLE_UNSYNC 1
 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
                                     int numChunks, int lowerRange, 
-                                   int upperRange, int sync,
+                                                   int upperRange, int sync,
                                     void *redResult, REDUCTION_TYPE type) {
                                         
     double _start; //may be used for tracing
@@ -179,14 +184,15 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
        TRACE_START(NDH_TOTAL_WORK_EVENTID);
        if(mode == NODEHELPER_USECHARM){        
                FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
+#if USE_CONVERSE_NOTIFICATION        
        #if ALLOW_MULTIPLE_UNSYNC
                ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
        #else
                ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
        #endif
                curLoop = (CurLoopInfo *)(notifyMsg->ptr);
-               curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
-               if(useTreeBcast){               
+               curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+               if(useTreeBcast){
                        int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
                        //just implicit binary tree
                        int pe = CmiMyRank()+1;        
@@ -202,6 +208,44 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                                CmiPushPE(i, (void *)(notifyMsg));
                        }
                }
+#else
+    #if ALLOW_MULTIPLE_UNSYNC
+        curLoop = thisHelper->getNewTask();
+    #else
+        curLoop = thisHelper->taskBuffer[0];
+    #endif
+        curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
+        if(useTreeBcast){
+                       int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
+                       //just implicit binary tree
+                       int pe = CmiMyRank()+1;
+                       for(int i=0; i<loopTimes; i++, pe++){
+                               if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
+                CharmNotifyMsg *one = thisHelper->getNotifyMsg();
+                one->ptr = (void *)curLoop;
+                envelope *env = UsrToEnv(one);
+                env->setObjPtr(thisHelper->ckGetChareID().objPtr);
+                               CmiPushPE(pe, (void *)(env));    
+                       }
+               }else{
+                       for (int i=CmiMyRank()+1; i<numHelpers; i++) {
+                CharmNotifyMsg *one = thisHelper->getNotifyMsg();
+                one->ptr = (void *)curLoop;
+                envelope *env = UsrToEnv(one);
+                env->setObjPtr(thisHelper->ckGetChareID().objPtr);
+                //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
+                               CmiPushPE(i, (void *)(env));
+                       }
+                       for (int i=0; i<CmiMyRank(); i++) {
+                CharmNotifyMsg *one = thisHelper->getNotifyMsg();
+                one->ptr = (void *)curLoop;
+                envelope *env = UsrToEnv(one);
+                env->setObjPtr(thisHelper->ckGetChareID().objPtr);
+                //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
+                               CmiPushPE(i, (void *)(env));
+                       }
+               }
+#endif        
        }else if(mode == NODEHELPER_PTHREAD){
                int numThreads = numHelpers-1;
                curLoop = pthdLoop;
@@ -272,13 +316,22 @@ static void RegisterNodeHelperHdlrs(){
     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
 }
 
-FuncSingleHelper::FuncSingleHelper(size_t ndhPtr) {
-    thisNodeHelper = (FuncNodeHelper *)ndhPtr;
-    CmiAssert(thisNodeHelper!=NULL);
-        
+extern int _charmHandlerIdx;
+FuncSingleHelper::FuncSingleHelper(int numHelpers) {
+    totalHelpers = numHelpers;
+#if USE_CONVERSE_NOTIFICATION    
+    notifyMsgBufSize = TASK_BUFFER_SIZE;
+#else
+    notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
+#endif
+
+    CmiAssert(globalNodeHelper!=NULL);
+    thisNodeHelper = globalNodeHelper;
+            
        nextFreeNotifyMsg = 0;
-    notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
-    for(int i=0; i<MSG_BUFFER_SIZE; i++){
+#if USE_CONVERSE_NOTIFICATION    
+    notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
+    for(int i=0; i<notifyMsgBufSize; i++){
         ConverseNotifyMsg *tmp = notifyMsg+i;
         if(thisNodeHelper->useTreeBcast){
             tmp->srcRank = CmiMyRank();
@@ -288,12 +341,60 @@ FuncSingleHelper::FuncSingleHelper(size_t ndhPtr) {
         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
     }
-    thisNodeHelper->helperPtr[CkMyRank()] = this;
+#else
+    nextFreeTaskBuffer = 0;
+    notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
+    for(int i=0; i<notifyMsgBufSize; i++){
+        CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
+        notifyMsg[i] = tmp;
+        if(thisNodeHelper->useTreeBcast){
+            tmp->srcRank = CmiMyRank();
+        }else{
+            tmp->srcRank = -1;
+        }
+        tmp->ptr = NULL;
+        envelope *env = UsrToEnv(tmp);
+        env->setMsgtype(ForChareMsg);
+        env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
+        env->setSrcPe(CkMyPe());
+        CmiSetHandler(env, _charmHandlerIdx);
+        //env->setObjPtr has to be called when a notification msg is sent
+    }
+    taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
+    for(int i=0; i<TASK_BUFFER_SIZE; i++){
+        taskBuffer[i] = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
+    }
+#endif    
+    globalNodeHelper->helperPtr[CkMyRank()] = this;
 }
 
+void FuncSingleHelper::stealWork(CharmNotifyMsg *msg){
+#if !USE_CONVERSE_NOTIFICATION    
+    int srcRank = msg->srcRank;
+    CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
+    if(srcRank >= 0){
+        //means using tree-broadcast to send the notification msg
+        int relPE = CmiMyRank()-msg->srcRank;
+               if(relPE<0) relPE += CmiMyNodeSize();
+               
+               //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
+               relPE=relPE*TREE_BCAST_BRANCH+1;
+               for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
+                       if(relPE >= CmiMyNodeSize()) break;
+                       int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
+                       //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
+            CharmNotifyMsg *newone = getNotifyMsg();
+            newone->ptr = (void *)loop;
+            envelope *env = UsrToEnv(newone);
+            env->setObjPtr(thisNodeHelper->helperPtr[pe]->ckGetChareID().objPtr);
+                       CmiPushPE(pe, (void *)env);
+               }
+    }
+    loop->stealWork();
+#endif    
+}
 
 void SingleHelperStealWork(ConverseNotifyMsg *msg){
-       
        int srcRank = msg->srcRank;
        
        if(srcRank >= 0){
@@ -365,10 +466,18 @@ void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
                         int paramNum, void * param, 
                         int numChunks, int lowerRange, int upperRange,
-                       int sync,
+                                   int sync,
                         void *redResult, REDUCTION_TYPE type)
 {
     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
 }
 
+void NodeHelper_Parallelize(HelperFn func, 
+                        int paramNum, void * param, 
+                        int numChunks, int lowerRange, int upperRange,
+                                   int sync,
+                        void *redResult, REDUCTION_TYPE type)
+{
+    globalNodeHelper->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
+}
 #include "NodeHelper.def.h"
index 8e2ad8b9ead6525f1a37116d4d5634f14f4cb81d..def045cd9009549e6e6498582a8a28a730e2ba5b 100644 (file)
@@ -1,10 +1,14 @@
 module NodeHelper{
     initproc void RegisterNodeHelperHdlrs(void);
+
+    message CharmNotifyMsg;
+
        nodegroup FuncNodeHelper{
                entry FuncNodeHelper(int mode, int numThreads);
                entry void exit();
        };
        chare FuncSingleHelper{
-               entry FuncSingleHelper(size_t);
+               entry FuncSingleHelper(int numHelpers);
+               entry void stealWork(CharmNotifyMsg *msg);
        };
 };
index 6d652a7f3347b431bc0fd73da2cf5a95e30d1147..15b9449de7c63cf605e7abc6c8889f55e6e00ea0 100644 (file)
@@ -8,6 +8,16 @@
 #define USE_TREE_BROADCAST_THRESHOLD 8
 #define TREE_BCAST_BRANCH (4)
 #define CACHE_LINE_SIZE 64
+/* 1. Using converse-level msg, then the msg is always of highest priority.
+ * And the notification msg comes from the singlehelper where the loop parallelization
+ * is initiated.
+ * 
+ * 2. Using charm-level msg, then the msg could be set with different priorities.
+ * However, the notification msg comes from the singlehelper where the parallelized
+ * loop is executed.
+ * 
+ * */
+#define USE_CONVERSE_NOTIFICATION 1
 
 class FuncSingleHelper;
 
@@ -96,6 +106,12 @@ typedef struct converseNotifyMsg{
     void *ptr;
 }ConverseNotifyMsg;
 
+class CharmNotifyMsg: public CMessage_CharmNotifyMsg{
+public:
+    int srcRank;
+    void *ptr; //the loop info
+};
+
 class FuncNodeHelper : public CBase_FuncNodeHelper {
     friend class FuncSingleHelper;
        
@@ -124,7 +140,7 @@ public:
                         int paramNum, void * param, /* the input parameters for the above func */
                         int numChunks, /* number of chunks to be partitioned */
                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
-                       int sync=1, /* whether the flow will continue until all chunks have finished */
+                        int sync=1, /* whether the flow will continue until all chunks have finished */
                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
                         );
     void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
@@ -134,36 +150,77 @@ void SingleHelperStealWork(ConverseNotifyMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
 //allowing arbitrary combination of sync and unsync parallelizd loops
-#define MSG_BUFFER_SIZE (3)
+#define TASK_BUFFER_SIZE (3)
 class FuncSingleHelper: public CBase_FuncSingleHelper {
        friend class FuncNodeHelper;
-private: 
+private:
+    int totalHelpers;
+    int notifyMsgBufSize;
+    
     FuncNodeHelper *thisNodeHelper;
+#if USE_CONVERSE_NOTIFICATION
+    //this msg is shared across all SingleHelpers
     ConverseNotifyMsg *notifyMsg;
+#else
+    //acted as a msg buffer for charm-level notification msgs sent to other
+    //SingleHelpers. At each sending, 
+    //1. the msg destination chare (SingleHelper) has to be set.
+    //2. the associated loop info has to be set.
+    CharmNotifyMsg **notifyMsg;
+    CurLoopInfo **taskBuffer;
+    int nextFreeTaskBuffer;
+#endif
     int nextFreeNotifyMsg;
-    //CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
     
 public:
-    FuncSingleHelper(size_t ndhPtr);
+    FuncSingleHelper(int numHelpers);
 
     ~FuncSingleHelper() {
-        for(int i=0; i<MSG_BUFFER_SIZE; i++){
+    #if USE_CONVERSE_NOTIFICATION
+        for(int i=0; i<notifyMsgBufSize; i++){
             ConverseNotifyMsg *tmp = notifyMsg+i;
             CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
             delete loop;
         }
         free(notifyMsg);
+    #else
+        for(int i=0; i<notifyMsgBufSize; i++) delete notifyMsg[i];
+        free(notifyMsg);
+        for(int i=0; i<TASK_BUFFER_SIZE; i++) delete taskBuffer[i];
+        free(taskBuffer);
+    #endif
     }
-    
+#if USE_CONVERSE_NOTIFICATION    
     ConverseNotifyMsg *getNotifyMsg(){
         while(1){
             ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
             CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
-            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%MSG_BUFFER_SIZE;
+            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
             if(loop->isFree()) return cur;
         }
         return NULL;
     }
+#else
+    CharmNotifyMsg *getNotifyMsg(){
+        while(1){
+            CharmNotifyMsg *cur = notifyMsg[nextFreeNotifyMsg];
+            CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
+            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
+            if(loop==NULL || loop->isFree()) return cur;
+        }
+        return NULL;
+    }
+    CurLoopInfo *getNewTask(){
+        while(1){
+            CurLoopInfo *cur = taskBuffer[nextFreeTaskBuffer];
+            nextFreeTaskBuffer = (nextFreeTaskBuffer+1)%TASK_BUFFER_SIZE;
+            if(cur->isFree()) return cur;
+        }
+        return NULL;
+    }
+#endif    
+    
+    void stealWork(CharmNotifyMsg *msg);
     
     FuncSingleHelper(CkMigrateMessage *m) {}           
 };
index bf29a77f51b52947166801b9779ae96e57b6ece8..7808e79d5119867d1129d3cd6239a5571cb5e32a 100644 (file)
@@ -32,7 +32,15 @@ extern void NodeHelper_Parallelize(
                         int paramNum, void * param, /* the input parameters for the above func */
                         int numChunks, /* number of chunks to be partitioned */
                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
-                       int sync=1, /* whether the flow will continue unless all chunks have finished */ 
+                        int sync=1, /* whether the flow will continue unless all chunks have finished */ 
+                        void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
+                        );
+extern void NodeHelper_Parallelize(
+                                               HelperFn func, /* the function that finishes a partial work on another thread */
+                        int paramNum, void * param, /* the input parameters for the above func */
+                        int numChunks, /* number of chunks to be partitioned */
+                        int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
+                        int sync=1, /* whether the flow will continue unless all chunks have finished */ 
                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
                         );
 #endif