fixed a bug in calling ckLocal for a local chare.
[charm.git] / src / ck-core / ck.h
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #ifndef _CK_H_
9 #define _CK_H_
10
11 #include <string.h>
12 #include <stdlib.h>
13 #include <math.h>
14 #include "charm++.h"
15 #include "envelope.h"
16 #include "qd.h"
17 #include "register.h"
18 #include "stats.h"
19 #include "ckfutures.h"
20 #include "charisma.h"
21
22 #if CMK_ERROR_CHECKING
23 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
24 #else
25 #define _CHECK_VALID(p, msg) do { } while(0)
26 #endif
27
28 // Flag that tells the system if we are replaying using Record/Replay
29 extern "C" int _replaySystem;
30
31 #if CMK_CHARMDEBUG
32 extern "C" int ConverseDeliver(int pe);
33 inline void _CldEnqueue(int pe, void *msg, int infofn) {
34   if (!ConverseDeliver(pe)) {
35     CmiFree(msg);
36     return;
37   }
38   CldEnqueue(pe, msg, infofn);
39 }
40 inline void _CldEnqueueMulti(int npes, int *pes, void *msg, int infofn) {
41   if (!ConverseDeliver(-1)) {
42     CmiFree(msg);
43     return;
44   }
45   CldEnqueueMulti(npes, pes, msg, infofn);
46 }
47 inline void _CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
48   if (!ConverseDeliver(-1)) {
49     CmiFree(msg);
50     return;
51   }
52   CldEnqueueGroup(grp, msg, infofn);
53 }
54 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
55   if (!ConverseDeliver(node)) {
56     CmiFree(msg);
57     return;
58   }
59   CldNodeEnqueue(node, msg, infofn);
60 }
61 #else
62 #define _CldEnqueue       CldEnqueue
63 #define _CldEnqueueMulti  CldEnqueueMulti
64 #define _CldEnqueueGroup  CldEnqueueGroup
65 #define _CldNodeEnqueue   CldNodeEnqueue
66 #endif
67
68 #ifndef CMK_CHARE_USE_PTR
69 CkpvExtern(CkVec<void *>, chare_objs);
70 #endif
71
72 /// A set of "Virtual ChareID"'s
73 class VidBlock {
74     enum VidState {FILLED, UNFILLED};
75     VidState state;
76     PtrQ *msgQ;
77     CkChareID actualID;
78     void msgDeliver(envelope *env) {
79         // This was causing sync entry methods not to return properly in some cases
80         //env->setSrcPe(CkMyPe());
81         env->setMsgtype(ForChareMsg);
82         env->setObjPtr(actualID.objPtr);
83         _CldEnqueue(actualID.onPE, env, _infoIdx);
84         CpvAccess(_qd)->create();      
85     }
86   public:
87     VidBlock() ;
88     void send(envelope *env) {
89       if(state==UNFILLED) {
90         msgQ->enq((void *)env);
91       } else {
92         msgDeliver(env);
93       }
94     }
95     void fill(int onPE, void *oPtr) {
96       state = FILLED;
97       actualID.onPE = onPE;
98       actualID.objPtr = oPtr;
99       envelope *env;
100       while(NULL!=(env=(envelope*)msgQ->deq())) {
101         msgDeliver(env);
102       }
103       delete msgQ; msgQ=0;
104     }
105     void *getLocalChare(void) {
106       if (state==FILLED && actualID.onPE==CkMyPe()) 
107           return actualID.objPtr;
108       return NULL;
109     }
110     void *getLocalChareObj(void) {   
111          // returns actual object, different when CMK_CHARE_USE_PTR is false
112       if (state==FILLED && actualID.onPE==CkMyPe()) 
113 #ifdef CMK_CHARE_USE_PTR
114           return actualID.objPtr;
115 #else
116           return CkpvAccess(chare_objs)[(CmiIntPtr)actualID.objPtr];
117 #endif
118       return NULL;
119     }
120     void pup(PUP::er &p) {
121 #ifndef CMK_CHARE_USE_PTR
122       int s;
123       if (!p.isUnpacking()) s = state-FILLED;
124       p|s;
125       if (p.isUnpacking()) state = (VidState)(FILLED+s);
126       if (p.isUnpacking()) msgQ = NULL;    // fixme
127       p|actualID;
128 #endif
129     }
130 };
131
132 class CkCoreState;
133
134 /// Message watcher: for record/replay support
135 class CkMessageWatcher {
136 protected:
137   FILE *f;
138   CkMessageWatcher *next;
139 public:
140     CkMessageWatcher() : f(NULL), next(NULL) { }
141     virtual ~CkMessageWatcher();
142         /**
143          * This message is about to be processed by Charm.
144          * If this function returns false, the message will not be processed.
145          * The message is processed by the watcher starting from the innermost one
146          * up to the outermost
147          */
148 #define PROCESS_MACRO(name,type) inline CmiBool process##name(type *input,CkCoreState *ck) { \
149   CmiBool result = CmiTrue; \
150     if (next != NULL) result &= next->process##name(input, ck); \
151     result &= process(input, ck); \
152     return result; \
153   }
154
155     PROCESS_MACRO(Message,envelope*);
156     PROCESS_MACRO(Thread,CthThreadToken);
157     PROCESS_MACRO(LBMessage,LBMigrateMsg*);
158
159 #undef PROCESS_MACRO
160 protected:
161     /** These are used internally by this class to call the correct subclass method */
162         virtual CmiBool process(envelope **env,CkCoreState *ck) =0;
163         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {return CmiTrue;}
164         virtual CmiBool process(LBMigrateMsg **msg, CkCoreState *ck) {return CmiTrue;}
165 public:
166     inline void setNext(CkMessageWatcher *w) { next = w; }
167 };
168
169 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
170 class CkCoreState {
171         GroupTable *groupTable;
172         QdState *qd;
173 public:
174         CkMessageWatcher *watcher;
175         /** Adds an extra watcher (which wrap the previously existing one) */
176         inline void addWatcher(CkMessageWatcher *w) {
177           w->setNext(watcher);
178           watcher = w;
179         }
180         
181         CkCoreState() 
182                 :groupTable(CkpvAccess(_groupTable)),
183                  qd(CpvAccess(_qd)) { watcher=NULL; }
184         ~CkCoreState() { delete watcher;}
185
186         inline GroupTable *getGroupTable() {
187                 return groupTable;
188         }
189         inline IrrGroup *localBranch(CkGroupID gID) {
190                 return groupTable->find(gID).getObj();
191         }
192
193         inline QdState *getQD() {return qd;}
194         // when in interrupt based net version, use the extra copy
195         // of qd when inside an immediate handler function.
196         inline void process(int n=1) {
197           if (CmiImmIsRunning())
198             CpvAccessOther(_qd, 1)->process(n);
199           else
200             qd->process(n);
201         }
202         inline void create(int n=1) {
203           if (CmiImmIsRunning())
204             CpvAccessOther(_qd, 1)->create(n);
205           else
206             qd->create(n);
207         }
208 };
209
210 CkpvExtern(CkCoreState *, _coreState);
211
212 void CpdHandleLBMessage(LBMigrateMsg **msg);
213 void CkMessageWatcherInit(char **argv,CkCoreState *ck);
214
215 extern void _processHandler(void *converseMsg,CkCoreState *ck);
216 extern void _processBocInitMsg(CkCoreState *ck,envelope *msg);
217 extern void _processNodeBocInitMsg(CkCoreState *ck,envelope *msg);
218 extern void _infoFn(void *msg, CldPackFn *pfn, int *len,
219                     int *queueing, int *priobits, UInt **prioptr);
220 extern void CkCreateLocalGroup(CkGroupID groupID, int eIdx, envelope *env);
221 extern void CkCreateLocalNodeGroup(CkGroupID groupID, int eIdx, envelope *env);
222 extern void _createGroup(CkGroupID groupID, envelope *env);
223 extern void _createNodeGroup(CkGroupID groupID, envelope *env);
224 extern int _getGroupIdx(int,int,int);
225
226 #endif