08ab8e292efdd25c17edbcc3b87a9b40f691ee33
[charm.git] / src / libs / ck-libs / pose / gvt.C
1 // Global Virtual Time estimation for POSE
2 #include "pose.h"
3 #include "srtable.h"
4 #include "gvt.def.h"
5 #include "qd.h"
6
7 CkGroupID ThePVT;
8 CkGroupID TheGVT;
9 CpvExtern(int, stateRecovery);
10 CpvExtern(eventID, theEventID);
11 /// Basic Constructor
12 PVT::PVT() 
13 {
14 #ifdef VERBOSE_DEBUG
15   CkPrintf("[%d] constructing PVT\n",CkMyPe());
16 #endif
17   CpvInitialize(int, stateRecovery);
18   CpvAccess(stateRecovery) = 0;
19   CpvInitialize(eventID, theEventID);
20   CpvAccess(theEventID)=eventID();
21   //  CpvAccess(theEventID).dump();
22   LBTurnInstrumentOff();
23   optGVT = POSE_UnsetTS; conGVT = POSE_UnsetTS;
24   rdone=0;
25   SRs=NULL;
26 #ifdef POSE_COMM_ON
27   //com_debug = 1;
28 #endif
29 #ifndef CMK_OPTIMIZE
30   localStats = (localStat *)CkLocalBranch(theLocalStats);
31   if(pose_config.stats)
32     {
33       localStats->TimerStart(GVT_TIMER);
34     }
35 #endif
36 #ifdef MEM_TEMPORAL
37   localTimePool = (TimePool *)CkLocalBranch(TempMemID);
38   CkPrintf("NOTE: Temporal memory manager is ON!\n");
39 #endif
40   optPVT = conPVT = estGVT = POSE_UnsetTS;
41   startPhaseActive = gvtTurn = simdone = 0;
42   SendsAndRecvs = new SRtable();
43   SendsAndRecvs->Initialize();
44   specEventCount = eventCount = waitForFirst = 0;
45   iterMin = POSE_UnsetTS;
46   int P=CkNumPes(), N=CkMyPe();
47   reportReduceTo =  -1;
48   if ((N < P-2) && (N%2 == 1)) { //odd
49     reportTo = N-1;
50     reportsExpected = reportEnd = 0;
51   }
52   else if (N < P-2) { //even
53     reportTo = N;
54     reportsExpected = 2; 
55     if (N == P-3)
56       reportsExpected = 1;
57     reportEnd = 0;
58     if (N < (P-2)/2)
59       reportReduceTo = P-2;
60     else reportReduceTo = P-1;
61   }
62   if (N == P-2) {
63     reportTo = N;
64     reportEnd = 1;
65     reportsExpected = 1 + (P-2)/4 + ((P-2)%4)/2;
66   }
67   else if (N == P-1) {
68     reportTo = N;
69     reportEnd = 1;
70     if (P==1) reportsExpected = 1;
71     else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
72   }
73   //  CkPrintf("PE %d reports to %d, receives %d reports, reduces and sends to %d, and reports directly to GVT if %d = 1!\n", CkMyPe(), reportTo, reportsExpected, reportReduceTo, reportEnd);
74
75   parCheckpointInProgress = 0;
76   parLastCheckpointGVT = 0;
77   parLastCheckpointTime = parStartTime = CmiWallTimer();
78 #ifndef CMK_OPTIMIZE
79   if(pose_config.stats)
80     localStats->TimerStop();
81 #endif
82 }
83
84 /// PUP routine
85 void PVT::pup(PUP::er &p) {
86   p|optPVT; p|conPVT; p|estGVT; p|repPVT;
87   p|simdone; p|iterMin; p|waitForFirst;
88   p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
89   p|gvtTurn; p|specEventCount; p|eventCount;
90   p|startPhaseActive; p|parStartTime; p|parCheckpointInProgress;
91   p|parLastCheckpointGVT; p|parLastCheckpointTime;
92   p|optGVT; p|conGVT; p|rdone;
93
94   if (p.isUnpacking()) {
95     parStartTime = CmiWallTimer() - (parLastCheckpointTime - parStartTime);
96 #ifndef CMK_OPTIMIZE
97     localStats = (localStat *)CkLocalBranch(theLocalStats);
98 #endif
99 #ifdef MEM_TEMPORAL
100     localTimePool = (TimePool *)CkLocalBranch(TempMemID);
101 #endif
102     SendsAndRecvs = new SRtable();
103   }
104
105   SendsAndRecvs->pup(p);
106
107   if (SRs != NULL) {
108     CkAbort("ERROR: PVT member *SRs is unexpectedly not NULL\n");
109   }
110 }
111
112 void PVT::startPhaseExp(prioBcMsg *m) {
113   startPhase(m);
114 }
115
116 /// ENTRY: runs the PVT calculation and reports to GVT
117 void PVT::startPhase(prioBcMsg *m) 
118 {
119   CProxy_GVT g(TheGVT);
120   CProxy_PVT p(ThePVT);
121   register int i;
122
123   if (startPhaseActive) return;
124 #ifndef CMK_OPTIMIZE
125   if(pose_config.stats)
126     localStats->TimerStart(GVT_TIMER);
127 #endif
128   startPhaseActive = 1;
129   if (m->bc) {
130     prioBcMsg *startMsg = new (8*sizeof(POSE_TimeType)) prioBcMsg;
131     startMsg->bc = 0;
132     *((POSE_TimeType *)CkPriorityPtr(startMsg)) = 1-POSE_TimeMax;
133     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
134     p.startPhaseExp(startMsg);
135   }
136
137   objs.Wake(); // wake objects to make sure all have reported
138   // compute PVT
139   optPVT = conPVT = POSE_UnsetTS;
140   int end = objs.getNumSpaces();
141   for (i=0; i<end; i++)
142     if (objs.objs[i].isPresent()) {
143       if (objs.objs[i].isOptimistic()) { // check optPVT 
144         if ((optPVT < 0) || ((objs.objs[i].getOVT() < optPVT) && 
145                              (objs.objs[i].getOVT() > POSE_UnsetTS))) {
146           optPVT = objs.objs[i].getOVT();
147           CkAssert(simdone>0 || ((objs.objs[i].getOVT() >= estGVT) ||
148                                (objs.objs[i].getOVT() == POSE_UnsetTS)));
149         }
150       }
151       else if (objs.objs[i].isConservative()) { // check conPVT
152         if ((conPVT < 0) || ((objs.objs[i].getOVT() < conPVT) && 
153                              (objs.objs[i].getOVT() > POSE_UnsetTS)))
154           conPVT = objs.objs[i].getOVT();
155       }
156       CkAssert(simdone>0 || (optPVT >= estGVT)||(optPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
157       CkAssert(simdone>0 || (conPVT >= estGVT)||(conPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
158     }
159
160   // (1) Find out the local PVT from optPVT and conPVT
161   POSE_TimeType pvt = optPVT;
162   if ((conPVT < pvt) && (conPVT > POSE_UnsetTS)) pvt = conPVT;
163   if ((iterMin < pvt) && (iterMin > POSE_UnsetTS)) pvt = iterMin;
164   if (waitForFirst) {
165     waitForFirst = 0;
166     if (pvt == POSE_UnsetTS)
167       SendsAndRecvs->Restructure(estGVT, estGVT, POSE_UnsetTS);
168     else
169       SendsAndRecvs->Restructure(estGVT, pvt, POSE_UnsetTS);
170   }
171
172   //  CkPrintf("[%d] pvt=%d gvt=%d optPVT=%d iterMin=%d\n", CkMyPe(), pvt, estGVT, optPVT, iterMin);
173   POSE_TimeType xt;
174   if (pvt == POSE_UnsetTS) { // all are idle; find max ovt
175     POSE_TimeType maxOVT = POSE_UnsetTS;
176     for (i=0; i<end; i++)
177       if (objs.objs[i].isPresent()) {
178         xt = objs.objs[i].getOVT2();
179         if (xt > maxOVT)
180           maxOVT = xt;
181       }
182     if (maxOVT > estGVT)
183       pvt = maxOVT;
184   }
185   
186   // (2) Pack the SRtable data into the message
187   POSE_TimeType maxSR;
188   UpdateMsg *um = SendsAndRecvs->PackTable(pvt, &maxSR);
189   // (3) Add the PVT info to the message
190   um->optPVT = pvt;
191   um->conPVT = conPVT;
192   um->maxSR = maxSR;
193   um->runGVTflag = 0;
194
195   if (um->numEntries > 0) {
196     //CkPrintf("PE %d has %d SRs reported to GVT; earliest=%d pvt=%d\n", CkMyPe(), um->numEntries, um->SRs[0].timestamp, pvt);
197   }
198   // send data to GVT estimation
199   p[reportTo].reportReduce(um);
200
201   /*
202   if (simdone) // transmit final info to GVT on PE 0
203     g[0].computeGVT(um);              
204   else {
205     g[gvtTurn].computeGVT(um);           // transmit info to GVT
206     gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
207   }
208   */
209   objs.SetIdle(); // Set objects to idle
210   iterMin = POSE_UnsetTS;
211 #ifndef CMK_OPTIMIZE
212   if(pose_config.stats)
213     localStats->TimerStop();
214 #endif
215 }
216
217 /// ENTRY: receive GVT estimate; wake up objects
218 void PVT::setGVT(GVTMsg *m)
219 {
220 #ifndef CMK_OPTIMIZE
221   if(pose_config.stats)
222     localStats->TimerStart(GVT_TIMER);
223 #endif
224   CProxy_PVT p(ThePVT);
225   CkAssert(m->estGVT >= estGVT);
226   estGVT = m->estGVT;
227   int i, end = objs.getNumSpaces();
228 #ifdef POSE_COMM_ON  
229   //PrioStreaming *pstrat = (PrioStreaming *)(POSE_commlib_insthndl.getStrategy());
230   //pstrat->setBasePriority((estGVT+10) - POSE_TimeMax);
231   //pstrat->setBasePriority(estGVT+10);
232 #endif
233   simdone = m->done;
234   CkFreeMsg(m);
235   waitForFirst = 1;
236   objs.Commit();
237 #ifdef MEM_TEMPORAL
238   localTimePool->set_min_time(estGVT);
239 #endif
240
241   // Parallel checkpointing: setGVT was broken into two functions, and
242   // beginCheckpoint was added.  Only initiate the checkpointing
243   // procedure on PE 0, after commits have occurred.  This should
244   // minimize the amount of data written to disk.  In order to ensure
245   // a stable state, we wait for quiescence to be reached before
246   // beginning the checkpoint.  Inconsistent results were obtained
247   // (possibly from messages still in transit) without this step.
248   // Once quiescence is reached, PE 0 begins the checkpoint, and then
249   // resumes the simulation in resumeAfterCheckpoint.  This Callback
250   // function is also the first POSE function to be called when
251   // restarting from a checkpoint.
252
253   // Checkpoints are initiated approximately every
254   // pose_config.checkpoint_gvt_interval GVT ticks or
255   // pose_config.checkpoint_time_interval seconds (both defined in
256   // pose_config.h).
257
258   if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && 
259       (((pose_config.checkpoint_gvt_interval > 0) && (estGVT >= (parLastCheckpointGVT + pose_config.checkpoint_gvt_interval))) || 
260        ((pose_config.checkpoint_time_interval > 0) && (CmiWallTimer() >= (parLastCheckpointTime + (double)pose_config.checkpoint_time_interval))))) {
261     // wait for quiescence to occur before checkpointing
262     eventMsg *dummyMsg = new eventMsg();
263     CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
264     parCheckpointInProgress = 1;
265     parLastCheckpointTime = CmiWallTimer();
266     CkStartQD(cb);
267   } else {
268     // skip checkpointing
269     eventMsg *dummyMsg = new eventMsg();
270     p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
271   }
272 }
273
274 /// ENTRY: begin checkpoint now that quiescence has been reached
275 void PVT::beginCheckpoint(eventMsg *m) {
276   CkFreeMsg(m);
277   if (parCheckpointInProgress) {  // ensure this only happens once
278     CkPrintf("POSE: quiescence detected\n");
279     CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
280     eventMsg *dummyMsg = new eventMsg();
281     CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
282     CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
283   }
284 }
285
286 /// ENTRY: resume after checkpointing, restarting, or if checkpointing doesn't occur
287 void PVT::resumeAfterCheckpoint(eventMsg *m) {
288   if (parCheckpointInProgress) {
289     CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
290     parCheckpointInProgress = 0;
291     parLastCheckpointGVT = estGVT;
292   }
293   CkFreeMsg(m);
294   CProxy_PVT p(ThePVT);
295   startPhaseActive = 0;
296   prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
297   startMsg->bc = 1;
298   *((int *)CkPriorityPtr(startMsg)) = 0;
299   CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
300   p[CkMyPe()].startPhase(startMsg);
301 #ifndef CMK_OPTIMIZE
302   if(pose_config.stats)
303     localStats->TimerStop();
304 #endif
305 }
306
307 /// Register poser with PVT
308 int PVT::objRegister(int arrIdx, POSE_TimeType safeTime, int sync, sim *myPtr)
309 {
310   int i = objs.Insert(arrIdx, POSE_UnsetTS, sync, myPtr); // add to object list
311   return(i*1000 + CkMyPe());                          // return unique PVT idx
312 }
313
314 // Unregister poser from PVT
315 void PVT::objRemove(int pvtIdx)
316 {
317   int idx = (pvtIdx-CkMyPe())/1000;  // calculate local index from unique index
318   objs.Delete(idx);                  // delete the object
319 }
320
321 /// Update send/recv table at timestamp
322 void PVT::objUpdate(POSE_TimeType timestamp, int sr)
323 {
324 #ifndef CMK_OPTIMIZE
325   int tstat = localStats->TimerRunning();
326   if(pose_config.stats){
327     if (tstat)
328       localStats->SwitchTimer(GVT_TIMER);
329     else
330       localStats->TimerStart(GVT_TIMER);
331   }
332 #endif
333   //if ((timestamp < estGVT) && (estGVT > POSE_UnsetTS))
334   //CkPrintf("timestamp=%d estGVT=%d simdone=%d sr=%d\n", timestamp,
335   //estGVT, simdone, sr);
336   CkAssert(simdone>0 || (timestamp >= estGVT) || (estGVT == POSE_UnsetTS));
337   CkAssert((sr == SEND) || (sr == RECV));
338   if ((estGVT > POSE_UnsetTS) && 
339       ((timestamp < iterMin) || (iterMin == POSE_UnsetTS))) 
340     iterMin = timestamp;
341   if (waitForFirst) {
342     waitForFirst = 0;
343     SendsAndRecvs->Restructure(estGVT, timestamp, sr);
344   }
345   else SendsAndRecvs->Insert(timestamp, sr);
346 #ifndef CMK_OPTIMIZE
347   if(pose_config.stats){
348     if (tstat)
349       localStats->SwitchTimer(tstat);
350     else
351       localStats->TimerStop();
352   }
353 #endif
354
355 }
356
357 /// Update PVT with safeTime
358 void PVT::objUpdateOVT(int pvtIdx, POSE_TimeType safeTime, POSE_TimeType ovt)
359 {
360   int index = (pvtIdx-CkMyPe())/1000;
361   // minimize the non-idle OVT
362   //  if ((safeTime < estGVT) && (safeTime > POSE_UnsetTS)) 
363
364   CkAssert(simdone>0 || (safeTime >= estGVT) || (safeTime == POSE_UnsetTS));
365   if ((safeTime == POSE_UnsetTS) && (objs.objs[index].getOVT2() < ovt))
366     objs.objs[index].setOVT2(ovt);
367   else if ((safeTime > POSE_UnsetTS) && 
368            ((objs.objs[index].getOVT() > safeTime) || (objs.objs[index].getOVT() == POSE_UnsetTS)))
369     objs.objs[index].setOVT(safeTime);
370 }
371
372 /// Reduction point for PVT reports
373 void PVT::reportReduce(UpdateMsg *m)
374 {
375 #ifndef CMK_OPTIMIZE
376   if(pose_config.stats)
377     localStats->TimerStart(GVT_TIMER);
378 #endif
379   CProxy_PVT p(ThePVT);
380   CProxy_GVT g(TheGVT);
381   POSE_TimeType lastGVT = 0, maxSR=0;
382
383   // see if message provides new min optGVT or conGVT
384   if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
385     optGVT = m->optPVT;
386   if (m->maxSR > 0)
387     maxSR = m->maxSR;
388   addSR(&SRs, m->SRs, optGVT, m->numEntries);
389   rdone++;
390   CkFreeMsg(m);
391
392   if (rdone == reportsExpected) { // all PVT reports are in
393     UpdateMsg *um;
394     int entryCount = 0;
395     // pack data into um
396     SRentry *tmp = SRs;
397     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
398            && (tmp->sends != tmp->recvs)) {
399       entryCount++;
400       tmp = tmp->next;
401     }
402     um = new (entryCount * sizeof(SRentry), 0) UpdateMsg;
403     tmp = SRs;
404     int i=0;
405     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
406            && (tmp->sends != tmp->recvs)) {
407       um->SRs[i] = *tmp;
408       tmp = tmp->next;
409       i++;
410     }
411     um->numEntries = entryCount;
412     um->optPVT = optGVT;
413     um->conPVT = conGVT;
414     um->maxSR = maxSR;
415     um->runGVTflag = 0;
416
417     if (reportEnd) { //send to computeGVT
418       if (simdone>0) // transmit final info to GVT on PE 0
419         g[0].computeGVT(um);              
420       else {
421         g[gvtTurn].computeGVT(um);           // transmit info to GVT
422         gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
423       }
424     }
425     else { //send to pvt reportReduceTo
426       p[reportReduceTo].reportReduce(um);
427     }
428
429     // reset static data
430     optGVT = conGVT = POSE_UnsetTS;
431     SRentry *cur = SRs;
432     SRs = NULL;
433     while (cur) {
434       tmp = cur->next;
435       delete cur;
436       cur = tmp;
437     }
438     rdone = 0;
439   }
440 #ifndef CMK_OPTIMIZE
441   if(pose_config.stats)
442     localStats->TimerStop();
443 #endif
444 }
445
446 /// Basic Constructor
447 GVT::GVT() 
448 {
449 #ifdef VERBOSE_DEBUG
450   CkPrintf("[%d] constructing GVT\n",CkMyPe());
451 #endif
452
453   optGVT = POSE_UnsetTS, conGVT = POSE_UnsetTS;
454   done=0;
455   SRs = NULL;
456   startOffset = 0;
457
458 #ifndef CMK_OPTIMIZE
459   localStats = (localStat *)CkLocalBranch(theLocalStats);
460 #endif
461 #ifndef SEQUENTIAL_POSE
462   if(pose_config.lb_on)
463     nextLBstart = pose_config.lb_skip - 1;
464 #endif
465   estGVT = lastEarliest = inactiveTime = POSE_UnsetTS;
466   lastSends = lastRecvs = inactive = 0;
467   reportsExpected = 1;
468   if (CkNumPes() >= 2) reportsExpected = 2;
469     
470   //  CkPrintf("GVT expects %d reports!\n", reportsExpected);
471   if (CkMyPe() == 0) { // start the PVT phase of the GVT algorithm
472     CProxy_PVT p(ThePVT);
473     prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
474     startMsg->bc = 1;
475     *((int *)CkPriorityPtr(startMsg)) = 0;
476     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
477     p.startPhase(startMsg); // broadcast PVT calculation to all PVT branches
478   }
479 }
480
481 // Used for Ccd calls; currently commented out
482 //void GVT::_runGVT(UpdateMsg *m) 
483 //{ 
484 //  CProxy_GVT g(TheGVT);
485 //  g[(CkMyPe() + 1)%CkNumPes()].runGVT(m);
486 //}
487
488 /// ENTRY: Run the GVT
489 void GVT::runGVT(UpdateMsg *m) 
490 {
491 #ifndef CMK_OPTIMIZE
492   if(pose_config.stats)
493     localStats->TimerStart(GVT_TIMER);
494 #endif
495   estGVT = m->optPVT;
496   inactive = m->inactive;
497   inactiveTime = m->inactiveTime;
498   nextLBstart = m->nextLB;
499   CProxy_GVT g(TheGVT);
500   m->runGVTflag = 1;
501   g[CkMyPe()].computeGVT(m);  // start the next PVT phase of the GVT algorithm
502 #ifndef CMK_OPTIMIZE
503   if(pose_config.stats)
504     localStats->TimerStop();
505 #endif
506 }
507
508 /// ENTRY: Gathers PVT reports; calculates and broadcasts GVT to PVTs
509 void GVT::computeGVT(UpdateMsg *m)
510 {
511 #ifndef CMK_OPTIMIZE
512   if(pose_config.stats)
513     localStats->TimerStart(GVT_TIMER);
514 #endif
515   CProxy_PVT p(ThePVT);
516   CProxy_GVT g(TheGVT);
517   GVTMsg *gmsg = new GVTMsg;
518   POSE_TimeType lastGVT = 0, earliestMsg = POSE_UnsetTS, 
519     earlyAny = POSE_UnsetTS;
520
521   if (CkMyPe() != 0) startOffset = 1;
522   if (m->runGVTflag == 1) done++;
523   else {
524     // see if message provides new min optGVT or conGVT
525     if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
526       optGVT = m->optPVT;
527     if ((conGVT < 0) || ((m->conPVT > POSE_UnsetTS) && (m->conPVT < conGVT)))
528       conGVT = m->conPVT;
529     if (m->maxSR > earlyAny) 
530       earlyAny = m->maxSR;
531     // add send/recv info to SRs
532     /*    if (m->numEntries > 0)
533       CkPrintf("GVT recv'd %d SRs from a PE, earliest=%d\n", m->numEntries, 
534       m->SRs[0].timestamp);*/
535     addSR(&SRs, m->SRs, optGVT, m->numEntries);
536     done++;
537   }
538   CkFreeMsg(m);
539
540   if (done == reportsExpected+startOffset) { // all PVT reports are in
541 #ifndef CMK_OPTIMIZE
542     if(pose_config.stats)
543       localStats->GvtInc();
544 #endif
545     done = 0;
546     startOffset = 1;
547     lastGVT = estGVT; // store previous estimate
548     if (lastGVT < 0) lastGVT = 0;
549     estGVT = POSE_UnsetTS;
550     
551     // derive GVT estimate from min optimistic & conservative GVTs
552     estGVT = optGVT;
553     if ((conGVT > POSE_UnsetTS) && (estGVT > POSE_UnsetTS) && (conGVT < estGVT))  estGVT = conGVT;
554
555     // Check if send/recv activity provides lower possible estimate
556     /*    if (SRs) SRs->dump();
557           else CkPrintf("No SRs reported to GVT!\n");*/
558     SRentry *tmp = SRs;
559     POSE_TimeType lastSR = POSE_UnsetTS;
560     while (tmp && ((tmp->timestamp <= estGVT) || (estGVT == POSE_UnsetTS))) {
561       lastSR = tmp->timestamp;
562       if (tmp->sends != tmp->recvs) {
563         earliestMsg = tmp->timestamp;
564         break;
565       }
566       tmp = tmp->next;
567     }
568     /*    if ((earliestMsg > POSE_UnsetTS) || (earlyAny > POSE_UnsetTS))
569           CkPrintf("GVT: earlyDiff=%d earlyAny=%d estGVT was %d.\n", earliestMsg, earlyAny, estGVT);*/
570     if (((earliestMsg < estGVT) && (earliestMsg != POSE_UnsetTS)) ||
571         (estGVT == POSE_UnsetTS))
572       estGVT = earliestMsg;
573     if ((lastSR != POSE_UnsetTS) && (estGVT == POSE_UnsetTS) && 
574         (lastSR > lastGVT))
575       estGVT = lastSR;
576
577     // check for inactivity
578     if ((optGVT == POSE_UnsetTS) && (earliestMsg == POSE_UnsetTS)) {
579       inactive++;
580       /*
581       if (inactive == 1) {
582         CkPrintf("[%d] Inactive... calling CkWaitQD...\n", CkMyPe());
583         CkWaitQD();
584         CkPrintf("[%d] Back from CkWaitQD...\n", CkMyPe());
585       }
586       */
587       estGVT = lastGVT;
588       if (inactive == 1) inactiveTime = lastGVT;
589     }
590     else if (estGVT < 0) {
591       estGVT = lastGVT;
592       inactive = 0;
593     }
594     else inactive = 0;
595
596     // check the estimate
597     //CkPrintf("opt=%d con=%d lastGVT=%d early=%d lastSR=%d et=%d\n", optGVT, conGVT, lastGVT, earliestMsg, lastSR, POSE_endtime);
598     CmiAssert(estGVT >= lastGVT); 
599     //if (estGVT % 1000 == 0)
600     //CkPrintf("[%d] New GVT = %d\n", CkMyPe(), estGVT);
601     //CkPrintf("[%d] New GVT = %lld\n", CkMyPe(), estGVT);
602
603     // check for termination conditions
604     int term = 0;
605     if ((estGVT >= POSE_endtime) && (POSE_endtime > POSE_UnsetTS)) {
606 #if USE_LONG_TIMESTAMPS      
607       CkPrintf("At endtime: %lld\n", POSE_endtime);
608 #else
609       CkPrintf("At endtime: %d\n", POSE_endtime);
610 #endif
611       term = 1;
612     }
613     else if (inactive > 2) {
614 #if USE_LONG_TIMESTAMPS      
615       CkPrintf("Simulation inactive at time: %lld\n", inactiveTime);
616 #else
617       CkPrintf("Simulation inactive at time: %d\n", inactiveTime);
618 #endif
619       term = 1;
620     }
621
622     // report the last new GVT estimate to all PVT branches
623     gmsg->estGVT = estGVT;
624     gmsg->done = term;
625     if (term) {
626       //if (POSE_endtime > POSE_UnsetTS) gmsg->estGVT = POSE_endtime + 1;
627       // else gmsg->estGVT++;
628 #if USE_LONG_TIMESTAMPS      
629       CkPrintf("Final GVT = %lld\n", gmsg->estGVT);
630 #else
631       CkPrintf("Final GVT = %d\n", gmsg->estGVT);
632 #endif
633       p.setGVT(gmsg);
634       POSE_stop();
635     }
636     else {
637       p.setGVT(gmsg);
638
639       if(pose_config.lb_on)
640         {
641           // perform load balancing
642 #ifndef CMK_OPTIMIZE
643           if(pose_config.stats)
644             localStats->SwitchTimer(LB_TIMER);
645 #endif
646          
647           if (CkNumPes() > 1) {
648             nextLBstart++;
649             if (pose_config.lb_skip == nextLBstart) {
650               TheLBG.calculateLocalLoad();
651               nextLBstart = 0;
652             }
653           }
654 #ifndef CMK_OPTIMIZE
655           if(pose_config.stats)
656             localStats->SwitchTimer(GVT_TIMER);
657 #endif
658         }
659
660       // transmit data to start next GVT estimation on next GVT branch
661       UpdateMsg *umsg = new UpdateMsg;
662       umsg->maxSR=0;
663       umsg->optPVT = estGVT;
664       umsg->inactive = inactive;
665       umsg->inactiveTime = inactiveTime;
666       umsg->nextLB = nextLBstart;
667       umsg->runGVTflag = 0;
668       g[(CkMyPe()+1) % CkNumPes()].runGVT(umsg);
669     }
670
671     // reset static data
672     optGVT = conGVT = POSE_UnsetTS;
673     SRentry *cur = SRs;
674     SRs = NULL;
675     while (cur) {
676       tmp = cur->next;
677       delete cur;
678       cur = tmp;
679     }
680   }
681 #ifndef CMK_OPTIMIZE
682   if(pose_config.stats)
683     localStats->TimerStop();
684 #endif
685 }
686
687 void GVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
688 {
689   register int i;
690   SRentry *tab = (*SRs);
691   SRentry *tmp = tab;
692
693   for (i=0; i<ne; i++) {
694     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
695       if (!tmp) { // no entries yet
696         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
697         tab->sends = e[i].sends;
698         tab->recvs = e[i].recvs;
699         tmp = tab;
700         *SRs = tmp;
701       }
702       else {
703         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
704           CkAssert(tmp == *SRs);
705           tab = new SRentry(e[i].timestamp, tmp);
706           tab->sends = e[i].sends;
707           tab->recvs = e[i].recvs;
708           tmp = tab;
709           *SRs = tmp;
710         }
711         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
712           tmp->sends = tmp->sends + e[i].sends;
713           tmp->recvs = tmp->recvs + e[i].recvs;
714         }
715         else { // search for position
716           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
717             tmp = tmp->next;
718           if (!tmp->next) { // goes at end of SRs
719             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
720             tmp->next->sends = tmp->next->sends + e[i].sends;
721             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
722             tmp = tmp->next;
723           }
724           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
725             tmp->next->sends = tmp->next->sends + e[i].sends;
726             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
727             tmp = tmp->next;
728           }
729           else { // goes after tmp but before tmp->next
730             tmp->next = new SRentry(e[i].timestamp, tmp->next);
731             tmp->next->sends = tmp->next->sends + e[i].sends;
732             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
733             tmp = tmp->next;
734           }
735         }
736       }
737     }
738     else break;
739   }
740 }
741
742 void PVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
743 {
744   register int i;
745   SRentry *tab = (*SRs);
746   SRentry *tmp = tab;
747
748   for (i=0; i<ne; i++) {
749     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
750       if (!tmp) { // no entries yet
751         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
752         tab->sends = e[i].sends;
753         tab->recvs = e[i].recvs;
754         tmp = tab;
755         *SRs = tmp;
756       }
757       else {
758         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
759           CkAssert(tmp == *SRs);
760           tab = new SRentry(e[i].timestamp, tmp);
761           tab->sends = e[i].sends;
762           tab->recvs = e[i].recvs;
763           tmp = tab;
764           *SRs = tmp;
765         }
766         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
767           tmp->sends = tmp->sends + e[i].sends;
768           tmp->recvs = tmp->recvs + e[i].recvs;
769         }
770         else { // search for position
771           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
772             tmp = tmp->next;
773           if (!tmp->next) { // goes at end of SRs
774             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
775             tmp->next->sends = tmp->next->sends + e[i].sends;
776             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
777             tmp = tmp->next;
778           }
779           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
780             tmp->next->sends = tmp->next->sends + e[i].sends;
781             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
782             tmp = tmp->next;
783           }
784           else { // goes after tmp but before tmp->next
785             tmp->next = new SRentry(e[i].timestamp, tmp->next);
786             tmp->next->sends = tmp->next->sends + e[i].sends;
787             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
788             tmp = tmp->next;
789           }
790         }
791       }
792     }
793     else break;
794   }
795 }