Added a new scheme (work-stealing) which significantly reduced the overhead. E.g...
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 //=======Beginning of pthread version of scheduling which is used in non-SMP =======//
4 #if !CMK_SMP
5 NodeQueue Q; // task queue shared by all worker threads; allocated in FuncNodeHelper::createThread()
6
7 //vars local to spawned threads
8 //Note: __thread is not portable, but works pretty much anywhere pthreads work.
9 // after C++11 this should be thread_local
10 __thread pthread_mutex_t lock; // per-thread mutex paired with `condition`; initialised by the worker in threadWork()
11 __thread pthread_cond_t condition; // per-thread condition variable the worker sleeps on until it is signalled
12
13 //vars to the main flow (master thread)
14 pthread_t * threads; // worker thread handles, allocated and filled in createThread()
15 pthread_mutex_t **allLocks; // allLocks[i] = address of worker i's thread-local `lock` (stored by the worker itself)
16 pthread_cond_t **allConds; // allConds[i] = address of worker i's thread-local `condition` (stored by the worker itself)
17
18 //global barrier
19 pthread_mutex_t gLock; // NOTE(review): not referenced in the visible part of this file
20 pthread_cond_t gCond; // NOTE(review): not referenced in the visible part of this file
21 pthread_barrier_t barr; // initialised for numThds participants in createThread()
22 //testing counter
23 volatile int finishedCnt; // completion counter polled (and reset to 0) by waitThreadDone()
24
25 void * threadWork(void * id) {
26     long my_id =(long) id;
27     //printf("thread :%ld\n",my_id);
28     CmiSetCPUAffinity(my_id+1);
29
30     pthread_mutex_init(&lock, NULL);
31     pthread_cond_init(&condition, NULL);
32
33     allLocks[my_id] = &lock;
34     allConds[my_id] = &condition;
35
36     while (1) {
37         pthread_mutex_lock(&lock);
38         pthread_cond_wait(&condition,&lock);
39         pthread_mutex_unlock(&lock);
40         void * r;
41         Task * one;
42         CmiLock(Q->lock);
43         CqsDequeue(Q->nodeQ,&r);
44         CmiUnlock(Q->lock);
45         one=(Task *)r;
46
47         while (one) {
48             //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
49             (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
50             pthread_barrier_wait(&barr);
51             //one->setFlag();
52             //printf
53             //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
54
55             //Testing
56             //AtomicIncrement(finishedCnt);
57             if (my_id==0)
58                 finishedCnt=4;
59             //printf("finishedCnt = %d\n", finishedCnt);
60
61             CmiLock((Q->lock));
62             CqsDequeue(Q->nodeQ,&r);
63             CmiUnlock((Q->lock));
64             one=(Task *)r;
65
66         }
67     }
68
69 }
70
71 void FuncNodeHelper::createThread() {
72     int threadNum = numThds;
73     pthread_attr_t attr;
74     finishedCnt=0;
75     pthread_barrier_init(&barr,NULL,threadNum);
76     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
77     allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
78     memset(allLocks, 0, sizeof(void *)*threadNum);
79     memset(allConds, 0, sizeof(void *)*threadNum);
80
81     pthread_attr_init(&attr);
82     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
83     Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
84     Q->nodeQ=CqsCreate();
85     Q->lock=CmiCreateLock();
86     /*for(int i=0;i<threadNum;i++){
87         //Q[i]=
88         Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
89         Q[i]->nodeQ=CqsCreate();
90         Q[i]->lock=CmiCreateLock();
91     }*/
92     threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
93
94
95     //create queue;
96     for (int i=0; i<threadNum; i++)
97         pthread_create(&threads[i],&attr,threadWork,(void *)i);
98 }
99 #endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
100
101 //=======End of pthread version of scheduling which is used in non-SMP=======//
102
103 FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
104     mode(mode_), numThds(numThds_)
105 {   
106     
107     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
108          
109     traceRegisterUserEvent("assign work",20);
110     traceRegisterUserEvent("finish signal",21);
111     
112 #if CMK_SMP
113     if (mode==NODEHELPER_DYNAMIC || 
114         mode==NODEHELPER_STATIC
115         || mode==NODEHELPER_CHARE_DYNAMIC) {
116         numHelpers = CkMyNodeSize();
117         helperArr = new CkChareID[numHelpers];
118         helperPtr = new FuncSingleHelper *[numHelpers];
119         
120         notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
121         
122         int pestart = CkNodeFirst(CkMyNode());
123         for (int i=0; i<numHelpers; i++) {
124             CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
125             helperPtr[i] = NULL;
126         }
127         for (int i=0; i<numHelpers; i++) {
128             CProxy_FuncSingleHelper helpProxy(helperArr[i]);
129             helpProxy.reportCreated();
130         }
131     }
132 #else
133     CmiAssert(mode==NODEHELPER_PTHREAD);
134     createThread();    
135 #endif
136 }
137
138 /* Used for dynamic scheduling as it's a node-level msg */
139 /* So this function will be executed on any PE of this node */
140 void FuncNodeHelper::send(Task * msg) {
141     (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
142     CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
143     CmiLock(lock);
144     helperPtr[msg->originRank]->counter++;
145     CmiUnlock(lock);
146 }
147
// Upper bound on the number of chunks; parallelizeFunc() clamps numChunks to it.
148 int FuncNodeHelper::MAX_CHUNKS = 64;
149
// Tracing helpers. TRACE_START stores the current wall-clock time in a variable
// named _start, which the enclosing function must declare; its id argument is
// not used. TRACE_BRACKET reports the interval from _start to now as user
// event `id`. Both expand to nothing when CMK_TRACE_ENABLED is off.
150 #if CMK_TRACE_ENABLED
151 #define TRACE_START(id) _start = CmiWallTimer()
152 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
153 #else
154 #define TRACE_START(id)
155 #define TRACE_BRACKET(id)
156 #endif
157
158 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
159                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
160                                     void *redResult, REDUCTION_TYPE type) {
161                                         
162     double _start; //may be used for tracing
163     
164     if(numChunks > MAX_CHUNKS){ 
165         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
166         numChunks = MAX_CHUNKS;
167     }
168         
169     Task **task = helperPtr[CkMyRank()]->getTasksMem();
170     
171     /* "stride" determines the number of loop iterations to be done in each chunk
172      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
173      * for chunk indexed at remainder to numChunks-1, stride is "unit"
174      */
175      int stride;
176     
177     //for using nodequeue
178 #if CMK_SMP
179     if (mode==NODEHELPER_DYNAMIC) {
180         int first = lowerRange;
181         int unit = (upperRange-lowerRange+1)/numChunks;
182         int remainder = (upperRange-lowerRange+1)-unit*numChunks;        
183         CProxy_FuncNodeHelper fh(thisgroup);
184
185         TRACE_START(20);        
186         stride = unit+1;
187         for (int i=0; i<remainder; i++, first+=stride) {          
188             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
189             *((int *)CkPriorityPtr(task[i]))=msgPriority;
190             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
191             fh[CkMyNode()].send(task[i]);
192         }
193         
194         stride = unit;
195         for(int i=remainder; i<numChunks; i++, first+=stride) {
196             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
197             *((int *)CkPriorityPtr(task[i]))=msgPriority;
198             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
199             fh[CkMyNode()].send(task[i]);
200         }
201         TRACE_BRACKET(20);
202         
203         TRACE_START(21);
204         FuncSingleHelper *fs = helperPtr[CmiMyRank()];
205         while (fs->counter!=numChunks)
206             CsdScheduleNodePoll();
207         //CkPrintf("counter:%d,master:%d\n",counter[master],master);
208         fs->counter = 0;
209         TRACE_BRACKET(21);        
210     } else if (mode==NODEHELPER_STATIC) {
211         int first = lowerRange;
212         int unit = (upperRange-lowerRange+1)/numChunks;
213         int remainder = (upperRange-lowerRange+1)-unit*numChunks;
214
215         TRACE_START(20);
216                 
217         stride = unit+1;
218         for (int i=0; i<remainder; i++, first+=stride) {
219             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
220             helperPtr[i%numHelpers]->enqueueWork(task[i]);
221         }
222         
223         stride = unit;
224         for (int i=remainder; i<numChunks; i++, first+=stride) {
225             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
226             helperPtr[i%numHelpers]->enqueueWork(task[i]);
227         }
228         
229 #if USE_CONVERSE_MSG
230         
231         for (int i=0; i<numHelpers; i++) {
232             if (i!=CkMyRank()) {
233                 CmiPushPE(i, (void *)(notifyMsgs+i));
234             }
235         }
236 #else
237         CkEntryOptions entOpts;
238         entOpts.setPriority(msgPriority);
239
240         for (int i=0; i<numHelpers; i++) {
241             if (i!=CkMyRank()) {
242                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
243                 helpProxy.processWork(0, &entOpts);
244             }
245         }    
246 #endif        
247         helperPtr[CkMyRank()]->processWork(0);
248         
249         TRACE_BRACKET(20);
250         
251         TRACE_START(21);
252                 
253         while(!__sync_bool_compare_and_swap(&(helperPtr[CkMyRank()]->counter), numChunks, 0));
254         //waitDone(task,numChunks);
255         
256         TRACE_BRACKET(21);
257     }else if(mode == NODEHELPER_CHARE_DYNAMIC){
258         TRACE_START(20);
259         
260         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
261         CurLoopInfo *curLoop = thisHelper->curLoop;
262         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
263         
264         for (int i=0; i<numHelpers; i++) {
265             if (i!=CkMyRank()) {
266                 notifyMsgs[i].ptr = (void *)curLoop;
267                 CmiPushPE(i, (void *)(notifyMsgs+i));
268             }
269         }        
270         curLoop->stealWork();
271         TRACE_BRACKET(20);
272         
273         TRACE_START(21);                
274         curLoop->waitLoopDone();
275         TRACE_BRACKET(21);        
276     }
277 #else
278 //non-SMP case
279 /* Only works in the non-SMP case */
280     CmiAssert(mode == NODEHELPER_PTHREAD);
281     
282     TRACE_START(20);
283     stride = unit+1;
284     for (int i=0; i<remainder; i++, first+=stride) {
285         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
286         CmiLock((Q->lock));
287         unsigned int t=(int)(CmiWallTimer()*1000);
288         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
289         CmiUnlock((Q->lock));
290     }
291     
292     stride = unit;
293     for (int i=remainder; i<numChunks; i++, first+=stride) {
294         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
295         CmiLock((Q->lock));
296         unsigned int t=(int)(CmiWallTimer()*1000);
297         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
298         CmiUnlock((Q->lock));
299     }    
300     //signal the thread
301     for (int i=0; i<threadNum; i++) {
302         pthread_mutex_lock(allLocks[i]);
303         pthread_cond_signal(allConds[i]);
304         pthread_mutex_unlock(allLocks[i]);
305     }
306     TRACE_BRACKET(20);
307     
308     TRACE_START(21);
309     //wait for the result
310     waitThreadDone(numChunks);
311     TRACE_BRACKET(21);
312 #endif
313
314     if (type!=NODEHELPER_NONE)
315         reduce(task, redResult, type, numChunks);            
316     return;
317 }
318
319 #define COMPUTE_REDUCTION(T) {\
320     for(int i=0; i<numChunks; i++) {\
321      result += *((T *)(thisReq[i]->redBuf)); \
322      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
323     }\
324 }
325
326 void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
327     switch(type){
328         case NODEHELPER_INT_SUM:
329         {
330             int result=0;
331             COMPUTE_REDUCTION(int)
332             *((int *)redBuf) = result;
333             break;
334         }
335         case NODEHELPER_FLOAT_SUM:
336         {
337             float result=0;
338             COMPUTE_REDUCTION(float)
339             *((float *)redBuf) = result;
340             break;
341         }
342         case NODEHELPER_DOUBLE_SUM:
343         {
344             double result=0;
345             COMPUTE_REDUCTION(double)
346             *((double *)redBuf) = result;
347             break;
348         }
349         default:
350         break;
351     }
352 }
353
354 #if CMK_SMP
355 void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
356     int flag = 1,i;
357     while (1) {
358         for (i=0; i<chunck; i++)
359             flag = flag & thisReq[i]->isFlagSet();
360         if (flag) break;
361         flag = 1;
362     }
363 }
364 #else
365 void FuncNodeHelper::waitThreadDone(int chunck) {
366     while (finishedCnt!=chunck);
367     finishedCnt=0;
368 }
369 #endif
370
371 void FuncNodeHelper::printMode(int mode) {
372     switch(mode){
373         case NODEHELPER_PTHREAD:
374             CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
375             break;
376         case NODEHELPER_DYNAMIC:
377             CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
378             break;
379         case NODEHELPER_STATIC:
380             CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
381             break;
382         case NODEHELPER_CHARE_DYNAMIC:
383             CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
384             break;
385         default:
386             CkPrintf("ERROR: NodeHelperLib is used in unknown mode\n");
387     }
388 }
389
390 void NotifySingleHelper(ConverseNotifyMsg *msg){
391     FuncSingleHelper *h = (FuncSingleHelper *)msg->ptr;
392     h->processWork(0);
393 }
394
395 void SingleHelperStealWork(ConverseNotifyMsg *msg){
396     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
397     loop->stealWork();
398 }
399
400 //======================================================================//
401 // Functions regarding helpers that parallelize a single function on a  //
402 // single node (like OpenMP)                                            // 
403 //======================================================================//
404 void FuncSingleHelper::processWork(int filler) {
405     Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
406     void *tmp;
407     
408     CmiLock(reqLock);
409     CqsDequeue(reqQ, &tmp);
410     CmiUnlock(reqLock);    
411
412     one = (Task *)tmp;
413     while (one) {
414         (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
415         //int *partial = (int *)(one->redBuf);
416         //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
417         
418         //one->setFlag();
419         __sync_add_and_fetch(&(thisNodeHelper->helperPtr[one->originRank]->counter), 1);
420         
421         
422         CmiLock(reqLock);
423         CqsDequeue(reqQ, &tmp);
424         one = (Task *)tmp;
425         CmiUnlock(reqLock);
426     }
427 }
428
429 void CurLoopInfo::stealWork(){
430     int first, last;
431     int unit = (upperIndex-lowerIndex+1)/numChunks;
432     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
433     int markIdx = remainder*(unit+1);
434     
435     int nextChunkId = getNextChunkIdx();
436     while(nextChunkId < numChunks){
437         if(nextChunkId < remainder){
438             first = (unit+1)*nextChunkId;
439             last = first+unit;
440         }else{
441             first = (nextChunkId - remainder)*unit + markIdx;
442             last = first+unit-1;
443         }
444                 
445         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
446         
447         nextChunkId = getNextChunkIdx();
448     }
449     reportFinished();    
450 }
451
452 //======================================================================//
453 //   End of functions related with FuncSingleHelper                     //
454 //======================================================================//
455
456 CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
457     FuncNodeHelper::printMode(mode);
458     return CProxy_FuncNodeHelper::ckNew(mode, numThds);
459 }
460
461 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
462                         int paramNum, void * param, int msgPriority,
463                         int numChunks, int lowerRange, int upperRange, 
464                         void *redResult, REDUCTION_TYPE type)
465 {
466     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
467 }
468
469 #include "NodeHelper.def.h"