fixed a bug in shrinking (binsize doubled), the idle time was not handled properly.
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39
40
41 /** 
42     A class that reads/writes a buffer out of different types of data.
43
44     This class exists because I need to get references to parts of the buffer 
45     that have already been used so that I can increment counters inside the buffer.
46 */
47
48 class compressedBuffer {
49 public:
50   char* buf;
51   int pos; ///<< byte position just beyond the previously read/written data
52
53   compressedBuffer(){
54     buf = NULL;
55     pos = 0;
56   }
57
58   compressedBuffer(int bytes){
59     buf = (char*)malloc(bytes);
60     pos = 0;
61   }
62
63   compressedBuffer(void *buffer){
64     buf = (char*)buffer;
65     pos = 0;
66   }
67     
68   void init(void *buffer){
69     buf = (char*)buffer;
70     pos = 0;
71   }
72     
73   inline void * currentPtr(){
74     return (void*)(buf+pos);
75   }
76
77   template <typename T>
78   T read(int offset){
79     // to resolve unaligned writes causing bus errors, need memcpy
80     T v;
81     memcpy(&v, buf+offset, sizeof(T));
82     return v;
83   }
84     
85   template <typename T>
86   void write(T v, int offset){
87     T v2 = v; // on stack
88     // to resolve unaligned writes causing bus errors, need memcpy
89     memcpy(buf+offset, &v2, sizeof(T));
90   }
91     
92   template <typename T>
93   void increment(int offset){
94     T temp;
95     temp = read<T>(offset);
96     temp ++;
97     write<T>(temp, offset);
98   }
99
100   template <typename T>
101   void accumulate(T v, int offset){
102     T temp;
103     temp = read<T>(offset);
104     temp += v;
105     write<T>(temp, offset);
106   }
107   
108   template <typename T>
109   int push(T v){
110     int oldpos = pos;
111     write<T>(v, pos);
112     pos += sizeof(T);
113     return oldpos;
114   }
115   
116   template <typename T>
117   T pop(){
118     T temp = read<T>(pos);
119     pos += sizeof(T);
120     return temp;
121   }
122
123   template <typename T>
124   T peek(){
125     T temp = read<T>(pos);
126     return temp;
127   }
128
129   template <typename T0, typename T>
130   T peekSecond(){
131     T temp;
132     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
133     return temp;
134   }
135
136   int datalength(){
137     return pos;
138   }
139      
140   void * buffer(){
141     return (void*) buf;
142   }  
143
144   void freeBuf(){
145     free(buf);
146   }
147
148   ~compressedBuffer(){
149     // don't free the buf because the user my have supplied the buffer
150   }
151     
152 };
153
154
155
156
157 // Global Readonly
158 CkGroupID traceSummaryGID;
159 bool summaryCcsStreaming;
160
161 int sumonly = 0;
162 int sumDetail = 0;
163
164 /**
165   For each TraceFoo module, _createTraceFoo() must be defined.
166   This function is called in _createTraces() generated in moduleInit.C
167 */
168 void _createTracesummary(char **argv)
169 {
170   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
171   CkpvInitialize(TraceSummary*, _trace);
172   CkpvInitialize(int, previouslySentBins);
173   CkpvAccess(previouslySentBins) = 0;
174   CkpvAccess(_trace) = new  TraceSummary(argv);
175   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
176   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
177 }
178
179
180 /// function call for starting a phase in trace summary logs 
181 extern "C" 
182 void CkSummary_StartPhase(int phase)
183 {
184    CkpvAccess(_trace)->startPhase(phase);
185 }
186
187
188 /// function call for adding an event mark
189 extern "C" 
190 void CkSummary_MarkEvent(int eventType)
191 {
192    CkpvAccess(_trace)->addEventType(eventType);
193 }
194
195 static inline void writeU(FILE* fp, int u)
196 {
197   fprintf(fp, "%4d", u);
198 }
199
200 PhaseEntry::PhaseEntry() 
201 {
202   int _numEntries=_entryTable.size();
203   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
204   nEPs = _numEntries+10; 
205   count = new int[nEPs];
206   times = new double[nEPs];
207   maxtimes = new double[nEPs];
208   for (int i=0; i<nEPs; i++) {
209     count[i] = 0;
210     times[i] = 0.0;
211     maxtimes[i] = 0.;
212   }
213 }
214
215 SumLogPool::~SumLogPool() 
216 {
217   if (!sumonly) {
218     write();
219     fclose(fp);
220     if (sumDetail) fclose(sdfp);
221   }
222   // free memory for mark
223   if (markcount > 0)
224   for (int i=0; i<MAX_MARKS; i++) {
225     for (int j=0; j<events[i].length(); j++)
226       delete events[i][j];
227   }
228   delete[] pool;
229   delete[] epInfo;
230   delete[] cpuTime;
231   delete[] numExecutions;
232 }
233
234 void SumLogPool::addEventType(int eventType, double time)
235 {
236    if (eventType <0 || eventType >= MAX_MARKS) {
237        CkPrintf("Invalid event type %d!\n", eventType);
238        return;
239    }
240    MarkEntry *e = new MarkEntry;
241    e->time = time;
242    events[eventType].push_back(e);
243    markcount ++;
244 }
245
246 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
247 {
248    // TBD: Can this be moved to initMem?
249   cpuTime = NULL;
250    poolSize = CkpvAccess(binCount);
251    if (poolSize % 2) poolSize++;        // make sure it is even
252    pool = new BinEntry[poolSize];
253    _MEMCHECK(pool);
254
255    this->pgm = new char[strlen(pgm)+1];
256    strcpy(this->pgm,pgm);
257    
258 #if 0
259    // create the sts file
260    if (CkMyPe() == 0) {
261      char *fname = 
262        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
263      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
264      stsfp = fopen(fname, "w+");
265      //CmiPrintf("File: %s \n", fname);
266      if (stsfp == 0) {
267        CmiAbort("Cannot open summary sts file for writing.\n");
268       }
269      delete[] fname;
270    }
271 #endif
272    
273    // event
274    markcount = 0;
275 }
276
277 void SumLogPool::initMem()
278 {
279    int _numEntries=_entryTable.size();
280    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
281    epInfo = new SumEntryInfo[epInfoSize];
282    _MEMCHECK(epInfo);
283
284    cpuTime = NULL;
285    numExecutions = NULL;
286    if (sumDetail) {
287        cpuTime = new double[poolSize*epInfoSize];
288        _MEMCHECK(cpuTime);
289        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
290        numExecutions = new int[poolSize*epInfoSize];
291        _MEMCHECK(numExecutions);
292        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
293
294 //         int i, e;
295 //         for(i=0; i<poolSize; i++) {
296 //             for(e=0; e< epInfoSize; e++) {
297 //                 setCPUtime(i,e,0.0);
298 //                 setNumExecutions(i,e,0);
299 //             }
300 //         }
301    }
302 }
303
304 int SumLogPool::getUtilization(int interval, int ep) {
305     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
306 }
307
308 void SumLogPool::write(void) 
309 {
310   int i;
311   unsigned int j;
312   int _numEntries=_entryTable.size();
313
314   fp = NULL;
315   sdfp = NULL;
316
317   // create file(s)
318   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
319   if (!sumonly) {
320     char pestr[10];
321     sprintf(pestr, "%d", CkMyPe());
322     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
323     char *fname = new char[len+1];
324     
325     sprintf(fname, "%s.%s.sum", pgm, pestr);
326     do {
327       fp = fopen(fname, "w+");
328     } while (!fp && errno == EINTR);
329     if (!fp) {
330       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
331       CmiAbort("Cannot open Summary Trace File for writing...\n");
332     }
333     
334     if (sumDetail) {
335       sprintf(fname, "%s.%s.sumd", pgm, pestr);
336       do {
337         sdfp = fopen(fname, "w+");
338       } while (!sdfp && errno == EINTR);
339       if(!sdfp) {
340         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
341       }
342     }
343     delete[] fname;
344   }
345
346   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
347   if (CkpvAccess(version)>=3.0)
348   {
349     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
350   }
351   fprintf(fp, "\n");
352
353   // write bin time
354 #if 1
355   int last=pool[0].getU();
356   writeU(fp, last);
357   int count=1;
358   for(j=1; j<numBins; j++) {
359     int u = pool[j].getU();
360     if (last == u) {
361       count++;
362     }
363     else {
364       if (count > 1) fprintf(fp, "+%d", count);
365       writeU(fp, u);
366       last = u;
367       count = 1;
368     }
369   }
370   if (count > 1) fprintf(fp, "+%d", count);
371 #else
372   for(j=0; j<numEntries; j++) 
373       pool[j].write(fp);
374 #endif
375   fprintf(fp, "\n");
376
377   // write entry execution time
378   fprintf(fp, "EPExeTime: ");
379   for (i=0; i<_numEntries; i++)
380     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
381   fprintf(fp, "\n");
382   // write entry function call times
383   fprintf(fp, "EPCallTime: ");
384   for (i=0; i<_numEntries; i++)
385     fprintf(fp, "%d ", epInfo[i].epCount);
386   fprintf(fp, "\n");
387   // write max entry function execute times
388   fprintf(fp, "MaxEPTime: ");
389   for (i=0; i<_numEntries; i++)
390     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
391   fprintf(fp, "\n");
392 #if 0
393   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
394     for (j=0; j<_numEntries; j++) 
395       fprintf(fp, "%d ", epInfo[j].hist[i]);
396     fprintf(fp, "\n");
397   }
398 #endif
399   // write marks
400   if (CkpvAccess(version)>=2.0) 
401   {
402     fprintf(fp, "NumMarks: %d ", markcount);
403     for (i=0; i<MAX_MARKS; i++) {
404       for(int j=0; j<events[i].length(); j++)
405         fprintf(fp, "%d %f ", i, events[i][j]->time);
406     }
407     fprintf(fp, "\n");
408   }
409   // write phases
410   if (CkpvAccess(version)>=3.0)
411   {
412     phaseTab.write(fp);
413   }
414   // write idle time
415   if (CkpvAccess(version)>=7.1) {
416     fprintf(fp, "IdlePercent: ");
417     int last=pool[0].getUIdle();
418     writeU(fp, last);
419     int count=1;
420     for(j=1; j<numBins; j++) {
421       int u = pool[j].getUIdle();
422       if (last == u) {
423         count++;
424       }
425       else {
426         if (count > 1) fprintf(fp, "+%d", count);
427         writeU(fp, u);
428         last = u;
429         count = 1;
430       }
431     }
432     if (count > 1) fprintf(fp, "+%d", count);
433     fprintf(fp, "\n");
434   }
435
436
437   // write summary details
438   if (sumDetail) {
439         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
440                 CkpvAccess(version), CkMyPe(), CkNumPes(),
441                 numBins, _numEntries, CkpvAccess(binSize));
442
443         // Write out cpuTime in microseconds
444         // Run length encoding (RLE) along EP axis
445         fprintf(sdfp, "ExeTimePerEPperInterval ");
446         unsigned int e, i;
447         long last= (long) (getCPUtime(0,0)*1.0e6);
448         int count=0;
449         fprintf(sdfp, "%ld", last);
450         for(e=0; e<_numEntries; e++) {
451             for(i=0; i<numBins; i++) {
452
453                 long u= (long) (getCPUtime(i,e)*1.0e6);
454                 if (last == u) {
455                     count++;
456                 } else {
457
458                     if (count > 1) fprintf(sdfp, "+%d", count);
459                     fprintf(sdfp, " %ld", u);
460                     last = u;
461                     count = 1;
462                 }
463             }
464         }
465         if (count > 1) fprintf(sdfp, "+%d", count);
466         fprintf(sdfp, "\n");
467
468         // Write out numExecutions
469         // Run length encoding (RLE) along EP axis
470         fprintf(sdfp, "EPCallTimePerInterval ");
471         last= getNumExecutions(0,0);
472         count=0;
473         fprintf(sdfp, "%d", last);
474         for(e=0; e<_numEntries; e++) {
475             for(i=0; i<numBins; i++) {
476
477                 long u= getNumExecutions(i, e);
478                 if (last == u) {
479                     count++;
480                 } else {
481
482                     if (count > 1) fprintf(sdfp, "+%d", count);
483                     fprintf(sdfp, " %d", u);
484                     last = u;
485                     count = 1;
486                 }
487             }
488         }
489         if (count > 1) fprintf(sdfp, "+%d", count);
490         fprintf(sdfp, "\n");
491   }
492 }
493
494 void SumLogPool::writeSts(void)
495 {
496     // open sts file
497   char *fname = 
498        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
499   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
500   stsfp = fopen(fname, "w+");
501   //CmiPrintf("File: %s \n", fname);
502   if (stsfp == 0) {
503        CmiAbort("Cannot open summary sts file for writing.\n");
504   }
505   delete[] fname;
506
507   traceWriteSTS(stsfp,_numEvents);
508   for(int i=0;i<_numEvents;i++)
509     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
510   fprintf(stsfp, "END\n");
511
512   fclose(stsfp);
513 }
514
515 // Called once per interval
516 void SumLogPool::add(double time, double idleTime, int pe) 
517 {
518   new (&pool[numBins++]) BinEntry(time, idleTime);
519   if (poolSize==numBins) {
520     shrink();
521   }
522 }
523
524 // Called once per run of an EP
525 // adds 'time' to EP's time, increments epCount
526 void SumLogPool::setEp(int epidx, double time) 
527 {
528   if (epidx >= epInfoSize) {
529         CmiAbort("Invalid entry point!!\n");
530   }
531   //CmiPrintf("set EP: %d %e \n", epidx, time);
532   epInfo[epidx].setTime(time);
533   // set phase table counter
534   phaseTab.setEp(epidx, time);
535 }
536
537 // Called once from endExecute, endPack, etc. this function updates
538 // the sumDetail intervals.
539 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
540 {
541         if (epIdx >= epInfoSize) {
542             CmiAbort("Too many entry points!!\n");
543         }
544
545         double binSz = CkpvAccess(binSize);
546         int startingBinIdx, endingBinIdx;
547         startingBinIdx = (int)(startTime/binSz);
548         endingBinIdx = (int)(endTime/binSz);
549         // shrink if needed
550         while (endingBinIdx >= poolSize) {
551           shrink();
552           CmiAssert(CkpvAccess(binSize) > binSz);
553           binSz = CkpvAccess(binSize);
554           startingBinIdx = (int)(startTime/binSz);
555           endingBinIdx = (int)(endTime/binSz);
556         }
557
558         if (startingBinIdx == endingBinIdx) {
559             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
560         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
561             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
562             while(++startingBinIdx < endingBinIdx)
563                 addToCPUtime(startingBinIdx, epIdx, binSz);
564             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
565         } else {
566           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
567                    startTime, endTime);
568             CmiAbort("Error: end time of EP is less than start time\n");
569         }
570
571         incNumExecutions(startingBinIdx, epIdx);
572 }
573
574 // Shrinks pool[], cpuTime[], and numExecutions[]
575 void SumLogPool::shrink(void)
576 {
577 //  double t = CmiWallTimer();
578
579   // We ensured earlier that poolSize is even; therefore now numBins
580   // == poolSize == even.
581   int entries = numBins/2;
582   for (int i=0; i<entries; i++)
583   {
584      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
585      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
586      if (sumDetail)
587      for (int e=0; e < epInfoSize; e++) {
588          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
589          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
590      }
591   }
592   // zero out the remaining intervals
593   if (sumDetail) {
594     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
595     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
596   }
597   numBins = entries;
598   CkpvAccess(binSize) *= 2;
599
600 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
601 }
602
603 int  BinEntry::getU() 
604
605   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
606 }
607
608 int BinEntry::getUIdle() {
609   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
610 }
611
612 void BinEntry::write(FILE* fp)
613 {
614   writeU(fp, getU());
615 }
616
617 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
618                                         binTime(0.0),binIdle(0.0),msgNum(0)
619 {
620   if (CkpvAccess(traceOnPe) == 0) return;
621
622   CkpvInitialize(int, binCount);
623   CkpvInitialize(double, binSize);
624   CkpvInitialize(double, version);
625   CkpvAccess(binSize) = BIN_SIZE;
626   CkpvAccess(version) = VER;
627   CkpvAccess(binCount) = DefaultBinCount;
628   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
629     if (CkMyPe() == 0) 
630       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632         "CPU usage log time resolution");
633   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634         "Write this .sum file version");
635
636   epThreshold = 0.001; 
637   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638         "Execution time histogram lower bound");
639   epInterval = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641         "Execution time histogram bin size");
642
643   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644   // +sumonly overrides +sumDetail
645   if (!sumonly)
646       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
647
648   _logPool = new SumLogPool(CkpvAccess(traceRoot));
649   // assume invalid entry point on start
650   execEp=INVALIDEP;
651 }
652
653 void TraceSummary::traceClearEps(void)
654 {
655   _logPool->clearEps();
656 }
657
658 void TraceSummary::traceWriteSts(void)
659 {
660   if(CkMyPe()==0)
661       _logPool->writeSts();
662 }
663
664 void TraceSummary::traceClose(void)
665 {
666   if(CkMyPe()==0)
667       _logPool->writeSts();
668   CkpvAccess(_trace)->endComputation();
669   // destructor call the write()
670   delete _logPool;
671   // remove myself from traceArray so that no tracing will be called.
672   CkpvAccess(_traces)->removeTrace(this);
673 }
674
675 void TraceSummary::beginExecute(CmiObjId *tid)
676 {
677   beginExecute(-1,-1,_threadEP,-1);
678 }
679
680 void TraceSummary::beginExecute(envelope *e)
681 {
682   // no message means thread execution
683   if (e==NULL) {
684     beginExecute(-1,-1,_threadEP,-1);
685   }
686   else {
687     beginExecute(-1,-1,e->getEpIdx(),-1);
688   }  
689 }
690
691 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
692 {
693   if (execEp != INVALIDEP) {
694     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
695     return;
696   }
697   
698   execEp=ep;
699   double t = TraceTimer();
700   //CmiPrintf("start: %f \n", start);
701   
702   start = t;
703   double ts = binStart;
704   // fill gaps
705   while ((ts = ts + CkpvAccess(binSize)) < t) {
706     /* Keep as a template for error checking. The current form of this check
707        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
708        I used it). Perhaps an improved form could be used in case vastly
709        incompatible EP vs idle times are reported (binSize*2?).
710
711        This check will have to be duplicated before each call to add()
712
713     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
714              binTime + binIdle, CkpvAccess(binSize));
715     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
716     */
717      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
718      binTime=0.0;                 // fill all other bins with 0 up till start
719      binIdle = 0.0;
720      binStart = ts;
721   }
722 }
723
724 void TraceSummary::endExecute(void)
725 {
726   double t = TraceTimer();
727   double ts = start;
728   double nts = binStart;
729
730 /*
731   if (execEp == TRACEON_EP) {
732     // if trace just got turned on, then one expects to see this
733     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
734     return;
735   }
736 */
737
738   if (execEp == INVALIDEP) {
739     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
740     return;
741   }
742
743   if (execEp >= 0)
744   {
745     _logPool->setEp(execEp, t-ts);
746   }
747
748   while ((nts = nts + CkpvAccess(binSize)) < t)
749   {
750     // fill the bins with time for this entry method
751      binTime += nts-ts;
752      binStart  = nts;
753      // This calls shrink() if needed
754      _logPool->add(binTime, binIdle, CkMyPe()); 
755      binTime = 0.0;
756      binIdle = 0.0;
757      ts = nts;
758   }
759   binTime += t - ts;
760
761   if (sumDetail)
762       _logPool->updateSummaryDetail(execEp, start, t);
763
764   execEp = INVALIDEP;
765 }
766
767 void TraceSummary::beginIdle(double currT)
768 {
769   // for consistency with current framework behavior, currT is ignored and
770   // independent timing taken by trace-summary.
771   double t = TraceTimer(currT);
772   
773   // mark the time of this idle period. Only the next endIdle should see
774   // this value
775   idleStart = t; 
776   double ts = binStart;
777   // fill gaps
778   while ((ts = ts + CkpvAccess(binSize)) < t) {
779     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
780     binTime=0.0;                 // fill all other bins with 0 up till start
781     binIdle = 0.0;
782     binStart = ts;
783   }
784 }
785
786 void TraceSummary::endIdle(double currT)
787 {
788   // again, we ignore the reported currT (see beginIdle)
789   double t = TraceTimer(currT);
790   double t_idleStart = idleStart;
791   double t_binStart = binStart;
792
793   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
794   {
795     // fill the bins with time for idle
796     binIdle += t_binStart - t_idleStart;
797     binStart = t_binStart;
798     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
799     binTime = 0.0;
800     binIdle = 0.0;
801     t_idleStart = t_binStart;
802   }
803   binIdle += t - t_idleStart;
804 }
805
806 void TraceSummary::traceBegin(void)
807 {
808   beginExecute(-1, -1, TRACEON_EP, -1, -1);
809 }
810
811 void TraceSummary::traceEnd(void)
812 {
813   endExecute();
814 }
815
816 void TraceSummary::beginPack(void)
817 {
818     packstart = CmiWallTimer();
819 }
820
821 void TraceSummary::endPack(void)
822 {
823     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
824     if (sumDetail)
825         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
826 }
827
828 void TraceSummary::beginUnpack(void)
829 {
830     unpackstart = CmiWallTimer();
831 }
832
833 void TraceSummary::endUnpack(void)
834 {
835     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
836     if (sumDetail)
837         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
838 }
839
840 void TraceSummary::beginComputation(void)
841 {
842   // initialze arrays because now the number of entries is known.
843   _logPool->initMem();
844 }
845
846 void TraceSummary::endComputation(void)
847 {
848   static int done = 0;
849   if (done) return;
850   done = 1;
851   if (msgNum==0) {
852 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
853      _logPool->add(binTime, binIdle, CkMyPe());
854      binTime = 0.0;
855      binIdle = 0.0;
856      msgNum ++;
857
858      binStart  += CkpvAccess(binSize);
859      double t = TraceTimer();
860      double ts = binStart;
861      while (ts < t)
862      {
863        _logPool->add(binTime, binIdle, CkMyPe());
864        binTime=0.0;
865        binIdle = 0.0;
866        ts += CkpvAccess(binSize);
867      }
868
869   }
870 }
871
872 void TraceSummary::addEventType(int eventType)
873 {
874   _logPool->addEventType(eventType, TraceTimer());
875 }
876
877 void TraceSummary::startPhase(int phase)
878 {
879    _logPool->startPhase(phase);
880 }
881
882 void TraceSummary::traceEnableCCS() {
883   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
884   sumProxy.initCCS();
885 }
886
887
888 void TraceSummary::fillData(double *buffer, double reqStartTime, 
889                             double reqBinSize, int reqNumBins) {
890   // buffer has to be pre-allocated by the requester and must be an array of
891   // size reqNumBins.
892   //
893   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
894   //              need a number of these assumptions dropped:
895   //              1) reqBinSize == binSize (unrealistic)
896   //              2) bins boundary aligned (ok even under normal circumstances)
897   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
898   //              4) bins are always available (true unless flush)
899   //              5) bins always starts from 0 (unrealistic)
900
901   // works only because of 1)
902   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
903   //           "starting" at all!
904   int binOffset = (int)(reqStartTime/reqBinSize); 
905   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
906     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
907     buffer[i-binOffset] = pool()->getTime(i);
908   }
909 }
910
911
912 /// for TraceSummaryBOC
913
914 void TraceSummaryBOC::initCCS() {
915   if(firstTime){
916     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
917     // initializing CCS-based parameters on all processors
918     lastRequestedIndexBlock = 0;
919     indicesPerBlock = 1000;
920     collectionGranularity = 0.001; // time in seconds
921     nBufferedBins = 0;
922     
923     // initialize buffer, register CCS handler and start the collection
924     // pulse only on pe 0.
925     if (CkMyPe() == 0) { 
926       ccsBufferedData = new CkVec<double>();
927     
928       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
929       CkPrintf("Trace Summary now listening in for CCS Client\n");
930       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
931                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
932       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
933                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
934
935       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
936       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
937                              (void *)this);
938       summaryCcsStreaming = CmiTrue;
939     }
940     firstTime = false;
941   }
942 }
943
944 /** Return summary information as double precision values for each sample period. 
945     The actual data collection is in double precision values. 
946
947     The units on the returned values are total execution time across all PEs.
948 */
949 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
950   double *sendBuffer;
951
952   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
953
954   CkPrintf("Responding ...\n");
955   int datalength = 0;
956   // if we have no data to send, send an acknowledgement code of -13.37
957   // instead.
958   if (ccsBufferedData->length() == 0) {
959     sendBuffer = new double[1];
960     sendBuffer[0] = -13.37;
961     datalength = sizeof(double);
962     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
963     delete [] sendBuffer;
964   } else {
965     sendBuffer = ccsBufferedData->getVec();
966     datalength = ccsBufferedData->length()*sizeof(double);
967     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
968     ccsBufferedData->free();
969   }
970   CkPrintf("Response Sent. Proceeding with computation.\n");
971   delete m;
972 }
973
974
975 /** Return summary information as unsigned char values for each sample period. 
976     The actual data collection is in double precision values.
977
978     This returns the utilization in a range from 0 to 200.
979 */
980 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
981   unsigned char *sendBuffer;
982
983   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
984
985   CkPrintf("Responding ...\n");
986   int datalength = 0;
987
988   if (ccsBufferedData->length() == 0) {
989     sendBuffer = new unsigned char[1];
990     sendBuffer[0] = 255;
991     datalength = sizeof(unsigned char);
992     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
993     delete [] sendBuffer;
994   } else {
995     double * doubleData = ccsBufferedData->getVec();
996     int numData = ccsBufferedData->length();
997     
998     // pack data into unsigned char array
999     sendBuffer = new unsigned char[numData];
1000     
1001     for(int i=0;i<numData;i++){
1002       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1003       int v = sendBuffer[i];
1004     }    
1005
1006     datalength = sizeof(unsigned char) * numData;
1007     
1008     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1009     ccsBufferedData->free();
1010     delete [] sendBuffer;
1011   }
1012   CkPrintf("Response Sent. Proceeding with computation.\n");
1013   delete m;
1014 }
1015
1016
1017
1018 void startCollectData(void *data, double currT) {
1019   CkAssert(CkMyPe() == 0);
1020   // CkPrintf("startCollectData()\n");
1021   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1022   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1023   double collectionGranularity = sumObj->collectionGranularity;
1024   int indicesPerBlock = sumObj->indicesPerBlock;
1025   
1026   double startTime = lastRequestedIndexBlock*
1027     collectionGranularity * indicesPerBlock;
1028   int numIndicesToGet = (int)floor((currT - startTime)/
1029                                    collectionGranularity);
1030   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1031   // **TODO** consider limiting the total number of blocks each collection
1032   //   request will pick up. This is to limit the amount of perturbation
1033   //   if it proves to be a problem.
1034   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1035
1036    sumProxy.collectSummaryData(startTime, 
1037                        collectionGranularity,
1038                        numBlocksToGet*indicesPerBlock);
1039   // assume success
1040   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1041 }
1042
1043 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1044                                   int numBins) {
1045   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1046
1047   double *contribution = new double[numBins];
1048   for (int i=0; i<numBins; i++) {
1049     contribution[i] = 0.0;
1050   }
1051   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1052
1053   /*
1054   for (int i=0; i<numBins; i++) {
1055     CkPrintf("[%d] %f\n", i, contribution[i]);
1056   }
1057   */
1058
1059   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1060   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1061   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1062              cb);
1063 }
1064
1065 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1066   CkAssert(CkMyPe() == 0);
1067   // **CWL** No memory management for the ccs buffer for now.
1068
1069   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1070   double *recvData = (double *)msg->getData();
1071   int numBins = msg->getSize()/sizeof(double);
1072
1073   // if there's an easier way to append a data block to a CkVec, I'll take it
1074   for (int i=0; i<numBins; i++) {
1075     ccsBufferedData->insertAtEnd(recvData[i]);
1076   }
1077   delete msg;
1078 }
1079
1080
1081
1082
1083 void TraceSummaryBOC::startSumOnly()
1084 {
1085   CmiAssert(CkMyPe() == 0);
1086
1087   CProxy_TraceSummaryBOC p(traceSummaryGID);
1088   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1089   p.askSummary(size);
1090 }
1091
1092 void TraceSummaryBOC::askSummary(int size)
1093 {
1094   if (CkpvAccess(_trace) == NULL) return;
1095
1096   int traced = CkpvAccess(_trace)->traceOnPE();
1097
1098   BinEntry *reductionBuffer = new BinEntry[size+1];
1099   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1100   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1101   if (traced) {
1102     CkpvAccess(_trace)->endComputation();
1103     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1104     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1105     if (n>size) n=size;
1106     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1107   }
1108
1109   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1110              CkReduction::sum_double);
1111   delete [] reductionBuffer;
1112 }
1113
1114 //extern "C" void _CkExit();
1115
1116 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1117 {
1118   if (CkpvAccess(_trace) == NULL) return;
1119
1120   CkAssert(CkMyPe() == 0);
1121
1122   int n = msg->getSize()/sizeof(BinEntry);
1123   nBins = n-1;
1124   bins = (BinEntry *)msg->getData();
1125   nTracedPEs = (int)bins[n-1].time();
1126   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1127
1128   write();
1129
1130   delete msg;
1131
1132   CkExit();
1133 }
1134
1135 void TraceSummaryBOC::write(void) 
1136 {
1137   int i;
1138   unsigned int j;
1139
1140   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1141   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1142   FILE *sumfp = fopen(fname, "w+");
1143   //CmiPrintf("File: %s \n", fname);
1144   if(sumfp == 0)
1145       CmiAbort("Cannot open summary sts file for writing.\n");
1146   delete[] fname;
1147
1148   int _numEntries=_entryTable.size();
1149   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1150   fprintf(sumfp, "\n");
1151
1152   // write bin time
1153 #if 0
1154   int last=pool[0].getU();
1155   writeU(fp, last);
1156   int count=1;
1157   for(j=1; j<numEntries; j++) {
1158     int u = pool[j].getU();
1159     if (last == u) {
1160       count++;
1161     }
1162     else {
1163       if (count > 1) fprintf(fp, "+%d", count);
1164       writeU(fp, u);
1165       last = u;
1166       count = 1;
1167     }
1168   }
1169   if (count > 1) fprintf(fp, "+%d", count);
1170 #else
1171   for(j=0; j<nBins; j++) {
1172     bins[j].time() /= nTracedPEs;
1173     bins[j].write(sumfp);
1174   }
1175 #endif
1176   fprintf(sumfp, "\n");
1177   fclose(sumfp);
1178
1179 }
1180
1181 extern "C" void CombineSummary()
1182 {
1183 #if CMK_TRACE_ENABLED
1184   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1185   if (sumonly) {
1186     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1187       // pe 0 start the sumonly process
1188     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1189     sumProxy[0].startSumOnly();
1190   }
1191   else CkExit();
1192 #else
1193   CkExit();
1194 #endif
1195 }
1196
1197 void initTraceSummaryBOC()
1198 {
1199 #ifdef __BLUEGENE__
1200   if(BgNodeRank()==0) {
1201 #else
1202   if (CkMyRank() == 0) {
1203 #endif
1204     registerExitFn(CombineSummary);
1205   }
1206 }
1207
1208
1209
1210
1211
1212
1213 #include "TraceSummary.def.h"
1214
1215
1216 /*@}*/
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236