Fixed message tracing on comm thread (to trace those entry methods that need
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39
40
41 /** 
42     A class that reads/writes a buffer out of different types of data.
43
44     This class exists because I need to get references to parts of the buffer 
45     that have already been used so that I can increment counters inside the buffer.
46 */
47
48 class compressedBuffer {
49 public:
50   char* buf;
51   int pos; ///<< byte position just beyond the previously read/written data
52
53   compressedBuffer(){
54     buf = NULL;
55     pos = 0;
56   }
57
58   compressedBuffer(int bytes){
59     buf = (char*)malloc(bytes);
60     pos = 0;
61   }
62
63   compressedBuffer(void *buffer){
64     buf = (char*)buffer;
65     pos = 0;
66   }
67     
68   void init(void *buffer){
69     buf = (char*)buffer;
70     pos = 0;
71   }
72     
73   inline void * currentPtr(){
74     return (void*)(buf+pos);
75   }
76
77   template <typename T>
78   T read(int offset){
79     // to resolve unaligned writes causing bus errors, need memcpy
80     T v;
81     memcpy(&v, buf+offset, sizeof(T));
82     return v;
83   }
84     
85   template <typename T>
86   void write(T v, int offset){
87     T v2 = v; // on stack
88     // to resolve unaligned writes causing bus errors, need memcpy
89     memcpy(buf+offset, &v2, sizeof(T));
90   }
91     
92   template <typename T>
93   void increment(int offset){
94     T temp;
95     temp = read<T>(offset);
96     temp ++;
97     write<T>(temp, offset);
98   }
99
100   template <typename T>
101   void accumulate(T v, int offset){
102     T temp;
103     temp = read<T>(offset);
104     temp += v;
105     write<T>(temp, offset);
106   }
107   
108   template <typename T>
109   int push(T v){
110     int oldpos = pos;
111     write<T>(v, pos);
112     pos += sizeof(T);
113     return oldpos;
114   }
115   
116   template <typename T>
117   T pop(){
118     T temp = read<T>(pos);
119     pos += sizeof(T);
120     return temp;
121   }
122
123   template <typename T>
124   T peek(){
125     T temp = read<T>(pos);
126     return temp;
127   }
128
129   template <typename T0, typename T>
130   T peekSecond(){
131     T temp;
132     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
133     return temp;
134   }
135
136   int datalength(){
137     return pos;
138   }
139      
140   void * buffer(){
141     return (void*) buf;
142   }  
143
144   void freeBuf(){
145     free(buf);
146   }
147
148   ~compressedBuffer(){
149     // don't free the buf because the user my have supplied the buffer
150   }
151     
152 };
153
154
155
156
157 // Global Readonly
158 CkGroupID traceSummaryGID;
159 bool summaryCcsStreaming;
160
161 int sumonly = 0;
162 int sumDetail = 0;
163
164 /**
165   For each TraceFoo module, _createTraceFoo() must be defined.
166   This function is called in _createTraces() generated in moduleInit.C
167 */
168 void _createTracesummary(char **argv)
169 {
170   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
171   CkpvInitialize(TraceSummary*, _trace);
172   CkpvInitialize(int, previouslySentBins);
173   CkpvAccess(previouslySentBins) = 0;
174   CkpvAccess(_trace) = new  TraceSummary(argv);
175   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
176   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
177 }
178
179
180 /// function call for starting a phase in trace summary logs 
181 extern "C" 
182 void CkSummary_StartPhase(int phase)
183 {
184    CkpvAccess(_trace)->startPhase(phase);
185 }
186
187
188 /// function call for adding an event mark
189 extern "C" 
190 void CkSummary_MarkEvent(int eventType)
191 {
192    CkpvAccess(_trace)->addEventType(eventType);
193 }
194
195 static inline void writeU(FILE* fp, int u)
196 {
197   fprintf(fp, "%4d", u);
198 }
199
200 PhaseEntry::PhaseEntry() 
201 {
202   int _numEntries=_entryTable.size();
203   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
204   nEPs = _numEntries+10; 
205   count = new int[nEPs];
206   times = new double[nEPs];
207   maxtimes = new double[nEPs];
208   for (int i=0; i<nEPs; i++) {
209     count[i] = 0;
210     times[i] = 0.0;
211     maxtimes[i] = 0.;
212   }
213 }
214
215 SumLogPool::~SumLogPool() 
216 {
217   if (!sumonly) {
218     write();
219     fclose(fp);
220     if (sumDetail) fclose(sdfp);
221   }
222   // free memory for mark
223   if (markcount > 0)
224   for (int i=0; i<MAX_MARKS; i++) {
225     for (int j=0; j<events[i].length(); j++)
226       delete events[i][j];
227   }
228   delete[] pool;
229   delete[] epInfo;
230   delete[] cpuTime;
231   delete[] numExecutions;
232 }
233
234 void SumLogPool::addEventType(int eventType, double time)
235 {
236    if (eventType <0 || eventType >= MAX_MARKS) {
237        CkPrintf("Invalid event type %d!\n", eventType);
238        return;
239    }
240    MarkEntry *e = new MarkEntry;
241    e->time = time;
242    events[eventType].push_back(e);
243    markcount ++;
244 }
245
246 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
247 {
248    // TBD: Can this be moved to initMem?
249   cpuTime = NULL;
250    poolSize = CkpvAccess(binCount);
251    if (poolSize % 2) poolSize++;        // make sure it is even
252    pool = new BinEntry[poolSize];
253    _MEMCHECK(pool);
254
255    this->pgm = new char[strlen(pgm)+1];
256    strcpy(this->pgm,pgm);
257    
258 #if 0
259    // create the sts file
260    if (CkMyPe() == 0) {
261      char *fname = 
262        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
263      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
264      stsfp = fopen(fname, "w+");
265      //CmiPrintf("File: %s \n", fname);
266      if (stsfp == 0) {
267        CmiAbort("Cannot open summary sts file for writing.\n");
268       }
269      delete[] fname;
270    }
271 #endif
272    
273    // event
274    markcount = 0;
275 }
276
277 void SumLogPool::initMem()
278 {
279    int _numEntries=_entryTable.size();
280    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
281    epInfo = new SumEntryInfo[epInfoSize];
282    _MEMCHECK(epInfo);
283
284    cpuTime = NULL;
285    numExecutions = NULL;
286    if (sumDetail) {
287        cpuTime = new double[poolSize*epInfoSize];
288        _MEMCHECK(cpuTime);
289        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
290        numExecutions = new int[poolSize*epInfoSize];
291        _MEMCHECK(numExecutions);
292        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
293
294 //         int i, e;
295 //         for(i=0; i<poolSize; i++) {
296 //             for(e=0; e< epInfoSize; e++) {
297 //                 setCPUtime(i,e,0.0);
298 //                 setNumExecutions(i,e,0);
299 //             }
300 //         }
301    }
302 }
303
304 int SumLogPool::getUtilization(int interval, int ep) {
305     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
306 }
307
308 void SumLogPool::write(void) 
309 {
310   int i;
311   unsigned int j;
312   int _numEntries=_entryTable.size();
313
314   fp = NULL;
315   sdfp = NULL;
316
317   // create file(s)
318   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
319   if (!sumonly) {
320     char pestr[10];
321     sprintf(pestr, "%d", CkMyPe());
322     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
323     char *fname = new char[len+1];
324     
325     sprintf(fname, "%s.%s.sum", pgm, pestr);
326     do {
327       fp = fopen(fname, "w+");
328     } while (!fp && errno == EINTR);
329     if (!fp) {
330       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
331       CmiAbort("Cannot open Summary Trace File for writing...\n");
332     }
333     
334     if (sumDetail) {
335       sprintf(fname, "%s.%s.sumd", pgm, pestr);
336       do {
337         sdfp = fopen(fname, "w+");
338       } while (!sdfp && errno == EINTR);
339       if(!sdfp) {
340         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
341       }
342     }
343     delete[] fname;
344   }
345
346   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
347   if (CkpvAccess(version)>=3.0)
348   {
349     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
350   }
351   fprintf(fp, "\n");
352
353   // write bin time
354 #if 1
355   int last=pool[0].getU();
356   writeU(fp, last);
357   int count=1;
358   for(j=1; j<numBins; j++) {
359     int u = pool[j].getU();
360     if (last == u) {
361       count++;
362     }
363     else {
364       if (count > 1) fprintf(fp, "+%d", count);
365       writeU(fp, u);
366       last = u;
367       count = 1;
368     }
369   }
370   if (count > 1) fprintf(fp, "+%d", count);
371 #else
372   for(j=0; j<numEntries; j++) 
373       pool[j].write(fp);
374 #endif
375   fprintf(fp, "\n");
376
377   // write entry execution time
378   fprintf(fp, "EPExeTime: ");
379   for (i=0; i<_numEntries; i++)
380     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
381   fprintf(fp, "\n");
382   // write entry function call times
383   fprintf(fp, "EPCallTime: ");
384   for (i=0; i<_numEntries; i++)
385     fprintf(fp, "%d ", epInfo[i].epCount);
386   fprintf(fp, "\n");
387   // write max entry function execute times
388   fprintf(fp, "MaxEPTime: ");
389   for (i=0; i<_numEntries; i++)
390     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
391   fprintf(fp, "\n");
392 #if 0
393   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
394     for (j=0; j<_numEntries; j++) 
395       fprintf(fp, "%d ", epInfo[j].hist[i]);
396     fprintf(fp, "\n");
397   }
398 #endif
399   // write marks
400   if (CkpvAccess(version)>=2.0) 
401   {
402     fprintf(fp, "NumMarks: %d ", markcount);
403     for (i=0; i<MAX_MARKS; i++) {
404       for(int j=0; j<events[i].length(); j++)
405         fprintf(fp, "%d %f ", i, events[i][j]->time);
406     }
407     fprintf(fp, "\n");
408   }
409   // write phases
410   if (CkpvAccess(version)>=3.0)
411   {
412     phaseTab.write(fp);
413   }
414   // write idle time
415   if (CkpvAccess(version)>=7.1) {
416     fprintf(fp, "IdlePercent: ");
417     int last=pool[0].getUIdle();
418     writeU(fp, last);
419     int count=1;
420     for(j=1; j<numBins; j++) {
421       int u = pool[j].getUIdle();
422       if (last == u) {
423         count++;
424       }
425       else {
426         if (count > 1) fprintf(fp, "+%d", count);
427         writeU(fp, u);
428         last = u;
429         count = 1;
430       }
431     }
432     if (count > 1) fprintf(fp, "+%d", count);
433     fprintf(fp, "\n");
434   }
435
436
437   // write summary details
438   if (sumDetail) {
439         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
440                 CkpvAccess(version), CkMyPe(), CkNumPes(),
441                 numBins, _numEntries, CkpvAccess(binSize));
442
443         // Write out cpuTime in microseconds
444         // Run length encoding (RLE) along EP axis
445         fprintf(sdfp, "ExeTimePerEPperInterval ");
446         unsigned int e, i;
447         long last= (long) (getCPUtime(0,0)*1.0e6);
448         int count=0;
449         fprintf(sdfp, "%ld", last);
450         for(e=0; e<_numEntries; e++) {
451             for(i=0; i<numBins; i++) {
452
453                 long u= (long) (getCPUtime(i,e)*1.0e6);
454                 if (last == u) {
455                     count++;
456                 } else {
457
458                     if (count > 1) fprintf(sdfp, "+%d", count);
459                     fprintf(sdfp, " %ld", u);
460                     last = u;
461                     count = 1;
462                 }
463             }
464         }
465         if (count > 1) fprintf(sdfp, "+%d", count);
466         fprintf(sdfp, "\n");
467
468         // Write out numExecutions
469         // Run length encoding (RLE) along EP axis
470         fprintf(sdfp, "EPCallTimePerInterval ");
471         last= getNumExecutions(0,0);
472         count=0;
473         fprintf(sdfp, "%d", last);
474         for(e=0; e<_numEntries; e++) {
475             for(i=0; i<numBins; i++) {
476
477                 long u= getNumExecutions(i, e);
478                 if (last == u) {
479                     count++;
480                 } else {
481
482                     if (count > 1) fprintf(sdfp, "+%d", count);
483                     fprintf(sdfp, " %d", u);
484                     last = u;
485                     count = 1;
486                 }
487             }
488         }
489         if (count > 1) fprintf(sdfp, "+%d", count);
490         fprintf(sdfp, "\n");
491   }
492 }
493
494 void SumLogPool::writeSts(void)
495 {
496     // open sts file
497   char *fname = 
498        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
499   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
500   stsfp = fopen(fname, "w+");
501   //CmiPrintf("File: %s \n", fname);
502   if (stsfp == 0) {
503        CmiAbort("Cannot open summary sts file for writing.\n");
504   }
505   delete[] fname;
506
507   traceWriteSTS(stsfp,_numEvents);
508   for(int i=0;i<_numEvents;i++)
509     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
510   fprintf(stsfp, "END\n");
511
512   fclose(stsfp);
513 }
514
515 // Called once per interval
516 void SumLogPool::add(double time, double idleTime, int pe) 
517 {
518   new (&pool[numBins++]) BinEntry(time, idleTime);
519   if (poolSize==numBins) {
520     shrink();
521   }
522 }
523
524 // Called once per run of an EP
525 // adds 'time' to EP's time, increments epCount
526 void SumLogPool::setEp(int epidx, double time) 
527 {
528   if (epidx >= epInfoSize) {
529         CmiAbort("Invalid entry point!!\n");
530   }
531   //CmiPrintf("set EP: %d %e \n", epidx, time);
532   epInfo[epidx].setTime(time);
533   // set phase table counter
534   phaseTab.setEp(epidx, time);
535 }
536
537 // Called once from endExecute, endPack, etc. this function updates
538 // the sumDetail intervals.
539 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
540 {
541         if (epIdx >= epInfoSize) {
542             CmiAbort("Too many entry points!!\n");
543         }
544
545         double binSz = CkpvAccess(binSize);
546         int startingBinIdx, endingBinIdx;
547         startingBinIdx = (int)(startTime/binSz);
548         endingBinIdx = (int)(endTime/binSz);
549         // shrink if needed
550         while (endingBinIdx >= poolSize) {
551           shrink();
552           CmiAssert(CkpvAccess(binSize) > binSz);
553           binSz = CkpvAccess(binSize);
554           startingBinIdx = (int)(startTime/binSz);
555           endingBinIdx = (int)(endTime/binSz);
556         }
557
558         if (startingBinIdx == endingBinIdx) {
559             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
560         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
561             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
562             while(++startingBinIdx < endingBinIdx)
563                 addToCPUtime(startingBinIdx, epIdx, binSz);
564             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
565         } else {
566           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
567                    startTime, endTime);
568             CmiAbort("Error: end time of EP is less than start time\n");
569         }
570
571         incNumExecutions(startingBinIdx, epIdx);
572 }
573
574 // Shrinks pool[], cpuTime[], and numExecutions[]
575 void SumLogPool::shrink(void)
576 {
577 //  double t = CmiWallTimer();
578
579   // We ensured earlier that poolSize is even; therefore now numBins
580   // == poolSize == even.
581   int entries = numBins/2;
582   for (int i=0; i<entries; i++)
583   {
584      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
585      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
586      if (sumDetail)
587      for (int e=0; e < epInfoSize; e++) {
588          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
589          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
590      }
591   }
592   // zero out the remaining intervals
593   if (sumDetail) {
594     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
595     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
596   }
597   numBins = entries;
598   CkpvAccess(binSize) *= 2;
599
600 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
601 }
602
603 int  BinEntry::getU() 
604
605   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
606 }
607
608 int BinEntry::getUIdle() {
609   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
610 }
611
612 void BinEntry::write(FILE* fp)
613 {
614   writeU(fp, getU());
615 }
616
617 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
618                                         binTime(0.0),binIdle(0.0),msgNum(0)
619 {
620   if (CkpvAccess(traceOnPe) == 0) return;
621
622   CkpvInitialize(int, binCount);
623   CkpvInitialize(double, binSize);
624   CkpvInitialize(double, version);
625   CkpvAccess(binSize) = BIN_SIZE;
626   CkpvAccess(version) = VER;
627   CkpvAccess(binCount) = DefaultBinCount;
628   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
629     if (CkMyPe() == 0) 
630       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632         "CPU usage log time resolution");
633   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634         "Write this .sum file version");
635
636   epThreshold = 0.001; 
637   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638         "Execution time histogram lower bound");
639   epInterval = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641         "Execution time histogram bin size");
642
643   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644   // +sumonly overrides +sumDetail
645   if (!sumonly)
646       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
647
648   _logPool = new SumLogPool(CkpvAccess(traceRoot));
649   // assume invalid entry point on start
650   execEp=INVALIDEP;
651 }
652
653 void TraceSummary::traceClearEps(void)
654 {
655   _logPool->clearEps();
656 }
657
658 void TraceSummary::traceWriteSts(void)
659 {
660   if(CkMyPe()==0)
661       _logPool->writeSts();
662 }
663
664 void TraceSummary::traceClose(void)
665 {
666   if(CkMyPe()==0)
667       _logPool->writeSts();
668   CkpvAccess(_trace)->endComputation();
669   // destructor call the write()
670   delete _logPool;
671   // remove myself from traceArray so that no tracing will be called.
672   CkpvAccess(_traces)->removeTrace(this);
673 }
674
675 void TraceSummary::beginExecute(CmiObjId *tid)
676 {
677   beginExecute(-1,-1,_threadEP,-1);
678 }
679
680 void TraceSummary::beginExecute(envelope *e)
681 {
682   // no message means thread execution
683   if (e==NULL) {
684     beginExecute(-1,-1,_threadEP,-1);
685   }
686   else {
687     beginExecute(-1,-1,e->getEpIdx(),-1);
688   }  
689 }
690
691 void TraceSummary::beginExecute(char *msg)
692 {
693 #if CMK_SMP_TRACE_COMMTHREAD
694     //This function is called from comm thread in SMP mode
695     envelope *e = (envelope *)msg;
696     int num = _entryTable.size();
697     int ep = e->getEpIdx();
698     if(ep<0 || ep>=num) return;
699     if(_entryTable[ep]->traceEnabled)
700         beginExecute(-1,-1,e->getEpIdx(),-1);
701 #endif
702 }
703
704 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
705 {
706   if (execEp != INVALIDEP) {
707     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
708     return;
709   }
710   
711   execEp=ep;
712   double t = TraceTimer();
713   //CmiPrintf("start: %f \n", start);
714   
715   start = t;
716   double ts = binStart;
717   // fill gaps
718   while ((ts = ts + CkpvAccess(binSize)) < t) {
719     /* Keep as a template for error checking. The current form of this check
720        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
721        I used it). Perhaps an improved form could be used in case vastly
722        incompatible EP vs idle times are reported (binSize*2?).
723
724        This check will have to be duplicated before each call to add()
725
726     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
727              binTime + binIdle, CkpvAccess(binSize));
728     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
729     */
730      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
731      binTime=0.0;                 // fill all other bins with 0 up till start
732      binIdle = 0.0;
733      binStart = ts;
734   }
735 }
736
737 void TraceSummary::endExecute(void)
738 {
739   double t = TraceTimer();
740   double ts = start;
741   double nts = binStart;
742
743 /*
744   if (execEp == TRACEON_EP) {
745     // if trace just got turned on, then one expects to see this
746     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
747     return;
748   }
749 */
750
751   if (execEp == INVALIDEP) {
752     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
753     return;
754   }
755
756   if (execEp >= 0)
757   {
758     _logPool->setEp(execEp, t-ts);
759   }
760
761   while ((nts = nts + CkpvAccess(binSize)) < t)
762   {
763     // fill the bins with time for this entry method
764      binTime += nts-ts;
765      binStart  = nts;
766      // This calls shrink() if needed
767      _logPool->add(binTime, binIdle, CkMyPe()); 
768      binTime = 0.0;
769      binIdle = 0.0;
770      ts = nts;
771   }
772   binTime += t - ts;
773
774   if (sumDetail && execEp >= 0)
775       _logPool->updateSummaryDetail(execEp, start, t);
776
777   execEp = INVALIDEP;
778 }
779
780 void TraceSummary::endExecute(char *msg){
781 #if CMK_SMP_TRACE_COMMTHREAD
782     //This function is called from comm thread in SMP mode
783     envelope *e = (envelope *)msg;
784     int num = _entryTable.size();
785     int ep = e->getEpIdx();
786     if(ep<0 || ep>=num) return;
787     if(_entryTable[ep]->traceEnabled){
788         endExecute();
789     }
790 #endif    
791 }
792
793 void TraceSummary::beginIdle(double currT)
794 {
795   double t = TraceTimer(currT);
796   
797   // mark the time of this idle period. Only the next endIdle should see
798   // this value
799   idleStart = t; 
800   double ts = binStart;
801   // fill gaps
802   while ((ts = ts + CkpvAccess(binSize)) < t) {
803     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
804     binTime=0.0;                 // fill all other bins with 0 up till start
805     binIdle = 0.0;
806     binStart = ts;
807   }
808 }
809
810 void TraceSummary::endIdle(double currT)
811 {
812   double t = TraceTimer(currT);
813   double t_idleStart = idleStart;
814   double t_binStart = binStart;
815
816   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
817   {
818     // fill the bins with time for idle
819     binIdle += t_binStart - t_idleStart;
820     binStart = t_binStart;
821     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
822     binTime = 0.0;
823     binIdle = 0.0;
824     t_idleStart = t_binStart;
825   }
826   binIdle += t - t_idleStart;
827 }
828
829 void TraceSummary::traceBegin(void)
830 {
831     // fake as a start of an event, assuming traceBegin is called inside an
832     // entry function.
833   beginExecute(-1, -1, TRACEON_EP, -1, -1);
834 }
835
836 void TraceSummary::traceEnd(void)
837 {
838   endExecute();
839 }
840
841 void TraceSummary::beginPack(void)
842 {
843     packstart = CmiWallTimer();
844 }
845
846 void TraceSummary::endPack(void)
847 {
848     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
849     if (sumDetail)
850         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
851 }
852
853 void TraceSummary::beginUnpack(void)
854 {
855     unpackstart = CmiWallTimer();
856 }
857
858 void TraceSummary::endUnpack(void)
859 {
860     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
861     if (sumDetail)
862         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
863 }
864
865 void TraceSummary::beginComputation(void)
866 {
867   // initialze arrays because now the number of entries is known.
868   _logPool->initMem();
869 }
870
871 void TraceSummary::endComputation(void)
872 {
873   static int done = 0;
874   if (done) return;
875   done = 1;
876   if (msgNum==0) {
877 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
878      _logPool->add(binTime, binIdle, CkMyPe());
879      binTime = 0.0;
880      binIdle = 0.0;
881      msgNum ++;
882
883      binStart  += CkpvAccess(binSize);
884      double t = TraceTimer();
885      double ts = binStart;
886      while (ts < t)
887      {
888        _logPool->add(binTime, binIdle, CkMyPe());
889        binTime=0.0;
890        binIdle = 0.0;
891        ts += CkpvAccess(binSize);
892      }
893
894   }
895 }
896
897 void TraceSummary::addEventType(int eventType)
898 {
899   _logPool->addEventType(eventType, TraceTimer());
900 }
901
902 void TraceSummary::startPhase(int phase)
903 {
904    _logPool->startPhase(phase);
905 }
906
907 void TraceSummary::traceEnableCCS() {
908   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
909   sumProxy.initCCS();
910 }
911
912
913 void TraceSummary::fillData(double *buffer, double reqStartTime, 
914                             double reqBinSize, int reqNumBins) {
915   // buffer has to be pre-allocated by the requester and must be an array of
916   // size reqNumBins.
917   //
918   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
919   //              need a number of these assumptions dropped:
920   //              1) reqBinSize == binSize (unrealistic)
921   //              2) bins boundary aligned (ok even under normal circumstances)
922   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
923   //              4) bins are always available (true unless flush)
924   //              5) bins always starts from 0 (unrealistic)
925
926   // works only because of 1)
927   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
928   //           "starting" at all!
929   int binOffset = (int)(reqStartTime/reqBinSize); 
930   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
931     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
932     buffer[i-binOffset] = pool()->getTime(i);
933   }
934 }
935
936
937 /// for TraceSummaryBOC
938
939 void TraceSummaryBOC::initCCS() {
940   if(firstTime){
941     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
942     // initializing CCS-based parameters on all processors
943     lastRequestedIndexBlock = 0;
944     indicesPerBlock = 1000;
945     collectionGranularity = 0.001; // time in seconds
946     nBufferedBins = 0;
947     
948     // initialize buffer, register CCS handler and start the collection
949     // pulse only on pe 0.
950     if (CkMyPe() == 0) { 
951       ccsBufferedData = new CkVec<double>();
952     
953       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
954       CkPrintf("Trace Summary now listening in for CCS Client\n");
955       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
956                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
957       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
958                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
959
960       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
961       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
962                              (void *)this);
963       summaryCcsStreaming = CmiTrue;
964     }
965     firstTime = false;
966   }
967 }
968
969 /** Return summary information as double precision values for each sample period. 
970     The actual data collection is in double precision values. 
971
972     The units on the returned values are total execution time across all PEs.
973 */
974 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
975   double *sendBuffer;
976
977   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
978
979   CkPrintf("Responding ...\n");
980   int datalength = 0;
981   // if we have no data to send, send an acknowledgement code of -13.37
982   // instead.
983   if (ccsBufferedData->length() == 0) {
984     sendBuffer = new double[1];
985     sendBuffer[0] = -13.37;
986     datalength = sizeof(double);
987     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
988     delete [] sendBuffer;
989   } else {
990     sendBuffer = ccsBufferedData->getVec();
991     datalength = ccsBufferedData->length()*sizeof(double);
992     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
993     ccsBufferedData->free();
994   }
995   CkPrintf("Response Sent. Proceeding with computation.\n");
996   delete m;
997 }
998
999
1000 /** Return summary information as unsigned char values for each sample period. 
1001     The actual data collection is in double precision values.
1002
1003     This returns the utilization in a range from 0 to 200.
1004 */
1005 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1006   unsigned char *sendBuffer;
1007
1008   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1009
1010   CkPrintf("Responding ...\n");
1011   int datalength = 0;
1012
1013   if (ccsBufferedData->length() == 0) {
1014     sendBuffer = new unsigned char[1];
1015     sendBuffer[0] = 255;
1016     datalength = sizeof(unsigned char);
1017     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1018     delete [] sendBuffer;
1019   } else {
1020     double * doubleData = ccsBufferedData->getVec();
1021     int numData = ccsBufferedData->length();
1022     
1023     // pack data into unsigned char array
1024     sendBuffer = new unsigned char[numData];
1025     
1026     for(int i=0;i<numData;i++){
1027       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1028       int v = sendBuffer[i];
1029     }    
1030
1031     datalength = sizeof(unsigned char) * numData;
1032     
1033     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1034     ccsBufferedData->free();
1035     delete [] sendBuffer;
1036   }
1037   CkPrintf("Response Sent. Proceeding with computation.\n");
1038   delete m;
1039 }
1040
1041
1042
1043 void startCollectData(void *data, double currT) {
1044   CkAssert(CkMyPe() == 0);
1045   // CkPrintf("startCollectData()\n");
1046   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1047   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1048   double collectionGranularity = sumObj->collectionGranularity;
1049   int indicesPerBlock = sumObj->indicesPerBlock;
1050   
1051   double startTime = lastRequestedIndexBlock*
1052     collectionGranularity * indicesPerBlock;
1053   int numIndicesToGet = (int)floor((currT - startTime)/
1054                                    collectionGranularity);
1055   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1056   // **TODO** consider limiting the total number of blocks each collection
1057   //   request will pick up. This is to limit the amount of perturbation
1058   //   if it proves to be a problem.
1059   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1060
1061    sumProxy.collectSummaryData(startTime, 
1062                        collectionGranularity,
1063                        numBlocksToGet*indicesPerBlock);
1064   // assume success
1065   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1066 }
1067
1068 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1069                                   int numBins) {
1070   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1071
1072   double *contribution = new double[numBins];
1073   for (int i=0; i<numBins; i++) {
1074     contribution[i] = 0.0;
1075   }
1076   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1077
1078   /*
1079   for (int i=0; i<numBins; i++) {
1080     CkPrintf("[%d] %f\n", i, contribution[i]);
1081   }
1082   */
1083
1084   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1085   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1086   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1087              cb);
1088 }
1089
1090 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1091   CkAssert(CkMyPe() == 0);
1092   // **CWL** No memory management for the ccs buffer for now.
1093
1094   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1095   double *recvData = (double *)msg->getData();
1096   int numBins = msg->getSize()/sizeof(double);
1097
1098   // if there's an easier way to append a data block to a CkVec, I'll take it
1099   for (int i=0; i<numBins; i++) {
1100     ccsBufferedData->insertAtEnd(recvData[i]);
1101   }
1102   delete msg;
1103 }
1104
1105
1106
1107
1108 void TraceSummaryBOC::startSumOnly()
1109 {
1110   CmiAssert(CkMyPe() == 0);
1111
1112   CProxy_TraceSummaryBOC p(traceSummaryGID);
1113   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1114   p.askSummary(size);
1115 }
1116
1117 void TraceSummaryBOC::askSummary(int size)
1118 {
1119   if (CkpvAccess(_trace) == NULL) return;
1120
1121   int traced = CkpvAccess(_trace)->traceOnPE();
1122
1123   BinEntry *reductionBuffer = new BinEntry[size+1];
1124   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1125   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1126   if (traced) {
1127     CkpvAccess(_trace)->endComputation();
1128     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1129     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1130     if (n>size) n=size;
1131     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1132   }
1133
1134   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1135              CkReduction::sum_double);
1136   delete [] reductionBuffer;
1137 }
1138
1139 //extern "C" void _CkExit();
1140
1141 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1142 {
1143   if (CkpvAccess(_trace) == NULL) return;
1144
1145   CkAssert(CkMyPe() == 0);
1146
1147   int n = msg->getSize()/sizeof(BinEntry);
1148   nBins = n-1;
1149   bins = (BinEntry *)msg->getData();
1150   nTracedPEs = (int)bins[n-1].time();
1151   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1152
1153   write();
1154
1155   delete msg;
1156
1157   CkExit();
1158 }
1159
1160 void TraceSummaryBOC::write(void) 
1161 {
1162   int i;
1163   unsigned int j;
1164
1165   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1166   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1167   FILE *sumfp = fopen(fname, "w+");
1168   //CmiPrintf("File: %s \n", fname);
1169   if(sumfp == 0)
1170       CmiAbort("Cannot open summary sts file for writing.\n");
1171   delete[] fname;
1172
1173   int _numEntries=_entryTable.size();
1174   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1175   fprintf(sumfp, "\n");
1176
1177   // write bin time
1178 #if 0
1179   int last=pool[0].getU();
1180   writeU(fp, last);
1181   int count=1;
1182   for(j=1; j<numEntries; j++) {
1183     int u = pool[j].getU();
1184     if (last == u) {
1185       count++;
1186     }
1187     else {
1188       if (count > 1) fprintf(fp, "+%d", count);
1189       writeU(fp, u);
1190       last = u;
1191       count = 1;
1192     }
1193   }
1194   if (count > 1) fprintf(fp, "+%d", count);
1195 #else
1196   for(j=0; j<nBins; j++) {
1197     bins[j].time() /= nTracedPEs;
1198     bins[j].write(sumfp);
1199   }
1200 #endif
1201   fprintf(sumfp, "\n");
1202   fclose(sumfp);
1203
1204 }
1205
1206 extern "C" void CombineSummary()
1207 {
1208 #if CMK_TRACE_ENABLED
1209   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1210   if (sumonly) {
1211     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1212       // pe 0 start the sumonly process
1213     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1214     sumProxy[0].startSumOnly();
1215   }
1216   else CkExit();
1217 #else
1218   CkExit();
1219 #endif
1220 }
1221
1222 void initTraceSummaryBOC()
1223 {
1224 #ifdef __BLUEGENE__
1225   if(BgNodeRank()==0) {
1226 #else
1227   if (CkMyRank() == 0) {
1228 #endif
1229     registerExitFn(CombineSummary);
1230   }
1231 }
1232
1233
1234
1235
1236
1237
1238 #include "TraceSummary.def.h"
1239
1240
1241 /*@}*/
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261