6f821b6ae5ec5965075d20411a49d3b2ac27d6b9
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2
3 FuncNodeHelper::FuncNodeHelper()
4 {  
5 #if CMK_SMP     
6     //CkPrintf("FuncNodeHelper created on node %d\n", CkMyNode());
7          
8     traceRegisterUserEvent("nodehelper total work",20);
9     traceRegisterUserEvent("nodehlelper finish signal",21);
10     
11         numHelpers = CkMyNodeSize();
12         helperPtr = new FuncSingleHelper *[numHelpers];
13         
14         notifyMsgs = (ConverseNotifyMsg *)malloc(sizeof(ConverseNotifyMsg)*numHelpers);
15         for(int i=0; i<numHelpers; i++) notifyMsgs[i].srcRank = -1;
16         
17         useTreeBcast = (numHelpers >= USE_TREE_BROADCAST_THRESHOLD);
18         
19         int pestart = CkNodeFirst(CkMyNode());
20         
21         CkChareID *helperArr = new CkChareID[numHelpers];
22         for (int i=0; i<numHelpers; i++) {
23                 CProxy_FuncSingleHelper::ckNew(thisgroup, &helperArr[i], pestart+i);
24                 helperPtr[i] = NULL;
25         }
26         for (int i=0; i<numHelpers; i++) {
27                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);
28                 helpProxy.reportCreated();
29         }
30         delete [] helperArr;
31 #endif
32 }
33
34 int FuncNodeHelper::MAX_CHUNKS = 64;
35
36 #if CMK_TRACE_ENABLED
37 #define TRACE_START(id) _start = CmiWallTimer()
38 #define TRACE_BRACKET(id) traceUserBracketEvent(id,_start,CmiWallTimer())
39 #else
40 #define TRACE_START(id)
41 #define TRACE_BRACKET(id)
42 #endif
43
44 void FuncNodeHelper::parallelizeFunc(HelperFn func, int paramNum, void * param, 
45                                     int msgPriority, int numChunks, int lowerRange, int upperRange, 
46                                     void *redResult, REDUCTION_TYPE type) {
47                                         
48     double _start; //may be used for tracing
49     
50     if(numChunks > MAX_CHUNKS){ 
51         CkPrintf("NodeHelper[%d]: WARNING! chunk is set to MAX_CHUNKS=%d\n", CmiMyPe(), MAX_CHUNKS);
52         numChunks = MAX_CHUNKS;
53     }
54         
55     /* "stride" determines the number of loop iterations to be done in each chunk
56      * for chunk indexed at 0 to remainder-1, stride is "unit+1";
57      * for chunk indexed at remainder to numChunks-1, stride is "unit"
58      */
59      int stride;
60     
61     //for using nodequeue
62         TRACE_START(20);
63         
64         FuncSingleHelper *thisHelper = helperPtr[CkMyRank()];
65         CurLoopInfo *curLoop = thisHelper->curLoop;
66         curLoop->set(numChunks, func, lowerRange, upperRange, paramNum, param);
67         ConverseNotifyMsg *notifyMsg = &(notifyMsgs[CmiMyRank()]);
68         notifyMsg->ptr = (void *)curLoop;
69         
70         if(useTreeBcast){
71                 notifyMsg->srcRank = CmiMyRank();
72                 int loopTimes = TREE_BCAST_BRANCH>(CmiMyNodeSize()-1)?CmiMyNodeSize()-1:TREE_BCAST_BRANCH;
73                 //just implicit binary tree
74                 int pe = CmiMyRank()+1;        
75                 for(int i=0; i<loopTimes; i++, pe++){
76                         if(pe >= CmiMyNodeSize()) pe -= CmiMyNodeSize();
77                         CmiPushPE(pe, (void *)(notifyMsg));    
78                 }
79         }else{
80                 for (int i=0; i<numHelpers; i++) {
81                         if (i!=CkMyRank()) CmiPushPE(i, (void *)(notifyMsg));            
82                 }
83         }
84
85         curLoop->stealWork();
86         TRACE_BRACKET(20);
87         
88         TRACE_START(21);                
89         curLoop->waitLoopDone();
90         TRACE_BRACKET(21);        
91
92     if (type!=NODEHELPER_NONE)
93         reduce(curLoop->getRedBufs(), redResult, type, numChunks);            
94     return;
95 }
96
/* Adds the first `numChunks` per-chunk partial results, each stored as a T behind
 * redBufs[k], into the caller's local `result`. Relies on `numChunks`, `redBufs`
 * and `result` being in scope at the expansion site. Expands to a plain braced
 * block, so call sites use it without a trailing semicolon. */
#define COMPUTE_REDUCTION(T) {\
    for (int chunk = 0; chunk < numChunks; ++chunk) {\
        result += *static_cast<T *>(redBufs[chunk]);\
    }\
}
103
104 void FuncNodeHelper::reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks) {
105     switch(type){
106         case NODEHELPER_INT_SUM:
107         {
108             int result=0;
109             COMPUTE_REDUCTION(int)
110             *((int *)redBuf) = result;
111             break;
112         }
113         case NODEHELPER_FLOAT_SUM:
114         {
115             float result=0;
116             COMPUTE_REDUCTION(float)
117             *((float *)redBuf) = result;
118             break;
119         }
120         case NODEHELPER_DOUBLE_SUM:
121         {
122             double result=0;
123             COMPUTE_REDUCTION(double)
124             *((double *)redBuf) = result;
125             break;
126         }
127         default:
128         break;
129     }
130 }
131
132 void SingleHelperStealWork(ConverseNotifyMsg *msg){
133         
134         int srcRank = msg->srcRank;
135         
136         if(srcRank >= 0){
137                 //means using tree-broadcast to send the notification msg
138                 
139                 //int numHelpers = CmiMyNodeSize(); //the value of "numHelpers" should be obtained somewhere else
140                 int relPE = CmiMyRank()-msg->srcRank;
141                 if(relPE<0) relPE += CmiMyNodeSize();
142                 
143                 //CmiPrintf("Rank[%d]: got msg from src %d with relPE %d\n", CmiMyRank(), msg->srcRank, relPE);
144                 relPE=relPE*TREE_BCAST_BRANCH+1;
145                 for(int i=0; i<TREE_BCAST_BRANCH; i++, relPE++){
146                         if(relPE >= CmiMyNodeSize()) break;
147                         int pe = (relPE + msg->srcRank)%CmiMyNodeSize();
148                         //CmiPrintf("Rank[%d]: send msg to dst %d (relPE: %d) from src %d\n", CmiMyRank(), pe, relPE, msg->srcRank);
149                         CmiPushPE(pe, (void *)msg);
150                 }
151         }
152     CurLoopInfo *loop = (CurLoopInfo *)msg->ptr;
153     loop->stealWork();
154 }
155
156 void CurLoopInfo::stealWork(){
157     //indicate the current work hasn't been initialized
158     //or the old work has finished.
159     if(inited == 0) return;
160     
161     int first, last;
162     int unit = (upperIndex-lowerIndex+1)/numChunks;
163     int remainder = (upperIndex-lowerIndex+1)-unit*numChunks;
164     int markIdx = remainder*(unit+1);
165     
166     int nextChunkId = getNextChunkIdx();
167     while(nextChunkId < numChunks){
168         if(nextChunkId < remainder){
169             first = (unit+1)*nextChunkId;
170             last = first+unit;
171         }else{
172             first = (nextChunkId - remainder)*unit + markIdx;
173             last = first+unit-1;
174         }
175                 
176         fnPtr(first, last, redBufs[nextChunkId], paramNum, param);
177         reportFinished();
178         
179         nextChunkId = getNextChunkIdx();
180     }
181 }
182
183 //======================================================================//
//   End of functions related to FuncSingleHelper                       //
185 //======================================================================//
186
187 CProxy_FuncNodeHelper NodeHelper_Init(){
188     CkPrintf("NodeHelperLib is used in SMP with a simple dynamic scheduling but not using node-level queue\n");
189     return CProxy_FuncNodeHelper::ckNew();
190 }
191
192 void NodeHelper_Parallelize(CProxy_FuncNodeHelper nodeHelper, HelperFn func, 
193                         int paramNum, void * param, int msgPriority,
194                         int numChunks, int lowerRange, int upperRange, 
195                         void *redResult, REDUCTION_TYPE type)
196 {
197     nodeHelper[CkMyNode()].ckLocalBranch()->parallelizeFunc(func, paramNum, param, msgPriority, numChunks, lowerRange, upperRange, redResult, type);
198 }
199
200 #include "NodeHelper.def.h"