4ca49ac0cc8bf61bf5f3ba4204d2ff3c29c895c8
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 //=======Beginning of pthread version of scheduling which is used in non-SMP =======//
4 #if !CMK_SMP
5 NodeQueue Q;
6
7 //vars local to spawned threads
8 //Note: __thread is not portable, but works pretty much anywhere pthreads work.
9 // after C++11 this should be thread_local
10 __thread pthread_mutex_t lock;
11 __thread pthread_cond_t condition;
12
13 //vars to the main flow (master thread)
14 pthread_t * threads;
15 pthread_mutex_t **allLocks;
16 pthread_cond_t **allConds;
17
18 //global barrier
19 pthread_mutex_t gLock;
20 pthread_cond_t gCond;
21 pthread_barrier_t barr;
22 //testing counter
23 volatile int finishedCnt;
24
25 void * threadWork(void * id) {
26     long my_id =(long) id;
27     //printf("thread :%ld\n",my_id);
28     CmiSetCPUAffinity(my_id+1);
29
30     pthread_mutex_init(&lock, NULL);
31     pthread_cond_init(&condition, NULL);
32
33     allLocks[my_id] = &lock;
34     allConds[my_id] = &condition;
35
36     while (1) {
37         pthread_mutex_lock(&lock);
38         pthread_cond_wait(&condition,&lock);
39         pthread_mutex_unlock(&lock);
40         void * r;
41         Task * one;
42         CmiLock(Q->lock);
43         CqsDequeue(Q->nodeQ,&r);
44         CmiUnlock(Q->lock);
45         one=(Task *)r;
46
47         while (one) {
48             //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
49             (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
50             pthread_barrier_wait(&barr);
51             //one->setFlag();
52             //printf
53             //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
54
55             //Testing
56             //AtomicIncrement(finishedCnt);
57             if (my_id==0)
58                 finishedCnt=4;
59             //printf("finishedCnt = %d\n", finishedCnt);
60
61             CmiLock((Q->lock));
62             CqsDequeue(Q->nodeQ,&r);
63             CmiUnlock((Q->lock));
64             one=(Task *)r;
65
66         }
67     }
68
69 }
70
71 void FuncNodeHelper::createThread() {
72     int threadNum = numThds;
73     pthread_attr_t attr;
74     finishedCnt=0;
75     pthread_barrier_init(&barr,NULL,threadNum);
76     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
77     allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
78     memset(allLocks, 0, sizeof(void *)*threadNum);
79     memset(allConds, 0, sizeof(void *)*threadNum);
80
81     pthread_attr_init(&attr);
82     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
83     Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
84     Q->nodeQ=CqsCreate();
85     Q->lock=CmiCreateLock();
86     /*for(int i=0;i<threadNum;i++){
87         //Q[i]=
88         Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
89         Q[i]->nodeQ=CqsCreate();
90         Q[i]->lock=CmiCreateLock();
91     }*/
92     threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
93
94
95     //create queue;
96     for (int i=0; i<threadNum; i++)
97         pthread_create(&threads[i],&attr,threadWork,(void *)i);
98 }
99 #endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
100
101 //=======End of pthread version of static scheduling=======//
102
103 FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
104     mode(mode_), numThds(numThds_)
105 {   
106     
107     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
108          
109     traceRegisterUserEvent("assign work",20);
110     traceRegisterUserEvent("finish signal",21);
111     
112 #if CMK_SMP
113     if (mode==NODEHELPER_DYNAMIC || 
114         mode==NODEHELPER_STATIC) {
115         numHelpers = CkMyNodeSize();
116         helperArr = new CkChareID[numHelpers];
117         helperPtr = new FuncSingleHelper *[numHelpers];
118         
119 #if USE_CONVERSE_MSG
120         notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
121 #endif        
122         
123         int pestart = CkNodeFirst(CkMyNode());
124         for (int i=0; i<numHelpers; i++) {
125             CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
126             helperPtr[i] = NULL;
127         }
128         for (int i=0; i<numHelpers; i++) {
129             CProxy_FuncSingleHelper helpProxy(helperArr[i]);
130             helpProxy.reportCreated();
131         }
132     }
133 #else
134     CmiAssert(mode==NODEHELPER_PTHREAD);
135     createThread();    
136 #endif
137 }
138
139 /* Used for dynamic scheduling as it's a node-level msg */
140 /* So this function will be executed on any PE of this node */
141 void FuncNodeHelper::send(Task * msg) {
142     (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
143     CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
144     CmiLock(lock);
145     helperPtr[msg->originRank]->counter++;
146     CmiUnlock(lock);
147 }
148
149 int FuncNodeHelper::MAX_CHUNKS = 64;
150
151 #if CMK_TRACE_ENABLED
152 #define TRACE_START(id) _start = CmiWallTimer()
153 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
154 #else
155 #define TRACE_START(id)
156 #define TRACE_BRACKET(id)
157 #endif
158
159 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
160                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
161                                     void *redResult, REDUCTION_TYPE type) {
162                                         
163     double _start; //may be used for tracing
164     
165     if(numChunks > MAX_CHUNKS){ 
166         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
167         numChunks = MAX_CHUNKS;
168     }
169         
170     Task **task = helperPtr[CkMyRank()]->getTasksMem();
171
172     int first = lowerRange;
173     int unit = (upperRange-lowerRange+1)/numChunks;
174     int remainder = (upperRange-lowerRange+1)-unit*numChunks;
175     
176     /* "stride" determines the number of loop iterations to be done in each chunk
177      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
178      * for chunk indexed at remainder to numChunks-1, stride is "unit"
179      */
180      int stride;
181     
182     //for using nodequeue
183 #if CMK_SMP
184     if (mode==NODEHELPER_DYNAMIC) {
185         CProxy_FuncNodeHelper fh(thisgroup);
186
187         TRACE_START(20);        
188         stride = unit+1;
189         for (int i=0; i<remainder; i++, first+=stride) {          
190             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
191             *((int *)CkPriorityPtr(task[i]))=msgPriority;
192             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
193             fh[CkMyNode()].send(task[i]);
194         }
195         
196         stride = unit;
197         for(int i=remainder; i<numChunks; i++, first+=stride) {
198             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
199             *((int *)CkPriorityPtr(task[i]))=msgPriority;
200             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
201             fh[CkMyNode()].send(task[i]);
202         }
203         TRACE_BRACKET(20);
204         
205         TRACE_START(21);
206         FuncSingleHelper *fs = helperPtr[CmiMyRank()];
207         while (fs->counter!=numChunks)
208             CsdScheduleNodePoll();
209         //CkPrintf("counter:%d,master:%d\n",counter[master],master);
210         fs->counter = 0;
211         TRACE_BRACKET(21);        
212     } else if (mode==NODEHELPER_STATIC) {
213         TRACE_START(20);
214                 
215         stride = unit+1;
216         for (int i=0; i<remainder; i++, first+=stride) {
217             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
218             helperPtr[i%numHelpers]->enqueueWork(task[i]);
219         }
220         
221         stride = unit;
222         for (int i=remainder; i<numChunks; i++, first+=stride) {
223             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
224             helperPtr[i%numHelpers]->enqueueWork(task[i]);
225         }
226         
227 #if USE_CONVERSE_MSG
228         
229         for (int i=0; i<numHelpers; i++) {
230             if (i!=CkMyRank()) {
231                 CmiPushPE(i, (void *)(notifyMsgs+i));
232             }
233         }
234 #else
235         CkEntryOptions entOpts;
236         entOpts.setPriority(msgPriority);
237
238         for (int i=0; i<numHelpers; i++) {
239             if (i!=CkMyRank()) {
240                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
241                 helpProxy.processWork(0, &entOpts);
242             }
243         }    
244 #endif        
245         helperPtr[CkMyRank()]->processWork(0);
246         
247         TRACE_BRACKET(20);
248         
249         TRACE_START(21);
250                 
251         while(!__sync_bool_compare_and_swap(&(helperPtr[CkMyRank()]->counter), numChunks, 0));
252         //waitDone(task,numChunks);
253         
254         TRACE_BRACKET(21);
255     }
256 #else
257 //non-SMP case
258 /* Only works in the non-SMP case */
259     CmiAssert(mode == NODEHELPER_PTHREAD);
260     
261     TRACE_START(20);
262     stride = unit+1;
263     for (int i=0; i<remainder; i++, first+=stride) {
264         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
265         CmiLock((Q->lock));
266         unsigned int t=(int)(CmiWallTimer()*1000);
267         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
268         CmiUnlock((Q->lock));
269     }
270     
271     stride = unit;
272     for (int i=remainder; i<numChunks; i++, first+=stride) {
273         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
274         CmiLock((Q->lock));
275         unsigned int t=(int)(CmiWallTimer()*1000);
276         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
277         CmiUnlock((Q->lock));
278     }    
279     //signal the thread
280     for (int i=0; i<threadNum; i++) {
281         pthread_mutex_lock(allLocks[i]);
282         pthread_cond_signal(allConds[i]);
283         pthread_mutex_unlock(allLocks[i]);
284     }
285     TRACE_BRACKET(20);
286     
287     TRACE_START(21);
288     //wait for the result
289     waitThreadDone(numChunks);
290     TRACE_BRACKET(21);
291 #endif
292
293     if (type!=NODEHELPER_NONE)
294         reduce(task, redResult, type, numChunks);            
295     return;
296 }
297
298 #define COMPUTE_REDUCTION(T) {\
299     for(int i=0; i<numChunks; i++) {\
300      result += *((T *)(thisReq[i]->redBuf)); \
301      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
302     }\
303 }
304
305 void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
306     switch(type){
307         case NODEHELPER_INT_SUM:
308         {
309             int result=0;
310             COMPUTE_REDUCTION(int)
311             *((int *)redBuf) = result;
312             break;
313         }
314         case NODEHELPER_FLOAT_SUM:
315         {
316             float result=0;
317             COMPUTE_REDUCTION(float)
318             *((float *)redBuf) = result;
319             break;
320         }
321         case NODEHELPER_DOUBLE_SUM:
322         {
323             double result=0;
324             COMPUTE_REDUCTION(double)
325             *((double *)redBuf) = result;
326             break;
327         }
328         default:
329         break;
330     }
331 }
332
333 #if CMK_SMP
334 void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
335     int flag = 1,i;
336     while (1) {
337         for (i=0; i<chunck; i++)
338             flag = flag & thisReq[i]->isFlagSet();
339         if (flag) break;
340         flag = 1;
341     }
342 }
343 #else
344 void FuncNodeHelper::waitThreadDone(int chunck) {
345     while (finishedCnt!=chunck);
346     finishedCnt=0;
347 }
348 #endif
349
350 void FuncNodeHelper::printMode(int mode) {
351     if(mode == NODEHELPER_PTHREAD){
352         CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
353     }else if(mode == NODEHELPER_DYNAMIC){
354         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
355     }else if(mode == NODEHELPER_STATIC){
356         CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
357     }
358 }
359
#if USE_CONVERSE_MSG
/* Converse-message entry point: runs processWork(0) on the FuncSingleHelper
 * carried in msg->ptr. */
void NotifySingleHelper(ConverseNotifyMsg *msg) {
    msg->ptr->processWork(0);
}
#endif
366
367 //======================================================================//
368 // Functions regarding helpers that parallelize a single function on a  //
369 // single node (like OpenMP)                                            // 
370 //======================================================================//
371 void FuncSingleHelper::processWork(int filler) {
372     Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
373     void *tmp;
374     
375     CmiLock(reqLock);
376     CqsDequeue(reqQ, &tmp);
377     CmiUnlock(reqLock);    
378
379     one = (Task *)tmp;
380     while (one) {
381         (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
382         //int *partial = (int *)(one->redBuf);
383         //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
384         
385         //one->setFlag();
386         __sync_add_and_fetch(&(thisNodeHelper->helperPtr[one->originRank]->counter), 1);
387         
388         
389         CmiLock(reqLock);
390         CqsDequeue(reqQ, &tmp);
391         one = (Task *)tmp;
392         CmiUnlock(reqLock);
393     }
394 }
395 //======================================================================//
396 //   End of functions related with FuncSingleHelper                     //
397 //======================================================================//
398
399 CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
400     FuncNodeHelper::printMode(mode);
401     return CProxy_FuncNodeHelper::ckNew(mode, numThds);
402 }
403
404 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
405                         int paramNum, void * param, int msgPriority,
406                         int numChunks, int lowerRange, int upperRange, 
407                         void *redResult, REDUCTION_TYPE type)
408 {
409     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
410 }
411
412 #include "NodeHelper.def.h"