Merge branch 'charm' of charmgit:charm into xiang/optChkp
[charm.git] / src / ck-perf / trace-summary.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include "charm++.h"
7 #include "trace-summary.h"
8 #include "trace-summaryBOC.h"
9
10 #define DEBUGF(x)  // CmiPrintf x
11
12 #define VER   7.1
13
14 #define INVALIDEP     -2
15 #define TRACEON_EP     -3
16
17 // 1 minutes of run before it'll fill up:
18 #define DefaultBinCount      (1000*60*1) 
19
20 CkpvStaticDeclare(TraceSummary*, _trace);
21 static int _numEvents = 0;
22 #define NUM_DUMMY_EPS 9
23 CkpvDeclare(int, binCount);
24 CkpvDeclare(double, binSize);
25 CkpvDeclare(double, version);
26
27
28 CkpvDeclare(int, previouslySentBins);
29
30
31
32 /** 
33     A class that reads/writes a buffer out of different types of data.
34
35     This class exists because I need to get references to parts of the buffer 
36     that have already been used so that I can increment counters inside the buffer.
37 */
38
39 class compressedBuffer {
40 public:
41   char* buf;
42   int pos; ///<< byte position just beyond the previously read/written data
43
44   compressedBuffer(){
45     buf = NULL;
46     pos = 0;
47   }
48
49   compressedBuffer(int bytes){
50     buf = (char*)malloc(bytes);
51     pos = 0;
52   }
53
54   compressedBuffer(void *buffer){
55     buf = (char*)buffer;
56     pos = 0;
57   }
58     
59   void init(void *buffer){
60     buf = (char*)buffer;
61     pos = 0;
62   }
63     
64   inline void * currentPtr(){
65     return (void*)(buf+pos);
66   }
67
68   template <typename T>
69   T read(int offset){
70     // to resolve unaligned writes causing bus errors, need memcpy
71     T v;
72     memcpy(&v, buf+offset, sizeof(T));
73     return v;
74   }
75     
76   template <typename T>
77   void write(T v, int offset){
78     T v2 = v; // on stack
79     // to resolve unaligned writes causing bus errors, need memcpy
80     memcpy(buf+offset, &v2, sizeof(T));
81   }
82     
83   template <typename T>
84   void increment(int offset){
85     T temp;
86     temp = read<T>(offset);
87     temp ++;
88     write<T>(temp, offset);
89   }
90
91   template <typename T>
92   void accumulate(T v, int offset){
93     T temp;
94     temp = read<T>(offset);
95     temp += v;
96     write<T>(temp, offset);
97   }
98   
99   template <typename T>
100   int push(T v){
101     int oldpos = pos;
102     write<T>(v, pos);
103     pos += sizeof(T);
104     return oldpos;
105   }
106   
107   template <typename T>
108   T pop(){
109     T temp = read<T>(pos);
110     pos += sizeof(T);
111     return temp;
112   }
113
114   template <typename T>
115   T peek(){
116     T temp = read<T>(pos);
117     return temp;
118   }
119
120   template <typename T0, typename T>
121   T peekSecond(){
122     T temp;
123     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
124     return temp;
125   }
126
127   int datalength(){
128     return pos;
129   }
130      
131   void * buffer(){
132     return (void*) buf;
133   }  
134
135   void freeBuf(){
136     free(buf);
137   }
138
139   ~compressedBuffer(){
140     // don't free the buf because the user my have supplied the buffer
141   }
142     
143 };
144
145
146
147
148 // Global Readonly
149 CkGroupID traceSummaryGID;
150 bool summaryCcsStreaming;
151
152 int sumonly = 0;
153 int sumDetail = 0;
154
155 /**
156   For each TraceFoo module, _createTraceFoo() must be defined.
157   This function is called in _createTraces() generated in moduleInit.C
158 */
159 void _createTracesummary(char **argv)
160 {
161   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
162   CkpvInitialize(TraceSummary*, _trace);
163   CkpvInitialize(int, previouslySentBins);
164   CkpvAccess(previouslySentBins) = 0;
165   CkpvAccess(_trace) = new  TraceSummary(argv);
166   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
167   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
168 }
169
170
171 /// function call for starting a phase in trace summary logs 
172 extern "C" 
173 void CkSummary_StartPhase(int phase)
174 {
175    CkpvAccess(_trace)->startPhase(phase);
176 }
177
178
179 /// function call for adding an event mark
180 extern "C" 
181 void CkSummary_MarkEvent(int eventType)
182 {
183    CkpvAccess(_trace)->addEventType(eventType);
184 }
185
186 static inline void writeU(FILE* fp, int u)
187 {
188   fprintf(fp, "%4d", u);
189 }
190
191 PhaseEntry::PhaseEntry() 
192 {
193   int _numEntries=_entryTable.size();
194   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
195   nEPs = _numEntries+10; 
196   count = new int[nEPs];
197   times = new double[nEPs];
198   maxtimes = new double[nEPs];
199   for (int i=0; i<nEPs; i++) {
200     count[i] = 0;
201     times[i] = 0.0;
202     maxtimes[i] = 0.;
203   }
204 }
205
206 SumLogPool::~SumLogPool() 
207 {
208     if (!sumonly) {
209       write();
210       fclose(fp);
211       if (sumDetail) fclose(sdfp);
212   }
213   // free memory for mark
214   if (markcount > 0)
215   for (int i=0; i<MAX_MARKS; i++) {
216     for (int j=0; j<events[i].length(); j++)
217       delete events[i][j];
218   }
219   delete[] pool;
220   delete[] epInfo;
221   delete[] cpuTime;
222   delete[] numExecutions;
223 }
224
225 void SumLogPool::addEventType(int eventType, double time)
226 {
227    if (eventType <0 || eventType >= MAX_MARKS) {
228        CkPrintf("Invalid event type %d!\n", eventType);
229        return;
230    }
231    MarkEntry *e = new MarkEntry;
232    e->time = time;
233    events[eventType].push_back(e);
234    markcount ++;
235 }
236
237 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
238 {
239    // TBD: Can this be moved to initMem?
240   cpuTime = NULL;
241    poolSize = CkpvAccess(binCount);
242    if (poolSize % 2) poolSize++;        // make sure it is even
243    pool = new BinEntry[poolSize];
244    _MEMCHECK(pool);
245
246    this->pgm = new char[strlen(pgm)+1];
247    strcpy(this->pgm,pgm);
248    
249 #if 0
250    // create the sts file
251    if (CkMyPe() == 0) {
252      char *fname = 
253        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
254      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
255      stsfp = fopen(fname, "w+");
256      //CmiPrintf("File: %s \n", fname);
257      if (stsfp == 0) {
258        CmiAbort("Cannot open summary sts file for writing.\n");
259       }
260      delete[] fname;
261    }
262 #endif
263    
264    // event
265    markcount = 0;
266 }
267
268 void SumLogPool::initMem()
269 {
270    int _numEntries=_entryTable.size();
271    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
272    epInfo = new SumEntryInfo[epInfoSize];
273    _MEMCHECK(epInfo);
274
275    cpuTime = NULL;
276    numExecutions = NULL;
277    if (sumDetail) {
278        cpuTime = new double[poolSize*epInfoSize];
279        _MEMCHECK(cpuTime);
280        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
281        numExecutions = new int[poolSize*epInfoSize];
282        _MEMCHECK(numExecutions);
283        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
284
285 //         int i, e;
286 //         for(i=0; i<poolSize; i++) {
287 //             for(e=0; e< epInfoSize; e++) {
288 //                 setCPUtime(i,e,0.0);
289 //                 setNumExecutions(i,e,0);
290 //             }
291 //         }
292    }
293 }
294
295 int SumLogPool::getUtilization(int interval, int ep) {
296     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
297 }
298
299 void SumLogPool::write(void) 
300 {
301   int i;
302   unsigned int j;
303   int _numEntries=_entryTable.size();
304
305   fp = NULL;
306   sdfp = NULL;
307
308   // create file(s)
309   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
310   if (!sumonly) {
311     char pestr[10];
312     sprintf(pestr, "%d", CkMyPe());
313     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
314     char *fname = new char[len+1];
315     
316     sprintf(fname, "%s.%s.sum", pgm, pestr);
317     do {
318       fp = fopen(fname, "w+");
319     } while (!fp && errno == EINTR);
320     if (!fp) {
321       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
322       CmiAbort("Cannot open Summary Trace File for writing...\n");
323     }
324     
325     if (sumDetail) {
326       sprintf(fname, "%s.%s.sumd", pgm, pestr);
327       do {
328         sdfp = fopen(fname, "w+");
329       } while (!sdfp && errno == EINTR);
330       if(!sdfp) {
331         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
332       }
333     }
334     delete[] fname;
335   }
336
337   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
338   if (CkpvAccess(version)>=3.0)
339   {
340     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
341   }
342   fprintf(fp, "\n");
343
344   // write bin time
345 #if 1
346   int last=pool[0].getU();
347   writeU(fp, last);
348   int count=1;
349   for(j=1; j<numBins; j++) {
350     int u = pool[j].getU();
351     if (last == u) {
352       count++;
353     }
354     else {
355       if (count > 1) fprintf(fp, "+%d", count);
356       writeU(fp, u);
357       last = u;
358       count = 1;
359     }
360   }
361   if (count > 1) fprintf(fp, "+%d", count);
362 #else
363   for(j=0; j<numEntries; j++) 
364       pool[j].write(fp);
365 #endif
366   fprintf(fp, "\n");
367
368   // write entry execution time
369   fprintf(fp, "EPExeTime: ");
370   for (i=0; i<_numEntries; i++)
371     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
372   fprintf(fp, "\n");
373   // write entry function call times
374   fprintf(fp, "EPCallTime: ");
375   for (i=0; i<_numEntries; i++)
376     fprintf(fp, "%d ", epInfo[i].epCount);
377   fprintf(fp, "\n");
378   // write max entry function execute times
379   fprintf(fp, "MaxEPTime: ");
380   for (i=0; i<_numEntries; i++)
381     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
382   fprintf(fp, "\n");
383 #if 0
384   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
385     for (j=0; j<_numEntries; j++) 
386       fprintf(fp, "%d ", epInfo[j].hist[i]);
387     fprintf(fp, "\n");
388   }
389 #endif
390   // write marks
391   if (CkpvAccess(version)>=2.0) 
392   {
393     fprintf(fp, "NumMarks: %d ", markcount);
394     for (i=0; i<MAX_MARKS; i++) {
395       for(int j=0; j<events[i].length(); j++)
396         fprintf(fp, "%d %f ", i, events[i][j]->time);
397     }
398     fprintf(fp, "\n");
399   }
400   // write phases
401   if (CkpvAccess(version)>=3.0)
402   {
403     phaseTab.write(fp);
404   }
405   // write idle time
406   if (CkpvAccess(version)>=7.1) {
407     fprintf(fp, "IdlePercent: ");
408     int last=pool[0].getUIdle();
409     writeU(fp, last);
410     int count=1;
411     for(j=1; j<numBins; j++) {
412       int u = pool[j].getUIdle();
413       if (last == u) {
414         count++;
415       }
416       else {
417         if (count > 1) fprintf(fp, "+%d", count);
418         writeU(fp, u);
419         last = u;
420         count = 1;
421       }
422     }
423     if (count > 1) fprintf(fp, "+%d", count);
424     fprintf(fp, "\n");
425   }
426
427   //CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
428   // write summary details
429   if (sumDetail) {
430         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
431                 CkpvAccess(version), CkMyPe(), CkNumPes(),
432                 numBins, _numEntries, CkpvAccess(binSize));
433
434         // Write out cpuTime in microseconds
435         // Run length encoding (RLE) along EP axis
436         fprintf(sdfp, "ExeTimePerEPperInterval ");
437         unsigned int e, i;
438         long last= (long) (getCPUtime(0,0)*1.0e6);
439         int count=0;
440         fprintf(sdfp, "%ld", last);
441         for(e=0; e<_numEntries; e++) {
442             for(i=0; i<numBins; i++) {
443
444                 long u= (long) (getCPUtime(i,e)*1.0e6);
445                 if (last == u) {
446                     count++;
447                 } else {
448
449                     if (count > 1) fprintf(sdfp, "+%d", count);
450                     fprintf(sdfp, " %ld", u);
451                     last = u;
452                     count = 1;
453                 }
454             }
455         }
456         if (count > 1) fprintf(sdfp, "+%d", count);
457         fprintf(sdfp, "\n");
458         // Write out numExecutions
459         // Run length encoding (RLE) along EP axis
460         fprintf(sdfp, "EPCallTimePerInterval ");
461         last= getNumExecutions(0,0);
462         count=0;
463         fprintf(sdfp, "%ld", last);
464         for(e=0; e<_numEntries; e++) {
465             for(i=0; i<numBins; i++) {
466
467                 long u= getNumExecutions(i, e);
468                 if (last == u) {
469                     count++;
470                 } else {
471
472                     if (count > 1) fprintf(sdfp, "+%d", count);
473                     fprintf(sdfp, " %ld", u);
474                     last = u;
475                     count = 1;
476                 }
477             }
478         }
479         if (count > 1) fprintf(sdfp, "+%d", count);
480         fprintf(sdfp, "\n");
481   }
482 }
483
484 void SumLogPool::writeSts(void)
485 {
486     // open sts file
487   char *fname = 
488        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
489   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
490   stsfp = fopen(fname, "w+");
491   //CmiPrintf("File: %s \n", fname);
492   if (stsfp == 0) {
493        CmiAbort("Cannot open summary sts file for writing.\n");
494   }
495   delete[] fname;
496
497   traceWriteSTS(stsfp,_numEvents);
498   for(int i=0;i<_numEvents;i++)
499     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
500   fprintf(stsfp, "END\n");
501
502   fclose(stsfp);
503 }
504
505 // Called once per interval
506 void SumLogPool::add(double time, double idleTime, int pe) 
507 {
508   new (&pool[numBins++]) BinEntry(time, idleTime);
509   if (poolSize==numBins) {
510     shrink();
511   }
512 }
513
514 // Called once per run of an EP
515 // adds 'time' to EP's time, increments epCount
516 void SumLogPool::setEp(int epidx, double time) 
517 {
518   if (epidx >= epInfoSize) {
519         CmiAbort("Invalid entry point!!\n");
520   }
521   //CmiPrintf("set EP: %d %e \n", epidx, time);
522   epInfo[epidx].setTime(time);
523   // set phase table counter
524   phaseTab.setEp(epidx, time);
525 }
526
527 // Called once from endExecute, endPack, etc. this function updates
528 // the sumDetail intervals.
529 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
530 {
531         if (epIdx >= epInfoSize) {
532             CmiAbort("Too many entry points!!\n");
533         }
534
535         double binSz = CkpvAccess(binSize);
536         int startingBinIdx, endingBinIdx;
537         startingBinIdx = (int)(startTime/binSz);
538         endingBinIdx = (int)(endTime/binSz);
539         // shrink if needed
540         while (endingBinIdx >= poolSize) {
541           shrink();
542           CmiAssert(CkpvAccess(binSize) > binSz);
543           binSz = CkpvAccess(binSize);
544           startingBinIdx = (int)(startTime/binSz);
545           endingBinIdx = (int)(endTime/binSz);
546         }
547
548         if (startingBinIdx == endingBinIdx) {
549             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
550         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
551             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
552             while(++startingBinIdx < endingBinIdx)
553                 addToCPUtime(startingBinIdx, epIdx, binSz);
554             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
555         } else {
556           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
557                    startTime, endTime);
558             CmiAbort("Error: end time of EP is less than start time\n");
559         }
560
561         incNumExecutions(startingBinIdx, epIdx);
562 }
563
564 // Shrinks pool[], cpuTime[], and numExecutions[]
565 void SumLogPool::shrink(void)
566 {
567 //  double t = CmiWallTimer();
568
569   // We ensured earlier that poolSize is even; therefore now numBins
570   // == poolSize == even.
571   int entries = numBins/2;
572   for (int i=0; i<entries; i++)
573   {
574      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
575      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
576      if (sumDetail)
577      for (int e=0; e < epInfoSize; e++) {
578          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
579          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
580      }
581   }
582   // zero out the remaining intervals
583   if (sumDetail) {
584     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
585     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
586   }
587   numBins = entries;
588   CkpvAccess(binSize) *= 2;
589
590 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
591 }
592
593 void SumLogPool::shrink(double _maxBinSize)
594 {
595     while(CkpvAccess(binSize) < _maxBinSize)
596     {
597         shrink();
598     };
599 }
600 int  BinEntry::getU() 
601
602   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
603 }
604
605 int BinEntry::getUIdle() {
606   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
607 }
608
609 void BinEntry::write(FILE* fp)
610 {
611   writeU(fp, getU());
612 }
613
614 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
615                                         binTime(0.0),binIdle(0.0),msgNum(0)
616 {
617   if (CkpvAccess(traceOnPe) == 0) return;
618
619     // use absolute time
620   if (CmiTimerAbsolute()) binStart = CmiInitTime();
621
622   CkpvInitialize(int, binCount);
623   CkpvInitialize(double, binSize);
624   CkpvInitialize(double, version);
625   CkpvAccess(binSize) = BIN_SIZE;
626   CkpvAccess(version) = VER;
627   CkpvAccess(binCount) = DefaultBinCount;
628   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
629     if (CkMyPe() == 0) 
630       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632         "CPU usage log time resolution");
633   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634         "Write this .sum file version");
635
636   epThreshold = 0.001; 
637   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638         "Execution time histogram lower bound");
639   epInterval = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641         "Execution time histogram bin size");
642
643   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644   // +sumonly overrides +sumDetail
645   if (!sumonly)
646       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
647
648   _logPool = new SumLogPool(CkpvAccess(traceRoot));
649   // assume invalid entry point on start
650   execEp=INVALIDEP;
651   inIdle = 0;
652   inExec = 0;
653   depth = 0;
654 }
655
656 void TraceSummary::traceClearEps(void)
657 {
658   _logPool->clearEps();
659 }
660
661 void TraceSummary::traceWriteSts(void)
662 {
663   if(CkMyPe()==0)
664       _logPool->writeSts();
665 }
666
667 void TraceSummary::traceClose(void)
668 {
669     if(CkMyPe()==0)
670         _logPool->writeSts();
671     CkpvAccess(_trace)->endComputation();
672
673     delete _logPool;
674     CkpvAccess(_traces)->removeTrace(this);
675 }
676
677 void TraceSummary::beginExecute(CmiObjId *tid)
678 {
679   beginExecute(-1,-1,_threadEP,-1);
680 }
681
682 void TraceSummary::beginExecute(envelope *e)
683 {
684   // no message means thread execution
685   if (e==NULL) {
686     beginExecute(-1,-1,_threadEP,-1);
687   }
688   else {
689     beginExecute(-1,-1,e->getEpIdx(),-1);
690   }  
691 }
692
693 void TraceSummary::beginExecute(char *msg)
694 {
695 #if CMK_SMP_TRACE_COMMTHREAD
696     //This function is called from comm thread in SMP mode
697     envelope *e = (envelope *)msg;
698     int num = _entryTable.size();
699     int ep = e->getEpIdx();
700     if(ep<0 || ep>=num) return;
701     if(_entryTable[ep]->traceEnabled)
702         beginExecute(-1,-1,e->getEpIdx(),-1);
703 #endif
704 }
705
706 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
707 {
708   if (execEp == TRACEON_EP) {
709     endExecute();
710   }
711   CmiAssert(inIdle == 0);
712   if (inExec == 0) {
713     CmiAssert(depth == 0);
714     inExec = 1;
715   }
716   depth ++;
717   // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
718
719   if (depth > 1) return;          //  nested
720
721 /*
722   if (execEp != INVALIDEP) {
723     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
724     return;
725   }
726 */
727   
728   execEp=ep;
729   double t = TraceTimer();
730   //CmiPrintf("start: %f \n", start);
731   
732   start = t;
733   double ts = binStart;
734   // fill gaps
735   while ((ts = ts + CkpvAccess(binSize)) < t) {
736     /* Keep as a template for error checking. The current form of this check
737        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
738        I used it). Perhaps an improved form could be used in case vastly
739        incompatible EP vs idle times are reported (binSize*2?).
740
741        This check will have to be duplicated before each call to add()
742
743     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
744              binTime + binIdle, CkpvAccess(binSize));
745     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
746     */
747      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
748      binTime=0.0;                 // fill all other bins with 0 up till start
749      binIdle = 0.0;
750      binStart = ts;
751   }
752 }
753
754 void TraceSummary::endExecute()
755 {
756   CmiAssert(inIdle == 0 && inExec == 1);
757   depth --;
758   if (depth == 0) inExec = 0;
759   CmiAssert(depth >= 0);
760   // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
761
762   if (depth != 0) return;
763  
764   double t = TraceTimer();
765   double ts = start;
766   double nts = binStart;
767
768 /*
769   if (execEp == TRACEON_EP) {
770     // if trace just got turned on, then one expects to see this
771     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
772     return;
773   }
774 */
775
776   if (execEp == INVALIDEP) {
777     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
778     return;
779   }
780
781   if (execEp >= 0)
782   {
783     _logPool->setEp(execEp, t-ts);
784   }
785
786   while ((nts = nts + CkpvAccess(binSize)) < t)
787   {
788     // fill the bins with time for this entry method
789      binTime += nts-ts;
790      binStart  = nts;
791      // This calls shrink() if needed
792      _logPool->add(binTime, binIdle, CkMyPe()); 
793      binTime = 0.0;
794      binIdle = 0.0;
795      ts = nts;
796   }
797   binTime += t - ts;
798
799   if (sumDetail && execEp >= 0 )
800       _logPool->updateSummaryDetail(execEp, start, t);
801
802   execEp = INVALIDEP;
803 }
804
805 void TraceSummary::endExecute(char *msg){
806 #if CMK_SMP_TRACE_COMMTHREAD
807     //This function is called from comm thread in SMP mode
808     envelope *e = (envelope *)msg;
809     int num = _entryTable.size();
810     int ep = e->getEpIdx();
811     if(ep<0 || ep>=num) return;
812     if(_entryTable[ep]->traceEnabled){
813         endExecute();
814     }
815 #endif    
816 }
817
818 void TraceSummary::beginIdle(double currT)
819 {
820   if (execEp == TRACEON_EP) {
821     endExecute();
822   }
823
824   CmiAssert(inIdle == 0 && inExec == 0);
825   inIdle = 1;
826   //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
827
828   double t = TraceTimer(currT);
829   
830   // mark the time of this idle period. Only the next endIdle should see
831   // this value
832   idleStart = t; 
833   double ts = binStart;
834   // fill gaps
835   while ((ts = ts + CkpvAccess(binSize)) < t) {
836     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
837     binTime=0.0;                 // fill all other bins with 0 up till start
838     binIdle = 0.0;
839     binStart = ts;
840   }
841 }
842
843 void TraceSummary::endIdle(double currT)
844 {
845   CmiAssert(inIdle == 1 && inExec == 0);
846   inIdle = 0;
847   // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
848
849   double t = TraceTimer(currT);
850   double t_idleStart = idleStart;
851   double t_binStart = binStart;
852
853   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
854   {
855     // fill the bins with time for idle
856     binIdle += t_binStart - t_idleStart;
857     binStart = t_binStart;
858     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
859     binTime = 0.0;
860     binIdle = 0.0;
861     t_idleStart = t_binStart;
862   }
863   binIdle += t - t_idleStart;
864 }
865
866 void TraceSummary::traceBegin(void)
867 {
868     // fake as a start of an event, assuming traceBegin is called inside an
869     // entry function.
870   beginExecute(-1, -1, TRACEON_EP, -1, -1);
871 }
872
873 void TraceSummary::traceEnd(void)
874 {
875   endExecute();
876 }
877
878 void TraceSummary::beginPack(void)
879 {
880     packstart = CmiWallTimer();
881 }
882
883 void TraceSummary::endPack(void)
884 {
885     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
886     if (sumDetail)
887         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
888 }
889
890 void TraceSummary::beginUnpack(void)
891 {
892     unpackstart = CmiWallTimer();
893 }
894
895 void TraceSummary::endUnpack(void)
896 {
897     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
898     if (sumDetail)
899         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
900 }
901
902 void TraceSummary::beginComputation(void)
903 {
904   // initialze arrays because now the number of entries is known.
905   _logPool->initMem();
906 }
907
908 void TraceSummary::endComputation(void)
909 {
910   static int done = 0;
911   if (done) return;
912   done = 1;
913   if (msgNum==0) {
914 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
915      _logPool->add(binTime, binIdle, CkMyPe());
916      binTime = 0.0;
917      binIdle = 0.0;
918      msgNum ++;
919
920      binStart  += CkpvAccess(binSize);
921      double t = TraceTimer();
922      double ts = binStart;
923      while (ts < t)
924      {
925        _logPool->add(binTime, binIdle, CkMyPe());
926        binTime=0.0;
927        binIdle = 0.0;
928        ts += CkpvAccess(binSize);
929      }
930
931   }
932 }
933
934 void TraceSummary::addEventType(int eventType)
935 {
936   _logPool->addEventType(eventType, TraceTimer());
937 }
938
939 void TraceSummary::startPhase(int phase)
940 {
941    _logPool->startPhase(phase);
942 }
943
944 void TraceSummary::traceEnableCCS() {
945   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
946   sumProxy.initCCS();
947 }
948
949
950 void TraceSummary::fillData(double *buffer, double reqStartTime, 
951                             double reqBinSize, int reqNumBins) {
952   // buffer has to be pre-allocated by the requester and must be an array of
953   // size reqNumBins.
954   //
955   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
956   //              need a number of these assumptions dropped:
957   //              1) reqBinSize == binSize (unrealistic)
958   //              2) bins boundary aligned (ok even under normal circumstances)
959   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
960   //              4) bins are always available (true unless flush)
961   //              5) bins always starts from 0 (unrealistic)
962
963   // works only because of 1)
964   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
965   //           "starting" at all!
966   int binOffset = (int)(reqStartTime/reqBinSize); 
967   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
968     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
969     buffer[i-binOffset] = pool()->getTime(i);
970   }
971 }
972
973 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
974    
975     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
976     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
977     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
978     CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
979     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
980 }
981
982 // collect the max bin size
983 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
984 {
985     double _maxBinSize = *((double *)msg->getData());
986     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
987     sumProxy.shrink(_maxBinSize);
988 }
989
990 void TraceSummaryBOC::shrink(double _mBin){
991     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
992     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
993     _maxBinSize = _mBin;
994     if(CkpvAccess(binSize) < _maxBinSize)
995     {
996         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
997     }
998     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
999     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1000     CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
1001     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1002 }
1003
1004 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
1005     double *sumData = (double *)msg->getData();
1006     int     totalsize = msg->getSize()/sizeof(double);
1007     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
1008     UInt    numBins = totalsize/epNums;  
1009     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
1010     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1011     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1012     FILE *sumfp = fopen(fname, "w+");
1013     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1014                 CkpvAccess(version), CkNumPes(),
1015                 numBins, numEntries, _maxBinSize);
1016     for(int i=0; i<numBins; i++){
1017         for(int j=0; j<numEntries; j++)
1018         {
1019             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1020         }
1021     }
1022    fclose(sumfp);
1023    //CkPrintf("done with analysis\n");
1024    CkExit();
1025 }
1026
1027 /// for TraceSummaryBOC
1028
1029 void TraceSummaryBOC::initCCS() {
1030   if(firstTime){
1031     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1032     // initializing CCS-based parameters on all processors
1033     lastRequestedIndexBlock = 0;
1034     indicesPerBlock = 1000;
1035     collectionGranularity = 0.001; // time in seconds
1036     nBufferedBins = 0;
1037     
1038     // initialize buffer, register CCS handler and start the collection
1039     // pulse only on pe 0.
1040     if (CkMyPe() == 0) { 
1041       ccsBufferedData = new CkVec<double>();
1042     
1043       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1044       CkPrintf("Trace Summary now listening in for CCS Client\n");
1045       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1046                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1047       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1048                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1049
1050       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1051       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1052                              (void *)this);
1053       summaryCcsStreaming = CmiTrue;
1054     }
1055     firstTime = false;
1056   }
1057 }
1058
1059 /** Return summary information as double precision values for each sample period. 
1060     The actual data collection is in double precision values. 
1061
1062     The units on the returned values are total execution time across all PEs.
1063 */
1064 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1065   double *sendBuffer;
1066
1067   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1068
1069   CkPrintf("Responding ...\n");
1070   int datalength = 0;
1071   // if we have no data to send, send an acknowledgement code of -13.37
1072   // instead.
1073   if (ccsBufferedData->length() == 0) {
1074     sendBuffer = new double[1];
1075     sendBuffer[0] = -13.37;
1076     datalength = sizeof(double);
1077     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1078     delete [] sendBuffer;
1079   } else {
1080     sendBuffer = ccsBufferedData->getVec();
1081     datalength = ccsBufferedData->length()*sizeof(double);
1082     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1083     ccsBufferedData->free();
1084   }
1085   CkPrintf("Response Sent. Proceeding with computation.\n");
1086   delete m;
1087 }
1088
1089
1090 /** Return summary information as unsigned char values for each sample period. 
1091     The actual data collection is in double precision values.
1092
1093     This returns the utilization in a range from 0 to 200.
1094 */
1095 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1096   unsigned char *sendBuffer;
1097
1098   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1099
1100   CkPrintf("Responding ...\n");
1101   int datalength = 0;
1102
1103   if (ccsBufferedData->length() == 0) {
1104     sendBuffer = new unsigned char[1];
1105     sendBuffer[0] = 255;
1106     datalength = sizeof(unsigned char);
1107     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1108     delete [] sendBuffer;
1109   } else {
1110     double * doubleData = ccsBufferedData->getVec();
1111     int numData = ccsBufferedData->length();
1112     
1113     // pack data into unsigned char array
1114     sendBuffer = new unsigned char[numData];
1115     
1116     for(int i=0;i<numData;i++){
1117       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1118     }    
1119
1120     datalength = sizeof(unsigned char) * numData;
1121     
1122     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1123     ccsBufferedData->free();
1124     delete [] sendBuffer;
1125   }
1126   CkPrintf("Response Sent. Proceeding with computation.\n");
1127   delete m;
1128 }
1129
1130
1131
1132 void startCollectData(void *data, double currT) {
1133   CkAssert(CkMyPe() == 0);
1134   // CkPrintf("startCollectData()\n");
1135   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1136   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1137   double collectionGranularity = sumObj->collectionGranularity;
1138   int indicesPerBlock = sumObj->indicesPerBlock;
1139   
1140   double startTime = lastRequestedIndexBlock*
1141     collectionGranularity * indicesPerBlock;
1142   int numIndicesToGet = (int)floor((currT - startTime)/
1143                                    collectionGranularity);
1144   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1145   // **TODO** consider limiting the total number of blocks each collection
1146   //   request will pick up. This is to limit the amount of perturbation
1147   //   if it proves to be a problem.
1148   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1149
1150    sumProxy.collectSummaryData(startTime, 
1151                        collectionGranularity,
1152                        numBlocksToGet*indicesPerBlock);
1153   // assume success
1154   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1155 }
1156
1157 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1158                                   int numBins) {
1159   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1160
1161   double *contribution = new double[numBins];
1162   for (int i=0; i<numBins; i++) {
1163     contribution[i] = 0.0;
1164   }
1165   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1166
1167   /*
1168   for (int i=0; i<numBins; i++) {
1169     CkPrintf("[%d] %f\n", i, contribution[i]);
1170   }
1171   */
1172
1173   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1174   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1175   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1176              cb);
1177 }
1178
1179 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1180   CkAssert(CkMyPe() == 0);
1181   // **CWL** No memory management for the ccs buffer for now.
1182
1183   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1184   double *recvData = (double *)msg->getData();
1185   int numBins = msg->getSize()/sizeof(double);
1186
1187   // if there's an easier way to append a data block to a CkVec, I'll take it
1188   for (int i=0; i<numBins; i++) {
1189     ccsBufferedData->insertAtEnd(recvData[i]);
1190   }
1191   delete msg;
1192 }
1193
1194
1195
1196
1197 void TraceSummaryBOC::startSumOnly()
1198 {
1199   CmiAssert(CkMyPe() == 0);
1200
1201   CProxy_TraceSummaryBOC p(traceSummaryGID);
1202   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1203   p.askSummary(size);
1204 }
1205
1206 void TraceSummaryBOC::askSummary(int size)
1207 {
1208   if (CkpvAccess(_trace) == NULL) return;
1209
1210   int traced = CkpvAccess(_trace)->traceOnPE();
1211
1212   BinEntry *reductionBuffer = new BinEntry[size+1];
1213   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1214   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1215   if (traced) {
1216     CkpvAccess(_trace)->endComputation();
1217     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1218     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1219     if (n>size) n=size;
1220     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1221   }
1222
1223   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1224              CkReduction::sum_double);
1225   delete [] reductionBuffer;
1226 }
1227
1228 //extern "C" void _CkExit();
1229
1230 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1231 {
1232   if (CkpvAccess(_trace) == NULL) return;
1233
1234   CkAssert(CkMyPe() == 0);
1235
1236   int n = msg->getSize()/sizeof(BinEntry);
1237   nBins = n-1;
1238   bins = (BinEntry *)msg->getData();
1239   nTracedPEs = (int)bins[n-1].time();
1240   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1241
1242   write();
1243
1244   delete msg;
1245
1246   CkExit();
1247 }
1248
1249 void TraceSummaryBOC::write(void) 
1250 {
1251   unsigned int j;
1252
1253   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1254   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1255   FILE *sumfp = fopen(fname, "w+");
1256   //CmiPrintf("File: %s \n", fname);
1257   if(sumfp == 0)
1258       CmiAbort("Cannot open summary sts file for writing.\n");
1259   delete[] fname;
1260
1261   int _numEntries=_entryTable.size();
1262   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1263   fprintf(sumfp, "\n");
1264
1265   // write bin time
1266 #if 0
1267   int last=pool[0].getU();
1268   writeU(fp, last);
1269   int count=1;
1270   for(j=1; j<numEntries; j++) {
1271     int u = pool[j].getU();
1272     if (last == u) {
1273       count++;
1274     }
1275     else {
1276       if (count > 1) fprintf(fp, "+%d", count);
1277       writeU(fp, u);
1278       last = u;
1279       count = 1;
1280     }
1281   }
1282   if (count > 1) fprintf(fp, "+%d", count);
1283 #else
1284   for(j=0; j<nBins; j++) {
1285     bins[j].time() /= nTracedPEs;
1286     bins[j].write(sumfp);
1287   }
1288 #endif
1289   fprintf(sumfp, "\n");
1290   fclose(sumfp);
1291
1292 }
1293
1294 extern "C" void CombineSummary()
1295 {
1296 #if CMK_TRACE_ENABLED
1297   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1298   if (sumonly) {
1299     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1300       // pe 0 start the sumonly process
1301     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1302     sumProxy[0].startSumOnly();
1303   }else if(sumDetail)
1304   {
1305       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1306       sumProxy.traceSummaryParallelShutdown(-1);
1307   }
1308   else {
1309     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL);
1310     CkExit();
1311   }
1312 #else
1313   CkExit();
1314 #endif
1315 }
1316
1317 void initTraceSummaryBOC()
1318 {
1319 #ifdef __BIGSIM__
1320   if(BgNodeRank()==0) {
1321 #else
1322   if (CkMyRank() == 0) {
1323 #endif
1324     registerExitFn(CombineSummary);
1325   }
1326 }
1327
1328
1329
1330
1331
1332
1333 #include "TraceSummary.def.h"
1334
1335
1336 /*@}*/
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356