3be214d6be4d0abe6bce5268f0a3c452dd962d83
[charm.git] / NodeHelper.C
1 #include "NodeHelper.h"
2 #define THRESHOLD 100
3 #define WPSTHRESHOLD 400
4 #define SMP_SUM 1
5
6 NodeQueue Q;
7
8 //vars local to spawned threads
9 //Note: __thread is not portable, but works pretty much anywhere pthreads work.
10 // after C++11 this should be thread_local
11 __thread pthread_mutex_t lock;
12 __thread pthread_cond_t condition;
13
14 //vars to the main flow (master thread)
15 pthread_t * threads;
16 pthread_mutex_t **allLocks;     
17 pthread_cond_t **allConds;
18
19 //global barrier
20 pthread_mutex_t gLock;
21 pthread_cond_t gCond;
22 pthread_barrier_t barr;
23 //testing counter
24 volatile int finishedCnt;
25
26 void * threadWork(void * id){
27         long my_id =(long) id;
28         //printf("thread :%ld\n",my_id);
29         CmiSetCPUAffinity(my_id+1);
30         
31         pthread_mutex_init(&lock, NULL);
32         pthread_cond_init(&condition, NULL);
33         
34         allLocks[my_id] = &lock;
35         allConds[my_id] = &condition;
36         
37         while(1){
38                 pthread_mutex_lock(&lock);
39                 pthread_cond_wait(&condition,&lock);
40                 pthread_mutex_unlock(&lock);
41                 void * r;
42                 Task * one;
43                 CmiLock(Q->lock);
44                 CqsDequeue(Q->nodeQ,&r);
45                 CmiUnlock(Q->lock);
46                 one=(Task *)r;
47             
48                 while(one) {
49                         //printf("starttime:%lf,id:%ld,proc:%d\n",CmiWallTimer(),my_id,CkMyPe());
50                         (one->fnPtr)(one->first,one->last,one->result,one->paramNum, one->param);
51                         pthread_barrier_wait(&barr);
52                         //one->setFlag();
53                         //printf
54                         //printf("endtime:%lf,id:%ld\n",CmiWallTimer(),my_id);
55                         
56                         //Testing
57                         //AtomicIncrement(finishedCnt);
58                         if(my_id==0)
59                                 finishedCnt=4;
60                         //printf("finishedCnt = %d\n", finishedCnt);
61                         
62                         CmiLock((Q->lock));
63                         CqsDequeue(Q->nodeQ,&r);
64                         CmiUnlock((Q->lock));
65                         one=(Task *)r;
66                         
67                 }       
68         }
69         
70 }
71
72 FuncNodeHelper::FuncNodeHelper(int mode_o,int nElements, int threadNum_o){
73         mode=mode_o;
74         threadNum=threadNum_o;
75         numHelpers = CkMyNodeSize();
76                 traceRegisterUserEvent("assign work",20);       
77                 traceRegisterUserEvent("finish signal",21);     
78 #if CMK_SMP
79         if(mode==1){
80                 counter=new int[nElements];
81                 reqLock=new  pthread_mutex_t *[nElements];
82                 for(int i=0;i<nElements;i++){
83                         counter[i]=0;
84                         reqLock[i] = CmiCreateLock();
85                 }
86         }else if(mode==2){
87                 helperArr = new CkChareID[numHelpers];
88                 helperPtr = new FuncSingleHelper *[numHelpers];
89                 int pestart = CkNodeFirst(CkMyNode());
90                 for(int i=0; i<numHelpers; i++) {
91                         CProxy_FuncSingleHelper::ckNew(i, thisgroup, &helperArr[i], pestart+i);    
92                         helperPtr[i] = NULL;
93                 }
94                 for(int i=0; i<numHelpers; i++) {
95                         CProxy_FuncSingleHelper helpProxy(helperArr[i]);
96                         helpProxy.reportCreated();
97                 }
98         }
99 #endif
100 }
101 void FuncNodeHelper::createThread(){
102
103                 pthread_attr_t attr;
104                 finishedCnt=0;
105                 pthread_barrier_init(&barr,NULL,threadNum);     
106                 allLocks = (pthread_mutex_t **)malloc(sizeof(void *)*threadNum);
107                 allConds = (pthread_cond_t **)malloc(sizeof(void *)*threadNum);
108                 memset(allLocks, 0, sizeof(void *)*threadNum);  
109                 memset(allConds, 0, sizeof(void *)*threadNum);  
110                 
111                 pthread_attr_init(&attr);
112                 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
113                 Q=(NodeQueue )malloc(sizeof(struct SimpleQueue));
114                         Q->nodeQ=CqsCreate();
115                         Q->lock=CmiCreateLock();
116                 /*for(int i=0;i<threadNum;i++){
117                         //Q[i]=
118                         Q[i]=(NodeQueue)malloc(sizeof(struct SimpleQueue));
119                         Q[i]->nodeQ=CqsCreate();
120                         Q[i]->lock=CmiCreateLock();
121                 }*/
122                 threads=(pthread_t *)malloc(threadNum*sizeof(pthread_t));
123                 
124         
125                 //create queue;
126                 for(int i=0;i<threadNum;i++)
127                         pthread_create(&threads[i],&attr,threadWork,(void *)i);
128                 
129 }
130 void FuncNodeHelper::send(Task * msg){
131         (msg->fnPtr)(msg->first,msg->last,msg->result,msg->paramNum, msg->param);
132         CmiLock(reqLock[msg->master]);
133         counter[msg->master]++;
134         CmiUnlock(reqLock[msg->master]);
135 }
136
137 /**
138 "func": the function that executes one partition of the task.
139
140 "wps": the number of executions needed on this func (it is there for the purpose of testing.)
141 "time": the time to finish this func, it is a user defined argument for testing, wps is calculated based on time.
142 (the above two parameters could be ignored for real usage)
143
144 "t": the priority bit for the nodegroup msg, it is the time initiating the nodehelper.
145
146 "master": the arrayIndex of the array element that initiate the nodehelper, or the pe index if invoked on a group object. The master element is used to identify the corresponding lock and counter to find out whether all the partitioned tasks have been finished.
147
148 "chunck": the number of tasks that the nodehelper will distribute among the nodegroup. If it not available through the user argument, it will calculate like:
149 if(chunck==0){
150          if(time!=0)
151              chunck=(double)(time/THRESHOLD)+0.5;
152          else
153              chunck=(double)(wps/WPSTHRESHOLD)+0.5;
154   }
155
156 "reduction": whether it is an reduction operation, it will return the reduction result if it is.
157 "type": the reduction type.
158
159 "MODE" means whether the scheduling is static or dynamic. If static, then the partitioned tasks will be assigned to threads evenly within the SMP nodes. If dynamic, then the threads will execute the task if they are idle at the time of the loop job. 
160  * 
161  */
162 int FuncNodeHelper::parallelizeFunc(HelperFn func, int wps,unsigned int t, int master,int chunck,int time,int paramNum, void * param, int reduction, int type){
163         int result=0;
164         if(chunck==0){
165                 if(time!=0)
166                         chunck=(double)(time/THRESHOLD)+0.5;
167                 else
168                         chunck=(double)(wps/WPSTHRESHOLD)+0.5;
169         }
170         int unit=((double)wps)/(double)chunck+0.5;
171         //printf("use %d chuncks for testcase %d\n",chunck,master);
172         Task **task=new Task *[chunck];
173     //for using nodequeue 
174 #if CMK_SMP
175         if(mode==1){
176 #if 0
177 //Note: CsdScheduleNodePoll has not been in the charm yet, so currently disable it
178                 CProxy_FuncNodeHelper fh(thisgroup);
179                 double _start=CmiWallTimer();
180                 for(int i=0; i<chunck; i++) {
181                         int first=unit*i;
182                         int last= (i==chunck-1)?wps:(i+1)*unit-1;
183                         task[i]=new (8*sizeof(int)) Task(func,first,last,master, paramNum, param);
184                         *((int *)CkPriorityPtr(task[i]))=t;
185                         CkSetQueueing(task[i],CK_QUEUEING_IFIFO);
186                         fh[CkMyNode()].send(task[i]);
187                         //send(task[i]);
188                 }
189                 traceUserBracketEvent(20,_start,CmiWallTimer());
190                 _start=CmiWallTimer();
191                 while(counter[master]!=chunck)
192                         CsdScheduleNodePoll();
193                 //CkPrintf("counter:%d,master:%d\n",counter[master],master);
194                 traceUserBracketEvent(21,_start,CmiWallTimer());
195                 counter[master]=0;
196                 /*for(int i=0;i<chunck;i++){
197                         result+=task[i]->result;
198                 }*/
199 #endif
200         }
201         else if(mode==2){
202         // for not using node queue
203                 for(int i=0; i<chunck; i++) {
204                         int first=unit*i;
205                         int last= (i==chunck-1)?wps:(i+1)*unit-1;
206                         task[i] = new Task(func,first,last,0,master,paramNum, param);
207                         //task[i]->UnsetFlag();
208                         helperPtr[i%CkMyNodeSize()]->enqueueWork(task[i],t);
209                 }
210                 for(int i=0; i<numHelpers; i++) {        
211                         if(i!=CkMyRank()){
212                                 CProxy_FuncSingleHelper helpProxy(helperArr[i]);
213                                 helpProxy.processWork();
214                         }
215                 }
216                 helperPtr[CkMyRank()]->processWork();
217   
218                 waitDone(task,chunck);
219                 result=0;
220                 
221                 /*for(int i=0;i<chunck;i++){
222                 result+=(task[i]->result);
223                 }*/
224         }
225 #else
226         if(mode==0){
227                 for(int i=0;i<chunck;i++)
228                 {
229                         int first = unit*i;
230                         int last=(i==chunck-1)?wps:(i+1)*unit-1;
231                         task[i]=new Task(func,first,last,0,master, paramNum, param);
232                         CmiLock((Q->lock));
233                         unsigned int t=(int)(CmiWallTimer()*1000);
234                         CqsEnqueueGeneral((Q->nodeQ), (void *)task[i],CQS_QUEUEING_IFIFO,0,&t);
235                         CmiUnlock((Q->lock));                           
236                 }
237                 //signal the thread
238                 for(int i=0;i<threadNum;i++){
239                         pthread_mutex_lock(allLocks[i]);
240                         pthread_cond_signal(allConds[i]);
241                         pthread_mutex_unlock(allLocks[i]);
242                 }
243                 //wait for the result
244                 waitThreadDone(chunck);
245                 //for(int i=0;i<threadNum;i++)
246                 //      pthread_join(threads[i],NULL);
247                 /*result=0;
248                 for(int i=0;i<chunck;i++)
249                         result+=task[i]->result;
250                 */
251         }
252 #endif  
253         if(reduction==1)
254                 result=reduce(task, type,chunck);
255         delete task;
256         return result;
257 }
258
259 //======================================================================
260 // Functions regarding helpers that parallelize a single function on a
261 // single node (like OpenMP)
262 void FuncSingleHelper::processWork(){
263         //CmiLock(reqLock);
264         void *r;
265     Task *one; // = (WorkReqEntry *)SimpleQueuePop(reqQ);
266         CmiLock(reqLock);       
267         CqsDequeue(reqQ,&r);
268         CmiUnlock(reqLock);
269         one=(Task *)r;
270     
271     while(one) {
272         (one->fnPtr)(one->first,one->last,one->result, one->paramNum, one->param);
273         one->setFlag();
274                 CmiLock(reqLock);
275                 CqsDequeue(reqQ,&r);
276                 CmiUnlock(reqLock);
277                 one=(Task *)r;
278                 
279     }    
280 }
281
282 void FuncSingleHelper::reportCreated(){
283     CProxy_FuncNodeHelper fh(nodeHelperID);
284     CProxy_FuncSingleHelper thisproxy(thishandle);
285     fh[CkMyNode()].ckLocalBranch()->oneHelperCreated(id, thishandle, this);
286 }
287
288 void FuncNodeHelper::waitDone(Task ** thisReq,int chunck){
289     int flag = 1,i;
290     while(1) {
291         for(i=0; i<chunck; i++) 
292             flag = flag & thisReq[i]->isFlagSet();
293         if(flag) break;
294         flag = 1;
295     }
296 }
297 int FuncNodeHelper::reduce(Task ** thisReq, int type,int chunck){
298         int result=0,i;
299         if(type==SMP_SUM){
300                 for(i=0;i<chunck;i++){
301                         result+=thisReq[i]->result;
302                 //CkPrintf("result:%d\n",result);
303                 }
304         }
305         return result;
306 }
307 void FuncNodeHelper::waitThreadDone(int chunck){
308         while(finishedCnt!=chunck);
309         finishedCnt=0;
310 }
311
312 #include "NodeHelper.def.h"