Make the event ids macro-defined, and warn that those user event ids should be globally unique
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2 #include <pthread.h>
3
4 /*====Beginning of pthread-related variables and impelementation====*/
5 //__thread is not portable, but it works almost everywhere if pthread works
6 //After C++11, this should be thread_local
7 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
8 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
9
10 static FuncNodeHelper *mainHelper = NULL;
11 static int mainHelperPhyRank = 0;
12 static int numPhysicalPEs = 0;
13 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
14 static pthread_mutex_t **allLocks = NULL;
15 static pthread_cond_t **allConds = NULL;
16 static pthread_t *ndhThreads = NULL;
17 static volatile int gCrtCnt = 0;
18 static volatile int exitFlag = 0;
19
20 #if CMK_OS_IS_LINUX
21 #include <sys/syscall.h>
22 #endif
23
24 static int HelperOnCore(){
25 #if CMK_OS_IS_LINUX
26         char fname[64];
27         sprintf(fname, "/proc/%d/task/%ld/stat", getpid(), syscall(SYS_gettid));
28         FILE *ifp = fopen(fname, "r");
29         if(ifp == NULL) return -1;
30         fseek(ifp, 0, SEEK_SET);
31         char str[128];
32         for(int i=0; i<39; i++) fscanf(ifp, "%s", str);
33         fclose(ifp);
34         return atoi(str);
35 #else
36         return -1;
37 #endif
38 }
39
40 static void *ndhThreadWork(void *id) {
41         size_t myId = (size_t) id;
42         
43         //further improvement of this affinity setting!!
44         int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
45         //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
46         myPhyRank = myId;
47         CmiSetCPUAffinity(myPhyRank);
48         
49         pthread_mutex_init(&thdLock, NULL);
50         pthread_cond_init(&thdCondition, NULL);
51         
52         allLocks[myId-1] = &thdLock;
53         allConds[myId-1] = &thdCondition;
54         
55         __sync_add_and_fetch(&gCrtCnt, 1);
56         
57         while(1){
58                 //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
59                 if(exitFlag) break;
60                 pthread_mutex_lock(&thdLock);
61                 pthread_cond_wait(&thdCondition, &thdLock);
62                 pthread_mutex_unlock(&thdLock);
63                 /* kids ID range: [1 ~ numHelpers-1] */
64                 if(mainHelper->needTreeBcast()){
65                         //notify my children
66                         int myKid = myId*TREE_BCAST_BRANCH+1;
67                         for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
68                                 if(myKid >= mainHelper->getNumHelpers()) break;
69                                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
70                                 pthread_mutex_lock(allLocks[myKid-1]);
71                                 pthread_cond_signal(allConds[myKid-1]);
72                                 pthread_mutex_unlock(allLocks[myKid-1]);
73                         }
74                 }
75                 pthdLoop->stealWork();
76         }
77 }
78
79 void FuncNodeHelper::createPThreads() {
80         int numThreads = numHelpers - 1;
81         allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
82         allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
83         memset(allLocks, 0, sizeof(void *)*numThreads);
84         memset(allConds, 0, sizeof(void *)*numThreads);
85         
86         pthread_attr_t attr;
87         pthread_attr_init(&attr);
88         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
89         ndhThreads = new pthread_t[numThreads];
90         mainHelperPhyRank = CmiOnCore();
91         numPhysicalPEs = CmiNumCores();
92         if(mainHelperPhyRank == -1) mainHelperPhyRank = 0;
93         for(int i=1; i<=numThreads; i++){
94                 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
95         }
96         while(gCrtCnt != numThreads); //wait for all threads to finish creation
97 }
98
99 void FuncNodeHelper::exit(){
100         if(mode == NODEHELPER_PTHREAD){
101                 exitFlag = 1;             
102                 for(int i=0; i<numHelpers-1; i++)
103                         pthread_join(ndhThreads[i], NULL);
104                 delete [] ndhThreads;
105                 free(allLocks);
106                 free(allConds);
107                 delete pthdLoop;
108         }
109 }
110
/*====End of pthread-related variables and implementation====*/
112
113
/* Trace user-event ids for the two phases bracketed in parallelizeFunc
 * (total work, and waiting for the finish signal).
 * NOTE: these ids must be unique globally, i.e. not reused by any other
 * user event registered in the program!! */
#define NDH_TOTAL_WORK_EVENTID    (139)
#define NDH_FINISH_SIGNAL_EVENTID (143)
117
118 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
119 {  
120     traceRegisterUserEvent("nodehelper total work",NDH_TOTAL_WORK_EVENTID);
121     traceRegisterUserEvent("nodehlelper finish signal",NDH_FINISH_SIGNAL_EVENTID);
122
123     mode = mode_;
124
125     if(mode == NODEHELPER_USECHARM){
126         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
127         numHelpers = CkMyNodeSize();
128         helperPtr = new FuncSingleHelper *[numHelpers];
129         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
130         
131         int pestart = CkNodeFirst(CkMyNode());
132             
133         for (int i=0; i<numHelpers; i++) {
134             CkChareID helper;
135             CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
136         }
137     }else if(mode == NODEHELPER_PTHREAD){
138                 helperPtr = NULL;
139                 
140                 numHelpers = numThreads_;
141                 useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
142                 pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
143                 mainHelper = this;
144         createPThreads();
145     }
146 }
147
148 int FuncNodeHelper::MAX_CHUNKS = 64;
149
/* Bracketing helpers for trace user events. TRACE_START stores CmiWallTimer()
 * into the caller's local "_start"; TRACE_BRACKET records user event "id"
 * spanning from "_start" to the current CmiWallTimer(). Both reduce to no-ops
 * when tracing is compiled out. */
#if CMK_TRACE_ENABLED
#define TRACE_START(id)   (_start = CmiWallTimer())
#define TRACE_BRACKET(id) traceUserBracketEvent((id), _start, CmiWallTimer())
#else
#define TRACE_START(id)   ((void)0)
#define TRACE_BRACKET(id) ((void)0)
#endif
157
158 #define ALLOW_MULTIPLE_UNSYNC 1
159 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
160                                     int numChunks, int lowerRange, 
161                                     int upperRange, int sync,
162                                     void *redResult, REDUCTION_TYPE type) {
163                                         
164     double _start; //may be used for tracing
165     
166     if(numChunks > MAX_CHUNKS){ 
167         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
168         numChunks = MAX_CHUNKS;
169     }
170         
171     /* "stride" determines the number of loop iterations to be done in each chunk
172      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
173      * for chunk indexed at remainder to numChunks-1, stride is "unit"
174      */
175      int stride;
176          CurLoopInfo *curLoop = NULL;
177     
178     //for using nodequeue
179         TRACE_START(NDH_TOTAL_WORK_EVENTID);
180         if(mode == NODEHELPER_USECHARM){        
181                 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
182         #if ALLOW_MULTIPLE_UNSYNC
183                 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
184         #else
185                 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
186         #endif
187                 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
188                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
189                 if(useTreeBcast){               
190                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
191                         //just implicit binary tree
192                         int pe = CmiMyRank()+1;        
193                         for(int i=0; i<loopTimes; i++, pe++){
194                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
195                                 CmiPushPE(pe, (void *)(notifyMsg));    
196                         }
197                 }else{
198                         for (int i=0; i<numHelpers; i++) {
199                                 if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
200                         }
201                 }
202         }else if(mode == NODEHELPER_PTHREAD){
203                 int numThreads = numHelpers-1;
204                 curLoop = pthdLoop;
205                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
206                 int numNotices = numThreads;
207                 if(useTreeBcast){
208                         numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
209                 }
210                 for(int i=0; i<numNotices; i++){
211                         pthread_mutex_lock(allLocks[i]);
212                         pthread_cond_signal(allConds[i]);
213                         pthread_mutex_unlock(allLocks[i]);
214                 }
215                 //in this mode, it's always synced
216                 sync = 1;
217         }
218         
219         curLoop->stealWork();
220         TRACE_BRACKET(NDH_TOTAL_WORK_EVENTID);
221         
222         TRACE_START(NDH_FINISH_SIGNAL_EVENTID);
223         curLoop->waitLoopDone(sync);
224         TRACE_BRACKET(NDH_FINISH_SIGNAL_EVENTID);
225
226     if (type!=NODEHELPER_NONE)
227         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
228     return;
229 }
230
231 #define COMPUTE_REDUCTION(T) {\
232     for(int i=0; i<numChunks; i++) {\
233      result += *((T *)(redBufs[i])); \
234      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
235     }\
236 }
237
238 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
239     switch(type){
240         case NODEHELPER_INT_SUM:
241         {
242             int result=0;
243             COMPUTE_REDUCTION(int)
244             *((int *)redBuf) = result;
245             break;
246         }
247         case NODEHELPER_FLOAT_SUM:
248         {
249             float result=0;
250             COMPUTE_REDUCTION(float)
251             *((float *)redBuf) = result;
252             break;
253         }
254         case NODEHELPER_DOUBLE_SUM:
255         {
256             double result=0;
257             COMPUTE_REDUCTION(double)
258             *((double *)redBuf) = result;
259             break;
260         }
261         default:
262         break;
263     }
264 }
265
266 CpvStaticDeclare(int, NdhStealWorkHandler);
267 static void RegisterNodeHelperHdlrs(){
268     CpvInitialize(int, NdhStealWorkHandler);
269     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
270 }
271
272 FuncSingleHelper::FuncSingleHelper(size_t ndhPtr) {
273     thisNodeHelper = (FuncNodeHelper *)ndhPtr;
274     CmiAssert(thisNodeHelper!=NULL);
275         
276         nextFreeNotifyMsg = 0;
277     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
278     for(int i=0; i<MSG_BUFFER_SIZE; i++){
279         ConverseNotifyMsg *tmp = notifyMsg+i;
280         if(thisNodeHelper->useTreeBcast){
281             tmp->srcRank = CmiMyRank();
282         }else{
283             tmp->srcRank = -1;
284         }            
285         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
286         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
287     }
288     thisNodeHelper->helperPtr[CkMyRank()] = this;
289 }
290
291
292 void SingleHelperStealWork(ConverseNotifyMsg *msg){
293         
294         int srcRank = msg->srcRank;
295         
296         if(srcRank >= 0){
297                 //means using tree-broadcast to send the notification msg
298                 
299                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
300                 int relPE = CmiMyRank()-msg->srcRank;
301                 if(relPE<0) relPE += CmiMyNodeSize();
302                 
303                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
304                 relPE=relPE*TREE_BCAST_BRANCH+1;
305                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
306                         if(relPE >= CmiMyNodeSize()) break;
307                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
308                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
309                         CmiPushPE(pe, (void *)msg);
310                 }
311         }
312     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
313     loop->stealWork();
314 }
315
316 void CurLoopInfo::stealWork(){
317     //indicate the current work hasn't been initialized
318     //or the old work has finished.
319     if(inited == 0) return;
320     
321     int first, last;
322     int unit = (upperIndex-lowerIndex+1)/numChunks;
323     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
324     int markIdx = remainder*(unit+1);
325     
326     int nextChunkId = getNextChunkIdx();
327     int execTimes = 0;
328     while(nextChunkId < numChunks){
329         if(nextChunkId < remainder){
330             first = (unit+1)*nextChunkId;
331             last = first+unit;
332         }else{
333             first = (nextChunkId - remainder)*unit + markIdx;
334             last = first+unit-1;
335         }
336                 
337         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
338         execTimes++;
339         nextChunkId = getNextChunkIdx();
340     }
341     reportFinished(execTimes);
342 }
343
344 //======================================================================//
345 //   End of functions related with FuncSingleHelper                     //
346 //======================================================================//
347
348 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
349     if(mode == NODEHELPER_USECHARM){
350         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
351     }else if(mode==NODEHELPER_PTHREAD){
352         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
353         CmiAssert(numThreads>0);
354     }
355     return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
356 }
357
358 void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
359         nodeHelper.exit();
360 }
361
362 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
363                         int paramNum, void * param, 
364                         int numChunks, int lowerRange, int upperRange,
365                         int sync,
366                         void *redResult, REDUCTION_TYPE type)
367 {
368     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
369 }
370
371 #include "NodeHelper.def.h"