reduced the frequency of calling atomic increment.
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 FuncNodeHelper::FuncNodeHelper()
4 {  
5 #if CMK_SMP     
6     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
7          
8     traceRegisterUserEvent("nodehelper total work",20);
9     traceRegisterUserEvent("nodehlelper finish signal",21);
10     
11         numHelpers = CkMyNodeSize();
12         helperPtr = new FuncSingleHelper *[numHelpers];
13         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
14         
15         int pestart = CkNodeFirst(CkMyNode());
16                 
17         for (int i=0; i<numHelpers; i++) {
18         CkChareID helper;
19         CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
20         }       
21 #endif
22 }
23
// Upper bound on the number of chunks a single loop may be split into;
// parallelizeFunc clamps larger requests to this value and prints a warning.
int FuncNodeHelper::MAX_CHUNKS = 64;
25
// Tracing helpers used by parallelizeFunc.
//   TRACE_START(id)   stores the current wall time in a local named `_start`
//                     (the caller must declare it; `id` is not used here).
//   TRACE_BRACKET(id) reports user event `id` spanning `_start` .. now.
// Both expand to nothing when tracing is not enabled.
#if !CMK_TRACE_ENABLED
#define TRACE_START(id)
#define TRACE_BRACKET(id)
#else
#define TRACE_START(id) _start = CmiWallTimer()
#define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
#endif
33
34 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
35                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
36                                     void *redResult, REDUCTION_TYPE type) {
37                                         
38     double _start; //may be used for tracing
39     
40     if(numChunks > MAX_CHUNKS){ 
41         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
42         numChunks = MAX_CHUNKS;
43     }
44         
45     /* "stride" determines the number of loop iterations to be done in each chunk
46      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
47      * for chunk indexed at remainder to numChunks-1, stride is "unit"
48      */
49      int stride;
50     
51     //for using nodequeue
52         TRACE_START(20);
53         
54         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
55         CurLoopInfo *curLoop = thisHelper->curLoop;
56         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
57     ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;       
58         if(useTreeBcast){               
59                 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
60                 //just implicit binary tree
61                 int pe = CmiMyRank()+1;        
62                 for(int i=0; i<loopTimes; i++, pe++){
63                         if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
64                         CmiPushPE(pe, (void *)(notifyMsg));    
65                 }
66         }else{
67                 for (int i=0; i<numHelpers; i++) {
68                         if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
69                 }
70         }
71
72         curLoop->stealWork();
73         TRACE_BRACKET(20);
74         
75         TRACE_START(21);                
76         curLoop->waitLoopDone();
77         TRACE_BRACKET(21);        
78
79     if (type!=NODEHELPER_NONE)
80         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
81     return;
82 }
83
84 #define COMPUTE_REDUCTION(T) {\
85     for(int i=0; i<numChunks; i++) {\
86      result += *((T *)(redBufs[i])); \
87      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
88     }\
89 }
90
91 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
92     switch(type){
93         case NODEHELPER_INT_SUM:
94         {
95             int result=0;
96             COMPUTE_REDUCTION(int)
97             *((int *)redBuf) = result;
98             break;
99         }
100         case NODEHELPER_FLOAT_SUM:
101         {
102             float result=0;
103             COMPUTE_REDUCTION(float)
104             *((float *)redBuf) = result;
105             break;
106         }
107         case NODEHELPER_DOUBLE_SUM:
108         {
109             double result=0;
110             COMPUTE_REDUCTION(double)
111             *((double *)redBuf) = result;
112             break;
113         }
114         default:
115         break;
116     }
117 }
118
119 void SingleHelperStealWork(ConverseNotifyMsg *msg){
120         
121         int srcRank = msg->srcRank;
122         
123         if(srcRank >= 0){
124                 //means using tree-broadcast to send the notification msg
125                 
126                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
127                 int relPE = CmiMyRank()-msg->srcRank;
128                 if(relPE<0) relPE += CmiMyNodeSize();
129                 
130                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
131                 relPE=relPE*TREE_BCAST_BRANCH+1;
132                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
133                         if(relPE >= CmiMyNodeSize()) break;
134                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
135                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
136                         CmiPushPE(pe, (void *)msg);
137                 }
138         }
139     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
140     loop->stealWork();
141 }
142
143 void CurLoopInfo::stealWork(){
144     //indicate the current work hasn't been initialized
145     //or the old work has finished.
146     if(inited == 0) return;
147     
148     int first, last;
149     int unit = (upperIndex-lowerIndex+1)/numChunks;
150     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
151     int markIdx = remainder*(unit+1);
152     
153     int nextChunkId = getNextChunkIdx();
154     int execTimes = 0;
155     while(nextChunkId < numChunks){
156         if(nextChunkId < remainder){
157             first = (unit+1)*nextChunkId;
158             last = first+unit;
159         }else{
160             first = (nextChunkId - remainder)*unit + markIdx;
161             last = first+unit-1;
162         }
163                 
164         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
165         execTimes++;
166         nextChunkId = getNextChunkIdx();
167     }
168     reportFinished(execTimes);
169 }
170
//======================================================================//
//   End of functions related to FuncSingleHelper                       //
//======================================================================//
174
175 CProxy_FuncNodeHelper NodeHelper_Init(){
176     CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
177     return CProxy_FuncNodeHelper::ckNew();
178 }
179
180 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
181                         int paramNum, void * param, int msgPriority,
182                         int numChunks, int lowerRange, int upperRange, 
183                         void *redResult, REDUCTION_TYPE type)
184 {
185     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
186 }
187
188 #include "NodeHelper.def.h"