04f0c3f95117348c962110ae161bf31aa5fc0e9c
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3
4 #include <pthread.h>
5 #include <assert.h>
6
7 #include "charm++.h"
8 #include "NodeHelperAPI.h"
9 #include "queueing.h"
10
11 #define USE_CONVERSE_MSG 1
12
13 /* The following only works on X86_64 platform */
14 #define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
15
16
17 typedef struct SimpleQueue {
18     Queue nodeQ;
19     pthread_mutex_t * lock;
20 }* NodeQueue;
21
22 class Task:public CMessage_Task {
23 public:
24     HelperFn fnPtr;
25     int first;
26     int last;
27     int originRank;
28     int flag;    
29     int paramNum;
30     void *param;
31     
32     //limitation: only allow single variable reduction!!!
33     char redBuf[sizeof(double)];
34     
35     //make sure 32-byte aligned so that each task doesn't cross cache lines
36     //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
37     
38     Task():fnPtr(NULL), param(NULL), paramNum(0) {}
39
40     void init(HelperFn fn,int first_,int last_,int rank){
41         fnPtr = fn;
42         first = first_;
43         last = last_;
44         originRank = rank;
45     }
46
47     void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
48         init(fn, first_, last_, rank);
49         flag=flag_;
50     }
51     
52     void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
53         init(fn, first_, last_, rank); 
54         paramNum=paramNum_;
55         param=param_;
56     }
57     
58     void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
59         init(fn, first_, last_, rank, paramNum_, param_);
60         flag=flag_;
61     }
62     
63     void setFlag() {
64         flag=1;
65     }
66     int isFlagSet() {
67         return flag;
68     }
69 };
70
71
72 /* FuncNodeHelper is a nodegroup object */
73 class FuncSingleHelper;
74 #if USE_CONVERSE_MSG
75 typedef struct converseNotifyMsg{
76     char core[CmiMsgHeaderSizeBytes];
77     FuncSingleHelper *ptr;
78 }ConverseNotifyMsg;
79 #endif
80
81 class FuncNodeHelper : public CBase_FuncNodeHelper {
82     friend class FuncSingleHelper;
83 private:
84 #if USE_CONVERSE_MSG
85     ConverseNotifyMsg *notifyMsgs;         
86 #endif
87
88 public:
89     static int MAX_CHUNKS;
90     static void printMode(int mode);
91
92 public:
93     int numHelpers;
94     int mode; /* determine whether using dynamic or static scheduling */
95     
96     int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
97     
98     CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
99     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
100     
101     ~FuncNodeHelper() {
102         delete [] helperArr;
103         delete [] helperPtr;
104     #if USE_CONVERSE_MSG
105         delete [] notifyMsgs;
106     #endif
107     }
108
109     /* handler is only useful when converse msg is used to initiate tasks on the pseudo-thread */
110     void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr, int handler=0) {
111         helperArr[hid] = cid;
112         helperPtr[hid] = cptr;
113 #if USE_CONVERSE_MSG        
114         notifyMsgs[hid].ptr = cptr;
115         CmiSetHandler(&(notifyMsgs[hid]), handler);
116 #endif
117     }
118
119 #if CMK_SMP
120     void  waitDone(Task ** thisReq,int chunck);
121 #else   
122     void waitThreadDone(int chunck);
123     void createThread();
124 #endif
125         
126         /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
127         /* numThds: the expected number of pthreads to be spawned */
128     FuncNodeHelper(int mode_, int numThds_);
129     
130     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
131                         int paramNum, void * param, /* the input parameters for the above func */
132                         int msgPriority, /* the priority of the intra-node msg, and node-level msg */
133                         int numChunks, /* number of chunks to be partitioned */
134                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
135                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
136                         );
137     void send(Task *);
138     void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
139 };
140
141 #if USE_CONVERSE_MSG
142 void NotifySingleHelper(ConverseNotifyMsg *msg);
143 #endif
144
145 /* FuncSingleHelper is a chare located on every core of a node */
146 class FuncSingleHelper: public CBase_FuncSingleHelper {
147         friend class FuncNodeHelper;
148 private: 
149     /* BE CAREFUL ABOUT THE FILEDS LAYOUT CONSIDERING CACHE EFFECTS */
150     volatile int counter;
151     int notifyHandler;
152     CkGroupID nodeHelperID;
153     FuncNodeHelper *thisNodeHelper;
154     Queue reqQ; /* The queue may be simplified for better performance */
155     
156     /* The following two vars are for usage of detecting completion in dynamic scheduling */
157     CmiNodeLock reqLock;
158     /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
159      * process one node-level parallelization at a time */
160     Task **tasks; /* Note the Task type is a message */
161 public:
162     FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
163         reqQ = CqsCreate();
164
165         reqLock = CmiCreateLock();
166         counter = 0;
167         
168         tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
169         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
170         
171         CProxy_FuncNodeHelper fh(nodeHelperID);
172         thisNodeHelper = fh[CkMyNode()].ckLocalBranch();
173         CmiAssert(thisNodeHelper!=NULL);
174         
175         notifyHandler = 0;
176     #if USE_CONVERSE_MSG
177         notifyHandler = CmiRegisterHandler((CmiHandler)NotifySingleHelper);
178     #endif
179     }
180
181     ~FuncSingleHelper() {
182         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
183         delete [] tasks;        
184                 CmiDestroyLock(reqLock);
185     }
186     
187     FuncSingleHelper(CkMigrateMessage *m) {}
188     
189     Task **getTasksMem() { return tasks; }
190     
191     void enqueueWork(Task *one) {
192                 unsigned int t = 0; /* default priority */
193         CmiLock(reqLock);
194         CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
195         //SimpleQueuePush(reqQ, (char *)one);
196         CmiUnlock(reqLock);
197     }
198     void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
199     void reportCreated() {
200         //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());        
201         thisNodeHelper->oneHelperCreated(CkMyRank(), thishandle, this, notifyHandler);
202     }
203 };
204
205 #endif