A bug fix which has assumed the #chunks a job is partitioned into is the same with...
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3
4 #include <pthread.h>
5 #include <assert.h>
6
7 #include "charm++.h"
8 #include "NodeHelperAPI.h"
9 #include "queueing.h"
10
11 #define USE_CONVERSE_MSG 1
12
13 /* The following only works on X86_64 platform */
14 #define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
15
16
17 typedef struct SimpleQueue {
18     Queue nodeQ;
19     pthread_mutex_t * lock;
20 }* NodeQueue;
21
22 class Task:public CMessage_Task {
23 public:
24     HelperFn fnPtr;
25     int first;
26     int last;
27     int originRank;
28     int flag;    
29     int paramNum;
30     void *param;
31     
32     //limitation: only allow single variable reduction!!!
33     char redBuf[sizeof(double)];
34     
35     //make sure 32-byte aligned so that each task doesn't cross cache lines
36     //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
37     
38     Task():fnPtr(NULL), param(NULL), paramNum(0) {}
39
40     void init(HelperFn fn,int first_,int last_,int rank){
41         fnPtr = fn;
42         first = first_;
43         last = last_;
44         originRank = rank;
45     }
46
47     void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
48         init(fn, first_, last_, rank);
49         flag=flag_;
50     }
51     
52     void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
53         init(fn, first_, last_, rank); 
54         paramNum=paramNum_;
55         param=param_;
56     }
57     
58     void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
59         init(fn, first_, last_, rank, paramNum_, param_);
60         flag=flag_;
61     }
62     
63     void setFlag() {
64         flag=1;
65     }
66     int isFlagSet() {
67         return flag;
68     }
69 };
70
71 class FuncSingleHelper;
72
73 class CurLoopInfo{
74     friend class FuncSingleHelper;
75     
76 private:
77     volatile int curChunkIdx;
78     int numChunks;
79     HelperFn fnPtr;
80     int lowerIndex;
81     int upperIndex;
82     int paramNum;
83     void *param;
84     //limitation: only allow single variable reduction of size numChunks!!!
85     void **redBufs;
86     
87     volatile int finishFlag;
88     
89 public:    
90     CurLoopInfo():numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
91     paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL) {}
92     
93     ~CurLoopInfo() { delete [] redBufs; }
94     
95     void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p){        
96         numChunks = nc;
97         fnPtr = f;
98         lowerIndex = lIdx;
99         upperIndex = uIdx;
100         paramNum = numParams;
101         param = p;
102         curChunkIdx = -1;
103         finishFlag = 0;
104     }
105       
106     void waitLoopDone(){
107         while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
108     }
109     int getNextChunkIdx(){
110         return __sync_add_and_fetch(&curChunkIdx, 1);
111     }
112     void reportFinished(){
113         __sync_add_and_fetch(&finishFlag, 1);
114     }
115     
116     void stealWork();
117 };
118
119 /* FuncNodeHelper is a nodegroup object */
120
121 typedef struct converseNotifyMsg{
122     char core[CmiMsgHeaderSizeBytes];
123     void *ptr;
124 }ConverseNotifyMsg;
125
126 class FuncNodeHelper : public CBase_FuncNodeHelper {
127     friend class FuncSingleHelper;
128 private:
129     ConverseNotifyMsg *notifyMsgs;
130
131 public:
132     static int MAX_CHUNKS;
133     static void printMode(int mode);
134
135 public:
136     int numHelpers;
137     int mode; /* determine whether using dynamic or static scheduling */
138     
139     int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
140     
141     CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
142     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
143     
144     ~FuncNodeHelper() {
145         delete [] helperArr;
146         delete [] helperPtr;
147         delete [] notifyMsgs;        
148     }
149
150     /* handler is only useful when converse msg is used to initiate tasks on the pseudo-thread */
151     void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr, int handler=0) {
152         helperArr[hid] = cid;
153         helperPtr[hid] = cptr;
154
155         CmiSetHandler(&(notifyMsgs[hid]), handler);
156         if(mode == NODEHELPER_STATIC) {
157             notifyMsgs[hid].ptr = (void *)cptr;         
158         }
159     }
160
161 #if CMK_SMP
162     void  waitDone(Task ** thisReq,int chunck);
163 #else   
164     void waitThreadDone(int chunck);
165     void createThread();
166 #endif
167         
168         /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
169         /* numThds: the expected number of pthreads to be spawned */
170     FuncNodeHelper(int mode_, int numThds_);
171     
172     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
173                         int paramNum, void * param, /* the input parameters for the above func */
174                         int msgPriority, /* the priority of the intra-node msg, and node-level msg */
175                         int numChunks, /* number of chunks to be partitioned */
176                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
177                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
178                         );
179     void send(Task *);
180     void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
181 };
182
183 void NotifySingleHelper(ConverseNotifyMsg *msg);
184 void SingleHelperStealWork(ConverseNotifyMsg *msg);
185
186 /* FuncSingleHelper is a chare located on every core of a node */
187 class FuncSingleHelper: public CBase_FuncSingleHelper {
188         friend class FuncNodeHelper;
189 private: 
190     /* BE CAREFUL ABOUT THE FILEDS LAYOUT CONSIDERING CACHE EFFECTS */
191     volatile int counter;
192     int notifyHandler;
193     int stealWorkHandler;
194     CkGroupID nodeHelperID;
195     FuncNodeHelper *thisNodeHelper;
196     Queue reqQ; /* The queue may be simplified for better performance */
197     
198     /* The following two vars are for usage of detecting completion in dynamic scheduling */
199     CmiNodeLock reqLock;
200     /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
201      * process one node-level parallelization at a time */
202     Task **tasks; /* Note the Task type is a message */
203     
204     CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
205     
206 public:
207     FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
208         reqQ = CqsCreate();
209
210         reqLock = CmiCreateLock();
211         counter = 0;
212         
213         tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
214         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
215         
216         CProxy_FuncNodeHelper fh(nodeHelperID);
217         thisNodeHelper = fh[CkMyNode()].ckLocalBranch();
218         CmiAssert(thisNodeHelper!=NULL);
219         
220         notifyHandler = CmiRegisterHandler((CmiHandler)NotifySingleHelper);
221         stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
222             
223         curLoop = new CurLoopInfo();
224         curLoop->redBufs = new void *[FuncNodeHelper::MAX_CHUNKS];
225         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) curLoop->redBufs[i] = (void *)(tasks[i]->redBuf);
226     }
227
228     ~FuncSingleHelper() {
229         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
230         delete [] tasks;        
231                 CmiDestroyLock(reqLock);
232         delete curLoop;
233     }
234     
235     FuncSingleHelper(CkMigrateMessage *m) {}
236     
237     Task **getTasksMem() { return tasks; }
238     
239     void enqueueWork(Task *one) {
240                 unsigned int t = 0; /* default priority */
241         CmiLock(reqLock);
242         CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
243         //SimpleQueuePush(reqQ, (char *)one);
244         CmiUnlock(reqLock);
245     }
246     void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
247     void reportCreated() {
248         //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
249         if(thisNodeHelper->mode == NODEHELPER_DYNAMIC)
250             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, -1);
251         else if(thisNodeHelper->mode == NODEHELPER_STATIC)
252             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler);
253         else if(thisNodeHelper->mode == NODEHELPER_CHARE_DYNAMIC)
254             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, stealWorkHandler);
255     }    
256 };
257
258 #endif