New functionality in CCS streaming of sum detail data for projections. The reduction...
authorIsaac Dooley <idooley2@illinois.edu>
Mon, 11 May 2009 19:10:23 +0000 (19:10 +0000)
committerIsaac Dooley <idooley2@illinois.edu>
Mon, 11 May 2009 19:10:23 +0000 (19:10 +0000)
src/ck-perf/trace-summary.C
src/ck-perf/trace-summary.ci
src/ck-perf/trace-summary.h
src/ck-perf/trace-summaryBOC.h

index 6d42cb54d83f829fa9c05c74403c3a1eb14df1df..4425e1134078618e3b7d6ff7845bdf1591853dd4 100644 (file)
@@ -31,6 +31,9 @@ CkpvDeclare(int, binCount);
 CkpvDeclare(double, binSize);
 CkpvDeclare(double, version);
 
+
+CkpvDeclare(int, previouslySentBins);
+
 // Global Readonly
 CkGroupID traceSummaryGID;
 bool summaryCcsStreaming;
@@ -46,6 +49,8 @@ void _createTracesummary(char **argv)
 {
   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
   CkpvInitialize(TraceSummary*, _trace);
+  CkpvInitialize(int, previouslySentBins);
+  CkpvAccess(previouslySentBins) = 0;
   CkpvAccess(_trace) = new  TraceSummary(argv);
   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
 }
@@ -786,24 +791,32 @@ void TraceSummaryBOC::initCCS() {
     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
     CkPrintf("Trace Summary now listening in for CCS Client\n");
     CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequest(NULL), sumProxy[0]));
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
     CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestUnsignedChar(NULL), sumProxy[0])); 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
     CcsRegisterHandler("CkPerfSumDetail uchar", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(NULL), sumProxy[0])); 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailUnsignedChar(NULL), sumProxy[0])); 
     CcsRegisterHandler("CkPerfSumDetail compressed", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestSumDetailCompressed(NULL), sumProxy[0])); 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailCompressed(NULL), sumProxy[0])); 
+    CcsRegisterHandler("CkPerfSumDetail compressed PE0", 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailCompressedPE0(NULL), sumProxy[0])); 
 
+    CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
     CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
                           (void *)this);
     summaryCcsStreaming = CmiTrue;
   }
 }
 
-void TraceSummaryBOC::ccsClientRequest(CkCcsRequestMsg *m) {
+/** Return summary information as double precision values for each sample period. 
+    The actual data collection is in double precision values. 
+
+    The units on the returned values are total execution time across all PEs.
+*/
+void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
   double *sendBuffer;
 
-  CkPrintf("[%d] Request from Client detected. Client message: %s\n", CkMyPe(), m->data);
+  CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
 
   CkPrintf("Responding ...\n");
   int datalength = 0;
@@ -826,10 +839,15 @@ void TraceSummaryBOC::ccsClientRequest(CkCcsRequestMsg *m) {
 }
 
 
-void TraceSummaryBOC::ccsClientRequestUnsignedChar(CkCcsRequestMsg *m) {
+/** Return summary information as unsigned char values for each sample period. 
+    The actual data collection is in double precision values.
+
+    This returns the utilization in a range from 0 to 200.
+*/
+void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
   unsigned char *sendBuffer;
 
-  CkPrintf("[%d] Request from Client detected. Client message: %s\n", CkMyPe(), m->data);
+  CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
 
   CkPrintf("Responding ...\n");
   int datalength = 0;
@@ -863,20 +881,19 @@ void TraceSummaryBOC::ccsClientRequestUnsignedChar(CkCcsRequestMsg *m) {
 }
 
 
-
-void TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m) {
+/** This one may be broken */
+void TraceSummaryBOC::ccsRequestSumDetailUnsignedChar(CkCcsRequestMsg *m) {
   unsigned char *sendBuffer;
 
-  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars. Client message: %s\n", CkMyPe(), m->data);
+  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars.\n", CkMyPe());
 
   CkAssert(sumDetail);
 
   int _numEntries=_entryTable.size();
 
-  static int previouslySentBins = 0;
   SumLogPool * p = CkpvAccess(_trace)->pool();
   int numBinsAvailable = p->getNumEntries();
-  int binsToSend = numBinsAvailable - previouslySentBins;
+  int binsToSend = numBinsAvailable - CkpvAccess(previouslySentBins);
   CkPrintf("_numEntries=%d, binsToSend=%d\n", _numEntries, binsToSend);
 
   if (binsToSend < 1) {
@@ -897,7 +914,7 @@ void TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m)
       }
     }
 
-    previouslySentBins += binsToSend;
+    CkpvAccess(previouslySentBins) += binsToSend;
 
     int datalength = sizeof(unsigned char) * (_numEntries * binsToSend + 1);
     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
@@ -910,32 +927,61 @@ void TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m)
 
 /**
 
+Send back to the client compressed sum-detail style measurements about the 
+utilization for each active PE combined across all PEs.
+
 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
-#samples (EP,utilization)* 
+   #samples (EP,utilization)* 
 
+One example record for two EPs that executed during the sample period. 
+EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. 
+All of these would be packed as bytes into the message:
+2 3 150 122 20
 
  */
-void TraceSummaryBOC::ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m) {
-  unsigned char *sendBuffer;
+void TraceSummaryBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
+  CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n",  storedSumDetailResults.size() );
+  CkAssert(sumDetail);
+  int datalength;
+
+  if (storedSumDetailResults.size()  == 0) {
+    unsigned char *sendBuffer = new unsigned char[1];
+    sendBuffer[0] = 255;
+    datalength = sizeof(unsigned char);
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  } else {
+    CkReductionMsg * msg = storedSumDetailResults.front();
+    storedSumDetailResults.pop_front();
 
-  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars. Client message: %s\n", CkMyPe(), m->data);
+    unsigned char *sendBuffer = (unsigned char *)msg->getData();
+    datalength = msg->getSize();
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
 
-  CkAssert(sumDetail);
+    delete msg;
+  }
+
+  CkPrintf("CCS response of %d bytes sent.\n", datalength);
+  delete m;
+}
+
+
+/// Create a compressed buffer of the n most recent sum detail samples
+unsigned char * compressNRecentSumDetail(int &datalength, int binsToSend){
+  unsigned char * sendBuffer;
 
   int _numEntries=_entryTable.size();
 
-  static int previouslySentBins = 0;
   SumLogPool * p = CkpvAccess(_trace)->pool();
   int numBinsAvailable = p->getNumEntries();
-  int binsToSend = numBinsAvailable - previouslySentBins;
-  CkPrintf("_numEntries=%d, binsToSend=%d\n", _numEntries, binsToSend);
 
+  if(binsToSend > numBinsAvailable)
+    binsToSend = numBinsAvailable;
+  
   if (binsToSend < 1) {
     sendBuffer = new unsigned char[1];
     sendBuffer[0] = 255;
-    int datalength = sizeof(unsigned char);
-    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
-    delete [] sendBuffer;
+    datalength = sizeof(unsigned char);
   } else {
     
     sendBuffer = new unsigned char[3*_numEntries * binsToSend];
@@ -958,19 +1004,62 @@ void TraceSummaryBOC::ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m) {
       startOfCurrentRecord = nextPos;
     }
 
-    previouslySentBins += binsToSend;
-
-    int datalength = sizeof(unsigned char) * nextPos;
-    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
-    delete [] sendBuffer;
+    CkpvAccess(previouslySentBins) += binsToSend;    
+    datalength = sizeof(unsigned char) * nextPos;
   }
 
-  CkPrintf("Response Sent. Proceeding with computation.\n");
+    return sendBuffer;
+}
+
+/// Create a compressed buffer of the sum detail samples that occurred since the previous call to this function (default max of 10000 bins).
+unsigned char * compressAvailableNewSumDetail(int &datalength, int max=10000){
+  const SumLogPool * p = CkpvAccess(_trace)->pool();
+  const int numBinsAvailable = p->getNumEntries();
+  int binsToSend = numBinsAvailable - CkpvAccess(previouslySentBins); // bins beyond the per-PE "already sent" counter
+  if(binsToSend > max)
+    binsToSend = max;
+
+  unsigned char *sendBuffer = compressNRecentSumDetail(datalength, binsToSend); // fills in datalength as well
+
+  return sendBuffer; // allocated with new[] by compressNRecentSumDetail; the caller must delete[] it
+}
+
+/**
+
+Send back to the client sum-detail style measurements about the utilization for PE 0.
+
+The data format sent by this handler is a bunch of records(one for each bin) of the following format:
+#samples (EP,utilization)* 
+
+One example with two EPs that executed during the sample period. EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. All of these would be packed as bytes into the message:
+2 3 150 122 20
+
+ */
+void TraceSummaryBOC::ccsRequestSumDetailCompressedPE0(CkCcsRequestMsg *m) {
+
+  CkPrintf("[%d] Request from Client for compressed sum detail for PE 0.\n", CkMyPe());
+
+  CkAssert(sumDetail);
+
+  int datalength; // set by compressAvailableNewSumDetail through its reference parameter
+  unsigned char * sendBuffer = compressAvailableNewSumDetail(datalength); // uses the default cap of 10000 bins
+
+  CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+  delete [] sendBuffer; // buffer came from new[] in the compress helper
+  
+  CkPrintf("CCS Response of %d bytes sent.\n", datalength);
   delete m;
+
 }
 
+
+
+
+
+
 void startCollectData(void *data, double currT) {
   CkAssert(CkMyPe() == 0);
+  CkPrintf("startCollectData()\n");
   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
   double collectionGranularity = sumObj->collectionGranularity;
@@ -985,14 +1074,19 @@ void startCollectData(void *data, double currT) {
   //   request will pick up. This is to limit the amount of perturbation
   //   if it proves to be a problem.
   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
-  sumProxy.collectData(startTime, 
+  sumProxy.collectSummaryData(startTime, 
+                      collectionGranularity,
+                      numBlocksToGet*indicesPerBlock);
+
+  sumProxy.collectSumDetailData(startTime, 
                       collectionGranularity,
                       numBlocksToGet*indicesPerBlock);
+
   // assume success
   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
 }
 
-void TraceSummaryBOC::collectData(double startTime, double binSize,
+void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
                                  int numBins) {
   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
 
@@ -1009,12 +1103,12 @@ void TraceSummaryBOC::collectData(double startTime, double binSize,
   */
 
   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
-  CkCallback cb(CkIndex_TraceSummaryBOC::dataCollected(NULL), sumProxy[0]);
+  CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
             cb);
 }
 
-void TraceSummaryBOC::dataCollected(CkReductionMsg *msg) {
+void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
   CkAssert(CkMyPe() == 0);
   // **CWL** No memory management for the ccs buffer for now.
 
@@ -1029,6 +1123,91 @@ void TraceSummaryBOC::dataCollected(CkReductionMsg *msg) {
   delete msg;
 }
 
+
+/** Intended to merge the compressed entries from the first bin in each of the srcBuf buffers.
+    Return a pointer to the byte just after all the data written into destBuf.
+    NOTE: the current body is a placeholder. It copies everything from srcBuf[0] up to endBuf[0] and advances only srcBuf[0]; numSrcBuf and the other buffers are not read.
+*/
+unsigned char * mergeCompressedBin(unsigned char ** &srcBuf, unsigned char ** endBuf, int numSrcBuf, unsigned char * destBuf){
+  unsigned char *pos = destBuf;
+
+  // hack for now, just copy first buffer to destination
+  int c = 0;
+  for(c=0; c < (endBuf[0]-srcBuf[0]); c++){ // byte-wise copy of the whole remaining first buffer
+    destBuf[c] = (srcBuf[0])[c];
+  }
+
+  srcBuf[0] += c; // srcBuf[0] now equals endBuf[0]
+  return pos + c; // one past the last byte written
+}
+
+
+/// A reducer for merging compressed sum detail data
+CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
+  CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
+
+  // start with a pointer to the beginning of all the compressed buffers
+  unsigned char **posPtr = new unsigned char*[nMsg];
+  unsigned char **endBuf = new unsigned char*[nMsg];
+  for (int i=0;i<nMsg;i++) {
+    posPtr[i]=(unsigned char *)msgs[0]->getData();
+    endBuf[i]=posPtr[i] + msgs[0]->getSize();
+  }  
+
+  unsigned char * newBuf = new unsigned char[1000000];
+  unsigned char * dest = newBuf;
+
+  // build a compressed representation of each merged bin
+  bool haveMoreBins;
+  do {
+    dest = mergeCompressedBin(posPtr, endBuf, nMsg, dest);
+    
+    haveMoreBins = true;
+    for(int i=0;i<nMsg;i++){
+      if(posPtr[i] >= endBuf[i])
+       haveMoreBins = false;
+    }
+  } while (haveMoreBins);
+
+
+  return CkReductionMsg::buildNew((dest-newBuf)*sizeof(unsigned char),newBuf);   
+}
+
+/// A reduction type for merging compressed sum detail data (assigned once by registerIdleTimeReduction below)
+CkReduction::reducerType sumDetailCompressedReducer;
+/// An initnode registration function for the reducer. Despite its name it registers sumDetailCompressedReduction; the name is kept because trace-summary.ci declares it as initnode.
+void registerIdleTimeReduction(void) {
+  sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
+}
+
+
+
+
+
+void TraceSummaryBOC::collectSumDetailData(double startTime, double binSize, int numBins) { // startTime and binSize are not used in this body
+
+  int datalength; // set by compressNRecentSumDetail through its reference parameter
+  unsigned char *buffer = compressNRecentSumDetail(datalength, numBins); // new[]-allocated compressed bytes for this PE
+
+  CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), datalength);
+
+  //  CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
+  CkCallback cb(CkIndex_TraceSummaryBOC::sumDetailDataCollected(NULL), thisProxy[0]); // reduction result is delivered to element 0
+  contribute(datalength, buffer, sumDetailCompressedReducer, cb); // combined with the custom sum-detail reducer
+
+  delete[] buffer;
+}
+
+
+void TraceSummaryBOC::sumDetailDataCollected(CkReductionMsg *msg) { // reduction callback target set up in collectSumDetailData
+  CkAssert(CkMyPe() == 0);
+  CkPrintf("[%d] Reduction of SumDetail stored in storedSumDetailResults deque\n", CkMyPe());
+  storedSumDetailResults.push_back(msg); // msg is not deleted here; ccsRequestSumDetailCompressed pops and deletes it
+}
+
+
+
+
 void TraceSummaryBOC::startSumOnly()
 {
   CmiAssert(CkMyPe() == 0);
index 7eeb6eea15610c46451c8cd19ef9c045cd122ed2..3b298e42d24a9d1f24fc4e27bc3520cbb7c6bca1 100644 (file)
@@ -9,7 +9,8 @@ module TraceSummary {
   };
 
   initnode void initTraceSummaryBOC();
-  
+  initnode void registerIdleTimeReduction();
+
   group [migratable] TraceSummaryBOC {
     entry TraceSummaryBOC(void);
     entry void startSumOnly();
@@ -19,13 +20,18 @@ module TraceSummary {
     entry void initCCS();
 
     // The ccs handlers:
-    entry void ccsClientRequest(CkCcsRequestMsg *m);
-    entry void ccsClientRequestUnsignedChar(CkCcsRequestMsg *m);
-    entry void ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
-    entry void ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m);
+    entry void ccsRequestSummaryDouble(CkCcsRequestMsg *m);
+    entry void ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m);
+    entry void ccsRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
+    entry void ccsRequestSumDetailCompressed(CkCcsRequestMsg *m);
+    entry void ccsRequestSumDetailCompressedPE0(CkCcsRequestMsg *m);
+
+    entry void collectSummaryData(double startTime, double binSize, int numBins);
+    entry void summaryDataCollected(CkReductionMsg *);
+
+    entry void collectSumDetailData(double startTime, double binSize, int numBins);
+    entry void sumDetailDataCollected(CkReductionMsg *);
 
-    entry void collectData(double startTime, double binSize, int numBins);
-    entry void dataCollected(CkReductionMsg *);
   };
 
 };
index ab71834d3ce152fb45bf047759238677d5752792..d031df1d8542f0c325270e47d8fb58ea3cb37f39 100644 (file)
@@ -209,7 +209,7 @@ class SumLogPool {
     void addEventType(int eventType, double time);
     void startPhase(int phase) { phaseTab.startPhase(phase); }
     BinEntry *bins() { return pool; }
-    int getNumEntries() { return numBins; }
+    int getNumEntries() const { return numBins; }
     void updateSummaryDetail(int epIdx, double startTime, double endTime);
 
     // accessors to normal summary data
index b1b6f948580ddd427fee1aae8750c0ff609a2284..7979e7fb7adc199b7c890ccda9de36bbdfde5c17 100644 (file)
@@ -1,5 +1,6 @@
 #include "ckcallback-ccs.h"
 #include "TraceSummary.decl.h"
+#include <deque>
 
 extern CkGroupID traceSummaryGID;
 extern bool summaryCcsStreaming;
@@ -26,6 +27,7 @@ private:
   int  nBins;
   int nTracedPEs;
 
+
 public:
   /* CCS support variables */
   int lastRequestedIndexBlock;
@@ -34,6 +36,8 @@ public:
   int nBufferedBins;
   CkVec<double> *ccsBufferedData;
   int nextBinIndexCcs;
+  std::deque<CkReductionMsg *> storedSumDetailResults;
+
 
 public:
   TraceSummaryBOC(void): count(0), bins(NULL), nBins(0), 
@@ -45,13 +49,17 @@ public:
 
   /* CCS support methods/entry methods */
   void initCCS();
-  void ccsClientRequest(CkCcsRequestMsg *m);
-  void ccsClientRequestUnsignedChar(CkCcsRequestMsg *m);
-  void ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
-  void ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m);
+  void ccsRequestSummaryDouble(CkCcsRequestMsg *m);
+  void ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m);
+  void ccsRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
+  void ccsRequestSumDetailCompressed(CkCcsRequestMsg *m);
+  void ccsRequestSumDetailCompressedPE0(CkCcsRequestMsg *m);
+
+  void collectSummaryData(double startTime, double binSize, int numBins);
+  void summaryDataCollected(CkReductionMsg *);
+  void collectSumDetailData(double startTime, double binSize, int numBins);
+  void sumDetailDataCollected(CkReductionMsg *);
 
-  void collectData(double startTime, double binSize, int numBins);
-  void dataCollected(CkReductionMsg *);
 private:
   void write();
 };