added explicit code for handling traceBegin and traceEnd.
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39
40
41 /** 
42     A class that reads/writes a buffer out of different types of data.
43
44     This class exists because I need to get references to parts of the buffer 
45     that have already been used so that I can increment counters inside the buffer.
46 */
47
48 class compressedBuffer {
49 public:
50   char* buf;
51   int pos; ///<< byte position just beyond the previously read/written data
52
53   compressedBuffer(){
54     buf = NULL;
55     pos = 0;
56   }
57
58   compressedBuffer(int bytes){
59     buf = (char*)malloc(bytes);
60     pos = 0;
61   }
62
63   compressedBuffer(void *buffer){
64     buf = (char*)buffer;
65     pos = 0;
66   }
67     
68   void init(void *buffer){
69     buf = (char*)buffer;
70     pos = 0;
71   }
72     
73   inline void * currentPtr(){
74     return (void*)(buf+pos);
75   }
76
77   template <typename T>
78   T read(int offset){
79     // to resolve unaligned writes causing bus errors, need memcpy
80     T v;
81     memcpy(&v, buf+offset, sizeof(T));
82     return v;
83   }
84     
85   template <typename T>
86   void write(T v, int offset){
87     T v2 = v; // on stack
88     // to resolve unaligned writes causing bus errors, need memcpy
89     memcpy(buf+offset, &v2, sizeof(T));
90   }
91     
92   template <typename T>
93   void increment(int offset){
94     T temp;
95     temp = read<T>(offset);
96     temp ++;
97     write<T>(temp, offset);
98   }
99
100   template <typename T>
101   void accumulate(T v, int offset){
102     T temp;
103     temp = read<T>(offset);
104     temp += v;
105     write<T>(temp, offset);
106   }
107   
108   template <typename T>
109   int push(T v){
110     int oldpos = pos;
111     write<T>(v, pos);
112     pos += sizeof(T);
113     return oldpos;
114   }
115   
116   template <typename T>
117   T pop(){
118     T temp = read<T>(pos);
119     pos += sizeof(T);
120     return temp;
121   }
122
123   template <typename T>
124   T peek(){
125     T temp = read<T>(pos);
126     return temp;
127   }
128
129   template <typename T0, typename T>
130   T peekSecond(){
131     T temp;
132     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
133     return temp;
134   }
135
136   int datalength(){
137     return pos;
138   }
139      
140   void * buffer(){
141     return (void*) buf;
142   }  
143
144   void freeBuf(){
145     free(buf);
146   }
147
148   ~compressedBuffer(){
149     // don't free the buf because the user my have supplied the buffer
150   }
151     
152 };
153
154
155
156
157 // Global Readonly
158 CkGroupID traceSummaryGID;
159 bool summaryCcsStreaming;
160
161 int sumonly = 0;
162 int sumDetail = 0;
163
164 /**
165   For each TraceFoo module, _createTraceFoo() must be defined.
166   This function is called in _createTraces() generated in moduleInit.C
167 */
168 void _createTracesummary(char **argv)
169 {
170   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
171   CkpvInitialize(TraceSummary*, _trace);
172   CkpvInitialize(int, previouslySentBins);
173   CkpvAccess(previouslySentBins) = 0;
174   CkpvAccess(_trace) = new  TraceSummary(argv);
175   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
176   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
177 }
178
179
180 /// function call for starting a phase in trace summary logs 
181 extern "C" 
182 void CkSummary_StartPhase(int phase)
183 {
184    CkpvAccess(_trace)->startPhase(phase);
185 }
186
187
188 /// function call for adding an event mark
189 extern "C" 
190 void CkSummary_MarkEvent(int eventType)
191 {
192    CkpvAccess(_trace)->addEventType(eventType);
193 }
194
195 static inline void writeU(FILE* fp, int u)
196 {
197   fprintf(fp, "%4d", u);
198 }
199
200 PhaseEntry::PhaseEntry() 
201 {
202   int _numEntries=_entryTable.size();
203   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
204   nEPs = _numEntries+10; 
205   count = new int[nEPs];
206   times = new double[nEPs];
207   maxtimes = new double[nEPs];
208   for (int i=0; i<nEPs; i++) {
209     count[i] = 0;
210     times[i] = 0.0;
211     maxtimes[i] = 0.;
212   }
213 }
214
215 SumLogPool::~SumLogPool() 
216 {
217   if (!sumonly) {
218     write();
219     fclose(fp);
220     if (sumDetail) fclose(sdfp);
221   }
222   // free memory for mark
223   if (markcount > 0)
224   for (int i=0; i<MAX_MARKS; i++) {
225     for (int j=0; j<events[i].length(); j++)
226       delete events[i][j];
227   }
228   delete[] pool;
229   delete[] epInfo;
230   delete[] cpuTime;
231   delete[] numExecutions;
232 }
233
234 void SumLogPool::addEventType(int eventType, double time)
235 {
236    if (eventType <0 || eventType >= MAX_MARKS) {
237        CkPrintf("Invalid event type %d!\n", eventType);
238        return;
239    }
240    MarkEntry *e = new MarkEntry;
241    e->time = time;
242    events[eventType].push_back(e);
243    markcount ++;
244 }
245
246 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
247 {
248    // TBD: Can this be moved to initMem?
249   cpuTime = NULL;
250    poolSize = CkpvAccess(binCount);
251    if (poolSize % 2) poolSize++;        // make sure it is even
252    pool = new BinEntry[poolSize];
253    _MEMCHECK(pool);
254
255    this->pgm = new char[strlen(pgm)+1];
256    strcpy(this->pgm,pgm);
257    
258 #if 0
259    // create the sts file
260    if (CkMyPe() == 0) {
261      char *fname = 
262        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
263      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
264      stsfp = fopen(fname, "w+");
265      //CmiPrintf("File: %s \n", fname);
266      if (stsfp == 0) {
267        CmiAbort("Cannot open summary sts file for writing.\n");
268       }
269      delete[] fname;
270    }
271 #endif
272    
273    // event
274    markcount = 0;
275 }
276
277 void SumLogPool::initMem()
278 {
279    int _numEntries=_entryTable.size();
280    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
281    epInfo = new SumEntryInfo[epInfoSize];
282    _MEMCHECK(epInfo);
283
284    cpuTime = NULL;
285    numExecutions = NULL;
286    if (sumDetail) {
287        cpuTime = new double[poolSize*epInfoSize];
288        _MEMCHECK(cpuTime);
289        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
290        numExecutions = new int[poolSize*epInfoSize];
291        _MEMCHECK(numExecutions);
292        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
293
294 //         int i, e;
295 //         for(i=0; i<poolSize; i++) {
296 //             for(e=0; e< epInfoSize; e++) {
297 //                 setCPUtime(i,e,0.0);
298 //                 setNumExecutions(i,e,0);
299 //             }
300 //         }
301    }
302 }
303
304 int SumLogPool::getUtilization(int interval, int ep) {
305     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
306 }
307
308 void SumLogPool::write(void) 
309 {
310   int i;
311   unsigned int j;
312   int _numEntries=_entryTable.size();
313
314   fp = NULL;
315   sdfp = NULL;
316
317   // create file(s)
318   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
319   if (!sumonly) {
320     char pestr[10];
321     sprintf(pestr, "%d", CkMyPe());
322     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
323     char *fname = new char[len+1];
324     
325     sprintf(fname, "%s.%s.sum", pgm, pestr);
326     do {
327       fp = fopen(fname, "w+");
328     } while (!fp && errno == EINTR);
329     if (!fp) {
330       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
331       CmiAbort("Cannot open Summary Trace File for writing...\n");
332     }
333     
334     if (sumDetail) {
335       sprintf(fname, "%s.%s.sumd", pgm, pestr);
336       do {
337         sdfp = fopen(fname, "w+");
338       } while (!sdfp && errno == EINTR);
339       if(!sdfp) {
340         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
341       }
342     }
343     delete[] fname;
344   }
345
346   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
347   if (CkpvAccess(version)>=3.0)
348   {
349     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
350   }
351   fprintf(fp, "\n");
352
353   // write bin time
354 #if 1
355   int last=pool[0].getU();
356   writeU(fp, last);
357   int count=1;
358   for(j=1; j<numBins; j++) {
359     int u = pool[j].getU();
360     if (last == u) {
361       count++;
362     }
363     else {
364       if (count > 1) fprintf(fp, "+%d", count);
365       writeU(fp, u);
366       last = u;
367       count = 1;
368     }
369   }
370   if (count > 1) fprintf(fp, "+%d", count);
371 #else
372   for(j=0; j<numEntries; j++) 
373       pool[j].write(fp);
374 #endif
375   fprintf(fp, "\n");
376
377   // write entry execution time
378   fprintf(fp, "EPExeTime: ");
379   for (i=0; i<_numEntries; i++)
380     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
381   fprintf(fp, "\n");
382   // write entry function call times
383   fprintf(fp, "EPCallTime: ");
384   for (i=0; i<_numEntries; i++)
385     fprintf(fp, "%d ", epInfo[i].epCount);
386   fprintf(fp, "\n");
387   // write max entry function execute times
388   fprintf(fp, "MaxEPTime: ");
389   for (i=0; i<_numEntries; i++)
390     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
391   fprintf(fp, "\n");
392 #if 0
393   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
394     for (j=0; j<_numEntries; j++) 
395       fprintf(fp, "%d ", epInfo[j].hist[i]);
396     fprintf(fp, "\n");
397   }
398 #endif
399   // write marks
400   if (CkpvAccess(version)>=2.0) 
401   {
402     fprintf(fp, "NumMarks: %d ", markcount);
403     for (i=0; i<MAX_MARKS; i++) {
404       for(int j=0; j<events[i].length(); j++)
405         fprintf(fp, "%d %f ", i, events[i][j]->time);
406     }
407     fprintf(fp, "\n");
408   }
409   // write phases
410   if (CkpvAccess(version)>=3.0)
411   {
412     phaseTab.write(fp);
413   }
414   // write idle time
415   if (CkpvAccess(version)>=7.1) {
416     fprintf(fp, "IdlePercent: ");
417     int last=pool[0].getUIdle();
418     writeU(fp, last);
419     int count=1;
420     for(j=1; j<numBins; j++) {
421       int u = pool[j].getUIdle();
422       if (last == u) {
423         count++;
424       }
425       else {
426         if (count > 1) fprintf(fp, "+%d", count);
427         writeU(fp, u);
428         last = u;
429         count = 1;
430       }
431     }
432     if (count > 1) fprintf(fp, "+%d", count);
433     fprintf(fp, "\n");
434   }
435
436
437   // write summary details
438   if (sumDetail) {
439         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
440                 CkpvAccess(version), CkMyPe(), CkNumPes(),
441                 numBins, _numEntries, CkpvAccess(binSize));
442
443         // Write out cpuTime in microseconds
444         // Run length encoding (RLE) along EP axis
445         fprintf(sdfp, "ExeTimePerEPperInterval ");
446         unsigned int e, i;
447         long last= (long) (getCPUtime(0,0)*1.0e6);
448         int count=0;
449         fprintf(sdfp, "%ld", last);
450         for(e=0; e<_numEntries; e++) {
451             for(i=0; i<numBins; i++) {
452
453                 long u= (long) (getCPUtime(i,e)*1.0e6);
454                 if (last == u) {
455                     count++;
456                 } else {
457
458                     if (count > 1) fprintf(sdfp, "+%d", count);
459                     fprintf(sdfp, " %ld", u);
460                     last = u;
461                     count = 1;
462                 }
463             }
464         }
465         if (count > 1) fprintf(sdfp, "+%d", count);
466         fprintf(sdfp, "\n");
467
468         // Write out numExecutions
469         // Run length encoding (RLE) along EP axis
470         fprintf(sdfp, "EPCallTimePerInterval ");
471         last= getNumExecutions(0,0);
472         count=0;
473         fprintf(sdfp, "%d", last);
474         for(e=0; e<_numEntries; e++) {
475             for(i=0; i<numBins; i++) {
476
477                 long u= getNumExecutions(i, e);
478                 if (last == u) {
479                     count++;
480                 } else {
481
482                     if (count > 1) fprintf(sdfp, "+%d", count);
483                     fprintf(sdfp, " %d", u);
484                     last = u;
485                     count = 1;
486                 }
487             }
488         }
489         if (count > 1) fprintf(sdfp, "+%d", count);
490         fprintf(sdfp, "\n");
491   }
492 }
493
494 void SumLogPool::writeSts(void)
495 {
496     // open sts file
497   char *fname = 
498        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
499   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
500   stsfp = fopen(fname, "w+");
501   //CmiPrintf("File: %s \n", fname);
502   if (stsfp == 0) {
503        CmiAbort("Cannot open summary sts file for writing.\n");
504   }
505   delete[] fname;
506
507   traceWriteSTS(stsfp,_numEvents);
508   for(int i=0;i<_numEvents;i++)
509     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
510   fprintf(stsfp, "END\n");
511
512   fclose(stsfp);
513 }
514
515 // Called once per interval
516 void SumLogPool::add(double time, double idleTime, int pe) 
517 {
518   new (&pool[numBins++]) BinEntry(time, idleTime);
519   if (poolSize==numBins) {
520     shrink();
521   }
522 }
523
524 // Called once per run of an EP
525 // adds 'time' to EP's time, increments epCount
526 void SumLogPool::setEp(int epidx, double time) 
527 {
528   if (epidx >= epInfoSize) {
529         CmiAbort("Invalid entry point!!\n");
530   }
531   //CmiPrintf("set EP: %d %e \n", epidx, time);
532   epInfo[epidx].setTime(time);
533   // set phase table counter
534   phaseTab.setEp(epidx, time);
535 }
536
537 // Called once from endExecute, endPack, etc. this function updates
538 // the sumDetail intervals.
539 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
540 {
541         if (epIdx >= epInfoSize) {
542             CmiAbort("Too many entry points!!\n");
543         }
544
545         double binSz = CkpvAccess(binSize);
546         int startingBinIdx, endingBinIdx;
547         startingBinIdx = (int)(startTime/binSz);
548         endingBinIdx = (int)(endTime/binSz);
549         // shrink if needed
550         while (endingBinIdx >= poolSize) {
551           shrink();
552           CmiAssert(CkpvAccess(binSize) > binSz);
553           binSz = CkpvAccess(binSize);
554           startingBinIdx = (int)(startTime/binSz);
555           endingBinIdx = (int)(endTime/binSz);
556         }
557
558         if (startingBinIdx == endingBinIdx) {
559             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
560         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
561             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
562             while(++startingBinIdx < endingBinIdx)
563                 addToCPUtime(startingBinIdx, epIdx, binSz);
564             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
565         } else {
566           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
567                    startTime, endTime);
568             CmiAbort("Error: end time of EP is less than start time\n");
569         }
570
571         incNumExecutions(startingBinIdx, epIdx);
572 }
573
574 // Shrinks pool[], cpuTime[], and numExecutions[]
575 void SumLogPool::shrink(void)
576 {
577 //  double t = CmiWallTimer();
578
579   // We ensured earlier that poolSize is even; therefore now numBins
580   // == poolSize == even.
581   int entries = numBins/2;
582   for (int i=0; i<entries; i++)
583   {
584      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
585      if (sumDetail)
586      for (int e=0; e < epInfoSize; e++) {
587          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
588          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
589      }
590   }
591   // zero out the remaining intervals
592   if (sumDetail) {
593     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
594     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
595   }
596   numBins = entries;
597   CkpvAccess(binSize) *= 2;
598
599 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
600 }
601
602 int  BinEntry::getU() 
603
604   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
605 }
606
607 int BinEntry::getUIdle() {
608   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
609 }
610
611 void BinEntry::write(FILE* fp)
612 {
613   writeU(fp, getU());
614 }
615
616 TraceSummary::TraceSummary(char **argv):binStart(0.0),
617                                         binTime(0.0),binIdle(0.0),msgNum(0)
618 {
619   if (CkpvAccess(traceOnPe) == 0) return;
620
621   CkpvInitialize(int, binCount);
622   CkpvInitialize(double, binSize);
623   CkpvInitialize(double, version);
624   CkpvAccess(binSize) = BIN_SIZE;
625   CkpvAccess(version) = VER;
626   CkpvAccess(binCount) = DefaultBinCount;
627   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
628     if (CkMyPe() == 0) 
629       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
630   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
631         "CPU usage log time resolution");
632   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
633         "Write this .sum file version");
634
635   epThreshold = 0.001; 
636   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
637         "Execution time histogram lower bound");
638   epInterval = 0.001; 
639   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
640         "Execution time histogram bin size");
641
642   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
643   // +sumonly overrides +sumDetail
644   if (!sumonly)
645       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
646
647   _logPool = new SumLogPool(CkpvAccess(traceRoot));
648   // assume invalid entry point on start
649   execEp=INVALIDEP;
650 }
651
652 void TraceSummary::traceClearEps(void)
653 {
654   _logPool->clearEps();
655 }
656
657 void TraceSummary::traceWriteSts(void)
658 {
659   if(CkMyPe()==0)
660       _logPool->writeSts();
661 }
662
663 void TraceSummary::traceClose(void)
664 {
665   if(CkMyPe()==0)
666       _logPool->writeSts();
667   CkpvAccess(_trace)->endComputation();
668   // destructor call the write()
669   delete _logPool;
670   // remove myself from traceArray so that no tracing will be called.
671   CkpvAccess(_traces)->removeTrace(this);
672 }
673
674 void TraceSummary::beginExecute(CmiObjId *tid)
675 {
676   beginExecute(-1,-1,_threadEP,-1);
677 }
678
679 void TraceSummary::beginExecute(envelope *e)
680 {
681   // no message means thread execution
682   if (e==NULL) {
683     beginExecute(-1,-1,_threadEP,-1);
684   }
685   else {
686     beginExecute(-1,-1,e->getEpIdx(),-1);
687   }  
688 }
689
690 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
691 {
692   if (execEp != INVALIDEP) {
693     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
694     return;
695   }
696   
697   execEp=ep;
698   double t = TraceTimer();
699   //CmiPrintf("start: %f \n", start);
700   
701   start = t;
702   double ts = binStart;
703   // fill gaps
704   while ((ts = ts + CkpvAccess(binSize)) < t) {
705     /* Keep as a template for error checking. The current form of this check
706        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
707        I used it). Perhaps an improved form could be used in case vastly
708        incompatible EP vs idle times are reported (binSize*2?).
709
710        This check will have to be duplicated before each call to add()
711
712     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
713              binTime + binIdle, CkpvAccess(binSize));
714     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
715     */
716      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
717      binTime=0.0;                 // fill all other bins with 0 up till start
718      binIdle = 0.0;
719      binStart = ts;
720   }
721 }
722
723 void TraceSummary::endExecute(void)
724 {
725   double t = TraceTimer();
726   double ts = start;
727   double nts = binStart;
728
729 /*
730   if (execEp == TRACEON_EP) {
731     // if trace just got turned on, then one expects to see this
732     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
733     return;
734   }
735 */
736
737   if (execEp == INVALIDEP) {
738     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
739     return;
740   }
741
742   if (execEp >= 0)
743   {
744     _logPool->setEp(execEp, t-ts);
745   }
746
747   while ((nts = nts + CkpvAccess(binSize)) < t)
748   {
749     // fill the bins with time for this entry method
750      binTime += nts-ts;
751      binStart  = nts;
752      // This calls shrink() if needed
753      _logPool->add(binTime, binIdle, CkMyPe()); 
754      binTime = 0.0;
755      binIdle = 0.0;
756      ts = nts;
757   }
758   binTime += t - ts;
759
760   if (sumDetail)
761       _logPool->updateSummaryDetail(execEp, start, t);
762
763   execEp = INVALIDEP;
764 }
765
766 void TraceSummary::beginIdle(double currT)
767 {
768   // for consistency with current framework behavior, currT is ignored and
769   // independent timing taken by trace-summary.
770   double t = TraceTimer();
771   
772   // mark the time of this idle period. Only the next endIdle should see
773   // this value
774   idleStart = t; 
775   double ts = binStart;
776   // fill gaps
777   while ((ts = ts + CkpvAccess(binSize)) < t) {
778     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
779     binTime=0.0;                 // fill all other bins with 0 up till start
780     binIdle = 0.0;
781     binStart = ts;
782   }
783 }
784
785 void TraceSummary::endIdle(double currT)
786 {
787   // again, we ignore the reported currT (see beginIdle)
788   double t = TraceTimer();
789   double t_idleStart = idleStart;
790   double t_binStart = binStart;
791
792   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
793   {
794     // fill the bins with time for idle
795     binIdle += t_binStart - t_idleStart;
796     binStart = t_binStart;
797     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
798     binTime = 0.0;
799     binIdle = 0.0;
800     t_idleStart = t_binStart;
801   }
802   binIdle += t - t_idleStart;
803 }
804
805 void TraceSummary::traceBegin(void)
806 {
807   beginExecute(-1, -1, TRACEON_EP, -1, -1);
808 }
809
810 void TraceSummary::traceEnd(void)
811 {
812   endExecute();
813 }
814
815 void TraceSummary::beginPack(void)
816 {
817     packstart = CmiWallTimer();
818 }
819
820 void TraceSummary::endPack(void)
821 {
822     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
823     if (sumDetail)
824         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
825 }
826
827 void TraceSummary::beginUnpack(void)
828 {
829     unpackstart = CmiWallTimer();
830 }
831
832 void TraceSummary::endUnpack(void)
833 {
834     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
835     if (sumDetail)
836         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
837 }
838
839 void TraceSummary::beginComputation(void)
840 {
841   // initialze arrays because now the number of entries is known.
842   _logPool->initMem();
843 }
844
845 void TraceSummary::endComputation(void)
846 {
847   static int done = 0;
848   if (done) return;
849   done = 1;
850   if (msgNum==0) {
851 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
852      _logPool->add(binTime, binIdle, CkMyPe());
853      binTime = 0.0;
854      binIdle = 0.0;
855      msgNum ++;
856
857      binStart  += CkpvAccess(binSize);
858      double t = TraceTimer();
859      double ts = binStart;
860      while (ts < t)
861      {
862        _logPool->add(binTime, binIdle, CkMyPe());
863        binTime=0.0;
864        binIdle = 0.0;
865        ts += CkpvAccess(binSize);
866      }
867
868   }
869 }
870
871 void TraceSummary::addEventType(int eventType)
872 {
873   _logPool->addEventType(eventType, TraceTimer());
874 }
875
876 void TraceSummary::startPhase(int phase)
877 {
878    _logPool->startPhase(phase);
879 }
880
881 void TraceSummary::traceEnableCCS() {
882   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
883   sumProxy.initCCS();
884 }
885
886
887 void TraceSummary::fillData(double *buffer, double reqStartTime, 
888                             double reqBinSize, int reqNumBins) {
889   // buffer has to be pre-allocated by the requester and must be an array of
890   // size reqNumBins.
891   //
892   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
893   //              need a number of these assumptions dropped:
894   //              1) reqBinSize == binSize (unrealistic)
895   //              2) bins boundary aligned (ok even under normal circumstances)
896   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
897   //              4) bins are always available (true unless flush)
898   //              5) bins always starts from 0 (unrealistic)
899
900   // works only because of 1)
901   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
902   //           "starting" at all!
903   int binOffset = (int)(reqStartTime/reqBinSize); 
904   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
905     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
906     buffer[i-binOffset] = pool()->getTime(i);
907   }
908 }
909
910
911 /// for TraceSummaryBOC
912
913 void TraceSummaryBOC::initCCS() {
914   if(firstTime){
915     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
916     // initializing CCS-based parameters on all processors
917     lastRequestedIndexBlock = 0;
918     indicesPerBlock = 1000;
919     collectionGranularity = 0.001; // time in seconds
920     nBufferedBins = 0;
921     
922     // initialize buffer, register CCS handler and start the collection
923     // pulse only on pe 0.
924     if (CkMyPe() == 0) { 
925       ccsBufferedData = new CkVec<double>();
926     
927       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
928       CkPrintf("Trace Summary now listening in for CCS Client\n");
929       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
930                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
931       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
932                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
933
934       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
935       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
936                              (void *)this);
937       summaryCcsStreaming = CmiTrue;
938     }
939     firstTime = false;
940   }
941 }
942
943 /** Return summary information as double precision values for each sample period. 
944     The actual data collection is in double precision values. 
945
946     The units on the returned values are total execution time across all PEs.
947 */
948 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
949   double *sendBuffer;
950
951   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
952
953   CkPrintf("Responding ...\n");
954   int datalength = 0;
955   // if we have no data to send, send an acknowledgement code of -13.37
956   // instead.
957   if (ccsBufferedData->length() == 0) {
958     sendBuffer = new double[1];
959     sendBuffer[0] = -13.37;
960     datalength = sizeof(double);
961     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
962     delete [] sendBuffer;
963   } else {
964     sendBuffer = ccsBufferedData->getVec();
965     datalength = ccsBufferedData->length()*sizeof(double);
966     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
967     ccsBufferedData->free();
968   }
969   CkPrintf("Response Sent. Proceeding with computation.\n");
970   delete m;
971 }
972
973
974 /** Return summary information as unsigned char values for each sample period. 
975     The actual data collection is in double precision values.
976
977     This returns the utilization in a range from 0 to 200.
978 */
979 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
980   unsigned char *sendBuffer;
981
982   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
983
984   CkPrintf("Responding ...\n");
985   int datalength = 0;
986
987   if (ccsBufferedData->length() == 0) {
988     sendBuffer = new unsigned char[1];
989     sendBuffer[0] = 255;
990     datalength = sizeof(unsigned char);
991     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
992     delete [] sendBuffer;
993   } else {
994     double * doubleData = ccsBufferedData->getVec();
995     int numData = ccsBufferedData->length();
996     
997     // pack data into unsigned char array
998     sendBuffer = new unsigned char[numData];
999     
1000     for(int i=0;i<numData;i++){
1001       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1002       int v = sendBuffer[i];
1003     }    
1004
1005     datalength = sizeof(unsigned char) * numData;
1006     
1007     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1008     ccsBufferedData->free();
1009     delete [] sendBuffer;
1010   }
1011   CkPrintf("Response Sent. Proceeding with computation.\n");
1012   delete m;
1013 }
1014
1015
1016
1017 void startCollectData(void *data, double currT) {
1018   CkAssert(CkMyPe() == 0);
1019   // CkPrintf("startCollectData()\n");
1020   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1021   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1022   double collectionGranularity = sumObj->collectionGranularity;
1023   int indicesPerBlock = sumObj->indicesPerBlock;
1024   
1025   double startTime = lastRequestedIndexBlock*
1026     collectionGranularity * indicesPerBlock;
1027   int numIndicesToGet = (int)floor((currT - startTime)/
1028                                    collectionGranularity);
1029   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1030   // **TODO** consider limiting the total number of blocks each collection
1031   //   request will pick up. This is to limit the amount of perturbation
1032   //   if it proves to be a problem.
1033   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1034
1035    sumProxy.collectSummaryData(startTime, 
1036                        collectionGranularity,
1037                        numBlocksToGet*indicesPerBlock);
1038   // assume success
1039   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1040 }
1041
1042 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1043                                   int numBins) {
1044   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1045
1046   double *contribution = new double[numBins];
1047   for (int i=0; i<numBins; i++) {
1048     contribution[i] = 0.0;
1049   }
1050   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1051
1052   /*
1053   for (int i=0; i<numBins; i++) {
1054     CkPrintf("[%d] %f\n", i, contribution[i]);
1055   }
1056   */
1057
1058   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1059   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1060   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1061              cb);
1062 }
1063
1064 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1065   CkAssert(CkMyPe() == 0);
1066   // **CWL** No memory management for the ccs buffer for now.
1067
1068   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1069   double *recvData = (double *)msg->getData();
1070   int numBins = msg->getSize()/sizeof(double);
1071
1072   // if there's an easier way to append a data block to a CkVec, I'll take it
1073   for (int i=0; i<numBins; i++) {
1074     ccsBufferedData->insertAtEnd(recvData[i]);
1075   }
1076   delete msg;
1077 }
1078
1079
1080
1081
1082 void TraceSummaryBOC::startSumOnly()
1083 {
1084   CmiAssert(CkMyPe() == 0);
1085
1086   CProxy_TraceSummaryBOC p(traceSummaryGID);
1087   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1088   p.askSummary(size);
1089 }
1090
1091 void TraceSummaryBOC::askSummary(int size)
1092 {
1093   if (CkpvAccess(_trace) == NULL) return;
1094
1095   int traced = CkpvAccess(_trace)->traceOnPE();
1096
1097   BinEntry *reductionBuffer = new BinEntry[size+1];
1098   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1099   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1100   if (traced) {
1101     CkpvAccess(_trace)->endComputation();
1102     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1103     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1104     if (n>size) n=size;
1105     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1106   }
1107
1108   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1109              CkReduction::sum_double);
1110   delete [] reductionBuffer;
1111 }
1112
1113 //extern "C" void _CkExit();
1114
1115 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1116 {
1117   if (CkpvAccess(_trace) == NULL) return;
1118
1119   CkAssert(CkMyPe() == 0);
1120
1121   int n = msg->getSize()/sizeof(BinEntry);
1122   nBins = n-1;
1123   bins = (BinEntry *)msg->getData();
1124   nTracedPEs = (int)bins[n-1].time();
1125   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1126
1127   write();
1128
1129   delete msg;
1130
1131   CkExit();
1132 }
1133
1134 void TraceSummaryBOC::write(void) 
1135 {
1136   int i;
1137   unsigned int j;
1138
1139   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1140   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1141   FILE *sumfp = fopen(fname, "w+");
1142   //CmiPrintf("File: %s \n", fname);
1143   if(sumfp == 0)
1144       CmiAbort("Cannot open summary sts file for writing.\n");
1145   delete[] fname;
1146
1147   int _numEntries=_entryTable.size();
1148   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1149   fprintf(sumfp, "\n");
1150
1151   // write bin time
1152 #if 0
1153   int last=pool[0].getU();
1154   writeU(fp, last);
1155   int count=1;
1156   for(j=1; j<numEntries; j++) {
1157     int u = pool[j].getU();
1158     if (last == u) {
1159       count++;
1160     }
1161     else {
1162       if (count > 1) fprintf(fp, "+%d", count);
1163       writeU(fp, u);
1164       last = u;
1165       count = 1;
1166     }
1167   }
1168   if (count > 1) fprintf(fp, "+%d", count);
1169 #else
1170   for(j=0; j<nBins; j++) {
1171     bins[j].time() /= nTracedPEs;
1172     bins[j].write(sumfp);
1173   }
1174 #endif
1175   fprintf(sumfp, "\n");
1176   fclose(sumfp);
1177
1178 }
1179
1180 extern "C" void CombineSummary()
1181 {
1182 #if CMK_TRACE_ENABLED
1183   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1184   if (sumonly) {
1185     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1186       // pe 0 start the sumonly process
1187     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1188     sumProxy[0].startSumOnly();
1189   }
1190   else CkExit();
1191 #else
1192   CkExit();
1193 #endif
1194 }
1195
1196 void initTraceSummaryBOC()
1197 {
1198 #ifdef __BLUEGENE__
1199   if(BgNodeRank()==0) {
1200 #else
1201   if (CkMyRank() == 0) {
1202 #endif
1203     registerExitFn(CombineSummary);
1204   }
1205 }
1206
1207
1208
1209
1210
1211
1212 #include "TraceSummary.def.h"
1213
1214
1215 /*@}*/
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235