0833c54df32808fd797f72c39f82749aa2b4820c
[charm.git] / src / ck-core / sdag.h
1 #ifndef _sdag_H_
2 #define _sdag_H_
3
4 #include "pup.h"
5
6 namespace SDAG {
7   struct Closure : public PUP::able {
8     virtual void pup(PUP::er& p) = 0;
9     PUPable_abstract(Closure);
10     int continuations;
11     // reference count and self-destruct when no continuations have a reference
12     void ref() { continuations++; }
13     void deref() { if (--continuations <= 0) delete this; }
14     // done this way to keep Closure abstract for PUP reasons
15     // these must be called by descendents of Closure
16     void packClosure(PUP::er& p) { p | continuations; }
17     void init() { continuations = 1; }
18     virtual ~Closure() { }
19   };
20 }
21
22 #include <vector>
23 #include <list>
24 #include <map>
25 #include <set>
26
27 #include <pup_stl.h>
28 #include <envelope.h>
29 #include <debug-charm.h>
30
31 class CkMessage;
32
33 namespace SDAG {
34   struct TransportableBigSimLog : public Closure {
35     void* log;
36     TransportableBigSimLog() : log(0) { init(); }
37     TransportableBigSimLog(CkMigrateMessage*) : log(0) { init(); }
38
39     TransportableBigSimLog(void* log)
40       : log(log) { init(); }
41
42     void pup(PUP::er& p) {
43       if (p.isUnpacking()) log = 0;
44       else if (log != 0)
45         CkAbort("BigSim logs stored by SDAG are not migratable\n");
46       packClosure(p);
47     }
48     PUPable_decl(TransportableBigSimLog);
49   };
50
51   struct ForallClosure : public Closure {
52     int val;
53     ForallClosure() : val(0) { init(); }
54     ForallClosure(CkMigrateMessage*) : val(0) { init(); }
55     ForallClosure(int val) : val(val) { init(); }
56
57     void pup(PUP::er& p) {
58       p | val;
59       packClosure(p);
60     }
61     PUPable_decl(ForallClosure);
62     int& getP0() { return val; }
63   };
64
65   struct MsgClosure : public Closure {
66     void* msg;
67
68     MsgClosure() : msg(0) { init(); continuations = 0; }
69     MsgClosure(CkMigrateMessage*) : msg(0) { init(); continuations = 0; }
70
71     MsgClosure(void* msg)
72       : msg(msg) {
73       init();
74       continuations = 0;
75       CmiReference(UsrToEnv(msg));
76     }
77
78     void pup(PUP::er& p) {
79       bool hasMsg = msg;
80       p | hasMsg;
81       if (hasMsg) CkPupMessage(p, (void**)&msg);
82       if (hasMsg && p.isUnpacking())
83         CmiReference(UsrToEnv(msg));
84       packClosure(p);
85     }
86
87     virtual ~MsgClosure() {
88       if (msg) CmiFree(UsrToEnv(msg));
89     }
90
91     PUPable_decl(MsgClosure);
92   };
93
94   class CCounter : public Closure {
95   private:
96     unsigned int count;
97   public:
98     CCounter() { init(); }
99     CCounter(CkMigrateMessage*) { init(); }
100     CCounter(int c) : count(c) { init(); }
101     CCounter(int first, int last, int stride) {
102       init();
103       count = ((last - first) / stride) + 1;
104     }
105     void decrement(void) { count--; }
106     int isDone(void) { return (count == 0); }
107
108     void pup(PUP::er& p) {
109       p | count;
110       packClosure(p);
111     }
112     PUPable_decl(CCounter);
113   };
114
115   struct CSpeculator : public Closure {
116     int speculationIndex;
117
118     CSpeculator() : speculationIndex(0) { init(); }
119     CSpeculator(CkMigrateMessage*) : speculationIndex(0) { init(); }
120
121     CSpeculator(int speculationIndex_)
122       : speculationIndex(speculationIndex_) { init(); }
123
124     void pup(PUP::er& p) {
125       p | speculationIndex;
126       packClosure(p);
127     }
128     PUPable_decl(CSpeculator);
129   };
130
131   struct Continuation : public PUP::able {
132     int whenID;
133     std::vector<Closure*> closure;
134     std::vector<CMK_REFNUM_TYPE> entries, refnums;
135     std::vector<int> anyEntries;
136     int speculationIndex;
137
138     Continuation() : speculationIndex(-1) { }
139     Continuation(CkMigrateMessage*) : speculationIndex(-1) { }
140
141     Continuation(int whenID)
142       : whenID(whenID)
143       , speculationIndex(-1) { }
144
145     void pup(PUP::er& p) {
146       p | whenID;
147       p | closure;
148       p | entries;
149       p | refnums;
150       p | anyEntries;
151       p | speculationIndex;
152     }
153
154     void addClosure(Closure* cl) {
155       if (cl) cl->ref();
156       closure.push_back(cl);
157     }
158
159     virtual ~Continuation() {
160       for (size_t i = 0; i < closure.size(); i++)
161         if (closure[i])
162           closure[i]->deref();
163     }
164
165     PUPable_decl(Continuation);
166   };
167
168   struct Buffer : public PUP::able {
169     int entry;
170     CMK_REFNUM_TYPE refnum;
171     Closure* cl;
172
173 #if CMK_BIGSIM_CHARM
174     void *bgLog1, *bgLog2;
175 #endif
176
177     Buffer(CkMigrateMessage*) { }
178
179     Buffer(int entry, Closure* cl, CMK_REFNUM_TYPE refnum)
180       : entry(entry)
181       , refnum(refnum)
182       , cl(cl)
183 #if CMK_BIGSIM_CHARM
184       , bgLog1(0)
185       , bgLog2(0)
186 #endif
187     {
188       if (cl) cl->ref();
189     }
190
191     void pup(PUP::er& p) {
192       p | entry;
193       p | refnum;
194       bool hasCl = cl;
195       p | hasCl;
196       if (hasCl)
197         p | cl;
198       else
199         cl = 0;
200 #if CMK_BIGSIM_CHARM
201       if (p.isUnpacking())
202         bgLog1 = bgLog2 = 0;
203       else if (bgLog1 != 0 && bgLog2 != 0)
204         CkAbort("BigSim logs stored by SDAG are not migratable\n");
205 #endif
206     }
207
208     virtual ~Buffer() {
209       if (cl) cl->deref();
210     }
211
212     PUPable_decl(Buffer);
213   };
214
215   struct Dependency {
216     std::vector<std::list<int> > entryToWhen;
217     std::vector<std::list<Continuation*> > whenToContinuation;
218
219     // entry -> lst of buffers
220     // @todo this will have sequential lookup time for specific reference
221     // numbers
222     std::vector<std::list<Buffer*> > buffer;
223
224     int curSpeculationIndex;
225
226     void pup(PUP::er& p) {
227       p | curSpeculationIndex;
228       p | entryToWhen;
229       p | buffer;
230       p | whenToContinuation;
231     }
232
233     Dependency(int numEntries, int numWhens)
234       : entryToWhen(numEntries)
235       , whenToContinuation(numWhens)
236       , buffer(numEntries)
237       , curSpeculationIndex(0)
238       { }
239
240     // after a migration free the structures
241     ~Dependency() {
242       for (std::vector<std::list<Buffer*> >::iterator iter = buffer.begin();
243            iter != buffer.end(); ++iter) {
244         std::list<Buffer*> lst = *iter;
245         for (std::list<Buffer*>::iterator iter2 = lst.begin();
246              iter2 != lst.end(); ++iter2) {
247           delete *iter2;
248         }
249       }
250
251       for (size_t i = 0; i < whenToContinuation.size(); i++) {
252         for (std::list<Continuation*>::iterator iter2 = whenToContinuation[i].begin();
253              iter2 != whenToContinuation[i].end(); ++iter2) {
254           delete *iter2;
255         }
256       }
257     }
258
259     void addDepends(int whenID, int entry) {
260       entryToWhen[entry].push_back(whenID);
261     }
262
263     void reg(Continuation *c) {
264       //printf("registering new continuation %p, whenID = %d\n", c, c->whenID);
265       whenToContinuation[c->whenID].push_back(c);
266     }
267
268     void dereg(Continuation *c) {
269       CkAssert(c->whenID < (int)whenToContinuation.size());
270       std::list<Continuation*>& lst = whenToContinuation[c->whenID];
271       lst.remove(c);
272     }
273
274     Buffer* pushBuffer(int entry, Closure *cl, CMK_REFNUM_TYPE refnum) {
275       Buffer* buf = new Buffer(entry, cl, refnum);
276       buffer[entry].push_back(buf);
277       return buf;
278     }
279
280     Continuation *tryFindContinuation(int entry) {
281       for (std::list<int>::iterator iter = entryToWhen[entry].begin();
282            iter != entryToWhen[entry].end();
283            ++iter) {
284         int whenID = *iter;
285
286         for (std::list<Continuation*>::iterator iter2 = whenToContinuation[whenID].begin();
287              iter2 != whenToContinuation[whenID].end();
288              iter2++) {
289           Continuation* c = *iter2;
290           if (searchBufferedMatching(c)) {
291             dereg(c);
292             return c;
293           }
294         }
295       }
296       //printf("no continuation found\n");
297       return 0;
298     }
299
300     bool searchBufferedMatching(Continuation* t) {
301       CkAssert(t->entries.size() == t->refnums.size());
302       for (size_t i = 0; i < t->entries.size(); i++) {
303         if (!tryFindMessage(t->entries[i], true, t->refnums[i], 0)) {
304           return false;
305         }
306       }
307       for (size_t i = 0; i < t->anyEntries.size(); i++) {
308         if (!tryFindMessage(t->anyEntries[i], false, 0, 0)) {
309           return false;
310         }
311       }
312       return true;
313     }
314
315     Buffer* tryFindMessage(int entry, bool hasRef, CMK_REFNUM_TYPE refnum, std::set<Buffer*>* ignore) {
316       // @todo sequential lookup for buffer with reference number or ignore set
317       for (std::list<Buffer*>::iterator iter = buffer[entry].begin();
318            iter != buffer[entry].end();
319            ++iter) {
320         if ((!hasRef || (*iter)->refnum == refnum) &&
321             (!ignore || ignore->find(*iter) == ignore->end()))
322           return *iter;
323       }
324       return 0;
325     }
326
327     Buffer* tryFindMessage(int entry) {
328       if (buffer[entry].size() == 0)
329         return 0;
330       else
331         return buffer[entry].front();
332     }
333
334     void removeMessage(Buffer *buf) {
335       buffer[buf->entry].remove(buf);
336     }
337
338     int getAndIncrementSpeculationIndex() {
339       return curSpeculationIndex++;
340     }
341
342     void removeAllSpeculationIndex(int speculationIndex) {
343       for (std::vector<std::list<Continuation*> >::iterator iter = whenToContinuation.begin();
344            iter != whenToContinuation.end();
345            ++iter) {
346         std::list<Continuation*>& lst = *iter;
347
348         for (std::list<Continuation*>::iterator iter2 = lst.begin();
349              iter2 != lst.end();
350              //cppcheck-suppress StlMissingComparison
351              ) {
352           if ((*iter2)->speculationIndex == speculationIndex) {
353             Continuation *cancelled = *iter2;
354             //cppcheck-suppress StlMissingComparison
355             iter2 = lst.erase(iter2);
356             delete cancelled;
357           } else {
358             iter2++;
359           }
360         }
361       }
362     }
363   };
364
365   void registerPUPables();
366 }
367
368 #endif