Make notifyMsg local to each helper instead of being local to the node helper
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 FuncNodeHelper::FuncNodeHelper()
4 {  
5 #if CMK_SMP     
6     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
7          
8     traceRegisterUserEvent("nodehelper total work",20);
9     traceRegisterUserEvent("nodehlelper finish signal",21);
10     
11         numHelpers = CkMyNodeSize();
12         helperPtr = new FuncSingleHelper *[numHelpers];
13         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
14         
15         int pestart = CkNodeFirst(CkMyNode());
16         
17         CkChareID *helperArr = new CkChareID[numHelpers];
18         for (int i=0; i<numHelpers; i++) {
19                 CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
20                 helperPtr[i] = NULL;
21         }
22         for (int i=0; i<numHelpers; i++) {
23                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);
24                 helpProxy.reportCreated();
25         }
26         delete [] helperArr;
27 #endif
28 }
29
30 int FuncNodeHelper::MAX_CHUNKS = 64;
31
32 #if CMK_TRACE_ENABLED
33 #define TRACE_START(id) _start = CmiWallTimer()
34 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
35 #else
36 #define TRACE_START(id)
37 #define TRACE_BRACKET(id)
38 #endif
39
40 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
41                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
42                                     void *redResult, REDUCTION_TYPE type) {
43                                         
44     double _start; //may be used for tracing
45     
46     if(numChunks > MAX_CHUNKS){ 
47         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
48         numChunks = MAX_CHUNKS;
49     }
50         
51     /* "stride" determines the number of loop iterations to be done in each chunk
52      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
53      * for chunk indexed at remainder to numChunks-1, stride is "unit"
54      */
55      int stride;
56     
57     //for using nodequeue
58         TRACE_START(20);
59         
60         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
61         CurLoopInfo *curLoop = thisHelper->curLoop;
62         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
63     ConverseNotifyMsg *notifyMsg = thisHelper->notifyMsg;       
64         if(useTreeBcast){               
65                 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
66                 //just implicit binary tree
67                 int pe = CmiMyRank()+1;        
68                 for(int i=0; i<loopTimes; i++, pe++){
69                         if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
70                         CmiPushPE(pe, (void *)(notifyMsg));    
71                 }
72         }else{
73                 for (int i=0; i<numHelpers; i++) {
74                         if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
75                 }
76         }
77
78         curLoop->stealWork();
79         TRACE_BRACKET(20);
80         
81         TRACE_START(21);                
82         curLoop->waitLoopDone();
83         TRACE_BRACKET(21);        
84
85     if (type!=NODEHELPER_NONE)
86         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
87     return;
88 }
89
90 #define COMPUTE_REDUCTION(T) {\
91     for(int i=0; i<numChunks; i++) {\
92      result += *((T *)(redBufs[i])); \
93      /*CkPrintf("Nodehelper Reduce: %d\n", result);*/ \
94     }\
95 }
96
97 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
98     switch(type){
99         case NODEHELPER_INT_SUM:
100         {
101             int result=0;
102             COMPUTE_REDUCTION(int)
103             *((int *)redBuf) = result;
104             break;
105         }
106         case NODEHELPER_FLOAT_SUM:
107         {
108             float result=0;
109             COMPUTE_REDUCTION(float)
110             *((float *)redBuf) = result;
111             break;
112         }
113         case NODEHELPER_DOUBLE_SUM:
114         {
115             double result=0;
116             COMPUTE_REDUCTION(double)
117             *((double *)redBuf) = result;
118             break;
119         }
120         default:
121         break;
122     }
123 }
124
125 void SingleHelperStealWork(ConverseNotifyMsg *msg){
126         
127         int srcRank = msg->srcRank;
128         
129         if(srcRank >= 0){
130                 //means using tree-broadcast to send the notification msg
131                 
132                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
133                 int relPE = CmiMyRank()-msg->srcRank;
134                 if(relPE<0) relPE += CmiMyNodeSize();
135                 
136                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
137                 relPE=relPE*TREE_BCAST_BRANCH+1;
138                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
139                         if(relPE >= CmiMyNodeSize()) break;
140                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
141                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
142                         CmiPushPE(pe, (void *)msg);
143                 }
144         }
145     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
146     loop->stealWork();
147 }
148
149 void CurLoopInfo::stealWork(){
150     //indicate the current work hasn't been initialized
151     //or the old work has finished.
152     if(inited == 0) return;
153     
154     int first, last;
155     int unit = (upperIndex-lowerIndex+1)/numChunks;
156     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
157     int markIdx = remainder*(unit+1);
158     
159     int nextChunkId = getNextChunkIdx();
160     while(nextChunkId < numChunks){
161         if(nextChunkId < remainder){
162             first = (unit+1)*nextChunkId;
163             last = first+unit;
164         }else{
165             first = (nextChunkId - remainder)*unit + markIdx;
166             last = first+unit-1;
167         }
168                 
169         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
170         reportFinished();
171         
172         nextChunkId = getNextChunkIdx();
173     }
174 }
175
176 //======================================================================//
177 //   End of functions related with FuncSingleHelper                     //
178 //======================================================================//
179
180 CProxy_FuncNodeHelper NodeHelper_Init(){
181     CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
182     return CProxy_FuncNodeHelper::ckNew();
183 }
184
185 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
186                         int paramNum, void * param, int msgPriority,
187                         int numChunks, int lowerRange, int upperRange, 
188                         void *redResult, REDUCTION_TYPE type)
189 {
190     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
191 }
192
193 #include "NodeHelper.def.h"