d69279d06aa5157f4dfe817839b06241348fcd01
[charm.git] / src / libs / ck-libs / pose / gvt.C
1 // Global Virtual Time estimation for POSE
2 #include "pose.h"
3 #include "srtable.h"
4 #include "gvt.def.h"
5 #include "qd.h"
6
7 CkGroupID ThePVT;
8 CkGroupID TheGVT;
9 CpvExtern(int, stateRecovery);
10 CpvExtern(eventID, theEventID);
11 /// Basic Constructor
12 PVT::PVT() 
13 {
14 #ifdef VERBOSE_DEBUG
15   CkPrintf("[%d] constructing PVT\n",CkMyPe());
16 #endif
17   CpvInitialize(int, stateRecovery);
18   CpvAccess(stateRecovery) = 0;
19   CpvInitialize(eventID, theEventID);
20   CpvAccess(theEventID)=eventID();
21   //  CpvAccess(theEventID).dump();
22   //LBTurnInstrumentOff();
23   optGVT = POSE_UnsetTS; conGVT = POSE_UnsetTS;
24   rdone=0;
25   SRs=NULL;
26 #ifdef POSE_COMM_ON
27   //com_debug = 1;
28 #endif
29 #ifndef CMK_OPTIMIZE
30   localStats = (localStat *)CkLocalBranch(theLocalStats);
31   if(pose_config.stats)
32     {
33       localStats->TimerStart(GVT_TIMER);
34     }
35 #endif
36 #ifdef MEM_TEMPORAL
37   localTimePool = (TimePool *)CkLocalBranch(TempMemID);
38   CkPrintf("NOTE: Temporal memory manager is ON!\n");
39 #endif
40   optPVT = conPVT = estGVT = POSE_UnsetTS;
41   startPhaseActive = gvtTurn = simdone = 0;
42   SendsAndRecvs = new SRtable();
43   SendsAndRecvs->Initialize();
44   specEventCount = eventCount = waitForFirst = 0;
45   iterMin = POSE_UnsetTS;
46   int P=CkNumPes(), N=CkMyPe();
47   reportReduceTo =  -1;
48   if ((N < P-2) && (N%2 == 1)) { //odd
49     reportTo = N-1;
50     reportsExpected = reportEnd = 0;
51   }
52   else if (N < P-2) { //even
53     reportTo = N;
54     reportsExpected = 2; 
55     if (N == P-3)
56       reportsExpected = 1;
57     reportEnd = 0;
58     if (N < (P-2)/2)
59       reportReduceTo = P-2;
60     else reportReduceTo = P-1;
61   }
62   if (N == P-2) {
63     reportTo = N;
64     reportEnd = 1;
65     reportsExpected = 1 + (P-2)/4 + ((P-2)%4)/2;
66   }
67   else if (N == P-1) {
68     reportTo = N;
69     reportEnd = 1;
70     if (P==1) reportsExpected = 1;
71     else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
72   }
73   //  CkPrintf("PE %d reports to %d, receives %d reports, reduces and sends to %d, and reports directly to GVT if %d = 1!\n", CkMyPe(), reportTo, reportsExpected, reportReduceTo, reportEnd);
74
75   parCheckpointInProgress = 0;
76   parLastCheckpointGVT = 0;
77   parLastCheckpointTime = parStartTime = CmiWallTimer();
78 #ifndef CMK_OPTIMIZE
79   if(pose_config.stats)
80     localStats->TimerStop();
81 #endif
82 }
83
84 /// PUP routine
85 void PVT::pup(PUP::er &p) {
86   p|optPVT; p|conPVT; p|estGVT; p|repPVT;
87   p|simdone; p|iterMin; p|waitForFirst;
88   p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
89   p|gvtTurn; p|specEventCount; p|eventCount;
90   p|startPhaseActive; p|parStartTime; p|parCheckpointInProgress;
91   p|parLastCheckpointGVT; p|parLastCheckpointTime;
92   p|optGVT; p|conGVT; p|rdone;
93
94   if (p.isUnpacking()) {
95     parStartTime = CmiWallTimer() - (parLastCheckpointTime - parStartTime);
96 #ifndef CMK_OPTIMIZE
97     localStats = (localStat *)CkLocalBranch(theLocalStats);
98 #endif
99 #ifdef MEM_TEMPORAL
100     localTimePool = (TimePool *)CkLocalBranch(TempMemID);
101 #endif
102     SendsAndRecvs = new SRtable();
103   }
104
105   SendsAndRecvs->pup(p);
106
107   if (SRs != NULL) {
108     CkAbort("ERROR: PVT member *SRs is unexpectedly not NULL\n");
109   }
110 }
111
112 void PVT::startPhaseExp(prioBcMsg *m) {
113   startPhase(m);
114 }
115
116 /// ENTRY: runs the PVT calculation and reports to GVT
117 void PVT::startPhase(prioBcMsg *m) 
118 {
119   CProxy_GVT g(TheGVT);
120   CProxy_PVT p(ThePVT);
121   register int i;
122
123   if (startPhaseActive) return;
124 #ifndef CMK_OPTIMIZE
125   if(pose_config.stats)
126     localStats->TimerStart(GVT_TIMER);
127 #endif
128   startPhaseActive = 1;
129   if (m->bc) {
130     prioBcMsg *startMsg = new (8*sizeof(POSE_TimeType)) prioBcMsg;
131     startMsg->bc = 0;
132     *((POSE_TimeType *)CkPriorityPtr(startMsg)) = 1-POSE_TimeMax;
133     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
134     p.startPhaseExp(startMsg);
135   }
136
137   objs.Wake(); // wake objects to make sure all have reported
138   // compute PVT
139   optPVT = conPVT = POSE_UnsetTS;
140   int end = objs.getNumSpaces();
141   for (i=0; i<end; i++)
142     if (objs.objs[i].isPresent()) {
143       if (objs.objs[i].isOptimistic()) { // check optPVT 
144         if ((optPVT < 0) || ((objs.objs[i].getOVT() < optPVT) && 
145                              (objs.objs[i].getOVT() > POSE_UnsetTS))) {
146           optPVT = objs.objs[i].getOVT();
147           CkAssert(simdone>0 || ((objs.objs[i].getOVT() >= estGVT) ||
148                                (objs.objs[i].getOVT() == POSE_UnsetTS)));
149         }
150       }
151       else if (objs.objs[i].isConservative()) { // check conPVT
152         if ((conPVT < 0) || ((objs.objs[i].getOVT() < conPVT) && 
153                              (objs.objs[i].getOVT() > POSE_UnsetTS)))
154           conPVT = objs.objs[i].getOVT();
155       }
156       CkAssert(simdone>0 || (optPVT >= estGVT)||(optPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
157       CkAssert(simdone>0 || (conPVT >= estGVT)||(conPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
158     }
159
160   // (1) Find out the local PVT from optPVT and conPVT
161   POSE_TimeType pvt = optPVT;
162   if ((conPVT < pvt) && (conPVT > POSE_UnsetTS)) pvt = conPVT;
163   if ((iterMin < pvt) && (iterMin > POSE_UnsetTS)) pvt = iterMin;
164   if (waitForFirst) {
165     waitForFirst = 0;
166     if (pvt == POSE_UnsetTS)
167       SendsAndRecvs->Restructure(estGVT, estGVT, POSE_UnsetTS);
168     else
169       SendsAndRecvs->Restructure(estGVT, pvt, POSE_UnsetTS);
170   }
171
172   //  CkPrintf("[%d] pvt=%d gvt=%d optPVT=%d iterMin=%d\n", CkMyPe(), pvt, estGVT, optPVT, iterMin);
173   POSE_TimeType xt;
174   if (pvt == POSE_UnsetTS) { // all are idle; find max ovt
175     POSE_TimeType maxOVT = POSE_UnsetTS;
176     for (i=0; i<end; i++)
177       if (objs.objs[i].isPresent()) {
178         xt = objs.objs[i].getOVT2();
179         if (xt > maxOVT)
180           maxOVT = xt;
181       }
182     if (maxOVT > estGVT)
183       pvt = maxOVT;
184   }
185   
186   // (2) Pack the SRtable data into the message
187   POSE_TimeType maxSR;
188   UpdateMsg *um = SendsAndRecvs->PackTable(pvt, &maxSR);
189   // (3) Add the PVT info to the message
190   um->optPVT = pvt;
191   um->conPVT = conPVT;
192   um->maxSR = maxSR;
193   um->runGVTflag = 0;
194
195   if (um->numEntries > 0) {
196     //CkPrintf("PE %d has %d SRs reported to GVT; earliest=%d pvt=%d\n", CkMyPe(), um->numEntries, um->SRs[0].timestamp, pvt);
197   }
198   // send data to GVT estimation
199   p[reportTo].reportReduce(um);
200
201   /*
202   if (simdone) // transmit final info to GVT on PE 0
203     g[0].computeGVT(um);              
204   else {
205     g[gvtTurn].computeGVT(um);           // transmit info to GVT
206     gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
207   }
208   */
209   objs.SetIdle(); // Set objects to idle
210   iterMin = POSE_UnsetTS;
211 #ifndef CMK_OPTIMIZE
212   if(pose_config.stats)
213     localStats->TimerStop();
214 #endif
215 }
216
217 /// ENTRY: receive GVT estimate; wake up objects
218 void PVT::setGVT(GVTMsg *m)
219 {
220 #ifndef CMK_OPTIMIZE
221   if(pose_config.stats)
222     localStats->TimerStart(GVT_TIMER);
223 #endif
224   CProxy_PVT p(ThePVT);
225   CkAssert(m->estGVT >= estGVT);
226   estGVT = m->estGVT;
227   int i, end = objs.getNumSpaces();
228 #ifdef POSE_COMM_ON  
229   //PrioStreaming *pstrat = (PrioStreaming *)(POSE_commlib_insthndl.getStrategy());
230   //pstrat->setBasePriority((estGVT+10) - POSE_TimeMax);
231   //pstrat->setBasePriority(estGVT+10);
232 #endif
233   simdone = m->done;
234   CkFreeMsg(m);
235   waitForFirst = 1;
236   objs.Commit();
237 #ifdef MEM_TEMPORAL
238   localTimePool->set_min_time(estGVT);
239 #endif
240
241   // Parallel checkpointing: setGVT was broken into two functions, and
242   // beginCheckpoint was added.  Only initiate the checkpointing
243   // procedure on PE 0, after commits have occurred.  This should
244   // minimize the amount of data written to disk.  In order to ensure
245   // a stable state, we wait for quiescence to be reached before
246   // beginning the checkpoint.  Inconsistent results were obtained
247   // (possibly from messages still in transit) without this step.
248   // Once quiescence is reached, PE 0 begins the checkpoint, and then
249   // resumes the simulation in resumeAfterCheckpoint.  This Callback
250   // function is also the first POSE function to be called when
251   // restarting from a checkpoint.
252
253   // Checkpoints are initiated approximately every
254   // pose_config.checkpoint_gvt_interval GVT ticks or
255   // pose_config.checkpoint_time_interval seconds (both defined in
256   // pose_config.h).
257
258   if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && 
259       (((pose_config.checkpoint_gvt_interval > 0) && (estGVT >= (parLastCheckpointGVT + pose_config.checkpoint_gvt_interval))) || 
260        ((pose_config.checkpoint_time_interval > 0) && (CmiWallTimer() >= (parLastCheckpointTime + (double)pose_config.checkpoint_time_interval))))) {
261     // wait for quiescence to occur before checkpointing
262     eventMsg *dummyMsg = new eventMsg();
263     CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
264     parCheckpointInProgress = 1;
265     parLastCheckpointTime = CmiWallTimer();
266     CkStartQD(cb);
267   } else {
268     // skip checkpointing
269     eventMsg *dummyMsg = new eventMsg();
270     p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
271   }
272 }
273
274 /// ENTRY: begin checkpoint now that quiescence has been reached
275 void PVT::beginCheckpoint(eventMsg *m) {
276   CkFreeMsg(m);
277   if (parCheckpointInProgress) {  // ensure this only happens once
278     CkPrintf("POSE: quiescence detected\n");
279     CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
280     eventMsg *dummyMsg = new eventMsg();
281     CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
282     CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
283   }
284 }
285
286 void PVT::beginLoadbalancing(eventMsg *m) {
287   CkFreeMsg(m);
288   if (parCheckpointInProgress) {  // ensure this only happens once
289     CProxy_PVT p(ThePVT);
290     p.callAtSync();
291   }
292 }
293
294 void PVT::callAtSync()
295 {
296   objs.callAtSync();
297 }
298
299 void PVT::doneLB() {
300   static int count = 0;
301   count ++;
302   if (count == objs.getNumObjs()) {
303     count =0;
304     if (CkMyPe()==0) { 
305       eventMsg *dummyMsg = new eventMsg();
306       CProxy_PVT p(ThePVT);
307       p[0].resumeAfterCheckpoint(dummyMsg);
308     }
309   }
310 }
311
312 /// ENTRY: resume after checkpointing, restarting, or if checkpointing doesn't occur
313 void PVT::resumeAfterCheckpoint(eventMsg *m) {
314   if (parCheckpointInProgress) {
315     CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
316     parCheckpointInProgress = 0;
317     parLastCheckpointGVT = estGVT;
318   }
319   CkFreeMsg(m);
320   CProxy_PVT p(ThePVT);
321   startPhaseActive = 0;
322   prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
323   startMsg->bc = 1;
324   *((int *)CkPriorityPtr(startMsg)) = 0;
325   CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
326   p[CkMyPe()].startPhase(startMsg);
327 #ifndef CMK_OPTIMIZE
328   if(pose_config.stats)
329     localStats->TimerStop();
330 #endif
331 }
332
333 /// Register poser with PVT
334 int PVT::objRegister(int arrIdx, POSE_TimeType safeTime, int sync, sim *myPtr)
335 {
336   int i = objs.Insert(arrIdx, POSE_UnsetTS, sync, myPtr); // add to object list
337   return(i*1000 + CkMyPe());                          // return unique PVT idx
338 }
339
340 // Unregister poser from PVT
341 void PVT::objRemove(int pvtIdx)
342 {
343   int idx = (pvtIdx-CkMyPe())/1000;  // calculate local index from unique index
344   objs.Delete(idx);                  // delete the object
345 }
346
347 /// Update send/recv table at timestamp
348 void PVT::objUpdate(POSE_TimeType timestamp, int sr)
349 {
350 #ifndef CMK_OPTIMIZE
351   int tstat = localStats->TimerRunning();
352   if(pose_config.stats){
353     if (tstat)
354       localStats->SwitchTimer(GVT_TIMER);
355     else
356       localStats->TimerStart(GVT_TIMER);
357   }
358 #endif
359   //if ((timestamp < estGVT) && (estGVT > POSE_UnsetTS))
360   //CkPrintf("timestamp=%d estGVT=%d simdone=%d sr=%d\n", timestamp,
361   //estGVT, simdone, sr);
362   CkAssert(simdone>0 || (timestamp >= estGVT) || (estGVT == POSE_UnsetTS));
363   CkAssert((sr == SEND) || (sr == RECV));
364   if ((estGVT > POSE_UnsetTS) && 
365       ((timestamp < iterMin) || (iterMin == POSE_UnsetTS))) 
366     iterMin = timestamp;
367   if (waitForFirst) {
368     waitForFirst = 0;
369     SendsAndRecvs->Restructure(estGVT, timestamp, sr);
370   }
371   else SendsAndRecvs->Insert(timestamp, sr);
372 #ifndef CMK_OPTIMIZE
373   if(pose_config.stats){
374     if (tstat)
375       localStats->SwitchTimer(tstat);
376     else
377       localStats->TimerStop();
378   }
379 #endif
380
381 }
382
383 /// Update PVT with safeTime
384 void PVT::objUpdateOVT(int pvtIdx, POSE_TimeType safeTime, POSE_TimeType ovt)
385 {
386   int index = (pvtIdx-CkMyPe())/1000;
387   // minimize the non-idle OVT
388   //  if ((safeTime < estGVT) && (safeTime > POSE_UnsetTS)) 
389
390   CkAssert(simdone>0 || (safeTime >= estGVT) || (safeTime == POSE_UnsetTS));
391   if ((safeTime == POSE_UnsetTS) && (objs.objs[index].getOVT2() < ovt))
392     objs.objs[index].setOVT2(ovt);
393   else if ((safeTime > POSE_UnsetTS) && 
394            ((objs.objs[index].getOVT() > safeTime) || (objs.objs[index].getOVT() == POSE_UnsetTS)))
395     objs.objs[index].setOVT(safeTime);
396 }
397
398 /// Reduction point for PVT reports
399 void PVT::reportReduce(UpdateMsg *m)
400 {
401 #ifndef CMK_OPTIMIZE
402   if(pose_config.stats)
403     localStats->TimerStart(GVT_TIMER);
404 #endif
405   CProxy_PVT p(ThePVT);
406   CProxy_GVT g(TheGVT);
407   POSE_TimeType lastGVT = 0, maxSR=0;
408
409   // see if message provides new min optGVT or conGVT
410   if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
411     optGVT = m->optPVT;
412   if (m->maxSR > 0)
413     maxSR = m->maxSR;
414   addSR(&SRs, m->SRs, optGVT, m->numEntries);
415   rdone++;
416   CkFreeMsg(m);
417
418   if (rdone == reportsExpected) { // all PVT reports are in
419     UpdateMsg *um;
420     int entryCount = 0;
421     // pack data into um
422     SRentry *tmp = SRs;
423     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
424            && (tmp->sends != tmp->recvs)) {
425       entryCount++;
426       tmp = tmp->next;
427     }
428     um = new (entryCount * sizeof(SRentry), 0) UpdateMsg;
429     tmp = SRs;
430     int i=0;
431     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
432            && (tmp->sends != tmp->recvs)) {
433       um->SRs[i] = *tmp;
434       tmp = tmp->next;
435       i++;
436     }
437     um->numEntries = entryCount;
438     um->optPVT = optGVT;
439     um->conPVT = conGVT;
440     um->maxSR = maxSR;
441     um->runGVTflag = 0;
442
443     if (reportEnd) { //send to computeGVT
444       if (simdone>0) // transmit final info to GVT on PE 0
445         g[0].computeGVT(um);              
446       else {
447         g[gvtTurn].computeGVT(um);           // transmit info to GVT
448         gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
449       }
450     }
451     else { //send to pvt reportReduceTo
452       p[reportReduceTo].reportReduce(um);
453     }
454
455     // reset static data
456     optGVT = conGVT = POSE_UnsetTS;
457     SRentry *cur = SRs;
458     SRs = NULL;
459     while (cur) {
460       tmp = cur->next;
461       delete cur;
462       cur = tmp;
463     }
464     rdone = 0;
465   }
466 #ifndef CMK_OPTIMIZE
467   if(pose_config.stats)
468     localStats->TimerStop();
469 #endif
470 }
471
472 /// Basic Constructor
473 GVT::GVT() 
474 {
475 #ifdef VERBOSE_DEBUG
476   CkPrintf("[%d] constructing GVT\n",CkMyPe());
477 #endif
478
479   optGVT = POSE_UnsetTS, conGVT = POSE_UnsetTS;
480   done=0;
481   SRs = NULL;
482   startOffset = 0;
483
484 #ifndef CMK_OPTIMIZE
485   localStats = (localStat *)CkLocalBranch(theLocalStats);
486 #endif
487 #ifndef SEQUENTIAL_POSE
488   if(pose_config.lb_on)
489     nextLBstart = pose_config.lb_skip - 1;
490 #endif
491   estGVT = lastEarliest = inactiveTime = POSE_UnsetTS;
492   lastSends = lastRecvs = inactive = 0;
493   reportsExpected = 1;
494   if (CkNumPes() >= 2) reportsExpected = 2;
495     
496   //  CkPrintf("GVT expects %d reports!\n", reportsExpected);
497   if (CkMyPe() == 0) { // start the PVT phase of the GVT algorithm
498     CProxy_PVT p(ThePVT);
499     prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
500     startMsg->bc = 1;
501     *((int *)CkPriorityPtr(startMsg)) = 0;
502     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
503     p.startPhase(startMsg); // broadcast PVT calculation to all PVT branches
504   }
505 }
506
507 // Used for Ccd calls; currently commented out
508 //void GVT::_runGVT(UpdateMsg *m) 
509 //{ 
510 //  CProxy_GVT g(TheGVT);
511 //  g[(CkMyPe() + 1)%CkNumPes()].runGVT(m);
512 //}
513
514 /// ENTRY: Run the GVT
515 void GVT::runGVT(UpdateMsg *m) 
516 {
517 #ifndef CMK_OPTIMIZE
518   if(pose_config.stats)
519     localStats->TimerStart(GVT_TIMER);
520 #endif
521   estGVT = m->optPVT;
522   inactive = m->inactive;
523   inactiveTime = m->inactiveTime;
524   nextLBstart = m->nextLB;
525   CProxy_GVT g(TheGVT);
526   m->runGVTflag = 1;
527   g[CkMyPe()].computeGVT(m);  // start the next PVT phase of the GVT algorithm
528 #ifndef CMK_OPTIMIZE
529   if(pose_config.stats)
530     localStats->TimerStop();
531 #endif
532 }
533
534 /// ENTRY: Gathers PVT reports; calculates and broadcasts GVT to PVTs
535 void GVT::computeGVT(UpdateMsg *m)
536 {
537 #ifndef CMK_OPTIMIZE
538   if(pose_config.stats)
539     localStats->TimerStart(GVT_TIMER);
540 #endif
541   CProxy_PVT p(ThePVT);
542   CProxy_GVT g(TheGVT);
543   GVTMsg *gmsg = new GVTMsg;
544   POSE_TimeType lastGVT = 0, earliestMsg = POSE_UnsetTS, 
545     earlyAny = POSE_UnsetTS;
546
547   if (CkMyPe() != 0) startOffset = 1;
548   if (m->runGVTflag == 1) done++;
549   else {
550     // see if message provides new min optGVT or conGVT
551     if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
552       optGVT = m->optPVT;
553     if ((conGVT < 0) || ((m->conPVT > POSE_UnsetTS) && (m->conPVT < conGVT)))
554       conGVT = m->conPVT;
555     if (m->maxSR > earlyAny) 
556       earlyAny = m->maxSR;
557     // add send/recv info to SRs
558     /*    if (m->numEntries > 0)
559       CkPrintf("GVT recv'd %d SRs from a PE, earliest=%d\n", m->numEntries, 
560       m->SRs[0].timestamp);*/
561     addSR(&SRs, m->SRs, optGVT, m->numEntries);
562     done++;
563   }
564   CkFreeMsg(m);
565
566   if (done == reportsExpected+startOffset) { // all PVT reports are in
567 #ifndef CMK_OPTIMIZE
568     if(pose_config.stats)
569       localStats->GvtInc();
570 #endif
571     done = 0;
572     startOffset = 1;
573     lastGVT = estGVT; // store previous estimate
574     if (lastGVT < 0) lastGVT = 0;
575     estGVT = POSE_UnsetTS;
576     
577     // derive GVT estimate from min optimistic & conservative GVTs
578     estGVT = optGVT;
579     if ((conGVT > POSE_UnsetTS) && (estGVT > POSE_UnsetTS) && (conGVT < estGVT))  estGVT = conGVT;
580
581     // Check if send/recv activity provides lower possible estimate
582     /*    if (SRs) SRs->dump();
583           else CkPrintf("No SRs reported to GVT!\n");*/
584     SRentry *tmp = SRs;
585     POSE_TimeType lastSR = POSE_UnsetTS;
586     while (tmp && ((tmp->timestamp <= estGVT) || (estGVT == POSE_UnsetTS))) {
587       lastSR = tmp->timestamp;
588       if (tmp->sends != tmp->recvs) {
589         earliestMsg = tmp->timestamp;
590         break;
591       }
592       tmp = tmp->next;
593     }
594     /*    if ((earliestMsg > POSE_UnsetTS) || (earlyAny > POSE_UnsetTS))
595           CkPrintf("GVT: earlyDiff=%d earlyAny=%d estGVT was %d.\n", earliestMsg, earlyAny, estGVT);*/
596     if (((earliestMsg < estGVT) && (earliestMsg != POSE_UnsetTS)) ||
597         (estGVT == POSE_UnsetTS))
598       estGVT = earliestMsg;
599     if ((lastSR != POSE_UnsetTS) && (estGVT == POSE_UnsetTS) && 
600         (lastSR > lastGVT))
601       estGVT = lastSR;
602
603     // check for inactivity
604     if ((optGVT == POSE_UnsetTS) && (earliestMsg == POSE_UnsetTS)) {
605       inactive++;
606       /*
607       if (inactive == 1) {
608         CkPrintf("[%d] Inactive... calling CkWaitQD...\n", CkMyPe());
609         CkWaitQD();
610         CkPrintf("[%d] Back from CkWaitQD...\n", CkMyPe());
611       }
612       */
613       estGVT = lastGVT;
614       if (inactive == 1) inactiveTime = lastGVT;
615     }
616     else if (estGVT < 0) {
617       estGVT = lastGVT;
618       inactive = 0;
619     }
620     else inactive = 0;
621
622     // check the estimate
623     //CkPrintf("opt=%d con=%d lastGVT=%d early=%d lastSR=%d et=%d\n", optGVT, conGVT, lastGVT, earliestMsg, lastSR, POSE_endtime);
624     CmiAssert(estGVT >= lastGVT); 
625     //if (estGVT % 1000 == 0)
626     //CkPrintf("[%d] New GVT = %d\n", CkMyPe(), estGVT);
627     //CkPrintf("[%d] New GVT = %lld\n", CkMyPe(), estGVT);
628
629     // check for termination conditions
630     int term = 0;
631     if ((estGVT >= POSE_endtime) && (POSE_endtime > POSE_UnsetTS)) {
632 #if USE_LONG_TIMESTAMPS      
633       CkPrintf("At endtime: %lld\n", POSE_endtime);
634 #else
635       CkPrintf("At endtime: %d\n", POSE_endtime);
636 #endif
637       term = 1;
638     }
639     else if (inactive > 2) {
640 #if USE_LONG_TIMESTAMPS      
641       CkPrintf("Simulation inactive at time: %lld\n", inactiveTime);
642 #else
643       CkPrintf("Simulation inactive at time: %d\n", inactiveTime);
644 #endif
645       term = 1;
646     }
647
648     // report the last new GVT estimate to all PVT branches
649     gmsg->estGVT = estGVT;
650     gmsg->done = term;
651     if (term) {
652       //if (POSE_endtime > POSE_UnsetTS) gmsg->estGVT = POSE_endtime + 1;
653       // else gmsg->estGVT++;
654 #if USE_LONG_TIMESTAMPS      
655       CkPrintf("Final GVT = %lld\n", gmsg->estGVT);
656 #else
657       CkPrintf("Final GVT = %d\n", gmsg->estGVT);
658 #endif
659       p.setGVT(gmsg);
660       POSE_stop();
661     }
662     else {
663       p.setGVT(gmsg);
664
665       if(pose_config.lb_on)
666         {
667           // perform load balancing
668 #ifndef CMK_OPTIMIZE
669           if(pose_config.stats)
670             localStats->SwitchTimer(LB_TIMER);
671 #endif
672          
673           if (CkNumPes() > 1) {
674             nextLBstart++;
675             if (pose_config.lb_skip == nextLBstart) {
676               TheLBG.calculateLocalLoad();
677               nextLBstart = 0;
678             }
679           }
680 #ifndef CMK_OPTIMIZE
681           if(pose_config.stats)
682             localStats->SwitchTimer(GVT_TIMER);
683 #endif
684         }
685
686       // transmit data to start next GVT estimation on next GVT branch
687       UpdateMsg *umsg = new UpdateMsg;
688       umsg->maxSR=0;
689       umsg->optPVT = estGVT;
690       umsg->inactive = inactive;
691       umsg->inactiveTime = inactiveTime;
692       umsg->nextLB = nextLBstart;
693       umsg->runGVTflag = 0;
694       g[(CkMyPe()+1) % CkNumPes()].runGVT(umsg);
695     }
696
697     // reset static data
698     optGVT = conGVT = POSE_UnsetTS;
699     SRentry *cur = SRs;
700     SRs = NULL;
701     while (cur) {
702       tmp = cur->next;
703       delete cur;
704       cur = tmp;
705     }
706   }
707 #ifndef CMK_OPTIMIZE
708   if(pose_config.stats)
709     localStats->TimerStop();
710 #endif
711 }
712
713 void GVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
714 {
715   register int i;
716   SRentry *tab = (*SRs);
717   SRentry *tmp = tab;
718
719   for (i=0; i<ne; i++) {
720     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
721       if (!tmp) { // no entries yet
722         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
723         tab->sends = e[i].sends;
724         tab->recvs = e[i].recvs;
725         tmp = tab;
726         *SRs = tmp;
727       }
728       else {
729         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
730           CkAssert(tmp == *SRs);
731           tab = new SRentry(e[i].timestamp, tmp);
732           tab->sends = e[i].sends;
733           tab->recvs = e[i].recvs;
734           tmp = tab;
735           *SRs = tmp;
736         }
737         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
738           tmp->sends = tmp->sends + e[i].sends;
739           tmp->recvs = tmp->recvs + e[i].recvs;
740         }
741         else { // search for position
742           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
743             tmp = tmp->next;
744           if (!tmp->next) { // goes at end of SRs
745             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
746             tmp->next->sends = tmp->next->sends + e[i].sends;
747             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
748             tmp = tmp->next;
749           }
750           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
751             tmp->next->sends = tmp->next->sends + e[i].sends;
752             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
753             tmp = tmp->next;
754           }
755           else { // goes after tmp but before tmp->next
756             tmp->next = new SRentry(e[i].timestamp, tmp->next);
757             tmp->next->sends = tmp->next->sends + e[i].sends;
758             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
759             tmp = tmp->next;
760           }
761         }
762       }
763     }
764     else break;
765   }
766 }
767
768 void PVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
769 {
770   register int i;
771   SRentry *tab = (*SRs);
772   SRentry *tmp = tab;
773
774   for (i=0; i<ne; i++) {
775     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
776       if (!tmp) { // no entries yet
777         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
778         tab->sends = e[i].sends;
779         tab->recvs = e[i].recvs;
780         tmp = tab;
781         *SRs = tmp;
782       }
783       else {
784         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
785           CkAssert(tmp == *SRs);
786           tab = new SRentry(e[i].timestamp, tmp);
787           tab->sends = e[i].sends;
788           tab->recvs = e[i].recvs;
789           tmp = tab;
790           *SRs = tmp;
791         }
792         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
793           tmp->sends = tmp->sends + e[i].sends;
794           tmp->recvs = tmp->recvs + e[i].recvs;
795         }
796         else { // search for position
797           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
798             tmp = tmp->next;
799           if (!tmp->next) { // goes at end of SRs
800             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
801             tmp->next->sends = tmp->next->sends + e[i].sends;
802             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
803             tmp = tmp->next;
804           }
805           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
806             tmp->next->sends = tmp->next->sends + e[i].sends;
807             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
808             tmp = tmp->next;
809           }
810           else { // goes after tmp but before tmp->next
811             tmp->next = new SRentry(e[i].timestamp, tmp->next);
812             tmp->next->sends = tmp->next->sends + e[i].sends;
813             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
814             tmp = tmp->next;
815           }
816         }
817       }
818     }
819     else break;
820   }
821 }