Remove archaic CVS headers
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39
40
41 /** 
42     A class that reads/writes a buffer out of different types of data.
43
44     This class exists because I need to get references to parts of the buffer 
45     that have already been used so that I can increment counters inside the buffer.
46 */
47
48 class compressedBuffer {
49 public:
50   char* buf;
51   int pos; ///<< byte position just beyond the previously read/written data
52
53   compressedBuffer(){
54     buf = NULL;
55     pos = 0;
56   }
57
58   compressedBuffer(int bytes){
59     buf = (char*)malloc(bytes);
60     pos = 0;
61   }
62
63   compressedBuffer(void *buffer){
64     buf = (char*)buffer;
65     pos = 0;
66   }
67     
68   void init(void *buffer){
69     buf = (char*)buffer;
70     pos = 0;
71   }
72     
73   inline void * currentPtr(){
74     return (void*)(buf+pos);
75   }
76
77   template <typename T>
78   T read(int offset){
79     // to resolve unaligned writes causing bus errors, need memcpy
80     T v;
81     memcpy(&v, buf+offset, sizeof(T));
82     return v;
83   }
84     
85   template <typename T>
86   void write(T v, int offset){
87     T v2 = v; // on stack
88     // to resolve unaligned writes causing bus errors, need memcpy
89     memcpy(buf+offset, &v2, sizeof(T));
90   }
91     
92   template <typename T>
93   void increment(int offset){
94     T temp;
95     temp = read<T>(offset);
96     temp ++;
97     write<T>(temp, offset);
98   }
99
100   template <typename T>
101   void accumulate(T v, int offset){
102     T temp;
103     temp = read<T>(offset);
104     temp += v;
105     write<T>(temp, offset);
106   }
107   
108   template <typename T>
109   int push(T v){
110     int oldpos = pos;
111     write<T>(v, pos);
112     pos += sizeof(T);
113     return oldpos;
114   }
115   
116   template <typename T>
117   T pop(){
118     T temp = read<T>(pos);
119     pos += sizeof(T);
120     return temp;
121   }
122
123   template <typename T>
124   T peek(){
125     T temp = read<T>(pos);
126     return temp;
127   }
128
129   template <typename T0, typename T>
130   T peekSecond(){
131     T temp;
132     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
133     return temp;
134   }
135
136   int datalength(){
137     return pos;
138   }
139      
140   void * buffer(){
141     return (void*) buf;
142   }  
143
144   void freeBuf(){
145     free(buf);
146   }
147
148   ~compressedBuffer(){
149     // don't free the buf because the user my have supplied the buffer
150   }
151     
152 };
153
154
155
156
157 // Global Readonly
158 CkGroupID traceSummaryGID;
159 bool summaryCcsStreaming;
160
161 int sumonly = 0;
162 int sumDetail = 0;
163
164 /**
165   For each TraceFoo module, _createTraceFoo() must be defined.
166   This function is called in _createTraces() generated in moduleInit.C
167 */
168 void _createTracesummary(char **argv)
169 {
170   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
171   CkpvInitialize(TraceSummary*, _trace);
172   CkpvInitialize(int, previouslySentBins);
173   CkpvAccess(previouslySentBins) = 0;
174   CkpvAccess(_trace) = new  TraceSummary(argv);
175   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
176   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
177 }
178
179
180 /// function call for starting a phase in trace summary logs 
181 extern "C" 
182 void CkSummary_StartPhase(int phase)
183 {
184    CkpvAccess(_trace)->startPhase(phase);
185 }
186
187
188 /// function call for adding an event mark
189 extern "C" 
190 void CkSummary_MarkEvent(int eventType)
191 {
192    CkpvAccess(_trace)->addEventType(eventType);
193 }
194
195 static inline void writeU(FILE* fp, int u)
196 {
197   fprintf(fp, "%4d", u);
198 }
199
200 PhaseEntry::PhaseEntry() 
201 {
202   int _numEntries=_entryTable.size();
203   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
204   nEPs = _numEntries+10; 
205   count = new int[nEPs];
206   times = new double[nEPs];
207   maxtimes = new double[nEPs];
208   for (int i=0; i<nEPs; i++) {
209     count[i] = 0;
210     times[i] = 0.0;
211     maxtimes[i] = 0.;
212   }
213 }
214
215 SumLogPool::~SumLogPool() 
216 {
217   if (!sumonly) {
218     write();
219     fclose(fp);
220     if (sumDetail) fclose(sdfp);
221   }
222   // free memory for mark
223   if (markcount > 0)
224   for (int i=0; i<MAX_MARKS; i++) {
225     for (int j=0; j<events[i].length(); j++)
226       delete events[i][j];
227   }
228   delete[] pool;
229   delete[] epInfo;
230   delete[] cpuTime;
231   delete[] numExecutions;
232 }
233
234 void SumLogPool::addEventType(int eventType, double time)
235 {
236    if (eventType <0 || eventType >= MAX_MARKS) {
237        CkPrintf("Invalid event type %d!\n", eventType);
238        return;
239    }
240    MarkEntry *e = new MarkEntry;
241    e->time = time;
242    events[eventType].push_back(e);
243    markcount ++;
244 }
245
246 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
247 {
248    // TBD: Can this be moved to initMem?
249   cpuTime = NULL;
250    poolSize = CkpvAccess(binCount);
251    if (poolSize % 2) poolSize++;        // make sure it is even
252    pool = new BinEntry[poolSize];
253    _MEMCHECK(pool);
254
255    this->pgm = new char[strlen(pgm)+1];
256    strcpy(this->pgm,pgm);
257    
258 #if 0
259    // create the sts file
260    if (CkMyPe() == 0) {
261      char *fname = 
262        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
263      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
264      stsfp = fopen(fname, "w+");
265      //CmiPrintf("File: %s \n", fname);
266      if (stsfp == 0) {
267        CmiAbort("Cannot open summary sts file for writing.\n");
268       }
269      delete[] fname;
270    }
271 #endif
272    
273    // event
274    markcount = 0;
275 }
276
277 void SumLogPool::initMem()
278 {
279    int _numEntries=_entryTable.size();
280    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
281    epInfo = new SumEntryInfo[epInfoSize];
282    _MEMCHECK(epInfo);
283
284    cpuTime = NULL;
285    numExecutions = NULL;
286    if (sumDetail) {
287        cpuTime = new double[poolSize*epInfoSize];
288        _MEMCHECK(cpuTime);
289        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
290        numExecutions = new int[poolSize*epInfoSize];
291        _MEMCHECK(numExecutions);
292        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
293
294 //         int i, e;
295 //         for(i=0; i<poolSize; i++) {
296 //             for(e=0; e< epInfoSize; e++) {
297 //                 setCPUtime(i,e,0.0);
298 //                 setNumExecutions(i,e,0);
299 //             }
300 //         }
301    }
302 }
303
304 int SumLogPool::getUtilization(int interval, int ep) {
305     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
306 }
307
308 void SumLogPool::write(void) 
309 {
310   int i;
311   unsigned int j;
312   int _numEntries=_entryTable.size();
313
314   fp = NULL;
315   sdfp = NULL;
316
317   // create file(s)
318   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
319   if (!sumonly) {
320     char pestr[10];
321     sprintf(pestr, "%d", CkMyPe());
322     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
323     char *fname = new char[len+1];
324     
325     sprintf(fname, "%s.%s.sum", pgm, pestr);
326     do {
327       fp = fopen(fname, "w+");
328     } while (!fp && errno == EINTR);
329     if (!fp) {
330       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
331       CmiAbort("Cannot open Summary Trace File for writing...\n");
332     }
333     
334     if (sumDetail) {
335       sprintf(fname, "%s.%s.sumd", pgm, pestr);
336       do {
337         sdfp = fopen(fname, "w+");
338       } while (!sdfp && errno == EINTR);
339       if(!sdfp) {
340         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
341       }
342     }
343     delete[] fname;
344   }
345
346   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
347   if (CkpvAccess(version)>=3.0)
348   {
349     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
350   }
351   fprintf(fp, "\n");
352
353   // write bin time
354 #if 1
355   int last=pool[0].getU();
356   writeU(fp, last);
357   int count=1;
358   for(j=1; j<numBins; j++) {
359     int u = pool[j].getU();
360     if (last == u) {
361       count++;
362     }
363     else {
364       if (count > 1) fprintf(fp, "+%d", count);
365       writeU(fp, u);
366       last = u;
367       count = 1;
368     }
369   }
370   if (count > 1) fprintf(fp, "+%d", count);
371 #else
372   for(j=0; j<numEntries; j++) 
373       pool[j].write(fp);
374 #endif
375   fprintf(fp, "\n");
376
377   // write entry execution time
378   fprintf(fp, "EPExeTime: ");
379   for (i=0; i<_numEntries; i++)
380     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
381   fprintf(fp, "\n");
382   // write entry function call times
383   fprintf(fp, "EPCallTime: ");
384   for (i=0; i<_numEntries; i++)
385     fprintf(fp, "%d ", epInfo[i].epCount);
386   fprintf(fp, "\n");
387   // write max entry function execute times
388   fprintf(fp, "MaxEPTime: ");
389   for (i=0; i<_numEntries; i++)
390     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
391   fprintf(fp, "\n");
392 #if 0
393   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
394     for (j=0; j<_numEntries; j++) 
395       fprintf(fp, "%d ", epInfo[j].hist[i]);
396     fprintf(fp, "\n");
397   }
398 #endif
399   // write marks
400   if (CkpvAccess(version)>=2.0) 
401   {
402     fprintf(fp, "NumMarks: %d ", markcount);
403     for (i=0; i<MAX_MARKS; i++) {
404       for(int j=0; j<events[i].length(); j++)
405         fprintf(fp, "%d %f ", i, events[i][j]->time);
406     }
407     fprintf(fp, "\n");
408   }
409   // write phases
410   if (CkpvAccess(version)>=3.0)
411   {
412     phaseTab.write(fp);
413   }
414   // write idle time
415   if (CkpvAccess(version)>=7.1) {
416     fprintf(fp, "IdlePercent: ");
417     int last=pool[0].getUIdle();
418     writeU(fp, last);
419     int count=1;
420     for(j=1; j<numBins; j++) {
421       int u = pool[j].getUIdle();
422       if (last == u) {
423         count++;
424       }
425       else {
426         if (count > 1) fprintf(fp, "+%d", count);
427         writeU(fp, u);
428         last = u;
429         count = 1;
430       }
431     }
432     if (count > 1) fprintf(fp, "+%d", count);
433     fprintf(fp, "\n");
434   }
435
436
437   // write summary details
438   if (sumDetail) {
439         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
440                 CkpvAccess(version), CkMyPe(), CkNumPes(),
441                 numBins, _numEntries, CkpvAccess(binSize));
442
443         // Write out cpuTime in microseconds
444         // Run length encoding (RLE) along EP axis
445         fprintf(sdfp, "ExeTimePerEPperInterval ");
446         unsigned int e, i;
447         long last= (long) (getCPUtime(0,0)*1.0e6);
448         int count=0;
449         fprintf(sdfp, "%ld", last);
450         for(e=0; e<_numEntries; e++) {
451             for(i=0; i<numBins; i++) {
452
453                 long u= (long) (getCPUtime(i,e)*1.0e6);
454                 if (last == u) {
455                     count++;
456                 } else {
457
458                     if (count > 1) fprintf(sdfp, "+%d", count);
459                     fprintf(sdfp, " %ld", u);
460                     last = u;
461                     count = 1;
462                 }
463             }
464         }
465         if (count > 1) fprintf(sdfp, "+%d", count);
466         fprintf(sdfp, "\n");
467
468         // Write out numExecutions
469         // Run length encoding (RLE) along EP axis
470         fprintf(sdfp, "EPCallTimePerInterval ");
471         last= getNumExecutions(0,0);
472         count=0;
473         fprintf(sdfp, "%d", last);
474         for(e=0; e<_numEntries; e++) {
475             for(i=0; i<numBins; i++) {
476
477                 long u= getNumExecutions(i, e);
478                 if (last == u) {
479                     count++;
480                 } else {
481
482                     if (count > 1) fprintf(sdfp, "+%d", count);
483                     fprintf(sdfp, " %d", u);
484                     last = u;
485                     count = 1;
486                 }
487             }
488         }
489         if (count > 1) fprintf(sdfp, "+%d", count);
490         fprintf(sdfp, "\n");
491   }
492 }
493
494 void SumLogPool::writeSts(void)
495 {
496     // open sts file
497   char *fname = 
498        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
499   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
500   stsfp = fopen(fname, "w+");
501   //CmiPrintf("File: %s \n", fname);
502   if (stsfp == 0) {
503        CmiAbort("Cannot open summary sts file for writing.\n");
504   }
505   delete[] fname;
506
507   traceWriteSTS(stsfp,_numEvents);
508   for(int i=0;i<_numEvents;i++)
509     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
510   fprintf(stsfp, "END\n");
511
512   fclose(stsfp);
513 }
514
515 // Called once per interval
516 void SumLogPool::add(double time, double idleTime, int pe) 
517 {
518   new (&pool[numBins++]) BinEntry(time, idleTime);
519   if (poolSize==numBins) {
520     shrink();
521   }
522 }
523
524 // Called once per run of an EP
525 // adds 'time' to EP's time, increments epCount
526 void SumLogPool::setEp(int epidx, double time) 
527 {
528   if (epidx >= epInfoSize) {
529         CmiAbort("Invalid entry point!!\n");
530   }
531   //CmiPrintf("set EP: %d %e \n", epidx, time);
532   epInfo[epidx].setTime(time);
533   // set phase table counter
534   phaseTab.setEp(epidx, time);
535 }
536
537 // Called once from endExecute, endPack, etc. this function updates
538 // the sumDetail intervals.
539 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
540 {
541         if (epIdx >= epInfoSize) {
542             CmiAbort("Too many entry points!!\n");
543         }
544
545         double binSz = CkpvAccess(binSize);
546         int startingBinIdx, endingBinIdx;
547         startingBinIdx = (int)(startTime/binSz);
548         endingBinIdx = (int)(endTime/binSz);
549         // shrink if needed
550         while (endingBinIdx >= poolSize) {
551           shrink();
552           CmiAssert(CkpvAccess(binSize) > binSz);
553           binSz = CkpvAccess(binSize);
554           startingBinIdx = (int)(startTime/binSz);
555           endingBinIdx = (int)(endTime/binSz);
556         }
557
558         if (startingBinIdx == endingBinIdx) {
559             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
560         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
561             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
562             while(++startingBinIdx < endingBinIdx)
563                 addToCPUtime(startingBinIdx, epIdx, binSz);
564             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
565         } else {
566           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
567                    startTime, endTime);
568             CmiAbort("Error: end time of EP is less than start time\n");
569         }
570
571         incNumExecutions(startingBinIdx, epIdx);
572 }
573
574 // Shrinks pool[], cpuTime[], and numExecutions[]
575 void SumLogPool::shrink(void)
576 {
577 //  double t = CmiWallTimer();
578
579   // We ensured earlier that poolSize is even; therefore now numBins
580   // == poolSize == even.
581   int entries = numBins/2;
582   for (int i=0; i<entries; i++)
583   {
584      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
585      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
586      if (sumDetail)
587      for (int e=0; e < epInfoSize; e++) {
588          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
589          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
590      }
591   }
592   // zero out the remaining intervals
593   if (sumDetail) {
594     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
595     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
596   }
597   numBins = entries;
598   CkpvAccess(binSize) *= 2;
599
600 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
601 }
602
603 int  BinEntry::getU() 
604
605   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
606 }
607
608 int BinEntry::getUIdle() {
609   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
610 }
611
612 void BinEntry::write(FILE* fp)
613 {
614   writeU(fp, getU());
615 }
616
617 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
618                                         binTime(0.0),binIdle(0.0),msgNum(0)
619 {
620   if (CkpvAccess(traceOnPe) == 0) return;
621
622   CkpvInitialize(int, binCount);
623   CkpvInitialize(double, binSize);
624   CkpvInitialize(double, version);
625   CkpvAccess(binSize) = BIN_SIZE;
626   CkpvAccess(version) = VER;
627   CkpvAccess(binCount) = DefaultBinCount;
628   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
629     if (CkMyPe() == 0) 
630       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632         "CPU usage log time resolution");
633   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634         "Write this .sum file version");
635
636   epThreshold = 0.001; 
637   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638         "Execution time histogram lower bound");
639   epInterval = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641         "Execution time histogram bin size");
642
643   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644   // +sumonly overrides +sumDetail
645   if (!sumonly)
646       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
647
648   _logPool = new SumLogPool(CkpvAccess(traceRoot));
649   // assume invalid entry point on start
650   execEp=INVALIDEP;
651 }
652
653 void TraceSummary::traceClearEps(void)
654 {
655   _logPool->clearEps();
656 }
657
658 void TraceSummary::traceWriteSts(void)
659 {
660   if(CkMyPe()==0)
661       _logPool->writeSts();
662 }
663
664 void TraceSummary::traceClose(void)
665 {
666   if(CkMyPe()==0)
667       _logPool->writeSts();
668   CkpvAccess(_trace)->endComputation();
669   // destructor call the write()
670   delete _logPool;
671   // remove myself from traceArray so that no tracing will be called.
672   CkpvAccess(_traces)->removeTrace(this);
673 }
674
675 void TraceSummary::beginExecute(CmiObjId *tid)
676 {
677   beginExecute(-1,-1,_threadEP,-1);
678 }
679
680 void TraceSummary::beginExecute(envelope *e)
681 {
682   // no message means thread execution
683   if (e==NULL) {
684     beginExecute(-1,-1,_threadEP,-1);
685   }
686   else {
687     beginExecute(-1,-1,e->getEpIdx(),-1);
688   }  
689 }
690
691 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
692 {
693   if (execEp != INVALIDEP) {
694     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
695     return;
696   }
697   
698   execEp=ep;
699   double t = TraceTimer();
700   //CmiPrintf("start: %f \n", start);
701   
702   start = t;
703   double ts = binStart;
704   // fill gaps
705   while ((ts = ts + CkpvAccess(binSize)) < t) {
706     /* Keep as a template for error checking. The current form of this check
707        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
708        I used it). Perhaps an improved form could be used in case vastly
709        incompatible EP vs idle times are reported (binSize*2?).
710
711        This check will have to be duplicated before each call to add()
712
713     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
714              binTime + binIdle, CkpvAccess(binSize));
715     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
716     */
717      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
718      binTime=0.0;                 // fill all other bins with 0 up till start
719      binIdle = 0.0;
720      binStart = ts;
721   }
722 }
723
724 void TraceSummary::endExecute(void)
725 {
726   double t = TraceTimer();
727   double ts = start;
728   double nts = binStart;
729
730 /*
731   if (execEp == TRACEON_EP) {
732     // if trace just got turned on, then one expects to see this
733     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
734     return;
735   }
736 */
737
738   if (execEp == INVALIDEP) {
739     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
740     return;
741   }
742
743   if (execEp >= 0)
744   {
745     _logPool->setEp(execEp, t-ts);
746   }
747
748   while ((nts = nts + CkpvAccess(binSize)) < t)
749   {
750     // fill the bins with time for this entry method
751      binTime += nts-ts;
752      binStart  = nts;
753      // This calls shrink() if needed
754      _logPool->add(binTime, binIdle, CkMyPe()); 
755      binTime = 0.0;
756      binIdle = 0.0;
757      ts = nts;
758   }
759   binTime += t - ts;
760
761   if (sumDetail && execEp >= 0)
762       _logPool->updateSummaryDetail(execEp, start, t);
763
764   execEp = INVALIDEP;
765 }
766
767 void TraceSummary::beginIdle(double currT)
768 {
769   double t = TraceTimer(currT);
770   
771   // mark the time of this idle period. Only the next endIdle should see
772   // this value
773   idleStart = t; 
774   double ts = binStart;
775   // fill gaps
776   while ((ts = ts + CkpvAccess(binSize)) < t) {
777     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
778     binTime=0.0;                 // fill all other bins with 0 up till start
779     binIdle = 0.0;
780     binStart = ts;
781   }
782 }
783
784 void TraceSummary::endIdle(double currT)
785 {
786   double t = TraceTimer(currT);
787   double t_idleStart = idleStart;
788   double t_binStart = binStart;
789
790   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
791   {
792     // fill the bins with time for idle
793     binIdle += t_binStart - t_idleStart;
794     binStart = t_binStart;
795     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
796     binTime = 0.0;
797     binIdle = 0.0;
798     t_idleStart = t_binStart;
799   }
800   binIdle += t - t_idleStart;
801 }
802
803 void TraceSummary::traceBegin(void)
804 {
805     // fake as a start of an event, assuming traceBegin is called inside an
806     // entry function.
807   beginExecute(-1, -1, TRACEON_EP, -1, -1);
808 }
809
810 void TraceSummary::traceEnd(void)
811 {
812   endExecute();
813 }
814
815 void TraceSummary::beginPack(void)
816 {
817     packstart = CmiWallTimer();
818 }
819
820 void TraceSummary::endPack(void)
821 {
822     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
823     if (sumDetail)
824         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
825 }
826
827 void TraceSummary::beginUnpack(void)
828 {
829     unpackstart = CmiWallTimer();
830 }
831
832 void TraceSummary::endUnpack(void)
833 {
834     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
835     if (sumDetail)
836         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
837 }
838
839 void TraceSummary::beginComputation(void)
840 {
841   // initialze arrays because now the number of entries is known.
842   _logPool->initMem();
843 }
844
845 void TraceSummary::endComputation(void)
846 {
847   static int done = 0;
848   if (done) return;
849   done = 1;
850   if (msgNum==0) {
851 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
852      _logPool->add(binTime, binIdle, CkMyPe());
853      binTime = 0.0;
854      binIdle = 0.0;
855      msgNum ++;
856
857      binStart  += CkpvAccess(binSize);
858      double t = TraceTimer();
859      double ts = binStart;
860      while (ts < t)
861      {
862        _logPool->add(binTime, binIdle, CkMyPe());
863        binTime=0.0;
864        binIdle = 0.0;
865        ts += CkpvAccess(binSize);
866      }
867
868   }
869 }
870
871 void TraceSummary::addEventType(int eventType)
872 {
873   _logPool->addEventType(eventType, TraceTimer());
874 }
875
876 void TraceSummary::startPhase(int phase)
877 {
878    _logPool->startPhase(phase);
879 }
880
881 void TraceSummary::traceEnableCCS() {
882   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
883   sumProxy.initCCS();
884 }
885
886
887 void TraceSummary::fillData(double *buffer, double reqStartTime, 
888                             double reqBinSize, int reqNumBins) {
889   // buffer has to be pre-allocated by the requester and must be an array of
890   // size reqNumBins.
891   //
892   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
893   //              need a number of these assumptions dropped:
894   //              1) reqBinSize == binSize (unrealistic)
895   //              2) bins boundary aligned (ok even under normal circumstances)
896   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
897   //              4) bins are always available (true unless flush)
898   //              5) bins always starts from 0 (unrealistic)
899
900   // works only because of 1)
901   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
902   //           "starting" at all!
903   int binOffset = (int)(reqStartTime/reqBinSize); 
904   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
905     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
906     buffer[i-binOffset] = pool()->getTime(i);
907   }
908 }
909
910
911 /// for TraceSummaryBOC
912
913 void TraceSummaryBOC::initCCS() {
914   if(firstTime){
915     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
916     // initializing CCS-based parameters on all processors
917     lastRequestedIndexBlock = 0;
918     indicesPerBlock = 1000;
919     collectionGranularity = 0.001; // time in seconds
920     nBufferedBins = 0;
921     
922     // initialize buffer, register CCS handler and start the collection
923     // pulse only on pe 0.
924     if (CkMyPe() == 0) { 
925       ccsBufferedData = new CkVec<double>();
926     
927       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
928       CkPrintf("Trace Summary now listening in for CCS Client\n");
929       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
930                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
931       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
932                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
933
934       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
935       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
936                              (void *)this);
937       summaryCcsStreaming = CmiTrue;
938     }
939     firstTime = false;
940   }
941 }
942
943 /** Return summary information as double precision values for each sample period. 
944     The actual data collection is in double precision values. 
945
946     The units on the returned values are total execution time across all PEs.
947 */
948 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
949   double *sendBuffer;
950
951   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
952
953   CkPrintf("Responding ...\n");
954   int datalength = 0;
955   // if we have no data to send, send an acknowledgement code of -13.37
956   // instead.
957   if (ccsBufferedData->length() == 0) {
958     sendBuffer = new double[1];
959     sendBuffer[0] = -13.37;
960     datalength = sizeof(double);
961     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
962     delete [] sendBuffer;
963   } else {
964     sendBuffer = ccsBufferedData->getVec();
965     datalength = ccsBufferedData->length()*sizeof(double);
966     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
967     ccsBufferedData->free();
968   }
969   CkPrintf("Response Sent. Proceeding with computation.\n");
970   delete m;
971 }
972
973
974 /** Return summary information as unsigned char values for each sample period. 
975     The actual data collection is in double precision values.
976
977     This returns the utilization in a range from 0 to 200.
978 */
979 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
980   unsigned char *sendBuffer;
981
982   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
983
984   CkPrintf("Responding ...\n");
985   int datalength = 0;
986
987   if (ccsBufferedData->length() == 0) {
988     sendBuffer = new unsigned char[1];
989     sendBuffer[0] = 255;
990     datalength = sizeof(unsigned char);
991     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
992     delete [] sendBuffer;
993   } else {
994     double * doubleData = ccsBufferedData->getVec();
995     int numData = ccsBufferedData->length();
996     
997     // pack data into unsigned char array
998     sendBuffer = new unsigned char[numData];
999     
1000     for(int i=0;i<numData;i++){
1001       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1002       int v = sendBuffer[i];
1003     }    
1004
1005     datalength = sizeof(unsigned char) * numData;
1006     
1007     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1008     ccsBufferedData->free();
1009     delete [] sendBuffer;
1010   }
1011   CkPrintf("Response Sent. Proceeding with computation.\n");
1012   delete m;
1013 }
1014
1015
1016
1017 void startCollectData(void *data, double currT) {
1018   CkAssert(CkMyPe() == 0);
1019   // CkPrintf("startCollectData()\n");
1020   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1021   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1022   double collectionGranularity = sumObj->collectionGranularity;
1023   int indicesPerBlock = sumObj->indicesPerBlock;
1024   
1025   double startTime = lastRequestedIndexBlock*
1026     collectionGranularity * indicesPerBlock;
1027   int numIndicesToGet = (int)floor((currT - startTime)/
1028                                    collectionGranularity);
1029   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1030   // **TODO** consider limiting the total number of blocks each collection
1031   //   request will pick up. This is to limit the amount of perturbation
1032   //   if it proves to be a problem.
1033   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1034
1035    sumProxy.collectSummaryData(startTime, 
1036                        collectionGranularity,
1037                        numBlocksToGet*indicesPerBlock);
1038   // assume success
1039   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1040 }
1041
1042 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1043                                   int numBins) {
1044   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1045
1046   double *contribution = new double[numBins];
1047   for (int i=0; i<numBins; i++) {
1048     contribution[i] = 0.0;
1049   }
1050   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1051
1052   /*
1053   for (int i=0; i<numBins; i++) {
1054     CkPrintf("[%d] %f\n", i, contribution[i]);
1055   }
1056   */
1057
1058   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1059   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1060   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1061              cb);
1062 }
1063
1064 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1065   CkAssert(CkMyPe() == 0);
1066   // **CWL** No memory management for the ccs buffer for now.
1067
1068   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1069   double *recvData = (double *)msg->getData();
1070   int numBins = msg->getSize()/sizeof(double);
1071
1072   // if there's an easier way to append a data block to a CkVec, I'll take it
1073   for (int i=0; i<numBins; i++) {
1074     ccsBufferedData->insertAtEnd(recvData[i]);
1075   }
1076   delete msg;
1077 }
1078
1079
1080
1081
1082 void TraceSummaryBOC::startSumOnly()
1083 {
1084   CmiAssert(CkMyPe() == 0);
1085
1086   CProxy_TraceSummaryBOC p(traceSummaryGID);
1087   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1088   p.askSummary(size);
1089 }
1090
1091 void TraceSummaryBOC::askSummary(int size)
1092 {
1093   if (CkpvAccess(_trace) == NULL) return;
1094
1095   int traced = CkpvAccess(_trace)->traceOnPE();
1096
1097   BinEntry *reductionBuffer = new BinEntry[size+1];
1098   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1099   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1100   if (traced) {
1101     CkpvAccess(_trace)->endComputation();
1102     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1103     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1104     if (n>size) n=size;
1105     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1106   }
1107
1108   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1109              CkReduction::sum_double);
1110   delete [] reductionBuffer;
1111 }
1112
1113 //extern "C" void _CkExit();
1114
1115 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1116 {
1117   if (CkpvAccess(_trace) == NULL) return;
1118
1119   CkAssert(CkMyPe() == 0);
1120
1121   int n = msg->getSize()/sizeof(BinEntry);
1122   nBins = n-1;
1123   bins = (BinEntry *)msg->getData();
1124   nTracedPEs = (int)bins[n-1].time();
1125   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1126
1127   write();
1128
1129   delete msg;
1130
1131   CkExit();
1132 }
1133
1134 void TraceSummaryBOC::write(void) 
1135 {
1136   int i;
1137   unsigned int j;
1138
1139   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1140   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1141   FILE *sumfp = fopen(fname, "w+");
1142   //CmiPrintf("File: %s \n", fname);
1143   if(sumfp == 0)
1144       CmiAbort("Cannot open summary sts file for writing.\n");
1145   delete[] fname;
1146
1147   int _numEntries=_entryTable.size();
1148   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1149   fprintf(sumfp, "\n");
1150
1151   // write bin time
1152 #if 0
1153   int last=pool[0].getU();
1154   writeU(fp, last);
1155   int count=1;
1156   for(j=1; j<numEntries; j++) {
1157     int u = pool[j].getU();
1158     if (last == u) {
1159       count++;
1160     }
1161     else {
1162       if (count > 1) fprintf(fp, "+%d", count);
1163       writeU(fp, u);
1164       last = u;
1165       count = 1;
1166     }
1167   }
1168   if (count > 1) fprintf(fp, "+%d", count);
1169 #else
1170   for(j=0; j<nBins; j++) {
1171     bins[j].time() /= nTracedPEs;
1172     bins[j].write(sumfp);
1173   }
1174 #endif
1175   fprintf(sumfp, "\n");
1176   fclose(sumfp);
1177
1178 }
1179
1180 extern "C" void CombineSummary()
1181 {
1182 #if CMK_TRACE_ENABLED
1183   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1184   if (sumonly) {
1185     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1186       // pe 0 start the sumonly process
1187     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1188     sumProxy[0].startSumOnly();
1189   }
1190   else CkExit();
1191 #else
1192   CkExit();
1193 #endif
1194 }
1195
1196 void initTraceSummaryBOC()
1197 {
1198 #ifdef __BLUEGENE__
1199   if(BgNodeRank()==0) {
1200 #else
1201   if (CkMyRank() == 0) {
1202 #endif
1203     registerExitFn(CombineSummary);
1204   }
1205 }
1206
1207
1208
1209
1210
1211
1212 #include "TraceSummary.def.h"
1213
1214
1215 /*@}*/
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235