add definition for CmiInt16
[charm.git] / src / ck-perf / trace-utilization.h
1 /**
2  * \addtogroup CkPerf
3  */
4 /*@{*/
5
6
7 #ifndef _TRACE_UTILIZATION_H
8 #define _TRACE_UTILIZATION_H
9
10 #include <stdio.h>
11 #include <errno.h>
12 #include <deque>
13
14 #include "charm++.h"
15
16
17 #include "trace.h"
18 #include "envelope.h"
19 #include "register.h"
20 #include "trace-common.h"
21 #include "ckcallback-ccs.h"
22
23 #include "TraceUtilization.decl.h"
24
25 #define INVALIDEP     -2
26 #define TRACEON_EP    -3
27 #define NUM_DUMMY_EPS 9
28
29
30 // initial bin size, time in seconds
31 #define  BIN_PER_SEC    1000
32 #define  BIN_SIZE       0.001
33 #define NUM_BINS      32768
34
35
36 /** Define the types used in the gathering of sum detail statistics for use with CCS */
37 #define numBins_T int
38 #define numProcs_T int
39 #define entriesInBin_T short
40 #define ep_T short
41 #define utilization_T unsigned char
42 #define other_EP 10000
43
44
45 /* readonly */ extern CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
46
47 void collectUtilizationData(void *data, double currT);
48
49
50
51 /** A main chare that can create the BOC/group */
52 class TraceUtilizationInit : public Chare {
53  public:
54   TraceUtilizationInit(CkArgMsg *m) {
55     CkPrintf("[%d] TraceUtilizationInit creating traceUtilizationGroupProxy");
56     fflush(stdout);
57    
58     traceUtilizationGroupProxy = CProxy_TraceUtilizationBOC::ckNew();
59     
60     CkPrintf("Trace Summary now listening in for CCS Client\n");
61     CcsRegisterHandler("CkPerfSumDetail compressed", CkCallback(CkIndex_TraceUtilizationBOC::ccsRequestSumDetailCompressed(NULL), traceUtilizationGroupProxy[0])); 
62     
63     CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
64     CcdCallOnConditionKeep(CcdPERIODIC_1second, collectUtilizationData, (void *)NULL);
65
66   }
67 };
68
69
70
71
72
73
74 /** 
75     A class that reads/writes a buffer out of different types of data.
76
77     This class exists because I need to get references to parts of the buffer 
78     that have already been used so that I can increment counters inside the buffer.
79 */
80
81 class compressedBuffer {
82  public:
83   char* buf;
84   int pos; ///<< byte position just beyond the previously read/written data
85
86   compressedBuffer(){
87     buf = NULL;
88     pos = 0;
89   }
90
91   compressedBuffer(int bytes){
92     buf = (char*)malloc(bytes);
93     pos = 0;
94   }
95
96   compressedBuffer(void *buffer){
97     buf = (char*)buffer;
98     pos = 0;
99   }
100   
101   void init(void *buffer){
102     buf = (char*)buffer;
103     pos = 0;
104   }
105   
106   inline void * currentPtr(){
107     return (void*)(buf+pos);
108   }
109   
110   template <typename T>
111     T read(int offset){
112     // to resolve unaligned writes causing bus errors, need memcpy
113     T v;
114     memcpy(&v, buf+offset, sizeof(T));
115     return v;
116   }
117   
118   template <typename T>
119     void write(T v, int offset){
120     T v2 = v; // on stack
121     // to resolve unaligned writes causing bus errors, need memcpy
122     memcpy(buf+offset, &v2, sizeof(T));
123   }
124     
125   template <typename T>
126     void increment(int offset){
127     T temp;
128     temp = read<T>(offset);
129     temp ++;
130     write<T>(temp, offset);
131   }
132
133   template <typename T>
134     void accumulate(T v, int offset){
135     T temp;
136     temp = read<T>(offset);
137     temp += v;
138     write<T>(temp, offset);
139   }
140   
141   template <typename T>
142     int push(T v){
143     int oldpos = pos;
144     write<T>(v, pos);
145     pos += sizeof(T);
146     return oldpos;
147   }
148   
149   template <typename T>
150     T pop(){
151     T temp = read<T>(pos);
152     pos += sizeof(T);
153     return temp;
154   }
155
156   template <typename T>
157     T peek(){
158     T temp = read<T>(pos);
159     return temp;
160   }
161
162   template <typename T0, typename T>
163     T peekSecond(){
164     T temp;
165     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
166     return temp;
167   }
168
169   int datalength(){
170     return pos;
171   }
172      
173   void * buffer(){
174     return (void*) buf;
175   }  
176
177   void freeBuf(){
178     free(buf);
179   }
180
181   ~compressedBuffer(){
182     // don't free the buf because the user my have supplied the buffer
183   }
184     
185 };
186
187
188
189 compressedBuffer compressAvailableNewSumDetail(int max=10000);
190 void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuffer);
191 //void printSumDetailInfo(int desiredBinsToSend);
192 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs);
193 void printCompressedBuf(compressedBuffer b);
194 compressedBuffer fakeCompressedMessage();
195 compressedBuffer emptyCompressedBuffer();
196 void sanityCheckCompressedBuf(compressedBuffer b);
197 bool isCompressedBufferSane(compressedBuffer b);
198 double averageUtilizationInBuffer(compressedBuffer b);
199
200
201
202
203
204 class TraceUtilization : public Trace {
205  public:
206   int execEp; // the currently executing EP
207   double start; // the start time for the currently executing EP
208
209   unsigned int epInfoSize;
210
211   double *cpuTime;     // NUM_BINS*epInfoSize
212   int lastBinUsed;
213   unsigned int numBinsSent;
214   unsigned int previouslySentBins;
215
216
217   TraceUtilization() {
218     execEp == TRACEON_EP;
219     cpuTime = NULL;
220     lastBinUsed = -1;
221     numBinsSent = 0;
222   }
223
224
225   /// Initialize memory after the number of EPs has been determined
226   void initMem(){
227     int _numEntries=_entryTable.size();
228     epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
229     //    CkPrintf("allocating cpuTime[%d]\n", (int) (NUM_BINS*epInfoSize));
230     cpuTime = new double[NUM_BINS*epInfoSize];
231     _MEMCHECK(cpuTime);
232
233     if(CkMyPe() == 0)
234       writeSts();
235
236   }
237
238   void writeSts(void);
239
240   void creation(envelope *e, int epIdx, int num=1) {}
241
242   void beginExecute(envelope *e);
243   void beginExecute(CmiObjId  *tid);
244   void beginExecute(int event,int msgType,int ep,int srcPe, int mlen=0, CmiObjId *idx=NULL);
245   void endExecute(void);
246   void beginIdle(double currT) {}
247   void endIdle(double currT) {}
248   void beginPack(void){}
249   void endPack(void) {}
250   void beginUnpack(void) {}
251   void endUnpack(void) {}
252   void beginComputation(void) {  initMem(); }
253   void endComputation(void) {}
254   void traceClearEps() {}
255   void traceWriteSts() {}
256   void traceClose() {}
257
258   void addEventType(int eventType);
259   
260   int cpuTimeEntriesAvailable() const { return lastBinUsed+1; }
261   int cpuTimeEntriesSentSoFar() const { return numBinsSent; }
262   void incrementNumCpuTimeEntriesSent(int n) { numBinsSent += n; }
263
264
265   double sumUtilization(int startBin, int endBin);
266
267
268   void updateCpuTime(int epIdx, double startTime, double endTime){
269     
270     //    CkPrintf("updateCpuTime(startTime=%lf endTime=%lf)\n", startTime, endTime);
271     
272     if (epIdx >= epInfoSize) {
273       CkPrintf("WARNING: epIdx=%d >=  epInfoSize=%d\n", (int)epIdx, (int)epInfoSize );
274       return;
275     }
276     
277     int startingBinIdx = (int)(startTime/BIN_SIZE);
278     int endingBinIdx = (int)(endTime/BIN_SIZE);
279     
280     if (startingBinIdx == endingBinIdx) {
281       addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
282     } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
283       addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*BIN_SIZE - startTime);
284       while(++startingBinIdx < endingBinIdx)
285         addToCPUtime(startingBinIdx, epIdx, BIN_SIZE);
286       addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*BIN_SIZE);
287     } 
288   }
289
290
291   /// Zero out all entries from (lastBinUsed+1) up to and including interval.
292   inline void zeroIfNecessary(unsigned int interval){
293     for(unsigned int i=lastBinUsed+1; i<= interval; i++){
294       // zero all eps for this bin
295       for(unsigned int j=0;j<epInfoSize;j++){
296         cpuTime[(i%NUM_BINS)*epInfoSize+j] = 0.0;
297       }
298     }
299     lastBinUsed = interval;
300   }
301     
302   UInt getEpInfoSize() { return epInfoSize; }
303
304   /// for Summary-Detail
305   inline double getCPUtime(unsigned int interval, unsigned int ep) {
306     CkAssert(ep < epInfoSize);
307     if(cpuTime != NULL && interval <= lastBinUsed)
308       return cpuTime[(interval%NUM_BINS)*epInfoSize+ep]; 
309     else {
310       CkPrintf("getCPUtime called with invalid options: cpuTime=%p interval=%d ep=%d\n", cpuTime, (int)interval, (int)ep);
311       return 0.0;
312     }
313   }
314
315   inline void addToCPUtime(unsigned int interval, unsigned int ep, double val){
316     //    CkAssert(ep < epInfoSize);
317     zeroIfNecessary(interval);
318     //    CkPrintf("addToCPUtime interval=%d\n", (int)interval);
319     cpuTime[(interval%NUM_BINS)*epInfoSize+ep] += val;
320   }
321    
322
323   inline double getUtilization(int interval, int ep){
324     return getCPUtime(interval, ep) * 100.0 * (double)BIN_PER_SEC; 
325   }
326
327
328   compressedBuffer compressNRecentSumDetail(int desiredBinsToSend);
329
330
331 };
332
333
334
335
336
337
338 class TraceUtilizationBOC : public CBase_TraceUtilizationBOC {
339   
340   std::deque<CkReductionMsg *> storedSumDetailResults;
341   
342  public:
343   TraceUtilizationBOC() {}
344   TraceUtilizationBOC(CkMigrateMessage* msg) {}
345   ~TraceUtilizationBOC() {}
346  
347
348   /// Entry methods:
349   void ccsRequestSumDetailCompressed(CkCcsRequestMsg *m);
350   void collectSumDetailData();
351   void sumDetailDataCollected(CkReductionMsg *);
352
353 };
354
355
356
357
358
359 #endif
360
361 /*@}*/