fe356d5efd46c3e444deca38a1d04c19c711af1d
[charm.git] / NodeHelper.h
1 #ifndef _NODEHELPER_H
2 #define _NODEHELPER_H
3
4 #include <pthread.h>
5
6 #include "charm++.h"
7 #include "NodeHelper.decl.h"
8 #include <assert.h>
9 #include "queueing.h"
10 #include <converse.h>
/*
 * Atomically add 1 to the integer lvalue `someInt`.
 *
 * Implemented with the GCC/Clang __sync_fetch_and_add builtin, which acts as
 * a full memory barrier and tells the compiler that the operand is modified.
 * A bare "lock incl" asm statement without an output operand or "memory"
 * clobber gives the compiler no such information and only builds on x86.
 * The result is cast to void so the macro is used as a statement only.
 */
#define AtomicIncrement(someInt)  ((void)__sync_fetch_and_add(&(someInt), 1))
/* Constant with value 1; its users are not visible in this header. */
#define SMP_SUM 1
/*
 * Pointer to a user-supplied work function.
 *
 * Takes two ints (first, last), an int passed by reference (result), a
 * parameter count and an opaque parameter pointer.
 * NOTE(review): presumably [first, last] bounds the chunk to process and
 * `result` receives the chunk's partial result -- confirm against callers.
 */
typedef void (*HelperFn)(int first,
                         int last,
                         int &result,
                         int paramNum,
                         void *param_o);
14
15 typedef struct SimpleQueue
16 {
17         Queue nodeQ;
18         pthread_mutex_t * lock;
19 }* NodeQueue;
20 class Task:public CMessage_Task{
21 public:
22         HelperFn fnPtr;
23         int first;
24         int last;
25         int result;
26         int master;
27         int flag;
28         int reduction;
29         int paramNum;
30         void * param;
31         Task(HelperFn fn,int first_o,int last_o,int master_o){
32                 fnPtr=fn;
33                 first=first_o;
34                 last=last_o;
35                 master=master_o;
36         }
37         
38         Task(HelperFn fn,int first_o,int last_o,int flag_o,int master_o){
39                 fnPtr=fn;
40                 first=first_o;
41                 last=last_o;
42                 flag=flag_o;
43                 master=master_o;
44         }
45         Task(HelperFn fn,int first_o,int last_o,int master_o, int paramNum_o, void * param_o){
46                 fnPtr=fn;
47                 first=first_o;
48                 last=last_o;
49                 master=master_o;
50                 //reduction=reduction_o;
51                 paramNum=paramNum_o;
52                 param=param_o;
53         }
54         Task(HelperFn fn,int first_o,int last_o,int flag_o,int master_o, int paramNum_o, void * param_o){
55                 fnPtr=fn;
56                 first=first_o;
57                 last=last_o;
58                 master=master_o;
59                 flag=flag_o;
60                 //reduction=reduction_o;
61                 paramNum=paramNum_o;
62                 param=param_o;
63         }
64         void setFlag(){
65                 flag=1;
66         }
67         int isFlagSet(){
68                 return flag;
69         }
70 };
71
72
73
74 class FuncSingleHelper: public CBase_FuncSingleHelper{
75 private:
76     CkGroupID nodeHelperID;
77     int id;
78     Queue reqQ;
79     pthread_mutex_t* reqLock;
80
81 public:
82     FuncSingleHelper(int myid, CkGroupID nid):id(myid),nodeHelperID(nid){
83         //CkPrintf("Single helper %d is created on rank %d\n", myid, CkMyRank());
84         reqQ = CqsCreate();
85         reqLock = CmiCreateLock();
86     }
87
88     ~FuncSingleHelper(){}
89     FuncSingleHelper(CkMigrateMessage *m){}
90     void enqueueWork(Task *one,unsigned int t){
91         //CmiLock(reqLock);
92                 //unsigned int t;
93                 //t=(int)(CmiWallTimer()*1000);
94                 CmiLock(reqLock);
95                 CqsEnqueueGeneral(reqQ, (void *)one,CQS_QUEUEING_IFIFO,0,&t);
96         //SimpleQueuePush(reqQ, (char *)one);
97                 CmiUnlock(reqLock);
98     }
99     void processWork();
100     void reportCreated();
101 };
102 class FuncNodeHelper : public CBase_FuncNodeHelper{  
103
104 public:
105         int numHelpers;
106         int mode;
107         int * counter;
108         int threadNum;
109         pthread_mutex_t** reqLock;
110         CkChareID *helperArr;
111         FuncSingleHelper **helperPtr;
112         ~FuncNodeHelper(){
113                 delete [] helperArr;
114                 delete [] helperPtr;
115         }
116     
117         void oneHelperCreated(int hid, CkChareID cid, FuncSingleHelper* cptr){
118                 helperArr[hid] = cid;
119                 helperPtr[hid] = cptr;
120         }
121         
122         void  waitDone(Task ** thisReq,int chunck);
123         void waitThreadDone(int chunck);
124         void createThread();
125         FuncNodeHelper(int mode,int elements, int threadNum);
126         int parallelizeFunc(HelperFn func, int wps,unsigned int t, int master,int chunck,int time, int paramNum, void * param, int reduction, int type);
127         void send(Task *);
128         int reduce(Task ** thisReq, int type, int chunck);
129
130 };
131
132         
133 #endif