Merge branch 'charm-mpi-interop' of charmgit:charm into charm
[charm.git] / src / libs / ck-libs / ckloop / CkLoop.C
1 #include "CkLoop.h"
2 #include <pthread.h>
3
4 #if !USE_CONVERSE_NOTIFICATION
5 #include "qd.h"
6 #endif
7
8 /*====Beginning of pthread-related variables and impelementation====*/
9 //__thread is not portable, but it works almost everywhere if pthread works
10 //After C++11, this should be thread_local
11 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
12 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
13
14 static FuncCkLoop *mainHelper = NULL;
15 static int mainHelperPhyRank = 0;
16 static int numPhysicalPEs = 0;
17 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
18 static pthread_mutex_t **allLocks = NULL;
19 static pthread_cond_t **allConds = NULL;
20 static pthread_t *ndhThreads = NULL;
21 static volatile int gCrtCnt = 0;
22 static volatile int exitFlag = 0;
23
24 #if CMK_OS_IS_LINUX
25 #include <sys/syscall.h>
26 #endif
27
28 static int HelperOnCore() {
29 #if CMK_OS_IS_LINUX
30     char fname[64];
31     sprintf(fname, "/proc/%d/task/%ld/stat", getpid(), syscall(SYS_gettid));
32     FILE *ifp = fopen(fname, "r");
33     if (ifp == NULL) return -1;
34     fseek(ifp, 0, SEEK_SET);
35     char str[128];
36     for (int i=0; i<39; i++) fscanf(ifp, "%s", str);
37     fclose(ifp);
38     return atoi(str);
39 #else
40     return -1;
41 #endif
42 }
43
44 static void *ndhThreadWork(void *id) {
45     size_t myId = (size_t) id;
46
47     //further improvement of this affinity setting!!
48     int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
49     //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
50     myPhyRank = myId;
51     CmiSetCPUAffinity(myPhyRank);
52
53     pthread_mutex_init(&thdLock, NULL);
54     pthread_cond_init(&thdCondition, NULL);
55
56     allLocks[myId-1] = &thdLock;
57     allConds[myId-1] = &thdCondition;
58
59     __sync_add_and_fetch(&gCrtCnt, 1);
60
61     while (1) {
62         //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
63         if (exitFlag) break;
64         pthread_mutex_lock(&thdLock);
65         pthread_cond_wait(&thdCondition, &thdLock);
66         pthread_mutex_unlock(&thdLock);
67         /* kids ID range: [1 ~ numHelpers-1] */
68         if (mainHelper->needTreeBcast()) {
69             //notify my children
70             int myKid = myId*TREE_BCAST_BRANCH+1;
71             for (int i=0; i<TREE_BCAST_BRANCH; i++, myKid++) {
72                 if (myKid >= mainHelper->getNumHelpers()) break;
73                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
74                 pthread_mutex_lock(allLocks[myKid-1]);
75                 pthread_cond_signal(allConds[myKid-1]);
76                 pthread_mutex_unlock(allLocks[myKid-1]);
77             }
78         }
79         pthdLoop->stealWork();
80     }
81 }
82
83 void FuncCkLoop::createPThreads() {
84     int numThreads = numHelpers - 1;
85     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
86     allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
87     memset(allLocks, 0, sizeof(void *)*numThreads);
88     memset(allConds, 0, sizeof(void *)*numThreads);
89
90     pthread_attr_t attr;
91     pthread_attr_init(&attr);
92     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
93     ndhThreads = new pthread_t[numThreads];
94     mainHelperPhyRank = CmiOnCore();
95     numPhysicalPEs = CmiNumCores();
96     if (mainHelperPhyRank == -1) mainHelperPhyRank = 0;
97     for (int i=1; i<=numThreads; i++) {
98         pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
99     }
100     while (gCrtCnt != numThreads); //wait for all threads to finish creation
101 }
102
103 void FuncCkLoop::exit() {
104     if (mode == CKLOOP_PTHREAD) {
105         exitFlag = 1;
106         for (int i=0; i<numHelpers-1; i++)
107             pthread_join(ndhThreads[i], NULL);
108         delete [] ndhThreads;
109         free(allLocks);
110         free(allConds);
111         delete pthdLoop;
112     }
113 }
114
115 /*====End of pthread-related variables and impelementation====*/
116
117
118 /* Note: Those event ids should be unique globally!! */
119 #define CKLOOP_TOTAL_WORK_EVENTID  139
120 #define CKLOOP_FINISH_SIGNAL_EVENTID 143
121
122 static FuncCkLoop *globalCkLoop = NULL;
123
124 FuncCkLoop::FuncCkLoop(int mode_, int numThreads_) {
125     traceRegisterUserEvent("ckloop total work",CKLOOP_TOTAL_WORK_EVENTID);
126     traceRegisterUserEvent("ckloop finish signal",CKLOOP_FINISH_SIGNAL_EVENTID);
127
128     mode = mode_;
129
130     CmiAssert(globalCkLoop==NULL);
131     globalCkLoop = this;
132
133     if (mode == CKLOOP_USECHARM) {
134         //CkPrintf("FuncCkLoop created on node %d\n", CkMyNode());
135         numHelpers = CkMyNodeSize();
136         helperPtr = new FuncSingleHelper *[numHelpers];
137         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
138
139         int pestart = CkNodeFirst(CkMyNode());
140
141         for (int i=0; i<numHelpers; i++) {
142             CkChareID helper;
143             CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
144         }
145     } else if (mode == CKLOOP_PTHREAD) {
146         helperPtr = NULL;
147
148         numHelpers = numThreads_;
149         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
150         pthdLoop = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
151         mainHelper = this;
152         createPThreads();
153     }
154 }
155
156 int FuncCkLoop::MAX_CHUNKS = 64;
157
158 #if CMK_TRACE_ENABLED
159 #define TRACE_START(id) _start = CmiWallTimer()
160 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
161 #else
162 #define TRACE_START(id)
163 #define TRACE_BRACKET(id)
164 #endif
165
166 #define ALLOW_MULTIPLE_UNSYNC 1
167 void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
168                                      int numChunks, int lowerRange,
169                                      int upperRange, int sync,
170                                      void *redResult, REDUCTION_TYPE type) {
171
172     double _start; //may be used for tracing
173
174     if (numChunks > MAX_CHUNKS) {
175         CkPrintf("CkLoop[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
176         numChunks = MAX_CHUNKS;
177     }
178
179     /* "stride" determines the number of loop iterations to be done in each chunk
180      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
181      * for chunk indexed at remainder to numChunks-1, stride is "unit"
182     int stride;
183      */
184     CurLoopInfo *curLoop = NULL;
185
186     //for using nodequeue
187     TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
188     if (mode == CKLOOP_USECHARM) {
189         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
190 #if USE_CONVERSE_NOTIFICATION
191 #if ALLOW_MULTIPLE_UNSYNC
192         ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
193 #else
194         ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
195 #endif
196         curLoop = (CurLoopInfo *)(notifyMsg->ptr);
197         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
198         if (useTreeBcast) {
199             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
200             //just implicit binary tree
201             int pe = CmiMyRank()+1;
202             for (int i=0; i<loopTimes; i++, pe++) {
203                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
204                 CmiPushPE(pe, (void *)(notifyMsg));
205             }
206         } else {
207             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
208                 CmiPushPE(i, (void *)(notifyMsg));
209             }
210             for (int i=0; i<CmiMyRank(); i++) {
211                 CmiPushPE(i, (void *)(notifyMsg));
212             }
213         }
214 #else
215 #if ALLOW_MULTIPLE_UNSYNC
216         curLoop = thisHelper->getNewTask();
217 #else
218         curLoop = thisHelper->taskBuffer[0];
219 #endif
220         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
221         CpvAccess(_qd)->create(numHelpers-1);
222         if (useTreeBcast) {
223             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
224             //just implicit binary tree
225             int pe = CmiMyRank()+1;
226             for (int i=0; i<loopTimes; i++, pe++) {
227                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
228                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
229                 one->ptr = (void *)curLoop;
230                 envelope *env = UsrToEnv(one);
231                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
232                 CmiPushPE(pe, (void *)(env));
233             }
234         } else {
235             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
236                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
237                 one->ptr = (void *)curLoop;
238                 envelope *env = UsrToEnv(one);
239                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
240                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
241                 CmiPushPE(i, (void *)(env));
242             }
243             for (int i=0; i<CmiMyRank(); i++) {
244                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
245                 one->ptr = (void *)curLoop;
246                 envelope *env = UsrToEnv(one);
247                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
248                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
249                 CmiPushPE(i, (void *)(env));
250             }
251         }
252 #endif
253     } else if (mode == CKLOOP_PTHREAD) {
254         int numThreads = numHelpers-1;
255         curLoop = pthdLoop;
256         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
257         int numNotices = numThreads;
258         if (useTreeBcast) {
259             numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
260         }
261         for (int i=0; i<numNotices; i++) {
262             pthread_mutex_lock(allLocks[i]);
263             pthread_cond_signal(allConds[i]);
264             pthread_mutex_unlock(allLocks[i]);
265         }
266         //in this mode, it's always synced
267         sync = 1;
268     }
269
270     curLoop->stealWork();
271     TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
272
273     //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
274
275     TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
276     curLoop->waitLoopDone(sync);
277     TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
278
279     //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
280
281     if (type!=CKLOOP_NONE)
282         reduce(curLoop->getRedBufs(), redResult, type, numChunks);
283     return;
284 }
285
286 #define COMPUTE_REDUCTION(T) {\
287     for(int i=0; i<numChunks; i++) {\
288      result += *((T *)(redBufs[i])); \
289      /*CkPrintf("CkLoop Reduce: %d\n", result);*/ \
290     }\
291 }
292
293 void FuncCkLoop::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
294     switch (type) {
295     case CKLOOP_INT_SUM: {
296         int result=0;
297         COMPUTE_REDUCTION(int)
298         *((int *)redBuf) = result;
299         break;
300     }
301     case CKLOOP_FLOAT_SUM: {
302         float result=0;
303         COMPUTE_REDUCTION(float)
304         *((float *)redBuf) = result;
305         break;
306     }
307     case CKLOOP_DOUBLE_SUM: {
308         double result=0;
309         COMPUTE_REDUCTION(double)
310         *((double *)redBuf) = result;
311         break;
312     }
313     default:
314         break;
315     }
316 }
317
318 CpvStaticDeclare(int, NdhStealWorkHandler);
319 static void RegisterCkLoopHdlrs() {
320     CpvInitialize(int, NdhStealWorkHandler);
321     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
322 }
323
324 extern int _charmHandlerIdx;
325 FuncSingleHelper::FuncSingleHelper(int numHelpers) {
326     totalHelpers = numHelpers;
327 #if USE_CONVERSE_NOTIFICATION
328     notifyMsgBufSize = TASK_BUFFER_SIZE;
329 #else
330     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
331 #endif
332
333     CmiAssert(globalCkLoop!=NULL);
334     thisCkLoop = globalCkLoop;
335
336     nextFreeNotifyMsg = 0;
337 #if USE_CONVERSE_NOTIFICATION
338     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
339     for (int i=0; i<notifyMsgBufSize; i++) {
340         ConverseNotifyMsg *tmp = notifyMsg+i;
341         if (thisCkLoop->useTreeBcast) {
342             tmp->srcRank = CmiMyRank();
343         } else {
344             tmp->srcRank = -1;
345         }
346         tmp->ptr = (void *)(new CurLoopInfo(FuncCkLoop::MAX_CHUNKS));
347         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
348     }
349 #else
350     nextFreeTaskBuffer = 0;
351     notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
352     for (int i=0; i<notifyMsgBufSize; i++) {
353         CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
354         notifyMsg[i] = tmp;
355         if (thisCkLoop->useTreeBcast) {
356             tmp->srcRank = CmiMyRank();
357         } else {
358             tmp->srcRank = -1;
359         }
360         tmp->ptr = NULL;
361         envelope *env = UsrToEnv(tmp);
362         env->setMsgtype(ForChareMsg);
363         env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
364         env->setSrcPe(CkMyPe());
365         CmiSetHandler(env, _charmHandlerIdx);
366         //env->setObjPtr has to be called when a notification msg is sent
367     }
368     taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
369     for (int i=0; i<TASK_BUFFER_SIZE; i++) {
370         taskBuffer[i] = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
371     }
372 #endif
373     globalCkLoop->helperPtr[CkMyRank()] = this;
374 }
375
376 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg) {
377 #if !USE_CONVERSE_NOTIFICATION
378     int srcRank = msg->srcRank;
379     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
380     if (srcRank >= 0) {
381         //means using tree-broadcast to send the notification msg
382         int relPE = CmiMyRank()-msg->srcRank;
383         if (relPE<0) relPE += CmiMyNodeSize();
384
385         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
386         relPE=relPE*TREE_BCAST_BRANCH+1;
387         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
388             if (relPE >= CmiMyNodeSize()) break;
389             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
390             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
391             CharmNotifyMsg *newone = getNotifyMsg();
392             newone->ptr = (void *)loop;
393             envelope *env = UsrToEnv(newone);
394             env->setObjPtr(thisCkLoop->helperPtr[pe]->ckGetChareID().objPtr);
395             CmiPushPE(pe, (void *)env);
396         }
397     }
398     loop->stealWork();
399 #endif
400 }
401
402 void SingleHelperStealWork(ConverseNotifyMsg *msg) {
403     int srcRank = msg->srcRank;
404
405     if (srcRank >= 0) {
406         //means using tree-broadcast to send the notification msg
407
408         //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
409         int relPE = CmiMyRank()-msg->srcRank;
410         if (relPE<0) relPE += CmiMyNodeSize();
411
412         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
413         relPE=relPE*TREE_BCAST_BRANCH+1;
414         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
415             if (relPE >= CmiMyNodeSize()) break;
416             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
417             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
418             CmiPushPE(pe, (void *)msg);
419         }
420     }
421     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
422     loop->stealWork();
423 }
424
425 void CurLoopInfo::stealWork() {
426     //indicate the current work hasn't been initialized
427     //or the old work has finished.
428     if (inited == 0) return;
429
430     int first, last;
431     int unit = (upperIndex-lowerIndex+1)/numChunks;
432     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
433     int markIdx = remainder*(unit+1);
434
435     int nextChunkId = getNextChunkIdx();
436     int execTimes = 0;
437     while (nextChunkId < numChunks) {
438         if (nextChunkId < remainder) {
439             first = lowerIndex+(unit+1)*nextChunkId;
440             last = first+unit;
441         } else {
442             first = lowerIndex+(nextChunkId - remainder)*unit + markIdx;
443             last = first+unit-1;
444         }
445
446         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
447         execTimes++;
448         nextChunkId = getNextChunkIdx();
449     }
450     reportFinished(execTimes);
451 }
452
453 //======================================================================//
454 //   End of functions related with FuncSingleHelper                     //
455 //======================================================================//
456
457 CProxy_FuncCkLoop CkLoop_Init(int mode, int numThreads) {
458     if (mode == CKLOOP_USECHARM) {
459 #if USE_CONVERSE_NOTIFICATION
460         CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (converse-level notification) but not using node-level queue\n");
461 #else
462         CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (charm-level notifiation) but not using node-level queue\n");
463 #endif
464     } else if (mode==CKLOOP_PTHREAD) {
465         CkPrintf("CkLoopLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
466         CmiAssert(numThreads>0);
467     }
468     return CProxy_FuncCkLoop::ckNew(mode, numThreads);
469 }
470
471 void CkLoop_Exit(CProxy_FuncCkLoop ckLoop) {
472     ckLoop.exit();
473 }
474
475 void CkLoop_Parallelize(CProxy_FuncCkLoop ckLoop, HelperFn func,
476                             int paramNum, void * param,
477                             int numChunks, int lowerRange, int upperRange,
478                             int sync,
479                             void *redResult, REDUCTION_TYPE type) {
480     ckLoop[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
481 }
482
483 void CkLoop_Parallelize(HelperFn func,
484                             int paramNum, void * param,
485                             int numChunks, int lowerRange, int upperRange,
486                             int sync,
487                             void *redResult, REDUCTION_TYPE type) {
488     globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
489 }
490 #include "CkLoop.def.h"