Adding hook when resuming a normal thread of execution (i.e not the main thread or...
[charm.git] / src / ck-core / ck.h
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #ifndef _CK_H_
9 #define _CK_H_
10
11 #include <string.h>
12 #include <stdlib.h>
13 #include <math.h>
14 #include "charm++.h"
15 #include "envelope.h"
16 #include "qd.h"
17 #include "register.h"
18 #include "stats.h"
19 #include "ckfutures.h"
20 #include "charisma.h"
21
22 #ifndef CMK_OPTIMIZE
23 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
24 #else
25 #define _CHECK_VALID(p, msg) do { } while(0)
26 #endif
27
28 // Flag that tells the system if we are replaying using Record/Replay
29 extern int replaySystem;
30
31 /// A set of "Virtual ChareID"'s
32 class VidBlock {
33     enum VidState {FILLED, UNFILLED};
34     VidState state;
35     PtrQ *msgQ;
36     CkChareID actualID;
37     void msgDeliver(envelope *env) {
38         // This was causing sync entry methods not to return properly in some cases
39         //env->setSrcPe(CkMyPe());
40         env->setMsgtype(ForChareMsg);
41         env->setObjPtr(actualID.objPtr);
42         CldEnqueue(actualID.onPE, env, _infoIdx);
43         CpvAccess(_qd)->create();      
44     }
45   public:
46     VidBlock() ;
47     void send(envelope *env) {
48       if(state==UNFILLED) {
49         msgQ->enq((void *)env);
50       } else {
51         msgDeliver(env);
52       }
53     }
54     void fill(int onPE, void *oPtr) {
55       state = FILLED;
56       actualID.onPE = onPE;
57       actualID.objPtr = oPtr;
58       envelope *env;
59       while(NULL!=(env=(envelope*)msgQ->deq())) {
60         msgDeliver(env);
61       }
62       delete msgQ; msgQ=0;
63     }
64     void *getLocalChare(void) {
65       if (state==FILLED && actualID.onPE==CkMyPe()) 
66           return actualID.objPtr;
67       return NULL;
68     }
69     void pup(PUP::er &p) {
70 #ifndef CMK_CHARE_USE_PTR
71       int s;
72       if (!p.isUnpacking()) s = state-FILLED;
73       p|s;
74       if (p.isUnpacking()) state = (VidState)(FILLED+s);
75       if (p.isUnpacking()) msgQ = NULL;    // fixme
76       p|actualID;
77 #endif
78     }
79 };
80
81 class CkCoreState;
82
83 /// Message watcher: for record/replay support
84 class CkMessageWatcher {
85 protected:
86   FILE *f;
87   CkMessageWatcher *next;
88 public:
89     CkMessageWatcher() : next(NULL) { }
90     virtual ~CkMessageWatcher();
91         /**
92          * This message is about to be processed by Charm.
93          * If this function returns false, the message will not be processed.
94          * The message is processed by the watcher starting from the innermost one
95          * up to the outermost
96          */
97         inline CmiBool processMessage(envelope *env,CkCoreState *ck) {
98           CmiBool result = CmiTrue;
99           if (next != NULL) result &= next->processMessage(env, ck);
100           result &= process(env, ck);
101           return result;
102         }
103         inline int processThread(CthThreadToken *token, CkCoreState *ck) {
104            int result = 1;
105            if (next != NULL) result &= next->processThread(token, ck);
106            result &= process(token, ck);
107            return result;
108         }
109 protected:
110     /** These are used internally by this class to call the correct subclass method */
111         virtual CmiBool process(envelope *env,CkCoreState *ck) =0;
112         virtual int process(CthThreadToken *token, CkCoreState *ck) {return 1;}
113 public:
114     inline void setNext(CkMessageWatcher *w) { next = w; }
115 };
116
117 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
118 class CkCoreState {
119         GroupTable *groupTable;
120         QdState *qd;
121 public:
122         CkMessageWatcher *watcher;
123         /** Adds an extra watcher (which wrap the previously existing one) */
124         inline void addWatcher(CkMessageWatcher *w) {
125           w->setNext(watcher);
126           watcher = w;
127         }
128         
129         CkCoreState() 
130                 :groupTable(CkpvAccess(_groupTable)),
131                  qd(CpvAccess(_qd)) { watcher=NULL; }
132         ~CkCoreState() { delete watcher;}
133
134         inline GroupTable *getGroupTable() {
135                 return groupTable;
136         }
137         inline IrrGroup *localBranch(CkGroupID gID) {
138                 return groupTable->find(gID).getObj();
139         }
140
141         inline QdState *getQD() {return qd;}
142         // when in interrupt based net version, use the extra copy
143         // of qd when inside an immediate handler function.
144         inline void process(int n=1) {
145           if (CmiImmIsRunning())
146             CpvAccessOther(_qd, 1)->process(n);
147           else
148             qd->process(n);
149         }
150         inline void create(int n=1) {
151           if (CmiImmIsRunning())
152             CpvAccessOther(_qd, 1)->create(n);
153           else
154             qd->create(n);
155         }
156 };
157
158 CkpvExtern(CkCoreState *, _coreState);
159
160 void CkMessageWatcherInit(char **argv,CkCoreState *ck);
161
162 extern void _processHandler(void *converseMsg,CkCoreState *ck);
163 extern void _processBocInitMsg(CkCoreState *ck,envelope *msg);
164 extern void _processNodeBocInitMsg(CkCoreState *ck,envelope *msg);
165 extern void _infoFn(void *msg, CldPackFn *pfn, int *len,
166                     int *queueing, int *priobits, UInt **prioptr);
167 extern void CkCreateLocalGroup(CkGroupID groupID, int eIdx, envelope *env);
168 extern void CkCreateLocalNodeGroup(CkGroupID groupID, int eIdx, envelope *env);
169 extern void _createGroup(CkGroupID groupID, envelope *env);
170 extern void _createNodeGroup(CkGroupID groupID, envelope *env);
171 extern int _getGroupIdx(int,int,int);
172
173 #endif