443da78b7b2781d9be2a539f6ecbbfc63d9a8440
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2 #include <pthread.h>
3
4 /*====Beginning of pthread-related variables and impelementation====*/
5 //__thread is not portable, but it works almost everywhere if pthread works
6 //After C++11, this should be thread_local
7 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
8 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
9
10 static FuncNodeHelper *mainHelper = NULL;
11 static int mainHelperPhyRank = 0;
12 static int numPhysicalPEs = 0;
13 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
14 static pthread_mutex_t **allLocks = NULL;
15 static pthread_cond_t **allConds = NULL;
16 static pthread_t *ndhThreads = NULL;
17 static volatile int gCrtCnt = 0;
18 static volatile int exitFlag = 0;
19
20 #if CMK_OS_IS_LINUX
21 #include <sys/syscall.h>
22 #endif
23
/* Returns the CPU the calling thread last ran on, as reported by field 39
 * ("processor") of /proc/<pid>/task/<tid>/stat, or -1 if it cannot be
 * determined (non-Linux build, unreadable file, or unexpected format).
 * Currently referenced only from a commented-out debug print. */
static int HelperOnCore(){
#if CMK_OS_IS_LINUX
    char fname[64];
    // getpid() returns pid_t and syscall() returns long; cast both so the
    // arguments match %ld exactly.
    snprintf(fname, sizeof(fname), "/proc/%ld/task/%ld/stat",
             (long)getpid(), (long)syscall(SYS_gettid));
    FILE *ifp = fopen(fname, "r");
    if (ifp == NULL) return -1;

    // The stat record is a single line. Fields 1..39 should fit in this
    // buffer even if trailing fields get cut off.
    char line[1024];
    char *got = fgets(line, sizeof(line), ifp);
    fclose(ifp);
    if (got == NULL) return -1;

    // Field 2 (comm) is parenthesized and may itself contain spaces, so
    // start tokenizing after the LAST ')': the next token is field 3.
    char *p = strrchr(line, ')');
    if (p == NULL) return -1;
    p++;

    // Skip fields 3..38 (36 tokens); strtol below then parses field 39.
    for (int skipped = 0; skipped < 36; skipped++) {
        while (*p == ' ') p++;
        if (*p == '\0' || *p == '\n') return -1;   // record too short
        while (*p != ' ' && *p != '\0' && *p != '\n') p++;
    }
    char *end;
    long cpu = strtol(p, &end, 10);
    if (end == p) return -1;
    return (int)cpu;
#else
    return -1;
#endif
}
39
40 static void *ndhThreadWork(void *id) {
41         size_t myId = (size_t) id;
42         
43         //further improvement of this affinity setting!!
44         int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
45         //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
46         myPhyRank = myId;
47         CmiSetCPUAffinity(myPhyRank);
48         
49         pthread_mutex_init(&thdLock, NULL);
50         pthread_cond_init(&thdCondition, NULL);
51         
52         allLocks[myId-1] = &thdLock;
53         allConds[myId-1] = &thdCondition;
54         
55         __sync_add_and_fetch(&gCrtCnt, 1);
56         
57         while(1){
58                 //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
59                 if(exitFlag) break;
60                 pthread_mutex_lock(&thdLock);
61                 pthread_cond_wait(&thdCondition, &thdLock);
62                 pthread_mutex_unlock(&thdLock);
63                 /* kids ID range: [1 ~ numHelpers-1] */
64                 if(mainHelper->needTreeBcast()){
65                         //notify my children
66                         int myKid = myId*TREE_BCAST_BRANCH+1;
67                         for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
68                                 if(myKid >= mainHelper->getNumHelpers()) break;
69                                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
70                                 pthread_mutex_lock(allLocks[myKid-1]);
71                                 pthread_cond_signal(allConds[myKid-1]);
72                                 pthread_mutex_unlock(allLocks[myKid-1]);
73                         }
74                 }
75                 pthdLoop->stealWork();
76         }
77 }
78
79 void FuncNodeHelper::createPThreads() {
80         int numThreads = numHelpers - 1;
81         allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
82         allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
83         memset(allLocks, 0, sizeof(void *)*numThreads);
84         memset(allConds, 0, sizeof(void *)*numThreads);
85         
86         pthread_attr_t attr;
87         pthread_attr_init(&attr);
88         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
89         ndhThreads = new pthread_t[numThreads];
90         mainHelperPhyRank = CmiOnCore();
91         numPhysicalPEs = CmiNumCores();
92         if(mainHelperPhyRank == -1) mainHelperPhyRank = 0;
93         for(int i=1; i<=numThreads; i++){
94                 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
95         }
96         while(gCrtCnt != numThreads); //wait for all threads to finish creation
97 }
98
99 void FuncNodeHelper::exit(){
100         if(mode == NODEHELPER_PTHREAD){
101                 exitFlag = 1;             
102                 for(int i=0; i<numHelpers-1; i++)
103                         pthread_join(ndhThreads[i], NULL);
104                 delete [] ndhThreads;
105                 free(allLocks);
106                 free(allConds);
107                 delete pthdLoop;
108         }
109 }
110
/*====End of pthread-related variables and implementation====*/
112
113 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
114 {  
115     traceRegisterUserEvent("nodehelper total work",20);
116     traceRegisterUserEvent("nodehlelper finish signal",21);
117
118     mode = mode_;
119
120     if(mode == NODEHELPER_USECHARM){
121         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
122         numHelpers = CkMyNodeSize();
123         helperPtr = new FuncSingleHelper *[numHelpers];
124         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
125         
126         int pestart = CkNodeFirst(CkMyNode());
127             
128         for (int i=0; i<numHelpers; i++) {
129             CkChareID helper;
130             CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
131         }
132     }else if(mode == NODEHELPER_PTHREAD){
133                 helperPtr = NULL;
134                 
135                 numHelpers = numThreads_;
136                 useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
137                 pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
138                 mainHelper = this;
139         createPThreads();
140     }
141 }
142
143 int FuncNodeHelper::MAX_CHUNKS = 64;
144
145 #if CMK_TRACE_ENABLED
146 #define TRACE_START(id) _start = CmiWallTimer()
147 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
148 #else
149 #define TRACE_START(id)
150 #define TRACE_BRACKET(id)
151 #endif
152
153 #define ALLOW_MULTIPLE_UNSYNC 1
154 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
155                                     int numChunks, int lowerRange, 
156                                     int upperRange, int sync,
157                                     void *redResult, REDUCTION_TYPE type) {
158                                         
159     double _start; //may be used for tracing
160     
161     if(numChunks > MAX_CHUNKS){ 
162         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
163         numChunks = MAX_CHUNKS;
164     }
165         
166     /* "stride" determines the number of loop iterations to be done in each chunk
167      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
168      * for chunk indexed at remainder to numChunks-1, stride is "unit"
169      */
170      int stride;
171          CurLoopInfo *curLoop = NULL;
172     
173     //for using nodequeue
174         TRACE_START(20);
175         if(mode == NODEHELPER_USECHARM){        
176                 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
177         #if ALLOW_MULTIPLE_UNSYNC
178                 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
179         #else
180                 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
181         #endif
182                 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
183                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
184                 if(useTreeBcast){               
185                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
186                         //just implicit binary tree
187                         int pe = CmiMyRank()+1;        
188                         for(int i=0; i<loopTimes; i++, pe++){
189                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
190                                 CmiPushPE(pe, (void *)(notifyMsg));    
191                         }
192                 }else{
193                         for (int i=0; i<numHelpers; i++) {
194                                 if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
195                         }
196                 }
197         }else if(mode == NODEHELPER_PTHREAD){
198                 int numThreads = numHelpers-1;
199                 curLoop = pthdLoop;
200                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
201                 int numNotices = numThreads;
202                 if(useTreeBcast){
203                         numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
204                 }
205                 for(int i=0; i<numNotices; i++){
206                         pthread_mutex_lock(allLocks[i]);
207                         pthread_cond_signal(allConds[i]);
208                         pthread_mutex_unlock(allLocks[i]);
209                 }
210                 //in this mode, it's always synced
211                 sync = 1;
212         }
213         
214         curLoop->stealWork();
215         TRACE_BRACKET(20);
216         
217         TRACE_START(21);                
218         curLoop->waitLoopDone(sync);
219         TRACE_BRACKET(21);        
220
221     if (type!=NODEHELPER_NONE)
222         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
223     return;
224 }
225
226 #define COMPUTE_REDUCTION(T) {\
227     for(int i=0; i<numChunks; i++) {\
228      result += *((T *)(redBufs[i])); \
229      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
230     }\
231 }
232
233 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
234     switch(type){
235         case NODEHELPER_INT_SUM:
236         {
237             int result=0;
238             COMPUTE_REDUCTION(int)
239             *((int *)redBuf) = result;
240             break;
241         }
242         case NODEHELPER_FLOAT_SUM:
243         {
244             float result=0;
245             COMPUTE_REDUCTION(float)
246             *((float *)redBuf) = result;
247             break;
248         }
249         case NODEHELPER_DOUBLE_SUM:
250         {
251             double result=0;
252             COMPUTE_REDUCTION(double)
253             *((double *)redBuf) = result;
254             break;
255         }
256         default:
257         break;
258     }
259 }
260
261 CpvStaticDeclare(int, NdhStealWorkHandler);
262 static void RegisterNodeHelperHdlrs(){
263     CpvInitialize(int, NdhStealWorkHandler);
264     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
265 }
266
267 FuncSingleHelper::FuncSingleHelper(size_t ndhPtr) {
268     thisNodeHelper = (FuncNodeHelper *)ndhPtr;
269     CmiAssert(thisNodeHelper!=NULL);
270         
271         nextFreeNotifyMsg = 0;
272     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
273     for(int i=0; i<MSG_BUFFER_SIZE; i++){
274         ConverseNotifyMsg *tmp = notifyMsg+i;
275         if(thisNodeHelper->useTreeBcast){
276             tmp->srcRank = CmiMyRank();
277         }else{
278             tmp->srcRank = -1;
279         }            
280         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
281         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
282     }
283     thisNodeHelper->helperPtr[CkMyRank()] = this;
284 }
285
286
287 void SingleHelperStealWork(ConverseNotifyMsg *msg){
288         
289         int srcRank = msg->srcRank;
290         
291         if(srcRank >= 0){
292                 //means using tree-broadcast to send the notification msg
293                 
294                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
295                 int relPE = CmiMyRank()-msg->srcRank;
296                 if(relPE<0) relPE += CmiMyNodeSize();
297                 
298                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
299                 relPE=relPE*TREE_BCAST_BRANCH+1;
300                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
301                         if(relPE >= CmiMyNodeSize()) break;
302                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
303                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
304                         CmiPushPE(pe, (void *)msg);
305                 }
306         }
307     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
308     loop->stealWork();
309 }
310
311 void CurLoopInfo::stealWork(){
312     //indicate the current work hasn't been initialized
313     //or the old work has finished.
314     if(inited == 0) return;
315     
316     int first, last;
317     int unit = (upperIndex-lowerIndex+1)/numChunks;
318     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
319     int markIdx = remainder*(unit+1);
320     
321     int nextChunkId = getNextChunkIdx();
322     int execTimes = 0;
323     while(nextChunkId < numChunks){
324         if(nextChunkId < remainder){
325             first = (unit+1)*nextChunkId;
326             last = first+unit;
327         }else{
328             first = (nextChunkId - remainder)*unit + markIdx;
329             last = first+unit-1;
330         }
331                 
332         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
333         execTimes++;
334         nextChunkId = getNextChunkIdx();
335     }
336     reportFinished(execTimes);
337 }
338
339 //======================================================================//
340 //   End of functions related with FuncSingleHelper                     //
341 //======================================================================//
342
343 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
344     if(mode == NODEHELPER_USECHARM){
345         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
346     }else if(mode==NODEHELPER_PTHREAD){
347         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
348         CmiAssert(numThreads>0);
349     }
350     return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
351 }
352
353 void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
354         nodeHelper.exit();
355 }
356
357 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
358                         int paramNum, void * param, 
359                         int numChunks, int lowerRange, int upperRange,
360                         int sync,
361                         void *redResult, REDUCTION_TYPE type)
362 {
363     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
364 }
365
366 #include "NodeHelper.def.h"