a9210f1b7738d0deef7d02c07c4efd19ca58c37e
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2 #include <pthread.h>
3
4 /*====Beginning of pthread-related variables and impelementation====*/
5 //__thread is not portable, but it works almost everywhere if pthread works
6 //After C++11, this should be thread_local
7 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
8 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
9
10 static FuncNodeHelper *mainHelper = NULL;
11 static int mainHelperPhyRank = 0;
12 static int numPhysicalPEs = 0;
13 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
14 static pthread_mutex_t **allLocks = NULL;
15 static pthread_cond_t **allConds = NULL;
16 static pthread_t *ndhThreads = NULL;
17 static volatile int gCrtCnt = 0;
18 static volatile int exitFlag = 0;
19
20 #if CMK_OS_IS_LINUX
21 #include <sys/syscall.h>
22 #endif
23
/* Return the CPU core the calling thread last ran on, or -1 if it cannot be
 * determined.
 *
 * On Linux (CMK_OS_IS_LINUX) this reads field 39 ("processor") of
 * /proc/<pid>/task/<tid>/stat; on every other platform it returns -1.
 */
static int HelperOnCore(){
#if CMK_OS_IS_LINUX
    char fname[64];
    snprintf(fname, sizeof(fname), "/proc/%d/task/%ld/stat",
             (int)getpid(), (long)syscall(SYS_gettid));
    FILE *ifp = fopen(fname, "r");
    if(ifp == NULL) return -1;

    /* The whole stat record is a single line. */
    char line[1024];
    char *got = fgets(line, sizeof(line), ifp);
    fclose(ifp);
    if(got == NULL) return -1;

    /* Field 2 is "(comm)", and comm may itself contain spaces or ')'.
     * Fields can only be counted reliably after the LAST ')'. */
    char *p = NULL;
    for(char *q = line; *q != '\0'; q++){
        if(*q == ')') p = q;
    }
    if(p == NULL) return -1;
    p++;

    int field = 2; /* number of the field that ends at p */
    while(*p != '\0'){
        while(*p == ' ') p++;
        if(*p == '\0' || *p == '\n') break;
        field++;
        if(field == 39) return atoi(p); /* field 39: "processor" */
        while(*p != '\0' && *p != ' ' && *p != '\n') p++;
    }
    return -1; /* record shorter than expected */
#else
    return -1;
#endif
}
39
40 static void *ndhThreadWork(void *id) {
41         size_t myId = (size_t) id;
42         
43         //further improvement of this affinity setting!!
44         int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
45         //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
46         myPhyRank = myId;
47         CmiSetCPUAffinity(myPhyRank);
48         
49         pthread_mutex_init(&thdLock, NULL);
50         pthread_cond_init(&thdCondition, NULL);
51         
52         allLocks[myId-1] = &thdLock;
53         allConds[myId-1] = &thdCondition;
54         
55         __sync_add_and_fetch(&gCrtCnt, 1);
56         
57         while(1){
58                 //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
59                 if(exitFlag) break;
60                 pthread_mutex_lock(&thdLock);
61                 pthread_cond_wait(&thdCondition, &thdLock);
62                 pthread_mutex_unlock(&thdLock);
63                 /* kids ID range: [1 ~ numHelpers-1] */
64                 if(mainHelper->needTreeBcast()){
65                         //notify my children
66                         int myKid = myId*TREE_BCAST_BRANCH+1;
67                         for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
68                                 if(myKid >= mainHelper->getNumHelpers()) break;
69                                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
70                                 pthread_mutex_lock(allLocks[myKid-1]);
71                                 pthread_cond_signal(allConds[myKid-1]);
72                                 pthread_mutex_unlock(allLocks[myKid-1]);
73                         }
74                 }
75                 pthdLoop->stealWork();
76         }
77 }
78
79 void FuncNodeHelper::createPThreads() {
80         int numThreads = numHelpers - 1;
81         allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
82         allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
83         memset(allLocks, 0, sizeof(void *)*numThreads);
84         memset(allConds, 0, sizeof(void *)*numThreads);
85         
86         pthread_attr_t attr;
87         pthread_attr_init(&attr);
88         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
89         ndhThreads = new pthread_t[numThreads];
90         mainHelperPhyRank = CmiOnCore();
91         numPhysicalPEs = CmiNumCores();
92         if(mainHelperPhyRank == -1) mainHelperPhyRank = 0;
93         for(int i=1; i<=numThreads; i++){
94                 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
95         }
96         while(gCrtCnt != numThreads); //wait for all threads to finish creation
97 }
98
99 void FuncNodeHelper::exit(){
100         if(mode == NODEHELPER_PTHREAD){
101                 exitFlag = 1;             
102                 for(int i=0; i<numHelpers-1; i++)
103                         pthread_join(ndhThreads[i], NULL);
104                 delete [] ndhThreads;
105                 free(allLocks);
106                 free(allConds);
107                 delete pthdLoop;
108         }
109 }
110
/*====End of pthread-related variables and implementation====*/
112
113
/* Note: Those event ids should be unique globally!!
 * They are the user-event ids registered with the tracer in the
 * FuncNodeHelper constructor and used by parallelizeFunc()'s trace brackets. */
#define NDH_TOTAL_WORK_EVENTID  139
#define NDH_FINISH_SIGNAL_EVENTID 143

/* The FuncNodeHelper of this process: set (and asserted unique) by the
 * FuncNodeHelper constructor, read by FuncSingleHelper and the
 * NodeHelper_Parallelize overload that takes no proxy. */
static FuncNodeHelper *globalNodeHelper = NULL;
119
120 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
121 {  
122     traceRegisterUserEvent("nodehelper total work",NDH_TOTAL_WORK_EVENTID);
123     traceRegisterUserEvent("nodehlelper finish signal",NDH_FINISH_SIGNAL_EVENTID);
124
125     mode = mode_;
126     
127     CmiAssert(globalNodeHelper==NULL);
128     globalNodeHelper = this;
129     
130     if(mode == NODEHELPER_USECHARM){
131         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
132         numHelpers = CkMyNodeSize();
133         helperPtr = new FuncSingleHelper *[numHelpers];
134         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
135         
136         int pestart = CkNodeFirst(CkMyNode());
137             
138         for (int i=0; i<numHelpers; i++) {
139             CkChareID helper;
140             CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
141         }
142     }else if(mode == NODEHELPER_PTHREAD){
143                 helperPtr = NULL;
144                 
145                 numHelpers = numThreads_;
146                 useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
147                 pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
148                 mainHelper = this;
149         createPThreads();
150     }
151 }
152
153 int FuncNodeHelper::MAX_CHUNKS = 64;
154
155 #if CMK_TRACE_ENABLED
156 #define TRACE_START(id) _start = CmiWallTimer()
157 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
158 #else
159 #define TRACE_START(id)
160 #define TRACE_BRACKET(id)
161 #endif
162
163 #define ALLOW_MULTIPLE_UNSYNC 1
164 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
165                                     int numChunks, int lowerRange, 
166                                                     int upperRange, int sync,
167                                     void *redResult, REDUCTION_TYPE type) {
168                                         
169     double _start; //may be used for tracing
170     
171     if(numChunks > MAX_CHUNKS){ 
172         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
173         numChunks = MAX_CHUNKS;
174     }
175         
176     /* "stride" determines the number of loop iterations to be done in each chunk
177      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
178      * for chunk indexed at remainder to numChunks-1, stride is "unit"
179      */
180      int stride;
181          CurLoopInfo *curLoop = NULL;
182     
183     //for using nodequeue
184         TRACE_START(NDH_TOTAL_WORK_EVENTID);
185         if(mode == NODEHELPER_USECHARM){        
186                 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
187 #if USE_CONVERSE_NOTIFICATION        
188         #if ALLOW_MULTIPLE_UNSYNC
189                 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
190         #else
191                 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
192         #endif
193                 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
194                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
195                 if(useTreeBcast){
196                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
197                         //just implicit binary tree
198                         int pe = CmiMyRank()+1;        
199                         for(int i=0; i<loopTimes; i++, pe++){
200                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
201                                 CmiPushPE(pe, (void *)(notifyMsg));    
202                         }
203                 }else{
204                         for (int i=CmiMyRank()+1; i<numHelpers; i++) {
205                                 CmiPushPE(i, (void *)(notifyMsg));
206                         }
207                         for (int i=0; i<CmiMyRank(); i++) {
208                                 CmiPushPE(i, (void *)(notifyMsg));
209                         }
210                 }
211 #else
212     #if ALLOW_MULTIPLE_UNSYNC
213         curLoop = thisHelper->getNewTask();
214     #else
215         curLoop = thisHelper->taskBuffer[0];
216     #endif
217         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
218         if(useTreeBcast){
219                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
220                         //just implicit binary tree
221                         int pe = CmiMyRank()+1;
222                         for(int i=0; i<loopTimes; i++, pe++){
223                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
224                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
225                 one->ptr = (void *)curLoop;
226                 envelope *env = UsrToEnv(one);
227                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
228                                 CmiPushPE(pe, (void *)(env));    
229                         }
230                 }else{
231                         for (int i=CmiMyRank()+1; i<numHelpers; i++) {
232                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
233                 one->ptr = (void *)curLoop;
234                 envelope *env = UsrToEnv(one);
235                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
236                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
237                                 CmiPushPE(i, (void *)(env));
238                         }
239                         for (int i=0; i<CmiMyRank(); i++) {
240                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
241                 one->ptr = (void *)curLoop;
242                 envelope *env = UsrToEnv(one);
243                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
244                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
245                                 CmiPushPE(i, (void *)(env));
246                         }
247                 }
248 #endif        
249         }else if(mode == NODEHELPER_PTHREAD){
250                 int numThreads = numHelpers-1;
251                 curLoop = pthdLoop;
252                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
253                 int numNotices = numThreads;
254                 if(useTreeBcast){
255                         numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
256                 }
257                 for(int i=0; i<numNotices; i++){
258                         pthread_mutex_lock(allLocks[i]);
259                         pthread_cond_signal(allConds[i]);
260                         pthread_mutex_unlock(allLocks[i]);
261                 }
262                 //in this mode, it's always synced
263                 sync = 1;
264         }
265         
266         curLoop->stealWork();
267         TRACE_BRACKET(NDH_TOTAL_WORK_EVENTID);
268         
269         TRACE_START(NDH_FINISH_SIGNAL_EVENTID);
270         curLoop->waitLoopDone(sync);
271         TRACE_BRACKET(NDH_FINISH_SIGNAL_EVENTID);
272
273     if (type!=NODEHELPER_NONE)
274         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
275     return;
276 }
277
278 #define COMPUTE_REDUCTION(T) {\
279     for(int i=0; i<numChunks; i++) {\
280      result += *((T *)(redBufs[i])); \
281      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
282     }\
283 }
284
285 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
286     switch(type){
287         case NODEHELPER_INT_SUM:
288         {
289             int result=0;
290             COMPUTE_REDUCTION(int)
291             *((int *)redBuf) = result;
292             break;
293         }
294         case NODEHELPER_FLOAT_SUM:
295         {
296             float result=0;
297             COMPUTE_REDUCTION(float)
298             *((float *)redBuf) = result;
299             break;
300         }
301         case NODEHELPER_DOUBLE_SUM:
302         {
303             double result=0;
304             COMPUTE_REDUCTION(double)
305             *((double *)redBuf) = result;
306             break;
307         }
308         default:
309         break;
310     }
311 }
312
313 CpvStaticDeclare(int, NdhStealWorkHandler);
314 static void RegisterNodeHelperHdlrs(){
315     CpvInitialize(int, NdhStealWorkHandler);
316     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
317 }
318
319 extern int _charmHandlerIdx;
320 FuncSingleHelper::FuncSingleHelper(int numHelpers) {
321     totalHelpers = numHelpers;
322 #if USE_CONVERSE_NOTIFICATION    
323     notifyMsgBufSize = TASK_BUFFER_SIZE;
324 #else
325     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
326 #endif
327
328     CmiAssert(globalNodeHelper!=NULL);
329     thisNodeHelper = globalNodeHelper;
330             
331         nextFreeNotifyMsg = 0;
332 #if USE_CONVERSE_NOTIFICATION    
333     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
334     for(int i=0; i<notifyMsgBufSize; i++){
335         ConverseNotifyMsg *tmp = notifyMsg+i;
336         if(thisNodeHelper->useTreeBcast){
337             tmp->srcRank = CmiMyRank();
338         }else{
339             tmp->srcRank = -1;
340         }            
341         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
342         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
343     }
344 #else
345     nextFreeTaskBuffer = 0;
346     notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
347     for(int i=0; i<notifyMsgBufSize; i++){
348         CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
349         notifyMsg[i] = tmp;
350         if(thisNodeHelper->useTreeBcast){
351             tmp->srcRank = CmiMyRank();
352         }else{
353             tmp->srcRank = -1;
354         }
355         tmp->ptr = NULL;
356         envelope *env = UsrToEnv(tmp);
357         env->setMsgtype(ForChareMsg);
358         env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
359         env->setSrcPe(CkMyPe());
360         CmiSetHandler(env, _charmHandlerIdx);
361         //env->setObjPtr has to be called when a notification msg is sent
362     }
363     taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
364     for(int i=0; i<TASK_BUFFER_SIZE; i++){
365         taskBuffer[i] = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
366     }
367 #endif    
368     globalNodeHelper->helperPtr[CkMyRank()] = this;
369 }
370
371 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg){
372 #if !USE_CONVERSE_NOTIFICATION    
373     int srcRank = msg->srcRank;
374     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
375     if(srcRank >= 0){
376         //means using tree-broadcast to send the notification msg
377         int relPE = CmiMyRank()-msg->srcRank;
378                 if(relPE<0) relPE += CmiMyNodeSize();
379                 
380                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
381                 relPE=relPE*TREE_BCAST_BRANCH+1;
382                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
383                         if(relPE >= CmiMyNodeSize()) break;
384                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
385                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
386             CharmNotifyMsg *newone = getNotifyMsg();
387             newone->ptr = (void *)loop;
388             envelope *env = UsrToEnv(newone);
389             env->setObjPtr(thisNodeHelper->helperPtr[pe]->ckGetChareID().objPtr);
390                         CmiPushPE(pe, (void *)env);
391                 }
392     }
393     loop->stealWork();
394 #endif    
395 }
396
397 void SingleHelperStealWork(ConverseNotifyMsg *msg){
398         int srcRank = msg->srcRank;
399         
400         if(srcRank >= 0){
401                 //means using tree-broadcast to send the notification msg
402                 
403                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
404                 int relPE = CmiMyRank()-msg->srcRank;
405                 if(relPE<0) relPE += CmiMyNodeSize();
406                 
407                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
408                 relPE=relPE*TREE_BCAST_BRANCH+1;
409                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
410                         if(relPE >= CmiMyNodeSize()) break;
411                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
412                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
413                         CmiPushPE(pe, (void *)msg);
414                 }
415         }
416     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
417     loop->stealWork();
418 }
419
420 void CurLoopInfo::stealWork(){
421     //indicate the current work hasn't been initialized
422     //or the old work has finished.
423     if(inited == 0) return;
424     
425     int first, last;
426     int unit = (upperIndex-lowerIndex+1)/numChunks;
427     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
428     int markIdx = remainder*(unit+1);
429     
430     int nextChunkId = getNextChunkIdx();
431     int execTimes = 0;
432     while(nextChunkId < numChunks){
433         if(nextChunkId < remainder){
434             first = (unit+1)*nextChunkId;
435             last = first+unit;
436         }else{
437             first = (nextChunkId - remainder)*unit + markIdx;
438             last = first+unit-1;
439         }
440                 
441         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
442         execTimes++;
443         nextChunkId = getNextChunkIdx();
444     }
445     reportFinished(execTimes);
446 }
447
448 //======================================================================//
449 //   End of functions related with FuncSingleHelper                     //
450 //======================================================================//
451
452 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
453     if(mode == NODEHELPER_USECHARM){
454         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
455     }else if(mode==NODEHELPER_PTHREAD){
456         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
457         CmiAssert(numThreads>0);
458     }
459     return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
460 }
461
462 void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
463         nodeHelper.exit();
464 }
465
466 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
467                         int paramNum, void * param, 
468                         int numChunks, int lowerRange, int upperRange,
469                                     int sync,
470                         void *redResult, REDUCTION_TYPE type)
471 {
472     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
473 }
474
475 void NodeHelper_Parallelize(HelperFn func, 
476                         int paramNum, void * param, 
477                         int numChunks, int lowerRange, int upperRange,
478                                     int sync,
479                         void *redResult, REDUCTION_TYPE type)
480 {
481     globalNodeHelper->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
482 }
483 #include "NodeHelper.def.h"