added basic support for load balancing in pose (using the general LB framework).
[charm.git] / src / libs / ck-libs / pose / sim.C
1 /// Sim is the base class for all poser entities
2 #include "pose.h"
3 #include "sim.def.h"
4
5 /// Global readonly proxy to array containing all posers in a simulation
6 CProxy_sim POSE_Objects;
7 CProxy_sim POSE_Objects_RO;
8 /// Coordinates all startup and shutdown behaviors for POSE simulations
9 CkChareID POSE_Coordinator_ID;
10
11 #ifdef POSE_COMM_ON
12 /// Used with the CommLib
13 extern CkGroupID dmid;
14 //extern int com_debug;
15 #endif
16
17 /// Basic Constructor
18 sim::sim() 
19 {
20 #ifdef VERBOSE_DEBUG
21   CkPrintf("[%d] constructing sim %d\n",CkMyPe(), thisIndex);
22 #endif
23
24 #ifndef SEQUENTIAL_POSE
25   localPVT = (PVT *)CkLocalBranch(ThePVT);
26   if(pose_config.lb_on)
27     localLBG = TheLBG.ckLocalBranch();
28 #endif
29 #ifndef CMK_OPTIMIZE
30   if(pose_config.stats)
31     localStats = (localStat *)CkLocalBranch(theLocalStats);
32 #endif
33   lastGVT = active = DOs = UNDOs = 0;
34   srVector = (int *)malloc(CkNumPes() * sizeof(int));
35   for (int i=0; i<CkNumPes(); i++) srVector[i] = 0;
36   eq = new eventQueue();
37   myStrat = NULL;
38   objID = NULL;
39 }
40
41 /// Destructor
42 sim::~sim() 
43 {
44   active = -1;
45 #ifndef SEQUENTIAL_POSE
46   localPVT->objRemove(myPVTidx);
47 #endif
48   if(pose_config.lb_on)
49     localLBG->objRemove(myLBidx);
50
51   delete(eq);
52   delete(myStrat);
53   delete(objID);
54 }
55
56 /// Pack/unpack/sizing operator
57 void sim::pup(PUP::er &p) {
58   ArrayElement1D::pup(p); // call parent class pup method
59   // pup simple types
60   p(active); p(myPVTidx); p(myLBidx); p(sync); p(DOs); p(UNDOs);
61   // pup event queue
62   if (p.isUnpacking())
63     eq = new eventQueue();
64   eq->pup(p);
65   // pup cancellations
66   cancels.pup(p);
67   if (p.isUnpacking()) { // reactivate migrated object
68 #ifndef CMK_OPTIMIZE
69     localStats = (localStat *)CkLocalBranch(theLocalStats);
70 #endif
71 #ifndef SEQUENTIAL_POSE
72     localPVT = (PVT *)CkLocalBranch(ThePVT);
73     myPVTidx = localPVT->objRegister(thisIndex, localPVT->getGVT(), sync, this);
74     if(pose_config.lb_on){
75       localLBG = TheLBG.ckLocalBranch();
76       myLBidx = localLBG->objRegister(thisIndex, sync, this);
77     }
78 #endif
79     active = 0;
80   }
81   // pup checkpoint info for sequential mode using sim 0 only
82 #ifdef SEQUENTIAL_POSE
83   if (thisIndex == 0) {
84     p|seqCheckpointInProgress;
85     p|seqLastCheckpointGVT;
86     p|seqLastCheckpointTime;
87     p|seqStartTime;
88     p|POSE_Skipped_Events;
89     if (p.isUnpacking()) {
90       seqStartTime = CmiWallTimer() - (seqLastCheckpointTime - seqStartTime);
91     }
92   }
93 #endif
94 }
95
96 /// Start a forward execution step on myStrat
97 void sim::Step()
98 {
99   if (active < 0) return; // object is migrating; deactivate it 
100
101 #ifndef CMK_OPTIMIZE
102   double critStart;
103   if(pose_config.trace)
104     critStart=CmiWallTimer();  // trace timing
105   int tstat;
106   if(pose_config.stats)
107     {
108       tstat= localStats->TimerRunning();
109       if (!tstat)  localStats->TimerStart(SIM_TIMER);
110       else localStats->SwitchTimer(SIM_TIMER);
111     }
112 #endif
113
114   prioMsg *pm;
115   switch (myStrat->STRAT_T) { // step based on strategy type
116   case SEQ_T:
117   case CONS_T:
118   case OPT_T:
119   case OPT2_T: // pass this step call directly to strategy
120   case OPT3_T: // prioritize this step call if work exists
121   case SPEC_T:
122   case ADAPT_T:
123   case ADAPT2_T:
124   case ADAPT3_T:
125   case ADAPT4_T: // pass this step call directly to strategy
126     myStrat->Step();
127     break;
128   default: 
129     CkPrintf("Invalid strategy type: %d\n", myStrat->STRAT_T); 
130     break;
131   }
132 #ifndef CMK_OPTIMIZE
133   if(pose_config.stats)  
134     {
135       if (!tstat)  localStats->TimerStop();
136       else localStats->SwitchTimer(tstat);
137     }
138   if(pose_config.trace)
139     traceUserBracketEvent(60, critStart, CmiWallTimer());
140 #endif
141 }
142
143 /// Start a prioritized forward execution step on myStrat
144 void sim::Step(prioMsg *m)
145 {
146   CkFreeMsg(m);
147   if (active < 0) return; // object is migrating; deactivate it 
148 #ifndef CMK_OPTIMIZE
149   int tstat;
150   if(pose_config.stats)
151     {
152       tstat= localStats->TimerRunning();
153       if (!tstat)
154         localStats->TimerStart(SIM_TIMER);
155       else localStats->SwitchTimer(SIM_TIMER);
156     }
157 #endif
158
159   myStrat->Step(); // Call Step on strategy
160
161 #ifndef CMK_OPTIMIZE
162   if(pose_config.stats)
163     {
164       if (!tstat)
165         localStats->TimerStop();
166       else localStats->SwitchTimer(tstat);
167     }
168 #endif
169 }
170
171 /// Commit events based on new GVT estimate
172 void sim::Commit()
173 {
174   if (active < 0)  return; // object is migrating
175 #ifndef CMK_OPTIMIZE
176   double critStart;
177   if(pose_config.trace)
178     critStart= CmiWallTimer();  // trace timing
179   int tstat;
180   if(pose_config.stats) {
181     tstat = localStats->TimerRunning();
182     if (!tstat)  localStats->TimerStart(SIM_TIMER);
183     else localStats->SwitchTimer(SIM_TIMER);
184   }
185   if(pose_config.stats)
186     localStats->SwitchTimer(FC_TIMER);
187 #endif
188   int isDone=localPVT->done(); 
189   int curGVT=localPVT->getGVT();
190   if (isDone) { // simulation inactive
191     eq->CommitEvents(this, POSE_endtime); // commit all events in queue
192     Terminate();// call terminus on all posers
193   }
194   else if (curGVT > lastGVT + 100) {  // What's the constant doing to us?
195     lastGVT = curGVT;
196     eq->CommitEvents(this, lastGVT); // commit events up to GVT
197   }
198 #ifndef CMK_OPTIMIZE
199   if(pose_config.trace) {
200     traceUserBracketEvent(50, critStart, CmiWallTimer());
201     critStart = CmiWallTimer();
202   }
203   if(pose_config.stats)
204     localStats->SwitchTimer(SIM_TIMER);
205 #endif
206   if (!isDone && (eq->currentPtr->timestamp > -1)) 
207     Step(); // not done; try stepping again
208
209 #ifndef CMK_OPTIMIZE
210   if(pose_config.stats)
211     if (!tstat)  localStats->TimerStop();
212     else localStats->SwitchTimer(tstat);
213   if(pose_config.trace)
214     traceUserBracketEvent(60, critStart, CmiWallTimer());
215 #endif
216 }
217
218 /// Report load information to local load balancer
219 void sim::ReportLBdata()
220 {
221   if(pose_config.lb_on){
222     double rbOh;
223     int numEvents = 0;
224     Event *tmp = eq->currentPtr;
225
226     if (DOs-UNDOs == 0) rbOh = 1.0;
227     else rbOh = ((double)DOs)/((double)(DOs-UNDOs));
228     while (tmp->timestamp > POSE_UnsetTS) {
229       numEvents++;
230       tmp = tmp->next;
231     }
232     localLBG->objUpdate(myLBidx, objID->ovt, eq->currentPtr->timestamp,
233                         numEvents, rbOh, srVector);
234     DOs = UNDOs = 0;
235     for (int i=0; i<CkNumPes(); i++) srVector[i] = 0;
236   }
237 }
238
239 /// Add m to cancellation list
240 void sim::Cancel(cancelMsg *m) 
241 {
242 #ifndef CMK_OPTIMIZE
243   if(pose_config.stats)
244     localStats->TimerStart(CAN_TIMER);
245 #endif
246   //  char str[20];
247   //  CkPrintf("[%d] RECV(cancel) %s at %d...\n", CkMyPe(), m->evID.sdump(str), m->timestamp);      
248   //localPVT = (PVT *)CkLocalBranch(ThePVT);
249   cancels.Insert(m->timestamp, m->evID); // add to cancellations list
250   localPVT->objUpdate(m->timestamp, RECV); // tell PVT branch about recv
251   CkFreeMsg(m);
252
253 #ifndef CMK_OPTIMIZE
254   double critStart;
255   if(pose_config.trace)
256     critStart= CmiWallTimer();  // trace timing
257   if(pose_config.stats)
258     localStats->SwitchTimer(SIM_TIMER);      
259 #endif
260
261   myStrat->Step(); // call Step to handle cancellation
262
263 #ifndef CMK_OPTIMIZE
264   if(pose_config.stats)
265     localStats->TimerStop();
266   if(pose_config.trace)
267     traceUserBracketEvent(60, critStart, CmiWallTimer());
268 #endif
269 }
270
271 // Sequential checkpointing: Two functions, SeqBeginCheckpoint and
272 // SeqResumeAfterCheckpoint, were added to the sim class to
273 // handle this.  Only initiate the checkpointing procedure on sim
274 // 0, after commits have occurred.  This should minimize the
275 // amount of data written to disk.  In order to ensure a stable
276 // state, we wait for quiescence to be reached before beginning
277 // the checkpoint.  Once this happens, sim 0 checkpoints and then
278 // resumes the simulation in SeqResumeAfterCheckpoint.  This
279 // Callback function is also the first POSE function to be called
280 // when restarting from a checkpoint.
281
282 // While waiting for quiescence to be reached, all events with
283 // timestamps less than the checkpoint GVT are allowed to
284 // execute.  All others are skipped, instead storing their sim handles (indices
285 // into the POSE_Objects array) in POSE_Skipped_Events.  Even
286 // though execution is skipped, the events still remain in their
287 // event queues.  When resuming the simulation, both after
288 // checkpointing and after a restart, sim::Step() (which calls
289 // seq::Step()) is called on each poser listed in
290 // POSE_Skipped_Events to execute the skipped events.
291
292 // Checkpoints are initiated approximately every
293 // pose_config.checkpoint_gvt_interval GVT ticks or
294 // pose_config.checkpoint_time_interval seconds (both defined in
295 // pose_config.h).
296
297 /// In sequential mode, begin checkpoint after reaching quiescence
298 void sim::SeqBeginCheckpoint() {
299   // Ensure this only happens on sim 0
300   CkAssert(thisIndex == 0);
301   // Ensure we're checkpointing
302   CkAssert(seqCheckpointInProgress);
303   CkPrintf("POSE: quiescence detected\n");
304   CkPrintf("POSE: beginning checkpoint on sim %d at GVT=%lld time=%.1f sec\n", thisIndex, seqLastCheckpointGVT, CmiWallTimer() - seqStartTime);
305   CkCallback cb(CkIndex_sim::SeqResumeAfterCheckpoint(), CkArrayIndex1D(thisIndex), thisProxy);
306   CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
307 }
308
309 /// In sequential mode, resume after checkpointing or restarting
310 void sim::SeqResumeAfterCheckpoint() {
311   // Ensure this only happens on sim 0
312   CkAssert(thisIndex == 0);
313   // Ensure this function is only called once after a checkpoint
314   CkAssert(seqCheckpointInProgress);
315   seqCheckpointInProgress = 0;
316   POSE_GlobalClock = seqLastCheckpointGVT;
317   CkPrintf("POSE: checkpoint/restart complete on sim %d at GVT=%lld time=%.1f sec\n", thisIndex, POSE_GlobalClock, CmiWallTimer() - seqStartTime);
318   // restart simulation
319   while (POSE_Skipped_Events.length() > 0) {
320     // These Step iterations MUST be executed now, before any messages
321     // are delivered, or else the event queues will break.  To do this
322     // efficiently, since we're in sequential mode, call Step() as a
323     // local function.
324     int index = POSE_Skipped_Events.deq();
325     sim *localSim = POSE_Objects[index].ckLocal();
326     if (localSim == NULL) {
327       CkPrintf("ERROR: could not obtain pointer to local sim object %d after checkpoint/restart\n", index);
328       CkAbort("Pointer to local sim is NULL...this shouldn't happen in sequential mode\n");
329     } else {
330       localSim->Step();
331     }
332   }
333   CkStartQD(CkIndex_pose::stop(), &POSE_Coordinator_ID);
334 }
335
336 void sim::ResumeFromSync()
337 {
338   PVT *localPVT = (PVT *)CkLocalBranch(ThePVT);
339   localPVT->doneLB();
340 }
341
342 /// Dump all data fields
343 void sim::dump()
344 {
345   CkPrintf("[SIM: active=%d sync=%d myPVTidx=%d ", active, sync, myPVTidx);
346   if (objID) objID->dump();
347   else CkPrintf("objID=NULL\n");
348   eq->dump();
349   cancels.dump();
350   CkPrintf("end SIM]\n");
351 }
352