added a command line option +lb_gvt_pose to set the gvt intervals between load balancing.
[charm.git] / src / libs / ck-libs / pose / pose.C
1 /// Global POSE data and functions; includes and dependencies handled here
2 #include "pose.h"
3 #include "pose.def.h"
4
5 CpvDeclare(int, stateRecovery);
6 CpvDeclare(eventID, theEventID);
7
8 void POSEreadCmdLine();
9 #ifdef POSE_COMM_ON
10 extern int com_debug;
11 #endif
12 double busyWait;
13 double sim_timer;
14 int POSE_inactDetect;
15 POSE_TimeType POSE_endtime;
16 POSE_TimeType POSE_GlobalClock;
17 POSE_TimeType POSE_GlobalTS;
18 POSE_Config pose_config;
19 #ifdef POSE_COMM_ON
20 ComlibInstanceHandle POSE_commlib_insthndl;
21 #endif
22 int _POSE_SEQUENTIAL;
23 int seqCheckpointInProgress;
24 POSE_TimeType seqLastCheckpointGVT;
25 double seqLastCheckpointTime;
26 double seqStartTime;
27 CkQ<int> POSE_Skipped_Events;
28
29 const eventID& GetEventID() {
30   //CpvStaticDeclare(eventID, theEventID);  // initializes to [0.pe]
31   //  for each pe called on
32   CpvAccess(theEventID).incEventID();
33   CkAssert(CpvAccess(theEventID).getPE()>=0);
34   return(CpvAccess(theEventID));
35  }
36
37 // Main initialization for all of POSE
38 void POSE_init() // use inactivity detection by default
39 {
40   POSE_init(1, POSE_UnsetTS);
41 }
42
43 void POSE_init(int ET) // a single parameter specifies endtime
44 {
45   POSE_init(0, ET);
46 }
47
48 void POSE_init(int IDflag, int ET) // can specify both
49 {
50   CkPrintf("Initializing POSE...  \n");
51   POSEreadCmdLine();
52   if (pose_config.checkpoint_gvt_interval) {
53     CkPrintf("POSE checkpointing interval set to %lld GVT ticks\n", pose_config.checkpoint_gvt_interval);
54   }
55   if (pose_config.checkpoint_time_interval) {
56     CkPrintf("POSE checkpointing interval set to %d seconds\n", pose_config.checkpoint_time_interval);
57   }
58   POSE_inactDetect = IDflag;
59   POSE_endtime = ET;
60 #ifdef SEQUENTIAL_POSE
61   _POSE_SEQUENTIAL = 1;
62 #else
63   _POSE_SEQUENTIAL = 0;
64 #endif
65 #ifndef CMK_OPTIMIZE
66   traceRegisterUserEvent("Forward Execution", 10);
67   traceRegisterUserEvent("Cancellation", 20);
68   traceRegisterUserEvent("Cancel Spawn", 30);
69   traceRegisterUserEvent("Rollback", 40);
70   traceRegisterUserEvent("Commit", 50);
71   traceRegisterUserEvent("OptSync", 60);
72 #endif
73 #ifndef SEQUENTIAL_POSE
74 #ifdef POSE_COMM_ON
75   // Create the communication library for POSE
76   POSE_commlib_insthndl = CkGetComlibInstance();
77   // Create the communication strategy for POSE
78   StreamingStrategy *strategy = new StreamingStrategy(COMM_TIMEOUT,COMM_MAXMSG);
79   //MeshStreamingStrategy *strategy = new MeshStreamingStrategy(COMM_TIMEOUT,COMM_MAXMSG);
80   //PrioStreaming *strategy = new PrioStreaming(COMM_TIMEOUT,COMM_MAXMSG);
81   //Register the strategy
82   POSE_commlib_insthndl.setStrategy(strategy);
83   //com_debug=1;
84   //CkPrintf("Simulation run with PrioStreaming(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
85   CkPrintf("Simulation run with StreamingStrategy(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
86   //CkPrintf("Simulation run with MeshStreaming(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
87 #endif
88   // Create a MemoryPool with global handle for memory recycling 
89   MemPoolID = CProxy_MemoryPool::ckNew();
90   // Create a Temporal Memory Manager
91   TempMemID = CProxy_TimePool::ckNew();
92 #endif
93   // Initialize statistics collection if desired
94 #ifndef CMK_OPTIMIZE
95   theLocalStats = CProxy_localStat::ckNew();
96   CProxy_globalStat::ckNew(&theGlobalStats);
97 #endif
98 #ifndef SEQUENTIAL_POSE
99   // Initialize global handles to GVT and PVT
100   ThePVT = CProxy_PVT::ckNew(); 
101   TheGVT = CProxy_GVT::ckNew();
102   // Start off using normal forward execution
103   if(pose_config.lb_on)
104     {
105       // Initialize the load balancer
106       TheLBG = CProxy_LBgroup::ckNew();
107       TheLBstrategy = CProxy_LBstrategy::ckNew();
108       CkPrintf("Load balancing is ON.\n");
109     }
110 #endif
111   CProxy_pose::ckNew(&POSE_Coordinator_ID, 0);
112   // Create array to hold all POSE objects
113 #ifdef POSE_COMM_ON  
114   POSE_Objects_RO = CProxy_sim::ckNew(); 
115   POSE_Objects = POSE_Objects_RO;
116 #else
117   POSE_Objects = CProxy_sim::ckNew(); 
118 #endif
119   //#ifndef SEQUENTIAL_POSE
120   //#ifdef POSE_COMM_ON
121   // Make POSE_Objects use the comm lib
122   //  ComlibDelegateProxy(&POSE_Objects);
123   //#endif
124   //#endif
125
126 #ifdef SEQUENTIAL_POSE
127   if (CkNumPes() > 1) CkAbort("ERROR: Cannot run a sequential simulation on more than one processor!\n");
128   CkPrintf("NOTE: POSE running in sequential simulation mode!\n");
129   int fnIdx = CkIndex_pose::stop();
130   CkStartQD(fnIdx, &POSE_Coordinator_ID);
131   POSE_GlobalClock = 0;
132   POSE_GlobalTS = 0;
133   seqCheckpointInProgress = 0;
134   seqLastCheckpointGVT = 0;
135   seqLastCheckpointTime = seqStartTime = CmiWallTimer();
136 #else
137   /*  CkPrintf("WARNING: Charm Quiescence termination enabled!\n");
138   int fnIdx = CkIndex_pose::stop();
139   CkStartQD(fnIdx, &POSE_Coordinator_ID);
140   */
141 #endif  
142   CkPrintf("POSE initialization complete.\n");
143   if (POSE_inactDetect) CkPrintf("Using Inactivity Detection for termination.\n");
144   else 
145 #if USE_LONG_TIMESTAMPS
146     CkPrintf("Using endTime of %lld for termination.\n", POSE_endtime);
147 #else
148     CkPrintf("Using endTime of %d for termination.\n", POSE_endtime);
149 #endif
150   sim_timer = CmiWallTimer(); 
151 }
152
153 void POSE_startTimer() {
154   CkPrintf("Starting simulation...\n"); 
155   sim_timer = CmiWallTimer(); 
156 }
157
158 /// Use Inactivity Detection to terminate program
159 void POSE_useID() 
160 {
161   CkPrintf("WARNING: POSE_useID obsolete. See POSE_init params.\n");
162 }
163
164 /// Use a user-specified end time to terminate program
165 void POSE_useET(POSE_TimeType et) 
166 {
167   CkPrintf("WARNING: POSE_useET obsolete. See POSE_init params.\n");
168 }
169
170 /// Specify an optional callback to be called when simulation terminates
171 void POSE_registerCallBack(CkCallback cb)
172 {
173   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
174   callBack *cbm = new callBack;
175   cbm->callback = cb;
176   POSE_Coordinator.registerCallBack(cbm);
177 }
178
179 /// Stop POSE simulation
180 void POSE_stop()
181 {
182   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
183   POSE_Coordinator.stop();
184 }
185
186 /// Exit simulation program
187 void POSE_exit()
188 {
189   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
190   POSE_Coordinator.exit();
191 }
192
193 /// Exit simulation program after terminus reduction
194 void POSE_prepExit(void *param, void *msg)
195 {
196   CkReductionMsg *m=(CkReductionMsg *)msg;
197   delete m;
198   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
199   POSE_Coordinator.prepExit();
200 }
201
202 /// Set busy wait time
203 void POSE_set_busy_wait(double n) { busyWait = n; }
204
205 /// Busy wait for busyWait
206 void POSE_busy_wait()
207 {
208   double start = CmiWallTimer();
209   while (CmiWallTimer() - start < busyWait) ;
210 }
211
212 /// Busy wait for n
213 void POSE_busy_wait(double n)
214 {
215   double start = CmiWallTimer();
216   while (CmiWallTimer() - start < n) ;
217 }
218
219 /// Register the callback with POSE
220 void pose::registerCallBack(callBack *cbm) 
221 {
222   callBackSet = 1;
223   cb = cbm->callback;
224 }
225
226 /// Stop the simulation
227 void pose::stop(void) 
228
229 #ifdef SEQUENTIAL_POSE
230   // don't stop if quiescence was reached for a checkpoint operation
231   if (seqCheckpointInProgress) {
232     POSE_Objects[0].SeqBeginCheckpoint();
233   } else {
234 #if USE_LONG_TIMESTAMPS
235     CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
236 #else
237     CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
238 #endif
239     // Call sequential termination here, when done it calls prepExit
240     POSE_Objects.Terminate();
241   }
242 #endif
243   // prepExit();
244 }
245
246 //! dump stats if enabled and exit
247 void pose::prepExit(void) 
248 {
249 #ifndef CMK_OPTIMIZE
250   if(pose_config.stats)
251     {
252       CProxy_localStat stats(theLocalStats);
253       CkPrintf("%d PE Simulation finished at %f. Gathering stats...\n", 
254                CkNumPes(), CmiWallTimer() - sim_timer);
255       stats.SendStats();
256     }
257   else
258     {
259       CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(), 
260                CmiWallTimer() - sim_timer);
261       POSE_exit();
262     }
263 #else
264   CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(), 
265            CmiWallTimer() - sim_timer);
266   POSE_exit();
267 #endif  
268 }
269
270 /// Exit the simulation
271 void pose::exit(void) 
272
273   if (callBackSet)
274     cb.send(); // need to make callback here
275   else
276     CkExit();
277 }
278
279 // this is a HACK to get module seqpose working
280 void _registerseqpose(void)
281 {
282   _registerpose();
283 }
284
285 void POSEreadCmdLine()
286 {
287   char **argv = CkGetArgv();
288   CmiArgGroup("Charm++","POSE");
289   pose_config.stats=CmiGetArgFlagDesc(argv, "+stats_pose",
290                         "Gather timing information and other statistics");
291   /*  semantic meaning for these still to be determined
292   CmiGetArgIntDesc(argv, "+start_proj_pose",&pose_config.start_proj,
293                         "GVT to initiate projections tracing");
294   CmiGetArgIntDesc(argv, "+end_proj_pose",&pose_config.end_proj,
295                         "GVT to end projections tracing");
296   */
297   pose_config.trace=CmiGetArgFlagDesc(argv, "+trace_pose",
298                         "Traces key POSE operations like Forward Execution, Rollback, Cancellation, Fossil Collection, etc. via user events for display in projections");
299
300   pose_config.dop=CmiGetArgFlagDesc(argv, "+dop_pose",
301                         "Critical path analysis by measuring degree of parallelism");
302
303   CmiGetArgIntDesc(argv, "+memman_pose", &pose_config.max_usage , "Coarse memory management: Restricts forward execution of objects with over <max_usage>/<checkpoint store_rate> checkpoints; default to 10");
304   /*
305   pose_config.msg_pool=CmiGetArgFlagDesc(argv, "+pose_msgpool",  "Store and reuse pools of messages under a certain size default 1000");
306   CmiGetArgIntDesc(argv, "+msgpoolsize_pose", &pose_config.msg_pool_size , "Store and reuse pools of messages under a certain size default 1000");
307
308   CmiGetArgIntDesc(argv, "+msgpoolmax_pose", &pose_config.max_pool_msg_size , "Store and reuse pools of messages under a certain size");
309   char *strat;
310   CmiGetArgStringDesc(argv, "+commlib_strat_pose", &strat , "Use commlib with strat in {stream|mesh|prio}");
311   if(strcmp("stream",strat)==0)
312     pose_config.commlib_strat=stream;
313   if(strcmp("mesh",strat)==0)
314     pose_config.commlib_strat=mesh;
315   if(strcmp("prio",strat)==0)
316     pose_config.commlib_strat=prio;
317   CmiGetArgIntDesc(argv, "+commlib_timeout-pose", &pose_config.commlib_timeout , "Use commlib with timeout N; default 1ms");
318   CmiGetArgIntDesc(argv, "+commlib_maxmsg_pose", &pose_config.commlib_maxmsg , "Use commlib with max msg N;  default 5");
319   */
320   pose_config.lb_on=CmiGetArgFlagDesc(argv, "+lb_on_pose", "Use load balancing");
321   CmiGetArgIntDesc(argv, "+lb_skip_pose", &pose_config.lb_skip , "Load balancing skip N; default 51");
322   CmiGetArgIntDesc(argv, "+lb_threshold_pose", &pose_config.lb_threshold , "Load balancing threshold N; default 4000");
323   CmiGetArgIntDesc(argv, "+lb_diff_pose", &pose_config.lb_diff , "Load balancing  min diff between min and max load PEs; default 2000");
324   CmiGetArgIntDesc(argv, "+checkpoint_rate_pose", &pose_config.store_rate , "Sets checkpoint to 1 for every <rate> events. Default to 1. ");
325   CmiGetArgIntDesc(argv, "+checkpoint_gvt_pose", &pose_config.checkpoint_gvt_interval, 
326                    "Checkpoint approximately every <gvt #> of GVT ticks; default = 0 = no checkpointing; overrides +checkpoint_time_pose");
327   if (pose_config.checkpoint_gvt_interval < 0) {
328     CmiAbort("+checkpoint_gvt_pose value must be >= 0; 0 = no checkpointing\n");
329   }
330   CmiGetArgIntDesc(argv, "+checkpoint_time_pose", &pose_config.checkpoint_time_interval, 
331                    "Checkpoint approximately every <time> seconds; default = 0 = no checkpointing; overridden by checkpoint_gvt_pose");
332   if (pose_config.checkpoint_time_interval < 0) {
333     CmiAbort("+checkpoint_time_pose value must be >= 0; 0 = no checkpointing\n");
334   }
335   if ((pose_config.checkpoint_gvt_interval > 0) && (pose_config.checkpoint_time_interval > 0)) {
336     CmiPrintf("WARNING: checkpoint GVT and time values both set; ignoring time value\n");
337     pose_config.checkpoint_time_interval = 0;
338   }
339   /* load balancing */
340   CmiGetArgIntDesc(argv, "+lb_gvt_pose", &pose_config.lb_gvt_interval, 
341                    "Load balancing approximately every <gvt #> of GVT ticks; default = 0 = no lb");
342   if (pose_config.lb_gvt_interval < 0) {
343     CmiAbort("+lb_gvt_pose value must be >= 0; 0 = no load balancing\n");
344   }
345   /* max_iteration seems to be defunct */
346   //  CmiGetArgIntDesc(argv, "+FEmax_pose", &pose_config.max_iter , "Sets max events executed in single forward execution step.  Default to 100.");
347   CmiGetArgIntDesc(argv, "+leash_specwindow_pose", &pose_config.spec_window , "Sets speculative window behavior.");
348   CmiGetArgIntDesc(argv, "+leash_min_pose", &pose_config.min_leash , "Sets speculative window behavior minimum leash. Default 10.");
349   CmiGetArgIntDesc(argv, "+leash_max_pose", &pose_config.max_leash , "Sets speculative window behavior maximum leash. Default 100.");
350   CmiGetArgIntDesc(argv, "+leash_flex_pose", &pose_config.max_leash , "Sets speculative window behavior leash flex. Default 10.");
351   if(pose_config.deterministic= CmiGetArgFlagDesc(argv, "+deterministic_pose",  "sorts events of same timestamp by event id for repeatable behavior "))
352     {
353       CkPrintf("enter at your own risk, this feature is broken\n");
354     }
355 }