fa41dcc5e95a3a83d999142b3c62841ae0c44dfa
[charm.git] / src / ck-core / ckcallback.C
1 /*
2 A CkCallback is a simple way for a library to return data 
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
5
6 This object is implemented as a union, so the entire object
7 can be sent as bytes.  Another option would be to use a virtual 
8 "send" method.
9
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
11 */
12 #include "charm++.h"
13 #include "ckcallback-ccs.h"
14 #include "CkCallback.decl.h"
15 #include "envelope.h"
16
17 /*readonly*/ CProxy_ckcallback_group _ckcallbackgroup;
18
19 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
20 CpvStaticDeclare(threadCB_t, threadCBs);
21 CpvStaticDeclare(unsigned int, nextThreadCB);
22
23 //This main chare is only used to create the callback forwarding group
24 class ckcallback_main : public CBase_ckcallback_main {
25 public:
26         ckcallback_main(CkArgMsg *m) {
27                 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
28                 delete m;
29         }
30 };
31
32 //The callback group is used to forward a callback to the processor
33 // it originated from.
34 class ckcallback_group : public CBase_ckcallback_group {
35 public:
36         ckcallback_group() { /*empty*/ }
37         ckcallback_group(CkMigrateMessage *m) { /*empty*/ }
38         void registerCcsCallback(const char *name,const CkCallback &cb);
39         void call(CkCallback &c,CkMarshalledMessage &msg) {
40                 c.send(msg.getMessage());
41         }
42 };
43
44 /*************** CkCallback implementation ***************/
45 //Initialize the callback's thread fields before sending it off:
46 void CkCallback::impl_thread_init(void)
47 {
48     int exist;
49     CkCallback **cb;
50     d.thread.onPE=CkMyPe();
51         do {
52           if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
53           d.thread.cb=CpvAccess(nextThreadCB)++;
54           cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
55         } while (exist==1);
56         *cb = this; //<- so we can find this structure later
57         d.thread.th=NULL; //<- thread isn't suspended yet
58         d.thread.ret=(void*)-1;//<- no data to return yet
59 }
60
61 //Actually suspend this thread
62 void *CkCallback::impl_thread_delay(void) const
63 {
64         if (type!=resumeThread) 
65                 CkAbort("Called impl_thread_delay on non-threaded callback");
66         if (CkMyPe()!=d.thread.onPE)
67                 CkAbort("Called thread_delay on different processor than where callback was created");
68         
69         //Find the original callback object:
70         CkCallback *dest=(CkCallback *)this;
71         if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
72         if (dest==0)
73             CkAbort("Called thread_delay on an already deleted callback");
74         if (dest->d.thread.ret==(void*)-1) 
75         {  //We need to sleep for the result:
76                 dest->d.thread.th=CthSelf(); //<- so we know a thread is waiting
77                 CthSuspend();
78                 if (dest->d.thread.ret==(void*)-1) 
79                         CkAbort("thread resumed, but callback data is still empty");
80         }
81         return dest->d.thread.ret;
82 }
83
84
85 /*These can't be defined in the .h file like the other constructors
86  * because we need CkCallback before CProxyElement* are defined.
87  */
88 CkCallback::CkCallback(Chare *p, int ep, CmiBool doInline) {
89 #ifndef CMK_OPTIMIZE
90       bzero(this, sizeof(CkCallback));
91 #endif
92       type=doInline?isendChare:sendChare;
93         d.chare.ep=ep; 
94         d.chare.id=p->ckGetChareID();
95 }
96 CkCallback::CkCallback(Group *p, int ep, CmiBool doInline) {
97 #ifndef CMK_OPTIMIZE
98       bzero(this, sizeof(CkCallback));
99 #endif
100       type=doInline?isendGroup:sendGroup;
101         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
102 }
103 CkCallback::CkCallback(NodeGroup *p, int ep, CmiBool doInline) {
104 #ifndef CMK_OPTIMIZE
105       bzero(this, sizeof(CkCallback));
106 #endif
107       type=doInline?isendNodeGroup:sendNodeGroup;
108         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
109 }
110
111 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
112 #ifndef CMK_OPTIMIZE
113       bzero(this, sizeof(CkCallback));
114 #endif
115       type=bcastNodeGroup;
116         d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
117 }
118
119 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,CmiBool doInline) {
120 #ifndef CMK_OPTIMIZE
121       bzero(this, sizeof(CkCallback));
122 #endif
123       type=doInline?isendNodeGroup:sendNodeGroup;
124         d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
125 }
126
127 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,CmiBool doInline) {
128 #ifndef CMK_OPTIMIZE
129       bzero(this, sizeof(CkCallback));
130 #endif
131       type=doInline?isendGroup:sendGroup;
132         d.group.ep=ep; 
133         d.group.id=grpElt.ckGetGroupID(); 
134         d.group.onPE=grpElt.ckGetGroupPe();
135 }
136 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,CmiBool doInline) {
137 #ifndef CMK_OPTIMIZE
138       bzero(this, sizeof(CkCallback));
139 #endif
140       type=doInline?isendArray:sendArray;
141         d.array.ep=ep; 
142         d.array.id=arrElt.ckGetArrayID(); 
143         d.array.idx = arrElt.ckGetIndex();
144 }
145
146 CkCallback::CkCallback(ArrayElement *p, int ep,CmiBool doInline) {
147 #ifndef CMK_OPTIMIZE
148       bzero(this, sizeof(CkCallback));
149 #endif
150       type=doInline?isendArray:sendArray;
151     d.array.ep=ep; 
152         d.array.id=p->ckGetArrayID(); 
153         d.array.idx = p->ckGetArrayIndex();
154 }
155
156 void CkCallback::send(int length,const void *data) const
157 {
158         send(CkDataMsg::buildNew(length,data));
159 }
160
161 /*Libraries should call this from their "done" entry points.
162   It takes the given message and handles it appropriately.
163   After the send(), this callback is finished and cannot be reused.
164 */
165 void CkCallback::send(void *msg) const
166 {
167         switch(type) {
168         case ignore: //Just ignore the callback
169                 if (msg) CkFreeMsg(msg);
170                 break;
171         case ckExit: //Call ckExit
172                 if (msg) CkFreeMsg(msg);
173                 CkExit();
174                 break;
175         case resumeThread: //Resume a waiting thread
176                 if (d.thread.onPE==CkMyPe()) {
177                         CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
178                         if (dest==0 || dest->d.thread.ret!=(void*)-1)
179                                 CkAbort("Already sent a value to this callback!\n");
180                         dest->d.thread.ret=msg; //<- return data
181                         if (dest->d.thread.th!=NULL)
182                                 CthAwaken(dest->d.thread.th);
183                 } 
184                 else //Forward message to processor where the thread actually lives
185                         _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
186                 break;
187         case call1Fn: //Call a C function pointer on the current processor
188                 (d.c1fn.fn)(msg);
189                 break;
190         case callCFn: //Call a C function pointer on the appropriate processor
191                 if (d.cfn.onPE==CkMyPe())
192                         (d.cfn.fn)(d.cfn.param,msg);
193                 else
194                         _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
195                 break;
196         case sendChare: //Send message to a chare
197                 if (!msg) msg=CkAllocSysMsg();
198                 CkSendMsg(d.chare.ep,msg,&d.chare.id);
199                 break;
200         case isendChare: //inline send-to-chare
201                 if (!msg) msg=CkAllocSysMsg();
202                 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
203                 break;
204         case sendGroup: //Send message to a group element
205                 if (!msg) msg=CkAllocSysMsg();
206                 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
207                 break;
208         case sendNodeGroup: //Send message to a group element
209                 if (!msg) msg=CkAllocSysMsg();
210                 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
211                 break;
212         case isendGroup: //inline send-to-group element
213                 if (!msg) msg=CkAllocSysMsg();
214                 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
215                 break;
216         case isendNodeGroup: //inline send-to-group element
217                 if (!msg) msg=CkAllocSysMsg();
218                 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
219                 break;
220         case sendArray: //Send message to an array element
221                 if (!msg) msg=CkAllocSysMsg();
222                 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
223                 break;
224         case isendArray: //inline send-to-array element
225                 if (!msg) msg=CkAllocSysMsg();
226                 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
227                 break;
228         case bcastGroup:
229                 if (!msg) msg=CkAllocSysMsg();
230                 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
231                 break;
232         case bcastNodeGroup:
233                 if (!msg) msg=CkAllocSysMsg();
234                 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
235                 break;
236         case bcastArray:
237                 if (!msg) msg=CkAllocSysMsg();
238                 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
239                 break;
240         case replyCCS: { /* Send CkDataMsg as a CCS reply */
241                 void *data=NULL;
242                 int length=0;
243                 if (msg) {
244                         CkDataMsg *m=(CkDataMsg *)msg;
245                         m->check();
246                         data=m->getData();
247                         length=m->getLength();
248                 }
249                 CcsSendDelayedReply(d.ccsReply.reply,length,data);
250                 if (msg) CkFreeMsg(msg);
251                 } break;
252         case invalid: //Uninitialized
253                 CmiAbort("Called send on uninitialized callback");
254                 break;
255         default: //Out-of-bounds type code
256                 CmiAbort("Called send on corrupted callback");
257                 break;
258         };
259 }
260
261 void CkCallback::pup(PUP::er &p) {
262   //p((char*)this, sizeof(CkCallback));
263   int t = (int)type;
264   p|t;
265   type = (callbackType)t;
266   switch (type) {
267   case resumeThread:
268     p|d.thread.onPE;
269     p|d.thread.cb;
270     break;
271   case isendChare:
272   case sendChare:
273     p|d.chare.ep;
274     p|d.chare.id;
275     break;
276   case isendGroup:
277   case sendGroup:
278   case isendNodeGroup:
279   case sendNodeGroup:
280     p|d.group.onPE;
281   case bcastNodeGroup:
282   case bcastGroup:
283     p|d.group.ep;
284     p|d.group.id;
285     break;
286   case isendArray:
287   case sendArray:
288     p|d.array.idx;
289   case bcastArray:
290     p|d.array.ep;
291     p|d.array.id;
292     break;
293   case replyCCS:
294     p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
295     break;
296   case call1Fn:
297     p((char*)&d.c1fn, sizeof(d.c1fn));
298     break;
299   case callCFn:
300     p((char*)&d.cfn, sizeof(d.cfn));
301     break;
302   case ignore:
303   case ckExit:
304   case invalid:
305     break;
306   default:
307     CkAbort("Inconsistent CkCallback type");
308   }
309 }
310
311 void CkCallback::thread_destroy() const {
312   if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
313     CpvAccess(threadCBs).remove(d.thread.cb);
314   }
315 }
316
317 CkCallbackResumeThread::~CkCallbackResumeThread() {
318   void * res = thread_delay(); //<- block thread here if it hasn't already
319   if (result != NULL) *result = res;
320   else CkFreeMsg(res);
321   thread_destroy();
322 }
323
324 /****** Callback-from-CCS ******/
325
326 // This function is called by CCS when a request comes in-- it maps the 
327 // request to a Charm++ message and passes the message to its callback.
328 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData) 
329 {
330         CkCallback *cb=(CkCallback *)cbPtr;
331         CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
332         msg->reply=CcsDelayReply();
333         msg->length=reqLen;
334         memcpy(msg->data,reqData,reqLen);
335         cb->send(msg);
336 }
337
338 // Register this callback with CCS.
339 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
340 {
341         CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
342 }
343
344 // Broadcast this callback registration to all processors
345 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
346         _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
347 }
348
349 enum {dataMsgTag=0x7ed2beef};
350 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
351 {
352         CkDataMsg *msg=new (&length,0) CkDataMsg;
353         msg->length=length;
354         memcpy(msg->data,data,length);
355         msg->checkTag=dataMsgTag;
356         return msg;
357 }
358
359 void CkDataMsg::check(void)
360 {
361         if (checkTag!=dataMsgTag)
362                 CkAbort("CkDataMsg corrupted-- bad tag.");
363 }
364
365 void CkCallbackInit() {
366   CpvInitialize(threadCB_t, threadCBs);
367   CpvInitialize(unsigned int, nextThreadCB);
368   CpvAccess(nextThreadCB)=1;
369 }
370
371 #include "CkCallback.def.h"
372