Added msg buffer to allow the combination of unsync and sync parallelization
authorChao Mei <chaomei2@illinois.edu>
Sat, 25 Feb 2012 22:14:09 +0000 (16:14 -0600)
committerChao Mei <chaomei2@illinois.edu>
Sat, 25 Feb 2012 22:14:09 +0000 (16:14 -0600)
tasks

NodeHelper.C
NodeHelper.h

index d328608ddce2c54529707faf6557a938399cd30b..3f4fb9c894c45712800665f40a42f8b7006974d8 100644 (file)
@@ -31,6 +31,7 @@ int FuncNodeHelper::MAX_CHUNKS = 64;
 #define TRACE_BRACKET(id)
 #endif
 
+#define ALLOW_MULTIPLE_UNSYNC 1
 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
                                     int numChunks, int lowerRange, 
                                    int upperRange, int sync,
@@ -53,9 +54,13 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
        TRACE_START(20);
        
        FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
-       CurLoopInfo *curLoop = thisHelper->curLoop;
-       curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
-    ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;      
+#if ALLOW_MULTIPLE_UNSYNC
+    ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
+#else
+    ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
+#endif
+    CurLoopInfo *curLoop = (CurLoopInfo *)(notifyMsg->ptr);
+       curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
        if(useTreeBcast){               
                int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
                //just implicit binary tree
@@ -69,7 +74,7 @@ void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
                        if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
                }
        }
-
+    
        curLoop->stealWork();
        TRACE_BRACKET(20);
        
index 14ca7c74ed1ac225dea740419eb93b90ea735e80..41f077d0a1381380b16dbe779c1d1abf9eafefc1 100644 (file)
@@ -81,6 +81,8 @@ public:
         __sync_add_and_fetch(&finishFlag, counter);
     }
     
+    int isFree() { return finishFlag == numChunks; }
+    
        void **getRedBufs() { return redBufs; }
        
     void stealWork();
@@ -123,34 +125,54 @@ public:
 void SingleHelperStealWork(ConverseNotifyMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
+//allowing arbitrary combination of sync and unsync parallelizd loops
+#define MSG_BUFFER_SIZE (3)
 class FuncSingleHelper: public CBase_FuncSingleHelper {
        friend class FuncNodeHelper;
 private: 
     FuncNodeHelper *thisNodeHelper;
     ConverseNotifyMsg *notifyMsg;
-    CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
+    int nextFreeNotifyMsg;
+    //CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
     
 public:
     FuncSingleHelper(size_t ndhPtr) {
         thisNodeHelper = (FuncNodeHelper *)ndhPtr;
         CmiAssert(thisNodeHelper!=NULL);        
         int stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
-        curLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
         
-        notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg));
-        if(thisNodeHelper->useTreeBcast){
-            notifyMsg->srcRank = CmiMyRank();
-        }else{
-            notifyMsg->srcRank = -1;
+       nextFreeNotifyMsg = 0;
+        notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
+        for(int i=0; i<MSG_BUFFER_SIZE; i++){
+            ConverseNotifyMsg *tmp = notifyMsg+i;
+            if(thisNodeHelper->useTreeBcast){
+                tmp->srcRank = CmiMyRank();
+            }else{
+                tmp->srcRank = -1;
+            }            
+            tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
+            CmiSetHandler(tmp, stealWorkHandler);
         }
-        notifyMsg->ptr = (void *)curLoop;
-        CmiSetHandler(notifyMsg, stealWorkHandler);
         thisNodeHelper->helperPtr[CkMyRank()] = this;
     }
 
     ~FuncSingleHelper() {
-        delete curLoop;
-        delete notifyMsg;
+        for(int i=0; i<MSG_BUFFER_SIZE; i++){
+            ConverseNotifyMsg *tmp = notifyMsg+i;
+            CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
+            delete loop;
+        }
+        free(notifyMsg);
+    }
+    
+    ConverseNotifyMsg *getNotifyMsg(){
+        while(1){
+            ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
+            CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
+            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%MSG_BUFFER_SIZE;
+            if(loop->isFree()) return cur;
+        }
+        return NULL;
     }
     
     FuncSingleHelper(CkMigrateMessage *m) {}