Added the initial support of using pthreads for parallelization (a special SMP mode...
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2 #include <pthread.h>
3
4 /*====Beginning of pthread-related variables and impelementation====*/
5 //__thread is not portable, but it works almost everywhere if pthread works
6 //After C++11, this should be thread_local
7 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
8 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
9
10 static FuncNodeHelper *mainHelper = NULL;
11 static int mainHelperPhyRank = 0;
12 static int numPhysicalPEs = 0;
13 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
14 static pthread_mutex_t **allLocks = NULL;
15 static pthread_cond_t **allConds = NULL;
16 static pthread_t *ndhThreads = NULL;
17 static volatile int gCrtCnt = 0;
18 static volatile int exitFlag = 0;
19
20 static void *ndhThreadWork(void *id) {
21         size_t myId = (size_t) id;
22         
23         //further improvement of this affinity setting!!
24         int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
25         //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
26         myPhyRank = myId;
27         CmiSetCPUAffinity(myPhyRank);
28         
29         pthread_mutex_init(&thdLock, NULL);
30         pthread_cond_init(&thdCondition, NULL);
31         
32         allLocks[myId-1] = &thdLock;
33         allConds[myId-1] = &thdCondition;
34         
35         __sync_add_and_fetch(&gCrtCnt, 1);
36         
37         while(1){
38                 if(exitFlag) break;
39                 pthread_mutex_lock(&thdLock);
40                 pthread_cond_wait(&thdCondition, &thdLock);
41                 pthread_mutex_unlock(&thdLock);
42                 /* kids ID range: [1 ~ numHelpers-1] */
43                 if(mainHelper->needTreeBcast()){
44                         //notify my children
45                         int myKid = myId*TREE_BCAST_BRANCH+1;
46                         for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
47                                 if(myKid >= mainHelper->getNumHelpers()) break;
48                                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
49                                 pthread_mutex_lock(allLocks[myKid-1]);
50                                 pthread_cond_signal(allConds[myKid-1]);
51                                 pthread_mutex_unlock(allLocks[myKid-1]);
52                         }
53                 }
54                 pthdLoop->stealWork();
55         }
56 }
57
58 void FuncNodeHelper::createPThreads() {
59         int numThreads = numHelpers - 1;
60         allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
61         allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
62         memset(allLocks, 0, sizeof(void *)*numThreads);
63         memset(allConds, 0, sizeof(void *)*numThreads);
64         
65         pthread_attr_t attr;
66         pthread_attr_init(&attr);
67         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
68         ndhThreads = new pthread_t[numThreads];
69         mainHelperPhyRank = CmiPhysicalRank(CmiMyPe());
70         numPhysicalPEs = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(CmiMyPe()));
71         for(int i=1; i<=numThreads; i++){
72                 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
73         }
74         while(gCrtCnt != numThreads); //wait for all threads to finish creation
75 }
76
77 void FuncNodeHelper::exit(){
78         if(mode == NODEHELPER_PTHREAD){
79                 exitFlag = 1;             
80                 for(int i=0; i<numHelpers-1; i++)
81                         pthread_join(ndhThreads[i], NULL);
82                 delete [] ndhThreads;
83                 free(allLocks);
84                 free(allConds);
85                 delete pthdLoop;
86         }
87 }
88
/*====End of pthread-related variables and implementation====*/
90
91 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
92 {  
93     traceRegisterUserEvent("nodehelper total work",20);
94     traceRegisterUserEvent("nodehlelper finish signal",21);
95
96     mode = mode_;
97
98     if(mode == NODEHELPER_USECHARM){
99         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());                     
100         numHelpers = CkMyNodeSize();
101         helperPtr = new FuncSingleHelper *[numHelpers];
102         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
103         
104         int pestart = CkNodeFirst(CkMyNode());
105             
106         for (int i=0; i<numHelpers; i++) {
107             CkChareID helper;
108             CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
109         }
110     }else if(mode == NODEHELPER_PTHREAD){
111                 helperPtr = NULL;
112                 
113                 numHelpers = numThreads_;
114                 useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
115                 pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
116                 mainHelper = this;
117         createPThreads();
118     }
119 }
120
121 int FuncNodeHelper::MAX_CHUNKS = 64;
122
123 #if CMK_TRACE_ENABLED
124 #define TRACE_START(id) _start = CmiWallTimer()
125 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
126 #else
127 #define TRACE_START(id)
128 #define TRACE_BRACKET(id)
129 #endif
130
131 #define ALLOW_MULTIPLE_UNSYNC 1
132 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
133                                     int numChunks, int lowerRange, 
134                                     int upperRange, int sync,
135                                     void *redResult, REDUCTION_TYPE type) {
136                                         
137     double _start; //may be used for tracing
138     
139     if(numChunks > MAX_CHUNKS){ 
140         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
141         numChunks = MAX_CHUNKS;
142     }
143         
144     /* "stride" determines the number of loop iterations to be done in each chunk
145      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
146      * for chunk indexed at remainder to numChunks-1, stride is "unit"
147      */
148      int stride;
149          CurLoopInfo *curLoop = NULL;
150     
151     //for using nodequeue
152         TRACE_START(20);
153         if(mode == NODEHELPER_USECHARM){        
154                 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
155         #if ALLOW_MULTIPLE_UNSYNC
156                 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
157         #else
158                 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
159         #endif
160                 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
161                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param); 
162                 if(useTreeBcast){               
163                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
164                         //just implicit binary tree
165                         int pe = CmiMyRank()+1;        
166                         for(int i=0; i<loopTimes; i++, pe++){
167                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
168                                 CmiPushPE(pe, (void *)(notifyMsg));    
169                         }
170                 }else{
171                         for (int i=0; i<numHelpers; i++) {
172                                 if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
173                         }
174                 }
175         }else if(mode == NODEHELPER_PTHREAD){
176                 int numThreads = numHelpers-1;
177                 curLoop = pthdLoop;
178                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
179                 int numNotices = numThreads;
180                 if(useTreeBcast){
181                         numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
182                 }
183                 for(int i=0; i<numNotices; i++){
184                         pthread_mutex_lock(allLocks[i]);
185                         pthread_cond_signal(allConds[i]);
186                         pthread_mutex_unlock(allLocks[i]);
187                 }
188                 //in this mode, it's always synced
189                 sync = 1;
190         }
191         
192         curLoop->stealWork();
193         TRACE_BRACKET(20);
194         
195         TRACE_START(21);                
196         curLoop->waitLoopDone(sync);
197         TRACE_BRACKET(21);        
198
199     if (type!=NODEHELPER_NONE)
200         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
201     return;
202 }
203
204 #define COMPUTE_REDUCTION(T) {\
205     for(int i=0; i<numChunks; i++) {\
206      result += *((T *)(redBufs[i])); \
207      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
208     }\
209 }
210
211 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
212     switch(type){
213         case NODEHELPER_INT_SUM:
214         {
215             int result=0;
216             COMPUTE_REDUCTION(int)
217             *((int *)redBuf) = result;
218             break;
219         }
220         case NODEHELPER_FLOAT_SUM:
221         {
222             float result=0;
223             COMPUTE_REDUCTION(float)
224             *((float *)redBuf) = result;
225             break;
226         }
227         case NODEHELPER_DOUBLE_SUM:
228         {
229             double result=0;
230             COMPUTE_REDUCTION(double)
231             *((double *)redBuf) = result;
232             break;
233         }
234         default:
235         break;
236     }
237 }
238
239 CpvStaticDeclare(int, NdhStealWorkHandler);
240 static void RegisterNodeHelperHdlrs(){
241     CpvInitialize(int, NdhStealWorkHandler);
242     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
243 }
244
245 FuncSingleHelper::FuncSingleHelper(size_t ndhPtr) {
246     thisNodeHelper = (FuncNodeHelper *)ndhPtr;
247     CmiAssert(thisNodeHelper!=NULL);
248         
249         nextFreeNotifyMsg = 0;
250     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*MSG_BUFFER_SIZE);
251     for(int i=0; i<MSG_BUFFER_SIZE; i++){
252         ConverseNotifyMsg *tmp = notifyMsg+i;
253         if(thisNodeHelper->useTreeBcast){
254             tmp->srcRank = CmiMyRank();
255         }else{
256             tmp->srcRank = -1;
257         }            
258         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
259         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
260     }
261     thisNodeHelper->helperPtr[CkMyRank()] = this;
262 }
263
264
265 void SingleHelperStealWork(ConverseNotifyMsg *msg){
266         
267         int srcRank = msg->srcRank;
268         
269         if(srcRank >= 0){
270                 //means using tree-broadcast to send the notification msg
271                 
272                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
273                 int relPE = CmiMyRank()-msg->srcRank;
274                 if(relPE<0) relPE += CmiMyNodeSize();
275                 
276                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
277                 relPE=relPE*TREE_BCAST_BRANCH+1;
278                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
279                         if(relPE >= CmiMyNodeSize()) break;
280                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
281                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
282                         CmiPushPE(pe, (void *)msg);
283                 }
284         }
285     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
286     loop->stealWork();
287 }
288
289 void CurLoopInfo::stealWork(){
290     //indicate the current work hasn't been initialized
291     //or the old work has finished.
292     if(inited == 0) return;
293     
294     int first, last;
295     int unit = (upperIndex-lowerIndex+1)/numChunks;
296     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
297     int markIdx = remainder*(unit+1);
298     
299     int nextChunkId = getNextChunkIdx();
300     int execTimes = 0;
301     while(nextChunkId < numChunks){
302         if(nextChunkId < remainder){
303             first = (unit+1)*nextChunkId;
304             last = first+unit;
305         }else{
306             first = (nextChunkId - remainder)*unit + markIdx;
307             last = first+unit-1;
308         }
309                 
310         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
311         execTimes++;
312         nextChunkId = getNextChunkIdx();
313     }
314     reportFinished(execTimes);
315 }
316
317 //======================================================================//
318 //   End of functions related with FuncSingleHelper                     //
319 //======================================================================//
320
321 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
322     if(mode == NODEHELPER_USECHARM){
323         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
324     }else if(mode==NODEHELPER_PTHREAD){
325         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
326         CmiAssert(numThreads>0);
327     }
328     return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
329 }
330
331 void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
332         nodeHelper.exit();
333 }
334
335 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
336                         int paramNum, void * param, 
337                         int numChunks, int lowerRange, int upperRange,
338                         int sync,
339                         void *redResult, REDUCTION_TYPE type)
340 {
341     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
342 }
343
344 #include "NodeHelper.def.h"