Critical path header changes for the pics merge
[charm.git] / src / ck-core / sdag.h
1 #ifndef _sdag_H_
2 #define _sdag_H_
3
4 #include "pup.h"
5
6 namespace SDAG {
7   struct Closure : public PUP::able {
8     virtual void pup(PUP::er& p) = 0;
9     PUPable_abstract(Closure);
10     int continuations;
11     // reference count and self-destruct when no continuations have a reference
12     void ref() { continuations++; }
13     void deref() { if (--continuations <= 0) delete this; }
14     // done this way to keep Closure abstract for PUP reasons
15     // these must be called by descendents of Closure
16     void packClosure(PUP::er& p) { p | continuations; }
17     void init() { continuations = 1; }
18     virtual ~Closure() { }
19   };
20 }
21
22 #include <vector>
23 #include <list>
24 #include <map>
25 #include <set>
26
27 #include <pup_stl.h>
28 #include <envelope.h>
29 #include <debug-charm.h>
30
31 class CkMessage;
32 #if USE_CRITICAL_PATH_HEADER_ARRAY
33 class MergeablePathHistory;
34 #endif
35 namespace SDAG {
36   struct TransportableBigSimLog : public Closure {
37     void* log;
38     TransportableBigSimLog() : log(0) { init(); }
39     TransportableBigSimLog(CkMigrateMessage*) : log(0) { init(); }
40
41     TransportableBigSimLog(void* log)
42       : log(log) { init(); }
43
44     void pup(PUP::er& p) {
45       if (p.isUnpacking()) log = 0;
46       else if (log != 0)
47         CkAbort("BigSim logs stored by SDAG are not migratable\n");
48       packClosure(p);
49     }
50     PUPable_decl(TransportableBigSimLog);
51   };
52
53   struct ForallClosure : public Closure {
54     int val;
55     ForallClosure() : val(0) { init(); }
56     ForallClosure(CkMigrateMessage*) : val(0) { init(); }
57     ForallClosure(int val) : val(val) { init(); }
58
59     void pup(PUP::er& p) {
60       p | val;
61       packClosure(p);
62     }
63     PUPable_decl(ForallClosure);
64     int& getP0() { return val; }
65   };
66
67   struct MsgClosure : public Closure {
68     void* msg;
69
70     MsgClosure() : msg(0) { init(); continuations = 0; }
71     MsgClosure(CkMigrateMessage*) : msg(0) { init(); continuations = 0; }
72
73     MsgClosure(void* msg)
74       : msg(msg) {
75       init();
76       continuations = 0;
77       CmiReference(UsrToEnv(msg));
78     }
79
80     void pup(PUP::er& p) {
81       bool hasMsg = msg;
82       p | hasMsg;
83       if (hasMsg) CkPupMessage(p, (void**)&msg);
84       if (hasMsg && p.isUnpacking())
85         CmiReference(UsrToEnv(msg));
86       packClosure(p);
87     }
88
89     virtual ~MsgClosure() {
90       if (msg) CmiFree(UsrToEnv(msg));
91     }
92
93     PUPable_decl(MsgClosure);
94   };
95
96   class CCounter : public Closure {
97   private:
98     unsigned int count;
99   public:
100     CCounter() { init(); }
101     CCounter(CkMigrateMessage*) { init(); }
102     CCounter(int c) : count(c) { init(); }
103     CCounter(int first, int last, int stride) {
104       init();
105       count = ((last - first) / stride) + 1;
106     }
107     void decrement(void) { count--; }
108     int isDone(void) { return (count == 0); }
109
110     void pup(PUP::er& p) {
111       p | count;
112       packClosure(p);
113     }
114     PUPable_decl(CCounter);
115   };
116
117   struct CSpeculator : public Closure {
118     int speculationIndex;
119
120     CSpeculator() : speculationIndex(0) { init(); }
121     CSpeculator(CkMigrateMessage*) : speculationIndex(0) { init(); }
122
123     CSpeculator(int speculationIndex_)
124       : speculationIndex(speculationIndex_) { init(); }
125
126     void pup(PUP::er& p) {
127       p | speculationIndex;
128       packClosure(p);
129     }
130     PUPable_decl(CSpeculator);
131   };
132
133   struct Continuation : public PUP::able {
134     int whenID;
135     std::vector<Closure*> closure;
136     std::vector<CMK_REFNUM_TYPE> entries, refnums;
137     std::vector<int> anyEntries;
138     int speculationIndex;
139     Continuation() : speculationIndex(-1) { }
140     Continuation(CkMigrateMessage*) : speculationIndex(-1) { }
141
142     Continuation(int whenID)
143       : whenID(whenID)
144       , speculationIndex(-1) { }
145
146     void pup(PUP::er& p) {
147       p | whenID;
148       p | closure;
149       p | entries;
150       p | refnums;
151       p | anyEntries;
152       p | speculationIndex;
153     }
154
155     void addClosure(Closure* cl) {
156       if (cl) cl->ref();
157       closure.push_back(cl);
158     }
159
160 #if USE_CRITICAL_PATH_HEADER_ARRAY
161     MergeablePathHistory *saved;
162     void setPath(MergeablePathHistory *tmp) {saved = tmp;}
163
164     MergeablePathHistory *getPath() { return saved;}
165 #endif
166
167     virtual ~Continuation() {
168       for (size_t i = 0; i < closure.size(); i++)
169         if (closure[i])
170           closure[i]->deref();
171     }
172
173     PUPable_decl(Continuation);
174   };
175
176   struct Buffer : public PUP::able {
177     int entry;
178     CMK_REFNUM_TYPE refnum;
179     Closure* cl;
180 #if USE_CRITICAL_PATH_HEADER_ARRAY
181     MergeablePathHistory *savedPath;
182 #endif
183 #if CMK_BIGSIM_CHARM
184     void *bgLog1, *bgLog2;
185 #endif
186
187     Buffer(CkMigrateMessage*) { }
188
189     Buffer(int entry, Closure* cl, CMK_REFNUM_TYPE refnum)
190       : entry(entry)
191       , refnum(refnum)
192       , cl(cl)
193 #if CMK_BIGSIM_CHARM
194       , bgLog1(0)
195       , bgLog2(0)
196 #endif
197 #if USE_CRITICAL_PATH_HEADER_ARRAY
198       ,savedPath(NULL)
199 #endif
200     {
201       if (cl) cl->ref();
202     }
203
204 #if USE_CRITICAL_PATH_HEADER_ARRAY
205     void  setPath(MergeablePathHistory* p)
206     {
207         savedPath = p;
208     }
209
210     MergeablePathHistory* getPath() 
211     {
212         return savedPath;
213     }
214 #endif
215     void pup(PUP::er& p) {
216       p | entry;
217       p | refnum;
218       bool hasCl = cl;
219       p | hasCl;
220       if (hasCl)
221         p | cl;
222       else
223         cl = 0;
224 #if CMK_BIGSIM_CHARM
225       if (p.isUnpacking())
226         bgLog1 = bgLog2 = 0;
227       else if (bgLog1 != 0 && bgLog2 != 0)
228         CkAbort("BigSim logs stored by SDAG are not migratable\n");
229 #endif
230     }
231
232     virtual ~Buffer() {
233       if (cl) cl->deref();
234     }
235
236     PUPable_decl(Buffer);
237   };
238
239   struct Dependency {
240     std::vector<std::list<int> > entryToWhen;
241     std::vector<std::list<Continuation*> > whenToContinuation;
242
243     // entry -> lst of buffers
244     // @todo this will have sequential lookup time for specific reference
245     // numbers
246     std::vector<std::list<Buffer*> > buffer;
247
248     int curSpeculationIndex;
249
250     void pup(PUP::er& p) {
251       p | curSpeculationIndex;
252       p | entryToWhen;
253       p | buffer;
254       p | whenToContinuation;
255     }
256
257     Dependency(int numEntries, int numWhens)
258       : entryToWhen(numEntries)
259       , whenToContinuation(numWhens)
260       , buffer(numEntries)
261       , curSpeculationIndex(0)
262       { }
263
264     // after a migration free the structures
265     ~Dependency() {
266       for (std::vector<std::list<Buffer*> >::iterator iter = buffer.begin();
267            iter != buffer.end(); ++iter) {
268         std::list<Buffer*> lst = *iter;
269         for (std::list<Buffer*>::iterator iter2 = lst.begin();
270              iter2 != lst.end(); ++iter2) {
271           delete *iter2;
272         }
273       }
274
275       for (size_t i = 0; i < whenToContinuation.size(); i++) {
276         for (std::list<Continuation*>::iterator iter2 = whenToContinuation[i].begin();
277              iter2 != whenToContinuation[i].end(); ++iter2) {
278           delete *iter2;
279         }
280       }
281     }
282
283     void addDepends(int whenID, int entry) {
284       entryToWhen[entry].push_back(whenID);
285     }
286
287     void reg(Continuation *c) {
288       //printf("registering new continuation %p, whenID = %d\n", c, c->whenID);
289       whenToContinuation[c->whenID].push_back(c);
290     }
291
292     void dereg(Continuation *c) {
293       CkAssert(c->whenID < (int)whenToContinuation.size());
294       std::list<Continuation*>& lst = whenToContinuation[c->whenID];
295       lst.remove(c);
296     }
297
298     Buffer* pushBuffer(int entry, Closure *cl, CMK_REFNUM_TYPE refnum) {
299       Buffer* buf = new Buffer(entry, cl, refnum);
300       buffer[entry].push_back(buf);
301       return buf;
302     }
303
304     Continuation *tryFindContinuation(int entry) {
305       for (std::list<int>::iterator iter = entryToWhen[entry].begin();
306            iter != entryToWhen[entry].end();
307            ++iter) {
308         int whenID = *iter;
309
310         for (std::list<Continuation*>::iterator iter2 = whenToContinuation[whenID].begin();
311              iter2 != whenToContinuation[whenID].end();
312              iter2++) {
313           Continuation* c = *iter2;
314           if (searchBufferedMatching(c)) {
315             dereg(c);
316             return c;
317           }
318         }
319       }
320       //printf("no continuation found\n");
321       return 0;
322     }
323
324     bool searchBufferedMatching(Continuation* t) {
325       CkAssert(t->entries.size() == t->refnums.size());
326       for (size_t i = 0; i < t->entries.size(); i++) {
327         if (!tryFindMessage(t->entries[i], true, t->refnums[i], 0)) {
328           return false;
329         }
330       }
331       for (size_t i = 0; i < t->anyEntries.size(); i++) {
332         if (!tryFindMessage(t->anyEntries[i], false, 0, 0)) {
333           return false;
334         }
335       }
336       return true;
337     }
338
339     Buffer* tryFindMessage(int entry, bool hasRef, CMK_REFNUM_TYPE refnum, std::set<Buffer*>* ignore) {
340       // @todo sequential lookup for buffer with reference number or ignore set
341       for (std::list<Buffer*>::iterator iter = buffer[entry].begin();
342            iter != buffer[entry].end();
343            ++iter) {
344         if ((!hasRef || (*iter)->refnum == refnum) &&
345             (!ignore || ignore->find(*iter) == ignore->end()))
346           return *iter;
347       }
348       return 0;
349     }
350
351     Buffer* tryFindMessage(int entry) {
352       if (buffer[entry].size() == 0)
353         return 0;
354       else
355         return buffer[entry].front();
356     }
357
358     void removeMessage(Buffer *buf) {
359       buffer[buf->entry].remove(buf);
360     }
361
362     int getAndIncrementSpeculationIndex() {
363       return curSpeculationIndex++;
364     }
365
366     void removeAllSpeculationIndex(int speculationIndex) {
367       for (std::vector<std::list<Continuation*> >::iterator iter = whenToContinuation.begin();
368            iter != whenToContinuation.end();
369            ++iter) {
370         std::list<Continuation*>& lst = *iter;
371
372         for (std::list<Continuation*>::iterator iter2 = lst.begin();
373              iter2 != lst.end();
374              //cppcheck-suppress StlMissingComparison
375              ) {
376           if ((*iter2)->speculationIndex == speculationIndex) {
377             Continuation *cancelled = *iter2;
378             //cppcheck-suppress StlMissingComparison
379             iter2 = lst.erase(iter2);
380             delete cancelled;
381           } else {
382             iter2++;
383           }
384         }
385       }
386     }
387   };
388
389   void registerPUPables();
390 }
391
392 #endif