1. added output if using charm-level notification
[charm.git] / NodeHelper.C
#include "NodeHelper.h"

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#if !USE_CONVERSE_NOTIFICATION
#include "qd.h"
#endif
7
8 /*====Beginning of pthread-related variables and impelementation====*/
9 //__thread is not portable, but it works almost everywhere if pthread works
10 //After C++11, this should be thread_local
11 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
12 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
13
14 static FuncNodeHelper *mainHelper = NULL;
15 static int mainHelperPhyRank = 0;
16 static int numPhysicalPEs = 0;
17 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
18 static pthread_mutex_t **allLocks = NULL;
19 static pthread_cond_t **allConds = NULL;
20 static pthread_t *ndhThreads = NULL;
21 static volatile int gCrtCnt = 0;
22 static volatile int exitFlag = 0;
23
24 #if CMK_OS_IS_LINUX
25 #include <sys/syscall.h>
26 #endif
27
/* Extracts the "processor" field (field 39: CPU the task last ran on) from
 * one line of a Linux /proc/<pid>/task/<tid>/stat file.
 * The comm field (field 2) is parenthesized and may itself contain spaces
 * or ')' characters, so fields are counted from the LAST ')' in the line.
 * Returns -1 if the line is malformed or too short. */
static inline int ndhParseStatProcessor(const char *line){
    const char *p = strrchr(line, ')');
    if(p == NULL) return -1;
    p++;
    /* the first token after ')' is field 3; skip fields 3..38 */
    for(int field = 3; field < 39; field++){
        while(*p == ' ') p++;
        if(*p == '\0' || *p == '\n') return -1;
        while(*p != ' ' && *p != '\0' && *p != '\n') p++;
    }
    char *end = NULL;
    long cpu = strtol(p, &end, 10);
    if(end == p) return -1;
    return (int)cpu;
}

/* Returns the CPU core the calling thread last ran on, or -1 when it cannot
 * be determined (non-Linux build, unreadable or malformed /proc entry).
 * Linux-only: reads /proc/<pid>/task/<tid>/stat. */
static int HelperOnCore(){
#if CMK_OS_IS_LINUX
    char fname[64];
    snprintf(fname, sizeof(fname), "/proc/%d/task/%ld/stat",
             (int)getpid(), (long)syscall(SYS_gettid));
    FILE *ifp = fopen(fname, "r");
    if(ifp == NULL) return -1;
    /* the stat file is a single line; 2 KiB comfortably holds its first 39 fields */
    char line[2048];
    char *got = fgets(line, sizeof(line), ifp);
    fclose(ifp);
    if(got == NULL) return -1;
    return ndhParseStatProcessor(line);
#else
    return -1;
#endif
}
43
44 static void *ndhThreadWork(void *id) {
45         size_t myId = (size_t) id;
46
47         //further improvement of this affinity setting!!
48         int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
49         //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
50         myPhyRank = myId;
51         CmiSetCPUAffinity(myPhyRank);
52
53         pthread_mutex_init(&thdLock, NULL);
54         pthread_cond_init(&thdCondition, NULL);
55
56         allLocks[myId-1] = &thdLock;
57         allConds[myId-1] = &thdCondition;
58
59         __sync_add_and_fetch(&gCrtCnt, 1);
60
61         while(1){
62                 //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
63                 if(exitFlag) break;
64                 pthread_mutex_lock(&thdLock);
65                 pthread_cond_wait(&thdCondition, &thdLock);
66                 pthread_mutex_unlock(&thdLock);
67                 /* kids ID range: [1 ~ numHelpers-1] */
68                 if(mainHelper->needTreeBcast()){
69                         //notify my children
70                         int myKid = myId*TREE_BCAST_BRANCH+1;
71                         for(int i=0; i<TREE_BCAST_BRANCH; i++, myKid++){
72                                 if(myKid >= mainHelper->getNumHelpers()) break;
73                                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
74                                 pthread_mutex_lock(allLocks[myKid-1]);
75                                 pthread_cond_signal(allConds[myKid-1]);
76                                 pthread_mutex_unlock(allLocks[myKid-1]);
77                         }
78                 }
79                 pthdLoop->stealWork();
80         }
81 }
82
83 void FuncNodeHelper::createPThreads() {
84         int numThreads = numHelpers - 1;
85         allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
86         allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
87         memset(allLocks, 0, sizeof(void *)*numThreads);
88         memset(allConds, 0, sizeof(void *)*numThreads);
89
90         pthread_attr_t attr;
91         pthread_attr_init(&attr);
92         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
93         ndhThreads = new pthread_t[numThreads];
94         mainHelperPhyRank = CmiOnCore();
95         numPhysicalPEs = CmiNumCores();
96         if(mainHelperPhyRank == -1) mainHelperPhyRank = 0;
97         for(int i=1; i<=numThreads; i++){
98                 pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
99         }
100         while(gCrtCnt != numThreads); //wait for all threads to finish creation
101 }
102
103 void FuncNodeHelper::exit(){
104         if(mode == NODEHELPER_PTHREAD){
105                 exitFlag = 1;
106                 for(int i=0; i<numHelpers-1; i++)
107                         pthread_join(ndhThreads[i], NULL);
108                 delete [] ndhThreads;
109                 free(allLocks);
110                 free(allConds);
111                 delete pthdLoop;
112         }
113 }
114
/*====End of pthread-related variables and implementation====*/
116
117
/* User-event ids registered with the tracing framework.
 * Note: these ids must be unique across the whole program. */
enum {
    NDH_TOTAL_WORK_EVENTID = 139,
    NDH_FINISH_SIGNAL_EVENTID = 143
};
121
122 static FuncNodeHelper *globalNodeHelper = NULL;
123
124 FuncNodeHelper::FuncNodeHelper(int mode_, int numThreads_)
125 {
126     traceRegisterUserEvent("nodehelper total work",NDH_TOTAL_WORK_EVENTID);
127     traceRegisterUserEvent("nodehlelper finish signal",NDH_FINISH_SIGNAL_EVENTID);
128
129     mode = mode_;
130
131     CmiAssert(globalNodeHelper==NULL);
132     globalNodeHelper = this;
133
134     if(mode == NODEHELPER_USECHARM){
135         //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
136         numHelpers = CkMyNodeSize();
137         helperPtr = new FuncSingleHelper *[numHelpers];
138         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
139
140         int pestart = CkNodeFirst(CkMyNode());
141
142         for (int i=0; i<numHelpers; i++) {
143             CkChareID helper;
144             CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
145         }
146     }else if(mode == NODEHELPER_PTHREAD){
147                 helperPtr = NULL;
148
149                 numHelpers = numThreads_;
150                 useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
151                 pthdLoop = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
152                 mainHelper = this;
153         createPThreads();
154     }
155 }
156
157 int FuncNodeHelper::MAX_CHUNKS = 64;
158
159 #if CMK_TRACE_ENABLED
160 #define TRACE_START(id) _start = CmiWallTimer()
161 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
162 #else
163 #define TRACE_START(id)
164 #define TRACE_BRACKET(id)
165 #endif
166
167 #define ALLOW_MULTIPLE_UNSYNC 1
168 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param,
169                                     int numChunks, int lowerRange,
170                                                     int upperRange, int sync,
171                                     void *redResult, REDUCTION_TYPE type) {
172
173     double _start; //may be used for tracing
174
175     if(numChunks > MAX_CHUNKS){
176         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
177         numChunks = MAX_CHUNKS;
178     }
179
180     /* "stride" determines the number of loop iterations to be done in each chunk
181      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
182      * for chunk indexed at remainder to numChunks-1, stride is "unit"
183      */
184      int stride;
185          CurLoopInfo *curLoop = NULL;
186
187     //for using nodequeue
188         TRACE_START(NDH_TOTAL_WORK_EVENTID);
189         if(mode == NODEHELPER_USECHARM){
190                 FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
191 #if USE_CONVERSE_NOTIFICATION
192         #if ALLOW_MULTIPLE_UNSYNC
193                 ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
194         #else
195                 ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
196         #endif
197                 curLoop = (CurLoopInfo *)(notifyMsg->ptr);
198                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
199                 if(useTreeBcast){
200                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
201                         //just implicit binary tree
202                         int pe = CmiMyRank()+1;
203                         for(int i=0; i<loopTimes; i++, pe++){
204                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
205                                 CmiPushPE(pe, (void *)(notifyMsg));
206                         }
207                 }else{
208                         for (int i=CmiMyRank()+1; i<numHelpers; i++) {
209                                 CmiPushPE(i, (void *)(notifyMsg));
210                         }
211                         for (int i=0; i<CmiMyRank(); i++) {
212                                 CmiPushPE(i, (void *)(notifyMsg));
213                         }
214                 }
215 #else
216     #if ALLOW_MULTIPLE_UNSYNC
217         curLoop = thisHelper->getNewTask();
218     #else
219         curLoop = thisHelper->taskBuffer[0];
220     #endif
221         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
222         CpvAccess(_qd)->create(numHelpers-1);
223         if(useTreeBcast){
224                         int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
225                         //just implicit binary tree
226                         int pe = CmiMyRank()+1;
227                         for(int i=0; i<loopTimes; i++, pe++){
228                                 if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
229                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
230                 one->ptr = (void *)curLoop;
231                 envelope *env = UsrToEnv(one);
232                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
233                                 CmiPushPE(pe, (void *)(env));
234                         }
235                 }else{
236                         for (int i=CmiMyRank()+1; i<numHelpers; i++) {
237                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
238                 one->ptr = (void *)curLoop;
239                 envelope *env = UsrToEnv(one);
240                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
241                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
242                                 CmiPushPE(i, (void *)(env));
243                         }
244                         for (int i=0; i<CmiMyRank(); i++) {
245                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
246                 one->ptr = (void *)curLoop;
247                 envelope *env = UsrToEnv(one);
248                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
249                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
250                                 CmiPushPE(i, (void *)(env));
251                         }
252                 }
253 #endif
254         }else if(mode == NODEHELPER_PTHREAD){
255                 int numThreads = numHelpers-1;
256                 curLoop = pthdLoop;
257                 curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
258                 int numNotices = numThreads;
259                 if(useTreeBcast){
260                         numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
261                 }
262                 for(int i=0; i<numNotices; i++){
263                         pthread_mutex_lock(allLocks[i]);
264                         pthread_cond_signal(allConds[i]);
265                         pthread_mutex_unlock(allLocks[i]);
266                 }
267                 //in this mode, it's always synced
268                 sync = 1;
269         }
270
271         curLoop->stealWork();
272         TRACE_BRACKET(NDH_TOTAL_WORK_EVENTID);
273
274     //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
275
276         TRACE_START(NDH_FINISH_SIGNAL_EVENTID);
277         curLoop->waitLoopDone(sync);
278         TRACE_BRACKET(NDH_FINISH_SIGNAL_EVENTID);
279
280     //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
281
282     if (type!=NODEHELPER_NONE)
283         reduce(curLoop->getRedBufs(), redResult, type, numChunks);
284     return;
285 }
286
287 #define COMPUTE_REDUCTION(T) {\
288     for(int i=0; i<numChunks; i++) {\
289      result += *((T *)(redBufs[i])); \
290      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
291     }\
292 }
293
294 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
295     switch(type){
296         case NODEHELPER_INT_SUM:
297         {
298             int result=0;
299             COMPUTE_REDUCTION(int)
300             *((int *)redBuf) = result;
301             break;
302         }
303         case NODEHELPER_FLOAT_SUM:
304         {
305             float result=0;
306             COMPUTE_REDUCTION(float)
307             *((float *)redBuf) = result;
308             break;
309         }
310         case NODEHELPER_DOUBLE_SUM:
311         {
312             double result=0;
313             COMPUTE_REDUCTION(double)
314             *((double *)redBuf) = result;
315             break;
316         }
317         default:
318         break;
319     }
320 }
321
322 CpvStaticDeclare(int, NdhStealWorkHandler);
323 static void RegisterNodeHelperHdlrs(){
324     CpvInitialize(int, NdhStealWorkHandler);
325     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
326 }
327
328 extern int _charmHandlerIdx;
329 FuncSingleHelper::FuncSingleHelper(int numHelpers) {
330     totalHelpers = numHelpers;
331 #if USE_CONVERSE_NOTIFICATION
332     notifyMsgBufSize = TASK_BUFFER_SIZE;
333 #else
334     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
335 #endif
336
337     CmiAssert(globalNodeHelper!=NULL);
338     thisNodeHelper = globalNodeHelper;
339
340         nextFreeNotifyMsg = 0;
341 #if USE_CONVERSE_NOTIFICATION
342     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
343     for(int i=0; i<notifyMsgBufSize; i++){
344         ConverseNotifyMsg *tmp = notifyMsg+i;
345         if(thisNodeHelper->useTreeBcast){
346             tmp->srcRank = CmiMyRank();
347         }else{
348             tmp->srcRank = -1;
349         }
350         tmp->ptr = (void *)(new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS));
351         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
352     }
353 #else
354     nextFreeTaskBuffer = 0;
355     notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
356     for(int i=0; i<notifyMsgBufSize; i++){
357         CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
358         notifyMsg[i] = tmp;
359         if(thisNodeHelper->useTreeBcast){
360             tmp->srcRank = CmiMyRank();
361         }else{
362             tmp->srcRank = -1;
363         }
364         tmp->ptr = NULL;
365         envelope *env = UsrToEnv(tmp);
366         env->setMsgtype(ForChareMsg);
367         env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
368         env->setSrcPe(CkMyPe());
369         CmiSetHandler(env, _charmHandlerIdx);
370         //env->setObjPtr has to be called when a notification msg is sent
371     }
372     taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
373     for(int i=0; i<TASK_BUFFER_SIZE; i++){
374         taskBuffer[i] = new CurLoopInfo(FuncNodeHelper::MAX_CHUNKS);
375     }
376 #endif
377     globalNodeHelper->helperPtr[CkMyRank()] = this;
378 }
379
380 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg){
381 #if !USE_CONVERSE_NOTIFICATION
382     int srcRank = msg->srcRank;
383     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
384     if(srcRank >= 0){
385         //means using tree-broadcast to send the notification msg
386         int relPE = CmiMyRank()-msg->srcRank;
387                 if(relPE<0) relPE += CmiMyNodeSize();
388
389                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
390                 relPE=relPE*TREE_BCAST_BRANCH+1;
391                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
392                         if(relPE >= CmiMyNodeSize()) break;
393                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
394                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
395             CharmNotifyMsg *newone = getNotifyMsg();
396             newone->ptr = (void *)loop;
397             envelope *env = UsrToEnv(newone);
398             env->setObjPtr(thisNodeHelper->helperPtr[pe]->ckGetChareID().objPtr);
399                         CmiPushPE(pe, (void *)env);
400                 }
401     }
402     loop->stealWork();
403 #endif
404 }
405
406 void SingleHelperStealWork(ConverseNotifyMsg *msg){
407         int srcRank = msg->srcRank;
408
409         if(srcRank >= 0){
410                 //means using tree-broadcast to send the notification msg
411
412                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
413                 int relPE = CmiMyRank()-msg->srcRank;
414                 if(relPE<0) relPE += CmiMyNodeSize();
415
416                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
417                 relPE=relPE*TREE_BCAST_BRANCH+1;
418                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
419                         if(relPE >= CmiMyNodeSize()) break;
420                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
421                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
422                         CmiPushPE(pe, (void *)msg);
423                 }
424         }
425     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
426     loop->stealWork();
427 }
428
429 void CurLoopInfo::stealWork(){
430     //indicate the current work hasn't been initialized
431     //or the old work has finished.
432     if(inited == 0) return;
433
434     int first, last;
435     int unit = (upperIndex-lowerIndex+1)/numChunks;
436     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
437     int markIdx = remainder*(unit+1);
438
439     int nextChunkId = getNextChunkIdx();
440     int execTimes = 0;
441     while(nextChunkId < numChunks){
442         if(nextChunkId < remainder){
443             first = (unit+1)*nextChunkId;
444             last = first+unit;
445         }else{
446             first = (nextChunkId - remainder)*unit + markIdx;
447             last = first+unit-1;
448         }
449
450         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
451         execTimes++;
452         nextChunkId = getNextChunkIdx();
453     }
454     reportFinished(execTimes);
455 }
456
//======================================================================//
//   End of functions related to FuncSingleHelper                       //
//======================================================================//
460
461 CProxy_FuncNodeHelper NodeHelper_Init(int mode, int numThreads){
462     if(mode == NODEHELPER_USECHARM){
463 #if USE_CONVERSE_NOTIFICATION
464         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling (converse-level notification) but not using node-level queue\n");
465 #else
466         CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling (charm-level notifiation) but not using node-level queue\n");
467 #endif
468     }else if(mode==NODEHELPER_PTHREAD){
469         CkPrintf("NodeHelperLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
470         CmiAssert(numThreads>0);
471     }
472     return CProxy_FuncNodeHelper::ckNew(mode, numThreads);
473 }
474
475 void NodeHelper_Exit(CProxy_FuncNodeHelper nodeHelper){
476         nodeHelper.exit();
477 }
478
479 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func,
480                         int paramNum, void * param,
481                         int numChunks, int lowerRange, int upperRange,
482                                     int sync,
483                         void *redResult, REDUCTION_TYPE type)
484 {
485     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
486 }
487
488 void NodeHelper_Parallelize(HelperFn func,
489                         int paramNum, void * param,
490                         int numChunks, int lowerRange, int upperRange,
491                                     int sync,
492                         void *redResult, REDUCTION_TYPE type)
493 {
494     globalNodeHelper->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
495 }
496 #include "NodeHelper.def.h"