41f077d0a1381380b16dbe779c1d1abf9eafefc1
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3 #include <assert.h>
4
5 #include "charm++.h"
6 #include "NodeHelperAPI.h"
7
8 #define USE_TREE_BROADCAST_THRESHOLD 8
9 #define TREE_BCAST_BRANCH (4)
10 #define CACHE_LINE_SIZE 64
11
12 class FuncSingleHelper;
13
14 class CurLoopInfo{
15     friend class FuncSingleHelper;
16     
17 private:
18     volatile int curChunkIdx;
19     int numChunks;
20     HelperFn fnPtr;
21     int lowerIndex;
22     int upperIndex;
23     int paramNum;
24     void *param;
25     //limitation: only allow single variable reduction of size numChunks!!!
26     void **redBufs;
27         char *bufSpace;
28
29     volatile int finishFlag;
30     
31     //a tag to indicate whether the task for this new loop has been inited
32     //this tag is needed to prevent other helpers to run the old task
33     int inited;
34     
35 public:    
36     CurLoopInfo(int maxChunks):numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
37     paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL), bufSpace(NULL), inited(0) 
38         {
39                 redBufs = new void *[maxChunks];
40                 bufSpace = new char[maxChunks * CACHE_LINE_SIZE];
41         for(int i=0; i<maxChunks; i++) redBufs[i] = (void *)(bufSpace+i*CACHE_LINE_SIZE);
42         }
43     
44     ~CurLoopInfo() { 
45                 delete [] redBufs; 
46                 delete [] bufSpace;
47         }
48     
49     void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p){        /*
50       * WARNING: there's a rare data-racing case here. The current loop is
51       * about to finish (just before setting inited to 0; A helper (say B) 
52       * just enters the stealWork and passes the inited check. The helper 
53       * (say A) is very fast, and starts the next loop, and happens enter
54       * into the middle of this function. Then helper B will face corrupted
55       * task info as it is trying to execute the old loop task!
56       * In reality for user cases, this case happens very rarely!! -Chao Mei
57       */
58         numChunks = nc;
59         fnPtr = f;
60         lowerIndex = lIdx;
61         upperIndex = uIdx;
62         paramNum = numParams;
63         param = p;
64         curChunkIdx = -1;
65         finishFlag = 0;
66         //needs to be set last
67         inited = 1;
68     }
69       
70     void waitLoopDone(int sync){
71         //while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
72         if(sync) while(finishFlag!=numChunks);
73         //finishFlag = 0;
74         inited = 0;
75     }
76     int getNextChunkIdx(){
77         return __sync_add_and_fetch(&curChunkIdx, 1);
78     }
79     void reportFinished(int counter){
80         if(counter==0) return;
81         __sync_add_and_fetch(&finishFlag, counter);
82     }
83     
84     int isFree() { return finishFlag == numChunks; }
85     
86         void **getRedBufs() { return redBufs; }
87         
88     void stealWork();
89 };
90
91 /* FuncNodeHelper is a nodegroup object */
92
93 typedef struct converseNotifyMsg{
94     char core[CmiMsgHeaderSizeBytes];
95     int srcRank;
96     void *ptr;
97 }ConverseNotifyMsg;
98
99 class FuncNodeHelper : public CBase_FuncNodeHelper {
100     friend class FuncSingleHelper;
101         
102 public:
103     static int MAX_CHUNKS;
104 private:    
105     int numHelpers;    
106     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
107         int useTreeBcast;
108     
109 public:
110         FuncNodeHelper();
111     ~FuncNodeHelper() {
112         delete [] helperPtr;
113     }
114     
115     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
116                         int paramNum, void * param, /* the input parameters for the above func */
117                         int numChunks, /* number of chunks to be partitioned */
118                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
119                         int sync=1, /* whether the flow will continue until all chunks have finished */
120                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
121                         );
122     void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
123 };
124
125 void SingleHelperStealWork(ConverseNotifyMsg *msg);
126
127 /* FuncSingleHelper is a chare located on every core of a node */
128 //allowing arbitrary combination of sync and unsync parallelizd loops
129 #define MSG_BUFFER_SIZE (3)
130 class FuncSingleHelper: public CBase_FuncSingleHelper {
131         friend class FuncNodeHelper;
132 private: 
133     FuncNodeHelper *thisNodeHelper;
134     ConverseNotifyMsg *notifyMsg;
135     int nextFreeNotifyMsg;
136     //CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
137     
138 public:
139     FuncSingleHelper(size_t ndhPtr) {
140         thisNodeHelper = (FuncNodeHelper *)ndhPtr;
141         CmiAssert(thisNodeHelper!=NULL);        
142         int stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
143         
144         nextFreeNotifyMsg = 0;
145         notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
146         for(int i=0; i<MSG_BUFFER_SIZE; i++){
147             ConverseNotifyMsg *tmp = notifyMsg+i;
148             if(thisNodeHelper->useTreeBcast){
149                 tmp->srcRank = CmiMyRank();
150             }else{
151                 tmp->srcRank = -1;
152             }            
153             tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
154             CmiSetHandler(tmp, stealWorkHandler);
155         }
156         thisNodeHelper->helperPtr[CkMyRank()] = this;
157     }
158
159     ~FuncSingleHelper() {
160         for(int i=0; i<MSG_BUFFER_SIZE; i++){
161             ConverseNotifyMsg *tmp = notifyMsg+i;
162             CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
163             delete loop;
164         }
165         free(notifyMsg);
166     }
167     
168     ConverseNotifyMsg *getNotifyMsg(){
169         while(1){
170             ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
171             CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
172             nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%MSG_BUFFER_SIZE;
173             if(loop->isFree()) return cur;
174         }
175         return NULL;
176     }
177     
178     FuncSingleHelper(CkMigrateMessage *m) {}            
179 };
180
181 #endif