Fixed a bug in using the macro for the number of test iterations.
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3
4 #include <pthread.h>
5 #include <assert.h>
6
7 #include "charm++.h"
8 #include "NodeHelperAPI.h"
9 #include "queueing.h"
10
11 #define USE_CONVERSE_MSG 1
12 #define USE_TREE_BROADCAST 0
13 #define TREE_BCAST_BRANCH (4)
14
15 /* The following only works on X86_64 platform */
16 #define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
17
18
19 typedef struct SimpleQueue {
20     Queue nodeQ;
21     pthread_mutex_t * lock;
22 }* NodeQueue;
23
24 class Task:public CMessage_Task {
25 public:
26     HelperFn fnPtr;
27     int first;
28     int last;
29     int originRank;
30     int flag;    
31     int paramNum;
32     void *param;
33     
34     //limitation: only allow single variable reduction!!!
35     char redBuf[sizeof(double)];
36     
37     //make sure 32-byte aligned so that each task doesn't cross cache lines
38     //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
39     
40     Task():fnPtr(NULL), param(NULL), paramNum(0) {}
41
42     void init(HelperFn fn,int first_,int last_,int rank){
43         fnPtr = fn;
44         first = first_;
45         last = last_;
46         originRank = rank;
47     }
48
49     void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
50         init(fn, first_, last_, rank);
51         flag=flag_;
52     }
53     
54     void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
55         init(fn, first_, last_, rank); 
56         paramNum=paramNum_;
57         param=param_;
58     }
59     
60     void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
61         init(fn, first_, last_, rank, paramNum_, param_);
62         flag=flag_;
63     }
64     
65     void setFlag() {
66         flag=1;
67     }
68     int isFlagSet() {
69         return flag;
70     }
71 };
72
73 class FuncSingleHelper;
74
75 class CurLoopInfo{
76     friend class FuncSingleHelper;
77     
78 private:
79     volatile int curChunkIdx;
80     int numChunks;
81     HelperFn fnPtr;
82     int lowerIndex;
83     int upperIndex;
84     int paramNum;
85     void *param;
86     //limitation: only allow single variable reduction of size numChunks!!!
87     void **redBufs;
88     
89     volatile int finishFlag;
90     
91 public:    
92     CurLoopInfo():numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0), 
93     paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL) {}
94     
95     ~CurLoopInfo() { delete [] redBufs; }
96     
97     void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p){        
98         numChunks = nc;
99         fnPtr = f;
100         lowerIndex = lIdx;
101         upperIndex = uIdx;
102         paramNum = numParams;
103         param = p;
104         curChunkIdx = -1;
105         finishFlag = 0;
106     }
107       
108     void waitLoopDone(){
109         while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
110     }
111     int getNextChunkIdx(){
112         return __sync_add_and_fetch(&curChunkIdx, 1);
113     }
114     void reportFinished(){
115         __sync_add_and_fetch(&finishFlag, 1);
116     }
117     
118     void stealWork();
119 };
120
121 /* FuncNodeHelper is a nodegroup object */
122
123 typedef struct converseNotifyMsg{
124     char core[CmiMsgHeaderSizeBytes];
125 #if USE_TREE_BROADCAST
126     int srcRank;
127 #endif    
128     void *ptr;
129 }ConverseNotifyMsg;
130
131 class FuncNodeHelper : public CBase_FuncNodeHelper {
132     friend class FuncSingleHelper;
133 private:
134     ConverseNotifyMsg *notifyMsgs;
135
136 public:
137     static int MAX_CHUNKS;
138     static void printMode(int mode);
139
140 public:
141     int numHelpers;
142     int mode; /* determine whether using dynamic or static scheduling */
143     
144     int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
145     
146     CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
147     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
148     
149     ~FuncNodeHelper() {
150         delete [] helperArr;
151         delete [] helperPtr;
152         delete [] notifyMsgs;        
153     }
154
155     /* handler is only useful when converse msg is used to initiate tasks on the pseudo-thread */
156     void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr, int handler=0) {
157         helperArr[hid] = cid;
158         helperPtr[hid] = cptr;
159
160         CmiSetHandler(&(notifyMsgs[hid]), handler);
161         if(mode == NODEHELPER_STATIC) {
162             notifyMsgs[hid].ptr = (void *)cptr;         
163         }
164     }
165
166 #if CMK_SMP
167     void  waitDone(Task ** thisReq,int chunck);
168 #else   
169     void waitThreadDone(int chunck);
170     void createThread();
171 #endif
172         
173         /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
174         /* numThds: the expected number of pthreads to be spawned */
175     FuncNodeHelper(int mode_, int numThds_);
176     
177     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
178                         int paramNum, void * param, /* the input parameters for the above func */
179                         int msgPriority, /* the priority of the intra-node msg, and node-level msg */
180                         int numChunks, /* number of chunks to be partitioned */
181                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
182                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
183                         );
184     void send(Task *);
185     void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
186 };
187
188 void NotifySingleHelper(ConverseNotifyMsg *msg);
189 void SingleHelperStealWork(ConverseNotifyMsg *msg);
190
191 /* FuncSingleHelper is a chare located on every core of a node */
192 class FuncSingleHelper: public CBase_FuncSingleHelper {
193         friend class FuncNodeHelper;
194 private: 
195     /* BE CAREFUL ABOUT THE FILEDS LAYOUT CONSIDERING CACHE EFFECTS */
196     volatile int counter;
197     int notifyHandler;
198     int stealWorkHandler;
199     CkGroupID nodeHelperID;
200     FuncNodeHelper *thisNodeHelper;
201     Queue reqQ; /* The queue may be simplified for better performance */
202     
203     /* The following two vars are for usage of detecting completion in dynamic scheduling */
204     CmiNodeLock reqLock;
205     /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
206      * process one node-level parallelization at a time */
207     Task **tasks; /* Note the Task type is a message */
208     
209     CurLoopInfo *curLoop; /* Points to the current loop that is being processed */
210     
211 public:
212     FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
213         reqQ = CqsCreate();
214
215         reqLock = CmiCreateLock();
216         counter = 0;
217         
218         tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
219         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
220         
221         CProxy_FuncNodeHelper fh(nodeHelperID);
222         thisNodeHelper = fh[CkMyNode()].ckLocalBranch();
223         CmiAssert(thisNodeHelper!=NULL);
224         
225         notifyHandler = CmiRegisterHandler((CmiHandler)NotifySingleHelper);
226         stealWorkHandler = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
227             
228         curLoop = new CurLoopInfo();
229         curLoop->redBufs = new void *[FuncNodeHelper::MAX_CHUNKS];
230         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) curLoop->redBufs[i] = (void *)(tasks[i]->redBuf);
231     }
232
233     ~FuncSingleHelper() {
234         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
235         delete [] tasks;        
236                 CmiDestroyLock(reqLock);
237         delete curLoop;
238     }
239     
240     FuncSingleHelper(CkMigrateMessage *m) {}
241     
242     Task **getTasksMem() { return tasks; }
243     
244     void enqueueWork(Task *one) {
245                 unsigned int t = 0; /* default priority */
246         CmiLock(reqLock);
247         CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
248         //SimpleQueuePush(reqQ, (char *)one);
249         CmiUnlock(reqLock);
250     }
251     void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
252     void reportCreated() {
253         //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
254         if(thisNodeHelper->mode == NODEHELPER_DYNAMIC)
255             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, -1);
256         else if(thisNodeHelper->mode == NODEHELPER_STATIC)
257             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler);
258         else if(thisNodeHelper->mode == NODEHELPER_CHARE_DYNAMIC)
259             thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, stealWorkHandler);
260     }    
261 };
262
263 #endif