Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39 /** 
40     A class that reads/writes a buffer out of different types of data.
41
42     This class exists because I need to get references to parts of the buffer 
43     that have already been used so that I can increment counters inside the buffer.
44 */
45
46 class compressedBuffer {
47 public:
48   char* buf;
49   int pos; ///<< byte position just beyond the previously read/written data
50
51   compressedBuffer(){
52     buf = NULL;
53     pos = 0;
54   }
55
56   compressedBuffer(int bytes){
57     buf = (char*)malloc(bytes);
58     pos = 0;
59   }
60
61   compressedBuffer(void *buffer){
62     buf = (char*)buffer;
63     pos = 0;
64   }
65     
66   void init(void *buffer){
67     buf = (char*)buffer;
68     pos = 0;
69   }
70     
71   inline void * currentPtr(){
72     return (void*)(buf+pos);
73   }
74
75   template <typename T>
76   T read(int offset){
77     // to resolve unaligned writes causing bus errors, need memcpy
78     T v;
79     memcpy(&v, buf+offset, sizeof(T));
80     return v;
81   }
82     
83   template <typename T>
84   void write(T v, int offset){
85     T v2 = v; // on stack
86     // to resolve unaligned writes causing bus errors, need memcpy
87     memcpy(buf+offset, &v2, sizeof(T));
88   }
89     
90   template <typename T>
91   void increment(int offset){
92     T temp;
93     temp = read<T>(offset);
94     temp ++;
95     write<T>(temp, offset);
96   }
97
98   template <typename T>
99   void accumulate(T v, int offset){
100     T temp;
101     temp = read<T>(offset);
102     temp += v;
103     write<T>(temp, offset);
104   }
105   
106   template <typename T>
107   int push(T v){
108     int oldpos = pos;
109     write<T>(v, pos);
110     pos += sizeof(T);
111     return oldpos;
112   }
113   
114   template <typename T>
115   T pop(){
116     T temp = read<T>(pos);
117     pos += sizeof(T);
118     return temp;
119   }
120
121   template <typename T>
122   T peek(){
123     T temp = read<T>(pos);
124     return temp;
125   }
126
127   template <typename T0, typename T>
128   T peekSecond(){
129     T temp;
130     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
131     return temp;
132   }
133
134   int datalength(){
135     return pos;
136   }
137      
138   void * buffer(){
139     return (void*) buf;
140   }  
141
142   void freeBuf(){
143     free(buf);
144   }
145
146   ~compressedBuffer(){
147     // don't free the buf because the user my have supplied the buffer
148   }
149     
150 };
151
152
153
154
155 // Global Readonly
156 CkGroupID traceSummaryGID;
157 bool summaryCcsStreaming;
158
159 int sumonly = 0;
160 int sumDetail = 0;
161
162 /**
163   For each TraceFoo module, _createTraceFoo() must be defined.
164   This function is called in _createTraces() generated in moduleInit.C
165 */
166 void _createTracesummary(char **argv)
167 {
168   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
169   CkpvInitialize(TraceSummary*, _trace);
170   CkpvInitialize(int, previouslySentBins);
171   CkpvAccess(previouslySentBins) = 0;
172   CkpvAccess(_trace) = new  TraceSummary(argv);
173   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
174   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
175 }
176
177
178 /// function call for starting a phase in trace summary logs 
179 extern "C" 
180 void CkSummary_StartPhase(int phase)
181 {
182    CkpvAccess(_trace)->startPhase(phase);
183 }
184
185
186 /// function call for adding an event mark
187 extern "C" 
188 void CkSummary_MarkEvent(int eventType)
189 {
190    CkpvAccess(_trace)->addEventType(eventType);
191 }
192
193 static inline void writeU(FILE* fp, int u)
194 {
195   fprintf(fp, "%4d", u);
196 }
197
198 PhaseEntry::PhaseEntry() 
199 {
200   int _numEntries=_entryTable.size();
201   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
202   nEPs = _numEntries+10; 
203   count = new int[nEPs];
204   times = new double[nEPs];
205   maxtimes = new double[nEPs];
206   for (int i=0; i<nEPs; i++) {
207     count[i] = 0;
208     times[i] = 0.0;
209     maxtimes[i] = 0.;
210   }
211 }
212
213 SumLogPool::~SumLogPool() 
214 {
215     if (!sumonly) {
216       write();
217       fclose(fp);
218       if (sumDetail) fclose(sdfp);
219   }
220   // free memory for mark
221   if (markcount > 0)
222   for (int i=0; i<MAX_MARKS; i++) {
223     for (int j=0; j<events[i].length(); j++)
224       delete events[i][j];
225   }
226   delete[] pool;
227   delete[] epInfo;
228   delete[] cpuTime;
229   delete[] numExecutions;
230 }
231
232 void SumLogPool::addEventType(int eventType, double time)
233 {
234    if (eventType <0 || eventType >= MAX_MARKS) {
235        CkPrintf("Invalid event type %d!\n", eventType);
236        return;
237    }
238    MarkEntry *e = new MarkEntry;
239    e->time = time;
240    events[eventType].push_back(e);
241    markcount ++;
242 }
243
244 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
245 {
246    // TBD: Can this be moved to initMem?
247   cpuTime = NULL;
248    poolSize = CkpvAccess(binCount);
249    if (poolSize % 2) poolSize++;        // make sure it is even
250    pool = new BinEntry[poolSize];
251    _MEMCHECK(pool);
252
253    this->pgm = new char[strlen(pgm)+1];
254    strcpy(this->pgm,pgm);
255    
256 #if 0
257    // create the sts file
258    if (CkMyPe() == 0) {
259      char *fname = 
260        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
261      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
262      stsfp = fopen(fname, "w+");
263      //CmiPrintf("File: %s \n", fname);
264      if (stsfp == 0) {
265        CmiAbort("Cannot open summary sts file for writing.\n");
266       }
267      delete[] fname;
268    }
269 #endif
270    
271    // event
272    markcount = 0;
273 }
274
275 void SumLogPool::initMem()
276 {
277    int _numEntries=_entryTable.size();
278    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
279    epInfo = new SumEntryInfo[epInfoSize];
280    _MEMCHECK(epInfo);
281
282    cpuTime = NULL;
283    numExecutions = NULL;
284    if (sumDetail) {
285        cpuTime = new double[poolSize*epInfoSize];
286        _MEMCHECK(cpuTime);
287        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
288        numExecutions = new int[poolSize*epInfoSize];
289        _MEMCHECK(numExecutions);
290        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
291
292 //         int i, e;
293 //         for(i=0; i<poolSize; i++) {
294 //             for(e=0; e< epInfoSize; e++) {
295 //                 setCPUtime(i,e,0.0);
296 //                 setNumExecutions(i,e,0);
297 //             }
298 //         }
299    }
300 }
301
302 int SumLogPool::getUtilization(int interval, int ep) {
303     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
304 }
305
306 void SumLogPool::write(void) 
307 {
308   int i;
309   unsigned int j;
310   int _numEntries=_entryTable.size();
311
312   fp = NULL;
313   sdfp = NULL;
314
315   // create file(s)
316   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
317   if (!sumonly) {
318     char pestr[10];
319     sprintf(pestr, "%d", CkMyPe());
320     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
321     char *fname = new char[len+1];
322     
323     sprintf(fname, "%s.%s.sum", pgm, pestr);
324     do {
325       fp = fopen(fname, "w+");
326     } while (!fp && errno == EINTR);
327     if (!fp) {
328       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
329       CmiAbort("Cannot open Summary Trace File for writing...\n");
330     }
331     
332     if (sumDetail) {
333       sprintf(fname, "%s.%s.sumd", pgm, pestr);
334       do {
335         sdfp = fopen(fname, "w+");
336       } while (!sdfp && errno == EINTR);
337       if(!sdfp) {
338         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
339       }
340     }
341     delete[] fname;
342   }
343
344   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
345   if (CkpvAccess(version)>=3.0)
346   {
347     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
348   }
349   fprintf(fp, "\n");
350
351   // write bin time
352 #if 1
353   int last=pool[0].getU();
354   writeU(fp, last);
355   int count=1;
356   for(j=1; j<numBins; j++) {
357     int u = pool[j].getU();
358     if (last == u) {
359       count++;
360     }
361     else {
362       if (count > 1) fprintf(fp, "+%d", count);
363       writeU(fp, u);
364       last = u;
365       count = 1;
366     }
367   }
368   if (count > 1) fprintf(fp, "+%d", count);
369 #else
370   for(j=0; j<numEntries; j++) 
371       pool[j].write(fp);
372 #endif
373   fprintf(fp, "\n");
374
375   // write entry execution time
376   fprintf(fp, "EPExeTime: ");
377   for (i=0; i<_numEntries; i++)
378     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
379   fprintf(fp, "\n");
380   // write entry function call times
381   fprintf(fp, "EPCallTime: ");
382   for (i=0; i<_numEntries; i++)
383     fprintf(fp, "%d ", epInfo[i].epCount);
384   fprintf(fp, "\n");
385   // write max entry function execute times
386   fprintf(fp, "MaxEPTime: ");
387   for (i=0; i<_numEntries; i++)
388     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
389   fprintf(fp, "\n");
390 #if 0
391   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
392     for (j=0; j<_numEntries; j++) 
393       fprintf(fp, "%d ", epInfo[j].hist[i]);
394     fprintf(fp, "\n");
395   }
396 #endif
397   // write marks
398   if (CkpvAccess(version)>=2.0) 
399   {
400     fprintf(fp, "NumMarks: %d ", markcount);
401     for (i=0; i<MAX_MARKS; i++) {
402       for(int j=0; j<events[i].length(); j++)
403         fprintf(fp, "%d %f ", i, events[i][j]->time);
404     }
405     fprintf(fp, "\n");
406   }
407   // write phases
408   if (CkpvAccess(version)>=3.0)
409   {
410     phaseTab.write(fp);
411   }
412   // write idle time
413   if (CkpvAccess(version)>=7.1) {
414     fprintf(fp, "IdlePercent: ");
415     int last=pool[0].getUIdle();
416     writeU(fp, last);
417     int count=1;
418     for(j=1; j<numBins; j++) {
419       int u = pool[j].getUIdle();
420       if (last == u) {
421         count++;
422       }
423       else {
424         if (count > 1) fprintf(fp, "+%d", count);
425         writeU(fp, u);
426         last = u;
427         count = 1;
428       }
429     }
430     if (count > 1) fprintf(fp, "+%d", count);
431     fprintf(fp, "\n");
432   }
433
434   CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
435   // write summary details
436   if (sumDetail) {
437         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
438                 CkpvAccess(version), CkMyPe(), CkNumPes(),
439                 numBins, _numEntries, CkpvAccess(binSize));
440
441         // Write out cpuTime in microseconds
442         // Run length encoding (RLE) along EP axis
443         fprintf(sdfp, "ExeTimePerEPperInterval ");
444         unsigned int e, i;
445         long last= (long) (getCPUtime(0,0)*1.0e6);
446         int count=0;
447         fprintf(sdfp, "%ld", last);
448         for(e=0; e<_numEntries; e++) {
449             for(i=0; i<numBins; i++) {
450
451                 long u= (long) (getCPUtime(i,e)*1.0e6);
452                 if (last == u) {
453                     count++;
454                 } else {
455
456                     if (count > 1) fprintf(sdfp, "+%d", count);
457                     fprintf(sdfp, " %ld", u);
458                     last = u;
459                     count = 1;
460                 }
461             }
462         }
463         if (count > 1) fprintf(sdfp, "+%d", count);
464         fprintf(sdfp, "\n");
465         // Write out numExecutions
466         // Run length encoding (RLE) along EP axis
467         fprintf(sdfp, "EPCallTimePerInterval ");
468         last= getNumExecutions(0,0);
469         count=0;
470         fprintf(sdfp, "%ld", last);
471         for(e=0; e<_numEntries; e++) {
472             for(i=0; i<numBins; i++) {
473
474                 long u= getNumExecutions(i, e);
475                 if (last == u) {
476                     count++;
477                 } else {
478
479                     if (count > 1) fprintf(sdfp, "+%d", count);
480                     fprintf(sdfp, " %ld", u);
481                     last = u;
482                     count = 1;
483                 }
484             }
485         }
486         if (count > 1) fprintf(sdfp, "+%d", count);
487         fprintf(sdfp, "\n");
488   }
489 }
490
491 void SumLogPool::writeSts(void)
492 {
493     // open sts file
494   char *fname = 
495        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
496   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
497   stsfp = fopen(fname, "w+");
498   //CmiPrintf("File: %s \n", fname);
499   if (stsfp == 0) {
500        CmiAbort("Cannot open summary sts file for writing.\n");
501   }
502   delete[] fname;
503
504   traceWriteSTS(stsfp,_numEvents);
505   for(int i=0;i<_numEvents;i++)
506     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
507   fprintf(stsfp, "END\n");
508
509   fclose(stsfp);
510 }
511
512 // Called once per interval
513 void SumLogPool::add(double time, double idleTime, int pe) 
514 {
515   new (&pool[numBins++]) BinEntry(time, idleTime);
516   if (poolSize==numBins) {
517     shrink();
518   }
519 }
520
521 // Called once per run of an EP
522 // adds 'time' to EP's time, increments epCount
523 void SumLogPool::setEp(int epidx, double time) 
524 {
525   if (epidx >= epInfoSize) {
526         CmiAbort("Invalid entry point!!\n");
527   }
528   //CmiPrintf("set EP: %d %e \n", epidx, time);
529   epInfo[epidx].setTime(time);
530   // set phase table counter
531   phaseTab.setEp(epidx, time);
532 }
533
534 // Called once from endExecute, endPack, etc. this function updates
535 // the sumDetail intervals.
536 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
537 {
538         if (epIdx >= epInfoSize) {
539             CmiAbort("Too many entry points!!\n");
540         }
541
542         double binSz = CkpvAccess(binSize);
543         int startingBinIdx, endingBinIdx;
544         startingBinIdx = (int)(startTime/binSz);
545         endingBinIdx = (int)(endTime/binSz);
546         // shrink if needed
547         while (endingBinIdx >= poolSize) {
548           shrink();
549           CmiAssert(CkpvAccess(binSize) > binSz);
550           binSz = CkpvAccess(binSize);
551           startingBinIdx = (int)(startTime/binSz);
552           endingBinIdx = (int)(endTime/binSz);
553         }
554
555         if (startingBinIdx == endingBinIdx) {
556             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
557         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
558             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
559             while(++startingBinIdx < endingBinIdx)
560                 addToCPUtime(startingBinIdx, epIdx, binSz);
561             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
562         } else {
563           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
564                    startTime, endTime);
565             CmiAbort("Error: end time of EP is less than start time\n");
566         }
567
568         incNumExecutions(startingBinIdx, epIdx);
569 }
570
571 // Shrinks pool[], cpuTime[], and numExecutions[]
572 void SumLogPool::shrink(void)
573 {
574 //  double t = CmiWallTimer();
575
576   // We ensured earlier that poolSize is even; therefore now numBins
577   // == poolSize == even.
578   int entries = numBins/2;
579   for (int i=0; i<entries; i++)
580   {
581      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
582      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
583      if (sumDetail)
584      for (int e=0; e < epInfoSize; e++) {
585          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
586          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
587      }
588   }
589   // zero out the remaining intervals
590   if (sumDetail) {
591     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
592     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
593   }
594   numBins = entries;
595   CkpvAccess(binSize) *= 2;
596
597 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
598 }
599
600 void SumLogPool::shrink(double _maxBinSize)
601 {
602     while(CkpvAccess(binSize) < _maxBinSize)
603     {
604         shrink();
605     };
606 }
607 int  BinEntry::getU() 
608
609   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
610 }
611
612 int BinEntry::getUIdle() {
613   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
614 }
615
616 void BinEntry::write(FILE* fp)
617 {
618   writeU(fp, getU());
619 }
620
621 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
622                                         binTime(0.0),binIdle(0.0),msgNum(0)
623 {
624   if (CkpvAccess(traceOnPe) == 0) return;
625
626     // use absolute time
627   if (CmiTimerAbsolute()) binStart = CmiInitTime();
628
629   CkpvInitialize(int, binCount);
630   CkpvInitialize(double, binSize);
631   CkpvInitialize(double, numEntries);
632   CkpvInitialize(double, version);
633   CkpvAccess(binSize) = BIN_SIZE;
634   CkpvAccess(version) = VER;
635   CkpvAccess(binCount) = DefaultBinCount;
636   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
637     if (CkMyPe() == 0) 
638       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
639   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
640         "CPU usage log time resolution");
641   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
642         "Write this .sum file version");
643
644   epThreshold = 0.001; 
645   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
646         "Execution time histogram lower bound");
647   epInterval = 0.001; 
648   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
649         "Execution time histogram bin size");
650
651   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
652   // +sumonly overrides +sumDetail
653   if (!sumonly)
654       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
655
656   _logPool = new SumLogPool(CkpvAccess(traceRoot));
657   // assume invalid entry point on start
658   execEp=INVALIDEP;
659   inIdle = 0;
660   inExec = 0;
661 }
662
663 void TraceSummary::traceClearEps(void)
664 {
665   _logPool->clearEps();
666 }
667
668 void TraceSummary::traceWriteSts(void)
669 {
670   if(CkMyPe()==0)
671       _logPool->writeSts();
672 }
673
674 void TraceSummary::traceClose(void)
675 {
676     if(CkMyPe()==0)
677         _logPool->writeSts();
678     CkpvAccess(_trace)->endComputation();
679
680     delete _logPool;
681     CkpvAccess(_traces)->removeTrace(this);
682 }
683
684 void TraceSummary::beginExecute(CmiObjId *tid)
685 {
686   beginExecute(-1,-1,_threadEP,-1);
687 }
688
689 void TraceSummary::beginExecute(envelope *e)
690 {
691   // no message means thread execution
692   if (e==NULL) {
693     beginExecute(-1,-1,_threadEP,-1);
694   }
695   else {
696     beginExecute(-1,-1,e->getEpIdx(),-1);
697   }  
698 }
699
700 void TraceSummary::beginExecute(char *msg)
701 {
702 #if CMK_SMP_TRACE_COMMTHREAD
703     //This function is called from comm thread in SMP mode
704     envelope *e = (envelope *)msg;
705     int num = _entryTable.size();
706     int ep = e->getEpIdx();
707     if(ep<0 || ep>=num) return;
708     if(_entryTable[ep]->traceEnabled)
709         beginExecute(-1,-1,e->getEpIdx(),-1);
710 #endif
711 }
712
713 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
714 {
715   if (execEp == TRACEON_EP) {
716     endExecute();
717   }
718   CmiAssert(inIdle == 0 && inExec == 0);
719   inExec = 1;
720
721   if (execEp != INVALIDEP) {
722     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
723     return;
724   }
725   
726   execEp=ep;
727   double t = TraceTimer();
728   //CmiPrintf("start: %f \n", start);
729   
730   start = t;
731   double ts = binStart;
732   // fill gaps
733   while ((ts = ts + CkpvAccess(binSize)) < t) {
734     /* Keep as a template for error checking. The current form of this check
735        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
736        I used it). Perhaps an improved form could be used in case vastly
737        incompatible EP vs idle times are reported (binSize*2?).
738
739        This check will have to be duplicated before each call to add()
740
741     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
742              binTime + binIdle, CkpvAccess(binSize));
743     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
744     */
745      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
746      binTime=0.0;                 // fill all other bins with 0 up till start
747      binIdle = 0.0;
748      binStart = ts;
749   }
750 }
751
752 void TraceSummary::endExecute(void)
753 {
754   CmiAssert(inIdle == 0 && inExec == 1);
755   inExec = 0;
756  
757   double t = TraceTimer();
758   double ts = start;
759   double nts = binStart;
760
761 /*
762   if (execEp == TRACEON_EP) {
763     // if trace just got turned on, then one expects to see this
764     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
765     return;
766   }
767 */
768
769   if (execEp == INVALIDEP) {
770     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
771     return;
772   }
773
774   if (execEp >= 0)
775   {
776     _logPool->setEp(execEp, t-ts);
777   }
778
779   while ((nts = nts + CkpvAccess(binSize)) < t)
780   {
781     // fill the bins with time for this entry method
782      binTime += nts-ts;
783      binStart  = nts;
784      // This calls shrink() if needed
785      _logPool->add(binTime, binIdle, CkMyPe()); 
786      binTime = 0.0;
787      binIdle = 0.0;
788      ts = nts;
789   }
790   binTime += t - ts;
791
792   if (sumDetail && execEp >= 0 )
793       _logPool->updateSummaryDetail(execEp, start, t);
794
795   execEp = INVALIDEP;
796 }
797
798 void TraceSummary::endExecute(char *msg){
799 #if CMK_SMP_TRACE_COMMTHREAD
800     //This function is called from comm thread in SMP mode
801     envelope *e = (envelope *)msg;
802     int num = _entryTable.size();
803     int ep = e->getEpIdx();
804     if(ep<0 || ep>=num) return;
805     if(_entryTable[ep]->traceEnabled){
806         endExecute();
807     }
808 #endif    
809 }
810
811 void TraceSummary::beginIdle(double currT)
812 {
813   if (execEp == TRACEON_EP) {
814     endExecute();
815   }
816
817   CmiAssert(inIdle == 0 && inExec == 0);
818   inIdle = 1;
819   double t = TraceTimer(currT);
820   
821   // mark the time of this idle period. Only the next endIdle should see
822   // this value
823   idleStart = t; 
824   double ts = binStart;
825   // fill gaps
826   while ((ts = ts + CkpvAccess(binSize)) < t) {
827     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
828     binTime=0.0;                 // fill all other bins with 0 up till start
829     binIdle = 0.0;
830     binStart = ts;
831   }
832 }
833
834 void TraceSummary::endIdle(double currT)
835 {
836   CmiAssert(inIdle == 1 && inExec == 0);
837   inIdle = 0;
838   double t = TraceTimer(currT);
839   double t_idleStart = idleStart;
840   double t_binStart = binStart;
841
842   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
843   {
844     // fill the bins with time for idle
845     binIdle += t_binStart - t_idleStart;
846     binStart = t_binStart;
847     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
848     binTime = 0.0;
849     binIdle = 0.0;
850     t_idleStart = t_binStart;
851   }
852   binIdle += t - t_idleStart;
853 }
854
855 void TraceSummary::traceBegin(void)
856 {
857     // fake as a start of an event, assuming traceBegin is called inside an
858     // entry function.
859   beginExecute(-1, -1, TRACEON_EP, -1, -1);
860 }
861
862 void TraceSummary::traceEnd(void)
863 {
864   endExecute();
865 }
866
867 void TraceSummary::beginPack(void)
868 {
869     packstart = CmiWallTimer();
870 }
871
872 void TraceSummary::endPack(void)
873 {
874     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
875     if (sumDetail)
876         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
877 }
878
879 void TraceSummary::beginUnpack(void)
880 {
881     unpackstart = CmiWallTimer();
882 }
883
884 void TraceSummary::endUnpack(void)
885 {
886     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
887     if (sumDetail)
888         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
889 }
890
891 void TraceSummary::beginComputation(void)
892 {
893   // initialze arrays because now the number of entries is known.
894   _logPool->initMem();
895 }
896
897 void TraceSummary::endComputation(void)
898 {
899   static int done = 0;
900   if (done) return;
901   done = 1;
902   if (msgNum==0) {
903 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
904      _logPool->add(binTime, binIdle, CkMyPe());
905      binTime = 0.0;
906      binIdle = 0.0;
907      msgNum ++;
908
909      binStart  += CkpvAccess(binSize);
910      double t = TraceTimer();
911      double ts = binStart;
912      while (ts < t)
913      {
914        _logPool->add(binTime, binIdle, CkMyPe());
915        binTime=0.0;
916        binIdle = 0.0;
917        ts += CkpvAccess(binSize);
918      }
919
920   }
921 }
922
923 void TraceSummary::addEventType(int eventType)
924 {
925   _logPool->addEventType(eventType, TraceTimer());
926 }
927
928 void TraceSummary::startPhase(int phase)
929 {
930    _logPool->startPhase(phase);
931 }
932
933 void TraceSummary::traceEnableCCS() {
934   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
935   sumProxy.initCCS();
936 }
937
938
939 void TraceSummary::fillData(double *buffer, double reqStartTime, 
940                             double reqBinSize, int reqNumBins) {
941   // buffer has to be pre-allocated by the requester and must be an array of
942   // size reqNumBins.
943   //
944   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
945   //              need a number of these assumptions dropped:
946   //              1) reqBinSize == binSize (unrealistic)
947   //              2) bins boundary aligned (ok even under normal circumstances)
948   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
949   //              4) bins are always available (true unless flush)
950   //              5) bins always starts from 0 (unrealistic)
951
952   // works only because of 1)
953   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
954   //           "starting" at all!
955   int binOffset = (int)(reqStartTime/reqBinSize); 
956   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
957     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
958     buffer[i-binOffset] = pool()->getTime(i);
959   }
960 }
961
962 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
963    
964     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
965     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
966     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
967     CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
968     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
969 }
970
971 // collect the max bin size
972 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
973 {
974     double _maxBinSize = *((double *)msg->getData());
975     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
976     sumProxy.shrink(_maxBinSize);
977 }
978
979 void TraceSummaryBOC::shrink(double _mBin){
980     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
981     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
982     int     numEntries = epNums - NUM_DUMMY_EPS - 1;
983     _maxBinSize = _mBin;
984     if(CkpvAccess(binSize) < _maxBinSize)
985     {
986         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
987     }
988     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
989     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
990     CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
991     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
992 }
993
994 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
995     double *sumData = (double *)msg->getData();
996     int     totalsize = msg->getSize()/sizeof(double);
997     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
998     UInt    numBins = totalsize/epNums;  
999     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
1000     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1001     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1002     FILE *sumfp = fopen(fname, "w+");
1003     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1004                 CkpvAccess(version), CkNumPes(),
1005                 numBins, numEntries, _maxBinSize);
1006     for(int i=0; i<numBins; i++){
1007         for(int j=0; j<numEntries; j++)
1008         {
1009             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1010         }
1011     }
1012    fclose(sumfp);
1013    //CkPrintf("done with analysis\n");
1014    CkExit();
1015 }
1016
1017 /// for TraceSummaryBOC
1018
1019 void TraceSummaryBOC::initCCS() {
1020   if(firstTime){
1021     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1022     // initializing CCS-based parameters on all processors
1023     lastRequestedIndexBlock = 0;
1024     indicesPerBlock = 1000;
1025     collectionGranularity = 0.001; // time in seconds
1026     nBufferedBins = 0;
1027     
1028     // initialize buffer, register CCS handler and start the collection
1029     // pulse only on pe 0.
1030     if (CkMyPe() == 0) { 
1031       ccsBufferedData = new CkVec<double>();
1032     
1033       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1034       CkPrintf("Trace Summary now listening in for CCS Client\n");
1035       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1036                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1037       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1038                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1039
1040       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1041       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1042                              (void *)this);
1043       summaryCcsStreaming = CmiTrue;
1044     }
1045     firstTime = false;
1046   }
1047 }
1048
1049 /** Return summary information as double precision values for each sample period. 
1050     The actual data collection is in double precision values. 
1051
1052     The units on the returned values are total execution time across all PEs.
1053 */
1054 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1055   double *sendBuffer;
1056
1057   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1058
1059   CkPrintf("Responding ...\n");
1060   int datalength = 0;
1061   // if we have no data to send, send an acknowledgement code of -13.37
1062   // instead.
1063   if (ccsBufferedData->length() == 0) {
1064     sendBuffer = new double[1];
1065     sendBuffer[0] = -13.37;
1066     datalength = sizeof(double);
1067     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1068     delete [] sendBuffer;
1069   } else {
1070     sendBuffer = ccsBufferedData->getVec();
1071     datalength = ccsBufferedData->length()*sizeof(double);
1072     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1073     ccsBufferedData->free();
1074   }
1075   CkPrintf("Response Sent. Proceeding with computation.\n");
1076   delete m;
1077 }
1078
1079
1080 /** Return summary information as unsigned char values for each sample period. 
1081     The actual data collection is in double precision values.
1082
1083     This returns the utilization in a range from 0 to 200.
1084 */
1085 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1086   unsigned char *sendBuffer;
1087
1088   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1089
1090   CkPrintf("Responding ...\n");
1091   int datalength = 0;
1092
1093   if (ccsBufferedData->length() == 0) {
1094     sendBuffer = new unsigned char[1];
1095     sendBuffer[0] = 255;
1096     datalength = sizeof(unsigned char);
1097     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1098     delete [] sendBuffer;
1099   } else {
1100     double * doubleData = ccsBufferedData->getVec();
1101     int numData = ccsBufferedData->length();
1102     
1103     // pack data into unsigned char array
1104     sendBuffer = new unsigned char[numData];
1105     
1106     for(int i=0;i<numData;i++){
1107       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1108       int v = sendBuffer[i];
1109     }    
1110
1111     datalength = sizeof(unsigned char) * numData;
1112     
1113     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1114     ccsBufferedData->free();
1115     delete [] sendBuffer;
1116   }
1117   CkPrintf("Response Sent. Proceeding with computation.\n");
1118   delete m;
1119 }
1120
1121
1122
1123 void startCollectData(void *data, double currT) {
1124   CkAssert(CkMyPe() == 0);
1125   // CkPrintf("startCollectData()\n");
1126   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1127   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1128   double collectionGranularity = sumObj->collectionGranularity;
1129   int indicesPerBlock = sumObj->indicesPerBlock;
1130   
1131   double startTime = lastRequestedIndexBlock*
1132     collectionGranularity * indicesPerBlock;
1133   int numIndicesToGet = (int)floor((currT - startTime)/
1134                                    collectionGranularity);
1135   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1136   // **TODO** consider limiting the total number of blocks each collection
1137   //   request will pick up. This is to limit the amount of perturbation
1138   //   if it proves to be a problem.
1139   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1140
1141    sumProxy.collectSummaryData(startTime, 
1142                        collectionGranularity,
1143                        numBlocksToGet*indicesPerBlock);
1144   // assume success
1145   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1146 }
1147
1148 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1149                                   int numBins) {
1150   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1151
1152   double *contribution = new double[numBins];
1153   for (int i=0; i<numBins; i++) {
1154     contribution[i] = 0.0;
1155   }
1156   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1157
1158   /*
1159   for (int i=0; i<numBins; i++) {
1160     CkPrintf("[%d] %f\n", i, contribution[i]);
1161   }
1162   */
1163
1164   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1165   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1166   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1167              cb);
1168 }
1169
1170 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1171   CkAssert(CkMyPe() == 0);
1172   // **CWL** No memory management for the ccs buffer for now.
1173
1174   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1175   double *recvData = (double *)msg->getData();
1176   int numBins = msg->getSize()/sizeof(double);
1177
1178   // if there's an easier way to append a data block to a CkVec, I'll take it
1179   for (int i=0; i<numBins; i++) {
1180     ccsBufferedData->insertAtEnd(recvData[i]);
1181   }
1182   delete msg;
1183 }
1184
1185
1186
1187
1188 void TraceSummaryBOC::startSumOnly()
1189 {
1190   CmiAssert(CkMyPe() == 0);
1191
1192   CProxy_TraceSummaryBOC p(traceSummaryGID);
1193   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1194   p.askSummary(size);
1195 }
1196
1197 void TraceSummaryBOC::askSummary(int size)
1198 {
1199   if (CkpvAccess(_trace) == NULL) return;
1200
1201   int traced = CkpvAccess(_trace)->traceOnPE();
1202
1203   BinEntry *reductionBuffer = new BinEntry[size+1];
1204   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1205   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1206   if (traced) {
1207     CkpvAccess(_trace)->endComputation();
1208     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1209     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1210     if (n>size) n=size;
1211     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1212   }
1213
1214   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1215              CkReduction::sum_double);
1216   delete [] reductionBuffer;
1217 }
1218
1219 //extern "C" void _CkExit();
1220
1221 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1222 {
1223   if (CkpvAccess(_trace) == NULL) return;
1224
1225   CkAssert(CkMyPe() == 0);
1226
1227   int n = msg->getSize()/sizeof(BinEntry);
1228   nBins = n-1;
1229   bins = (BinEntry *)msg->getData();
1230   nTracedPEs = (int)bins[n-1].time();
1231   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1232
1233   write();
1234
1235   delete msg;
1236
1237   CkExit();
1238 }
1239
1240 void TraceSummaryBOC::write(void) 
1241 {
1242   int i;
1243   unsigned int j;
1244
1245   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1246   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1247   FILE *sumfp = fopen(fname, "w+");
1248   //CmiPrintf("File: %s \n", fname);
1249   if(sumfp == 0)
1250       CmiAbort("Cannot open summary sts file for writing.\n");
1251   delete[] fname;
1252
1253   int _numEntries=_entryTable.size();
1254   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1255   fprintf(sumfp, "\n");
1256
1257   // write bin time
1258 #if 0
1259   int last=pool[0].getU();
1260   writeU(fp, last);
1261   int count=1;
1262   for(j=1; j<numEntries; j++) {
1263     int u = pool[j].getU();
1264     if (last == u) {
1265       count++;
1266     }
1267     else {
1268       if (count > 1) fprintf(fp, "+%d", count);
1269       writeU(fp, u);
1270       last = u;
1271       count = 1;
1272     }
1273   }
1274   if (count > 1) fprintf(fp, "+%d", count);
1275 #else
1276   for(j=0; j<nBins; j++) {
1277     bins[j].time() /= nTracedPEs;
1278     bins[j].write(sumfp);
1279   }
1280 #endif
1281   fprintf(sumfp, "\n");
1282   fclose(sumfp);
1283
1284 }
1285
1286 extern "C" void CombineSummary()
1287 {
1288 #if CMK_TRACE_ENABLED
1289   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1290   if (sumonly) {
1291     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1292       // pe 0 start the sumonly process
1293     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1294     sumProxy[0].startSumOnly();
1295   }else if(sumDetail)
1296   {
1297       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1298       sumProxy.traceSummaryParallelShutdown(-1);
1299   }
1300   else {
1301     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL);
1302     CkExit();
1303   }
1304 #else
1305   CkExit();
1306 #endif
1307 }
1308
1309 void initTraceSummaryBOC()
1310 {
1311 #ifdef __BLUEGENE__
1312   if(BgNodeRank()==0) {
1313 #else
1314   if (CkMyRank() == 0) {
1315 #endif
1316     registerExitFn(CombineSummary);
1317   }
1318 }
1319
1320
1321
1322
1323
1324
1325 #include "TraceSummary.def.h"
1326
1327
1328 /*@}*/
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348