Reset "inited" in the unsync mode so as to avoid expensive atomic operation by
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 FuncNodeHelper::FuncNodeHelper()
4 {  
5 #if CMK_SMP     
6     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
7          
8     traceRegisterUserEvent("nodehelper total work",20);
9     traceRegisterUserEvent("nodehlelper finish signal",21);
10     
11         numHelpers = CkMyNodeSize();
12         helperPtr = new FuncSingleHelper *[numHelpers];
13         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
14         
15         int pestart = CkNodeFirst(CkMyNode());
16                 
17         for (int i=0; i<numHelpers; i++) {
18         CkChareID helper;
19         CProxy_FuncSingleHelper::ckNew((size_t)this, &helper, pestart+i);
20         }       
21 #endif
22 }
23
24 int FuncNodeHelper::MAX_CHUNKS = 64;
25
/* Tracing helpers used by parallelizeFunc. Both expect a local `double _start`
 * in the calling scope. With tracing disabled they expand to no-ops. */
#if CMK_TRACE_ENABLED
/* Remember the current wall-clock time in the caller's `_start`. */
#define TRACE_START(id)   (_start = CmiWallTimer())
/* Log user bracket event `id` covering [_start, now]. */
#define TRACE_BRACKET(id) traceUserBracketEvent((id), _start, CmiWallTimer())
#else
#define TRACE_START(id)   ((void)0)
#define TRACE_BRACKET(id) ((void)0)
#endif
33
34 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
35                                     int numChunks, int lowerRange, 
36                                     int upperRange, int sync,
37                                     void *redResult, REDUCTION_TYPE type) {
38                                         
39     double _start; //may be used for tracing
40     
41     if(numChunks > MAX_CHUNKS){ 
42         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
43         numChunks = MAX_CHUNKS;
44     }
45         
46     /* "stride" determines the number of loop iterations to be done in each chunk
47      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
48      * for chunk indexed at remainder to numChunks-1, stride is "unit"
49      */
50      int stride;
51     
52     //for using nodequeue
53         TRACE_START(20);
54         
55         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
56         CurLoopInfo *curLoop = thisHelper->curLoop;
57         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
58     ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;       
59         if(useTreeBcast){               
60                 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
61                 //just implicit binary tree
62                 int pe = CmiMyRank()+1;        
63                 for(int i=0; i<loopTimes; i++, pe++){
64                         if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
65                         CmiPushPE(pe, (void *)(notifyMsg));    
66                 }
67         }else{
68                 for (int i=0; i<numHelpers; i++) {
69                         if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
70                 }
71         }
72
73         curLoop->stealWork();
74         TRACE_BRACKET(20);
75         
76         TRACE_START(21);                
77         curLoop->waitLoopDone(sync);
78         TRACE_BRACKET(21);        
79
80     if (type!=NODEHELPER_NONE)
81         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
82     return;
83 }
84
85 #define COMPUTE_REDUCTION(T) {\
86     for(int i=0; i<numChunks; i++) {\
87      result += *((T *)(redBufs[i])); \
88      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
89     }\
90 }
91
92 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
93     switch(type){
94         case NODEHELPER_INT_SUM:
95         {
96             int result=0;
97             COMPUTE_REDUCTION(int)
98             *((int *)redBuf) = result;
99             break;
100         }
101         case NODEHELPER_FLOAT_SUM:
102         {
103             float result=0;
104             COMPUTE_REDUCTION(float)
105             *((float *)redBuf) = result;
106             break;
107         }
108         case NODEHELPER_DOUBLE_SUM:
109         {
110             double result=0;
111             COMPUTE_REDUCTION(double)
112             *((double *)redBuf) = result;
113             break;
114         }
115         default:
116         break;
117     }
118 }
119
120 void SingleHelperStealWork(ConverseNotifyMsg *msg){
121         
122         int srcRank = msg->srcRank;
123         
124         if(srcRank >= 0){
125                 //means using tree-broadcast to send the notification msg
126                 
127                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
128                 int relPE = CmiMyRank()-msg->srcRank;
129                 if(relPE<0) relPE += CmiMyNodeSize();
130                 
131                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
132                 relPE=relPE*TREE_BCAST_BRANCH+1;
133                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
134                         if(relPE >= CmiMyNodeSize()) break;
135                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
136                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
137                         CmiPushPE(pe, (void *)msg);
138                 }
139         }
140     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
141     loop->stealWork();
142 }
143
144 void CurLoopInfo::stealWork(){
145     //indicate the current work hasn't been initialized
146     //or the old work has finished.
147     if(inited == 0) return;
148     
149     int first, last;
150     int unit = (upperIndex-lowerIndex+1)/numChunks;
151     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
152     int markIdx = remainder*(unit+1);
153     
154     int nextChunkId = getNextChunkIdx();
155     int execTimes = 0;
156     while(nextChunkId < numChunks){
157         if(nextChunkId < remainder){
158             first = (unit+1)*nextChunkId;
159             last = first+unit;
160         }else{
161             first = (nextChunkId - remainder)*unit + markIdx;
162             last = first+unit-1;
163         }
164                 
165         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
166         execTimes++;
167         nextChunkId = getNextChunkIdx();
168     }
169     reportFinished(execTimes);
170 }
171
172 //======================================================================//
173 //   End of functions related with FuncSingleHelper                     //
174 //======================================================================//
175
176 CProxy_FuncNodeHelper NodeHelper_Init(){
177     CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
178     return CProxy_FuncNodeHelper::ckNew();
179 }
180
181 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
182                         int paramNum, void * param, 
183                         int numChunks, int lowerRange, int upperRange,
184                         int sync,
185                         void *redResult, REDUCTION_TYPE type)
186 {
187     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, numChunks, lowerRange, upperRange, sync, redResult, type);
188 }
189
190 #include "NodeHelper.def.h"