9d12f6c6fef2b4955ed0869ff92353b24a8cea9c
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3
4 #include <pthread.h>
5 #include <assert.h>
6
7 #include "charm++.h"
8 #include "NodeHelperAPI.h"
9 #include "queueing.h"
10
11 #define AtomicIncrement(someInt)  __asm__ __volatile__("lock incl (%0)" :: "r" (&(someInt)))
12
13
14 typedef struct SimpleQueue {
15     Queue nodeQ;
16     pthread_mutex_t * lock;
17 }* NodeQueue;
18
19 class Task:public CMessage_Task {
20 public:
21     HelperFn fnPtr;
22     int first;
23     int last;
24     int originRank;
25     int flag;    
26     int paramNum;
27     void *param;
28     
29     //limitation: only allow single variable reduction!!!
30     char redBuf[sizeof(double)];
31     
32     //make sure 32-byte aligned so that each task doesn't cross cache lines
33     //char padding[32-(sizeof(int)*7+sizeof(void *)*2)%32];
34     
35     Task():fnPtr(NULL), param(NULL), paramNum(0) {}
36
37     void init(HelperFn fn,int first_,int last_,int rank){
38         fnPtr = fn;
39         first = first_;
40         last = last_;
41         originRank = rank;
42     }
43
44     void init(HelperFn fn,int first_,int last_,int flag_,int rank) {
45         init(fn, first_, last_, rank);
46         flag=flag_;
47     }
48     
49     void init(HelperFn fn,int first_,int last_,int rank, int paramNum_, void *param_) {
50         init(fn, first_, last_, rank); 
51         paramNum=paramNum_;
52         param=param_;
53     }
54     
55     void init(HelperFn fn,int first_,int last_,int flag_,int rank, int paramNum_, void *param_) {
56         init(fn, first_, last_, rank, paramNum_, param_);
57         flag=flag_;
58     }
59     
60     void setFlag() {
61         flag=1;
62     }
63     int isFlagSet() {
64         return flag;
65     }
66 };
67
68
69 /* FuncNodeHelper is a nodegroup object */
70 class FuncSingleHelper;
71
72 class FuncNodeHelper : public CBase_FuncNodeHelper {
73 public:
74     static int MAX_CHUNKS;
75     static void printMode(int mode);
76
77 public:
78     int numHelpers;
79     int mode; /* determine whether using dynamic or static scheduling */
80     
81     int numThds; /* only used for pthread version in non-SMP case, the expected #pthreads to be created */
82     
83     CkChareID *helperArr; /* chare ids to the FuncSingleHelpers it manages */
84     FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
85     
86     ~FuncNodeHelper() {
87         delete [] helperArr;
88         delete [] helperPtr;        
89     }
90
91     void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr) {
92         helperArr[hid] = cid;
93         helperPtr[hid] = cptr;
94     }
95
96 #if CMK_SMP
97     void  waitDone(Task ** thisReq,int chunck);
98 #else   
99     void waitThreadDone(int chunck);
100     void createThread();
101 #endif
102         
103         /* mode_: PTHREAD only available in non-SMP, while STATIC/DYNAMIC are available in SMP */
104         /* numThds: the expected number of pthreads to be spawned */
105     FuncNodeHelper(int mode_, int numThds_);
106     
107     void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
108                         int paramNum, void * param, /* the input parameters for the above func */
109                         int msgPriority, /* the priority of the intra-node msg, and node-level msg */
110                         int numChunks, /* number of chunks to be partitioned */
111                         int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */                        
112                         void *redResult=NULL, REDUCTION_TYPE type=NODEHELPER_NONE /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
113                         );
114     void send(Task *);
115     void reduce(Task **thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks);
116 };
117
118 /* FuncSingleHelper is a chare located on every core of a node */
119 class FuncSingleHelper: public CBase_FuncSingleHelper {
120         friend class FuncNodeHelper;
121 private:
122     CkGroupID nodeHelperID;
123     Queue reqQ; /* The queue may be simplified for better performance */
124     
125     /* The following two vars are for usage of detecting completion in dynamic scheduling */
126     CmiNodeLock reqLock;
127     int counter; 
128     
129     /* To reuse such Task memory as each SingleHelper (i.e. a PE) will only
130      * process one node-level parallelization at a time */
131     Task **tasks; /* Note the Task type is a message */
132
133 public:
134     FuncSingleHelper(CkGroupID nid):nodeHelperID(nid) {
135         reqQ = CqsCreate();
136
137         reqLock = CmiCreateLock();
138         counter = 0;
139         
140         tasks = new Task *[FuncNodeHelper::MAX_CHUNKS];
141         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) tasks[i] = new (8*sizeof(int)) Task();
142     }
143
144     ~FuncSingleHelper() {
145         for(int i=0; i<FuncNodeHelper::MAX_CHUNKS; i++) delete tasks[i];
146         delete [] tasks;        
147                 CmiDestroyLock(reqLock);
148     }
149     
150     FuncSingleHelper(CkMigrateMessage *m) {}
151     
152     Task **getTasksMem() { return tasks; }
153     
154     void enqueueWork(Task *one) {
155                 unsigned int t = 0; /* default priority */
156         CmiLock(reqLock);
157         CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
158         //SimpleQueuePush(reqQ, (char *)one);
159         CmiUnlock(reqLock);
160     }
161     void processWork(int filler); /* filler is here in order to use CkEntryOptions for setting msg priority */
162     void reportCreated();
163 };
164
165 #endif