handle nested events
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39 /** 
40     A class that reads/writes a buffer out of different types of data.
41
42     This class exists because I need to get references to parts of the buffer 
43     that have already been used so that I can increment counters inside the buffer.
44 */
45
46 class compressedBuffer {
47 public:
48   char* buf;
49   int pos; ///<< byte position just beyond the previously read/written data
50
51   compressedBuffer(){
52     buf = NULL;
53     pos = 0;
54   }
55
56   compressedBuffer(int bytes){
57     buf = (char*)malloc(bytes);
58     pos = 0;
59   }
60
61   compressedBuffer(void *buffer){
62     buf = (char*)buffer;
63     pos = 0;
64   }
65     
66   void init(void *buffer){
67     buf = (char*)buffer;
68     pos = 0;
69   }
70     
71   inline void * currentPtr(){
72     return (void*)(buf+pos);
73   }
74
75   template <typename T>
76   T read(int offset){
77     // to resolve unaligned writes causing bus errors, need memcpy
78     T v;
79     memcpy(&v, buf+offset, sizeof(T));
80     return v;
81   }
82     
83   template <typename T>
84   void write(T v, int offset){
85     T v2 = v; // on stack
86     // to resolve unaligned writes causing bus errors, need memcpy
87     memcpy(buf+offset, &v2, sizeof(T));
88   }
89     
90   template <typename T>
91   void increment(int offset){
92     T temp;
93     temp = read<T>(offset);
94     temp ++;
95     write<T>(temp, offset);
96   }
97
98   template <typename T>
99   void accumulate(T v, int offset){
100     T temp;
101     temp = read<T>(offset);
102     temp += v;
103     write<T>(temp, offset);
104   }
105   
106   template <typename T>
107   int push(T v){
108     int oldpos = pos;
109     write<T>(v, pos);
110     pos += sizeof(T);
111     return oldpos;
112   }
113   
114   template <typename T>
115   T pop(){
116     T temp = read<T>(pos);
117     pos += sizeof(T);
118     return temp;
119   }
120
121   template <typename T>
122   T peek(){
123     T temp = read<T>(pos);
124     return temp;
125   }
126
127   template <typename T0, typename T>
128   T peekSecond(){
129     T temp;
130     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
131     return temp;
132   }
133
134   int datalength(){
135     return pos;
136   }
137      
138   void * buffer(){
139     return (void*) buf;
140   }  
141
142   void freeBuf(){
143     free(buf);
144   }
145
146   ~compressedBuffer(){
147     // don't free the buf because the user my have supplied the buffer
148   }
149     
150 };
151
152
153
154
155 // Global Readonly
156 CkGroupID traceSummaryGID;
157 bool summaryCcsStreaming;
158
159 int sumonly = 0;
160 int sumDetail = 0;
161
162 /**
163   For each TraceFoo module, _createTraceFoo() must be defined.
164   This function is called in _createTraces() generated in moduleInit.C
165 */
166 void _createTracesummary(char **argv)
167 {
168   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
169   CkpvInitialize(TraceSummary*, _trace);
170   CkpvInitialize(int, previouslySentBins);
171   CkpvAccess(previouslySentBins) = 0;
172   CkpvAccess(_trace) = new  TraceSummary(argv);
173   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
174   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
175 }
176
177
178 /// function call for starting a phase in trace summary logs 
179 extern "C" 
180 void CkSummary_StartPhase(int phase)
181 {
182    CkpvAccess(_trace)->startPhase(phase);
183 }
184
185
186 /// function call for adding an event mark
187 extern "C" 
188 void CkSummary_MarkEvent(int eventType)
189 {
190    CkpvAccess(_trace)->addEventType(eventType);
191 }
192
193 static inline void writeU(FILE* fp, int u)
194 {
195   fprintf(fp, "%4d", u);
196 }
197
198 PhaseEntry::PhaseEntry() 
199 {
200   int _numEntries=_entryTable.size();
201   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
202   nEPs = _numEntries+10; 
203   count = new int[nEPs];
204   times = new double[nEPs];
205   maxtimes = new double[nEPs];
206   for (int i=0; i<nEPs; i++) {
207     count[i] = 0;
208     times[i] = 0.0;
209     maxtimes[i] = 0.;
210   }
211 }
212
213 SumLogPool::~SumLogPool() 
214 {
215     if (!sumonly) {
216       write();
217       fclose(fp);
218       if (sumDetail) fclose(sdfp);
219   }
220   // free memory for mark
221   if (markcount > 0)
222   for (int i=0; i<MAX_MARKS; i++) {
223     for (int j=0; j<events[i].length(); j++)
224       delete events[i][j];
225   }
226   delete[] pool;
227   delete[] epInfo;
228   delete[] cpuTime;
229   delete[] numExecutions;
230 }
231
232 void SumLogPool::addEventType(int eventType, double time)
233 {
234    if (eventType <0 || eventType >= MAX_MARKS) {
235        CkPrintf("Invalid event type %d!\n", eventType);
236        return;
237    }
238    MarkEntry *e = new MarkEntry;
239    e->time = time;
240    events[eventType].push_back(e);
241    markcount ++;
242 }
243
244 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
245 {
246    // TBD: Can this be moved to initMem?
247   cpuTime = NULL;
248    poolSize = CkpvAccess(binCount);
249    if (poolSize % 2) poolSize++;        // make sure it is even
250    pool = new BinEntry[poolSize];
251    _MEMCHECK(pool);
252
253    this->pgm = new char[strlen(pgm)+1];
254    strcpy(this->pgm,pgm);
255    
256 #if 0
257    // create the sts file
258    if (CkMyPe() == 0) {
259      char *fname = 
260        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
261      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
262      stsfp = fopen(fname, "w+");
263      //CmiPrintf("File: %s \n", fname);
264      if (stsfp == 0) {
265        CmiAbort("Cannot open summary sts file for writing.\n");
266       }
267      delete[] fname;
268    }
269 #endif
270    
271    // event
272    markcount = 0;
273 }
274
275 void SumLogPool::initMem()
276 {
277    int _numEntries=_entryTable.size();
278    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
279    epInfo = new SumEntryInfo[epInfoSize];
280    _MEMCHECK(epInfo);
281
282    cpuTime = NULL;
283    numExecutions = NULL;
284    if (sumDetail) {
285        cpuTime = new double[poolSize*epInfoSize];
286        _MEMCHECK(cpuTime);
287        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
288        numExecutions = new int[poolSize*epInfoSize];
289        _MEMCHECK(numExecutions);
290        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
291
292 //         int i, e;
293 //         for(i=0; i<poolSize; i++) {
294 //             for(e=0; e< epInfoSize; e++) {
295 //                 setCPUtime(i,e,0.0);
296 //                 setNumExecutions(i,e,0);
297 //             }
298 //         }
299    }
300 }
301
302 int SumLogPool::getUtilization(int interval, int ep) {
303     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
304 }
305
306 void SumLogPool::write(void) 
307 {
308   int i;
309   unsigned int j;
310   int _numEntries=_entryTable.size();
311
312   fp = NULL;
313   sdfp = NULL;
314
315   // create file(s)
316   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
317   if (!sumonly) {
318     char pestr[10];
319     sprintf(pestr, "%d", CkMyPe());
320     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
321     char *fname = new char[len+1];
322     
323     sprintf(fname, "%s.%s.sum", pgm, pestr);
324     do {
325       fp = fopen(fname, "w+");
326     } while (!fp && errno == EINTR);
327     if (!fp) {
328       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
329       CmiAbort("Cannot open Summary Trace File for writing...\n");
330     }
331     
332     if (sumDetail) {
333       sprintf(fname, "%s.%s.sumd", pgm, pestr);
334       do {
335         sdfp = fopen(fname, "w+");
336       } while (!sdfp && errno == EINTR);
337       if(!sdfp) {
338         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
339       }
340     }
341     delete[] fname;
342   }
343
344   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
345   if (CkpvAccess(version)>=3.0)
346   {
347     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
348   }
349   fprintf(fp, "\n");
350
351   // write bin time
352 #if 1
353   int last=pool[0].getU();
354   writeU(fp, last);
355   int count=1;
356   for(j=1; j<numBins; j++) {
357     int u = pool[j].getU();
358     if (last == u) {
359       count++;
360     }
361     else {
362       if (count > 1) fprintf(fp, "+%d", count);
363       writeU(fp, u);
364       last = u;
365       count = 1;
366     }
367   }
368   if (count > 1) fprintf(fp, "+%d", count);
369 #else
370   for(j=0; j<numEntries; j++) 
371       pool[j].write(fp);
372 #endif
373   fprintf(fp, "\n");
374
375   // write entry execution time
376   fprintf(fp, "EPExeTime: ");
377   for (i=0; i<_numEntries; i++)
378     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
379   fprintf(fp, "\n");
380   // write entry function call times
381   fprintf(fp, "EPCallTime: ");
382   for (i=0; i<_numEntries; i++)
383     fprintf(fp, "%d ", epInfo[i].epCount);
384   fprintf(fp, "\n");
385   // write max entry function execute times
386   fprintf(fp, "MaxEPTime: ");
387   for (i=0; i<_numEntries; i++)
388     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
389   fprintf(fp, "\n");
390 #if 0
391   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
392     for (j=0; j<_numEntries; j++) 
393       fprintf(fp, "%d ", epInfo[j].hist[i]);
394     fprintf(fp, "\n");
395   }
396 #endif
397   // write marks
398   if (CkpvAccess(version)>=2.0) 
399   {
400     fprintf(fp, "NumMarks: %d ", markcount);
401     for (i=0; i<MAX_MARKS; i++) {
402       for(int j=0; j<events[i].length(); j++)
403         fprintf(fp, "%d %f ", i, events[i][j]->time);
404     }
405     fprintf(fp, "\n");
406   }
407   // write phases
408   if (CkpvAccess(version)>=3.0)
409   {
410     phaseTab.write(fp);
411   }
412   // write idle time
413   if (CkpvAccess(version)>=7.1) {
414     fprintf(fp, "IdlePercent: ");
415     int last=pool[0].getUIdle();
416     writeU(fp, last);
417     int count=1;
418     for(j=1; j<numBins; j++) {
419       int u = pool[j].getUIdle();
420       if (last == u) {
421         count++;
422       }
423       else {
424         if (count > 1) fprintf(fp, "+%d", count);
425         writeU(fp, u);
426         last = u;
427         count = 1;
428       }
429     }
430     if (count > 1) fprintf(fp, "+%d", count);
431     fprintf(fp, "\n");
432   }
433
434   //CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
435   // write summary details
436   if (sumDetail) {
437         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
438                 CkpvAccess(version), CkMyPe(), CkNumPes(),
439                 numBins, _numEntries, CkpvAccess(binSize));
440
441         // Write out cpuTime in microseconds
442         // Run length encoding (RLE) along EP axis
443         fprintf(sdfp, "ExeTimePerEPperInterval ");
444         unsigned int e, i;
445         long last= (long) (getCPUtime(0,0)*1.0e6);
446         int count=0;
447         fprintf(sdfp, "%ld", last);
448         for(e=0; e<_numEntries; e++) {
449             for(i=0; i<numBins; i++) {
450
451                 long u= (long) (getCPUtime(i,e)*1.0e6);
452                 if (last == u) {
453                     count++;
454                 } else {
455
456                     if (count > 1) fprintf(sdfp, "+%d", count);
457                     fprintf(sdfp, " %ld", u);
458                     last = u;
459                     count = 1;
460                 }
461             }
462         }
463         if (count > 1) fprintf(sdfp, "+%d", count);
464         fprintf(sdfp, "\n");
465         // Write out numExecutions
466         // Run length encoding (RLE) along EP axis
467         fprintf(sdfp, "EPCallTimePerInterval ");
468         last= getNumExecutions(0,0);
469         count=0;
470         fprintf(sdfp, "%ld", last);
471         for(e=0; e<_numEntries; e++) {
472             for(i=0; i<numBins; i++) {
473
474                 long u= getNumExecutions(i, e);
475                 if (last == u) {
476                     count++;
477                 } else {
478
479                     if (count > 1) fprintf(sdfp, "+%d", count);
480                     fprintf(sdfp, " %ld", u);
481                     last = u;
482                     count = 1;
483                 }
484             }
485         }
486         if (count > 1) fprintf(sdfp, "+%d", count);
487         fprintf(sdfp, "\n");
488   }
489 }
490
491 void SumLogPool::writeSts(void)
492 {
493     // open sts file
494   char *fname = 
495        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
496   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
497   stsfp = fopen(fname, "w+");
498   //CmiPrintf("File: %s \n", fname);
499   if (stsfp == 0) {
500        CmiAbort("Cannot open summary sts file for writing.\n");
501   }
502   delete[] fname;
503
504   traceWriteSTS(stsfp,_numEvents);
505   for(int i=0;i<_numEvents;i++)
506     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
507   fprintf(stsfp, "END\n");
508
509   fclose(stsfp);
510 }
511
512 // Called once per interval
513 void SumLogPool::add(double time, double idleTime, int pe) 
514 {
515   new (&pool[numBins++]) BinEntry(time, idleTime);
516   if (poolSize==numBins) {
517     shrink();
518   }
519 }
520
521 // Called once per run of an EP
522 // adds 'time' to EP's time, increments epCount
523 void SumLogPool::setEp(int epidx, double time) 
524 {
525   if (epidx >= epInfoSize) {
526         CmiAbort("Invalid entry point!!\n");
527   }
528   //CmiPrintf("set EP: %d %e \n", epidx, time);
529   epInfo[epidx].setTime(time);
530   // set phase table counter
531   phaseTab.setEp(epidx, time);
532 }
533
534 // Called once from endExecute, endPack, etc. this function updates
535 // the sumDetail intervals.
536 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
537 {
538         if (epIdx >= epInfoSize) {
539             CmiAbort("Too many entry points!!\n");
540         }
541
542         double binSz = CkpvAccess(binSize);
543         int startingBinIdx, endingBinIdx;
544         startingBinIdx = (int)(startTime/binSz);
545         endingBinIdx = (int)(endTime/binSz);
546         // shrink if needed
547         while (endingBinIdx >= poolSize) {
548           shrink();
549           CmiAssert(CkpvAccess(binSize) > binSz);
550           binSz = CkpvAccess(binSize);
551           startingBinIdx = (int)(startTime/binSz);
552           endingBinIdx = (int)(endTime/binSz);
553         }
554
555         if (startingBinIdx == endingBinIdx) {
556             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
557         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
558             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
559             while(++startingBinIdx < endingBinIdx)
560                 addToCPUtime(startingBinIdx, epIdx, binSz);
561             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
562         } else {
563           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
564                    startTime, endTime);
565             CmiAbort("Error: end time of EP is less than start time\n");
566         }
567
568         incNumExecutions(startingBinIdx, epIdx);
569 }
570
571 // Shrinks pool[], cpuTime[], and numExecutions[]
572 void SumLogPool::shrink(void)
573 {
574 //  double t = CmiWallTimer();
575
576   // We ensured earlier that poolSize is even; therefore now numBins
577   // == poolSize == even.
578   int entries = numBins/2;
579   for (int i=0; i<entries; i++)
580   {
581      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
582      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
583      if (sumDetail)
584      for (int e=0; e < epInfoSize; e++) {
585          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
586          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
587      }
588   }
589   // zero out the remaining intervals
590   if (sumDetail) {
591     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
592     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
593   }
594   numBins = entries;
595   CkpvAccess(binSize) *= 2;
596
597 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
598 }
599
600 void SumLogPool::shrink(double _maxBinSize)
601 {
602     while(CkpvAccess(binSize) < _maxBinSize)
603     {
604         shrink();
605     };
606 }
607 int  BinEntry::getU() 
608
609   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
610 }
611
612 int BinEntry::getUIdle() {
613   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
614 }
615
616 void BinEntry::write(FILE* fp)
617 {
618   writeU(fp, getU());
619 }
620
621 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
622                                         binTime(0.0),binIdle(0.0),msgNum(0)
623 {
624   if (CkpvAccess(traceOnPe) == 0) return;
625
626     // use absolute time
627   if (CmiTimerAbsolute()) binStart = CmiInitTime();
628
629   CkpvInitialize(int, binCount);
630   CkpvInitialize(double, binSize);
631   CkpvInitialize(double, version);
632   CkpvAccess(binSize) = BIN_SIZE;
633   CkpvAccess(version) = VER;
634   CkpvAccess(binCount) = DefaultBinCount;
635   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
636     if (CkMyPe() == 0) 
637       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
638   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
639         "CPU usage log time resolution");
640   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
641         "Write this .sum file version");
642
643   epThreshold = 0.001; 
644   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
645         "Execution time histogram lower bound");
646   epInterval = 0.001; 
647   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
648         "Execution time histogram bin size");
649
650   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
651   // +sumonly overrides +sumDetail
652   if (!sumonly)
653       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
654
655   _logPool = new SumLogPool(CkpvAccess(traceRoot));
656   // assume invalid entry point on start
657   execEp=INVALIDEP;
658   inIdle = 0;
659   inExec = 0;
660   depth = 0;
661 }
662
663 void TraceSummary::traceClearEps(void)
664 {
665   _logPool->clearEps();
666 }
667
668 void TraceSummary::traceWriteSts(void)
669 {
670   if(CkMyPe()==0)
671       _logPool->writeSts();
672 }
673
674 void TraceSummary::traceClose(void)
675 {
676     if(CkMyPe()==0)
677         _logPool->writeSts();
678     CkpvAccess(_trace)->endComputation();
679
680     delete _logPool;
681     CkpvAccess(_traces)->removeTrace(this);
682 }
683
684 void TraceSummary::beginExecute(CmiObjId *tid)
685 {
686   beginExecute(-1,-1,_threadEP,-1);
687 }
688
689 void TraceSummary::beginExecute(envelope *e)
690 {
691   // no message means thread execution
692   if (e==NULL) {
693     beginExecute(-1,-1,_threadEP,-1);
694   }
695   else {
696     beginExecute(-1,-1,e->getEpIdx(),-1);
697   }  
698 }
699
700 void TraceSummary::beginExecute(char *msg)
701 {
702 #if CMK_SMP_TRACE_COMMTHREAD
703     //This function is called from comm thread in SMP mode
704     envelope *e = (envelope *)msg;
705     int num = _entryTable.size();
706     int ep = e->getEpIdx();
707     if(ep<0 || ep>=num) return;
708     if(_entryTable[ep]->traceEnabled)
709         beginExecute(-1,-1,e->getEpIdx(),-1);
710 #endif
711 }
712
713 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
714 {
715   if (execEp == TRACEON_EP) {
716     endExecute();
717   }
718   CmiAssert(inIdle == 0);
719   if (inExec == 0) {
720     CmiAssert(depth == 0);
721     inExec = 1;
722   }
723   depth ++;
724   // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
725
726   if (depth > 1) return;          //  nested
727
728 /*
729   if (execEp != INVALIDEP) {
730     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
731     return;
732   }
733 */
734   
735   execEp=ep;
736   double t = TraceTimer();
737   //CmiPrintf("start: %f \n", start);
738   
739   start = t;
740   double ts = binStart;
741   // fill gaps
742   while ((ts = ts + CkpvAccess(binSize)) < t) {
743     /* Keep as a template for error checking. The current form of this check
744        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
745        I used it). Perhaps an improved form could be used in case vastly
746        incompatible EP vs idle times are reported (binSize*2?).
747
748        This check will have to be duplicated before each call to add()
749
750     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
751              binTime + binIdle, CkpvAccess(binSize));
752     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
753     */
754      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
755      binTime=0.0;                 // fill all other bins with 0 up till start
756      binIdle = 0.0;
757      binStart = ts;
758   }
759 }
760
761 void TraceSummary::endExecute()
762 {
763   CmiAssert(inIdle == 0 && inExec == 1);
764   depth --;
765   if (depth == 0) inExec = 0;
766   CmiAssert(depth >= 0);
767   // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
768
769   if (depth != 0) return;
770  
771   double t = TraceTimer();
772   double ts = start;
773   double nts = binStart;
774
775 /*
776   if (execEp == TRACEON_EP) {
777     // if trace just got turned on, then one expects to see this
778     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
779     return;
780   }
781 */
782
783   if (execEp == INVALIDEP) {
784     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
785     return;
786   }
787
788   if (execEp >= 0)
789   {
790     _logPool->setEp(execEp, t-ts);
791   }
792
793   while ((nts = nts + CkpvAccess(binSize)) < t)
794   {
795     // fill the bins with time for this entry method
796      binTime += nts-ts;
797      binStart  = nts;
798      // This calls shrink() if needed
799      _logPool->add(binTime, binIdle, CkMyPe()); 
800      binTime = 0.0;
801      binIdle = 0.0;
802      ts = nts;
803   }
804   binTime += t - ts;
805
806   if (sumDetail && execEp >= 0 )
807       _logPool->updateSummaryDetail(execEp, start, t);
808
809   execEp = INVALIDEP;
810 }
811
812 void TraceSummary::endExecute(char *msg){
813 #if CMK_SMP_TRACE_COMMTHREAD
814     //This function is called from comm thread in SMP mode
815     envelope *e = (envelope *)msg;
816     int num = _entryTable.size();
817     int ep = e->getEpIdx();
818     if(ep<0 || ep>=num) return;
819     if(_entryTable[ep]->traceEnabled){
820         endExecute();
821     }
822 #endif    
823 }
824
825 void TraceSummary::beginIdle(double currT)
826 {
827   if (execEp == TRACEON_EP) {
828     endExecute();
829   }
830
831   CmiAssert(inIdle == 0 && inExec == 0);
832   inIdle = 1;
833   //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
834
835   double t = TraceTimer(currT);
836   
837   // mark the time of this idle period. Only the next endIdle should see
838   // this value
839   idleStart = t; 
840   double ts = binStart;
841   // fill gaps
842   while ((ts = ts + CkpvAccess(binSize)) < t) {
843     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
844     binTime=0.0;                 // fill all other bins with 0 up till start
845     binIdle = 0.0;
846     binStart = ts;
847   }
848 }
849
850 void TraceSummary::endIdle(double currT)
851 {
852   CmiAssert(inIdle == 1 && inExec == 0);
853   inIdle = 0;
854   // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
855
856   double t = TraceTimer(currT);
857   double t_idleStart = idleStart;
858   double t_binStart = binStart;
859
860   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
861   {
862     // fill the bins with time for idle
863     binIdle += t_binStart - t_idleStart;
864     binStart = t_binStart;
865     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
866     binTime = 0.0;
867     binIdle = 0.0;
868     t_idleStart = t_binStart;
869   }
870   binIdle += t - t_idleStart;
871 }
872
873 void TraceSummary::traceBegin(void)
874 {
875     // fake as a start of an event, assuming traceBegin is called inside an
876     // entry function.
877   beginExecute(-1, -1, TRACEON_EP, -1, -1);
878 }
879
880 void TraceSummary::traceEnd(void)
881 {
882   endExecute();
883 }
884
885 void TraceSummary::beginPack(void)
886 {
887     packstart = CmiWallTimer();
888 }
889
890 void TraceSummary::endPack(void)
891 {
892     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
893     if (sumDetail)
894         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
895 }
896
897 void TraceSummary::beginUnpack(void)
898 {
899     unpackstart = CmiWallTimer();
900 }
901
902 void TraceSummary::endUnpack(void)
903 {
904     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
905     if (sumDetail)
906         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
907 }
908
909 void TraceSummary::beginComputation(void)
910 {
911   // initialze arrays because now the number of entries is known.
912   _logPool->initMem();
913 }
914
915 void TraceSummary::endComputation(void)
916 {
917   static int done = 0;
918   if (done) return;
919   done = 1;
920   if (msgNum==0) {
921 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
922      _logPool->add(binTime, binIdle, CkMyPe());
923      binTime = 0.0;
924      binIdle = 0.0;
925      msgNum ++;
926
927      binStart  += CkpvAccess(binSize);
928      double t = TraceTimer();
929      double ts = binStart;
930      while (ts < t)
931      {
932        _logPool->add(binTime, binIdle, CkMyPe());
933        binTime=0.0;
934        binIdle = 0.0;
935        ts += CkpvAccess(binSize);
936      }
937
938   }
939 }
940
941 void TraceSummary::addEventType(int eventType)
942 {
943   _logPool->addEventType(eventType, TraceTimer());
944 }
945
946 void TraceSummary::startPhase(int phase)
947 {
948    _logPool->startPhase(phase);
949 }
950
951 void TraceSummary::traceEnableCCS() {
952   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
953   sumProxy.initCCS();
954 }
955
956
957 void TraceSummary::fillData(double *buffer, double reqStartTime, 
958                             double reqBinSize, int reqNumBins) {
959   // buffer has to be pre-allocated by the requester and must be an array of
960   // size reqNumBins.
961   //
962   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
963   //              need a number of these assumptions dropped:
964   //              1) reqBinSize == binSize (unrealistic)
965   //              2) bins boundary aligned (ok even under normal circumstances)
966   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
967   //              4) bins are always available (true unless flush)
968   //              5) bins always starts from 0 (unrealistic)
969
970   // works only because of 1)
971   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
972   //           "starting" at all!
973   int binOffset = (int)(reqStartTime/reqBinSize); 
974   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
975     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
976     buffer[i-binOffset] = pool()->getTime(i);
977   }
978 }
979
980 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
981    
982     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
983     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
984     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
985     CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
986     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
987 }
988
989 // collect the max bin size
990 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
991 {
992     double _maxBinSize = *((double *)msg->getData());
993     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
994     sumProxy.shrink(_maxBinSize);
995 }
996
997 void TraceSummaryBOC::shrink(double _mBin){
998     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
999     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
1000     _maxBinSize = _mBin;
1001     if(CkpvAccess(binSize) < _maxBinSize)
1002     {
1003         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
1004     }
1005     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
1006     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1007     CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
1008     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1009 }
1010
1011 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
1012     double *sumData = (double *)msg->getData();
1013     int     totalsize = msg->getSize()/sizeof(double);
1014     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
1015     UInt    numBins = totalsize/epNums;  
1016     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
1017     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1018     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1019     FILE *sumfp = fopen(fname, "w+");
1020     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1021                 CkpvAccess(version), CkNumPes(),
1022                 numBins, numEntries, _maxBinSize);
1023     for(int i=0; i<numBins; i++){
1024         for(int j=0; j<numEntries; j++)
1025         {
1026             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1027         }
1028     }
1029    fclose(sumfp);
1030    //CkPrintf("done with analysis\n");
1031    CkExit();
1032 }
1033
1034 /// for TraceSummaryBOC
1035
1036 void TraceSummaryBOC::initCCS() {
1037   if(firstTime){
1038     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1039     // initializing CCS-based parameters on all processors
1040     lastRequestedIndexBlock = 0;
1041     indicesPerBlock = 1000;
1042     collectionGranularity = 0.001; // time in seconds
1043     nBufferedBins = 0;
1044     
1045     // initialize buffer, register CCS handler and start the collection
1046     // pulse only on pe 0.
1047     if (CkMyPe() == 0) { 
1048       ccsBufferedData = new CkVec<double>();
1049     
1050       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1051       CkPrintf("Trace Summary now listening in for CCS Client\n");
1052       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1053                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1054       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1055                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1056
1057       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1058       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1059                              (void *)this);
1060       summaryCcsStreaming = CmiTrue;
1061     }
1062     firstTime = false;
1063   }
1064 }
1065
1066 /** Return summary information as double precision values for each sample period. 
1067     The actual data collection is in double precision values. 
1068
1069     The units on the returned values are total execution time across all PEs.
1070 */
1071 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1072   double *sendBuffer;
1073
1074   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1075
1076   CkPrintf("Responding ...\n");
1077   int datalength = 0;
1078   // if we have no data to send, send an acknowledgement code of -13.37
1079   // instead.
1080   if (ccsBufferedData->length() == 0) {
1081     sendBuffer = new double[1];
1082     sendBuffer[0] = -13.37;
1083     datalength = sizeof(double);
1084     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1085     delete [] sendBuffer;
1086   } else {
1087     sendBuffer = ccsBufferedData->getVec();
1088     datalength = ccsBufferedData->length()*sizeof(double);
1089     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1090     ccsBufferedData->free();
1091   }
1092   CkPrintf("Response Sent. Proceeding with computation.\n");
1093   delete m;
1094 }
1095
1096
1097 /** Return summary information as unsigned char values for each sample period. 
1098     The actual data collection is in double precision values.
1099
1100     This returns the utilization in a range from 0 to 200.
1101 */
1102 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1103   unsigned char *sendBuffer;
1104
1105   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1106
1107   CkPrintf("Responding ...\n");
1108   int datalength = 0;
1109
1110   if (ccsBufferedData->length() == 0) {
1111     sendBuffer = new unsigned char[1];
1112     sendBuffer[0] = 255;
1113     datalength = sizeof(unsigned char);
1114     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1115     delete [] sendBuffer;
1116   } else {
1117     double * doubleData = ccsBufferedData->getVec();
1118     int numData = ccsBufferedData->length();
1119     
1120     // pack data into unsigned char array
1121     sendBuffer = new unsigned char[numData];
1122     
1123     for(int i=0;i<numData;i++){
1124       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1125     }    
1126
1127     datalength = sizeof(unsigned char) * numData;
1128     
1129     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1130     ccsBufferedData->free();
1131     delete [] sendBuffer;
1132   }
1133   CkPrintf("Response Sent. Proceeding with computation.\n");
1134   delete m;
1135 }
1136
1137
1138
1139 void startCollectData(void *data, double currT) {
1140   CkAssert(CkMyPe() == 0);
1141   // CkPrintf("startCollectData()\n");
1142   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1143   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1144   double collectionGranularity = sumObj->collectionGranularity;
1145   int indicesPerBlock = sumObj->indicesPerBlock;
1146   
1147   double startTime = lastRequestedIndexBlock*
1148     collectionGranularity * indicesPerBlock;
1149   int numIndicesToGet = (int)floor((currT - startTime)/
1150                                    collectionGranularity);
1151   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1152   // **TODO** consider limiting the total number of blocks each collection
1153   //   request will pick up. This is to limit the amount of perturbation
1154   //   if it proves to be a problem.
1155   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1156
1157    sumProxy.collectSummaryData(startTime, 
1158                        collectionGranularity,
1159                        numBlocksToGet*indicesPerBlock);
1160   // assume success
1161   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1162 }
1163
1164 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1165                                   int numBins) {
1166   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1167
1168   double *contribution = new double[numBins];
1169   for (int i=0; i<numBins; i++) {
1170     contribution[i] = 0.0;
1171   }
1172   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1173
1174   /*
1175   for (int i=0; i<numBins; i++) {
1176     CkPrintf("[%d] %f\n", i, contribution[i]);
1177   }
1178   */
1179
1180   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1181   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1182   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1183              cb);
1184 }
1185
1186 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1187   CkAssert(CkMyPe() == 0);
1188   // **CWL** No memory management for the ccs buffer for now.
1189
1190   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1191   double *recvData = (double *)msg->getData();
1192   int numBins = msg->getSize()/sizeof(double);
1193
1194   // if there's an easier way to append a data block to a CkVec, I'll take it
1195   for (int i=0; i<numBins; i++) {
1196     ccsBufferedData->insertAtEnd(recvData[i]);
1197   }
1198   delete msg;
1199 }
1200
1201
1202
1203
1204 void TraceSummaryBOC::startSumOnly()
1205 {
1206   CmiAssert(CkMyPe() == 0);
1207
1208   CProxy_TraceSummaryBOC p(traceSummaryGID);
1209   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1210   p.askSummary(size);
1211 }
1212
1213 void TraceSummaryBOC::askSummary(int size)
1214 {
1215   if (CkpvAccess(_trace) == NULL) return;
1216
1217   int traced = CkpvAccess(_trace)->traceOnPE();
1218
1219   BinEntry *reductionBuffer = new BinEntry[size+1];
1220   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1221   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1222   if (traced) {
1223     CkpvAccess(_trace)->endComputation();
1224     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1225     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1226     if (n>size) n=size;
1227     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1228   }
1229
1230   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1231              CkReduction::sum_double);
1232   delete [] reductionBuffer;
1233 }
1234
1235 //extern "C" void _CkExit();
1236
1237 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1238 {
1239   if (CkpvAccess(_trace) == NULL) return;
1240
1241   CkAssert(CkMyPe() == 0);
1242
1243   int n = msg->getSize()/sizeof(BinEntry);
1244   nBins = n-1;
1245   bins = (BinEntry *)msg->getData();
1246   nTracedPEs = (int)bins[n-1].time();
1247   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1248
1249   write();
1250
1251   delete msg;
1252
1253   CkExit();
1254 }
1255
1256 void TraceSummaryBOC::write(void) 
1257 {
1258   unsigned int j;
1259
1260   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1261   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1262   FILE *sumfp = fopen(fname, "w+");
1263   //CmiPrintf("File: %s \n", fname);
1264   if(sumfp == 0)
1265       CmiAbort("Cannot open summary sts file for writing.\n");
1266   delete[] fname;
1267
1268   int _numEntries=_entryTable.size();
1269   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1270   fprintf(sumfp, "\n");
1271
1272   // write bin time
1273 #if 0
1274   int last=pool[0].getU();
1275   writeU(fp, last);
1276   int count=1;
1277   for(j=1; j<numEntries; j++) {
1278     int u = pool[j].getU();
1279     if (last == u) {
1280       count++;
1281     }
1282     else {
1283       if (count > 1) fprintf(fp, "+%d", count);
1284       writeU(fp, u);
1285       last = u;
1286       count = 1;
1287     }
1288   }
1289   if (count > 1) fprintf(fp, "+%d", count);
1290 #else
1291   for(j=0; j<nBins; j++) {
1292     bins[j].time() /= nTracedPEs;
1293     bins[j].write(sumfp);
1294   }
1295 #endif
1296   fprintf(sumfp, "\n");
1297   fclose(sumfp);
1298
1299 }
1300
1301 extern "C" void CombineSummary()
1302 {
1303 #if CMK_TRACE_ENABLED
1304   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1305   if (sumonly) {
1306     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1307       // pe 0 start the sumonly process
1308     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1309     sumProxy[0].startSumOnly();
1310   }else if(sumDetail)
1311   {
1312       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1313       sumProxy.traceSummaryParallelShutdown(-1);
1314   }
1315   else {
1316     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL);
1317     CkExit();
1318   }
1319 #else
1320   CkExit();
1321 #endif
1322 }
1323
1324 void initTraceSummaryBOC()
1325 {
1326 #ifdef __BIGSIM__
1327   if(BgNodeRank()==0) {
1328 #else
1329   if (CkMyRank() == 0) {
1330 #endif
1331     registerExitFn(CombineSummary);
1332   }
1333 }
1334
1335
1336
1337
1338
1339
1340 #include "TraceSummary.def.h"
1341
1342
1343 /*@}*/
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363