Fixed message tracing on comm thread (to trace those entry methods that need
[charm.git] / src / ck-perf / trace-utilization.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "trace-utilization.h"
14
15
16 /* readonly */ CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
17
18
19 /// A reduction type for merging compressed sum detail data
20 CkReduction::reducerType sumDetailCompressedReducer;
21
22
23 void collectUtilizationData(void *ignore, double currT) {
24   // Only start collecting after a few seconds have passed. This way there will hopefully be at least 1000 bins to pickup each time we try.
25   static int numTimesCalled = 0;
26   numTimesCalled ++;
27   if(numTimesCalled > 4){
28     traceUtilizationGroupProxy.collectSumDetailData();
29   }
30 }
31
32
33 CkpvStaticDeclare(TraceUtilization*, _trace);
34
35 /**
36   For each TraceFoo module, _createTraceFoo() must be defined.
37   This function is called in _createTraces() generated in moduleInit.C
38 */
39 void _createTraceutilization(char **argv)
40 {
41   //  CkPrintf("[%d] _createTraceutilization\n", CkMyPe());
42
43   // Register the reducer
44   CkAssert(sizeof(short) == 2);
45   sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
46
47   CkpvInitialize(TraceUtilization*, _trace);
48   CkpvAccess(_trace) = new TraceUtilization();
49   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
50
51 }
52
53
54
55 void TraceUtilization::beginExecute(CmiObjId *tid)
56 {
57   beginExecute(-1,-1,_threadEP,-1);
58 }
59
60 void TraceUtilization::beginExecute(envelope *e)
61 {
62   // no message means thread execution
63   if (e==NULL) {
64     beginExecute(-1,-1,_threadEP,-1);
65   }
66   else {
67     beginExecute(-1,-1,e->getEpIdx(),-1);
68   }  
69 }
70
71 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
72 {
73   if (execEp != INVALIDEP) {
74     TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
75     return;
76   }
77   
78   execEp=ep;
79   start = TraceTimer();
80 }
81
82
83 void TraceUtilization::endExecute(void)
84 {
85
86   if (execEp == TRACEON_EP) {
87     // if trace just got turned on, then one expects to see this
88     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
89     return;
90   }
91
92   double endTime = TraceTimer();
93  
94   updateCpuTime(execEp, start, endTime);
95
96
97   execEp = INVALIDEP;
98 }
99
100
101
102 void TraceUtilization::addEventType(int eventType)
103 {
104   CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
105 }
106
107
108
109
110 /**
111
112 Send back to the client compressed sum-detail style measurements about the 
113 utilization for each active PE combined across all PEs.
114
115 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
116    #samples (EP,utilization)* 
117
118 One example record for two EPS that executed during the sample period. 
119 EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. 
120 All of these would be packed as bytes into the message:
121 2 3 150 122 20
122
123  */
124 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
125   CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n",  storedSumDetailResults.size() );
126   //  CkAssert(sumDetail);
127   int datalength;
128
129 #if 0
130
131   compressedBuffer fakeMessage = fakeCompressedMessage();
132   CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
133   fakeMessage.freeBuf();
134
135 #else
136
137   if (storedSumDetailResults.size()  == 0) {
138     compressedBuffer b = emptyCompressedBuffer();
139     CcsSendDelayedReply(m->reply, b.datalength(), b.buffer()); 
140     b.freeBuf();
141   } else {
142     CkReductionMsg * msg = storedSumDetailResults.front();
143     storedSumDetailResults.pop_front();
144
145     
146     void *sendBuffer = (void *)msg->getData();
147     datalength = msg->getSize();
148     CcsSendDelayedReply(m->reply, datalength, sendBuffer);
149     
150     delete msg;
151   }
152     
153   
154 #endif
155
156   //  CkPrintf("CCS response of %d bytes sent.\n", datalength);
157   delete m;
158 }
159
160
161
162 void TraceUtilizationBOC::collectSumDetailData() {
163   TraceUtilization* t = CkpvAccess(_trace);
164
165   compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
166
167   //  CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail avg util=%lg\n", CkMyPe(), averageUtilizationInBuffer(b));
168   //  printCompressedBuf(b);
169   // fflush(stdout);
170   
171   
172 #if 0
173   b = fakeCompressedMessage();
174 #endif
175   
176   //  CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), b.datalength());
177   
178   //  CProxy_TraceUtilizationBOC sumProxy(traceSummaryGID);
179   CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
180   contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
181   
182   b.freeBuf();
183 }
184
185
186 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
187   CkAssert(CkMyPe() == 0);
188
189   compressedBuffer b(msg->getData());
190   CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
191   //if(isCompressedBufferSane(b)){
192     storedSumDetailResults.push_back(msg); 
193     //}
194
195     // CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(size now=%d)\n", CkMyPe(), storedSumDetailResults.size() );
196     //  fflush(stdout);
197
198 }
199
200
201
202 void TraceUtilization::writeSts(void) {
203   // open sts file
204   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
205   sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
206   FILE* stsfp = fopen(fname, "w+");
207   if (stsfp == 0) {
208        CmiAbort("Cannot open summary sts file for writing.\n");
209   }
210   delete[] fname;
211
212   traceWriteSTS(stsfp,0);
213   fprintf(stsfp, "END\n");
214
215   fclose(stsfp);
216 }
217
218
219
220 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
221
222
223   
224 /// Compress a buffer by merging all entries in a bin that are less than the threshold into a single "other" category
225   compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
226     //    CkPrintf("[%d] moveTinyEntriesToOther\n", CkMyPe());
227     
228     // reset the src buffer to the beginning
229     src.pos = 0;
230
231     compressedBuffer dest(100000); 
232     
233     int numBins = src.pop<numBins_T>();
234     int numProcs = src.pop<numProcs_T>();
235     
236     dest.push<numBins_T>(numBins);
237     dest.push<numProcs_T>(numProcs);
238     
239     
240     for(int i=0;i<numBins;i++){
241       double utilizationInOther = 0.0;
242       
243       entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
244       int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
245       
246       CkAssert(numEntriesInSrcBin < 200);
247
248       for(int j=0;j<numEntriesInSrcBin;j++){
249         ep_T ep = src.pop<ep_T>();
250         double v = src.pop<utilization_T>();
251         
252         if(v < threshold * 250.0){
253           // do not copy bin into destination
254           utilizationInOther += v / 250.0;
255         } else {
256           // copy bin into destination
257           dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
258           dest.push<ep_T>(ep);
259           dest.push<utilization_T>(v);
260         }
261
262       }
263       
264       // if other category has stuff in it, add it to the destination buffer
265       if(utilizationInOther > 0.0){
266         dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
267         dest.push<ep_T>(other_EP);
268         if(utilizationInOther > 1.0)
269           utilizationInOther = 1.0;
270         dest.push<utilization_T>(utilizationInOther*250.0);
271       }
272       
273     }
274    
275     return dest;
276   }
277   
278     
279
280
281 /// A reducer for merging compressed sum detail data
282 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
283   // CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
284   
285   compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
286   int *numProcsRepresentedInMessage = new int[nMsg];
287   
288   int numBins = 0;
289   int totalsize = 0;
290   int totalProcsAcrossAllMessages = 0;
291   
292   for (int i=0;i<nMsg;i++) {
293     incomingMsgs[i].init(msgs[i]->getData());
294     
295     //  CkPrintf("[%d] Incoming reduction message %d has average utilization %lg\n", CkMyPe(),  i, averageUtilizationInBuffer(incomingMsgs[i])); 
296     //   CkPrintf("Is buffer %d sane? %s\n", i, isCompressedBufferSane(incomingMsgs[i]) ? "yes": "no" );
297
298
299     totalsize += msgs[i]->getSize();
300     //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
301     //   printCompressedBuf(incomingMsgs[i]);
302     
303     // Read first value from message. 
304     // Make sure all messages have the same number of bins
305     if(i==0)
306       numBins = incomingMsgs[i].pop<numBins_T>();
307     else 
308       CkAssert( numBins ==  incomingMsgs[i].pop<numBins_T>() );
309     
310     // Read second value from message. 
311     numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
312     totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
313     //    CkPrintf("Number of procs in message[%d] is %d\n", i,  (int)numProcsRepresentedInMessage[i]);
314   }
315   
316   compressedBuffer dest(totalsize + 100); 
317   
318   // build a compressed representation of each merged bin
319   dest.push<numBins_T>(numBins);
320   dest.push<numProcs_T>(totalProcsAcrossAllMessages);
321   
322   for(int i=0; i<numBins; i++){
323     mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
324   }
325   
326   // CkPrintf("END MERGE RESULT=========================================================\n");
327   // printCompressedBuf(dest);
328
329
330   //CkPrintf("[%d] Merged buffer has average utilization %lg \n", CkMyPe(), averageUtilizationInBuffer(dest));
331
332   //CkPrintf("Is resulting merged buffer sane? %s\n", isCompressedBufferSane(dest) ? "yes": "no" );  
333   
334   compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
335   
336   //  CkPrintf("Is resulting merged Filtered buffer sane? %s\n", isCompressedBufferSane(dest2) ? "yes": "no" ); 
337
338   //  CkPrintf("[%d] Outgoing reduction (filtered) message has average utilization %lf \n", CkMyPe(), averageUtilizationInBuffer(dest2));
339
340   
341   CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());   
342   dest.freeBuf();
343   delete[] incomingMsgs;
344   return m;
345 }
346
347
348
349
350
351
352
353 /// Create fake sum detail data in the compressed format (for debugging)
354  compressedBuffer fakeCompressedMessage(){
355    CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
356    
357    compressedBuffer fakeBuf(10000);
358    
359    int numBins = 55;
360    int totalsize = 0;
361    int numProcs = 1000;
362
363    // build a compressed representation of each merged bin
364    fakeBuf.push<numBins_T>(numBins);
365    fakeBuf.push<numProcs_T>(numProcs);
366    for(int i=0; i<numBins; i++){
367      int numRecords = 3;
368      fakeBuf.push<entriesInBin_T>(numRecords);
369      for(int j=0;j<numRecords;j++){
370        fakeBuf.push<ep_T>(j*10+2);
371        fakeBuf.push<utilization_T>(120.00);
372      }  
373    }
374    
375    //CkPrintf("Fake Compressed Message:=========================================================\n");
376    //   printCompressedBuf(fakeBuf);
377
378    CkAssert(isCompressedBufferSane(fakeBuf));
379
380    return fakeBuf;
381  }
382
383
384  /// Create an empty message
385  compressedBuffer emptyCompressedBuffer(){
386    compressedBuffer result(sizeof(numBins_T));
387    result.push<numBins_T>(0);
388    return result;
389  }
390
391
392
393
394 /** print out the compressed buffer starting from its begining*/
395 void printCompressedBuf(compressedBuffer b){
396   // b should be passed in by value, and hence we can modify it
397   b.pos = 0;
398   int numEntries = b.pop<numBins_T>();
399   CkPrintf("Buffer contains %d records\n", numEntries);
400   int numProcs = b.pop<numProcs_T>();
401   CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
402
403   for(int i=0;i<numEntries;i++){
404     entriesInBin_T recordLength = b.pop<entriesInBin_T>();
405     if(recordLength > 0){
406       CkPrintf("    Record %d is of length %d : ", i, recordLength);
407       
408       for(int j=0;j<recordLength;j++){
409         ep_T ep = b.pop<ep_T>();
410         utilization_T v = b.pop<utilization_T>();
411         CkPrintf("(%d,%f) ", ep, v);
412       }
413     
414       CkPrintf("\n");
415     }
416   }
417 }
418
419
420
421  bool isCompressedBufferSane(compressedBuffer b){
422    // b should be passed in by value, and hence we can modify it  
423    b.pos = 0;  
424    numBins_T numBins = b.pop<numBins_T>();  
425    numProcs_T numProcs = b.pop<numProcs_T>();  
426    
427    if(numBins > 2000){
428      ckout << "WARNING: numBins=" << numBins << endl;
429      return false;
430    }
431    
432    for(int i=0;i<numBins;i++){  
433      entriesInBin_T recordLength = b.pop<entriesInBin_T>();  
434      if(recordLength > 200){
435        ckout << "WARNING: recordLength=" << recordLength << endl;
436        return false;
437      }
438      
439      if(recordLength > 0){  
440        
441        for(int j=0;j<recordLength;j++){  
442          ep_T ep = b.pop<ep_T>();  
443          utilization_T v = b.pop<utilization_T>();  
444          //      CkPrintf("(%d,%f) ", ep, v);  
445          if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
446            ckout << "WARNING: ep=" << ep << " v=" << v << endl;
447            return false;
448          }
449        }  
450        
451      }  
452    }  
453    
454    return true;
455  }
456
457
458
459  double averageUtilizationInBuffer(compressedBuffer b){
460    // b should be passed in by value, and hence we can modify it  
461    b.pos = 0;  
462    numBins_T numBins = b.pop<numBins_T>();  
463    numProcs_T numProcs = b.pop<numProcs_T>();  
464    
465    //   CkPrintf("[%d] averageUtilizationInBuffer numProcs=%d   (grep reduction message)\n", CkMyPe(), numProcs);
466    
467    double totalUtilization = 0.0;
468    
469    for(int i=0;i<numBins;i++) {  
470      entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();     
471      for(int j=0;j<entriesInBin;j++){  
472        ep_T ep = b.pop<ep_T>();  
473        totalUtilization +=  b.pop<utilization_T>();  
474      }
475    }
476    
477    return totalUtilization / numBins / 2.5;
478  }
479  
480  
481
482 void sanityCheckCompressedBuf(compressedBuffer b){  
483    CkAssert(isCompressedBufferSane(b)); 
484  }  
485  
486
487
488 double TraceUtilization::sumUtilization(int startBin, int endBin){
489    int epInfoSize = getEpInfoSize();
490    
491    double a = 0.0;
492
493    for(int i=startBin; i<=endBin; i++){
494      for(int j=0; j<epInfoSize; j++){
495        a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
496      }
497    }
498    return a;
499  }
500
501  
502  /// Create a compressed buffer of the n most recent sum detail samples
503  compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
504    //   CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
505
506    int startBin =  cpuTimeEntriesSentSoFar();
507    int numEntries = getEpInfoSize();
508
509    int endBin = startBin + desiredBinsToSend - 1;
510    int binsToSend = endBin - startBin + 1;
511    CkAssert(binsToSend >= desiredBinsToSend );
512    incrementNumCpuTimeEntriesSent(binsToSend);
513
514
515 #if 0
516    bool nonePrinted = true;
517    for(int i=0;i<(NUM_BINS-1000);i+=1000){
518      double expectedU = sumUtilization(i, i+999);
519      if(expectedU > 0.0){
520           CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(),  i, i+999, expectedU);
521        nonePrinted = false;
522      }
523    }
524    
525    if(nonePrinted)
526      CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
527
528    fflush(stdout);
529 #endif
530
531    int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
532    compressedBuffer b(bufferSize);
533
534    b.push<numBins_T>(binsToSend);
535    b.push<numProcs_T>(1); // number of processors along reduction subtree. I am just one processor.
536    //   double myu = 0.0;
537    
538    for(int i=0; i<binsToSend; i++) {
539      // Create a record for bin i
540      //  CkPrintf("Adding record for bin %d\n", i);
541      int numEntriesInRecordOffset = b.push<entriesInBin_T>(0); // The number of entries in this record
542      
543      for(int e=0; e<numEntries; e++) {
544        double scaledUtilization = getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
545        if(scaledUtilization > 0.0) {
546          //CkPrintf("scaledUtilization=%lg !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n", scaledUtilization);
547          if(scaledUtilization > 250.0)
548            scaledUtilization = 250.0;
549          
550          b.push<ep_T>(e);
551          b.push<utilization_T>(scaledUtilization);
552          //      myu += scaledUtilization;
553          b.increment<entriesInBin_T>(numEntriesInRecordOffset);
554        }
555      }
556    }
557    
558    // CkPrintf("[%d] compressNRecentSumDetail resulting buffer: averageUtilizationInBuffer()=%lg myu=%lg\n", CkMyPe(), averageUtilizationInBuffer(b), myu);
559    // fflush(stdout);
560    
561    return b;
562  }
563  
564
565
566
567
568 /** Merge the compressed entries from the first bin in each of the srcBuf buffers.
569      
570 */
571  void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
572   // put a counter at the beginning of destBuf
573   int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
574   
575   //  CkPrintf("BEGIN MERGE------------------------------------------------------------------\n");
576   
577   // Read off the number of bins in each buffer
578   int *remainingEntriesToRead = new int[numSrcBuf];
579   for(int i=0;i<numSrcBuf;i++){
580     remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
581   }
582
583   int count = 0;
584   // Count remaining entries to process
585   for(int i=0;i<numSrcBuf;i++){
586     count += remainingEntriesToRead[i];
587   }
588   
589   while (count>0) {
590     // find first EP from all buffers (these are sorted by EP already)
591     int minEp = 10000;
592     for(int i=0;i<numSrcBuf;i++){
593       if(remainingEntriesToRead[i]>0){
594         int ep = srcBufferArray[i].peek<ep_T>();
595         if(ep < minEp){
596           minEp = ep;
597         }
598       }
599     }
600     
601     //   CkPrintf("[%d] mergeCompressedBin minEp found was %d   totalProcsAcrossAllMessages=%d\n", CkMyPe(), minEp, (int)totalProcsAcrossAllMessages);
602     
603     destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
604
605     // Merge contributions from all buffers that list the EP
606     double v = 0.0;
607     for(int i=0;i<numSrcBuf;i++){
608       if(remainingEntriesToRead[i]>0){
609         int ep = srcBufferArray[i].peek<ep_T>(); 
610         if(ep == minEp){
611           srcBufferArray[i].pop<ep_T>();  // pop ep
612           double util = srcBufferArray[i].pop<utilization_T>();
613           v += util * numProcsRepresentedInMessage[i];
614           remainingEntriesToRead[i]--;
615           count --;
616         }
617       }
618     }
619
620     // create a new entry in the output for this EP.
621     destBuf.push<ep_T>(minEp);
622     destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
623
624   }
625
626
627   delete [] remainingEntriesToRead;
628   // CkPrintf("[%d] End of mergeCompressedBin:\n", CkMyPe() );
629   // CkPrintf("END MERGE ------------------------------------------------------------------\n");
630  }
631
632
633
634 #include "TraceUtilization.def.h"
635
636
637 /*@}*/
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657