0288be1dc6bc984466b8fdc4fe41aafd4ed12888
[charm.git] / src / ck-core / ckcallback.C
1 /*
2 A CkCallback is a simple way for a library to return data 
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
5
6 This object is implemented as a union, so the entire object
7 can be sent as bytes.  Another option would be to use a virtual 
8 "send" method.
9
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
11 */
12 #include "charm++.h"
13 #include "ckcallback-ccs.h"
14 #include "CkCallback.decl.h"
15 #include "envelope.h"
16 /*readonly*/ CProxy_ckcallback_group _ckcallbackgroup;
17
18 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
19 CpvStaticDeclare(threadCB_t, threadCBs);
20 CpvStaticDeclare(unsigned int, nextThreadCB);
21
22 //This main chare is only used to create the callback forwarding group
23 class ckcallback_main : public CBase_ckcallback_main {
24 public:
25         ckcallback_main(CkArgMsg *m) {
26                 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
27                 delete m;
28         }
29 };
30
31 //The callback group is used to forward a callback to the processor
32 // it originated from.
33 class ckcallback_group : public CBase_ckcallback_group {
34 public:
35         ckcallback_group() { /*empty*/ }
36         ckcallback_group(CkMigrateMessage *m) { /*empty*/ }
37         void registerCcsCallback(const char *name,const CkCallback &cb);
38         void call(CkCallback &c,CkMarshalledMessage &msg) {
39                 c.send(msg.getMessage());
40         }
41 };
42
43 /*************** CkCallback implementation ***************/
44 //Initialize the callback's thread fields before sending it off:
45 void CkCallback::impl_thread_init(void)
46 {
47     int exist;
48     CkCallback **cb;
49     d.thread.onPE=CkMyPe();
50         do {
51           if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
52           d.thread.cb=CpvAccess(nextThreadCB)++;
53           cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
54         } while (exist==1);
55         *cb = this; //<- so we can find this structure later
56         d.thread.th=NULL; //<- thread isn't suspended yet
57         d.thread.ret=(void*)-1;//<- no data to return yet
58 }
59
60 //Actually suspend this thread
61 void *CkCallback::impl_thread_delay(void) const
62 {
63         if (type!=resumeThread) 
64                 CkAbort("Called impl_thread_delay on non-threaded callback");
65         if (CkMyPe()!=d.thread.onPE)
66                 CkAbort("Called thread_delay on different processor than where callback was created");
67         
68         //Find the original callback object:
69         CkCallback *dest=(CkCallback *)this;
70         if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
71         if (dest==0)
72             CkAbort("Called thread_delay on an already deleted callback");
73         if (dest->d.thread.ret==(void*)-1) 
74         {  //We need to sleep for the result:
75                 dest->d.thread.th=CthSelf(); //<- so we know a thread is waiting
76                 CthSuspend();
77                 if (dest->d.thread.ret==(void*)-1) 
78                         CkAbort("thread resumed, but callback data is still empty");
79         }
80         return dest->d.thread.ret;
81 }
82
83
84 /*These can't be defined in the .h file like the other constructors
85  * because we need CkCallback before CProxyElement* are defined.
86  */
87 CkCallback::CkCallback(Chare *p, int ep, CmiBool doInline) {
88 #ifndef CMK_OPTIMIZE
89       bzero(this, sizeof(CkCallback));
90 #endif
91       type=doInline?isendChare:sendChare;
92         d.chare.ep=ep; 
93         d.chare.id=p->ckGetChareID();
94 }
95 CkCallback::CkCallback(Group *p, int ep, CmiBool doInline) {
96 #ifndef CMK_OPTIMIZE
97       bzero(this, sizeof(CkCallback));
98 #endif
99       type=doInline?isendGroup:sendGroup;
100         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
101 }
102 CkCallback::CkCallback(NodeGroup *p, int ep, CmiBool doInline) {
103 #ifndef CMK_OPTIMIZE
104       bzero(this, sizeof(CkCallback));
105 #endif
106       type=doInline?isendNodeGroup:sendNodeGroup;
107         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
108 }
109
110 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
111 #ifndef CMK_OPTIMIZE
112       bzero(this, sizeof(CkCallback));
113 #endif
114       type=bcastNodeGroup;
115         d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
116 }
117
118 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,CmiBool doInline) {
119 #ifndef CMK_OPTIMIZE
120       bzero(this, sizeof(CkCallback));
121 #endif
122       type=doInline?isendNodeGroup:sendNodeGroup;
123         d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
124 }
125
126 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,CmiBool doInline) {
127 #ifndef CMK_OPTIMIZE
128       bzero(this, sizeof(CkCallback));
129 #endif
130       type=doInline?isendGroup:sendGroup;
131         d.group.ep=ep; 
132         d.group.id=grpElt.ckGetGroupID(); 
133         d.group.onPE=grpElt.ckGetGroupPe();
134 }
135 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,CmiBool doInline) {
136 #ifndef CMK_OPTIMIZE
137       bzero(this, sizeof(CkCallback));
138 #endif
139       type=doInline?isendArray:sendArray;
140         d.array.ep=ep; 
141         d.array.id=arrElt.ckGetArrayID(); 
142         d.array.idx = arrElt.ckGetIndex();
143 }
144
145 CkCallback::CkCallback(int ep,CProxySection_ArrayBase &sectElt,CmiBool doInline) {
146 #ifndef CMK_OPTIMIZE
147       bzero(this, sizeof(CkCallback));
148 #endif
149       type=bcastSection;
150         d.array.ep=ep; 
151         secID=sectElt.ckGetSectionID(0); 
152 }
153
154 CkCallback::CkCallback(ArrayElement *p, int ep,CmiBool doInline) {
155 #ifndef CMK_OPTIMIZE
156       bzero(this, sizeof(CkCallback));
157 #endif
158       type=doInline?isendArray:sendArray;
159     d.array.ep=ep; 
160         d.array.id=p->ckGetArrayID(); 
161         d.array.idx = p->ckGetArrayIndex();
162 }
163
164
165 void CkCallback::send(int length,const void *data) const
166 {
167         send(CkDataMsg::buildNew(length,data));
168 }
169
170 /*Libraries should call this from their "done" entry points.
171   It takes the given message and handles it appropriately.
172   After the send(), this callback is finished and cannot be reused.
173 */
174 void CkCallback::send(void *msg) const
175 {
176         switch(type) {
177         CkPrintf("type:%d\n",type);
178         case ignore: //Just ignore the callback
179                 if (msg) CkFreeMsg(msg);
180                 break;
181         case ckExit: //Call ckExit
182                 if (msg) CkFreeMsg(msg);
183                 CkExit();
184                 break;
185         case resumeThread: //Resume a waiting thread
186                 if (d.thread.onPE==CkMyPe()) {
187                         CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
188                         if (dest==0 || dest->d.thread.ret!=(void*)-1)
189                                 CkAbort("Already sent a value to this callback!\n");
190                         dest->d.thread.ret=msg; //<- return data
191                         if (dest->d.thread.th!=NULL)
192                                 CthAwaken(dest->d.thread.th);
193                 } 
194                 else //Forward message to processor where the thread actually lives
195                         _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
196                 break;
197         case call1Fn: //Call a C function pointer on the current processor
198                 (d.c1fn.fn)(msg);
199                 break;
200         case callCFn: //Call a C function pointer on the appropriate processor
201                 if (d.cfn.onPE==CkMyPe())
202                         (d.cfn.fn)(d.cfn.param,msg);
203                 else
204                         _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
205                 break;
206         case sendChare: //Send message to a chare
207                 if (!msg) msg=CkAllocSysMsg();
208                 CkSendMsg(d.chare.ep,msg,&d.chare.id);
209                 break;
210         case isendChare: //inline send-to-chare
211                 if (!msg) msg=CkAllocSysMsg();
212                 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
213                 break;
214         case sendGroup: //Send message to a group element
215                 if (!msg) msg=CkAllocSysMsg();
216                 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
217                 break;
218         case sendNodeGroup: //Send message to a group element
219                 if (!msg) msg=CkAllocSysMsg();
220                 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
221                 break;
222         case isendGroup: //inline send-to-group element
223                 if (!msg) msg=CkAllocSysMsg();
224                 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
225                 break;
226         case isendNodeGroup: //inline send-to-group element
227                 if (!msg) msg=CkAllocSysMsg();
228                 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
229                 break;
230         case sendArray: //Send message to an array element
231                 if (!msg) msg=CkAllocSysMsg();
232                 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
233                 break;
234         case isendArray: //inline send-to-array element
235                 if (!msg) msg=CkAllocSysMsg();
236                 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
237                 break;
238         case bcastGroup:
239                 if (!msg) msg=CkAllocSysMsg();
240                 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
241                 break;
242         case bcastNodeGroup:
243                 if (!msg) msg=CkAllocSysMsg();
244                 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
245                 break;
246         case bcastArray:
247                 if (!msg) msg=CkAllocSysMsg();
248                 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
249                 break;
250         /* Xiang begin*/
251         case bcastSection:
252                 if(!msg)msg=CkAllocSysMsg();
253                 CkBroadcastMsgSection(d.section.ep,msg,secID);
254                 break;
255         /* Xiang end */
256         case replyCCS: { /* Send CkDataMsg as a CCS reply */
257                 void *data=NULL;
258                 int length=0;
259                 if (msg) {
260                         CkDataMsg *m=(CkDataMsg *)msg;
261                         m->check();
262                         data=m->getData();
263                         length=m->getLength();
264                 }
265                 CcsSendDelayedReply(d.ccsReply.reply,length,data);
266                 if (msg) CkFreeMsg(msg);
267                 } break;
268         case invalid: //Uninitialized
269                 CmiAbort("Called send on uninitialized callback");
270                 break;
271         default: //Out-of-bounds type code
272                 CmiAbort("Called send on corrupted callback");
273                 break;
274         };
275 }
276 /* Xiang begin */
277 CkCallback::CkCallback(int ep, CkSectionID &id)
278         {
279                 type=bcastSection;
280                 d.section.ep=ep;
281                 secID=id;
282         }
283 /* Xiang end */
284 void CkCallback::pup(PUP::er &p) {
285   //p((char*)this, sizeof(CkCallback));
286   int t = (int)type;
287   p|t;
288   type = (callbackType)t;
289   switch (type) {
290   case resumeThread:
291     p|d.thread.onPE;
292     p|d.thread.cb;
293     break;
294   case isendChare:
295   case sendChare:
296     p|d.chare.ep;
297     p|d.chare.id;
298     break;
299   case isendGroup:
300   case sendGroup:
301   case isendNodeGroup:
302   case sendNodeGroup:
303     p|d.group.onPE;
304   case bcastNodeGroup:
305   case bcastGroup:
306     p|d.group.ep;
307     p|d.group.id;
308     break;
309   case isendArray:
310   case sendArray:
311     p|d.array.idx;
312   case bcastArray:
313     p|d.array.ep;
314     p|d.array.id;
315     break;
316   case replyCCS:
317     p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
318     break;
319   case call1Fn:
320     p((char*)&d.c1fn, sizeof(d.c1fn));
321     break;
322   case callCFn:
323     p((char*)&d.cfn, sizeof(d.cfn));
324     break;
325   case ignore:
326   case ckExit:
327   case invalid:
328     break;
329   default:
330     CkAbort("Inconsistent CkCallback type");
331   }
332 }
333
334 void CkCallback::thread_destroy() const {
335   if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
336     CpvAccess(threadCBs).remove(d.thread.cb);
337   }
338 }
339
340 CkCallbackResumeThread::~CkCallbackResumeThread() {
341   void * res = thread_delay(); //<- block thread here if it hasn't already
342   if (result != NULL) *result = res;
343   else CkFreeMsg(res);
344   thread_destroy();
345 }
346
347 /****** Callback-from-CCS ******/
348
349 // This function is called by CCS when a request comes in-- it maps the 
350 // request to a Charm++ message and passes the message to its callback.
351 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData) 
352 {
353         CkCallback *cb=(CkCallback *)cbPtr;
354         CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
355         msg->reply=CcsDelayReply();
356         msg->length=reqLen;
357         memcpy(msg->data,reqData,reqLen);
358         cb->send(msg);
359 }
360
361 // Register this callback with CCS.
362 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
363 {
364         CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
365 }
366
367 // Broadcast this callback registration to all processors
368 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
369         _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
370 }
371
372 enum {dataMsgTag=0x7ed2beef};
373 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
374 {
375         CkDataMsg *msg=new (&length,0) CkDataMsg;
376         msg->length=length;
377         memcpy(msg->data,data,length);
378         msg->checkTag=dataMsgTag;
379         return msg;
380 }
381
382 void CkDataMsg::check(void)
383 {
384         if (checkTag!=dataMsgTag)
385                 CkAbort("CkDataMsg corrupted-- bad tag.");
386 }
387
388 void CkCallbackInit() {
389   CpvInitialize(threadCB_t, threadCBs);
390   CpvInitialize(unsigned int, nextThreadCB);
391   CpvAccess(nextThreadCB)=1;
392 }
393
394 #include "CkCallback.def.h"
395