added a command line option +lb_gvt_pose to set the gvt intervals between load balancing.
[charm.git] / src / libs / ck-libs / pose / gvt.C
1 // Global Virtual Time estimation for POSE
2 #include "pose.h"
3 #include "srtable.h"
4 #include "gvt.def.h"
5 #include "qd.h"
6
7 CkGroupID ThePVT;
8 CkGroupID TheGVT;
9 CpvExtern(int, stateRecovery);
10 CpvExtern(eventID, theEventID);
11 /// Basic Constructor
12 PVT::PVT() 
13 {
14 #ifdef VERBOSE_DEBUG
15   CkPrintf("[%d] constructing PVT\n",CkMyPe());
16 #endif
17   CpvInitialize(int, stateRecovery);
18   CpvAccess(stateRecovery) = 0;
19   CpvInitialize(eventID, theEventID);
20   CpvAccess(theEventID)=eventID();
21   //  CpvAccess(theEventID).dump();
22   //LBTurnInstrumentOff();
23   optGVT = POSE_UnsetTS; conGVT = POSE_UnsetTS;
24   rdone=0;
25   SRs=NULL;
26 #ifdef POSE_COMM_ON
27   //com_debug = 1;
28 #endif
29 #ifndef CMK_OPTIMIZE
30   localStats = (localStat *)CkLocalBranch(theLocalStats);
31   if(pose_config.stats)
32     {
33       localStats->TimerStart(GVT_TIMER);
34     }
35 #endif
36 #ifdef MEM_TEMPORAL
37   localTimePool = (TimePool *)CkLocalBranch(TempMemID);
38   CkPrintf("NOTE: Temporal memory manager is ON!\n");
39 #endif
40   optPVT = conPVT = estGVT = POSE_UnsetTS;
41   startPhaseActive = gvtTurn = simdone = 0;
42   SendsAndRecvs = new SRtable();
43   SendsAndRecvs->Initialize();
44   specEventCount = eventCount = waitForFirst = 0;
45   iterMin = POSE_UnsetTS;
46   int P=CkNumPes(), N=CkMyPe();
47   reportReduceTo =  -1;
48   if ((N < P-2) && (N%2 == 1)) { //odd
49     reportTo = N-1;
50     reportsExpected = reportEnd = 0;
51   }
52   else if (N < P-2) { //even
53     reportTo = N;
54     reportsExpected = 2; 
55     if (N == P-3)
56       reportsExpected = 1;
57     reportEnd = 0;
58     if (N < (P-2)/2)
59       reportReduceTo = P-2;
60     else reportReduceTo = P-1;
61   }
62   if (N == P-2) {
63     reportTo = N;
64     reportEnd = 1;
65     reportsExpected = 1 + (P-2)/4 + ((P-2)%4)/2;
66   }
67   else if (N == P-1) {
68     reportTo = N;
69     reportEnd = 1;
70     if (P==1) reportsExpected = 1;
71     else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
72   }
73   //  CkPrintf("PE %d reports to %d, receives %d reports, reduces and sends to %d, and reports directly to GVT if %d = 1!\n", CkMyPe(), reportTo, reportsExpected, reportReduceTo, reportEnd);
74
75   parCheckpointInProgress = 0;
76   parLastCheckpointGVT = 0;
77   parLastCheckpointTime = parStartTime = CmiWallTimer();
78   parLBInProgress = 0;
79   parLastLBGVT = 0;
80 #ifndef CMK_OPTIMIZE
81   if(pose_config.stats)
82     localStats->TimerStop();
83 #endif
84 }
85
86 /// PUP routine
87 void PVT::pup(PUP::er &p) {
88   p|optPVT; p|conPVT; p|estGVT; p|repPVT;
89   p|simdone; p|iterMin; p|waitForFirst;
90   p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
91   p|gvtTurn; p|specEventCount; p|eventCount;
92   p|startPhaseActive; p|parStartTime; p|parCheckpointInProgress;
93   p|parLastCheckpointGVT; p|parLastCheckpointTime;
94   p|parLBInProgress; p|parLastLBGVT;
95   p|optGVT; p|conGVT; p|rdone;
96
97   if (p.isUnpacking()) {
98     parStartTime = CmiWallTimer() - (parLastCheckpointTime - parStartTime);
99 #ifndef CMK_OPTIMIZE
100     localStats = (localStat *)CkLocalBranch(theLocalStats);
101 #endif
102 #ifdef MEM_TEMPORAL
103     localTimePool = (TimePool *)CkLocalBranch(TempMemID);
104 #endif
105     SendsAndRecvs = new SRtable();
106   }
107
108   SendsAndRecvs->pup(p);
109
110   if (SRs != NULL) {
111     CkAbort("ERROR: PVT member *SRs is unexpectedly not NULL\n");
112   }
113 }
114
115 void PVT::startPhaseExp(prioBcMsg *m) {
116   startPhase(m);
117 }
118
119 /// ENTRY: runs the PVT calculation and reports to GVT
120 void PVT::startPhase(prioBcMsg *m) 
121 {
122   CProxy_GVT g(TheGVT);
123   CProxy_PVT p(ThePVT);
124   register int i;
125
126   if (startPhaseActive) return;
127 #ifndef CMK_OPTIMIZE
128   if(pose_config.stats)
129     localStats->TimerStart(GVT_TIMER);
130 #endif
131   startPhaseActive = 1;
132   if (m->bc) {
133     prioBcMsg *startMsg = new (8*sizeof(POSE_TimeType)) prioBcMsg;
134     startMsg->bc = 0;
135     *((POSE_TimeType *)CkPriorityPtr(startMsg)) = 1-POSE_TimeMax;
136     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
137     p.startPhaseExp(startMsg);
138   }
139
140   objs.Wake(); // wake objects to make sure all have reported
141   // compute PVT
142   optPVT = conPVT = POSE_UnsetTS;
143   int end = objs.getNumSpaces();
144   for (i=0; i<end; i++)
145     if (objs.objs[i].isPresent()) {
146       if (objs.objs[i].isOptimistic()) { // check optPVT 
147         if ((optPVT < 0) || ((objs.objs[i].getOVT() < optPVT) && 
148                              (objs.objs[i].getOVT() > POSE_UnsetTS))) {
149           optPVT = objs.objs[i].getOVT();
150           CkAssert(simdone>0 || ((objs.objs[i].getOVT() >= estGVT) ||
151                                (objs.objs[i].getOVT() == POSE_UnsetTS)));
152         }
153       }
154       else if (objs.objs[i].isConservative()) { // check conPVT
155         if ((conPVT < 0) || ((objs.objs[i].getOVT() < conPVT) && 
156                              (objs.objs[i].getOVT() > POSE_UnsetTS)))
157           conPVT = objs.objs[i].getOVT();
158       }
159       CkAssert(simdone>0 || (optPVT >= estGVT)||(optPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
160       CkAssert(simdone>0 || (conPVT >= estGVT)||(conPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
161     }
162
163   // (1) Find out the local PVT from optPVT and conPVT
164   POSE_TimeType pvt = optPVT;
165   if ((conPVT < pvt) && (conPVT > POSE_UnsetTS)) pvt = conPVT;
166   if ((iterMin < pvt) && (iterMin > POSE_UnsetTS)) pvt = iterMin;
167   if (waitForFirst) {
168     waitForFirst = 0;
169     if (pvt == POSE_UnsetTS)
170       SendsAndRecvs->Restructure(estGVT, estGVT, POSE_UnsetTS);
171     else
172       SendsAndRecvs->Restructure(estGVT, pvt, POSE_UnsetTS);
173   }
174
175   //  CkPrintf("[%d] pvt=%d gvt=%d optPVT=%d iterMin=%d\n", CkMyPe(), pvt, estGVT, optPVT, iterMin);
176   POSE_TimeType xt;
177   if (pvt == POSE_UnsetTS) { // all are idle; find max ovt
178     POSE_TimeType maxOVT = POSE_UnsetTS;
179     for (i=0; i<end; i++)
180       if (objs.objs[i].isPresent()) {
181         xt = objs.objs[i].getOVT2();
182         if (xt > maxOVT)
183           maxOVT = xt;
184       }
185     if (maxOVT > estGVT)
186       pvt = maxOVT;
187   }
188   
189   // (2) Pack the SRtable data into the message
190   POSE_TimeType maxSR;
191   UpdateMsg *um = SendsAndRecvs->PackTable(pvt, &maxSR);
192   // (3) Add the PVT info to the message
193   um->optPVT = pvt;
194   um->conPVT = conPVT;
195   um->maxSR = maxSR;
196   um->runGVTflag = 0;
197
198   if (um->numEntries > 0) {
199     //CkPrintf("PE %d has %d SRs reported to GVT; earliest=%d pvt=%d\n", CkMyPe(), um->numEntries, um->SRs[0].timestamp, pvt);
200   }
201   // send data to GVT estimation
202   p[reportTo].reportReduce(um);
203
204   /*
205   if (simdone) // transmit final info to GVT on PE 0
206     g[0].computeGVT(um);              
207   else {
208     g[gvtTurn].computeGVT(um);           // transmit info to GVT
209     gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
210   }
211   */
212   objs.SetIdle(); // Set objects to idle
213   iterMin = POSE_UnsetTS;
214 #ifndef CMK_OPTIMIZE
215   if(pose_config.stats)
216     localStats->TimerStop();
217 #endif
218 }
219
220 /// ENTRY: receive GVT estimate; wake up objects
221 void PVT::setGVT(GVTMsg *m)
222 {
223 #ifndef CMK_OPTIMIZE
224   if(pose_config.stats)
225     localStats->TimerStart(GVT_TIMER);
226 #endif
227   CProxy_PVT p(ThePVT);
228   CkAssert(m->estGVT >= estGVT);
229   estGVT = m->estGVT;
230   int i, end = objs.getNumSpaces();
231 #ifdef POSE_COMM_ON  
232   //PrioStreaming *pstrat = (PrioStreaming *)(POSE_commlib_insthndl.getStrategy());
233   //pstrat->setBasePriority((estGVT+10) - POSE_TimeMax);
234   //pstrat->setBasePriority(estGVT+10);
235 #endif
236   simdone = m->done;
237   CkFreeMsg(m);
238   waitForFirst = 1;
239   objs.Commit();
240 #ifdef MEM_TEMPORAL
241   localTimePool->set_min_time(estGVT);
242 #endif
243
244   // Parallel checkpointing: setGVT was broken into two functions, and
245   // beginCheckpoint was added.  Only initiate the checkpointing
246   // procedure on PE 0, after commits have occurred.  This should
247   // minimize the amount of data written to disk.  In order to ensure
248   // a stable state, we wait for quiescence to be reached before
249   // beginning the checkpoint.  Inconsistent results were obtained
250   // (possibly from messages still in transit) without this step.
251   // Once quiescence is reached, PE 0 begins the checkpoint, and then
252   // resumes the simulation in resumeAfterCheckpoint.  This Callback
253   // function is also the first POSE function to be called when
254   // restarting from a checkpoint.
255
256   // Checkpoints are initiated approximately every
257   // pose_config.checkpoint_gvt_interval GVT ticks or
258   // pose_config.checkpoint_time_interval seconds (both defined in
259   // pose_config.h).
260
261   if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && 
262       (((pose_config.checkpoint_gvt_interval > 0) && (estGVT >= (parLastCheckpointGVT + pose_config.checkpoint_gvt_interval))) || 
263        ((pose_config.checkpoint_time_interval > 0) && (CmiWallTimer() >= (parLastCheckpointTime + (double)pose_config.checkpoint_time_interval))))) {
264     // wait for quiescence to occur before checkpointing
265     eventMsg *dummyMsg = new eventMsg();
266     CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
267     parCheckpointInProgress = 1;
268     parLastCheckpointTime = CmiWallTimer();
269     CkStartQD(cb);
270   } else if ((CkMyPe() == 0) && (parLBInProgress == 0) && 
271       (((pose_config.lb_gvt_interval > 0) && (estGVT >= (parLastLBGVT + pose_config.lb_gvt_interval))))) {
272     // wait for quiescence to occur before checkpointing
273     eventMsg *dummyMsg = new eventMsg();
274     CkCallback cb(CkIndex_PVT::beginLoadbalancing(dummyMsg), CkMyPe(), ThePVT);
275     parLBInProgress = 1;
276     CkStartQD(cb);
277   } else {
278     // skip checkpointing
279     eventMsg *dummyMsg = new eventMsg();
280     p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
281   }
282 }
283
284 /// ENTRY: begin checkpoint now that quiescence has been reached
285 void PVT::beginCheckpoint(eventMsg *m) {
286   CkFreeMsg(m);
287   if (parCheckpointInProgress) {  // ensure this only happens once
288     CkPrintf("POSE: quiescence detected\n");
289     CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
290     eventMsg *dummyMsg = new eventMsg();
291     CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
292     CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
293   }
294 }
295
296 void PVT::beginLoadbalancing(eventMsg *m) {
297   CkFreeMsg(m);
298   if (parLBInProgress) {  // ensure this only happens once
299     CProxy_PVT p(ThePVT);
300     p.callAtSync();
301   }
302 }
303
304 void PVT::callAtSync()
305 {
306   objs.callAtSync();
307 }
308
309 void PVT::doneLB() {
310   static int count = 0;
311   count ++;
312   if (count == objs.getNumObjs()) {
313     count =0;
314     if (CkMyPe()==0) { 
315       eventMsg *dummyMsg = new eventMsg();
316       CProxy_PVT p(ThePVT);
317       p[0].resumeAfterCheckpoint(dummyMsg);
318     }
319   }
320 }
321
322 /// ENTRY: resume after checkpointing, restarting, or if checkpointing doesn't occur
323 void PVT::resumeAfterCheckpoint(eventMsg *m) {
324   if (parCheckpointInProgress) {
325     CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
326     parCheckpointInProgress = 0;
327     parLastCheckpointGVT = estGVT;
328   }
329   CkFreeMsg(m);
330   CProxy_PVT p(ThePVT);
331   startPhaseActive = 0;
332   prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
333   startMsg->bc = 1;
334   *((int *)CkPriorityPtr(startMsg)) = 0;
335   CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
336   p[CkMyPe()].startPhase(startMsg);
337 #ifndef CMK_OPTIMIZE
338   if(pose_config.stats)
339     localStats->TimerStop();
340 #endif
341 }
342
343 void PVT::resumeAfterLB(eventMsg *m) {
344   if (parLBInProgress) {
345     CkPrintf("POSE: load balancing complete on processor %d at GVT=%lld time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() - parStartTime);
346     parLBInProgress = 0;
347     parLastLBGVT = estGVT;
348   }
349   CkFreeMsg(m);
350   CProxy_PVT p(ThePVT);
351   startPhaseActive = 0;
352   prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
353   startMsg->bc = 1;
354   *((int *)CkPriorityPtr(startMsg)) = 0;
355   CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
356   p[CkMyPe()].startPhase(startMsg);
357 #ifndef CMK_OPTIMIZE
358   if(pose_config.stats)
359     localStats->TimerStop();
360 #endif
361 }
362
363 /// Register poser with PVT
364 int PVT::objRegister(int arrIdx, POSE_TimeType safeTime, int sync, sim *myPtr)
365 {
366   int i = objs.Insert(arrIdx, POSE_UnsetTS, sync, myPtr); // add to object list
367   return(i*1000 + CkMyPe());                          // return unique PVT idx
368 }
369
370 // Unregister poser from PVT
371 void PVT::objRemove(int pvtIdx)
372 {
373   int idx = (pvtIdx-CkMyPe())/1000;  // calculate local index from unique index
374   objs.Delete(idx);                  // delete the object
375 }
376
377 /// Update send/recv table at timestamp
378 void PVT::objUpdate(POSE_TimeType timestamp, int sr)
379 {
380 #ifndef CMK_OPTIMIZE
381   int tstat = localStats->TimerRunning();
382   if(pose_config.stats){
383     if (tstat)
384       localStats->SwitchTimer(GVT_TIMER);
385     else
386       localStats->TimerStart(GVT_TIMER);
387   }
388 #endif
389   //if ((timestamp < estGVT) && (estGVT > POSE_UnsetTS))
390   //CkPrintf("timestamp=%d estGVT=%d simdone=%d sr=%d\n", timestamp,
391   //estGVT, simdone, sr);
392   CkAssert(simdone>0 || (timestamp >= estGVT) || (estGVT == POSE_UnsetTS));
393   CkAssert((sr == SEND) || (sr == RECV));
394   if ((estGVT > POSE_UnsetTS) && 
395       ((timestamp < iterMin) || (iterMin == POSE_UnsetTS))) 
396     iterMin = timestamp;
397   if (waitForFirst) {
398     waitForFirst = 0;
399     SendsAndRecvs->Restructure(estGVT, timestamp, sr);
400   }
401   else SendsAndRecvs->Insert(timestamp, sr);
402 #ifndef CMK_OPTIMIZE
403   if(pose_config.stats){
404     if (tstat)
405       localStats->SwitchTimer(tstat);
406     else
407       localStats->TimerStop();
408   }
409 #endif
410
411 }
412
413 /// Update PVT with safeTime
414 void PVT::objUpdateOVT(int pvtIdx, POSE_TimeType safeTime, POSE_TimeType ovt)
415 {
416   int index = (pvtIdx-CkMyPe())/1000;
417   // minimize the non-idle OVT
418   //  if ((safeTime < estGVT) && (safeTime > POSE_UnsetTS)) 
419
420   CkAssert(simdone>0 || (safeTime >= estGVT) || (safeTime == POSE_UnsetTS));
421   if ((safeTime == POSE_UnsetTS) && (objs.objs[index].getOVT2() < ovt))
422     objs.objs[index].setOVT2(ovt);
423   else if ((safeTime > POSE_UnsetTS) && 
424            ((objs.objs[index].getOVT() > safeTime) || (objs.objs[index].getOVT() == POSE_UnsetTS)))
425     objs.objs[index].setOVT(safeTime);
426 }
427
428 /// Reduction point for PVT reports
429 void PVT::reportReduce(UpdateMsg *m)
430 {
431 #ifndef CMK_OPTIMIZE
432   if(pose_config.stats)
433     localStats->TimerStart(GVT_TIMER);
434 #endif
435   CProxy_PVT p(ThePVT);
436   CProxy_GVT g(TheGVT);
437   POSE_TimeType lastGVT = 0, maxSR=0;
438
439   // see if message provides new min optGVT or conGVT
440   if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
441     optGVT = m->optPVT;
442   if (m->maxSR > 0)
443     maxSR = m->maxSR;
444   addSR(&SRs, m->SRs, optGVT, m->numEntries);
445   rdone++;
446   CkFreeMsg(m);
447
448   if (rdone == reportsExpected) { // all PVT reports are in
449     UpdateMsg *um;
450     int entryCount = 0;
451     // pack data into um
452     SRentry *tmp = SRs;
453     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
454            && (tmp->sends != tmp->recvs)) {
455       entryCount++;
456       tmp = tmp->next;
457     }
458     um = new (entryCount * sizeof(SRentry), 0) UpdateMsg;
459     tmp = SRs;
460     int i=0;
461     while (tmp && ((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS))
462            && (tmp->sends != tmp->recvs)) {
463       um->SRs[i] = *tmp;
464       tmp = tmp->next;
465       i++;
466     }
467     um->numEntries = entryCount;
468     um->optPVT = optGVT;
469     um->conPVT = conGVT;
470     um->maxSR = maxSR;
471     um->runGVTflag = 0;
472
473     if (reportEnd) { //send to computeGVT
474       if (simdone>0) // transmit final info to GVT on PE 0
475         g[0].computeGVT(um);              
476       else {
477         g[gvtTurn].computeGVT(um);           // transmit info to GVT
478         gvtTurn = (gvtTurn + 1) % CkNumPes();  // calculate next GVT location
479       }
480     }
481     else { //send to pvt reportReduceTo
482       p[reportReduceTo].reportReduce(um);
483     }
484
485     // reset static data
486     optGVT = conGVT = POSE_UnsetTS;
487     SRentry *cur = SRs;
488     SRs = NULL;
489     while (cur) {
490       tmp = cur->next;
491       delete cur;
492       cur = tmp;
493     }
494     rdone = 0;
495   }
496 #ifndef CMK_OPTIMIZE
497   if(pose_config.stats)
498     localStats->TimerStop();
499 #endif
500 }
501
502 /// Basic Constructor
503 GVT::GVT() 
504 {
505 #ifdef VERBOSE_DEBUG
506   CkPrintf("[%d] constructing GVT\n",CkMyPe());
507 #endif
508
509   optGVT = POSE_UnsetTS, conGVT = POSE_UnsetTS;
510   done=0;
511   SRs = NULL;
512   startOffset = 0;
513
514 #ifndef CMK_OPTIMIZE
515   localStats = (localStat *)CkLocalBranch(theLocalStats);
516 #endif
517 #ifndef SEQUENTIAL_POSE
518   if(pose_config.lb_on)
519     nextLBstart = pose_config.lb_skip - 1;
520 #endif
521   estGVT = lastEarliest = inactiveTime = POSE_UnsetTS;
522   lastSends = lastRecvs = inactive = 0;
523   reportsExpected = 1;
524   if (CkNumPes() >= 2) reportsExpected = 2;
525     
526   //  CkPrintf("GVT expects %d reports!\n", reportsExpected);
527   if (CkMyPe() == 0) { // start the PVT phase of the GVT algorithm
528     CProxy_PVT p(ThePVT);
529     prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
530     startMsg->bc = 1;
531     *((int *)CkPriorityPtr(startMsg)) = 0;
532     CkSetQueueing(startMsg, CK_QUEUEING_IFIFO); 
533     p.startPhase(startMsg); // broadcast PVT calculation to all PVT branches
534   }
535 }
536
537 // Used for Ccd calls; currently commented out
538 //void GVT::_runGVT(UpdateMsg *m) 
539 //{ 
540 //  CProxy_GVT g(TheGVT);
541 //  g[(CkMyPe() + 1)%CkNumPes()].runGVT(m);
542 //}
543
544 /// ENTRY: Run the GVT
545 void GVT::runGVT(UpdateMsg *m) 
546 {
547 #ifndef CMK_OPTIMIZE
548   if(pose_config.stats)
549     localStats->TimerStart(GVT_TIMER);
550 #endif
551   estGVT = m->optPVT;
552   inactive = m->inactive;
553   inactiveTime = m->inactiveTime;
554   nextLBstart = m->nextLB;
555   CProxy_GVT g(TheGVT);
556   m->runGVTflag = 1;
557   g[CkMyPe()].computeGVT(m);  // start the next PVT phase of the GVT algorithm
558 #ifndef CMK_OPTIMIZE
559   if(pose_config.stats)
560     localStats->TimerStop();
561 #endif
562 }
563
564 /// ENTRY: Gathers PVT reports; calculates and broadcasts GVT to PVTs
565 void GVT::computeGVT(UpdateMsg *m)
566 {
567 #ifndef CMK_OPTIMIZE
568   if(pose_config.stats)
569     localStats->TimerStart(GVT_TIMER);
570 #endif
571   CProxy_PVT p(ThePVT);
572   CProxy_GVT g(TheGVT);
573   GVTMsg *gmsg = new GVTMsg;
574   POSE_TimeType lastGVT = 0, earliestMsg = POSE_UnsetTS, 
575     earlyAny = POSE_UnsetTS;
576
577   if (CkMyPe() != 0) startOffset = 1;
578   if (m->runGVTflag == 1) done++;
579   else {
580     // see if message provides new min optGVT or conGVT
581     if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
582       optGVT = m->optPVT;
583     if ((conGVT < 0) || ((m->conPVT > POSE_UnsetTS) && (m->conPVT < conGVT)))
584       conGVT = m->conPVT;
585     if (m->maxSR > earlyAny) 
586       earlyAny = m->maxSR;
587     // add send/recv info to SRs
588     /*    if (m->numEntries > 0)
589       CkPrintf("GVT recv'd %d SRs from a PE, earliest=%d\n", m->numEntries, 
590       m->SRs[0].timestamp);*/
591     addSR(&SRs, m->SRs, optGVT, m->numEntries);
592     done++;
593   }
594   CkFreeMsg(m);
595
596   if (done == reportsExpected+startOffset) { // all PVT reports are in
597 #ifndef CMK_OPTIMIZE
598     if(pose_config.stats)
599       localStats->GvtInc();
600 #endif
601     done = 0;
602     startOffset = 1;
603     lastGVT = estGVT; // store previous estimate
604     if (lastGVT < 0) lastGVT = 0;
605     estGVT = POSE_UnsetTS;
606     
607     // derive GVT estimate from min optimistic & conservative GVTs
608     estGVT = optGVT;
609     if ((conGVT > POSE_UnsetTS) && (estGVT > POSE_UnsetTS) && (conGVT < estGVT))  estGVT = conGVT;
610
611     // Check if send/recv activity provides lower possible estimate
612     /*    if (SRs) SRs->dump();
613           else CkPrintf("No SRs reported to GVT!\n");*/
614     SRentry *tmp = SRs;
615     POSE_TimeType lastSR = POSE_UnsetTS;
616     while (tmp && ((tmp->timestamp <= estGVT) || (estGVT == POSE_UnsetTS))) {
617       lastSR = tmp->timestamp;
618       if (tmp->sends != tmp->recvs) {
619         earliestMsg = tmp->timestamp;
620         break;
621       }
622       tmp = tmp->next;
623     }
624     /*    if ((earliestMsg > POSE_UnsetTS) || (earlyAny > POSE_UnsetTS))
625           CkPrintf("GVT: earlyDiff=%d earlyAny=%d estGVT was %d.\n", earliestMsg, earlyAny, estGVT);*/
626     if (((earliestMsg < estGVT) && (earliestMsg != POSE_UnsetTS)) ||
627         (estGVT == POSE_UnsetTS))
628       estGVT = earliestMsg;
629     if ((lastSR != POSE_UnsetTS) && (estGVT == POSE_UnsetTS) && 
630         (lastSR > lastGVT))
631       estGVT = lastSR;
632
633     // check for inactivity
634     if ((optGVT == POSE_UnsetTS) && (earliestMsg == POSE_UnsetTS)) {
635       inactive++;
636       /*
637       if (inactive == 1) {
638         CkPrintf("[%d] Inactive... calling CkWaitQD...\n", CkMyPe());
639         CkWaitQD();
640         CkPrintf("[%d] Back from CkWaitQD...\n", CkMyPe());
641       }
642       */
643       estGVT = lastGVT;
644       if (inactive == 1) inactiveTime = lastGVT;
645     }
646     else if (estGVT < 0) {
647       estGVT = lastGVT;
648       inactive = 0;
649     }
650     else inactive = 0;
651
652     // check the estimate
653     //CkPrintf("opt=%d con=%d lastGVT=%d early=%d lastSR=%d et=%d\n", optGVT, conGVT, lastGVT, earliestMsg, lastSR, POSE_endtime);
654     CmiAssert(estGVT >= lastGVT); 
655     //if (estGVT % 1000 == 0)
656     //CkPrintf("[%d] New GVT = %d\n", CkMyPe(), estGVT);
657     //CkPrintf("[%d] New GVT = %lld\n", CkMyPe(), estGVT);
658
659     // check for termination conditions
660     int term = 0;
661     if ((estGVT >= POSE_endtime) && (POSE_endtime > POSE_UnsetTS)) {
662 #if USE_LONG_TIMESTAMPS      
663       CkPrintf("At endtime: %lld\n", POSE_endtime);
664 #else
665       CkPrintf("At endtime: %d\n", POSE_endtime);
666 #endif
667       term = 1;
668     }
669     else if (inactive > 2) {
670 #if USE_LONG_TIMESTAMPS      
671       CkPrintf("Simulation inactive at time: %lld\n", inactiveTime);
672 #else
673       CkPrintf("Simulation inactive at time: %d\n", inactiveTime);
674 #endif
675       term = 1;
676     }
677
678     // report the last new GVT estimate to all PVT branches
679     gmsg->estGVT = estGVT;
680     gmsg->done = term;
681     if (term) {
682       //if (POSE_endtime > POSE_UnsetTS) gmsg->estGVT = POSE_endtime + 1;
683       // else gmsg->estGVT++;
684 #if USE_LONG_TIMESTAMPS      
685       CkPrintf("Final GVT = %lld\n", gmsg->estGVT);
686 #else
687       CkPrintf("Final GVT = %d\n", gmsg->estGVT);
688 #endif
689       p.setGVT(gmsg);
690       POSE_stop();
691     }
692     else {
693       p.setGVT(gmsg);
694
695       if(pose_config.lb_on)
696         {
697           // perform load balancing
698 #ifndef CMK_OPTIMIZE
699           if(pose_config.stats)
700             localStats->SwitchTimer(LB_TIMER);
701 #endif
702          
703           if (CkNumPes() > 1) {
704             nextLBstart++;
705             if (pose_config.lb_skip == nextLBstart) {
706               TheLBG.calculateLocalLoad();
707               nextLBstart = 0;
708             }
709           }
710 #ifndef CMK_OPTIMIZE
711           if(pose_config.stats)
712             localStats->SwitchTimer(GVT_TIMER);
713 #endif
714         }
715
716       // transmit data to start next GVT estimation on next GVT branch
717       UpdateMsg *umsg = new UpdateMsg;
718       umsg->maxSR=0;
719       umsg->optPVT = estGVT;
720       umsg->inactive = inactive;
721       umsg->inactiveTime = inactiveTime;
722       umsg->nextLB = nextLBstart;
723       umsg->runGVTflag = 0;
724       g[(CkMyPe()+1) % CkNumPes()].runGVT(umsg);
725     }
726
727     // reset static data
728     optGVT = conGVT = POSE_UnsetTS;
729     SRentry *cur = SRs;
730     SRs = NULL;
731     while (cur) {
732       tmp = cur->next;
733       delete cur;
734       cur = tmp;
735     }
736   }
737 #ifndef CMK_OPTIMIZE
738   if(pose_config.stats)
739     localStats->TimerStop();
740 #endif
741 }
742
743 void GVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
744 {
745   register int i;
746   SRentry *tab = (*SRs);
747   SRentry *tmp = tab;
748
749   for (i=0; i<ne; i++) {
750     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
751       if (!tmp) { // no entries yet
752         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
753         tab->sends = e[i].sends;
754         tab->recvs = e[i].recvs;
755         tmp = tab;
756         *SRs = tmp;
757       }
758       else {
759         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
760           CkAssert(tmp == *SRs);
761           tab = new SRentry(e[i].timestamp, tmp);
762           tab->sends = e[i].sends;
763           tab->recvs = e[i].recvs;
764           tmp = tab;
765           *SRs = tmp;
766         }
767         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
768           tmp->sends = tmp->sends + e[i].sends;
769           tmp->recvs = tmp->recvs + e[i].recvs;
770         }
771         else { // search for position
772           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
773             tmp = tmp->next;
774           if (!tmp->next) { // goes at end of SRs
775             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
776             tmp->next->sends = tmp->next->sends + e[i].sends;
777             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
778             tmp = tmp->next;
779           }
780           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
781             tmp->next->sends = tmp->next->sends + e[i].sends;
782             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
783             tmp = tmp->next;
784           }
785           else { // goes after tmp but before tmp->next
786             tmp->next = new SRentry(e[i].timestamp, tmp->next);
787             tmp->next->sends = tmp->next->sends + e[i].sends;
788             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
789             tmp = tmp->next;
790           }
791         }
792       }
793     }
794     else break;
795   }
796 }
797
798 void PVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
799 {
800   register int i;
801   SRentry *tab = (*SRs);
802   SRentry *tmp = tab;
803
804   for (i=0; i<ne; i++) {
805     if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
806       if (!tmp) { // no entries yet
807         tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
808         tab->sends = e[i].sends;
809         tab->recvs = e[i].recvs;
810         tmp = tab;
811         *SRs = tmp;
812       }
813       else {
814         if (e[i].timestamp < tmp->timestamp) { // goes before tmp
815           CkAssert(tmp == *SRs);
816           tab = new SRentry(e[i].timestamp, tmp);
817           tab->sends = e[i].sends;
818           tab->recvs = e[i].recvs;
819           tmp = tab;
820           *SRs = tmp;
821         }
822         else if (e[i].timestamp == tmp->timestamp) { // goes in first entr
823           tmp->sends = tmp->sends + e[i].sends;
824           tmp->recvs = tmp->recvs + e[i].recvs;
825         }
826         else { // search for position
827           while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
828             tmp = tmp->next;
829           if (!tmp->next) { // goes at end of SRs
830             tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
831             tmp->next->sends = tmp->next->sends + e[i].sends;
832             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
833             tmp = tmp->next;
834           }
835           else if (e[i].timestamp == tmp->next->timestamp) {//goes in tmp->next
836             tmp->next->sends = tmp->next->sends + e[i].sends;
837             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
838             tmp = tmp->next;
839           }
840           else { // goes after tmp but before tmp->next
841             tmp->next = new SRentry(e[i].timestamp, tmp->next);
842             tmp->next->sends = tmp->next->sends + e[i].sends;
843             tmp->next->recvs = tmp->next->recvs + e[i].recvs;
844             tmp = tmp->next;
845           }
846         }
847       }
848     }
849     else break;
850   }
851 }