take Projector out and protect it with a macro CMK_PROJECTOR
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39 /** 
40     A class that reads/writes a buffer out of different types of data.
41
42     This class exists because I need to get references to parts of the buffer 
43     that have already been used so that I can increment counters inside the buffer.
44 */
45
46 class compressedBuffer {
47 public:
48   char* buf;
49   int pos; ///<< byte position just beyond the previously read/written data
50
51   compressedBuffer(){
52     buf = NULL;
53     pos = 0;
54   }
55
56   compressedBuffer(int bytes){
57     buf = (char*)malloc(bytes);
58     pos = 0;
59   }
60
61   compressedBuffer(void *buffer){
62     buf = (char*)buffer;
63     pos = 0;
64   }
65     
66   void init(void *buffer){
67     buf = (char*)buffer;
68     pos = 0;
69   }
70     
71   inline void * currentPtr(){
72     return (void*)(buf+pos);
73   }
74
75   template <typename T>
76   T read(int offset){
77     // to resolve unaligned writes causing bus errors, need memcpy
78     T v;
79     memcpy(&v, buf+offset, sizeof(T));
80     return v;
81   }
82     
83   template <typename T>
84   void write(T v, int offset){
85     T v2 = v; // on stack
86     // to resolve unaligned writes causing bus errors, need memcpy
87     memcpy(buf+offset, &v2, sizeof(T));
88   }
89     
90   template <typename T>
91   void increment(int offset){
92     T temp;
93     temp = read<T>(offset);
94     temp ++;
95     write<T>(temp, offset);
96   }
97
98   template <typename T>
99   void accumulate(T v, int offset){
100     T temp;
101     temp = read<T>(offset);
102     temp += v;
103     write<T>(temp, offset);
104   }
105   
106   template <typename T>
107   int push(T v){
108     int oldpos = pos;
109     write<T>(v, pos);
110     pos += sizeof(T);
111     return oldpos;
112   }
113   
114   template <typename T>
115   T pop(){
116     T temp = read<T>(pos);
117     pos += sizeof(T);
118     return temp;
119   }
120
121   template <typename T>
122   T peek(){
123     T temp = read<T>(pos);
124     return temp;
125   }
126
127   template <typename T0, typename T>
128   T peekSecond(){
129     T temp;
130     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
131     return temp;
132   }
133
134   int datalength(){
135     return pos;
136   }
137      
138   void * buffer(){
139     return (void*) buf;
140   }  
141
142   void freeBuf(){
143     free(buf);
144   }
145
146   ~compressedBuffer(){
147     // don't free the buf because the user my have supplied the buffer
148   }
149     
150 };
151
152
153
154
155 // Global Readonly
156 CkGroupID traceSummaryGID;
157 bool summaryCcsStreaming;
158
159 int sumonly = 0;
160 int sumDetail = 0;
161
162 /**
163   For each TraceFoo module, _createTraceFoo() must be defined.
164   This function is called in _createTraces() generated in moduleInit.C
165 */
166 void _createTracesummary(char **argv)
167 {
168   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
169   CkpvInitialize(TraceSummary*, _trace);
170   CkpvInitialize(int, previouslySentBins);
171   CkpvAccess(previouslySentBins) = 0;
172   CkpvAccess(_trace) = new  TraceSummary(argv);
173   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
174   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
175 }
176
177
178 /// function call for starting a phase in trace summary logs 
179 extern "C" 
180 void CkSummary_StartPhase(int phase)
181 {
182    CkpvAccess(_trace)->startPhase(phase);
183 }
184
185
186 /// function call for adding an event mark
187 extern "C" 
188 void CkSummary_MarkEvent(int eventType)
189 {
190    CkpvAccess(_trace)->addEventType(eventType);
191 }
192
193 static inline void writeU(FILE* fp, int u)
194 {
195   fprintf(fp, "%4d", u);
196 }
197
198 PhaseEntry::PhaseEntry() 
199 {
200   int _numEntries=_entryTable.size();
201   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
202   nEPs = _numEntries+10; 
203   count = new int[nEPs];
204   times = new double[nEPs];
205   maxtimes = new double[nEPs];
206   for (int i=0; i<nEPs; i++) {
207     count[i] = 0;
208     times[i] = 0.0;
209     maxtimes[i] = 0.;
210   }
211 }
212
213 SumLogPool::~SumLogPool() 
214 {
215     if (!sumonly) {
216       write();
217       fclose(fp);
218       if (sumDetail) fclose(sdfp);
219   }
220   // free memory for mark
221   if (markcount > 0)
222   for (int i=0; i<MAX_MARKS; i++) {
223     for (int j=0; j<events[i].length(); j++)
224       delete events[i][j];
225   }
226   delete[] pool;
227   delete[] epInfo;
228   delete[] cpuTime;
229   delete[] numExecutions;
230 }
231
232 void SumLogPool::addEventType(int eventType, double time)
233 {
234    if (eventType <0 || eventType >= MAX_MARKS) {
235        CkPrintf("Invalid event type %d!\n", eventType);
236        return;
237    }
238    MarkEntry *e = new MarkEntry;
239    e->time = time;
240    events[eventType].push_back(e);
241    markcount ++;
242 }
243
244 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
245 {
246    // TBD: Can this be moved to initMem?
247   cpuTime = NULL;
248    poolSize = CkpvAccess(binCount);
249    if (poolSize % 2) poolSize++;        // make sure it is even
250    pool = new BinEntry[poolSize];
251    _MEMCHECK(pool);
252
253    this->pgm = new char[strlen(pgm)+1];
254    strcpy(this->pgm,pgm);
255    
256 #if 0
257    // create the sts file
258    if (CkMyPe() == 0) {
259      char *fname = 
260        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
261      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
262      stsfp = fopen(fname, "w+");
263      //CmiPrintf("File: %s \n", fname);
264      if (stsfp == 0) {
265        CmiAbort("Cannot open summary sts file for writing.\n");
266       }
267      delete[] fname;
268    }
269 #endif
270    
271    // event
272    markcount = 0;
273 }
274
275 void SumLogPool::initMem()
276 {
277    int _numEntries=_entryTable.size();
278    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
279    epInfo = new SumEntryInfo[epInfoSize];
280    _MEMCHECK(epInfo);
281
282    cpuTime = NULL;
283    numExecutions = NULL;
284    if (sumDetail) {
285        cpuTime = new double[poolSize*epInfoSize];
286        _MEMCHECK(cpuTime);
287        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
288        numExecutions = new int[poolSize*epInfoSize];
289        _MEMCHECK(numExecutions);
290        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
291
292 //         int i, e;
293 //         for(i=0; i<poolSize; i++) {
294 //             for(e=0; e< epInfoSize; e++) {
295 //                 setCPUtime(i,e,0.0);
296 //                 setNumExecutions(i,e,0);
297 //             }
298 //         }
299    }
300 }
301
302 int SumLogPool::getUtilization(int interval, int ep) {
303     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
304 }
305
306 void SumLogPool::write(void) 
307 {
308   int i;
309   unsigned int j;
310   int _numEntries=_entryTable.size();
311
312   fp = NULL;
313   sdfp = NULL;
314
315   // create file(s)
316   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
317   if (!sumonly) {
318     char pestr[10];
319     sprintf(pestr, "%d", CkMyPe());
320     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
321     char *fname = new char[len+1];
322     
323     sprintf(fname, "%s.%s.sum", pgm, pestr);
324     do {
325       fp = fopen(fname, "w+");
326     } while (!fp && errno == EINTR);
327     if (!fp) {
328       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
329       CmiAbort("Cannot open Summary Trace File for writing...\n");
330     }
331     
332     if (sumDetail) {
333       sprintf(fname, "%s.%s.sumd", pgm, pestr);
334       do {
335         sdfp = fopen(fname, "w+");
336       } while (!sdfp && errno == EINTR);
337       if(!sdfp) {
338         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
339       }
340     }
341     delete[] fname;
342   }
343
344   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
345   if (CkpvAccess(version)>=3.0)
346   {
347     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
348   }
349   fprintf(fp, "\n");
350
351   // write bin time
352 #if 1
353   int last=pool[0].getU();
354   writeU(fp, last);
355   int count=1;
356   for(j=1; j<numBins; j++) {
357     int u = pool[j].getU();
358     if (last == u) {
359       count++;
360     }
361     else {
362       if (count > 1) fprintf(fp, "+%d", count);
363       writeU(fp, u);
364       last = u;
365       count = 1;
366     }
367   }
368   if (count > 1) fprintf(fp, "+%d", count);
369 #else
370   for(j=0; j<numEntries; j++) 
371       pool[j].write(fp);
372 #endif
373   fprintf(fp, "\n");
374
375   // write entry execution time
376   fprintf(fp, "EPExeTime: ");
377   for (i=0; i<_numEntries; i++)
378     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
379   fprintf(fp, "\n");
380   // write entry function call times
381   fprintf(fp, "EPCallTime: ");
382   for (i=0; i<_numEntries; i++)
383     fprintf(fp, "%d ", epInfo[i].epCount);
384   fprintf(fp, "\n");
385   // write max entry function execute times
386   fprintf(fp, "MaxEPTime: ");
387   for (i=0; i<_numEntries; i++)
388     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
389   fprintf(fp, "\n");
390 #if 0
391   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
392     for (j=0; j<_numEntries; j++) 
393       fprintf(fp, "%d ", epInfo[j].hist[i]);
394     fprintf(fp, "\n");
395   }
396 #endif
397   // write marks
398   if (CkpvAccess(version)>=2.0) 
399   {
400     fprintf(fp, "NumMarks: %d ", markcount);
401     for (i=0; i<MAX_MARKS; i++) {
402       for(int j=0; j<events[i].length(); j++)
403         fprintf(fp, "%d %f ", i, events[i][j]->time);
404     }
405     fprintf(fp, "\n");
406   }
407   // write phases
408   if (CkpvAccess(version)>=3.0)
409   {
410     phaseTab.write(fp);
411   }
412   // write idle time
413   if (CkpvAccess(version)>=7.1) {
414     fprintf(fp, "IdlePercent: ");
415     int last=pool[0].getUIdle();
416     writeU(fp, last);
417     int count=1;
418     for(j=1; j<numBins; j++) {
419       int u = pool[j].getUIdle();
420       if (last == u) {
421         count++;
422       }
423       else {
424         if (count > 1) fprintf(fp, "+%d", count);
425         writeU(fp, u);
426         last = u;
427         count = 1;
428       }
429     }
430     if (count > 1) fprintf(fp, "+%d", count);
431     fprintf(fp, "\n");
432   }
433
434   CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
435   // write summary details
436   if (sumDetail) {
437         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
438                 CkpvAccess(version), CkMyPe(), CkNumPes(),
439                 numBins, _numEntries, CkpvAccess(binSize));
440
441         // Write out cpuTime in microseconds
442         // Run length encoding (RLE) along EP axis
443         fprintf(sdfp, "ExeTimePerEPperInterval ");
444         unsigned int e, i;
445         long last= (long) (getCPUtime(0,0)*1.0e6);
446         int count=0;
447         fprintf(sdfp, "%ld", last);
448         for(e=0; e<_numEntries; e++) {
449             for(i=0; i<numBins; i++) {
450
451                 long u= (long) (getCPUtime(i,e)*1.0e6);
452                 if (last == u) {
453                     count++;
454                 } else {
455
456                     if (count > 1) fprintf(sdfp, "+%d", count);
457                     fprintf(sdfp, " %ld", u);
458                     last = u;
459                     count = 1;
460                 }
461             }
462         }
463         if (count > 1) fprintf(sdfp, "+%d", count);
464         fprintf(sdfp, "\n");
465         // Write out numExecutions
466         // Run length encoding (RLE) along EP axis
467         fprintf(sdfp, "EPCallTimePerInterval ");
468         last= getNumExecutions(0,0);
469         count=0;
470         fprintf(sdfp, "%ld", last);
471         for(e=0; e<_numEntries; e++) {
472             for(i=0; i<numBins; i++) {
473
474                 long u= getNumExecutions(i, e);
475                 if (last == u) {
476                     count++;
477                 } else {
478
479                     if (count > 1) fprintf(sdfp, "+%d", count);
480                     fprintf(sdfp, " %ld", u);
481                     last = u;
482                     count = 1;
483                 }
484             }
485         }
486         if (count > 1) fprintf(sdfp, "+%d", count);
487         fprintf(sdfp, "\n");
488   }
489 }
490
491 void SumLogPool::writeSts(void)
492 {
493     // open sts file
494   char *fname = 
495        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
496   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
497   stsfp = fopen(fname, "w+");
498   //CmiPrintf("File: %s \n", fname);
499   if (stsfp == 0) {
500        CmiAbort("Cannot open summary sts file for writing.\n");
501   }
502   delete[] fname;
503
504   traceWriteSTS(stsfp,_numEvents);
505   for(int i=0;i<_numEvents;i++)
506     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
507   fprintf(stsfp, "END\n");
508
509   fclose(stsfp);
510 }
511
512 // Called once per interval
513 void SumLogPool::add(double time, double idleTime, int pe) 
514 {
515   new (&pool[numBins++]) BinEntry(time, idleTime);
516   if (poolSize==numBins) {
517     shrink();
518   }
519 }
520
521 // Called once per run of an EP
522 // adds 'time' to EP's time, increments epCount
523 void SumLogPool::setEp(int epidx, double time) 
524 {
525   if (epidx >= epInfoSize) {
526         CmiAbort("Invalid entry point!!\n");
527   }
528   //CmiPrintf("set EP: %d %e \n", epidx, time);
529   epInfo[epidx].setTime(time);
530   // set phase table counter
531   phaseTab.setEp(epidx, time);
532 }
533
534 // Called once from endExecute, endPack, etc. this function updates
535 // the sumDetail intervals.
536 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
537 {
538         if (epIdx >= epInfoSize) {
539             CmiAbort("Too many entry points!!\n");
540         }
541
542         double binSz = CkpvAccess(binSize);
543         int startingBinIdx, endingBinIdx;
544         startingBinIdx = (int)(startTime/binSz);
545         endingBinIdx = (int)(endTime/binSz);
546         // shrink if needed
547         while (endingBinIdx >= poolSize) {
548           shrink();
549           CmiAssert(CkpvAccess(binSize) > binSz);
550           binSz = CkpvAccess(binSize);
551           startingBinIdx = (int)(startTime/binSz);
552           endingBinIdx = (int)(endTime/binSz);
553         }
554
555         if (startingBinIdx == endingBinIdx) {
556             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
557         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
558             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
559             while(++startingBinIdx < endingBinIdx)
560                 addToCPUtime(startingBinIdx, epIdx, binSz);
561             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
562         } else {
563           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
564                    startTime, endTime);
565             CmiAbort("Error: end time of EP is less than start time\n");
566         }
567
568         incNumExecutions(startingBinIdx, epIdx);
569 }
570
571 // Shrinks pool[], cpuTime[], and numExecutions[]
572 void SumLogPool::shrink(void)
573 {
574 //  double t = CmiWallTimer();
575
576   // We ensured earlier that poolSize is even; therefore now numBins
577   // == poolSize == even.
578   int entries = numBins/2;
579   for (int i=0; i<entries; i++)
580   {
581      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
582      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
583      if (sumDetail)
584      for (int e=0; e < epInfoSize; e++) {
585          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
586          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
587      }
588   }
589   // zero out the remaining intervals
590   if (sumDetail) {
591     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
592     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
593   }
594   numBins = entries;
595   CkpvAccess(binSize) *= 2;
596
597 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
598 }
599
600 void SumLogPool::shrink(double _maxBinSize)
601 {
602     while(CkpvAccess(binSize) < _maxBinSize)
603     {
604         shrink();
605     };
606 }
607 int  BinEntry::getU() 
608
609   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
610 }
611
612 int BinEntry::getUIdle() {
613   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
614 }
615
616 void BinEntry::write(FILE* fp)
617 {
618   writeU(fp, getU());
619 }
620
621 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
622                                         binTime(0.0),binIdle(0.0),msgNum(0)
623 {
624   if (CkpvAccess(traceOnPe) == 0) return;
625
626     // use absolute time
627   if (CmiTimerAbsolute()) binStart = CmiInitTime();
628
629   CkpvInitialize(int, binCount);
630   CkpvInitialize(double, binSize);
631   CkpvInitialize(double, version);
632   CkpvAccess(binSize) = BIN_SIZE;
633   CkpvAccess(version) = VER;
634   CkpvAccess(binCount) = DefaultBinCount;
635   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
636     if (CkMyPe() == 0) 
637       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
638   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
639         "CPU usage log time resolution");
640   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
641         "Write this .sum file version");
642
643   epThreshold = 0.001; 
644   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
645         "Execution time histogram lower bound");
646   epInterval = 0.001; 
647   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
648         "Execution time histogram bin size");
649
650   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
651   // +sumonly overrides +sumDetail
652   if (!sumonly)
653       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
654
655   _logPool = new SumLogPool(CkpvAccess(traceRoot));
656   // assume invalid entry point on start
657   execEp=INVALIDEP;
658   inIdle = 0;
659   inExec = 0;
660 }
661
662 void TraceSummary::traceClearEps(void)
663 {
664   _logPool->clearEps();
665 }
666
667 void TraceSummary::traceWriteSts(void)
668 {
669   if(CkMyPe()==0)
670       _logPool->writeSts();
671 }
672
673 void TraceSummary::traceClose(void)
674 {
675     if(CkMyPe()==0)
676         _logPool->writeSts();
677     CkpvAccess(_trace)->endComputation();
678
679     delete _logPool;
680     CkpvAccess(_traces)->removeTrace(this);
681 }
682
683 void TraceSummary::beginExecute(CmiObjId *tid)
684 {
685   beginExecute(-1,-1,_threadEP,-1);
686 }
687
688 void TraceSummary::beginExecute(envelope *e)
689 {
690   // no message means thread execution
691   if (e==NULL) {
692     beginExecute(-1,-1,_threadEP,-1);
693   }
694   else {
695     beginExecute(-1,-1,e->getEpIdx(),-1);
696   }  
697 }
698
699 void TraceSummary::beginExecute(char *msg)
700 {
701 #if CMK_SMP_TRACE_COMMTHREAD
702     //This function is called from comm thread in SMP mode
703     envelope *e = (envelope *)msg;
704     int num = _entryTable.size();
705     int ep = e->getEpIdx();
706     if(ep<0 || ep>=num) return;
707     if(_entryTable[ep]->traceEnabled)
708         beginExecute(-1,-1,e->getEpIdx(),-1);
709 #endif
710 }
711
712 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
713 {
714   if (execEp == TRACEON_EP) {
715     endExecute();
716   }
717   CmiAssert(inIdle == 0 && inExec == 0);
718   inExec = 1;
719
720   if (execEp != INVALIDEP) {
721     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
722     return;
723   }
724   
725   execEp=ep;
726   double t = TraceTimer();
727   //CmiPrintf("start: %f \n", start);
728   
729   start = t;
730   double ts = binStart;
731   // fill gaps
732   while ((ts = ts + CkpvAccess(binSize)) < t) {
733     /* Keep as a template for error checking. The current form of this check
734        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
735        I used it). Perhaps an improved form could be used in case vastly
736        incompatible EP vs idle times are reported (binSize*2?).
737
738        This check will have to be duplicated before each call to add()
739
740     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
741              binTime + binIdle, CkpvAccess(binSize));
742     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
743     */
744      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
745      binTime=0.0;                 // fill all other bins with 0 up till start
746      binIdle = 0.0;
747      binStart = ts;
748   }
749 }
750
751 void TraceSummary::endExecute(void)
752 {
753   CmiAssert(inIdle == 0 && inExec == 1);
754   inExec = 0;
755  
756   double t = TraceTimer();
757   double ts = start;
758   double nts = binStart;
759
760 /*
761   if (execEp == TRACEON_EP) {
762     // if trace just got turned on, then one expects to see this
763     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
764     return;
765   }
766 */
767
768   if (execEp == INVALIDEP) {
769     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
770     return;
771   }
772
773   if (execEp >= 0)
774   {
775     _logPool->setEp(execEp, t-ts);
776   }
777
778   while ((nts = nts + CkpvAccess(binSize)) < t)
779   {
780     // fill the bins with time for this entry method
781      binTime += nts-ts;
782      binStart  = nts;
783      // This calls shrink() if needed
784      _logPool->add(binTime, binIdle, CkMyPe()); 
785      binTime = 0.0;
786      binIdle = 0.0;
787      ts = nts;
788   }
789   binTime += t - ts;
790
791   if (sumDetail && execEp >= 0 )
792       _logPool->updateSummaryDetail(execEp, start, t);
793
794   execEp = INVALIDEP;
795 }
796
797 void TraceSummary::endExecute(char *msg){
798 #if CMK_SMP_TRACE_COMMTHREAD
799     //This function is called from comm thread in SMP mode
800     envelope *e = (envelope *)msg;
801     int num = _entryTable.size();
802     int ep = e->getEpIdx();
803     if(ep<0 || ep>=num) return;
804     if(_entryTable[ep]->traceEnabled){
805         endExecute();
806     }
807 #endif    
808 }
809
810 void TraceSummary::beginIdle(double currT)
811 {
812   if (execEp == TRACEON_EP) {
813     endExecute();
814   }
815
816   CmiAssert(inIdle == 0 && inExec == 0);
817   inIdle = 1;
818   double t = TraceTimer(currT);
819   
820   // mark the time of this idle period. Only the next endIdle should see
821   // this value
822   idleStart = t; 
823   double ts = binStart;
824   // fill gaps
825   while ((ts = ts + CkpvAccess(binSize)) < t) {
826     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
827     binTime=0.0;                 // fill all other bins with 0 up till start
828     binIdle = 0.0;
829     binStart = ts;
830   }
831 }
832
833 void TraceSummary::endIdle(double currT)
834 {
835   CmiAssert(inIdle == 1 && inExec == 0);
836   inIdle = 0;
837   double t = TraceTimer(currT);
838   double t_idleStart = idleStart;
839   double t_binStart = binStart;
840
841   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
842   {
843     // fill the bins with time for idle
844     binIdle += t_binStart - t_idleStart;
845     binStart = t_binStart;
846     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
847     binTime = 0.0;
848     binIdle = 0.0;
849     t_idleStart = t_binStart;
850   }
851   binIdle += t - t_idleStart;
852 }
853
854 void TraceSummary::traceBegin(void)
855 {
856     // fake as a start of an event, assuming traceBegin is called inside an
857     // entry function.
858   beginExecute(-1, -1, TRACEON_EP, -1, -1);
859 }
860
861 void TraceSummary::traceEnd(void)
862 {
863   endExecute();
864 }
865
866 void TraceSummary::beginPack(void)
867 {
868     packstart = CmiWallTimer();
869 }
870
871 void TraceSummary::endPack(void)
872 {
873     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
874     if (sumDetail)
875         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
876 }
877
878 void TraceSummary::beginUnpack(void)
879 {
880     unpackstart = CmiWallTimer();
881 }
882
883 void TraceSummary::endUnpack(void)
884 {
885     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
886     if (sumDetail)
887         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
888 }
889
890 void TraceSummary::beginComputation(void)
891 {
892   // initialze arrays because now the number of entries is known.
893   _logPool->initMem();
894 }
895
896 void TraceSummary::endComputation(void)
897 {
898   static int done = 0;
899   if (done) return;
900   done = 1;
901   if (msgNum==0) {
902 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
903      _logPool->add(binTime, binIdle, CkMyPe());
904      binTime = 0.0;
905      binIdle = 0.0;
906      msgNum ++;
907
908      binStart  += CkpvAccess(binSize);
909      double t = TraceTimer();
910      double ts = binStart;
911      while (ts < t)
912      {
913        _logPool->add(binTime, binIdle, CkMyPe());
914        binTime=0.0;
915        binIdle = 0.0;
916        ts += CkpvAccess(binSize);
917      }
918
919   }
920 }
921
922 void TraceSummary::addEventType(int eventType)
923 {
924   _logPool->addEventType(eventType, TraceTimer());
925 }
926
927 void TraceSummary::startPhase(int phase)
928 {
929    _logPool->startPhase(phase);
930 }
931
932 void TraceSummary::traceEnableCCS() {
933   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
934   sumProxy.initCCS();
935 }
936
937
938 void TraceSummary::fillData(double *buffer, double reqStartTime, 
939                             double reqBinSize, int reqNumBins) {
940   // buffer has to be pre-allocated by the requester and must be an array of
941   // size reqNumBins.
942   //
943   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
944   //              need a number of these assumptions dropped:
945   //              1) reqBinSize == binSize (unrealistic)
946   //              2) bins boundary aligned (ok even under normal circumstances)
947   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
948   //              4) bins are always available (true unless flush)
949   //              5) bins always starts from 0 (unrealistic)
950
951   // works only because of 1)
952   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
953   //           "starting" at all!
954   int binOffset = (int)(reqStartTime/reqBinSize); 
955   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
956     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
957     buffer[i-binOffset] = pool()->getTime(i);
958   }
959 }
960
961 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
962    
963     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
964     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
965     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
966     CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
967     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
968 }
969
970 // collect the max bin size
971 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
972 {
973     double _maxBinSize = *((double *)msg->getData());
974     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
975     sumProxy.shrink(_maxBinSize);
976 }
977
978 void TraceSummaryBOC::shrink(double _mBin){
979     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
980     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
981     _maxBinSize = _mBin;
982     if(CkpvAccess(binSize) < _maxBinSize)
983     {
984         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
985     }
986     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
987     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
988     CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
989     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
990 }
991
992 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
993     double *sumData = (double *)msg->getData();
994     int     totalsize = msg->getSize()/sizeof(double);
995     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
996     UInt    numBins = totalsize/epNums;  
997     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
998     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
999     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1000     FILE *sumfp = fopen(fname, "w+");
1001     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1002                 CkpvAccess(version), CkNumPes(),
1003                 numBins, numEntries, _maxBinSize);
1004     for(int i=0; i<numBins; i++){
1005         for(int j=0; j<numEntries; j++)
1006         {
1007             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1008         }
1009     }
1010    fclose(sumfp);
1011    //CkPrintf("done with analysis\n");
1012    CkExit();
1013 }
1014
1015 /// for TraceSummaryBOC
1016
1017 void TraceSummaryBOC::initCCS() {
1018   if(firstTime){
1019     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1020     // initializing CCS-based parameters on all processors
1021     lastRequestedIndexBlock = 0;
1022     indicesPerBlock = 1000;
1023     collectionGranularity = 0.001; // time in seconds
1024     nBufferedBins = 0;
1025     
1026     // initialize buffer, register CCS handler and start the collection
1027     // pulse only on pe 0.
1028     if (CkMyPe() == 0) { 
1029       ccsBufferedData = new CkVec<double>();
1030     
1031       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1032       CkPrintf("Trace Summary now listening in for CCS Client\n");
1033       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1034                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1035       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1036                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1037
1038       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1039       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1040                              (void *)this);
1041       summaryCcsStreaming = CmiTrue;
1042     }
1043     firstTime = false;
1044   }
1045 }
1046
1047 /** Return summary information as double precision values for each sample period. 
1048     The actual data collection is in double precision values. 
1049
1050     The units on the returned values are total execution time across all PEs.
1051 */
1052 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1053   double *sendBuffer;
1054
1055   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1056
1057   CkPrintf("Responding ...\n");
1058   int datalength = 0;
1059   // if we have no data to send, send an acknowledgement code of -13.37
1060   // instead.
1061   if (ccsBufferedData->length() == 0) {
1062     sendBuffer = new double[1];
1063     sendBuffer[0] = -13.37;
1064     datalength = sizeof(double);
1065     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1066     delete [] sendBuffer;
1067   } else {
1068     sendBuffer = ccsBufferedData->getVec();
1069     datalength = ccsBufferedData->length()*sizeof(double);
1070     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1071     ccsBufferedData->free();
1072   }
1073   CkPrintf("Response Sent. Proceeding with computation.\n");
1074   delete m;
1075 }
1076
1077
1078 /** Return summary information as unsigned char values for each sample period. 
1079     The actual data collection is in double precision values.
1080
1081     This returns the utilization in a range from 0 to 200.
1082 */
1083 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1084   unsigned char *sendBuffer;
1085
1086   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1087
1088   CkPrintf("Responding ...\n");
1089   int datalength = 0;
1090
1091   if (ccsBufferedData->length() == 0) {
1092     sendBuffer = new unsigned char[1];
1093     sendBuffer[0] = 255;
1094     datalength = sizeof(unsigned char);
1095     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1096     delete [] sendBuffer;
1097   } else {
1098     double * doubleData = ccsBufferedData->getVec();
1099     int numData = ccsBufferedData->length();
1100     
1101     // pack data into unsigned char array
1102     sendBuffer = new unsigned char[numData];
1103     
1104     for(int i=0;i<numData;i++){
1105       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1106     }    
1107
1108     datalength = sizeof(unsigned char) * numData;
1109     
1110     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1111     ccsBufferedData->free();
1112     delete [] sendBuffer;
1113   }
1114   CkPrintf("Response Sent. Proceeding with computation.\n");
1115   delete m;
1116 }
1117
1118
1119
1120 void startCollectData(void *data, double currT) {
1121   CkAssert(CkMyPe() == 0);
1122   // CkPrintf("startCollectData()\n");
1123   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1124   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1125   double collectionGranularity = sumObj->collectionGranularity;
1126   int indicesPerBlock = sumObj->indicesPerBlock;
1127   
1128   double startTime = lastRequestedIndexBlock*
1129     collectionGranularity * indicesPerBlock;
1130   int numIndicesToGet = (int)floor((currT - startTime)/
1131                                    collectionGranularity);
1132   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1133   // **TODO** consider limiting the total number of blocks each collection
1134   //   request will pick up. This is to limit the amount of perturbation
1135   //   if it proves to be a problem.
1136   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1137
1138    sumProxy.collectSummaryData(startTime, 
1139                        collectionGranularity,
1140                        numBlocksToGet*indicesPerBlock);
1141   // assume success
1142   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1143 }
1144
1145 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1146                                   int numBins) {
1147   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1148
1149   double *contribution = new double[numBins];
1150   for (int i=0; i<numBins; i++) {
1151     contribution[i] = 0.0;
1152   }
1153   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1154
1155   /*
1156   for (int i=0; i<numBins; i++) {
1157     CkPrintf("[%d] %f\n", i, contribution[i]);
1158   }
1159   */
1160
1161   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1162   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1163   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1164              cb);
1165 }
1166
1167 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1168   CkAssert(CkMyPe() == 0);
1169   // **CWL** No memory management for the ccs buffer for now.
1170
1171   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1172   double *recvData = (double *)msg->getData();
1173   int numBins = msg->getSize()/sizeof(double);
1174
1175   // if there's an easier way to append a data block to a CkVec, I'll take it
1176   for (int i=0; i<numBins; i++) {
1177     ccsBufferedData->insertAtEnd(recvData[i]);
1178   }
1179   delete msg;
1180 }
1181
1182
1183
1184
1185 void TraceSummaryBOC::startSumOnly()
1186 {
1187   CmiAssert(CkMyPe() == 0);
1188
1189   CProxy_TraceSummaryBOC p(traceSummaryGID);
1190   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1191   p.askSummary(size);
1192 }
1193
1194 void TraceSummaryBOC::askSummary(int size)
1195 {
1196   if (CkpvAccess(_trace) == NULL) return;
1197
1198   int traced = CkpvAccess(_trace)->traceOnPE();
1199
1200   BinEntry *reductionBuffer = new BinEntry[size+1];
1201   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1202   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1203   if (traced) {
1204     CkpvAccess(_trace)->endComputation();
1205     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1206     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1207     if (n>size) n=size;
1208     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1209   }
1210
1211   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1212              CkReduction::sum_double);
1213   delete [] reductionBuffer;
1214 }
1215
1216 //extern "C" void _CkExit();
1217
1218 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1219 {
1220   if (CkpvAccess(_trace) == NULL) return;
1221
1222   CkAssert(CkMyPe() == 0);
1223
1224   int n = msg->getSize()/sizeof(BinEntry);
1225   nBins = n-1;
1226   bins = (BinEntry *)msg->getData();
1227   nTracedPEs = (int)bins[n-1].time();
1228   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1229
1230   write();
1231
1232   delete msg;
1233
1234   CkExit();
1235 }
1236
1237 void TraceSummaryBOC::write(void) 
1238 {
1239   unsigned int j;
1240
1241   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1242   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1243   FILE *sumfp = fopen(fname, "w+");
1244   //CmiPrintf("File: %s \n", fname);
1245   if(sumfp == 0)
1246       CmiAbort("Cannot open summary sts file for writing.\n");
1247   delete[] fname;
1248
1249   int _numEntries=_entryTable.size();
1250   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1251   fprintf(sumfp, "\n");
1252
1253   // write bin time
1254 #if 0
1255   int last=pool[0].getU();
1256   writeU(fp, last);
1257   int count=1;
1258   for(j=1; j<numEntries; j++) {
1259     int u = pool[j].getU();
1260     if (last == u) {
1261       count++;
1262     }
1263     else {
1264       if (count > 1) fprintf(fp, "+%d", count);
1265       writeU(fp, u);
1266       last = u;
1267       count = 1;
1268     }
1269   }
1270   if (count > 1) fprintf(fp, "+%d", count);
1271 #else
1272   for(j=0; j<nBins; j++) {
1273     bins[j].time() /= nTracedPEs;
1274     bins[j].write(sumfp);
1275   }
1276 #endif
1277   fprintf(sumfp, "\n");
1278   fclose(sumfp);
1279
1280 }
1281
1282 extern "C" void CombineSummary()
1283 {
1284 #if CMK_TRACE_ENABLED
1285   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1286   if (sumonly) {
1287     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1288       // pe 0 start the sumonly process
1289     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1290     sumProxy[0].startSumOnly();
1291   }else if(sumDetail)
1292   {
1293       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1294       sumProxy.traceSummaryParallelShutdown(-1);
1295   }
1296   else {
1297     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL);
1298     CkExit();
1299   }
1300 #else
1301   CkExit();
1302 #endif
1303 }
1304
1305 void initTraceSummaryBOC()
1306 {
1307 #ifdef __BLUEGENE__
1308   if(BgNodeRank()==0) {
1309 #else
1310   if (CkMyRank() == 0) {
1311 #endif
1312     registerExitFn(CombineSummary);
1313   }
1314 }
1315
1316
1317
1318
1319
1320
1321 #include "TraceSummary.def.h"
1322
1323
1324 /*@}*/
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344