Cleaned the library to remove redundant function argument and interface
[charm.git] / src / libs / ck-libs / ckloop / CkLoop.C
1 #include "CkLoop.h"
2 #include <pthread.h>
3
4 #if !USE_CONVERSE_NOTIFICATION
5 #include "qd.h"
6 #endif
7
8 #define CKLOOP_USECHARM 1
9 #define CKLOOP_PTHREAD 2
10
11 /*====Beginning of pthread-related variables and impelementation====*/
12 //__thread is not portable, but it works almost everywhere if pthread works
13 //After C++11, this should be thread_local
14 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
15 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
16
17 static FuncCkLoop *mainHelper = NULL;
18 static int mainHelperPhyRank = 0;
19 static int numPhysicalPEs = 0;
20 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
21 static pthread_mutex_t **allLocks = NULL;
22 static pthread_cond_t **allConds = NULL;
23 static pthread_t *ndhThreads = NULL;
24 static volatile int gCrtCnt = 0;
25 static volatile int exitFlag = 0;
26
27 #if CMK_OS_IS_LINUX
28 #include <sys/syscall.h>
29 #endif
30
31 static int HelperOnCore() {
32 #if CMK_OS_IS_LINUX
33     char fname[64];
34     sprintf(fname, "/proc/%d/task/%ld/stat", getpid(), syscall(SYS_gettid));
35     FILE *ifp = fopen(fname, "r");
36     if (ifp == NULL) return -1;
37     fseek(ifp, 0, SEEK_SET);
38     char str[128];
39     for (int i=0; i<39; i++) fscanf(ifp, "%s", str);
40     fclose(ifp);
41     return atoi(str);
42 #else
43     return -1;
44 #endif
45 }
46
47 static void *ndhThreadWork(void *id) {
48     size_t myId = (size_t) id;
49
50     //further improvement of this affinity setting!!
51     int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
52     //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
53     myPhyRank = myId;
54     CmiSetCPUAffinity(myPhyRank);
55
56     pthread_mutex_init(&thdLock, NULL);
57     pthread_cond_init(&thdCondition, NULL);
58
59     allLocks[myId-1] = &thdLock;
60     allConds[myId-1] = &thdCondition;
61
62     __sync_add_and_fetch(&gCrtCnt, 1);
63
64     while (1) {
65         //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
66         if (exitFlag) break;
67         pthread_mutex_lock(&thdLock);
68         pthread_cond_wait(&thdCondition, &thdLock);
69         pthread_mutex_unlock(&thdLock);
70         /* kids ID range: [1 ~ numHelpers-1] */
71         if (mainHelper->needTreeBcast()) {
72             //notify my children
73             int myKid = myId*TREE_BCAST_BRANCH+1;
74             for (int i=0; i<TREE_BCAST_BRANCH; i++, myKid++) {
75                 if (myKid >= mainHelper->getNumHelpers()) break;
76                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
77                 pthread_mutex_lock(allLocks[myKid-1]);
78                 pthread_cond_signal(allConds[myKid-1]);
79                 pthread_mutex_unlock(allLocks[myKid-1]);
80             }
81         }
82         pthdLoop->stealWork();
83     }
84 }
85
86 void FuncCkLoop::createPThreads() {
87     int numThreads = numHelpers - 1;
88     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
89     allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
90     memset(allLocks, 0, sizeof(void *)*numThreads);
91     memset(allConds, 0, sizeof(void *)*numThreads);
92
93     pthread_attr_t attr;
94     pthread_attr_init(&attr);
95     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
96     ndhThreads = new pthread_t[numThreads];
97     mainHelperPhyRank = CmiOnCore();
98     numPhysicalPEs = CmiNumCores();
99     if (mainHelperPhyRank == -1) mainHelperPhyRank = 0;
100     for (int i=1; i<=numThreads; i++) {
101         pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
102     }
103     while (gCrtCnt != numThreads); //wait for all threads to finish creation
104 }
105
106 void FuncCkLoop::exit() {
107     if (mode == CKLOOP_PTHREAD) {
108         exitFlag = 1;
109         for (int i=0; i<numHelpers-1; i++)
110             pthread_join(ndhThreads[i], NULL);
111         delete [] ndhThreads;
112         free(allLocks);
113         free(allConds);
114         delete pthdLoop;
115     }
116 }
117
118 /*====End of pthread-related variables and impelementation====*/
119
120
121 /* Note: Those event ids should be unique globally!! */
122 #define CKLOOP_TOTAL_WORK_EVENTID  139
123 #define CKLOOP_FINISH_SIGNAL_EVENTID 143
124
125 static FuncCkLoop *globalCkLoop = NULL;
126
127 FuncCkLoop::FuncCkLoop(int mode_, int numThreads_) {
128     traceRegisterUserEvent("ckloop total work",CKLOOP_TOTAL_WORK_EVENTID);
129     traceRegisterUserEvent("ckloop finish signal",CKLOOP_FINISH_SIGNAL_EVENTID);
130
131     mode = mode_;
132
133     CmiAssert(globalCkLoop==NULL);
134     globalCkLoop = this;
135
136     if (mode == CKLOOP_USECHARM) {
137         //CkPrintf("FuncCkLoop created on node %d\n", CkMyNode());
138         numHelpers = CkMyNodeSize();
139         helperPtr = new FuncSingleHelper *[numHelpers];
140         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
141
142         int pestart = CkNodeFirst(CkMyNode());
143
144         for (int i=0; i<numHelpers; i++) {
145             CkChareID helper;
146             CProxy_FuncSingleHelper::ckNew(numHelpers, &helper, pestart+i);
147         }
148     } else if (mode == CKLOOP_PTHREAD) {
149         helperPtr = NULL;
150
151         numHelpers = numThreads_;
152         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
153         pthdLoop = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
154         mainHelper = this;
155         createPThreads();
156     }
157 }
158
159 int FuncCkLoop::MAX_CHUNKS = 64;
160
161 #if CMK_TRACE_ENABLED
162 #define TRACE_START(id) _start = CmiWallTimer()
163 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
164 #else
165 #define TRACE_START(id)
166 #define TRACE_BRACKET(id)
167 #endif
168
169 #define ALLOW_MULTIPLE_UNSYNC 1
170 void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
171                                      int numChunks, int lowerRange,
172                                      int upperRange, int sync,
173                                      void *redResult, REDUCTION_TYPE type) {
174
175     double _start; //may be used for tracing
176
177     if (numChunks > MAX_CHUNKS) {
178         CkPrintf("CkLoop[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
179         numChunks = MAX_CHUNKS;
180     }
181
182     /* "stride" determines the number of loop iterations to be done in each chunk
183      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
184      * for chunk indexed at remainder to numChunks-1, stride is "unit"
185     int stride;
186      */
187     CurLoopInfo *curLoop = NULL;
188
189     //for using nodequeue
190     TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
191     if (mode == CKLOOP_USECHARM) {
192         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
193 #if USE_CONVERSE_NOTIFICATION
194 #if ALLOW_MULTIPLE_UNSYNC
195         ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
196 #else
197         ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
198 #endif
199         curLoop = (CurLoopInfo *)(notifyMsg->ptr);
200         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
201         if (useTreeBcast) {
202             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
203             //just implicit binary tree
204             int pe = CmiMyRank()+1;
205             for (int i=0; i<loopTimes; i++, pe++) {
206                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
207                 CmiPushPE(pe, (void *)(notifyMsg));
208             }
209         } else {
210             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
211                 CmiPushPE(i, (void *)(notifyMsg));
212             }
213             for (int i=0; i<CmiMyRank(); i++) {
214                 CmiPushPE(i, (void *)(notifyMsg));
215             }
216         }
217 #else
218 #if ALLOW_MULTIPLE_UNSYNC
219         curLoop = thisHelper->getNewTask();
220 #else
221         curLoop = thisHelper->taskBuffer[0];
222 #endif
223         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
224         CpvAccess(_qd)->create(numHelpers-1);
225         if (useTreeBcast) {
226             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
227             //just implicit binary tree
228             int pe = CmiMyRank()+1;
229             for (int i=0; i<loopTimes; i++, pe++) {
230                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
231                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
232                 one->ptr = (void *)curLoop;
233                 envelope *env = UsrToEnv(one);
234                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
235                 CmiPushPE(pe, (void *)(env));
236             }
237         } else {
238             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
239                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
240                 one->ptr = (void *)curLoop;
241                 envelope *env = UsrToEnv(one);
242                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
243                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
244                 CmiPushPE(i, (void *)(env));
245             }
246             for (int i=0; i<CmiMyRank(); i++) {
247                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
248                 one->ptr = (void *)curLoop;
249                 envelope *env = UsrToEnv(one);
250                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
251                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
252                 CmiPushPE(i, (void *)(env));
253             }
254         }
255 #endif
256     } else if (mode == CKLOOP_PTHREAD) {
257         int numThreads = numHelpers-1;
258         curLoop = pthdLoop;
259         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
260         int numNotices = numThreads;
261         if (useTreeBcast) {
262             numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
263         }
264         for (int i=0; i<numNotices; i++) {
265             pthread_mutex_lock(allLocks[i]);
266             pthread_cond_signal(allConds[i]);
267             pthread_mutex_unlock(allLocks[i]);
268         }
269         //in this mode, it's always synced
270         sync = 1;
271     }
272
273     curLoop->stealWork();
274     TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
275
276     //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
277
278     TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
279     curLoop->waitLoopDone(sync);
280     TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
281
282     //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
283
284     if (type!=CKLOOP_NONE)
285         reduce(curLoop->getRedBufs(), redResult, type, numChunks);
286     return;
287 }
288
289 #define COMPUTE_REDUCTION(T) {\
290     for(int i=0; i<numChunks; i++) {\
291      result += *((T *)(redBufs[i])); \
292      /*CkPrintf("CkLoop Reduce: %d\n", result);*/ \
293     }\
294 }
295
296 void FuncCkLoop::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
297     switch (type) {
298     case CKLOOP_INT_SUM: {
299         int result=0;
300         COMPUTE_REDUCTION(int)
301         *((int *)redBuf) = result;
302         break;
303     }
304     case CKLOOP_FLOAT_SUM: {
305         float result=0;
306         COMPUTE_REDUCTION(float)
307         *((float *)redBuf) = result;
308         break;
309     }
310     case CKLOOP_DOUBLE_SUM: {
311         double result=0;
312         COMPUTE_REDUCTION(double)
313         *((double *)redBuf) = result;
314         break;
315     }
316     default:
317         break;
318     }
319 }
320
321 CpvStaticDeclare(int, NdhStealWorkHandler);
322 static void RegisterCkLoopHdlrs() {
323     CpvInitialize(int, NdhStealWorkHandler);
324     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
325 }
326
327 extern int _charmHandlerIdx;
328 FuncSingleHelper::FuncSingleHelper(int numHelpers) {
329     totalHelpers = numHelpers;
330 #if USE_CONVERSE_NOTIFICATION
331     notifyMsgBufSize = TASK_BUFFER_SIZE;
332 #else
333     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
334 #endif
335
336     CmiAssert(globalCkLoop!=NULL);
337     thisCkLoop = globalCkLoop;
338
339     nextFreeNotifyMsg = 0;
340 #if USE_CONVERSE_NOTIFICATION
341     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
342     for (int i=0; i<notifyMsgBufSize; i++) {
343         ConverseNotifyMsg *tmp = notifyMsg+i;
344         if (thisCkLoop->useTreeBcast) {
345             tmp->srcRank = CmiMyRank();
346         } else {
347             tmp->srcRank = -1;
348         }
349         tmp->ptr = (void *)(new CurLoopInfo(FuncCkLoop::MAX_CHUNKS));
350         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
351     }
352 #else
353     nextFreeTaskBuffer = 0;
354     notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
355     for (int i=0; i<notifyMsgBufSize; i++) {
356         CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
357         notifyMsg[i] = tmp;
358         if (thisCkLoop->useTreeBcast) {
359             tmp->srcRank = CmiMyRank();
360         } else {
361             tmp->srcRank = -1;
362         }
363         tmp->ptr = NULL;
364         envelope *env = UsrToEnv(tmp);
365         env->setMsgtype(ForChareMsg);
366         env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
367         env->setSrcPe(CkMyPe());
368         CmiSetHandler(env, _charmHandlerIdx);
369         //env->setObjPtr has to be called when a notification msg is sent
370     }
371     taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
372     for (int i=0; i<TASK_BUFFER_SIZE; i++) {
373         taskBuffer[i] = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
374     }
375 #endif
376     globalCkLoop->helperPtr[CkMyRank()] = this;
377 }
378
379 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg) {
380 #if !USE_CONVERSE_NOTIFICATION
381     int srcRank = msg->srcRank;
382     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
383     if (srcRank >= 0) {
384         //means using tree-broadcast to send the notification msg
385         int relPE = CmiMyRank()-msg->srcRank;
386         if (relPE<0) relPE += CmiMyNodeSize();
387
388         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
389         relPE=relPE*TREE_BCAST_BRANCH+1;
390         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
391             if (relPE >= CmiMyNodeSize()) break;
392             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
393             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
394             CharmNotifyMsg *newone = getNotifyMsg();
395             newone->ptr = (void *)loop;
396             envelope *env = UsrToEnv(newone);
397             env->setObjPtr(thisCkLoop->helperPtr[pe]->ckGetChareID().objPtr);
398             CmiPushPE(pe, (void *)env);
399         }
400     }
401     loop->stealWork();
402 #endif
403 }
404
405 void SingleHelperStealWork(ConverseNotifyMsg *msg) {
406     int srcRank = msg->srcRank;
407
408     if (srcRank >= 0) {
409         //means using tree-broadcast to send the notification msg
410
411         //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
412         int relPE = CmiMyRank()-msg->srcRank;
413         if (relPE<0) relPE += CmiMyNodeSize();
414
415         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
416         relPE=relPE*TREE_BCAST_BRANCH+1;
417         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
418             if (relPE >= CmiMyNodeSize()) break;
419             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
420             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
421             CmiPushPE(pe, (void *)msg);
422         }
423     }
424     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
425     loop->stealWork();
426 }
427
428 void CurLoopInfo::stealWork() {
429     //indicate the current work hasn't been initialized
430     //or the old work has finished.
431     if (inited == 0) return;
432
433     int first, last;
434     int unit = (upperIndex-lowerIndex+1)/numChunks;
435     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
436     int markIdx = remainder*(unit+1);
437
438     int nextChunkId = getNextChunkIdx();
439     int execTimes = 0;
440     while (nextChunkId < numChunks) {
441         if (nextChunkId < remainder) {
442             first = lowerIndex+(unit+1)*nextChunkId;
443             last = first+unit;
444         } else {
445             first = lowerIndex+(nextChunkId - remainder)*unit + markIdx;
446             last = first+unit-1;
447         }
448
449         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
450         execTimes++;
451         nextChunkId = getNextChunkIdx();
452     }
453     reportFinished(execTimes);
454 }
455
456 //======================================================================//
457 //   End of functions related with FuncSingleHelper                     //
458 //======================================================================//
459
460 CProxy_FuncCkLoop CkLoop_Init(int numThreads) {
461     int mode;
462 #if CMK_SMP
463     mode = CKLOOP_USECHARM;
464 #if USE_CONVERSE_NOTIFICATION
465     CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (converse-level notification) but not using node-level queue\n");
466 #else
467     CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (charm-level notifiation) but not using node-level queue\n");
468 #endif
469 #else
470     mode = CKLOOP_PTHREAD;
471     CkPrintf("CkLoopLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
472     CmiAssert(numThreads>0);
473 #endif
474     return CProxy_FuncCkLoop::ckNew(mode, numThreads);
475 }
476
477 void CkLoop_Exit(CProxy_FuncCkLoop ckLoop) {
478     ckLoop.exit();
479 }
480
481 void CkLoop_Parallelize(HelperFn func,
482                             int paramNum, void * param,
483                             int numChunks, int lowerRange, int upperRange,
484                             int sync,
485                             void *redResult, REDUCTION_TYPE type) {
486     globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
487 }
488 #include "CkLoop.def.h"