1. Simplified the parallelization interface a little bit. The Nodehelper proxy could...
[charm.git] / NodeHelper.h
index 6d652a7f3347b431bc0fd73da2cf5a95e30d1147..15b9449de7c63cf605e7abc6c8889f55e6e00ea0 100644 (file)
@@ -8,6 +8,16 @@
 #define USE_TREE_BROADCAST_THRESHOLD 8
 #define TREE_BCAST_BRANCH (4)
 #define CACHE_LINE_SIZE 64
+/* 1. Using converse-level msg, then the msg is always of highest priority.
+ * And the notification msg comes from the singlehelper where the loop parallelization
+ * is initiated.
+ * 
+ * 2. Using charm-level msg, then the msg could be set with different priorities.
+ * However, the notification msg comes from the singlehelper where the parallelized
+ * loop is executed.
+ * 
+ * */
+#define USE_CONVERSE_NOTIFICATION 1
 
 class FuncSingleHelper;
 
@@ -96,6 +106,12 @@ typedef struct converseNotifyMsg{
     void *ptr;
 }ConverseNotifyMsg;
 
+class CharmNotifyMsg: public CMessage_CharmNotifyMsg{
+public:
+    int srcRank;
+    void *ptr; //the loop info
+};
+
 class FuncNodeHelper : public CBase_FuncNodeHelper {
     friend class FuncSingleHelper;
        
@@ -124,7 +140,7 @@ public:
                         int paramNum, void * param, /* the input parameters for the above func */
                         int numChunks, /* number of chunks to be partitioned */
                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
-                       int sync=1, /* whether the flow will continue until all chunks have finished */
+                        int sync=1, /* whether the flow will continue until all chunks have finished */
                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
                         );
     void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
@@ -134,36 +150,77 @@ void SingleHelperStealWork(ConverseNotifyMsg *msg);
 
 /* FuncSingleHelper is a chare located on every core of a node */
 //allowing arbitrary combination of sync and unsync parallelizd loops
-#define MSG_BUFFER_SIZE (3)
+#define TASK_BUFFER_SIZE (3)
 class FuncSingleHelper: public CBase_FuncSingleHelper {
        friend class FuncNodeHelper;
-private: 
+private:
+    int totalHelpers;
+    int notifyMsgBufSize;
+    
     FuncNodeHelper *thisNodeHelper;
+#if USE_CONVERSE_NOTIFICATION
+    //this msg is shared across all SingleHelpers
     ConverseNotifyMsg *notifyMsg;
+#else
+    //acted as a msg buffer for charm-level notification msgs sent to other
+    //SingleHelpers. At each sending, 
+    //1. the msg destination chare (SingleHelper) has to be set.
+    //2. the associated loop info has to be set.
+    CharmNotifyMsg **notifyMsg;
+    CurLoopInfo **taskBuffer;
+    int nextFreeTaskBuffer;
+#endif
     int nextFreeNotifyMsg;
-    //CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
     
 public:
-    FuncSingleHelper(size_t ndhPtr);
+    FuncSingleHelper(int numHelpers);
 
     ~FuncSingleHelper() {
-        for(int i=0; i<MSG_BUFFER_SIZE; i++){
+    #if USE_CONVERSE_NOTIFICATION
+        for(int i=0; i<notifyMsgBufSize; i++){
             ConverseNotifyMsg *tmp = notifyMsg+i;
             CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
             delete loop;
         }
         free(notifyMsg);
+    #else
+        for(int i=0; i<notifyMsgBufSize; i++) delete notifyMsg[i];
+        free(notifyMsg);
+        for(int i=0; i<TASK_BUFFER_SIZE; i++) delete taskBuffer[i];
+        free(taskBuffer);
+    #endif
     }
-    
+#if USE_CONVERSE_NOTIFICATION    
     ConverseNotifyMsg *getNotifyMsg(){
         while(1){
             ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
             CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
-            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%MSG_BUFFER_SIZE;
+            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
             if(loop->isFree()) return cur;
         }
         return NULL;
     }
+#else
+    CharmNotifyMsg *getNotifyMsg(){
+        while(1){
+            CharmNotifyMsg *cur = notifyMsg[nextFreeNotifyMsg];
+            CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
+            nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
+            if(loop==NULL || loop->isFree()) return cur;
+        }
+        return NULL;
+    }
+    CurLoopInfo *getNewTask(){
+        while(1){
+            CurLoopInfo *cur = taskBuffer[nextFreeTaskBuffer];
+            nextFreeTaskBuffer = (nextFreeTaskBuffer+1)%TASK_BUFFER_SIZE;
+            if(cur->isFree()) return cur;
+        }
+        return NULL;
+    }
+#endif    
+    
+    void stealWork(CharmNotifyMsg *msg);
     
     FuncSingleHelper(CkMigrateMessage *m) {}           
 };