Adapted the fft-trans to the newly polished API.
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 //=======Beginning of pthread version of scheduling which is used in non-SMP =======//
4 #if !CMK_SMP
// Work queue shared by the master flow and the worker pthreads. It is
// allocated in FuncNodeHelper::createThread(), filled by
// FuncNodeHelper::parallelizeFunc() and drained by threadWork().
NodeQueue Q;

//vars local to spawned threads
//Note: __thread is not portable, but works pretty much anywhere pthreads work.
// after C++11 this should be thread_local
// Each worker initializes its own copy in threadWork() and publishes the
// addresses through allLocks/allConds so the master can signal it.
__thread pthread_mutex_t lock;
__thread pthread_cond_t condition;

//vars to the main flow (master thread)
// threads[i] is the handle of worker i; allLocks[i]/allConds[i] point at
// worker i's thread-local lock/condition (NULL until that worker has started).
pthread_t * threads;
pthread_mutex_t **allLocks;
pthread_cond_t **allConds;

//global barrier
// gLock and gCond are not referenced anywhere else in the visible part of
// this file; barr is initialized in createThread() with the worker count.
pthread_mutex_t gLock;
pthread_cond_t gCond;
pthread_barrier_t barr;
//testing counter
// Written by the workers and polled (then reset to 0) by
// FuncNodeHelper::waitThreadDone().
volatile int finishedCnt;
24
25 void * threadWork(void * id) {
26     long my_id =(long) id;
27     //printf("thread :%ld\n",my_id);
28     CmiSetCPUAffinity(my_id+1);
29
30     pthread_mutex_init(&lock, NULL);
31     pthread_cond_init(&condition, NULL);
32
33     allLocks[my_id] = &lock;
34     allConds[my_id] = &condition;
35
36     while (1) {
37         pthread_mutex_lock(&lock);
38         pthread_cond_wait(&condition,&lock);
39         pthread_mutex_unlock(&lock);
40         void * r;
41         Task * one;
42         CmiLock(Q->lock);
43         CqsDequeue(Q->nodeQ,&r);
44         CmiUnlock(Q->lock);
45         one=(Task *)r;
46
47         while (one) {
48             //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
49             (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
50             pthread_barrier_wait(&barr);
51             //one->setFlag();
52             //printf
53             //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
54
55             //Testing
56             //AtomicIncrement(finishedCnt);
57             if (my_id==0)
58                 finishedCnt=4;
59             //printf("finishedCnt = %d\n", finishedCnt);
60
61             CmiLock((Q->lock));
62             CqsDequeue(Q->nodeQ,&r);
63             CmiUnlock((Q->lock));
64             one=(Task *)r;
65
66         }
67     }
68
69 }
70
71 void FuncNodeHelper::createThread() {
72     int threadNum = numThds;
73     pthread_attr_t attr;
74     finishedCnt=0;
75     pthread_barrier_init(&barr,NULL,threadNum);
76     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
77     allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
78     memset(allLocks, 0, sizeof(void *)*threadNum);
79     memset(allConds, 0, sizeof(void *)*threadNum);
80
81     pthread_attr_init(&attr);
82     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
83     Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
84     Q->nodeQ=CqsCreate();
85     Q->lock=CmiCreateLock();
86     /*for(int i=0;i<threadNum;i++){
87         //Q[i]=
88         Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
89         Q[i]->nodeQ=CqsCreate();
90         Q[i]->lock=CmiCreateLock();
91     }*/
92     threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
93
94
95     //create queue;
96     for (int i=0; i<threadNum; i++)
97         pthread_create(&threads[i],&attr,threadWork,(void *)i);
98 }
99 #endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
100
101 //=======End of pthread version of static scheduling=======//
102
103 FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
104     mode(mode_), numThds(numThds_)
105 {   
106     
107     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
108          
109     traceRegisterUserEvent("assign work",20);
110     traceRegisterUserEvent("finish signal",21);
111     
112 #if CMK_SMP
113     if (mode==NODEHELPER_DYNAMIC || 
114         mode==NODEHELPER_STATIC) {
115         numHelpers = CkMyNodeSize();
116         helperArr = new CkChareID[numHelpers];
117         helperPtr = new FuncSingleHelper *[numHelpers];
118         int pestart = CkNodeFirst(CkMyNode());
119         for (int i=0; i<numHelpers; i++) {
120             CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
121             helperPtr[i] = NULL;
122         }
123         for (int i=0; i<numHelpers; i++) {
124             CProxy_FuncSingleHelper helpProxy(helperArr[i]);
125             helpProxy.reportCreated();
126         }
127     }
128 #else
129     CmiAssert(mode==NODEHELPER_PTHREAD);
130     createThread();    
131 #endif
132 }
133
134 /* Used for dynamic scheduling as it's a node-level msg */
135 /* So this function will be executed on any PE of this node */
136 void FuncNodeHelper::send(Task * msg) {
137     (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
138     CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
139     CmiLock(lock);
140     helperPtr[msg->originRank]->counter++;
141     CmiUnlock(lock);
142 }
143
// Upper bound on the number of chunks per parallelizeFunc() call; larger
// requests are clamped to this value (with a warning) in parallelizeFunc().
int FuncNodeHelper::MAX_CHUNKS = 64;
145
// Tracing helpers. Both expect a local variable named `_start` in the
// enclosing scope. TRACE_START records the current wall time in `_start`
// (its `id` argument is not used); TRACE_BRACKET reports the interval from
// `_start` to now as user event `id`. Both expand to nothing when tracing is
// disabled.
#if CMK_TRACE_ENABLED
#define TRACE_START(id) _start = CmiWallTimer()
#define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
#else
#define TRACE_START(id)
#define TRACE_BRACKET(id)
#endif
153
154 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
155                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
156                                     void *redResult, REDUCTION_TYPE type) {
157                                         
158     double _start; //may be used for tracing
159     
160     if(numChunks > MAX_CHUNKS){ 
161         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
162         numChunks = MAX_CHUNKS;
163     }
164     
165     Task **task = helperPtr[CkMyRank()]->getTasksMem();
166
167     int first = lowerRange;
168     int unit = (upperRange-lowerRange+1)/numChunks;
169     int remainder = (upperRange-lowerRange+1)-unit*numChunks;
170     
171     /* "stride" determines the number of loop iterations to be done in each chunk
172      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
173      * for chunk indexed at remainder to numChunks-1, stride is "unit"
174      */
175      int stride;
176     
177     //for using nodequeue
178 #if CMK_SMP
179     if (mode==NODEHELPER_DYNAMIC) {
180         CProxy_FuncNodeHelper fh(thisgroup);
181
182         TRACE_START(20);        
183         stride = unit+1;
184         for (int i=0; i<remainder; i++, first+=stride) {          
185             task[i]->init(func, first, first+stride, CkMyRank(), paramNum, param);
186             *((int *)CkPriorityPtr(task[i]))=msgPriority;
187             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
188             fh[CkMyNode()].send(task[i]);
189         }
190         
191         stride = unit;
192         for(int i=remainder; i<numChunks; i++, first+=stride) {
193             task[i]->init(func, first, first+stride, CkMyRank(), paramNum, param);
194             *((int *)CkPriorityPtr(task[i]))=msgPriority;
195             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
196             fh[CkMyNode()].send(task[i]);
197         }
198         TRACE_BRACKET(20);
199         
200         TRACE_START(21);
201         FuncSingleHelper *fs = helperPtr[CmiMyRank()];
202         while (fs->counter!=numChunks)
203             CsdScheduleNodePoll();
204         //CkPrintf("counter:%d,master:%d\n",counter[master],master);
205         fs->counter = 0;
206         TRACE_BRACKET(21);        
207     } else if (mode==NODEHELPER_STATIC) {
208         TRACE_START(20);
209                 
210         stride = unit+1;
211         for (int i=0; i<remainder; i++, first+=stride) {
212             task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
213             helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i]);
214         }
215         
216         stride = unit;
217         for (int i=remainder; i<numChunks; i++, first+=stride) {
218             task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
219             helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i]);
220         }
221         
222         CkEntryOptions entOpts;
223         entOpts.setPriority(msgPriority);
224
225         for (int i=0; i<numHelpers; i++) {
226             if (i!=CkMyRank()) {
227                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
228                 helpProxy.processWork(0, &entOpts);
229             }
230         }
231         helperPtr[CkMyRank()]->processWork(0);
232         
233         TRACE_BRACKET(20);
234         
235         TRACE_START(21);
236         waitDone(task,numChunks);
237         TRACE_BRACKET(21);
238     }
239 #else
240 //non-SMP case
241 /* Only works in the non-SMP case */
242     CmiAssert(mode == NODEHELPER_PTHREAD);
243     
244     TRACE_START(20);
245     stride = unit+1;
246     for (int i=0; i<remainder; i++, first+=stride) {
247         task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
248         CmiLock((Q->lock));
249         unsigned int t=(int)(CmiWallTimer()*1000);
250         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
251         CmiUnlock((Q->lock));
252     }
253     
254     stride = unit;
255     for (int i=remainder; i<numChunks; i++, first+=stride) {
256         task[i]->init(func, first, first+stride, 0, CkMyRank(),paramNum, param);            
257         CmiLock((Q->lock));
258         unsigned int t=(int)(CmiWallTimer()*1000);
259         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
260         CmiUnlock((Q->lock));
261     }    
262     //signal the thread
263     for (int i=0; i<threadNum; i++) {
264         pthread_mutex_lock(allLocks[i]);
265         pthread_cond_signal(allConds[i]);
266         pthread_mutex_unlock(allLocks[i]);
267     }
268     TRACE_BRACKET(20);
269     
270     TRACE_START(21);
271     //wait for the result
272     waitThreadDone(numChunks);
273     TRACE_BRACKET(21);
274 #endif
275
276     if (type!=NODEHELPER_NONE)
277         reduce(task, redResult, type, numChunks);            
278     return;
279 }
280
281 #define COMPUTE_REDUCTION(T) {\
282     for(int i=0; i<numChunks; i++) {\
283      result += *((T *)(thisReq[i]->redBuf)); \
284      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
285     }\
286 }
287
288 void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
289     switch(type){
290         case NODEHELPER_INT_SUM:
291         {
292             int result=0;
293             COMPUTE_REDUCTION(int)
294             *((int *)redBuf) = result;
295             break;
296         }
297         case NODEHELPER_FLOAT_SUM:
298         {
299             float result=0;
300             COMPUTE_REDUCTION(float)
301             *((float *)redBuf) = result;
302             break;
303         }
304         case NODEHELPER_DOUBLE_SUM:
305         {
306             double result=0;
307             COMPUTE_REDUCTION(double)
308             *((double *)redBuf) = result;
309             break;
310         }
311         default:
312         break;
313     }
314 }
315
316 #if CMK_SMP
317 void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
318     int flag = 1,i;
319     while (1) {
320         for (i=0; i<chunck; i++)
321             flag = flag & thisReq[i]->isFlagSet();
322         if (flag) break;
323         flag = 1;
324     }
325 }
326 #else
327 void FuncNodeHelper::waitThreadDone(int chunck) {
328     while (finishedCnt!=chunck);
329     finishedCnt=0;
330 }
331 #endif
332
333 //======================================================================//
334 // Functions regarding helpers that parallelize a single function on a  //
335 // single node (like OpenMP)                                            // 
336 //======================================================================//
337 void FuncSingleHelper::processWork(int filler) {
338     Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
339     void *tmp;
340     
341     CmiLock(reqLock);
342     CqsDequeue(reqQ, &tmp);
343     CmiUnlock(reqLock);    
344
345     one = (Task *)tmp;
346     while (one) {
347         (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
348         int *partial = (int *)(one->redBuf);
349         //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
350         one->setFlag();
351         CmiLock(reqLock);
352         CqsDequeue(reqQ, &tmp);
353         one = (Task *)tmp;
354         CmiUnlock(reqLock);
355     }
356 }
357
358 void FuncSingleHelper::reportCreated() {
359     //CkPrintf("Single helper %d is created on rank %d\n", CkMyPe(), CkMyRank());
360     CProxy_FuncNodeHelper fh(nodeHelperID);
361     CProxy_FuncSingleHelper thisproxy(thishandle);
362     fh[CkMyNode()].ckLocalBranch()->oneHelperCreated(CkMyRank(), thishandle, this);
363 }
364 //======================================================================//
365 //   End of functions related with FuncSingleHelper                     //
366 //======================================================================//
367
368 CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
369     return CProxy_FuncNodeHelper::ckNew(mode, numThds);
370 }
371
372 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
373                         int paramNum, void * param, int msgPriority,
374                         int numChunks, int lowerRange, int upperRange, 
375                         void *redResult, REDUCTION_TYPE type)
376 {
377     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
378 }
379
380 #include "NodeHelper.def.h"