take Projector out and protect it with a macro CMK_PROJECTOR
[charm.git] / src / ck-perf / trace-utilization.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /**
9  * \addtogroup CkPerf
10 */
11 /*@{*/
12
13 #include "trace-utilization.h"
14
15
16 /* readonly */ CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
17
18
19 /// A reduction type for merging compressed sum detail data
20 CkReduction::reducerType sumDetailCompressedReducer;
21
22
23 void collectUtilizationData(void *ignore, double currT) {
24   // Only start collecting after a few seconds have passed. This way there will hopefully be at least 1000 bins to pickup each time we try.
25   static int numTimesCalled = 0;
26   numTimesCalled ++;
27   if(numTimesCalled > 4){
28     traceUtilizationGroupProxy.collectSumDetailData();
29   }
30 }
31
32
33 CkpvStaticDeclare(TraceUtilization*, _trace);
34
35 /**
36   For each TraceFoo module, _createTraceFoo() must be defined.
37   This function is called in _createTraces() generated in moduleInit.C
38 */
39 void _createTraceutilization(char **argv)
40 {
41   //  CkPrintf("[%d] _createTraceutilization\n", CkMyPe());
42
43   // Register the reducer
44   CkAssert(sizeof(short) == 2);
45   sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
46
47   CkpvInitialize(TraceUtilization*, _trace);
48   CkpvAccess(_trace) = new TraceUtilization();
49   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
50
51 }
52
53
54
55 void TraceUtilization::beginExecute(CmiObjId *tid)
56 {
57   beginExecute(-1,-1,_threadEP,-1);
58 }
59
60 void TraceUtilization::beginExecute(envelope *e)
61 {
62   // no message means thread execution
63   if (e==NULL) {
64     beginExecute(-1,-1,_threadEP,-1);
65   }
66   else {
67     beginExecute(-1,-1,e->getEpIdx(),-1);
68   }  
69 }
70
71 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx)
72 {
73   if (execEp != INVALIDEP) {
74     TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
75     return;
76   }
77   
78   execEp=ep;
79   start = TraceTimer();
80 }
81
82
83 void TraceUtilization::endExecute(void)
84 {
85
86   if (execEp == TRACEON_EP) {
87     // if trace just got turned on, then one expects to see this
88     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
89     return;
90   }
91
92   double endTime = TraceTimer();
93  
94   updateCpuTime(execEp, start, endTime);
95
96
97   execEp = INVALIDEP;
98 }
99
100
101
102 void TraceUtilization::addEventType(int eventType)
103 {
104   CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
105 }
106
107
108
109
110 /**
111
112 Send back to the client compressed sum-detail style measurements about the 
113 utilization for each active PE combined across all PEs.
114
115 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
116    #samples (EP,utilization)* 
117
118 One example record for two EPS that executed during the sample period. 
119 EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. 
120 All of these would be packed as bytes into the message:
121 2 3 150 122 20
122
123  */
124 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
125   CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n",  storedSumDetailResults.size() );
126   //  CkAssert(sumDetail);
127   int datalength;
128
129 #if 0
130
131   compressedBuffer fakeMessage = fakeCompressedMessage();
132   CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
133   fakeMessage.freeBuf();
134
135 #else
136
137   if (storedSumDetailResults.size()  == 0) {
138     compressedBuffer b = emptyCompressedBuffer();
139     CcsSendDelayedReply(m->reply, b.datalength(), b.buffer()); 
140     b.freeBuf();
141   } else {
142     CkReductionMsg * msg = storedSumDetailResults.front();
143     storedSumDetailResults.pop_front();
144
145     
146     void *sendBuffer = (void *)msg->getData();
147     datalength = msg->getSize();
148     CcsSendDelayedReply(m->reply, datalength, sendBuffer);
149     
150     delete msg;
151   }
152     
153   
154 #endif
155
156   //  CkPrintf("CCS response of %d bytes sent.\n", datalength);
157   delete m;
158 }
159
160
161
162 void TraceUtilizationBOC::collectSumDetailData() {
163   TraceUtilization* t = CkpvAccess(_trace);
164
165   compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
166
167   //  CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail avg util=%lg\n", CkMyPe(), averageUtilizationInBuffer(b));
168   //  printCompressedBuf(b);
169   // fflush(stdout);
170   
171   
172 #if 0
173   b = fakeCompressedMessage();
174 #endif
175   
176   //  CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), b.datalength());
177   
178   //  CProxy_TraceUtilizationBOC sumProxy(traceSummaryGID);
179   CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
180   contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
181   
182   b.freeBuf();
183 }
184
185
186 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
187   CkAssert(CkMyPe() == 0);
188
189   compressedBuffer b(msg->getData());
190   CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
191   //if(isCompressedBufferSane(b)){
192     storedSumDetailResults.push_back(msg); 
193     //}
194
195     // CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(size now=%d)\n", CkMyPe(), storedSumDetailResults.size() );
196     //  fflush(stdout);
197
198 }
199
200
201
202 void TraceUtilization::writeSts(void) {
203   // open sts file
204   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
205   sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
206   FILE* stsfp = fopen(fname, "w+");
207   if (stsfp == 0) {
208        CmiAbort("Cannot open summary sts file for writing.\n");
209   }
210   delete[] fname;
211
212   traceWriteSTS(stsfp,0);
213   fprintf(stsfp, "END\n");
214
215   fclose(stsfp);
216 }
217
218
219
220 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
221
222
223   
224 /// Compress a buffer by merging all entries in a bin that are less than the threshold into a single "other" category
225   compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
226     //    CkPrintf("[%d] moveTinyEntriesToOther\n", CkMyPe());
227     
228     // reset the src buffer to the beginning
229     src.pos = 0;
230
231     compressedBuffer dest(100000); 
232     
233     int numBins = src.pop<numBins_T>();
234     int numProcs = src.pop<numProcs_T>();
235     
236     dest.push<numBins_T>(numBins);
237     dest.push<numProcs_T>(numProcs);
238     
239     
240     for(int i=0;i<numBins;i++){
241       double utilizationInOther = 0.0;
242       
243       entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
244       int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
245       
246       CkAssert(numEntriesInSrcBin < 200);
247
248       for(int j=0;j<numEntriesInSrcBin;j++){
249         ep_T ep = src.pop<ep_T>();
250         double v = src.pop<utilization_T>();
251         
252         if(v < threshold * 250.0){
253           // do not copy bin into destination
254           utilizationInOther += v / 250.0;
255         } else {
256           // copy bin into destination
257           dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
258           dest.push<ep_T>(ep);
259           dest.push<utilization_T>(v);
260         }
261
262       }
263       
264       // if other category has stuff in it, add it to the destination buffer
265       if(utilizationInOther > 0.0){
266         dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
267         dest.push<ep_T>(other_EP);
268         if(utilizationInOther > 1.0)
269           utilizationInOther = 1.0;
270         dest.push<utilization_T>(utilizationInOther*250.0);
271       }
272       
273     }
274    
275     return dest;
276   }
277   
278     
279
280
281 /// A reducer for merging compressed sum detail data
282 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
283   // CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
284   
285   compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
286   int *numProcsRepresentedInMessage = new int[nMsg];
287   
288   int numBins = 0;
289   int totalsize = 0;
290   int totalProcsAcrossAllMessages = 0;
291   
292   for (int i=0;i<nMsg;i++) {
293     incomingMsgs[i].init(msgs[i]->getData());
294     
295     //  CkPrintf("[%d] Incoming reduction message %d has average utilization %lg\n", CkMyPe(),  i, averageUtilizationInBuffer(incomingMsgs[i])); 
296     //   CkPrintf("Is buffer %d sane? %s\n", i, isCompressedBufferSane(incomingMsgs[i]) ? "yes": "no" );
297
298
299     totalsize += msgs[i]->getSize();
300     //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
301     //   printCompressedBuf(incomingMsgs[i]);
302     
303     // Read first value from message. 
304     // Make sure all messages have the same number of bins
305     if(i==0)
306       numBins = incomingMsgs[i].pop<numBins_T>();
307     else 
308       CkAssert( numBins ==  incomingMsgs[i].pop<numBins_T>() );
309     
310     // Read second value from message. 
311     numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
312     totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
313     //    CkPrintf("Number of procs in message[%d] is %d\n", i,  (int)numProcsRepresentedInMessage[i]);
314   }
315   
316   compressedBuffer dest(totalsize + 100); 
317   
318   // build a compressed representation of each merged bin
319   dest.push<numBins_T>(numBins);
320   dest.push<numProcs_T>(totalProcsAcrossAllMessages);
321   
322   for(int i=0; i<numBins; i++){
323     mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
324   }
325   
326   // CkPrintf("END MERGE RESULT=========================================================\n");
327   // printCompressedBuf(dest);
328
329
330   //CkPrintf("[%d] Merged buffer has average utilization %lg \n", CkMyPe(), averageUtilizationInBuffer(dest));
331
332   //CkPrintf("Is resulting merged buffer sane? %s\n", isCompressedBufferSane(dest) ? "yes": "no" );  
333   
334   compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
335   
336   //  CkPrintf("Is resulting merged Filtered buffer sane? %s\n", isCompressedBufferSane(dest2) ? "yes": "no" ); 
337
338   //  CkPrintf("[%d] Outgoing reduction (filtered) message has average utilization %lf \n", CkMyPe(), averageUtilizationInBuffer(dest2));
339
340   
341   CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());   
342   dest.freeBuf();
343   delete[] incomingMsgs;
344   return m;
345 }
346
347
348
349
350
351
352
353 /// Create fake sum detail data in the compressed format (for debugging)
354  compressedBuffer fakeCompressedMessage(){
355    CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
356    
357    compressedBuffer fakeBuf(10000);
358    
359    int numBins = 55;
360    int numProcs = 1000;
361
362    // build a compressed representation of each merged bin
363    fakeBuf.push<numBins_T>(numBins);
364    fakeBuf.push<numProcs_T>(numProcs);
365    for(int i=0; i<numBins; i++){
366      int numRecords = 3;
367      fakeBuf.push<entriesInBin_T>(numRecords);
368      for(int j=0;j<numRecords;j++){
369        fakeBuf.push<ep_T>(j*10+2);
370        fakeBuf.push<utilization_T>(120.00);
371      }  
372    }
373    
374    //CkPrintf("Fake Compressed Message:=========================================================\n");
375    //   printCompressedBuf(fakeBuf);
376
377    CkAssert(isCompressedBufferSane(fakeBuf));
378
379    return fakeBuf;
380  }
381
382
383  /// Create an empty message
384  compressedBuffer emptyCompressedBuffer(){
385    compressedBuffer result(sizeof(numBins_T));
386    result.push<numBins_T>(0);
387    return result;
388  }
389
390
391
392
393 /** print out the compressed buffer starting from its begining*/
394 void printCompressedBuf(compressedBuffer b){
395   // b should be passed in by value, and hence we can modify it
396   b.pos = 0;
397   int numEntries = b.pop<numBins_T>();
398   CkPrintf("Buffer contains %d records\n", numEntries);
399   int numProcs = b.pop<numProcs_T>();
400   CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
401
402   for(int i=0;i<numEntries;i++){
403     entriesInBin_T recordLength = b.pop<entriesInBin_T>();
404     if(recordLength > 0){
405       CkPrintf("    Record %d is of length %d : ", i, recordLength);
406       
407       for(int j=0;j<recordLength;j++){
408         ep_T ep = b.pop<ep_T>();
409         utilization_T v = b.pop<utilization_T>();
410         CkPrintf("(%d,%f) ", ep, v);
411       }
412     
413       CkPrintf("\n");
414     }
415   }
416 }
417
418
419
420  bool isCompressedBufferSane(compressedBuffer b){
421    // b should be passed in by value, and hence we can modify it  
422    b.pos = 0;  
423    numBins_T numBins = b.pop<numBins_T>();  
424    numProcs_T numProcs = b.pop<numProcs_T>();  
425    
426    if(numBins > 2000){
427      ckout << "WARNING: numBins=" << numBins << endl;
428      return false;
429    }
430    
431    for(int i=0;i<numBins;i++){  
432      entriesInBin_T recordLength = b.pop<entriesInBin_T>();  
433      if(recordLength > 200){
434        ckout << "WARNING: recordLength=" << recordLength << endl;
435        return false;
436      }
437      
438      if(recordLength > 0){  
439        
440        for(int j=0;j<recordLength;j++){  
441          ep_T ep = b.pop<ep_T>();  
442          utilization_T v = b.pop<utilization_T>();  
443          //      CkPrintf("(%d,%f) ", ep, v);  
444          if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
445            ckout << "WARNING: ep=" << ep << " v=" << v << endl;
446            return false;
447          }
448        }  
449        
450      }  
451    }  
452    
453    return true;
454  }
455
456
457
458  double averageUtilizationInBuffer(compressedBuffer b){
459    // b should be passed in by value, and hence we can modify it  
460    b.pos = 0;  
461    numBins_T numBins = b.pop<numBins_T>();  
462    numProcs_T numProcs = b.pop<numProcs_T>();  
463    
464    //   CkPrintf("[%d] averageUtilizationInBuffer numProcs=%d   (grep reduction message)\n", CkMyPe(), numProcs);
465    
466    double totalUtilization = 0.0;
467    
468    for(int i=0;i<numBins;i++) {  
469      entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();     
470      for(int j=0;j<entriesInBin;j++){  
471        ep_T ep = b.pop<ep_T>();  
472        totalUtilization +=  b.pop<utilization_T>();  
473      }
474    }
475    
476    return totalUtilization / numBins / 2.5;
477  }
478  
479  
480
481 void sanityCheckCompressedBuf(compressedBuffer b){  
482    CkAssert(isCompressedBufferSane(b)); 
483  }  
484  
485
486
487 double TraceUtilization::sumUtilization(int startBin, int endBin){
488    int epInfoSize = getEpInfoSize();
489    
490    double a = 0.0;
491
492    for(int i=startBin; i<=endBin; i++){
493      for(int j=0; j<epInfoSize; j++){
494        a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
495      }
496    }
497    return a;
498  }
499
500  
501  /// Create a compressed buffer of the n most recent sum detail samples
502  compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
503    //   CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
504
505    int startBin =  cpuTimeEntriesSentSoFar();
506    int numEntries = getEpInfoSize();
507
508    int endBin = startBin + desiredBinsToSend - 1;
509    int binsToSend = endBin - startBin + 1;
510    CkAssert(binsToSend >= desiredBinsToSend );
511    incrementNumCpuTimeEntriesSent(binsToSend);
512
513
514 #if 0
515    bool nonePrinted = true;
516    for(int i=0;i<(NUM_BINS-1000);i+=1000){
517      double expectedU = sumUtilization(i, i+999);
518      if(expectedU > 0.0){
519           CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(),  i, i+999, expectedU);
520        nonePrinted = false;
521      }
522    }
523    
524    if(nonePrinted)
525      CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
526
527    fflush(stdout);
528 #endif
529
530    int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
531    compressedBuffer b(bufferSize);
532
533    b.push<numBins_T>(binsToSend);
534    b.push<numProcs_T>(1); // number of processors along reduction subtree. I am just one processor.
535    //   double myu = 0.0;
536    
537    for(int i=0; i<binsToSend; i++) {
538      // Create a record for bin i
539      //  CkPrintf("Adding record for bin %d\n", i);
540      int numEntriesInRecordOffset = b.push<entriesInBin_T>(0); // The number of entries in this record
541      
542      for(int e=0; e<numEntries; e++) {
543        double scaledUtilization = getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
544        if(scaledUtilization > 0.0) {
545          //CkPrintf("scaledUtilization=%lg !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n", scaledUtilization);
546          if(scaledUtilization > 250.0)
547            scaledUtilization = 250.0;
548          
549          b.push<ep_T>(e);
550          b.push<utilization_T>(scaledUtilization);
551          //      myu += scaledUtilization;
552          b.increment<entriesInBin_T>(numEntriesInRecordOffset);
553        }
554      }
555    }
556    
557    // CkPrintf("[%d] compressNRecentSumDetail resulting buffer: averageUtilizationInBuffer()=%lg myu=%lg\n", CkMyPe(), averageUtilizationInBuffer(b), myu);
558    // fflush(stdout);
559    
560    return b;
561  }
562  
563
564
565
566
567 /** Merge the compressed entries from the first bin in each of the srcBuf buffers.
568      
569 */
570  void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
571   // put a counter at the beginning of destBuf
572   int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
573   
574   //  CkPrintf("BEGIN MERGE------------------------------------------------------------------\n");
575   
576   // Read off the number of bins in each buffer
577   int *remainingEntriesToRead = new int[numSrcBuf];
578   for(int i=0;i<numSrcBuf;i++){
579     remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
580   }
581
582   int count = 0;
583   // Count remaining entries to process
584   for(int i=0;i<numSrcBuf;i++){
585     count += remainingEntriesToRead[i];
586   }
587   
588   while (count>0) {
589     // find first EP from all buffers (these are sorted by EP already)
590     int minEp = 10000;
591     for(int i=0;i<numSrcBuf;i++){
592       if(remainingEntriesToRead[i]>0){
593         int ep = srcBufferArray[i].peek<ep_T>();
594         if(ep < minEp){
595           minEp = ep;
596         }
597       }
598     }
599     
600     //   CkPrintf("[%d] mergeCompressedBin minEp found was %d   totalProcsAcrossAllMessages=%d\n", CkMyPe(), minEp, (int)totalProcsAcrossAllMessages);
601     
602     destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
603
604     // Merge contributions from all buffers that list the EP
605     double v = 0.0;
606     for(int i=0;i<numSrcBuf;i++){
607       if(remainingEntriesToRead[i]>0){
608         int ep = srcBufferArray[i].peek<ep_T>(); 
609         if(ep == minEp){
610           srcBufferArray[i].pop<ep_T>();  // pop ep
611           double util = srcBufferArray[i].pop<utilization_T>();
612           v += util * numProcsRepresentedInMessage[i];
613           remainingEntriesToRead[i]--;
614           count --;
615         }
616       }
617     }
618
619     // create a new entry in the output for this EP.
620     destBuf.push<ep_T>(minEp);
621     destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
622
623   }
624
625
626   delete [] remainingEntriesToRead;
627   // CkPrintf("[%d] End of mergeCompressedBin:\n", CkMyPe() );
628   // CkPrintf("END MERGE ------------------------------------------------------------------\n");
629  }
630
631
632
633 #include "TraceUtilization.def.h"
634
635
636 /*@}*/
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656