CkCallback: Clean up new array section constructor
[charm.git] / src / ck-core / ckcallback.C
1 /*
2 A CkCallback is a simple way for a library to return data 
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
5
6 This object is implemented as a union, so the entire object
7 can be sent as bytes.  Another option would be to use a virtual 
8 "send" method.
9
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
11 */
12 #include "charm++.h"
13 #include "ckcallback-ccs.h"
14 #include "CkCallback.decl.h"
15 #include "envelope.h"
16 /*readonly*/ CProxy_ckcallback_group _ckcallbackgroup;
17
18 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
19 CpvStaticDeclare(threadCB_t, threadCBs);
20 CpvStaticDeclare(unsigned int, nextThreadCB);
21
22 //This main chare is only used to create the callback forwarding group
23 class ckcallback_main : public CBase_ckcallback_main {
24 public:
25         ckcallback_main(CkArgMsg *m) {
26                 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
27                 delete m;
28         }
29 };
30
31 //The callback group is used to forward a callback to the processor
32 // it originated from.
33 class ckcallback_group : public CBase_ckcallback_group {
34 public:
35         ckcallback_group() { /*empty*/ }
36         ckcallback_group(CkMigrateMessage *m) { /*empty*/ }
37         void registerCcsCallback(const char *name,const CkCallback &cb);
38         void call(CkCallback &c,CkMarshalledMessage &msg) {
39                 c.send(msg.getMessage());
40         }
41 };
42
43 /*************** CkCallback implementation ***************/
44 //Initialize the callback's thread fields before sending it off:
45 void CkCallback::impl_thread_init(void)
46 {
47     int exist;
48     CkCallback **cb;
49     d.thread.onPE=CkMyPe();
50         do {
51           if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
52           d.thread.cb=CpvAccess(nextThreadCB)++;
53           cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
54         } while (exist==1);
55         *cb = this; //<- so we can find this structure later
56         d.thread.th=NULL; //<- thread isn't suspended yet
57         d.thread.ret=(void*)-1;//<- no data to return yet
58 }
59
60 //Actually suspend this thread
61 void *CkCallback::impl_thread_delay(void) const
62 {
63         if (type!=resumeThread) 
64                 CkAbort("Called impl_thread_delay on non-threaded callback");
65         if (CkMyPe()!=d.thread.onPE)
66                 CkAbort("Called thread_delay on different processor than where callback was created");
67         
68         //Find the original callback object:
69         CkCallback *dest=(CkCallback *)this;
70         if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
71         if (dest==0)
72             CkAbort("Called thread_delay on an already deleted callback");
73         if (dest->d.thread.ret==(void*)-1) 
74         {  //We need to sleep for the result:
75                 dest->d.thread.th=CthSelf(); //<- so we know a thread is waiting
76                 CthSuspend();
77                 if (dest->d.thread.ret==(void*)-1) 
78                         CkAbort("thread resumed, but callback data is still empty");
79         }
80         return dest->d.thread.ret;
81 }
82
83
84 /*These can't be defined in the .h file like the other constructors
85  * because we need CkCallback before CProxyElement* are defined.
86  */
87 CkCallback::CkCallback(Chare *p, int ep, CmiBool doInline) {
88 #ifndef CMK_OPTIMIZE
89       bzero(this, sizeof(CkCallback));
90 #endif
91       type=doInline?isendChare:sendChare;
92         d.chare.ep=ep; 
93         d.chare.id=p->ckGetChareID();
94 }
95 CkCallback::CkCallback(Group *p, int ep, CmiBool doInline) {
96 #ifndef CMK_OPTIMIZE
97       bzero(this, sizeof(CkCallback));
98 #endif
99       type=doInline?isendGroup:sendGroup;
100         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
101 }
102 CkCallback::CkCallback(NodeGroup *p, int ep, CmiBool doInline) {
103 #ifndef CMK_OPTIMIZE
104       bzero(this, sizeof(CkCallback));
105 #endif
106       type=doInline?isendNodeGroup:sendNodeGroup;
107         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
108 }
109
110 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
111 #ifndef CMK_OPTIMIZE
112       bzero(this, sizeof(CkCallback));
113 #endif
114       type=bcastNodeGroup;
115         d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
116 }
117
118 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,CmiBool doInline) {
119 #ifndef CMK_OPTIMIZE
120       bzero(this, sizeof(CkCallback));
121 #endif
122       type=doInline?isendNodeGroup:sendNodeGroup;
123         d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
124 }
125
126 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,CmiBool doInline) {
127 #ifndef CMK_OPTIMIZE
128       bzero(this, sizeof(CkCallback));
129 #endif
130       type=doInline?isendGroup:sendGroup;
131         d.group.ep=ep; 
132         d.group.id=grpElt.ckGetGroupID(); 
133         d.group.onPE=grpElt.ckGetGroupPe();
134 }
135 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,CmiBool doInline) {
136 #ifndef CMK_OPTIMIZE
137       bzero(this, sizeof(CkCallback));
138 #endif
139       type=doInline?isendArray:sendArray;
140         d.array.ep=ep; 
141         d.array.id=arrElt.ckGetArrayID(); 
142         d.array.idx = arrElt.ckGetIndex();
143 }
144
145 CkCallback::CkCallback(int ep,CProxySection_ArrayBase &sectElt,CmiBool doInline) {
146 #ifndef CMK_OPTIMIZE
147       bzero(this, sizeof(CkCallback));
148 #endif
149       type=bcastSection;
150         d.array.ep=ep; 
151         secID=sectElt.ckGetSectionID(0); 
152 }
153
154 CkCallback::CkCallback(int ep, CkSectionID &id) {
155 #ifndef CMK_OPTIMIZE
156       bzero(this, sizeof(CkCallback));
157 #endif
158       type=bcastSection;
159       d.section.ep=ep;
160       secID=id;
161 }
162
163 CkCallback::CkCallback(ArrayElement *p, int ep,CmiBool doInline) {
164 #ifndef CMK_OPTIMIZE
165       bzero(this, sizeof(CkCallback));
166 #endif
167       type=doInline?isendArray:sendArray;
168     d.array.ep=ep; 
169         d.array.id=p->ckGetArrayID(); 
170         d.array.idx = p->ckGetArrayIndex();
171 }
172
173
174 void CkCallback::send(int length,const void *data) const
175 {
176         send(CkDataMsg::buildNew(length,data));
177 }
178
179 /*Libraries should call this from their "done" entry points.
180   It takes the given message and handles it appropriately.
181   After the send(), this callback is finished and cannot be reused.
182 */
183 void CkCallback::send(void *msg) const
184 {
185         switch(type) {
186         CkPrintf("type:%d\n",type);
187         case ignore: //Just ignore the callback
188                 if (msg) CkFreeMsg(msg);
189                 break;
190         case ckExit: //Call ckExit
191                 if (msg) CkFreeMsg(msg);
192                 CkExit();
193                 break;
194         case resumeThread: //Resume a waiting thread
195                 if (d.thread.onPE==CkMyPe()) {
196                         CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
197                         if (dest==0 || dest->d.thread.ret!=(void*)-1)
198                                 CkAbort("Already sent a value to this callback!\n");
199                         dest->d.thread.ret=msg; //<- return data
200                         if (dest->d.thread.th!=NULL)
201                                 CthAwaken(dest->d.thread.th);
202                 } 
203                 else //Forward message to processor where the thread actually lives
204                         _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
205                 break;
206         case call1Fn: //Call a C function pointer on the current processor
207                 (d.c1fn.fn)(msg);
208                 break;
209         case callCFn: //Call a C function pointer on the appropriate processor
210                 if (d.cfn.onPE==CkMyPe())
211                         (d.cfn.fn)(d.cfn.param,msg);
212                 else
213                         _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
214                 break;
215         case sendChare: //Send message to a chare
216                 if (!msg) msg=CkAllocSysMsg();
217                 CkSendMsg(d.chare.ep,msg,&d.chare.id);
218                 break;
219         case isendChare: //inline send-to-chare
220                 if (!msg) msg=CkAllocSysMsg();
221                 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
222                 break;
223         case sendGroup: //Send message to a group element
224                 if (!msg) msg=CkAllocSysMsg();
225                 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
226                 break;
227         case sendNodeGroup: //Send message to a group element
228                 if (!msg) msg=CkAllocSysMsg();
229                 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
230                 break;
231         case isendGroup: //inline send-to-group element
232                 if (!msg) msg=CkAllocSysMsg();
233                 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
234                 break;
235         case isendNodeGroup: //inline send-to-group element
236                 if (!msg) msg=CkAllocSysMsg();
237                 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
238                 break;
239         case sendArray: //Send message to an array element
240                 if (!msg) msg=CkAllocSysMsg();
241                 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
242                 break;
243         case isendArray: //inline send-to-array element
244                 if (!msg) msg=CkAllocSysMsg();
245                 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
246                 break;
247         case bcastGroup:
248                 if (!msg) msg=CkAllocSysMsg();
249                 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
250                 break;
251         case bcastNodeGroup:
252                 if (!msg) msg=CkAllocSysMsg();
253                 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
254                 break;
255         case bcastArray:
256                 if (!msg) msg=CkAllocSysMsg();
257                 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
258                 break;
259         case bcastSection:
260                 if(!msg)msg=CkAllocSysMsg();
261                 CkBroadcastMsgSection(d.section.ep,msg,secID);
262                 break;
263         case replyCCS: { /* Send CkDataMsg as a CCS reply */
264                 void *data=NULL;
265                 int length=0;
266                 if (msg) {
267                         CkDataMsg *m=(CkDataMsg *)msg;
268                         m->check();
269                         data=m->getData();
270                         length=m->getLength();
271                 }
272                 CcsSendDelayedReply(d.ccsReply.reply,length,data);
273                 if (msg) CkFreeMsg(msg);
274                 } break;
275         case invalid: //Uninitialized
276                 CmiAbort("Called send on uninitialized callback");
277                 break;
278         default: //Out-of-bounds type code
279                 CmiAbort("Called send on corrupted callback");
280                 break;
281         };
282 }
283
284 void CkCallback::pup(PUP::er &p) {
285   //p((char*)this, sizeof(CkCallback));
286   int t = (int)type;
287   p|t;
288   type = (callbackType)t;
289   switch (type) {
290   case resumeThread:
291     p|d.thread.onPE;
292     p|d.thread.cb;
293     break;
294   case isendChare:
295   case sendChare:
296     p|d.chare.ep;
297     p|d.chare.id;
298     break;
299   case isendGroup:
300   case sendGroup:
301   case isendNodeGroup:
302   case sendNodeGroup:
303     p|d.group.onPE;
304   case bcastNodeGroup:
305   case bcastGroup:
306     p|d.group.ep;
307     p|d.group.id;
308     break;
309   case isendArray:
310   case sendArray:
311     p|d.array.idx;
312   case bcastArray:
313     p|d.array.ep;
314     p|d.array.id;
315     break;
316   case replyCCS:
317     p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
318     break;
319   case call1Fn:
320     p((char*)&d.c1fn, sizeof(d.c1fn));
321     break;
322   case callCFn:
323     p((char*)&d.cfn, sizeof(d.cfn));
324     break;
325   case ignore:
326   case ckExit:
327   case invalid:
328     break;
329   default:
330     CkAbort("Inconsistent CkCallback type");
331   }
332 }
333
334 void CkCallback::thread_destroy() const {
335   if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
336     CpvAccess(threadCBs).remove(d.thread.cb);
337   }
338 }
339
340 CkCallbackResumeThread::~CkCallbackResumeThread() {
341   void * res = thread_delay(); //<- block thread here if it hasn't already
342   if (result != NULL) *result = res;
343   else CkFreeMsg(res);
344   thread_destroy();
345 }
346
347 /****** Callback-from-CCS ******/
348
349 // This function is called by CCS when a request comes in-- it maps the 
350 // request to a Charm++ message and passes the message to its callback.
351 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData) 
352 {
353         CkCallback *cb=(CkCallback *)cbPtr;
354         CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
355         msg->reply=CcsDelayReply();
356         msg->length=reqLen;
357         memcpy(msg->data,reqData,reqLen);
358         cb->send(msg);
359 }
360
361 // Register this callback with CCS.
362 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
363 {
364         CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
365 }
366
367 // Broadcast this callback registration to all processors
368 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
369         _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
370 }
371
372 enum {dataMsgTag=0x7ed2beef};
373 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
374 {
375         CkDataMsg *msg=new (&length,0) CkDataMsg;
376         msg->length=length;
377         memcpy(msg->data,data,length);
378         msg->checkTag=dataMsgTag;
379         return msg;
380 }
381
382 void CkDataMsg::check(void)
383 {
384         if (checkTag!=dataMsgTag)
385                 CkAbort("CkDataMsg corrupted-- bad tag.");
386 }
387
388 void CkCallbackInit() {
389   CpvInitialize(threadCB_t, threadCBs);
390   CpvInitialize(unsigned int, nextThreadCB);
391   CpvAccess(nextThreadCB)=1;
392 }
393
394 #include "CkCallback.def.h"
395