fe4905c974ecce60366f904355ae0405f32e94d2
[charm.git] / src / ck-core / ck.h
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #ifndef _CK_H_
9 #define _CK_H_
10
11 #include <string.h>
12 #include <stdlib.h>
13 #include <math.h>
14 #include "charm++.h"
15 #include "envelope.h"
16 #include "qd.h"
17 #include "register.h"
18 #include "stats.h"
19 #include "ckfutures.h"
20 #include "charisma.h"
21
22 #if CMK_ERROR_CHECKING
23 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
24 #else
25 #define _CHECK_VALID(p, msg) do { } while(0)
26 #endif
27
28 // Flag that tells the system if we are replaying using Record/Replay
29 extern "C" int _replaySystem;
30
31 #if CMK_CHARMDEBUG
32 extern "C" int ConverseDeliver(int pe);
33 inline void _CldEnqueue(int pe, void *msg, int infofn) {
34   if (!ConverseDeliver(pe)) {
35     CmiFree(msg);
36     return;
37   }
38   CldEnqueue(pe, msg, infofn);
39 }
40 inline void _CldEnqueueMulti(int npes, int *pes, void *msg, int infofn) {
41   if (!ConverseDeliver(-1)) {
42     CmiFree(msg);
43     return;
44   }
45   CldEnqueueMulti(npes, pes, msg, infofn);
46 }
47 inline void _CldEnqueueGroup(CmiGroup grp, void *msg, int infofn) {
48   if (!ConverseDeliver(-1)) {
49     CmiFree(msg);
50     return;
51   }
52   CldEnqueueGroup(grp, msg, infofn);
53 }
54 inline void _CldNodeEnqueue(int node, void *msg, int infofn) {
55   if (!ConverseDeliver(node)) {
56     CmiFree(msg);
57     return;
58   }
59   CldNodeEnqueue(node, msg, infofn);
60 }
61 #else
62 #define _CldEnqueue       CldEnqueue
63 #define _CldEnqueueMulti  CldEnqueueMulti
64 #define _CldEnqueueGroup  CldEnqueueGroup
65 #define _CldNodeEnqueue   CldNodeEnqueue
66 #endif
67
68 /// A set of "Virtual ChareID"'s
69 class VidBlock {
70     enum VidState {FILLED, UNFILLED};
71     VidState state;
72     PtrQ *msgQ;
73     CkChareID actualID;
74     void msgDeliver(envelope *env) {
75         // This was causing sync entry methods not to return properly in some cases
76         //env->setSrcPe(CkMyPe());
77         env->setMsgtype(ForChareMsg);
78         env->setObjPtr(actualID.objPtr);
79         _CldEnqueue(actualID.onPE, env, _infoIdx);
80         CpvAccess(_qd)->create();      
81     }
82   public:
83     VidBlock() ;
84     void send(envelope *env) {
85       if(state==UNFILLED) {
86         msgQ->enq((void *)env);
87       } else {
88         msgDeliver(env);
89       }
90     }
91     void fill(int onPE, void *oPtr) {
92       state = FILLED;
93       actualID.onPE = onPE;
94       actualID.objPtr = oPtr;
95       envelope *env;
96       while(NULL!=(env=(envelope*)msgQ->deq())) {
97         msgDeliver(env);
98       }
99       delete msgQ; msgQ=0;
100     }
101     void *getLocalChare(void) {
102       if (state==FILLED && actualID.onPE==CkMyPe()) 
103           return actualID.objPtr;
104       return NULL;
105     }
106     void pup(PUP::er &p) {
107 #ifndef CMK_CHARE_USE_PTR
108       int s;
109       if (!p.isUnpacking()) s = state-FILLED;
110       p|s;
111       if (p.isUnpacking()) state = (VidState)(FILLED+s);
112       if (p.isUnpacking()) msgQ = NULL;    // fixme
113       p|actualID;
114 #endif
115     }
116 };
117
118 class CkCoreState;
119
120 /// Message watcher: for record/replay support
121 class CkMessageWatcher {
122 protected:
123   FILE *f;
124   CkMessageWatcher *next;
125 public:
126     CkMessageWatcher() : f(NULL), next(NULL) { }
127     virtual ~CkMessageWatcher();
128         /**
129          * This message is about to be processed by Charm.
130          * If this function returns false, the message will not be processed.
131          * The message is processed by the watcher starting from the innermost one
132          * up to the outermost
133          */
134 #define PROCESS_MACRO(name,type) inline CmiBool process##name(type *input,CkCoreState *ck) { \
135   CmiBool result = CmiTrue; \
136     if (next != NULL) result &= next->process##name(input, ck); \
137     result &= process(input, ck); \
138     return result; \
139   }
140
141     PROCESS_MACRO(Message,envelope*);
142     PROCESS_MACRO(Thread,CthThreadToken);
143     PROCESS_MACRO(LBMessage,LBMigrateMsg*);
144
145 #undef PROCESS_MACRO
146 protected:
147     /** These are used internally by this class to call the correct subclass method */
148         virtual CmiBool process(envelope **env,CkCoreState *ck) =0;
149         virtual CmiBool process(CthThreadToken *token, CkCoreState *ck) {return CmiTrue;}
150         virtual CmiBool process(LBMigrateMsg **msg, CkCoreState *ck) {return CmiTrue;}
151 public:
152     inline void setNext(CkMessageWatcher *w) { next = w; }
153 };
154
155 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
156 class CkCoreState {
157         GroupTable *groupTable;
158         QdState *qd;
159 public:
160         CkMessageWatcher *watcher;
161         /** Adds an extra watcher (which wrap the previously existing one) */
162         inline void addWatcher(CkMessageWatcher *w) {
163           w->setNext(watcher);
164           watcher = w;
165         }
166         
167         CkCoreState() 
168                 :groupTable(CkpvAccess(_groupTable)),
169                  qd(CpvAccess(_qd)) { watcher=NULL; }
170         ~CkCoreState() { delete watcher;}
171
172         inline GroupTable *getGroupTable() {
173                 return groupTable;
174         }
175         inline IrrGroup *localBranch(CkGroupID gID) {
176                 return groupTable->find(gID).getObj();
177         }
178
179         inline QdState *getQD() {return qd;}
180         // when in interrupt based net version, use the extra copy
181         // of qd when inside an immediate handler function.
182         inline void process(int n=1) {
183           if (CmiImmIsRunning())
184             CpvAccessOther(_qd, 1)->process(n);
185           else
186             qd->process(n);
187         }
188         inline void create(int n=1) {
189           if (CmiImmIsRunning())
190             CpvAccessOther(_qd, 1)->create(n);
191           else
192             qd->create(n);
193         }
194 };
195
196 CkpvExtern(CkCoreState *, _coreState);
197
198 void CpdHandleLBMessage(LBMigrateMsg **msg);
199 void CkMessageWatcherInit(char **argv,CkCoreState *ck);
200
201 extern void _processHandler(void *converseMsg,CkCoreState *ck);
202 extern void _processBocInitMsg(CkCoreState *ck,envelope *msg);
203 extern void _processNodeBocInitMsg(CkCoreState *ck,envelope *msg);
204 extern void _infoFn(void *msg, CldPackFn *pfn, int *len,
205                     int *queueing, int *priobits, UInt **prioptr);
206 extern void CkCreateLocalGroup(CkGroupID groupID, int eIdx, envelope *env);
207 extern void CkCreateLocalNodeGroup(CkGroupID groupID, int eIdx, envelope *env);
208 extern void _createGroup(CkGroupID groupID, envelope *env);
209 extern void _createNodeGroup(CkGroupID groupID, envelope *env);
210 extern int _getGroupIdx(int,int,int);
211
212 #endif