Fixed a bug in using the macro for the number of test iterations.
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 //=======Beginning of pthread version of scheduling which is used in non-SMP =======//
4 #if !CMK_SMP
5 NodeQueue Q;
6
7 //vars local to spawned threads
8 //Note: __thread is not portable, but works pretty much anywhere pthreads work.
9 // after C++11 this should be thread_local
10 __thread pthread_mutex_t lock;
11 __thread pthread_cond_t condition;
12
13 //vars to the main flow (master thread)
14 pthread_t * threads;
15 pthread_mutex_t **allLocks;
16 pthread_cond_t **allConds;
17
18 //global barrier
19 pthread_mutex_t gLock;
20 pthread_cond_t gCond;
21 pthread_barrier_t barr;
22 //testing counter
23 volatile int finishedCnt;
24
25 void * threadWork(void * id) {
26     long my_id =(long) id;
27     //printf("thread :%ld\n",my_id);
28     CmiSetCPUAffinity(my_id+1);
29
30     pthread_mutex_init(&lock, NULL);
31     pthread_cond_init(&condition, NULL);
32
33     allLocks[my_id] = &lock;
34     allConds[my_id] = &condition;
35
36     while (1) {
37         pthread_mutex_lock(&lock);
38         pthread_cond_wait(&condition,&lock);
39         pthread_mutex_unlock(&lock);
40         void * r;
41         Task * one;
42         CmiLock(Q->lock);
43         CqsDequeue(Q->nodeQ,&r);
44         CmiUnlock(Q->lock);
45         one=(Task *)r;
46
47         while (one) {
48             //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
49             (one->fnPtr)(one->first, one->last, (void *)(one->redBuf), one->paramNum, one->param);
50             pthread_barrier_wait(&barr);
51             //one->setFlag();
52             //printf
53             //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
54
55             //Testing
56             //AtomicIncrement(finishedCnt);
57             if (my_id==0)
58                 finishedCnt=4;
59             //printf("finishedCnt = %d\n", finishedCnt);
60
61             CmiLock((Q->lock));
62             CqsDequeue(Q->nodeQ,&r);
63             CmiUnlock((Q->lock));
64             one=(Task *)r;
65
66         }
67     }
68
69 }
70
71 void FuncNodeHelper::createThread() {
72     int threadNum = numThds;
73     pthread_attr_t attr;
74     finishedCnt=0;
75     pthread_barrier_init(&barr,NULL,threadNum);
76     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
77     allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
78     memset(allLocks, 0, sizeof(void *)*threadNum);
79     memset(allConds, 0, sizeof(void *)*threadNum);
80
81     pthread_attr_init(&attr);
82     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
83     Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
84     Q->nodeQ=CqsCreate();
85     Q->lock=CmiCreateLock();
86     /*for(int i=0;i<threadNum;i++){
87         //Q[i]=
88         Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
89         Q[i]->nodeQ=CqsCreate();
90         Q[i]->lock=CmiCreateLock();
91     }*/
92     threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
93
94
95     //create queue;
96     for (int i=0; i<threadNum; i++)
97         pthread_create(&threads[i],&attr,threadWork,(void *)i);
98 }
99 #endif //end of !CMK_SMP (definitions for vars and functions used in non-SMP)
100
101 //=======End of pthread version of static scheduling=======//
102
103 FuncNodeHelper::FuncNodeHelper(int mode_,int numThds_):
104     mode(mode_), numThds(numThds_)
105 {   
106     
107     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
108          
109     traceRegisterUserEvent("assign work",20);
110     traceRegisterUserEvent("finish signal",21);
111     
112 #if CMK_SMP
113     if (mode==NODEHELPER_DYNAMIC || 
114         mode==NODEHELPER_STATIC
115         || mode==NODEHELPER_CHARE_DYNAMIC) {
116         numHelpers = CkMyNodeSize();
117         helperArr = new CkChareID[numHelpers];
118         helperPtr = new FuncSingleHelper *[numHelpers];
119         
120         notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
121         
122         int pestart = CkNodeFirst(CkMyNode());
123         for (int i=0; i<numHelpers; i++) {
124             CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
125             helperPtr[i] = NULL;
126         }
127         for (int i=0; i<numHelpers; i++) {
128             CProxy_FuncSingleHelper helpProxy(helperArr[i]);
129             helpProxy.reportCreated();
130         }
131     }
132 #else
133     CmiAssert(mode==NODEHELPER_PTHREAD);
134     createThread();    
135 #endif
136 }
137
138 /* Used for dynamic scheduling as it's a node-level msg */
139 /* So this function will be executed on any PE of this node */
140 void FuncNodeHelper::send(Task * msg) {
141     (msg->fnPtr)(msg->first,msg->last,(void *)(msg->redBuf),msg->paramNum, msg->param);
142     CmiNodeLock lock = helperPtr[msg->originRank]->reqLock;
143     CmiLock(lock);
144     helperPtr[msg->originRank]->counter++;
145     CmiUnlock(lock);
146 }
147
148 int FuncNodeHelper::MAX_CHUNKS = 64;
149
150 #if CMK_TRACE_ENABLED
151 #define TRACE_START(id) _start = CmiWallTimer()
152 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
153 #else
154 #define TRACE_START(id)
155 #define TRACE_BRACKET(id)
156 #endif
157
158 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
159                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
160                                     void *redResult, REDUCTION_TYPE type) {
161                                         
162     double _start; //may be used for tracing
163     
164     if(numChunks > MAX_CHUNKS){ 
165         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
166         numChunks = MAX_CHUNKS;
167     }
168         
169     Task **task = helperPtr[CkMyRank()]->getTasksMem();
170     
171     /* "stride" determines the number of loop iterations to be done in each chunk
172      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
173      * for chunk indexed at remainder to numChunks-1, stride is "unit"
174      */
175      int stride;
176     
177     //for using nodequeue
178 #if CMK_SMP
179     if (mode==NODEHELPER_DYNAMIC) {
180         int first = lowerRange;
181         int unit = (upperRange-lowerRange+1)/numChunks;
182         int remainder = (upperRange-lowerRange+1)-unit*numChunks;        
183         CProxy_FuncNodeHelper fh(thisgroup);
184
185         TRACE_START(20);        
186         stride = unit+1;
187         for (int i=0; i<remainder; i++, first+=stride) {          
188             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
189             *((int *)CkPriorityPtr(task[i]))=msgPriority;
190             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
191             fh[CkMyNode()].send(task[i]);
192         }
193         
194         stride = unit;
195         for(int i=remainder; i<numChunks; i++, first+=stride) {
196             task[i]->init(func, first, first+stride-1, CkMyRank(), paramNum, param);
197             *((int *)CkPriorityPtr(task[i]))=msgPriority;
198             CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
199             fh[CkMyNode()].send(task[i]);
200         }
201         TRACE_BRACKET(20);
202         
203         TRACE_START(21);
204         FuncSingleHelper *fs = helperPtr[CmiMyRank()];
205         while (fs->counter!=numChunks)
206             CsdScheduleNodePoll();
207         //CkPrintf("counter:%d,master:%d\n",counter[master],master);
208         fs->counter = 0;
209         TRACE_BRACKET(21);        
210     } else if (mode==NODEHELPER_STATIC) {
211         int first = lowerRange;
212         int unit = (upperRange-lowerRange+1)/numChunks;
213         int remainder = (upperRange-lowerRange+1)-unit*numChunks;
214
215         TRACE_START(20);
216                 
217         stride = unit+1;
218         for (int i=0; i<remainder; i++, first+=stride) {
219             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
220             helperPtr[i%numHelpers]->enqueueWork(task[i]);
221         }
222         
223         stride = unit;
224         for (int i=remainder; i<numChunks; i++, first+=stride) {
225             task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
226             helperPtr[i%numHelpers]->enqueueWork(task[i]);
227         }
228         
229 #if USE_CONVERSE_MSG
230         
231         for (int i=0; i<numHelpers; i++) {
232             if (i!=CkMyRank()) {
233                 CmiPushPE(i, (void *)(notifyMsgs+i));
234             }
235         }
236 #else
237         CkEntryOptions entOpts;
238         entOpts.setPriority(msgPriority);
239
240         for (int i=0; i<numHelpers; i++) {
241             if (i!=CkMyRank()) {
242                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);                
243                 helpProxy.processWork(0, &entOpts);
244             }
245         }    
246 #endif        
247         helperPtr[CkMyRank()]->processWork(0);
248         
249         TRACE_BRACKET(20);
250         
251         TRACE_START(21);
252                 
253         while(!__sync_bool_compare_and_swap(&(helperPtr[CkMyRank()]->counter), numChunks, 0));
254         //waitDone(task,numChunks);
255         
256         TRACE_BRACKET(21);
257     }else if(mode == NODEHELPER_CHARE_DYNAMIC){
258         TRACE_START(20);
259         
260         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
261         CurLoopInfo *curLoop = thisHelper->curLoop;
262         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
263         ConverseNotifyMsg *notifyMsg = &(notifyMsgs[CmiMyRank()]);
264         notifyMsg->ptr = (void *)curLoop;
265 #if USE_TREE_BROADCAST        
266         notifyMsg->srcRank = CmiMyRank();
267         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
268         //just implicit binary tree
269         int pe = CmiMyRank()+1;        
270         for(int i=0; i<loopTimes; i++, pe++){
271             if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
272             CmiPushPE(pe, (void *)(notifyMsg));    
273         }
274 #else        
275         for (int i=0; i<numHelpers; i++) {
276             if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
277         }
278 #endif            
279         curLoop->stealWork();
280         TRACE_BRACKET(20);
281         
282         TRACE_START(21);                
283         curLoop->waitLoopDone();
284         TRACE_BRACKET(21);        
285     }
286 #else
287 //non-SMP case
288 /* Only works in the non-SMP case */
289     CmiAssert(mode == NODEHELPER_PTHREAD);
290     
291     TRACE_START(20);
292     stride = unit+1;
293     for (int i=0; i<remainder; i++, first+=stride) {
294         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
295         CmiLock((Q->lock));
296         unsigned int t=(int)(CmiWallTimer()*1000);
297         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
298         CmiUnlock((Q->lock));
299     }
300     
301     stride = unit;
302     for (int i=remainder; i<numChunks; i++, first+=stride) {
303         task[i]->init(func, first, first+stride-1, 0, CkMyRank(),paramNum, param);            
304         CmiLock((Q->lock));
305         unsigned int t=(int)(CmiWallTimer()*1000);
306         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
307         CmiUnlock((Q->lock));
308     }    
309     //signal the thread
310     for (int i=0; i<threadNum; i++) {
311         pthread_mutex_lock(allLocks[i]);
312         pthread_cond_signal(allConds[i]);
313         pthread_mutex_unlock(allLocks[i]);
314     }
315     TRACE_BRACKET(20);
316     
317     TRACE_START(21);
318     //wait for the result
319     waitThreadDone(numChunks);
320     TRACE_BRACKET(21);
321 #endif
322
323     if (type!=NODEHELPER_NONE)
324         reduce(task, redResult, type, numChunks);            
325     return;
326 }
327
328 #define COMPUTE_REDUCTION(T) {\
329     for(int i=0; i<numChunks; i++) {\
330      result += *((T *)(thisReq[i]->redBuf)); \
331      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
332     }\
333 }
334
335 void FuncNodeHelper::reduce(Task ** thisReq, void *redBuf, REDUCTION_TYPE type, int numChunks) {
336     switch(type){
337         case NODEHELPER_INT_SUM:
338         {
339             int result=0;
340             COMPUTE_REDUCTION(int)
341             *((int *)redBuf) = result;
342             break;
343         }
344         case NODEHELPER_FLOAT_SUM:
345         {
346             float result=0;
347             COMPUTE_REDUCTION(float)
348             *((float *)redBuf) = result;
349             break;
350         }
351         case NODEHELPER_DOUBLE_SUM:
352         {
353             double result=0;
354             COMPUTE_REDUCTION(double)
355             *((double *)redBuf) = result;
356             break;
357         }
358         default:
359         break;
360     }
361 }
362
363 #if CMK_SMP
364 void FuncNodeHelper::waitDone(Task ** thisReq,int chunck) {
365     int flag = 1,i;
366     while (1) {
367         for (i=0; i<chunck; i++)
368             flag = flag & thisReq[i]->isFlagSet();
369         if (flag) break;
370         flag = 1;
371     }
372 }
373 #else
374 void FuncNodeHelper::waitThreadDone(int chunck) {
375     while (finishedCnt!=chunck);
376     finishedCnt=0;
377 }
378 #endif
379
380 void FuncNodeHelper::printMode(int mode) {
381     switch(mode){
382         case NODEHELPER_PTHREAD:
383             CkPrintf("NodeHelperLib is used in non-SMP using pthread with a simple dynamic scheduling\n");
384             break;
385         case NODEHELPER_DYNAMIC:
386             CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling\n");
387             break;
388         case NODEHELPER_STATIC:
389             CkPrintf("NodeHelperLib is used in SMP with a simple static scheduling\n");
390             break;
391         case NODEHELPER_CHARE_DYNAMIC:
392             CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
393             break;
394         default:
395             CkPrintf("ERROR: NodeHelperLib is used in unknown mode\n");
396     }
397 }
398
399 void NotifySingleHelper(ConverseNotifyMsg *msg){
400     FuncSingleHelper *h = (FuncSingleHelper *)msg->ptr;
401     h->processWork(0);
402 }
403
404 void SingleHelperStealWork(ConverseNotifyMsg *msg){
405 #if USE_TREE_BROADCAST
406     //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
407     int relPE = CmiMyRank()-msg->srcRank;
408     if(relPE<0) relPE += CmiMyNodeSize();
409     
410     //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
411     relPE=relPE*TREE_BCAST_BRANCH+1;
412     for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
413         if(relPE >= CmiMyNodeSize()) break;
414         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
415         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
416         CmiPushPE(pe, (void *)msg);
417     }
418 #endif
419     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
420     loop->stealWork();
421 }
422
423 //======================================================================//
424 // Functions regarding helpers that parallelize a single function on a  //
425 // single node (like OpenMP)                                            // 
426 //======================================================================//
427 void FuncSingleHelper::processWork(int filler) {
428     Task *one = NULL; // = (WorkReqEntry *)SimpleQueuePop(reqQ);    
429     void *tmp;
430     
431     CmiLock(reqLock);
432     CqsDequeue(reqQ, &tmp);
433     CmiUnlock(reqLock);    
434
435     one = (Task *)tmp;
436     while (one) {
437         (one->fnPtr)(one->first,one->last,(void *)(one->redBuf), one->paramNum, one->param);
438         //int *partial = (int *)(one->redBuf);
439         //CkPrintf("SingleHelper[%d]: partial=%d\n", CkMyRank(), *partial);
440         
441         //one->setFlag();
442         __sync_add_and_fetch(&(thisNodeHelper->helperPtr[one->originRank]->counter), 1);
443         
444         
445         CmiLock(reqLock);
446         CqsDequeue(reqQ, &tmp);
447         one = (Task *)tmp;
448         CmiUnlock(reqLock);
449     }
450 }
451
452 void CurLoopInfo::stealWork(){
453     int first, last;
454     int unit = (upperIndex-lowerIndex+1)/numChunks;
455     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
456     int markIdx = remainder*(unit+1);
457     
458     int nextChunkId = getNextChunkIdx();
459     while(nextChunkId < numChunks){
460         if(nextChunkId < remainder){
461             first = (unit+1)*nextChunkId;
462             last = first+unit;
463         }else{
464             first = (nextChunkId - remainder)*unit + markIdx;
465             last = first+unit-1;
466         }
467                 
468         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
469         reportFinished();
470         
471         nextChunkId = getNextChunkIdx();
472     }
473 }
474
475 //======================================================================//
476 //   End of functions related with FuncSingleHelper                     //
477 //======================================================================//
478
479 CProxy_FuncNodeHelper NodeHelper_Init(int mode,int numThds){
480     FuncNodeHelper::printMode(mode);
481     return CProxy_FuncNodeHelper::ckNew(mode, numThds);
482 }
483
484 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
485                         int paramNum, void * param, int msgPriority,
486                         int numChunks, int lowerRange, int upperRange, 
487                         void *redResult, REDUCTION_TYPE type)
488 {
489     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
490 }
491
492 #include "NodeHelper.def.h"