fixed a bug in CkExit() with projections which has end_execute without matching begin...
[charm.git] / src / ck-perf / trace-summary.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "charm++.h"
14 #include "trace-summary.h"
15 #include "trace-summaryBOC.h"
16
17 #define DEBUGF(x)  // CmiPrintf x
18
19 #define VER   7.1
20
21 #define INVALIDEP     -2
22 #define TRACEON_EP     -3
23
24 // 1 minutes of run before it'll fill up:
25 #define DefaultBinCount      (1000*60*1) 
26
27 CkpvStaticDeclare(TraceSummary*, _trace);
28 static int _numEvents = 0;
29 #define NUM_DUMMY_EPS 9
30 CkpvDeclare(int, binCount);
31 CkpvDeclare(double, binSize);
32 CkpvDeclare(double, version);
33
34
35 CkpvDeclare(int, previouslySentBins);
36
37
38
39
40
41 /** 
42     A class that reads/writes a buffer out of different types of data.
43
44     This class exists because I need to get references to parts of the buffer 
45     that have already been used so that I can increment counters inside the buffer.
46 */
47
48 class compressedBuffer {
49 public:
50   char* buf;
51   int pos; ///<< byte position just beyond the previously read/written data
52
53   compressedBuffer(){
54     buf = NULL;
55     pos = 0;
56   }
57
58   compressedBuffer(int bytes){
59     buf = (char*)malloc(bytes);
60     pos = 0;
61   }
62
63   compressedBuffer(void *buffer){
64     buf = (char*)buffer;
65     pos = 0;
66   }
67     
68   void init(void *buffer){
69     buf = (char*)buffer;
70     pos = 0;
71   }
72     
73   inline void * currentPtr(){
74     return (void*)(buf+pos);
75   }
76
77   template <typename T>
78   T read(int offset){
79     // to resolve unaligned writes causing bus errors, need memcpy
80     T v;
81     memcpy(&v, buf+offset, sizeof(T));
82     return v;
83   }
84     
85   template <typename T>
86   void write(T v, int offset){
87     T v2 = v; // on stack
88     // to resolve unaligned writes causing bus errors, need memcpy
89     memcpy(buf+offset, &v2, sizeof(T));
90   }
91     
92   template <typename T>
93   void increment(int offset){
94     T temp;
95     temp = read<T>(offset);
96     temp ++;
97     write<T>(temp, offset);
98   }
99
100   template <typename T>
101   void accumulate(T v, int offset){
102     T temp;
103     temp = read<T>(offset);
104     temp += v;
105     write<T>(temp, offset);
106   }
107   
108   template <typename T>
109   int push(T v){
110     int oldpos = pos;
111     write<T>(v, pos);
112     pos += sizeof(T);
113     return oldpos;
114   }
115   
116   template <typename T>
117   T pop(){
118     T temp = read<T>(pos);
119     pos += sizeof(T);
120     return temp;
121   }
122
123   template <typename T>
124   T peek(){
125     T temp = read<T>(pos);
126     return temp;
127   }
128
129   template <typename T0, typename T>
130   T peekSecond(){
131     T temp;
132     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
133     return temp;
134   }
135
136   int datalength(){
137     return pos;
138   }
139      
140   void * buffer(){
141     return (void*) buf;
142   }  
143
144   void freeBuf(){
145     free(buf);
146   }
147
148   ~compressedBuffer(){
149     // don't free the buf because the user my have supplied the buffer
150   }
151     
152 };
153
154
155
156
157 // Global Readonly
158 CkGroupID traceSummaryGID;
159 bool summaryCcsStreaming;
160
161 int sumonly = 0;
162 int sumDetail = 0;
163
164 /**
165   For each TraceFoo module, _createTraceFoo() must be defined.
166   This function is called in _createTraces() generated in moduleInit.C
167 */
168 void _createTracesummary(char **argv)
169 {
170   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
171   CkpvInitialize(TraceSummary*, _trace);
172   CkpvInitialize(int, previouslySentBins);
173   CkpvAccess(previouslySentBins) = 0;
174   CkpvAccess(_trace) = new  TraceSummary(argv);
175   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
176   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
177 }
178
179
180 /// function call for starting a phase in trace summary logs 
181 extern "C" 
182 void CkSummary_StartPhase(int phase)
183 {
184    CkpvAccess(_trace)->startPhase(phase);
185 }
186
187
188 /// function call for adding an event mark
189 extern "C" 
190 void CkSummary_MarkEvent(int eventType)
191 {
192    CkpvAccess(_trace)->addEventType(eventType);
193 }
194
195 static inline void writeU(FILE* fp, int u)
196 {
197   fprintf(fp, "%4d", u);
198 }
199
200 PhaseEntry::PhaseEntry() 
201 {
202   int _numEntries=_entryTable.size();
203   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
204   nEPs = _numEntries+10; 
205   count = new int[nEPs];
206   times = new double[nEPs];
207   maxtimes = new double[nEPs];
208   for (int i=0; i<nEPs; i++) {
209     count[i] = 0;
210     times[i] = 0.0;
211     maxtimes[i] = 0.;
212   }
213 }
214
215 SumLogPool::~SumLogPool() 
216 {
217   if (!sumonly) {
218     write();
219     fclose(fp);
220     if (sumDetail) fclose(sdfp);
221   }
222   // free memory for mark
223   if (markcount > 0)
224   for (int i=0; i<MAX_MARKS; i++) {
225     for (int j=0; j<events[i].length(); j++)
226       delete events[i][j];
227   }
228   delete[] pool;
229   delete[] epInfo;
230   delete[] cpuTime;
231   delete[] numExecutions;
232 }
233
234 void SumLogPool::addEventType(int eventType, double time)
235 {
236    if (eventType <0 || eventType >= MAX_MARKS) {
237        CkPrintf("Invalid event type %d!\n", eventType);
238        return;
239    }
240    MarkEntry *e = new MarkEntry;
241    e->time = time;
242    events[eventType].push_back(e);
243    markcount ++;
244 }
245
246 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
247 {
248    // TBD: Can this be moved to initMem?
249   cpuTime = NULL;
250    poolSize = CkpvAccess(binCount);
251    if (poolSize % 2) poolSize++;        // make sure it is even
252    pool = new BinEntry[poolSize];
253    _MEMCHECK(pool);
254
255    this->pgm = new char[strlen(pgm)+1];
256    strcpy(this->pgm,pgm);
257    
258 #if 0
259    // create the sts file
260    if (CkMyPe() == 0) {
261      char *fname = 
262        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
263      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
264      stsfp = fopen(fname, "w+");
265      //CmiPrintf("File: %s \n", fname);
266      if (stsfp == 0) {
267        CmiAbort("Cannot open summary sts file for writing.\n");
268       }
269      delete[] fname;
270    }
271 #endif
272    
273    // event
274    markcount = 0;
275 }
276
277 void SumLogPool::initMem()
278 {
279    int _numEntries=_entryTable.size();
280    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
281    epInfo = new SumEntryInfo[epInfoSize];
282    _MEMCHECK(epInfo);
283
284    cpuTime = NULL;
285    numExecutions = NULL;
286    if (sumDetail) {
287        cpuTime = new double[poolSize*epInfoSize];
288        _MEMCHECK(cpuTime);
289        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
290        numExecutions = new int[poolSize*epInfoSize];
291        _MEMCHECK(numExecutions);
292        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
293
294 //         int i, e;
295 //         for(i=0; i<poolSize; i++) {
296 //             for(e=0; e< epInfoSize; e++) {
297 //                 setCPUtime(i,e,0.0);
298 //                 setNumExecutions(i,e,0);
299 //             }
300 //         }
301    }
302 }
303
304 int SumLogPool::getUtilization(int interval, int ep) {
305     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
306 }
307
308 void SumLogPool::write(void) 
309 {
310   int i;
311   unsigned int j;
312   int _numEntries=_entryTable.size();
313
314   fp = NULL;
315   sdfp = NULL;
316
317   // create file(s)
318   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
319   if (!sumonly) {
320     char pestr[10];
321     sprintf(pestr, "%d", CkMyPe());
322     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
323     char *fname = new char[len+1];
324     
325     sprintf(fname, "%s.%s.sum", pgm, pestr);
326     do {
327       fp = fopen(fname, "w+");
328     } while (!fp && errno == EINTR);
329     if (!fp) {
330       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
331       CmiAbort("Cannot open Summary Trace File for writing...\n");
332     }
333     
334     if (sumDetail) {
335       sprintf(fname, "%s.%s.sumd", pgm, pestr);
336       do {
337         sdfp = fopen(fname, "w+");
338       } while (!sdfp && errno == EINTR);
339       if(!sdfp) {
340         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
341       }
342     }
343     delete[] fname;
344   }
345
346   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
347   if (CkpvAccess(version)>=3.0)
348   {
349     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
350   }
351   fprintf(fp, "\n");
352
353   // write bin time
354 #if 1
355   int last=pool[0].getU();
356   writeU(fp, last);
357   int count=1;
358   for(j=1; j<numBins; j++) {
359     int u = pool[j].getU();
360     if (last == u) {
361       count++;
362     }
363     else {
364       if (count > 1) fprintf(fp, "+%d", count);
365       writeU(fp, u);
366       last = u;
367       count = 1;
368     }
369   }
370   if (count > 1) fprintf(fp, "+%d", count);
371 #else
372   for(j=0; j<numEntries; j++) 
373       pool[j].write(fp);
374 #endif
375   fprintf(fp, "\n");
376
377   // write entry execution time
378   fprintf(fp, "EPExeTime: ");
379   for (i=0; i<_numEntries; i++)
380     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
381   fprintf(fp, "\n");
382   // write entry function call times
383   fprintf(fp, "EPCallTime: ");
384   for (i=0; i<_numEntries; i++)
385     fprintf(fp, "%d ", epInfo[i].epCount);
386   fprintf(fp, "\n");
387   // write max entry function execute times
388   fprintf(fp, "MaxEPTime: ");
389   for (i=0; i<_numEntries; i++)
390     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
391   fprintf(fp, "\n");
392 #if 0
393   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
394     for (j=0; j<_numEntries; j++) 
395       fprintf(fp, "%d ", epInfo[j].hist[i]);
396     fprintf(fp, "\n");
397   }
398 #endif
399   // write marks
400   if (CkpvAccess(version)>=2.0) 
401   {
402     fprintf(fp, "NumMarks: %d ", markcount);
403     for (i=0; i<MAX_MARKS; i++) {
404       for(int j=0; j<events[i].length(); j++)
405         fprintf(fp, "%d %f ", i, events[i][j]->time);
406     }
407     fprintf(fp, "\n");
408   }
409   // write phases
410   if (CkpvAccess(version)>=3.0)
411   {
412     phaseTab.write(fp);
413   }
414   // write idle time
415   if (CkpvAccess(version)>=7.1) {
416     fprintf(fp, "IdlePercent: ");
417     int last=pool[0].getUIdle();
418     writeU(fp, last);
419     int count=1;
420     for(j=1; j<numBins; j++) {
421       int u = pool[j].getUIdle();
422       if (last == u) {
423         count++;
424       }
425       else {
426         if (count > 1) fprintf(fp, "+%d", count);
427         writeU(fp, u);
428         last = u;
429         count = 1;
430       }
431     }
432     if (count > 1) fprintf(fp, "+%d", count);
433     fprintf(fp, "\n");
434   }
435
436
437   // write summary details
438   if (sumDetail) {
439         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
440                 CkpvAccess(version), CkMyPe(), CkNumPes(),
441                 numBins, _numEntries, CkpvAccess(binSize));
442
443         // Write out cpuTime in microseconds
444         // Run length encoding (RLE) along EP axis
445         fprintf(sdfp, "ExeTimePerEPperInterval ");
446         unsigned int e, i;
447         long last= (long) (getCPUtime(0,0)*1.0e6);
448         int count=0;
449         fprintf(sdfp, "%ld", last);
450         for(e=0; e<_numEntries; e++) {
451             for(i=0; i<numBins; i++) {
452
453                 long u= (long) (getCPUtime(i,e)*1.0e6);
454                 if (last == u) {
455                     count++;
456                 } else {
457
458                     if (count > 1) fprintf(sdfp, "+%d", count);
459                     fprintf(sdfp, " %ld", u);
460                     last = u;
461                     count = 1;
462                 }
463             }
464         }
465         if (count > 1) fprintf(sdfp, "+%d", count);
466         fprintf(sdfp, "\n");
467
468         // Write out numExecutions
469         // Run length encoding (RLE) along EP axis
470         fprintf(sdfp, "EPCallTimePerInterval ");
471         last= getNumExecutions(0,0);
472         count=0;
473         fprintf(sdfp, "%ld", last);
474         for(e=0; e<_numEntries; e++) {
475             for(i=0; i<numBins; i++) {
476
477                 long u= getNumExecutions(i, e);
478                 if (last == u) {
479                     count++;
480                 } else {
481
482                     if (count > 1) fprintf(sdfp, "+%d", count);
483                     fprintf(sdfp, " %ld", u);
484                     last = u;
485                     count = 1;
486                 }
487             }
488         }
489         if (count > 1) fprintf(sdfp, "+%d", count);
490         fprintf(sdfp, "\n");
491   }
492 }
493
494 void SumLogPool::writeSts(void)
495 {
496     // open sts file
497   char *fname = 
498        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
499   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
500   stsfp = fopen(fname, "w+");
501   //CmiPrintf("File: %s \n", fname);
502   if (stsfp == 0) {
503        CmiAbort("Cannot open summary sts file for writing.\n");
504   }
505   delete[] fname;
506
507   traceWriteSTS(stsfp,_numEvents);
508   for(int i=0;i<_numEvents;i++)
509     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
510   fprintf(stsfp, "END\n");
511
512   fclose(stsfp);
513 }
514
515 // Called once per interval
516 void SumLogPool::add(double time, double idleTime, int pe) 
517 {
518   new (&pool[numBins++]) BinEntry(time, idleTime);
519   if (poolSize==numBins) {
520     shrink();
521   }
522 }
523
524 // Called once per run of an EP
525 // adds 'time' to EP's time, increments epCount
526 void SumLogPool::setEp(int epidx, double time) 
527 {
528   if (epidx >= epInfoSize) {
529         CmiAbort("Invalid entry point!!\n");
530   }
531   //CmiPrintf("set EP: %d %e \n", epidx, time);
532   epInfo[epidx].setTime(time);
533   // set phase table counter
534   phaseTab.setEp(epidx, time);
535 }
536
537 // Called once from endExecute, endPack, etc. this function updates
538 // the sumDetail intervals.
539 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
540 {
541         if (epIdx >= epInfoSize) {
542             CmiAbort("Too many entry points!!\n");
543         }
544
545         double binSz = CkpvAccess(binSize);
546         int startingBinIdx, endingBinIdx;
547         startingBinIdx = (int)(startTime/binSz);
548         endingBinIdx = (int)(endTime/binSz);
549         // shrink if needed
550         while (endingBinIdx >= poolSize) {
551           shrink();
552           CmiAssert(CkpvAccess(binSize) > binSz);
553           binSz = CkpvAccess(binSize);
554           startingBinIdx = (int)(startTime/binSz);
555           endingBinIdx = (int)(endTime/binSz);
556         }
557
558         if (startingBinIdx == endingBinIdx) {
559             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
560         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
561             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
562             while(++startingBinIdx < endingBinIdx)
563                 addToCPUtime(startingBinIdx, epIdx, binSz);
564             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
565         } else {
566           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
567                    startTime, endTime);
568             CmiAbort("Error: end time of EP is less than start time\n");
569         }
570
571         incNumExecutions(startingBinIdx, epIdx);
572 }
573
574 // Shrinks pool[], cpuTime[], and numExecutions[]
575 void SumLogPool::shrink(void)
576 {
577 //  double t = CmiWallTimer();
578
579   // We ensured earlier that poolSize is even; therefore now numBins
580   // == poolSize == even.
581   int entries = numBins/2;
582   for (int i=0; i<entries; i++)
583   {
584      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
585      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
586      if (sumDetail)
587      for (int e=0; e < epInfoSize; e++) {
588          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
589          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
590      }
591   }
592   // zero out the remaining intervals
593   if (sumDetail) {
594     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
595     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
596   }
597   numBins = entries;
598   CkpvAccess(binSize) *= 2;
599
600 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
601 }
602
603 int  BinEntry::getU() 
604
605   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
606 }
607
608 int BinEntry::getUIdle() {
609   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
610 }
611
612 void BinEntry::write(FILE* fp)
613 {
614   writeU(fp, getU());
615 }
616
617 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
618                                         binTime(0.0),binIdle(0.0),msgNum(0)
619 {
620   if (CkpvAccess(traceOnPe) == 0) return;
621
622     // use absolute time
623   if (CmiTimerAbsolute()) binStart = CmiInitTime();
624
625   CkpvInitialize(int, binCount);
626   CkpvInitialize(double, binSize);
627   CkpvInitialize(double, version);
628   CkpvAccess(binSize) = BIN_SIZE;
629   CkpvAccess(version) = VER;
630   CkpvAccess(binCount) = DefaultBinCount;
631   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
632     if (CkMyPe() == 0) 
633       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
634   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
635         "CPU usage log time resolution");
636   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
637         "Write this .sum file version");
638
639   epThreshold = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
641         "Execution time histogram lower bound");
642   epInterval = 0.001; 
643   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
644         "Execution time histogram bin size");
645
646   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
647   // +sumonly overrides +sumDetail
648   if (!sumonly)
649       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
650
651   _logPool = new SumLogPool(CkpvAccess(traceRoot));
652   // assume invalid entry point on start
653   execEp=INVALIDEP;
654   inIdle = 0;
655   inExec = 0;
656 }
657
658 void TraceSummary::traceClearEps(void)
659 {
660   _logPool->clearEps();
661 }
662
663 void TraceSummary::traceWriteSts(void)
664 {
665   if(CkMyPe()==0)
666       _logPool->writeSts();
667 }
668
669 void TraceSummary::traceClose(void)
670 {
671   if(CkMyPe()==0)
672       _logPool->writeSts();
673   CkpvAccess(_trace)->endComputation();
674   // destructor call the write()
675   delete _logPool;
676   // remove myself from traceArray so that no tracing will be called.
677   CkpvAccess(_traces)->removeTrace(this);
678 }
679
680 void TraceSummary::beginExecute(CmiObjId *tid)
681 {
682   beginExecute(-1,-1,_threadEP,-1);
683 }
684
685 void TraceSummary::beginExecute(envelope *e)
686 {
687   // no message means thread execution
688   if (e==NULL) {
689     beginExecute(-1,-1,_threadEP,-1);
690   }
691   else {
692     beginExecute(-1,-1,e->getEpIdx(),-1);
693   }  
694 }
695
696 void TraceSummary::beginExecute(char *msg)
697 {
698 #if CMK_SMP_TRACE_COMMTHREAD
699     //This function is called from comm thread in SMP mode
700     envelope *e = (envelope *)msg;
701     int num = _entryTable.size();
702     int ep = e->getEpIdx();
703     if(ep<0 || ep>=num) return;
704     if(_entryTable[ep]->traceEnabled)
705         beginExecute(-1,-1,e->getEpIdx(),-1);
706 #endif
707 }
708
709 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
710 {
711   if (execEp == TRACEON_EP) {
712     endExecute();
713   }
714   CmiAssert(inIdle == 0 && inExec == 0);
715   inExec = 1;
716
717   if (execEp != INVALIDEP) {
718     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
719     return;
720   }
721   
722   execEp=ep;
723   double t = TraceTimer();
724   //CmiPrintf("start: %f \n", start);
725   
726   start = t;
727   double ts = binStart;
728   // fill gaps
729   while ((ts = ts + CkpvAccess(binSize)) < t) {
730     /* Keep as a template for error checking. The current form of this check
731        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
732        I used it). Perhaps an improved form could be used in case vastly
733        incompatible EP vs idle times are reported (binSize*2?).
734
735        This check will have to be duplicated before each call to add()
736
737     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
738              binTime + binIdle, CkpvAccess(binSize));
739     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
740     */
741      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
742      binTime=0.0;                 // fill all other bins with 0 up till start
743      binIdle = 0.0;
744      binStart = ts;
745   }
746 }
747
748 void TraceSummary::endExecute(void)
749 {
750   CmiAssert(inIdle == 0 && inExec == 1);
751   inExec = 0;
752
753   double t = TraceTimer();
754   double ts = start;
755   double nts = binStart;
756
757 /*
758   if (execEp == TRACEON_EP) {
759     // if trace just got turned on, then one expects to see this
760     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
761     return;
762   }
763 */
764
765   if (execEp == INVALIDEP) {
766     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
767     return;
768   }
769
770   if (execEp >= 0)
771   {
772     _logPool->setEp(execEp, t-ts);
773   }
774
775   while ((nts = nts + CkpvAccess(binSize)) < t)
776   {
777     // fill the bins with time for this entry method
778      binTime += nts-ts;
779      binStart  = nts;
780      // This calls shrink() if needed
781      _logPool->add(binTime, binIdle, CkMyPe()); 
782      binTime = 0.0;
783      binIdle = 0.0;
784      ts = nts;
785   }
786   binTime += t - ts;
787
788   if (sumDetail && execEp >= 0)
789       _logPool->updateSummaryDetail(execEp, start, t);
790
791   execEp = INVALIDEP;
792 }
793
794 void TraceSummary::endExecute(char *msg){
795 #if CMK_SMP_TRACE_COMMTHREAD
796     //This function is called from comm thread in SMP mode
797     envelope *e = (envelope *)msg;
798     int num = _entryTable.size();
799     int ep = e->getEpIdx();
800     if(ep<0 || ep>=num) return;
801     if(_entryTable[ep]->traceEnabled){
802         endExecute();
803     }
804 #endif    
805 }
806
807 void TraceSummary::beginIdle(double currT)
808 {
809   if (execEp == TRACEON_EP) {
810     endExecute();
811   }
812
813   CmiAssert(inIdle == 0 && inExec == 0);
814   inIdle = 1;
815   double t = TraceTimer(currT);
816   
817   // mark the time of this idle period. Only the next endIdle should see
818   // this value
819   idleStart = t; 
820   double ts = binStart;
821   // fill gaps
822   while ((ts = ts + CkpvAccess(binSize)) < t) {
823     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
824     binTime=0.0;                 // fill all other bins with 0 up till start
825     binIdle = 0.0;
826     binStart = ts;
827   }
828 }
829
830 void TraceSummary::endIdle(double currT)
831 {
832   CmiAssert(inIdle == 1 && inExec == 0);
833   inIdle = 0;
834   double t = TraceTimer(currT);
835   double t_idleStart = idleStart;
836   double t_binStart = binStart;
837
838   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
839   {
840     // fill the bins with time for idle
841     binIdle += t_binStart - t_idleStart;
842     binStart = t_binStart;
843     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
844     binTime = 0.0;
845     binIdle = 0.0;
846     t_idleStart = t_binStart;
847   }
848   binIdle += t - t_idleStart;
849 }
850
851 void TraceSummary::traceBegin(void)
852 {
853     // fake as a start of an event, assuming traceBegin is called inside an
854     // entry function.
855   beginExecute(-1, -1, TRACEON_EP, -1, -1);
856 }
857
858 void TraceSummary::traceEnd(void)
859 {
860   endExecute();
861 }
862
863 void TraceSummary::beginPack(void)
864 {
865     packstart = CmiWallTimer();
866 }
867
868 void TraceSummary::endPack(void)
869 {
870     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
871     if (sumDetail)
872         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
873 }
874
875 void TraceSummary::beginUnpack(void)
876 {
877     unpackstart = CmiWallTimer();
878 }
879
880 void TraceSummary::endUnpack(void)
881 {
882     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
883     if (sumDetail)
884         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
885 }
886
887 void TraceSummary::beginComputation(void)
888 {
889   // initialze arrays because now the number of entries is known.
890   _logPool->initMem();
891 }
892
893 void TraceSummary::endComputation(void)
894 {
895   static int done = 0;
896   if (done) return;
897   done = 1;
898   if (msgNum==0) {
899 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
900      _logPool->add(binTime, binIdle, CkMyPe());
901      binTime = 0.0;
902      binIdle = 0.0;
903      msgNum ++;
904
905      binStart  += CkpvAccess(binSize);
906      double t = TraceTimer();
907      double ts = binStart;
908      while (ts < t)
909      {
910        _logPool->add(binTime, binIdle, CkMyPe());
911        binTime=0.0;
912        binIdle = 0.0;
913        ts += CkpvAccess(binSize);
914      }
915
916   }
917 }
918
919 void TraceSummary::addEventType(int eventType)
920 {
921   _logPool->addEventType(eventType, TraceTimer());
922 }
923
924 void TraceSummary::startPhase(int phase)
925 {
926    _logPool->startPhase(phase);
927 }
928
929 void TraceSummary::traceEnableCCS() {
930   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
931   sumProxy.initCCS();
932 }
933
934
935 void TraceSummary::fillData(double *buffer, double reqStartTime, 
936                             double reqBinSize, int reqNumBins) {
937   // buffer has to be pre-allocated by the requester and must be an array of
938   // size reqNumBins.
939   //
940   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
941   //              need a number of these assumptions dropped:
942   //              1) reqBinSize == binSize (unrealistic)
943   //              2) bins boundary aligned (ok even under normal circumstances)
944   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
945   //              4) bins are always available (true unless flush)
946   //              5) bins always starts from 0 (unrealistic)
947
948   // works only because of 1)
949   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
950   //           "starting" at all!
951   int binOffset = (int)(reqStartTime/reqBinSize); 
952   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
953     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
954     buffer[i-binOffset] = pool()->getTime(i);
955   }
956 }
957
958
959 /// for TraceSummaryBOC
960
961 void TraceSummaryBOC::initCCS() {
962   if(firstTime){
963     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
964     // initializing CCS-based parameters on all processors
965     lastRequestedIndexBlock = 0;
966     indicesPerBlock = 1000;
967     collectionGranularity = 0.001; // time in seconds
968     nBufferedBins = 0;
969     
970     // initialize buffer, register CCS handler and start the collection
971     // pulse only on pe 0.
972     if (CkMyPe() == 0) { 
973       ccsBufferedData = new CkVec<double>();
974     
975       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
976       CkPrintf("Trace Summary now listening in for CCS Client\n");
977       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
978                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
979       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
980                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
981
982       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
983       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
984                              (void *)this);
985       summaryCcsStreaming = CmiTrue;
986     }
987     firstTime = false;
988   }
989 }
990
991 /** Return summary information as double precision values for each sample period. 
992     The actual data collection is in double precision values. 
993
994     The units on the returned values are total execution time across all PEs.
995 */
996 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
997   double *sendBuffer;
998
999   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1000
1001   CkPrintf("Responding ...\n");
1002   int datalength = 0;
1003   // if we have no data to send, send an acknowledgement code of -13.37
1004   // instead.
1005   if (ccsBufferedData->length() == 0) {
1006     sendBuffer = new double[1];
1007     sendBuffer[0] = -13.37;
1008     datalength = sizeof(double);
1009     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1010     delete [] sendBuffer;
1011   } else {
1012     sendBuffer = ccsBufferedData->getVec();
1013     datalength = ccsBufferedData->length()*sizeof(double);
1014     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1015     ccsBufferedData->free();
1016   }
1017   CkPrintf("Response Sent. Proceeding with computation.\n");
1018   delete m;
1019 }
1020
1021
1022 /** Return summary information as unsigned char values for each sample period. 
1023     The actual data collection is in double precision values.
1024
1025     This returns the utilization in a range from 0 to 200.
1026 */
1027 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1028   unsigned char *sendBuffer;
1029
1030   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1031
1032   CkPrintf("Responding ...\n");
1033   int datalength = 0;
1034
1035   if (ccsBufferedData->length() == 0) {
1036     sendBuffer = new unsigned char[1];
1037     sendBuffer[0] = 255;
1038     datalength = sizeof(unsigned char);
1039     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1040     delete [] sendBuffer;
1041   } else {
1042     double * doubleData = ccsBufferedData->getVec();
1043     int numData = ccsBufferedData->length();
1044     
1045     // pack data into unsigned char array
1046     sendBuffer = new unsigned char[numData];
1047     
1048     for(int i=0;i<numData;i++){
1049       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1050       int v = sendBuffer[i];
1051     }    
1052
1053     datalength = sizeof(unsigned char) * numData;
1054     
1055     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1056     ccsBufferedData->free();
1057     delete [] sendBuffer;
1058   }
1059   CkPrintf("Response Sent. Proceeding with computation.\n");
1060   delete m;
1061 }
1062
1063
1064
1065 void startCollectData(void *data, double currT) {
1066   CkAssert(CkMyPe() == 0);
1067   // CkPrintf("startCollectData()\n");
1068   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1069   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1070   double collectionGranularity = sumObj->collectionGranularity;
1071   int indicesPerBlock = sumObj->indicesPerBlock;
1072   
1073   double startTime = lastRequestedIndexBlock*
1074     collectionGranularity * indicesPerBlock;
1075   int numIndicesToGet = (int)floor((currT - startTime)/
1076                                    collectionGranularity);
1077   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1078   // **TODO** consider limiting the total number of blocks each collection
1079   //   request will pick up. This is to limit the amount of perturbation
1080   //   if it proves to be a problem.
1081   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1082
1083    sumProxy.collectSummaryData(startTime, 
1084                        collectionGranularity,
1085                        numBlocksToGet*indicesPerBlock);
1086   // assume success
1087   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1088 }
1089
1090 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1091                                   int numBins) {
1092   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1093
1094   double *contribution = new double[numBins];
1095   for (int i=0; i<numBins; i++) {
1096     contribution[i] = 0.0;
1097   }
1098   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1099
1100   /*
1101   for (int i=0; i<numBins; i++) {
1102     CkPrintf("[%d] %f\n", i, contribution[i]);
1103   }
1104   */
1105
1106   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1107   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1108   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1109              cb);
1110 }
1111
1112 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1113   CkAssert(CkMyPe() == 0);
1114   // **CWL** No memory management for the ccs buffer for now.
1115
1116   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1117   double *recvData = (double *)msg->getData();
1118   int numBins = msg->getSize()/sizeof(double);
1119
1120   // if there's an easier way to append a data block to a CkVec, I'll take it
1121   for (int i=0; i<numBins; i++) {
1122     ccsBufferedData->insertAtEnd(recvData[i]);
1123   }
1124   delete msg;
1125 }
1126
1127
1128
1129
1130 void TraceSummaryBOC::startSumOnly()
1131 {
1132   CmiAssert(CkMyPe() == 0);
1133
1134   CProxy_TraceSummaryBOC p(traceSummaryGID);
1135   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1136   p.askSummary(size);
1137 }
1138
1139 void TraceSummaryBOC::askSummary(int size)
1140 {
1141   if (CkpvAccess(_trace) == NULL) return;
1142
1143   int traced = CkpvAccess(_trace)->traceOnPE();
1144
1145   BinEntry *reductionBuffer = new BinEntry[size+1];
1146   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1147   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1148   if (traced) {
1149     CkpvAccess(_trace)->endComputation();
1150     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1151     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1152     if (n>size) n=size;
1153     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1154   }
1155
1156   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1157              CkReduction::sum_double);
1158   delete [] reductionBuffer;
1159 }
1160
1161 //extern "C" void _CkExit();
1162
1163 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1164 {
1165   if (CkpvAccess(_trace) == NULL) return;
1166
1167   CkAssert(CkMyPe() == 0);
1168
1169   int n = msg->getSize()/sizeof(BinEntry);
1170   nBins = n-1;
1171   bins = (BinEntry *)msg->getData();
1172   nTracedPEs = (int)bins[n-1].time();
1173   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1174
1175   write();
1176
1177   delete msg;
1178
1179   CkExit();
1180 }
1181
1182 void TraceSummaryBOC::write(void) 
1183 {
1184   int i;
1185   unsigned int j;
1186
1187   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1188   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1189   FILE *sumfp = fopen(fname, "w+");
1190   //CmiPrintf("File: %s \n", fname);
1191   if(sumfp == 0)
1192       CmiAbort("Cannot open summary sts file for writing.\n");
1193   delete[] fname;
1194
1195   int _numEntries=_entryTable.size();
1196   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1197   fprintf(sumfp, "\n");
1198
1199   // write bin time
1200 #if 0
1201   int last=pool[0].getU();
1202   writeU(fp, last);
1203   int count=1;
1204   for(j=1; j<numEntries; j++) {
1205     int u = pool[j].getU();
1206     if (last == u) {
1207       count++;
1208     }
1209     else {
1210       if (count > 1) fprintf(fp, "+%d", count);
1211       writeU(fp, u);
1212       last = u;
1213       count = 1;
1214     }
1215   }
1216   if (count > 1) fprintf(fp, "+%d", count);
1217 #else
1218   for(j=0; j<nBins; j++) {
1219     bins[j].time() /= nTracedPEs;
1220     bins[j].write(sumfp);
1221   }
1222 #endif
1223   fprintf(sumfp, "\n");
1224   fclose(sumfp);
1225
1226 }
1227
1228 extern "C" void CombineSummary()
1229 {
1230 #if CMK_TRACE_ENABLED
1231   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1232   if (sumonly) {
1233     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1234       // pe 0 start the sumonly process
1235     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1236     sumProxy[0].startSumOnly();
1237   }
1238   else {
1239     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL);
1240     CkExit();
1241   }
1242 #else
1243   CkExit();
1244 #endif
1245 }
1246
1247 void initTraceSummaryBOC()
1248 {
1249 #ifdef __BLUEGENE__
1250   if(BgNodeRank()==0) {
1251 #else
1252   if (CkMyRank() == 0) {
1253 #endif
1254     registerExitFn(CombineSummary);
1255   }
1256 }
1257
1258
1259
1260
1261
1262
1263 #include "TraceSummary.def.h"
1264
1265
1266 /*@}*/
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286