added basic support for load balancing in pose (using the general LB framework).
[charm.git] / src / libs / ck-libs / pose / sim.h
1 /// Sim is the base class for all poser entities
2 /** This is the wrapper class that encapsulates synchronization strategy,
3     representation object and event queue, and manages the orchestration
4     of these components. 
5     This file also defines the three basic POSE messages: 
6     eventMsg, from which all user messages inherit; 
7     cancelMsg, for cancelling events; 
8     and prioMsg, a null message.
9     All three have a priority field which is set to the timestamp; thus all 
10     messages with earlier timestamp have higher priority. */
11 #ifndef SIM_H
12 #define SIM_H
13 #include "sim.decl.h"
14 #include <stdarg.h>
15
16 void POSE_prepExit(void *param, void *msg); 
17 extern CProxy_sim POSE_Objects; 
18 extern CProxy_sim POSE_Objects_RO; 
19 extern CkChareID POSE_Coordinator_ID; 
20 extern POSE_Config pose_config;
21 class sim; // needed for eventMsg definition below
22
23 /// All user event messages inherit from this
24 /** Adds timestamp and event ID to message, plus other info useful for the
25     underlying simulation layer.  Prioritized by default, and given a priority
26     based on the timestamp. Events which take no parameters must
27     still pass an eventMsg. */
28 class eventMsg : public CMessage_eventMsg {
29 public:
30   /// The event's timestamp
31   POSE_TimeType timestamp;  
32   /// The event's globally unique ID
33   eventID evID;   
34   /// The message size, used for message recycling (currently not used)
35   size_t msgSize;    
36   /// Pointer to a poser wrapper; used to send the pointer to rep object
37   sim *parent;    
38   /// Pointer to synchronization strategy; used when creating rep object
39   strat *str;
40   /// Relative start time: for computing degree of parallelization
41   double rst;
42   /// Basic Constructor
43   eventMsg() { rst = 0.0; parent = NULL; str = NULL; evID.init();}
44   /// Destructor
45   virtual ~eventMsg() { }
46   void sanitize() {
47     CkAssert(timestamp > -1);
48     CkAssert(evID.getPE() > -1);
49     CkAssert(evID.getPE() < CkNumPes());
50     CkAssert(parent == NULL);
51     CkAssert(str == NULL);
52     CkAssert(msgSize >= 0);
53   }
54   /// Timestamps this message and generates a unique event ID
55   /** Timestamps this message and generates a unique event ID for the event
56       to be invoked on the receiving side.  Sets the priority of this
57       message to timestamp - POSE_TimeMax. */
58   void Timestamp(POSE_TimeType t) { 
59     timestamp = t;  
60     if (evID.getPE() == -1)
61       evID = GetEventID();  
62     setPriority(t-POSE_TimeMax); 
63     rst = 0.0;
64     parent = NULL; str = NULL;
65   }
66   void SetSequenceNumber(int ctrl) { 
67     evID = GetEventID();  
68     evID.setControl(ctrl);  
69   }
70   /// Assignment operator: copies priority too
71   eventMsg& operator=(const eventMsg& obj) {
72     timestamp = obj.timestamp;
73     evID = obj.evID;
74     parent = obj.parent;
75     str = obj.str;
76     //msgSize = obj.msgSize;
77     rst = obj.rst;
78     setPriority(timestamp-POSE_TimeMax); 
79     return *this;
80   }
81   /// Allocates event message with space for priority
82   /** This can also handle event message recycling (currently off) */
83   void *operator new (size_t size) {  
84 #ifndef SEQUENTIAL_POSE
85 #ifdef MSG_RECYCLING
86     MemoryPool *localPool = (MemoryPool *)CkLocalBranch(MemPoolID);
87     if (localPool->CheckPool(size) > 0)
88       return localPool->GetBlock(size);
89     else {
90 #endif
91 #endif
92       void *msg = CkAllocMsg(CMessage_eventMsg::__idx, size, 8*sizeof(POSE_TimeType));
93       ((eventMsg *)msg)->msgSize = size;
94       return msg;
95 #ifndef SEQUENTIAL_POSE
96 #ifdef MSG_RECYCLING
97     }
98 #endif
99 #endif
100   }
101   void operator delete(void *p) { 
102 #ifndef SEQUENTIAL_POSE
103 #ifdef MSG_RECYCLING
104     MemoryPool *localPool = (MemoryPool *)CkLocalBranch(MemPoolID);
105     int ps = localPool->CheckPool(((eventMsg *)p)->msgSize);
106     if ((ps < MAX_POOL_SIZE) && (ps > -1)) {
107       size_t msgSize = ((eventMsg *)p)->msgSize;
108       memset(p, 0, msgSize);
109       ((eventMsg *)p)->msgSize = msgSize;
110       localPool->PutBlock(msgSize, p);
111     }
112     else
113 #endif
114 #endif
115       CkFreeMsg(p);
116   }
117   /// Set priority field and queuing strategy
118   void setPriority(POSE_TimeType prio) {
119 #if USE_LONG_TIMESTAMPS
120     memcpy(((POSE_TimeType *)CkPriorityPtr(this)),&prio,sizeof(POSE_TimeType));
121     CkSetQueueing(this, CK_QUEUEING_LFIFO);
122 #else
123     *((int*)CkPriorityPtr(this)) = prio;
124     CkSetQueueing(this, CK_QUEUEING_IFIFO);
125 #endif
126   }
127 };
128
129 /// Cancellation message
130 class cancelMsg : public CMessage_cancelMsg {
131 public:
132   /// Event to cancel
133   /** Only this is needed to find the event to cancel */
134   eventID evID;
135   /// Timestamp of event to be cancelled
136   /** Providing this makes finding the event faster */
137   POSE_TimeType timestamp;          
138   /// Allocate cancellation message with priority field
139   void *operator new (size_t size) {  
140     return CkAllocMsg(CMessage_cancelMsg::__idx, size, 8*sizeof(POSE_TimeType));
141   } 
142   /// Delete cancellation message
143   void operator delete(void *p) {  CkFreeMsg(p);  }
144   /// Set priority field and queuing strategy
145   void setPriority(POSE_TimeType prio) {
146 #if USE_LONG_TIMESTAMPS
147     memcpy(((POSE_TimeType *)CkPriorityPtr(this)),&prio,sizeof(POSE_TimeType));
148     CkSetQueueing(this, CK_QUEUEING_LFIFO);
149 #else
150     *((int*)CkPriorityPtr(this)) = prio;
151     CkSetQueueing(this, CK_QUEUEING_IFIFO);
152 #endif
153   }
154
155 };
156
157 /// Prioritized null msg; used to sort Step calls
158 class prioMsg : public CMessage_prioMsg {
159 public:
160   /// Allocate prioritized message with priority field
161   void *operator new (size_t size) {
162     return CkAllocMsg(CMessage_eventMsg::__idx, size, 8*sizeof(POSE_TimeType));
163   }
164   /// Delete prioritized message
165   void operator delete(void *p) {  CkFreeMsg(p);  }
166   /// Set priority field and queuing strategy
167   void setPriority(POSE_TimeType prio) {
168 #if USE_LONG_TIMESTAMPS
169     memcpy(((POSE_TimeType *)CkPriorityPtr(this)),&prio,sizeof(POSE_TimeType));
170     CkSetQueueing(this, CK_QUEUEING_LFIFO);
171 #else
172     *((int*)CkPriorityPtr(this)) = prio;
173     CkSetQueueing(this, CK_QUEUEING_IFIFO);
174
175 #endif
176   }
177 };
178
179 /// Used to specify a destination processor to migrate to during load balancing
180 class destMsg : public CMessage_destMsg {
181 public:
182   int destPE;
183 };
184
185
186 /// Poser wrapper base class
187 /** The poser base class: all user posers are translated to classes that
188     inherit from this class, and act as wrappers around the actual user 
189     object's representation to control the simulation behavior.  These 
190     objects are plugged into the POSE_objects array which is of this type. */
191 class sim : public CBase_sim {
192  protected:
193   /// Flag to indicate that a Step message is scheduled
194   /** Need to re-evaluate the need/function of this... also, how is it used
195       during load balancing... */
196   int active; 
197  public:
198   /// This poser's event queue
199   eventQueue *eq;
200   /// This poser's synchronization strategy   
201   strat *myStrat;
202   /// This poser's user representation
203   rep *objID;       
204   /// List of incoming cancellations for this poser
205   CancelList cancels;
206   /// The local PVT to report to
207   PVT *localPVT;    
208   /// Unique global ID for this object on PVT branch
209   int myPVTidx;
210   /// Unique global ID for this object in load balancing data structures
211   int myLBidx;
212   /// Number of forward execution steps
213   /** Is this needed/used? This is load balancing data... */
214   int DOs;
215   /// Number of undone events
216   /** Is this needed/used? This is load balancing data... */
217   int UNDOs;
218   /// Synchronization strategy type (optimistic or conservative)
219   int sync;
220   /// Number of sends/recvs per PE
221   int *srVector;    
222   /// Most recent GVT estimate
223   POSE_TimeType lastGVT;
224   /// Relative start time, start time, end time and current time
225   /** Used to calculate degree of parallelism */
226   double st, et, ct;
227 #ifndef CMK_OPTIMIZE
228   /// The local statistics collector
229   localStat *localStats; 
230 #endif
231   /// The local load balancer
232   LBgroup *localLBG;
233   /// Basic Constructor
234   sim(void);
235   sim(CkMigrateMessage *) {};
236   /// Destructor
237   virtual ~sim();
238   /// Pack/unpack/sizing operator
239   virtual void pup(PUP::er &p);
240   /// Start a forward execution step on myStrat
241   void Step();                 
242   /// Start a prioritized forward execution step on myStrat
243   void Step(prioMsg *m);       
244   /// Report safe time to PVT branch
245   void Status() { 
246     localPVT->objUpdateOVT(myPVTidx, myStrat->SafeTime(), objID->OVT()); 
247   }
248   /// Commit events based on new GVT estimate
249   void Commit();               
250   /// Add m to cancellation list
251   void Cancel(cancelMsg *m); 
252   /// Report load information to local load balancer
253   void ReportLBdata();
254   /// Migrate this poser to processor indicated in m
255   void Migrate(destMsg *m) { migrateMe(m->destPE); }
256   /// Terminate this poser, when everyone is terminated we exit 
257   void Terminate() { objID->terminus(); int i=1;contribute(sizeof(int),&i,CkReduction::sum_int,CkCallback(POSE_prepExit,NULL)); }
258   /// In sequential mode, begin checkpoint after reaching quiescence
259   void SeqBeginCheckpoint();
260   /// In sequential mode, resume after checkpointing or restarting
261   void SeqResumeAfterCheckpoint();
262   /// Return this poser's unique index on PVT branch
263   int PVTindex() { return myPVTidx; }
264   /// Test active flag
265   int IsActive() { return active; }
266   /// Set active flag
267   void Activate() { active = 1; }
268   /// Unset active flag
269   void Deactivate() { active = 0; }
270   /// Invoke an event on this poser according to fnIdx and pass it msg
271   /** ResolveFn is generated along with the rest of the wrapper object and
272       should handle all possible events on a poser. */
273   virtual void ResolveFn(int fnIdx, void *msg) { }
274   /// Invoke the commit version of an event to handle special behaviors
275   /** This invokes the <fn>_commit method that user provides.  It can be
276       used to perform special activities, statistics gathering, output, or
277       whatever the user wishes. */
278   virtual void ResolveCommitFn(int fnIdx, void *msg) { }
279   /// Notify the PVT of a message send
280   void registerSent(POSE_TimeType timestamp) {
281     localPVT->objUpdate(timestamp, SEND);
282   }
283   /// Used for buffered output
284   /** Output is only printed when the event is committed */
285   void CommitPrintf(const char *Fmt, ...) {
286     va_list ap;
287     va_start(ap,Fmt);
288     InternalCommitPrintf(Fmt, ap);
289     va_end(ap);
290   }
291   /// Used for buffered output of error messages
292   /** Output is only printed when the event is committed */
293   void CommitError(const char *Fmt, ...) {
294     va_list ap;
295     va_start(ap,Fmt);
296     InternalCommitPrintf(Fmt, ap);
297     va_end(ap);
298     myStrat->currentEvent->commitErr = 1;
299   }
300   void ResumeFromSync(void);
301   /// Dump all data fields
302   void dump();
303  private:
304   /// Used by buffered print functions
305   void InternalCommitPrintf (const char *Fmt, va_list ap) {
306     char *tmp;
307     size_t tmplen=myStrat->currentEvent->commitBfrLen + strlen(Fmt) + 1 +512;
308     if (!(tmp = (char *)malloc(tmplen * sizeof(char)))) {
309       CkPrintf("ERROR: sim::CommitPrintf: OUT OF MEMORY!\n");
310       CkExit();
311     }
312     if (myStrat->currentEvent->commitBfr && myStrat->currentEvent->commitBfrLen) {
313       strcpy(tmp, myStrat->currentEvent->commitBfr);
314       free(myStrat->currentEvent->commitBfr);
315       vsnprintf(tmp+strlen(tmp), tmplen, Fmt, ap); 
316     }
317     else vsnprintf(tmp, tmplen, Fmt, ap); 
318     myStrat->currentEvent->commitBfrLen = strlen(tmp) + 1;  
319     myStrat->currentEvent->commitBfr = tmp;
320   }
321 };
322
323 #endif
324
325
326
327
328
329
330
331
332