Bug #648: Add CkLoop tracing for Projections
[charm.git] / src / libs / ck-libs / ckloop / CkLoop.C
1 #include "CkLoop.h"
2 #if !defined(_WIN32)
3 #include <pthread.h>
4 #endif
5
6 #if !USE_CONVERSE_NOTIFICATION
7 #include "qd.h"
8 #endif
9
10 #define CKLOOP_USECHARM 1
11 #define CKLOOP_PTHREAD 2
12 #define CKLOOP_NOOP 3
13
14 /*====Beginning of pthread-related variables and impelementation====*/
15 //__thread is not portable, but it works almost everywhere if pthread works
16 //After C++11, this should be thread_local
17 #if !CMK_SMP
18 static __thread pthread_cond_t thdCondition; //the signal var of each pthread to be notified
19 static __thread pthread_mutex_t thdLock; //the lock associated with the condition variables
20 #endif
21
22 static FuncCkLoop *mainHelper = NULL;
23 static int mainHelperPhyRank = 0;
24 static int numPhysicalPEs = 0;
25 static CurLoopInfo *pthdLoop = NULL; //the pthread-version is always synchronized
26 #if !defined(_WIN32)
27 static pthread_mutex_t **allLocks = NULL;
28 static pthread_cond_t **allConds = NULL;
29 static pthread_t *ndhThreads = NULL;
30 #endif
31 static volatile int gCrtCnt = 0;
32 static volatile int exitFlag = 0;
33
34 #if CMK_OS_IS_LINUX
35 #include <sys/syscall.h>
36 #endif
37
38 static int HelperOnCore() {
39 #if CMK_OS_IS_LINUX
40     char fname[64];
41     sprintf(fname, "/proc/%d/task/%ld/stat", getpid(), syscall(SYS_gettid));
42     FILE *ifp = fopen(fname, "r");
43     if (ifp == NULL) return -1;
44     fseek(ifp, 0, SEEK_SET);
45     char str[128];
46     for (int i=0; i<39; i++) fscanf(ifp, "%s", str);
47     fclose(ifp);
48     return atoi(str);
49 #else
50     return -1;
51 #endif
52 }
53
54 static void *ndhThreadWork(void *id) {
55 #if !CMK_SMP && !defined(_WIN32)
56     size_t myId = (size_t) id;
57
58     //further improvement of this affinity setting!!
59     int myPhyRank = (myId+mainHelperPhyRank)%numPhysicalPEs;
60     //printf("thread[%d]: affixed to rank %d\n", myId, myPhyRank);
61     myPhyRank = myId;
62     CmiSetCPUAffinity(myPhyRank);
63
64     pthread_mutex_init(&thdLock, NULL);
65     pthread_cond_init(&thdCondition, NULL);
66
67     allLocks[myId-1] = &thdLock;
68     allConds[myId-1] = &thdCondition;
69
70     __sync_add_and_fetch(&gCrtCnt, 1);
71
72     while (1) {
73         //printf("thread[%ld]: on core %d with main %d\n", myId, HelperOnCore(), mainHelperPhyRank);
74         if (exitFlag) break;
75         pthread_mutex_lock(&thdLock);
76         pthread_cond_wait(&thdCondition, &thdLock);
77         pthread_mutex_unlock(&thdLock);
78         /* kids ID range: [1 ~ numHelpers-1] */
79         if (mainHelper->needTreeBcast()) {
80             //notify my children
81             int myKid = myId*TREE_BCAST_BRANCH+1;
82             for (int i=0; i<TREE_BCAST_BRANCH; i++, myKid++) {
83                 if (myKid >= mainHelper->getNumHelpers()) break;
84                 //all locks and conditions exclude the main thread, so index needs to be subtracted by one
85                 pthread_mutex_lock(allLocks[myKid-1]);
86                 pthread_cond_signal(allConds[myKid-1]);
87                 pthread_mutex_unlock(allLocks[myKid-1]);
88             }
89         }
90         pthdLoop->stealWork();
91     }
92 #endif
93 }
94
95 void FuncCkLoop::createPThreads() {
96 #if !defined(_WIN32)
97     int numThreads = numHelpers - 1;
98     allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*numThreads);
99     allConds = (pthread_cond_t **)malloc(sizeof(void *)*numThreads);
100     memset(allLocks, 0, sizeof(void *)*numThreads);
101     memset(allConds, 0, sizeof(void *)*numThreads);
102
103     pthread_attr_t attr;
104     pthread_attr_init(&attr);
105     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
106     ndhThreads = new pthread_t[numThreads];
107     mainHelperPhyRank = CmiOnCore();
108     numPhysicalPEs = CmiNumCores();
109     if (mainHelperPhyRank == -1) mainHelperPhyRank = 0;
110     for (int i=1; i<=numThreads; i++) {
111         pthread_create(ndhThreads+i, &attr, ndhThreadWork, (void *)i);
112     }
113     while (gCrtCnt != numThreads); //wait for all threads to finish creation
114 #endif
115 }
116
117 void FuncCkLoop::exit() {
118 #if !defined(_WIN32)
119     if (mode == CKLOOP_PTHREAD) {
120         exitFlag = 1;
121         for (int i=0; i<numHelpers-1; i++)
122             pthread_join(ndhThreads[i], NULL);
123         delete [] ndhThreads;
124         free(allLocks);
125         free(allConds);
126         delete pthdLoop;
127     }
128 #endif
129 }
130
131 /*====End of pthread-related variables and impelementation====*/
132
133
134 /* Note: Those event ids should be unique globally!! */
135 #define CKLOOP_TOTAL_WORK_EVENTID  139
136 #define CKLOOP_FINISH_SIGNAL_EVENTID 143
137
138 static FuncCkLoop *globalCkLoop = NULL;
139
140 FuncCkLoop::FuncCkLoop(int mode_, int numThreads_) {
141   init(mode_, numThreads_);
142 }
143
144 void FuncCkLoop::init(int mode_, int numThreads_) {
145   traceRegisterUserEvent("ckloop total work",CKLOOP_TOTAL_WORK_EVENTID);
146   traceRegisterUserEvent("ckloop finish signal",CKLOOP_FINISH_SIGNAL_EVENTID);
147
148   mode = mode_;
149   loop_info_inited_lock = CmiCreateLock();
150
151   CmiAssert(globalCkLoop==NULL);
152   globalCkLoop = this;
153
154   if (mode == CKLOOP_USECHARM) {
155       //CkPrintf("FuncCkLoop created on node %d\n", CkMyNode());
156       numHelpers = CkMyNodeSize();
157       helperPtr = new FuncSingleHelper *[numHelpers];
158       useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
159
160       int pestart = CkNodeFirst(CkMyNode());
161
162       for (int i=0; i<numHelpers; i++) {
163           CkChareID helper;
164           CProxy_FuncSingleHelper::ckNew(&helper, pestart+i);
165       }
166   } else if (mode == CKLOOP_PTHREAD) {
167       helperPtr = NULL;
168
169       numHelpers = numThreads_;
170       useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
171       pthdLoop = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
172       mainHelper = this;
173       createPThreads();
174   }
175 }
176
177 FuncCkLoop::FuncCkLoop(CkMigrateMessage *m) : CBase_FuncCkLoop(m) {
178 }
179
180 int FuncCkLoop::MAX_CHUNKS = 64;
181
182 #if CMK_TRACE_ENABLED
183 #define TRACE_START(id) _start = CmiWallTimer()
184 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
185 #else
186 #define TRACE_START(id)
187 #define TRACE_BRACKET(id)
188 #endif
189
190 #define ALLOW_MULTIPLE_UNSYNC 1
191 void FuncCkLoop::parallelizeFunc(HelperFn func, int paramNum, void * param,
192                                      int numChunks, int lowerRange,
193                                      int upperRange, int sync,
194                                      void *redResult, REDUCTION_TYPE type) {
195
196     double _start; //may be used for tracing
197
198     if (numChunks > MAX_CHUNKS) {
199         numChunks = MAX_CHUNKS;
200     }
201
202     /* "stride" determines the number of loop iterations to be done in each chunk
203      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
204      * for chunk indexed at remainder to numChunks-1, stride is "unit"
205     int stride;
206      */
207     CurLoopInfo *curLoop = NULL;
208
209     //for using nodequeue
210     TRACE_START(CKLOOP_TOTAL_WORK_EVENTID);
211     if (mode == CKLOOP_USECHARM) {
212         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
213 #if USE_CONVERSE_NOTIFICATION
214 #if ALLOW_MULTIPLE_UNSYNC
215         ConverseNotifyMsg *notifyMsg = thisHelper->getNotifyMsg();
216 #else
217         ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;
218 #endif
219         curLoop = (CurLoopInfo *)(notifyMsg->ptr);
220         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
221         if (useTreeBcast) {
222             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
223             //just implicit binary tree
224             int pe = CmiMyRank()+1;
225             for (int i=0; i<loopTimes; i++, pe++) {
226                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
227                 CmiPushPE(pe, (void *)(notifyMsg));
228             }
229         } else {
230             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
231                 CmiPushPE(i, (void *)(notifyMsg));
232             }
233             for (int i=0; i<CmiMyRank(); i++) {
234                 CmiPushPE(i, (void *)(notifyMsg));
235             }
236         }
237 #else
238 #if ALLOW_MULTIPLE_UNSYNC
239         curLoop = thisHelper->getNewTask();
240 #else
241         curLoop = thisHelper->taskBuffer[0];
242 #endif
243         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
244         CpvAccess(_qd)->create(numHelpers-1);
245         if (useTreeBcast) {
246             int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
247             //just implicit binary tree
248             int pe = CmiMyRank()+1;
249             for (int i=0; i<loopTimes; i++, pe++) {
250                 if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
251                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
252                 one->ptr = (void *)curLoop;
253                 envelope *env = UsrToEnv(one);
254                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
255                 CmiPushPE(pe, (void *)(env));
256             }
257         } else {
258             for (int i=CmiMyRank()+1; i<numHelpers; i++) {
259                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
260                 one->ptr = (void *)curLoop;
261                 envelope *env = UsrToEnv(one);
262                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
263                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
264                 CmiPushPE(i, (void *)(env));
265             }
266             for (int i=0; i<CmiMyRank(); i++) {
267                 CharmNotifyMsg *one = thisHelper->getNotifyMsg();
268                 one->ptr = (void *)curLoop;
269                 envelope *env = UsrToEnv(one);
270                 env->setObjPtr(thisHelper->ckGetChareID().objPtr);
271                 //printf("[%d] sending a msg %p (env=%p) to [%d]\n", CmiMyRank(), one, env, i);
272                 CmiPushPE(i, (void *)(env));
273             }
274         }
275 #endif
276     } else if (mode == CKLOOP_PTHREAD) {
277
278 #if !defined(_WIN32)
279         int numThreads = numHelpers-1;
280         curLoop = pthdLoop;
281         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
282         int numNotices = numThreads;
283         if (useTreeBcast) {
284             numNotices = TREE_BCAST_BRANCH>=numThreads?numThreads:TREE_BCAST_BRANCH;
285         }
286         for (int i=0; i<numNotices; i++) {
287             pthread_mutex_lock(allLocks[i]);
288             pthread_cond_signal(allConds[i]);
289             pthread_mutex_unlock(allLocks[i]);
290         }
291         //in this mode, it's always synced
292         sync = 1;
293 #endif
294     } else if (mode == CKLOOP_NOOP) {
295       func(lowerRange, upperRange, redResult, paramNum, param);
296       return;
297     }
298     if(curLoop) curLoop->stealWork();
299     TRACE_BRACKET(CKLOOP_TOTAL_WORK_EVENTID);
300
301     //printf("[%d]: parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
302
303     TRACE_START(CKLOOP_FINISH_SIGNAL_EVENTID);
304     curLoop->waitLoopDone(sync);
305     TRACE_BRACKET(CKLOOP_FINISH_SIGNAL_EVENTID);
306
307     //printf("[%d]: finished parallelize func %p with [%d ~ %d] divided into %d chunks using loop=%p\n", CkMyPe(), func, lowerRange, upperRange, numChunks, curLoop);
308
309     if (type!=CKLOOP_NONE)
310         reduce(curLoop->getRedBufs(), redResult, type, numChunks);
311     return;
312 }
313
314 #define COMPUTE_REDUCTION(T) {\
315     for(int i=0; i<numChunks; i++) {\
316      result += *((T *)(redBufs[i])); \
317      /*CkPrintf("CkLoop Reduce: %d\n", result);*/ \
318     }\
319 }
320 #define COMPUTE_REDUCTION_MAX(T) {\
321     for(int i=0; i<numChunks; i++) {\
322      if( *((T *)(redBufs[i])) > result ) result = *((T *)(redBufs[i])); \
323      /*CkPrintf("CkLoop Reduce: %d\n", result);*/ \
324     }\
325 }
326
327 void FuncCkLoop::destroyHelpers() {
328   int pe = CmiMyRank()+1;
329   for (int i = 0; i < numHelpers; i++) {
330     if (pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
331     DestroyNotifyMsg *tmp = new DestroyNotifyMsg;
332     envelope *env = UsrToEnv(tmp);
333     env->setMsgtype(ForChareMsg);
334     env->setEpIdx(CkIndex_FuncSingleHelper::destroyMyself());
335     env->setSrcPe(CkMyPe());
336     CmiSetHandler(env, _charmHandlerIdx);
337     CmiPushPE(pe, (void *)(env));
338   }
339 }
340
341 void FuncCkLoop::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
342     switch (type) {
343     case CKLOOP_INT_SUM: {
344         int result=0;
345         COMPUTE_REDUCTION(int)
346         *((int *)redBuf) = result;
347         break;
348     }
349     case CKLOOP_FLOAT_SUM: {
350         float result=0;
351         COMPUTE_REDUCTION(float)
352         *((float *)redBuf) = result;
353         break;
354     }
355     case CKLOOP_DOUBLE_SUM: {
356         double result=0;
357         COMPUTE_REDUCTION(double)
358         *((double *)redBuf) = result;
359         break;
360     }
361     case CKLOOP_DOUBLE_MAX: {
362         double result=0;
363         COMPUTE_REDUCTION_MAX(double)
364         *((double *)redBuf) = result;
365         break;
366     }
367     default:
368         break;
369     }
370 }
371
372 void FuncCkLoop::registerHelper(HelperNotifyMsg* msg) {
373   helperPtr[msg->srcRank] = msg->localHelper;
374   msg->localHelper->thisCkLoop = this;
375   delete msg;
376 }
377
378 void FuncCkLoop::pup(PUP::er &p) {
379   p|mode;
380   p|numHelpers;
381   if (p.isUnpacking()) {
382     init(mode, numHelpers);
383   }
384 }
385
386 static int _ckloopEP;
387 CpvStaticDeclare(int, NdhStealWorkHandler);
388 static void RegisterCkLoopHdlrs() {
389     CpvInitialize(int, NdhStealWorkHandler);
390     CpvAccess(NdhStealWorkHandler) = CmiRegisterHandler((CmiHandler)SingleHelperStealWork);
391 #ifdef __BIGSIM__
392     if(BgNodeRank()==0) {
393 #else
394       if(CkMyRank()==0) {
395 #endif
396         int _ckloopMsg = CkRegisterMsg("ckloop_converse_msg", 0, 0, 0, 0);
397         int _ckloopChare = CkRegisterChare("ckloop_converse_chare", 0, TypeInvalid);
398         CkRegisterChareInCharm(_ckloopChare);
399         _ckloopEP = CkRegisterEp("CkLoop", (CkCallFnPtr)SingleHelperStealWork, _ckloopMsg, _ckloopChare, 0+CK_EP_INTRINSIC);
400       }
401 }
402
403 extern int _charmHandlerIdx;
404
405 FuncSingleHelper::FuncSingleHelper() {
406     CmiAssert(globalCkLoop!=NULL);
407     thisCkLoop = globalCkLoop;
408     totalHelpers = globalCkLoop->numHelpers;
409     funcckproxy = globalCkLoop->thisProxy;
410     useTreeBcast = globalCkLoop->useTreeBcast;
411
412     createNotifyMsg();
413
414     globalCkLoop->helperPtr[CkMyRank()] = this;
415 }
416
417 void FuncSingleHelper::createNotifyMsg() {
418 #if USE_CONVERSE_NOTIFICATION
419     notifyMsgBufSize = TASK_BUFFER_SIZE;
420 #else
421     notifyMsgBufSize = TASK_BUFFER_SIZE*totalHelpers;
422 #endif
423
424     nextFreeNotifyMsg = 0;
425 #if USE_CONVERSE_NOTIFICATION
426     notifyMsg = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*notifyMsgBufSize);
427     for (int i=0; i<notifyMsgBufSize; i++) {
428         ConverseNotifyMsg *tmp = notifyMsg+i;
429         if (useTreeBcast) {
430             tmp->srcRank = CmiMyRank();
431         } else {
432             tmp->srcRank = -1;
433         }
434         tmp->ptr = (void *)(new CurLoopInfo(FuncCkLoop::MAX_CHUNKS));
435         CmiSetHandler(tmp, CpvAccess(NdhStealWorkHandler));
436     }
437 #else
438     nextFreeTaskBuffer = 0;
439     notifyMsg = (CharmNotifyMsg **)malloc(sizeof(CharmNotifyMsg *)*notifyMsgBufSize);
440     for (int i=0; i<notifyMsgBufSize; i++) {
441         CharmNotifyMsg *tmp = new(sizeof(int)*8)CharmNotifyMsg; //allow msg priority bits
442         notifyMsg[i] = tmp;
443         if (useTreeBcast) {
444             tmp->srcRank = CmiMyRank();
445         } else {
446             tmp->srcRank = -1;
447         }
448         tmp->ptr = NULL;
449         envelope *env = UsrToEnv(tmp);
450         env->setMsgtype(ForChareMsg);
451         env->setEpIdx(CkIndex_FuncSingleHelper::stealWork(NULL));
452         env->setSrcPe(CkMyPe());
453         CmiSetHandler(env, _charmHandlerIdx);
454         //env->setObjPtr has to be called when a notification msg is sent
455     }
456     taskBuffer = (CurLoopInfo **)malloc(sizeof(CurLoopInfo *)*TASK_BUFFER_SIZE);
457     for (int i=0; i<TASK_BUFFER_SIZE; i++) {
458         taskBuffer[i] = new CurLoopInfo(FuncCkLoop::MAX_CHUNKS);
459     }
460 #endif
461 }
462
463 void FuncSingleHelper::stealWork(CharmNotifyMsg *msg) {
464 #if !USE_CONVERSE_NOTIFICATION
465     int srcRank = msg->srcRank;
466     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
467     if (srcRank >= 0) {
468         //means using tree-broadcast to send the notification msg
469         int relPE = CmiMyRank()-msg->srcRank;
470         if (relPE<0) relPE += CmiMyNodeSize();
471
472         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
473         relPE=relPE*TREE_BCAST_BRANCH+1;
474         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
475             if (relPE >= CmiMyNodeSize()) break;
476             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
477             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
478             CharmNotifyMsg *newone = getNotifyMsg();
479             newone->ptr = (void *)loop;
480             envelope *env = UsrToEnv(newone);
481             env->setObjPtr(thisCkLoop->helperPtr[pe]->ckGetChareID().objPtr);
482             CmiPushPE(pe, (void *)env);
483         }
484     }
485     loop->stealWork();
486 #endif
487 }
488
489 void SingleHelperStealWork(ConverseNotifyMsg *msg) {
490     int srcRank = msg->srcRank;
491
492     if (srcRank >= 0) {
493         //means using tree-broadcast to send the notification msg
494
495         //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
496         int relPE = CmiMyRank()-msg->srcRank;
497         if (relPE<0) relPE += CmiMyNodeSize();
498
499         //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
500         relPE=relPE*TREE_BCAST_BRANCH+1;
501         for (int i=0; i<TREE_BCAST_BRANCH; i++, relPE++) {
502             if (relPE >= CmiMyNodeSize()) break;
503             int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
504             //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
505             CmiPushPE(pe, (void *)msg);
506         }
507     }
508     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
509
510     _TRACE_BEGIN_EXECUTE_DETAILED(0, 4, _ckloopEP, CkNodeFirst(CkMyNode())+srcRank, sizeof(ConverseNotifyMsg), NULL);
511     loop->stealWork();
512     _TRACE_END_EXECUTE();
513 }
514
515 void CurLoopInfo::stealWork() {
516     //indicate the current work hasn't been initialized
517     //or the old work has finished.
518     CmiLock(loop_info_inited_lock);
519     if (inited == 0) {
520       CmiUnlock(loop_info_inited_lock);
521       return;
522     }
523
524     int nextChunkId = getNextChunkIdx();
525     if (nextChunkId >= numChunks) {
526       CmiUnlock(loop_info_inited_lock);
527       return;
528     }
529
530     CmiUnlock(loop_info_inited_lock);
531     int execTimes = 0;
532
533     int first, last;
534     int unit = (upperIndex-lowerIndex+1)/numChunks;
535     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
536     int markIdx = remainder*(unit+1);
537
538     while (nextChunkId < numChunks) {
539       if (nextChunkId < remainder) {
540         first = lowerIndex+(unit+1)*nextChunkId;
541         last = first+unit;
542       } else {
543         first = lowerIndex+(nextChunkId - remainder)*unit + markIdx;
544         last = first+unit-1;
545       }
546
547       if (first < lowerIndex || first > upperIndex || last < lowerIndex || last > upperIndex) {
548         CkPrintf("Error in CurLoopInfo::stealWork() node %d pe %d lowerIndex %d upperIndex %d numChunks %d first %d last %d\n",
549           CkMyNode(), CkMyPe(), lowerIndex, upperIndex, numChunks, first, last);
550         CkAbort("Indices of CkLoop incorrect. There maybe a race condition!\n");
551       }
552
553         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
554         execTimes++;
555         nextChunkId = getNextChunkIdx();
556     }
557     reportFinished(execTimes);
558 }
559
560 //======================================================================//
561 //   End of functions related with FuncSingleHelper                     //
562 //======================================================================//
563
564 CProxy_FuncCkLoop CkLoop_Init(int numThreads) {
565     int mode;
566 #if CMK_SMP
567     mode = CKLOOP_USECHARM;
568 #if USE_CONVERSE_NOTIFICATION
569     CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (converse-level notification) but not using node-level queue\n");
570 #else
571     CkPrintf("CkLoopLib is used in SMP with a simple dynamic scheduling (charm-level notifiation) but not using node-level queue\n");
572 #endif
573 #elif defined(WIN32)
574     mode = CKLOOP_NOOP;
575 #else
576     mode = CKLOOP_PTHREAD;
577     CkPrintf("CkLoopLib is used with extra %d pthreads via a simple dynamic scheduling\n", numThreads);
578     CmiAssert(numThreads>0);
579 #endif
580     return CProxy_FuncCkLoop::ckNew(mode, numThreads);
581 }
582
583 void CkLoop_Exit(CProxy_FuncCkLoop ckLoop) {
584     ckLoop.exit();
585 }
586
587 void CkLoop_Parallelize(HelperFn func,
588                             int paramNum, void * param,
589                             int numChunks, int lowerRange, int upperRange,
590                             int sync,
591                             void *redResult, REDUCTION_TYPE type) {
592     if ( numChunks > upperRange - lowerRange + 1 ) numChunks = upperRange - lowerRange + 1;
593     globalCkLoop->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
594 }
595
596 void CkLoop_DestroyHelpers() {
597   globalCkLoop->destroyHelpers();
598 }
599 #include "CkLoop.def.h"