pipelined allreduce for large messages implemented, use -D_PIPELINED_ALLREDUCE_ to...
[charm.git] / src / ck-core / ckcallback.h
1 /*
2 A CkCallback is a simple way for a library to return data 
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
5
6 This object is implemented as a union, so the entire object
7 can be sent as bytes.  Another option would be to use a virtual 
8 "send" method.
9
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
11 */
12 #ifndef _CKCALLBACK_H_
13 #define _CKCALLBACK_H_
14
15 #include "conv-ccs.h" /*for CcsDelayedReply struct*/
16
17 typedef void (*CkCallbackFn)(void *param,void *message);
18 typedef void (*Ck1CallbackFn)(void *message);
19
20 class CProxyElement_ArrayBase; /*forward declaration*/
21 class CProxyElement_Group; /*forward declaration*/
22 class CProxy_NodeGroup;
23 class Chare;
24 class Group;
25 class NodeGroup;
26 class ArrayElement;
27
28 #define CkSelfCallback(ep)  CkCallback(this, ep)
29
30 class CkCallback {
31 public:
32 #ifdef _PIPELINED_ALLREDUCE_
33         friend class ArrayElement;
34 #endif
35         typedef enum {
36         invalid=0, //Invalid callback
37         ignore, //Do nothing
38         ckExit, //Call ckExit
39         resumeThread, //Resume a waiting thread (d.thread)
40         callCFn, //Call a C function pointer with a parameter (d.cfn)
41         call1Fn, //Call a C function pointer on any processor (d.c1fn)
42         sendChare, //Send to a chare (d.chare)
43         sendGroup, //Send to a group (d.group)
44         sendNodeGroup, //Send to a nodegroup (d.group)
45         sendArray, //Send to an array (d.array)
46         isendChare, //Inlined send to a chare (d.chare)
47         isendGroup, //Inlined send to a group (d.group)
48         isendNodeGroup, //Inlined send to a nodegroup (d.group)
49         isendArray, //Inlined send to an array (d.array)
50         bcastGroup, //Broadcast to a group (d.group)
51         bcastNodeGroup, //Broadcast to a nodegroup (d.group)
52         bcastArray, //Broadcast to an array (d.array)
53         replyCCS // Reply to a CCS message (d.ccsReply)
54         } callbackType;
55 private:
56         union callbackData {
57         struct s_thread { //resumeThread
58                 int onPE; //Thread is waiting on this PE
59                 int cb; //The suspending callback (0 if already done)
60                 CthThread th; //Thread to resume (NULL if none waiting)
61                 void *ret; //Place to put the returned message
62         } thread;
63         struct s_cfn { //callCFn
64                 int onPE; //Call on this PE
65                 CkCallbackFn fn; //Function to call
66                 void *param; //User parameter
67         } cfn;
68         struct s_c1fn { //call1Fn
69                 Ck1CallbackFn fn; //Function to call on whatever processor
70         } c1fn;
71         struct s_chare { //sendChare
72                 int ep; //Entry point to call
73                 CkChareID id; //Chare to call it on
74         } chare;
75         struct s_group { //(sendGroup, bcastGroup)
76                 int ep; //Entry point to call
77                 CkGroupID id; //Group to call it on
78                 int onPE; //Processor to send to (if any)
79         } group;
80         struct s_array { //(sendArray, bcastArray)
81                 int ep; //Entry point to call
82                 CkGroupID id; //Array ID to call it on
83                 CkArrayIndexBase idx; //Index to send to (if any)
84         } array;
85         struct s_ccsReply {
86                 CcsDelayedReply reply;
87         } ccsReply;
88         };
89 public: 
90         callbackType type;
91         callbackData d;
92         
93         void impl_thread_init(void);
94         void *impl_thread_delay(void) const;
95
96         CkCallback(void) {
97 #if CMK_REPLAYSYSTEM
98       bzero(this, sizeof(CkCallback));
99 #endif
100       type=invalid;
101         }
102         //This is how you create ignore, ckExit, and resumeThreads:
103         CkCallback(callbackType t) {
104 #if CMK_REPLAYSYSTEM
105           bzero(this, sizeof(CkCallback));
106 #endif
107           if (t==resumeThread) impl_thread_init();
108           type=t;
109         }
110
111     // Call a C function on the current PE
112         CkCallback(Ck1CallbackFn fn) {
113 #if CMK_REPLAYSYSTEM
114       bzero(this, sizeof(CkCallback));
115 #endif
116       type=call1Fn;
117           d.c1fn.fn=fn;
118         }
119
120     // Call a C function on the current PE
121         CkCallback(CkCallbackFn fn,void *param) {
122 #if CMK_REPLAYSYSTEM
123       bzero(this, sizeof(CkCallback));
124 #endif
125       type=callCFn;
126           d.cfn.onPE=CkMyPe(); d.cfn.fn=fn; d.cfn.param=param;
127         }
128
129     // Call a chare entry method
130         CkCallback(int ep,const CkChareID &id,CmiBool doInline=CmiFalse) {
131 #if CMK_REPLAYSYSTEM
132       bzero(this, sizeof(CkCallback));
133 #endif
134       type=doInline?isendChare:sendChare;
135           d.chare.ep=ep; d.chare.id=id;
136         }
137
138     // Bcast to nodegroup
139         CkCallback(int ep,const CProxy_NodeGroup &ngp);
140
141     // Bcast to a group or nodegroup
142         CkCallback(int ep,const CkGroupID &id, int isNodeGroup=0) {
143 #if CMK_REPLAYSYSTEM
144       bzero(this, sizeof(CkCallback));
145 #endif
146       type=isNodeGroup?bcastNodeGroup:bcastGroup;
147           d.group.ep=ep; d.group.id=id;
148         }
149
150     // Send to nodegroup element
151         CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,CmiBool doInline=CmiFalse);
152
153     // Send to group/nodegroup element
154         CkCallback(int ep,int onPE,const CkGroupID &id,CmiBool doInline=CmiFalse, int isNodeGroup=0) {
155 #if CMK_REPLAYSYSTEM
156       bzero(this, sizeof(CkCallback));
157 #endif
158       type=doInline?(isNodeGroup?isendNodeGroup:isendGroup):(isNodeGroup?sendNodeGroup:sendGroup); 
159       d.group.ep=ep; d.group.id=id; d.group.onPE=onPE;
160         }
161
162     // Send to specified group element
163         CkCallback(int ep,const CProxyElement_Group &grpElt,CmiBool doInline=CmiFalse);
164         
165     // Bcast to array
166         CkCallback(int ep,const CkArrayID &id) {
167 #if CMK_REPLAYSYSTEM
168       bzero(this, sizeof(CkCallback));
169 #endif
170       type=bcastArray;
171           d.array.ep=ep; d.array.id=id;
172         }
173
174     // Send to array element
175         CkCallback(int ep,const CkArrayIndex &idx,const CkArrayID &id,CmiBool doInline=CmiFalse) {
176 #if CMK_REPLAYSYSTEM
177       bzero(this, sizeof(CkCallback));
178 #endif
179       type=doInline?isendArray:sendArray;
180           d.array.ep=ep; d.array.id=id; d.array.idx = idx;
181         }
182
183     // Bcast to array
184         CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,CmiBool doInline=CmiFalse);
185
186     // Send to chare
187         CkCallback(Chare *p, int ep, CmiBool doInline=CmiFalse);
188
189     // Send to group element on current PE
190         CkCallback(Group *p, int ep, CmiBool doInline=CmiFalse);
191
192     // Send to nodegroup element on current node
193         CkCallback(NodeGroup *p, int ep, CmiBool doInline=CmiFalse);
194
195     // Send to specified array element 
196         CkCallback(ArrayElement *p, int ep,CmiBool doInline=CmiFalse);
197
198         CkCallback(const CcsDelayedReply &reply) {
199 #if CMK_REPLAYSYSTEM
200       bzero(this, sizeof(CkCallback));
201 #endif
202       type=replyCCS;
203           d.ccsReply.reply=reply;
204         }
205
206         ~CkCallback() {
207           thread_destroy();
208         }
209         
210         int isInvalid(void) const {return type==invalid;}
211
212 /**
213  * Interface used by threaded callbacks:
214  * Libraries should call these from their "start" entry points.
215  * Use "return cb.thread_delay()" to suspend the thread before
216  * the return.
217  * It's a no-op for everything but threads.
218  */
219         void *thread_delay(void) const {
220                 if (type==resumeThread) return impl_thread_delay();
221                 return NULL;
222         }
223
224         void thread_destroy() const;
225         
226 /**
227  * Send this message back to the caller.
228  *
229  * Libraries should call this from their "done" entry points.
230  * It takes the given message and handles it appropriately.
231  * After the send(), this callback is finished and cannot be reused.
232  */
233         void send(void *msg=NULL) const;
234         
235 /**
236  * Send this data, formatted as a CkDataMsg, back to the caller.
237  */
238         void send(int length,const void *data) const;
239         
240         void pup(PUP::er &p);
241 };
242 //PUPbytes(CkCallback) //FIXME: write a real pup routine
243
244 /**
245  * Convenience class: a thread-suspending callback.  
246  * Makes sure the thread actually gets delayed, even if the 
247  *   library can't or won't call "thread_delay".
248  * The return value is lost, so your library needs to call
249  *   thread_delay itself if you want a return value.
250  * Modification Filippo: Passing in an pointer argument, the return
251  *   value will be stored in that pointer 
252  */
253 class CkCallbackResumeThread : public CkCallback {
254  protected: void ** result;
255  public:
256         CkCallbackResumeThread(void)
257                 :CkCallback(resumeThread) { result = NULL; }
258         CkCallbackResumeThread(void * &ptr)
259             :CkCallback(resumeThread) { result = &ptr; }
260         ~CkCallbackResumeThread(void);
261 };
262
263 void _registerCkCallback(void); //used by init
264
265 void CkCallbackInit();
266
267 #endif
268
269
270