doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-perf / trace-utilization.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include "trace-utilization.h"
7
8
9 /* readonly */ CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
10
11
12 /// A reduction type for merging compressed sum detail data
13 CkReduction::reducerType sumDetailCompressedReducer;
14
15
16 void collectUtilizationData(void *ignore, double currT) {
17   // Only start collecting after a few seconds have passed. This way there will hopefully be at least 1000 bins to pickup each time we try.
18   static int numTimesCalled = 0;
19   numTimesCalled ++;
20   if(numTimesCalled > 4){
21     traceUtilizationGroupProxy.collectSumDetailData();
22   }
23 }
24
25
26 CkpvStaticDeclare(TraceUtilization*, _trace);
27
28 /**
29   For each TraceFoo module, _createTraceFoo() must be defined.
30   This function is called in _createTraces() generated in moduleInit.C
31 */
32 void _createTraceutilization(char **argv)
33 {
34   //  CkPrintf("[%d] _createTraceutilization\n", CkMyPe());
35
36   // Register the reducer
37   CkAssert(sizeof(short) == 2);
38   sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
39
40   CkpvInitialize(TraceUtilization*, _trace);
41   CkpvAccess(_trace) = new TraceUtilization();
42   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
43
44 }
45
46
47
48 void TraceUtilization::beginExecute(CmiObjId *tid)
49 {
50   beginExecute(-1,-1,_threadEP,-1);
51 }
52
53 void TraceUtilization::beginExecute(envelope *e)
54 {
55   // no message means thread execution
56   if (e==NULL) {
57     beginExecute(-1,-1,_threadEP,-1);
58   }
59   else {
60     beginExecute(-1,-1,e->getEpIdx(),-1);
61   }  
62 }
63
64 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
65 {
66   if (execEp != INVALIDEP) {
67     TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
68     return;
69   }
70   
71   execEp=ep;
72   start = TraceTimer();
73 }
74
75
76 void TraceUtilization::endExecute(void)
77 {
78
79   if (execEp == TRACEON_EP) {
80     // if trace just got turned on, then one expects to see this
81     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
82     return;
83   }
84
85   double endTime = TraceTimer();
86  
87   updateCpuTime(execEp, start, endTime);
88
89
90   execEp = INVALIDEP;
91 }
92
93
94
95 void TraceUtilization::addEventType(int eventType)
96 {
97   CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
98 }
99
100
101
102
103 /**
104
105 Send back to the client compressed sum-detail style measurements about the 
106 utilization for each active PE combined across all PEs.
107
108 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
109    #samples (EP,utilization)* 
110
111 One example record for two EPS that executed during the sample period. 
112 EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. 
113 All of these would be packed as bytes into the message:
114 2 3 150 122 20
115
116  */
117 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
118   CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n",  storedSumDetailResults.size() );
119   //  CkAssert(sumDetail);
120   int datalength;
121
122 #if 0
123
124   compressedBuffer fakeMessage = fakeCompressedMessage();
125   CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
126   fakeMessage.freeBuf();
127
128 #else
129
130   if (storedSumDetailResults.size()  == 0) {
131     compressedBuffer b = emptyCompressedBuffer();
132     CcsSendDelayedReply(m->reply, b.datalength(), b.buffer()); 
133     b.freeBuf();
134   } else {
135     CkReductionMsg * msg = storedSumDetailResults.front();
136     storedSumDetailResults.pop_front();
137
138     
139     void *sendBuffer = (void *)msg->getData();
140     datalength = msg->getSize();
141     CcsSendDelayedReply(m->reply, datalength, sendBuffer);
142     
143     delete msg;
144   }
145     
146   
147 #endif
148
149   //  CkPrintf("CCS response of %d bytes sent.\n", datalength);
150   delete m;
151 }
152
153
154
155 void TraceUtilizationBOC::collectSumDetailData() {
156   TraceUtilization* t = CkpvAccess(_trace);
157
158   compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
159
160   //  CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail avg util=%lg\n", CkMyPe(), averageUtilizationInBuffer(b));
161   //  printCompressedBuf(b);
162   // fflush(stdout);
163   
164   
165 #if 0
166   b = fakeCompressedMessage();
167 #endif
168   
169   //  CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), b.datalength());
170   
171   //  CProxy_TraceUtilizationBOC sumProxy(traceSummaryGID);
172   CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
173   contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
174   
175   b.freeBuf();
176 }
177
178
179 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
180   CkAssert(CkMyPe() == 0);
181
182   compressedBuffer b(msg->getData());
183   CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
184   //if(isCompressedBufferSane(b)){
185     storedSumDetailResults.push_back(msg); 
186     //}
187
188     // CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(size now=%d)\n", CkMyPe(), storedSumDetailResults.size() );
189     //  fflush(stdout);
190
191 }
192
193
194
195 void TraceUtilization::writeSts(void) {
196   // open sts file
197   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
198   sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
199   FILE* stsfp = fopen(fname, "w+");
200   if (stsfp == 0) {
201        CmiAbort("Cannot open summary sts file for writing.\n");
202   }
203   delete[] fname;
204
205   traceWriteSTS(stsfp,0);
206   fprintf(stsfp, "END\n");
207
208   fclose(stsfp);
209 }
210
211
212
213 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
214
215
216   
217 /// Compress a buffer by merging all entries in a bin that are less than the threshold into a single "other" category
218   compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
219     //    CkPrintf("[%d] moveTinyEntriesToOther\n", CkMyPe());
220     
221     // reset the src buffer to the beginning
222     src.pos = 0;
223
224     compressedBuffer dest(100000); 
225     
226     int numBins = src.pop<numBins_T>();
227     int numProcs = src.pop<numProcs_T>();
228     
229     dest.push<numBins_T>(numBins);
230     dest.push<numProcs_T>(numProcs);
231     
232     
233     for(int i=0;i<numBins;i++){
234       double utilizationInOther = 0.0;
235       
236       entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
237       int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
238       
239       CkAssert(numEntriesInSrcBin < 200);
240
241       for(int j=0;j<numEntriesInSrcBin;j++){
242         ep_T ep = src.pop<ep_T>();
243         double v = src.pop<utilization_T>();
244         
245         if(v < threshold * 250.0){
246           // do not copy bin into destination
247           utilizationInOther += v / 250.0;
248         } else {
249           // copy bin into destination
250           dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
251           dest.push<ep_T>(ep);
252           dest.push<utilization_T>(v);
253         }
254
255       }
256       
257       // if other category has stuff in it, add it to the destination buffer
258       if(utilizationInOther > 0.0){
259         dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
260         dest.push<ep_T>(other_EP);
261         if(utilizationInOther > 1.0)
262           utilizationInOther = 1.0;
263         dest.push<utilization_T>(utilizationInOther*250.0);
264       }
265       
266     }
267    
268     return dest;
269   }
270   
271     
272
273
274 /// A reducer for merging compressed sum detail data
275 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
276   // CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
277   
278   compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
279   int *numProcsRepresentedInMessage = new int[nMsg];
280   
281   int numBins = 0;
282   int totalsize = 0;
283   int totalProcsAcrossAllMessages = 0;
284   
285   for (int i=0;i<nMsg;i++) {
286     incomingMsgs[i].init(msgs[i]->getData());
287     
288     //  CkPrintf("[%d] Incoming reduction message %d has average utilization %lg\n", CkMyPe(),  i, averageUtilizationInBuffer(incomingMsgs[i])); 
289     //   CkPrintf("Is buffer %d sane? %s\n", i, isCompressedBufferSane(incomingMsgs[i]) ? "yes": "no" );
290
291
292     totalsize += msgs[i]->getSize();
293     //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
294     //   printCompressedBuf(incomingMsgs[i]);
295     
296     // Read first value from message. 
297     // Make sure all messages have the same number of bins
298     if(i==0)
299       numBins = incomingMsgs[i].pop<numBins_T>();
300     else 
301       CkAssert( numBins ==  incomingMsgs[i].pop<numBins_T>() );
302     
303     // Read second value from message. 
304     numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
305     totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
306     //    CkPrintf("Number of procs in message[%d] is %d\n", i,  (int)numProcsRepresentedInMessage[i]);
307   }
308   
309   compressedBuffer dest(totalsize + 100); 
310   
311   // build a compressed representation of each merged bin
312   dest.push<numBins_T>(numBins);
313   dest.push<numProcs_T>(totalProcsAcrossAllMessages);
314   
315   for(int i=0; i<numBins; i++){
316     mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
317   }
318   
319   // CkPrintf("END MERGE RESULT=========================================================\n");
320   // printCompressedBuf(dest);
321
322
323   //CkPrintf("[%d] Merged buffer has average utilization %lg \n", CkMyPe(), averageUtilizationInBuffer(dest));
324
325   //CkPrintf("Is resulting merged buffer sane? %s\n", isCompressedBufferSane(dest) ? "yes": "no" );  
326   
327   compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
328   
329   //  CkPrintf("Is resulting merged Filtered buffer sane? %s\n", isCompressedBufferSane(dest2) ? "yes": "no" ); 
330
331   //  CkPrintf("[%d] Outgoing reduction (filtered) message has average utilization %lf \n", CkMyPe(), averageUtilizationInBuffer(dest2));
332
333   
334   CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());   
335   dest.freeBuf();
336   delete[] incomingMsgs;
337   return m;
338 }
339
340
341
342
343
344
345
346 /// Create fake sum detail data in the compressed format (for debugging)
347  compressedBuffer fakeCompressedMessage(){
348    CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
349    
350    compressedBuffer fakeBuf(10000);
351    
352    int numBins = 55;
353    int numProcs = 1000;
354
355    // build a compressed representation of each merged bin
356    fakeBuf.push<numBins_T>(numBins);
357    fakeBuf.push<numProcs_T>(numProcs);
358    for(int i=0; i<numBins; i++){
359      int numRecords = 3;
360      fakeBuf.push<entriesInBin_T>(numRecords);
361      for(int j=0;j<numRecords;j++){
362        fakeBuf.push<ep_T>(j*10+2);
363        fakeBuf.push<utilization_T>(120.00);
364      }  
365    }
366    
367    //CkPrintf("Fake Compressed Message:=========================================================\n");
368    //   printCompressedBuf(fakeBuf);
369
370    CkAssert(isCompressedBufferSane(fakeBuf));
371
372    return fakeBuf;
373  }
374
375
376  /// Create an empty message
377  compressedBuffer emptyCompressedBuffer(){
378    compressedBuffer result(sizeof(numBins_T));
379    result.push<numBins_T>(0);
380    return result;
381  }
382
383
384
385
386 /** print out the compressed buffer starting from its begining*/
387 void printCompressedBuf(compressedBuffer b){
388   // b should be passed in by value, and hence we can modify it
389   b.pos = 0;
390   int numEntries = b.pop<numBins_T>();
391   CkPrintf("Buffer contains %d records\n", numEntries);
392   int numProcs = b.pop<numProcs_T>();
393   CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
394
395   for(int i=0;i<numEntries;i++){
396     entriesInBin_T recordLength = b.pop<entriesInBin_T>();
397     if(recordLength > 0){
398       CkPrintf("    Record %d is of length %d : ", i, recordLength);
399       
400       for(int j=0;j<recordLength;j++){
401         ep_T ep = b.pop<ep_T>();
402         utilization_T v = b.pop<utilization_T>();
403         CkPrintf("(%d,%f) ", ep, v);
404       }
405     
406       CkPrintf("\n");
407     }
408   }
409 }
410
411
412
413  bool isCompressedBufferSane(compressedBuffer b){
414    // b should be passed in by value, and hence we can modify it  
415    b.pos = 0;  
416    numBins_T numBins = b.pop<numBins_T>();  
417    numProcs_T numProcs = b.pop<numProcs_T>();  
418    
419    if(numBins > 2000){
420      ckout << "WARNING: numBins=" << numBins << endl;
421      return false;
422    }
423    
424    for(int i=0;i<numBins;i++){  
425      entriesInBin_T recordLength = b.pop<entriesInBin_T>();  
426      if(recordLength > 200){
427        ckout << "WARNING: recordLength=" << recordLength << endl;
428        return false;
429      }
430      
431      if(recordLength > 0){  
432        
433        for(int j=0;j<recordLength;j++){  
434          ep_T ep = b.pop<ep_T>();  
435          utilization_T v = b.pop<utilization_T>();  
436          //      CkPrintf("(%d,%f) ", ep, v);  
437          if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
438            ckout << "WARNING: ep=" << ep << " v=" << v << endl;
439            return false;
440          }
441        }  
442        
443      }  
444    }  
445    
446    return true;
447  }
448
449
450
451  double averageUtilizationInBuffer(compressedBuffer b){
452    // b should be passed in by value, and hence we can modify it  
453    b.pos = 0;  
454    numBins_T numBins = b.pop<numBins_T>();  
455    numProcs_T numProcs = b.pop<numProcs_T>();  
456    
457    //   CkPrintf("[%d] averageUtilizationInBuffer numProcs=%d   (grep reduction message)\n", CkMyPe(), numProcs);
458    
459    double totalUtilization = 0.0;
460    
461    for(int i=0;i<numBins;i++) {  
462      entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();     
463      for(int j=0;j<entriesInBin;j++){  
464        ep_T ep = b.pop<ep_T>();  
465        totalUtilization +=  b.pop<utilization_T>();  
466      }
467    }
468    
469    return totalUtilization / numBins / 2.5;
470  }
471  
472  
473
474 void sanityCheckCompressedBuf(compressedBuffer b){  
475    CkAssert(isCompressedBufferSane(b)); 
476  }  
477  
478
479
480 double TraceUtilization::sumUtilization(int startBin, int endBin){
481    int epInfoSize = getEpInfoSize();
482    
483    double a = 0.0;
484
485    for(int i=startBin; i<=endBin; i++){
486      for(int j=0; j<epInfoSize; j++){
487        a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
488      }
489    }
490    return a;
491  }
492
493  
494  /// Create a compressed buffer of the n most recent sum detail samples
495  compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
496    //   CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
497
498    int startBin =  cpuTimeEntriesSentSoFar();
499    int numEntries = getEpInfoSize();
500
501    int endBin = startBin + desiredBinsToSend - 1;
502    int binsToSend = endBin - startBin + 1;
503    CkAssert(binsToSend >= desiredBinsToSend );
504    incrementNumCpuTimeEntriesSent(binsToSend);
505
506
507 #if 0
508    bool nonePrinted = true;
509    for(int i=0;i<(NUM_BINS-1000);i+=1000){
510      double expectedU = sumUtilization(i, i+999);
511      if(expectedU > 0.0){
512           CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(),  i, i+999, expectedU);
513        nonePrinted = false;
514      }
515    }
516    
517    if(nonePrinted)
518      CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
519
520    fflush(stdout);
521 #endif
522
523    int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
524    compressedBuffer b(bufferSize);
525
526    b.push<numBins_T>(binsToSend);
527    b.push<numProcs_T>(1); // number of processors along reduction subtree. I am just one processor.
528    //   double myu = 0.0;
529    
530    for(int i=0; i<binsToSend; i++) {
531      // Create a record for bin i
532      //  CkPrintf("Adding record for bin %d\n", i);
533      int numEntriesInRecordOffset = b.push<entriesInBin_T>(0); // The number of entries in this record
534      
535      for(int e=0; e<numEntries; e++) {
536        double scaledUtilization = getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
537        if(scaledUtilization > 0.0) {
538          //CkPrintf("scaledUtilization=%lg !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n", scaledUtilization);
539          if(scaledUtilization > 250.0)
540            scaledUtilization = 250.0;
541          
542          b.push<ep_T>(e);
543          b.push<utilization_T>(scaledUtilization);
544          //      myu += scaledUtilization;
545          b.increment<entriesInBin_T>(numEntriesInRecordOffset);
546        }
547      }
548    }
549    
550    // CkPrintf("[%d] compressNRecentSumDetail resulting buffer: averageUtilizationInBuffer()=%lg myu=%lg\n", CkMyPe(), averageUtilizationInBuffer(b), myu);
551    // fflush(stdout);
552    
553    return b;
554  }
555  
556
557
558
559
560 /** Merge the compressed entries from the first bin in each of the srcBuf buffers.
561      
562 */
563  void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
564   // put a counter at the beginning of destBuf
565   int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
566   
567   //  CkPrintf("BEGIN MERGE------------------------------------------------------------------\n");
568   
569   // Read off the number of bins in each buffer
570   int *remainingEntriesToRead = new int[numSrcBuf];
571   for(int i=0;i<numSrcBuf;i++){
572     remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
573   }
574
575   int count = 0;
576   // Count remaining entries to process
577   for(int i=0;i<numSrcBuf;i++){
578     count += remainingEntriesToRead[i];
579   }
580   
581   while (count>0) {
582     // find first EP from all buffers (these are sorted by EP already)
583     int minEp = 10000;
584     for(int i=0;i<numSrcBuf;i++){
585       if(remainingEntriesToRead[i]>0){
586         int ep = srcBufferArray[i].peek<ep_T>();
587         if(ep < minEp){
588           minEp = ep;
589         }
590       }
591     }
592     
593     //   CkPrintf("[%d] mergeCompressedBin minEp found was %d   totalProcsAcrossAllMessages=%d\n", CkMyPe(), minEp, (int)totalProcsAcrossAllMessages);
594     
595     destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
596
597     // Merge contributions from all buffers that list the EP
598     double v = 0.0;
599     for(int i=0;i<numSrcBuf;i++){
600       if(remainingEntriesToRead[i]>0){
601         int ep = srcBufferArray[i].peek<ep_T>(); 
602         if(ep == minEp){
603           srcBufferArray[i].pop<ep_T>();  // pop ep
604           double util = srcBufferArray[i].pop<utilization_T>();
605           v += util * numProcsRepresentedInMessage[i];
606           remainingEntriesToRead[i]--;
607           count --;
608         }
609       }
610     }
611
612     // create a new entry in the output for this EP.
613     destBuf.push<ep_T>(minEp);
614     destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
615
616   }
617
618
619   delete [] remainingEntriesToRead;
620   // CkPrintf("[%d] End of mergeCompressedBin:\n", CkMyPe() );
621   // CkPrintf("END MERGE ------------------------------------------------------------------\n");
622  }
623
624
625
626 #include "TraceUtilization.def.h"
627
628
629 /*@}*/
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649