Improving the CCS sum detail streaming reporting tool.
authorIsaac Dooley <idooley2@illinois.edu>
Thu, 14 May 2009 20:45:38 +0000 (20:45 +0000)
committerIsaac Dooley <idooley2@illinois.edu>
Thu, 14 May 2009 20:45:38 +0000 (20:45 +0000)
src/ck-perf/trace-summary.C
src/ck-perf/trace-summary.ci
src/ck-perf/trace-summary.h

index 235f7a016262747b6c48c4452ba3702ad9016de7..522c36aed730e652378385dc336e6386d359095a 100644 (file)
@@ -34,6 +34,139 @@ CkpvDeclare(double, version);
 
 CkpvDeclare(int, previouslySentBins);
 
+
+
+/** 
+    A class that reads/writes a buffer out of different types of data.
+
+    This class exists because I need to get references to parts of the buffer 
+    that have already been used so that I can increment counters inside the buffer.
+*/
+
+class compressedBuffer {
+public:
+  char* buf;
+  int pos; ///<< byte position just beyond the previously read/written data
+
+  compressedBuffer(){
+    buf = NULL;
+    pos = 0;
+  }
+
+  compressedBuffer(int bytes){
+    buf = (char*)malloc(bytes);
+    bzero(buf, bytes);
+    pos = 0;
+  }
+
+  compressedBuffer(void *buffer){
+    buf = (char*)buffer;
+    pos = 0;
+  }
+    
+  void init(void *buffer){
+    buf = (char*)buffer;
+    pos = 0;
+  }
+    
+  inline void * currentPtr(){
+    return (void*)(buf+pos);
+  }
+
+  template <typename T>
+  T read(int offset){
+    // to resolve unaligned writes causing bus errors, need memcpy
+    T v;
+    memcpy(&v, buf+offset, sizeof(T));
+    return v;
+  }
+    
+  template <typename T>
+  void write(T v, int offset){
+    // to resolve unaligned writes causing bus errors, need memcpy
+    memcpy(buf+offset, &v, sizeof(T));
+  }
+    
+  template <typename T>
+  void increment(int offset){
+    T temp;
+    temp = read<T>(offset);
+    temp ++;
+    write<T>(temp, offset);
+  }
+
+  template <typename T>
+  void accumulate(T v, int offset){
+    T temp;
+    temp = read<T>(offset);
+    temp += v;
+    write<T>(temp, offset);
+  }
+  
+  template <typename T>
+  int push(T v){
+    int oldpos = pos;
+    write<T>(v, pos);
+    pos += sizeof(T);
+    return oldpos;
+  }
+  
+  template <typename T>
+  T pop(){
+    T temp = read<T>(pos);
+    pos += sizeof(T);
+    return temp;
+  }
+
+  template <typename T>
+  T peek(){
+    return *((T*)currentPtr());
+  }
+
+  template <typename T0, typename T>
+  T peekSecond(){
+    T temp;
+    memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
+    return temp;
+  }
+
+  int datalength(){
+    return pos;
+  }
+     
+  void * buffer(){
+    return (void*) buf;
+  }  
+
+  void freeBuf(){
+    free(buf);
+  }
+
+  ~compressedBuffer(){
+    // don't free the buf because the user my have supplied the buffer in the 
+  }
+    
+  };
+
+
+// Predeclarations of the functions at the bottom of this file
+compressedBuffer compressAvailableNewSumDetail(int max=10000);
+void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuffer);
+compressedBuffer compressNRecentSumDetail(int desiredBinsToSend);
+CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs);
+void printCompressedBuf(compressedBuffer b);
+compressedBuffer fakeCompressedMessage();
+
+
+/// A reduction type for merging compressed sum detail data
+CkReduction::reducerType sumDetailCompressedReducer;
+/// An initnode registration function for the reducer
+void registerIdleTimeReduction(void) {
+  sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
+}
+
+
+
 // Global Readonly
 CkGroupID traceSummaryGID;
 bool summaryCcsStreaming;
@@ -125,6 +258,7 @@ void SumLogPool::addEventType(int eventType, double time)
 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
 {
    // TBD: Can this be moved to initMem?
+  cpuTime = NULL;
    poolSize = CkpvAccess(binCount);
    if (poolSize % 2) poolSize++;       // make sure it is even
    pool = new BinEntry[poolSize];
@@ -794,12 +928,8 @@ void TraceSummaryBOC::initCCS() {
                       CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
     CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
                       CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
-    CcsRegisterHandler("CkPerfSumDetail uchar", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailUnsignedChar(NULL), sumProxy[0])); 
     CcsRegisterHandler("CkPerfSumDetail compressed", 
                       CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailCompressed(NULL), sumProxy[0])); 
-    CcsRegisterHandler("CkPerfSumDetail compressed PE0", 
-                      CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSumDetailCompressedPE0(NULL), sumProxy[0])); 
 
     CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
     CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
@@ -881,50 +1011,6 @@ void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
 }
 
 
-/** This one may be broken */
-void TraceSummaryBOC::ccsRequestSumDetailUnsignedChar(CkCcsRequestMsg *m) {
-  unsigned char *sendBuffer;
-
-  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars.\n", CkMyPe());
-
-  CkAssert(sumDetail);
-
-  int _numEntries=_entryTable.size();
-
-  SumLogPool * p = CkpvAccess(_trace)->pool();
-  int numBinsAvailable = p->getNumEntries();
-  int binsToSend = numBinsAvailable - CkpvAccess(previouslySentBins);
-  CkPrintf("_numEntries=%d, binsToSend=%d\n", _numEntries, binsToSend);
-
-  if (binsToSend < 1) {
-    sendBuffer = new unsigned char[1];
-    sendBuffer[0] = 255;
-    int datalength = sizeof(unsigned char);
-    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
-    delete [] sendBuffer;
-  } else {
-    
-    sendBuffer = new unsigned char[_numEntries * binsToSend + 1];
-    sendBuffer[0] = _numEntries;
-
-    for(int i=0; i<binsToSend; i++) {
-      for(int e=0; e<_numEntries; e++) {
-       unsigned char u= p->getUtilization(i,e) * 2.0;
-       sendBuffer[2+i*_numEntries+e] = u;
-      }
-    }
-
-    CkpvAccess(previouslySentBins) += binsToSend;
-
-    int datalength = sizeof(unsigned char) * (_numEntries * binsToSend + 1);
-    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
-    delete [] sendBuffer;
-  }
-
-  CkPrintf("Response Sent. Proceeding with computation.\n");
-  delete m;
-}
-
 /**
 
 Send back to the client compressed sum-detail style measurements about the 
@@ -944,6 +1030,14 @@ void TraceSummaryBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
   CkAssert(sumDetail);
   int datalength;
 
+#if 0
+
+  compressedBuffer fakeMessage = fakeCompressedMessage();
+  CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
+  fakeMessage.freeBuf();
+
+#else
+
   if (storedSumDetailResults.size()  == 0) {
     int *sendBuffer = new int[1];
     sendBuffer[0] = -1;
@@ -960,196 +1054,13 @@ void TraceSummaryBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
     
     delete msg;
   }
+#endif
 
   CkPrintf("CCS response of %d bytes sent.\n", datalength);
   delete m;
 }
 
 
-/// Create a compressed buffer of the n most recent sum detail samples
-unsigned char * compressNRecentSumDetail(int &datalength, int binsToSend){
-  unsigned char * sendBuffer;
-
-  int _numEntries=_entryTable.size();
-
-  SumLogPool * p = CkpvAccess(_trace)->pool();
-  int numBinsAvailable = p->getNumEntries();
-
-  if(binsToSend > numBinsAvailable)
-    binsToSend = numBinsAvailable;
-  
-  if (binsToSend < 1) {
-    sendBuffer = new unsigned char[1];
-    sendBuffer[0] = 255;
-    datalength = sizeof(unsigned char);
-  } else {
-    
-    sendBuffer = new unsigned char[3*_numEntries * binsToSend];
-    int nextPos;
-    int startOfCurrentRecord = 0;
-
-    for(int i=0; i<binsToSend; i++) {
-      // Create a record for bin i
-      sendBuffer[startOfCurrentRecord] = 0; // The number of entries in this record
-      nextPos = startOfCurrentRecord+1;
-      for(int e=0; e<_numEntries; e++) {
-       unsigned char u= p->getUtilization(i,e) * 2.0;
-       if(u > 0) {
-         sendBuffer[nextPos] = e;
-         sendBuffer[nextPos+1] = u;
-         nextPos += 2;
-         sendBuffer[startOfCurrentRecord] ++;
-       }
-      }
-      startOfCurrentRecord = nextPos;
-    }
-
-    CkpvAccess(previouslySentBins) += binsToSend;    
-    datalength = sizeof(unsigned char) * nextPos;
-  }
-
-    return sendBuffer;
-}
-
-
-
-/** print out the compressed buffer */
-void printCompressedBuf(float *buf){
-  int pos = 0;
-  int *bufInt = (int *) buf;
-  int numEntries = bufInt[pos++];
-  CkPrintf("Buffer contains %d records\n", numEntries);
-  for(int i=0;i<numEntries;i++){
-    int recordLength = bufInt[pos++];
-    CkPrintf("    Record %d is of length %d : ", i, recordLength);
-    
-    for(int j=0;j<recordLength;j++){
-      int ep = bufInt[pos++];
-      float v = buf[pos++];
-      CkPrintf("(%d,%f) ", ep, v);
-    }
-
-    CkPrintf("\n");
-
-  }
-}
-
-
-
-/** print out the compressed buffer */
-void printCompressedRecord(float *buf){
-  int *bufInt = (int *) buf;
-  int pos = 0;
-  int recordLength = bufInt[pos++];
-  CkPrintf("    Record is of length %d : ", recordLength);
-  for(int j=0;j<recordLength;j++){
-    int ep = bufInt[pos++];
-    float v = buf[pos++];
-    CkPrintf("(%d,%f)", ep, v);
-  }
-  CkPrintf("\n");
-}
-
-
-/// Create a compressed buffer of the n most recent sum detail samples as floating point values
-float * compressNRecentSumDetailFloat(int &datalength, int desiredBinsToSend){
-  float * sendBuffer;
-  CkPrintf("compressNRecentSumDetailFloat(desiredBinsToSend=%d)\n", desiredBinsToSend);
-  int _numEntries=_entryTable.size();
-
-  SumLogPool * p = CkpvAccess(_trace)->pool();
-  int numBinsAvailable = p->getNumEntries();
-
-  int binsToSend = desiredBinsToSend;
-
-  if(binsToSend > numBinsAvailable)
-    binsToSend = numBinsAvailable;
-  
-  if (binsToSend < 1) {
-    sendBuffer = new float[1];
-    int * sendBufferInt = (int *) sendBuffer;
-    sendBufferInt[0] = -1;
-    CkAssert( ((int *)sendBuffer)[0] ==-1);
-    datalength = sizeof(float);
-  } else {
-    sendBuffer = new float[3*_numEntries * binsToSend+100];
-    CkAssert(sizeof(int) == sizeof(float));
-    int *sendBufferInt =  (int *)sendBuffer;
-    int pos = 0;
-    
-    sendBufferInt[pos++] = binsToSend;
-    CkAssert( ((int *)sendBuffer)[0] == binsToSend );
-
-    int startingEntry = numBinsAvailable - binsToSend;
-    for(int i=0; i<binsToSend; i++) {
-      // Create a record for bin i
-      int startOfCurrentRecord = pos++;
-      sendBufferInt[startOfCurrentRecord] = 0; // The number of entries in this record
-      for(int e=0; e<_numEntries; e++) {
-       float u= p->getUtilization(i+startingEntry,e);
-       if(u > 0) {
-         sendBufferInt[pos++] = e;
-         sendBuffer[pos++] = u;
-         sendBufferInt[startOfCurrentRecord]++;
-       }
-      }
-    }
-    
-    datalength = sizeof(float) * pos;
-  }
-
-  CkAssert( ((int *)sendBuffer)[0] == binsToSend || ((int *)sendBuffer)[0] ==-1);
-
-  //  CkPrintf("Created a new buffer in compressNRecentSumDetailFloat binsToSend=%d\n", binsToSend);
-  //  printCompressedBuf(sendBuffer);
-
-  return sendBuffer;
-}
-
-/// Create a compressed buffer of the sum detail samples that occured since the previous call to this function (default max of 10000 bins).
-unsigned char * compressAvailableNewSumDetail(int &datalength, int max=10000){
-  const SumLogPool * p = CkpvAccess(_trace)->pool();
-  const int numBinsAvailable = p->getNumEntries();
-  int binsToSend = numBinsAvailable - CkpvAccess(previouslySentBins);
-  if(binsToSend > max)
-    binsToSend = max;
-
-  unsigned char *sendBuffer = compressNRecentSumDetail(datalength, binsToSend);
-
-  return sendBuffer;
-}
-
-/**
-
-Send back to the client sum-detail style measurements about the utilization for PE 0.
-
-The data format sent by this handler is a bunch of records(one for each bin) of the following format:
-#samples (EP,utilization)* 
-
-One example with two EPS that executed during the sample period. EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. All of these would be packed as bytes into the message:
-2 3 150 122 20
-
- */
-void TraceSummaryBOC::ccsRequestSumDetailCompressedPE0(CkCcsRequestMsg *m) {
-
-  CkPrintf("[%d] Request from Client for compressed sum detail for PE 0.\n", CkMyPe());
-
-  CkAssert(sumDetail);
-
-  int datalength;
-  unsigned char * sendBuffer = compressAvailableNewSumDetail(datalength);
-
-  CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
-  delete [] sendBuffer;
-  
-  CkPrintf("CCS Response of %d bytes sent.\n", datalength);
-  delete m;
-
-}
-
-
-
-
 
 
 void startCollectData(void *data, double currT) {
@@ -1169,9 +1080,10 @@ void startCollectData(void *data, double currT) {
   //   request will pick up. This is to limit the amount of perturbation
   //   if it proves to be a problem.
   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
-  sumProxy.collectSummaryData(startTime, 
-                      collectionGranularity,
-                      numBlocksToGet*indicesPerBlock);
+
+//   sumProxy.collectSummaryData(startTime, 
+//                    collectionGranularity,
+//                    numBlocksToGet*indicesPerBlock);
 
   sumProxy.collectSumDetailData(startTime, 
                       collectionGranularity,
@@ -1219,180 +1131,33 @@ void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
 }
 
 
-
-
-/** Merge the compressed entries from the first bin in each of the srcBuf buffers.
-    Return a pointer to the byte just after all the data I filled in.
-    The pointers in srcBuf will be updated to point to their next available bin.
-*/
-void mergeCompressedBin(float ** &srcBuf, int numSrcBuf, float * &destBuf){
-  // put a counter at the beginning of destBuf
-  ((int*)destBuf)[0] = 0;
-
-
-  float *pos = destBuf+1;
-  
-  //  CkPrintf("[%d] mergeCompressedBin(numSrcBuf=%d)\n",CkMyPe(), numSrcBuf);
-
-  for(int i=0;i<numSrcBuf;i++){
-    for(int j=i+1;j<numSrcBuf;j++){
-      if(srcBuf[i] == srcBuf[j]){
-       CkPrintf("[%d] ERROR: srcBuf[%d]  == srcBuf[%d]    numSrcBuf=%d\n", CkMyPe(), i, j, numSrcBuf);
-       CkAbort("srcBuf[i] == srcBuf[j])");
-      }
-    }
-  }
-
-
-  // Print incoming buffers
-  for(int i=0;i<numSrcBuf;i++){
-    //    CkPrintf("MERGING ------------------------------------------------------------------\n");
-    //   printCompressedRecord(srcBuf[i]);
-  }
-
-
-  // Read off the number of entries for each buffer
-  int *remainingEntriesToRead = new int[numSrcBuf];
-  for(int i=0;i<numSrcBuf;i++){
-    remainingEntriesToRead[i] = ((int*)srcBuf[i])[0];
-    (srcBuf[i])++;
-    //  CkPrintf("Merging %d EP entries from srcBuf[%d]\n", remainingEntriesToRead[i], i);
-  }
-
-  // srcBuf[i] now points to the first record
-
-  int count = 0;
-  // Count remaining entries to process
-  for(int i=0;i<numSrcBuf;i++){
-    count += remainingEntriesToRead[i];
-  }
-  
-  while (count>0) {
-    // find first EP from all buffers (these are sorted by EP already)
-    int minEp = 10000;
-    for(int i=0;i<numSrcBuf;i++){
-      if(remainingEntriesToRead[i]>0){
-       int ep = ((int*)srcBuf[i])[0];
-       if(ep < minEp){
-         minEp = ep;
-       }
-      }
-    }
-    
-    ((int*)destBuf)[0] ++;
-
-    //   CkPrintf("Merging:  minEp=%d \n", minEp);
-
-    // create a new entry in the output for this EP.
-    ((int*)pos)[0] = minEp;
-    pos[1] = 0.0f;
-
-    // Merge contributions from all buffers that list the EP
-    for(int i=0;i<numSrcBuf;i++){
-      if(remainingEntriesToRead[i]>0){
-       int ep = ((int*)srcBuf[i])[0];
-       float v = (srcBuf[i])[1];
-       if(ep == minEp){
-         pos[1] += v;
-         (srcBuf[i])+=2;
-         remainingEntriesToRead[i]--;
-         //      CkPrintf("[%d] After ep=%d srcBuf[%d]=%p\n", CkMyPe(), ep, i, srcBuf[i]);
-       }
-      }
-    }
-
-    pos += 2;
-
-
-    // Count remaining entries to process
-    count = 0;
-    for(int i=0;i<numSrcBuf;i++){
-      count += remainingEntriesToRead[i];
-    }
-    
-  }
-
-
-  delete [] remainingEntriesToRead;
-  //  CkPrintf("End of mergeCompressedBin\n");
-
-
-  //  CkPrintf("MERGE RESULT: ------------------------------------------------------------------\n");
-  //  printCompressedRecord(destBuf);
-
-  destBuf = pos;
-
-}
-
-
-/// A reducer for merging compressed sum detail data
-CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
-  CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
-  CkAssert(sizeof(float)==sizeof(int));
-  // start with a pointer to the beginning of all the compressed buffers
-  float **posPtr = new float*[nMsg];
-
-  int numBins = 0;
-  for (int i=0;i<nMsg;i++) {
-    posPtr[i]=((float *)msgs[i]->getData());
-
-    //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
-    //   printCompressedBuf(posPtr[i]);
-
-    if(i==0)
-      numBins = ((int*)posPtr[i])[0];
-    else 
-      CkAssert( numBins == ((int*)posPtr[i])[0] );
-
-    posPtr[i] ++; // first entry is the total number of bins
-  }
-
-  float * newBuf = new float[100000];
-  float * dest = newBuf;
-
-  // build a compressed representation of each merged bin
-  
-  ((int*)dest)[0] = numBins;
-  dest++;
-  for(int i=0; i<numBins; i++){
-    mergeCompressedBin(posPtr, nMsg, dest);
-  }
-  
-  // CkPrintf("END MERGE RESULT=========================================================\n");
-  // printCompressedBuf(newBuf);
-   
-  return CkReductionMsg::buildNew((dest-newBuf) * sizeof(float),newBuf);   
-}
-
-/// A reduction type for merging compressed sum detail data
-CkReduction::reducerType sumDetailCompressedReducer;
-/// An initnode registration function for the reducer
-void registerIdleTimeReduction(void) {
-  sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction);
-}
-
-
-
-
-
 void TraceSummaryBOC::collectSumDetailData(double startTime, double binSize, int numBins) {
-
+  
   int datalength;
-  float *buffer = compressNRecentSumDetailFloat(datalength, numBins);
+  compressedBuffer b = compressNRecentSumDetail(numBins);
+  //  CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail: \n", CkMyPe());
+  //  printCompressedBuf(b);
+#if 0
+    b = fakeCompressedMessage();
+#endif
 
   CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), datalength);
-
+  
   //  CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
   CkCallback cb(CkIndex_TraceSummaryBOC::sumDetailDataCollected(NULL), thisProxy[0]);
-  contribute(datalength, buffer, sumDetailCompressedReducer, cb);
-
-  delete[] buffer;
+  contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
+  
+  b.freeBuf();
 }
 
 
 void TraceSummaryBOC::sumDetailDataCollected(CkReductionMsg *msg) {
   CkAssert(CkMyPe() == 0);
   CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(sizes=%d)\n", CkMyPe(), storedSumDetailResults.size() );
+  
+  //  printCompressedBuf(msg->getData());
+
   storedSumDetailResults.push_back(msg); 
 }
 
@@ -1523,7 +1288,332 @@ void initTraceSummaryBOC()
   }
 }
 
+
+
+
+
+
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+
+
+/// A reducer for merging compressed sum detail data
+CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
+  CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
+  
+  compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
+  int *numProcsRepresentedInMessage = new int[nMsg];
+  
+  int numBins = 0;
+  int totalsize = 0;
+  int totalProcsAcrossAllMessages = 0;
+  
+  for (int i=0;i<nMsg;i++) {
+    incomingMsgs[i].init(msgs[i]->getData());
+    totalsize += msgs[i]->getSize();
+   //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
+    //   printCompressedBuf(incomingMsgs[i]);
+    
+    // Read first value from message. 
+    // Make sure all messages have the same number of bins
+    if(i==0)
+      numBins = incomingMsgs[i].pop<int>();
+    else 
+      CkAssert( numBins ==  incomingMsgs[i].pop<int>() );
+    
+    numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<int>();
+    totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
+    CkPrintf("Number of procs in message[%d] is %d\n", i,  numProcsRepresentedInMessage[i]);
+  }
+  
+  compressedBuffer dest(totalsize);
+  
+  // build a compressed representation of each merged bin
+  dest.push<int>(numBins);
+  dest.push<int>(totalProcsAcrossAllMessages);
+  
+  for(int i=0; i<numBins; i++){
+    mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
+  }
+  
+  // CkPrintf("END MERGE RESULT=========================================================\n");
+  // printCompressedBuf(dest);
+  
+  CkReductionMsg *m = CkReductionMsg::buildNew(dest.datalength(),dest.buffer());   
+  dest.freeBuf();
+  delete[] incomingMsgs;
+  return m;
+}
+
+
+
+
+
+
+
+/// A reducer for merging compressed sum detail data
+ compressedBuffer fakeCompressedMessage(){
+   CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
+   
+   compressedBuffer fakeBuf(10000);
+   
+   int numBins = 55;
+   int totalsize = 0;
+   int numProcs = 1000;
+
+   // build a compressed representation of each merged bin
+   fakeBuf.push<int>(numBins);
+   fakeBuf.push<int>(numProcs);
+   for(int i=0; i<numBins; i++){
+     int numRecords = 3;
+     fakeBuf.push<int>(numRecords);
+     for(int j=0;j<numRecords;j++){
+       fakeBuf.push<int>(j*10+2);
+       fakeBuf.push<float>(j+100.0);
+     }  
+   }
+   
+   //CkPrintf("Fake Compressed Message:=========================================================\n");
+   //   printCompressedBuf(fakeBuf);
+
+   return fakeBuf;
+ }
+
+
+
+
+
+
+
+/// Create a compressed buffer of the sum detail samples that occured since the previous call to this function (default max of 10000 bins).
+compressedBuffer compressAvailableNewSumDetail(int max){
+  const SumLogPool * p = CkpvAccess(_trace)->pool();
+  const int numBinsAvailable = p->getNumEntries();
+  int binsToSend = numBinsAvailable - CkpvAccess(previouslySentBins);
+  if(binsToSend > max)
+    binsToSend = max;
+
+  return compressNRecentSumDetail(binsToSend);
+}
+
+
+/** print out the compressed buffer starting from its begining*/
+void printCompressedBuf(compressedBuffer b){
+  // b should be passed in by value, and hence we can modify it
+  b.pos = 0;
+  int numEntries = b.pop<int>();
+  CkPrintf("Buffer contains %d records\n", numEntries);
+  for(int i=0;i<numEntries;i++){
+    int recordLength = b.pop<int>();
+    if(recordLength > 0){
+      CkPrintf("    Record %d is of length %d : ", i, recordLength);
+      
+      for(int j=0;j<recordLength;j++){
+       int ep = b.pop<int>();
+       float v = b.pop<float>();
+       CkPrintf("(%d,%f) ", ep, v);
+      }
+    
+      CkPrintf("\n");
+    }
+  }
+}
+
+
+
+// /** print out the compressed buffer */
+// void printCompressedRecord(compressedBuffer b){
+//   int recordLength = b.pop<int>();
+//   CkPrintf("    Record is of length %d : ", recordLength);
+//   for(int j=0;j<recordLength;j++){
+//     int ep = b.pop<int>();
+//     float v = b.pop<float>();
+//     CkPrintf("(%d,%f)", ep, v);
+//   }
+//   CkPrintf("\n");
+// }
+
+
+
+
+/// Create a compressed buffer of the n most recent sum detail samples
+ compressedBuffer compressNRecentSumDetail(int desiredBinsToSend){
+   CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
+
+   int _numEntries=_entryTable.size();
+   SumLogPool * p = CkpvAccess(_trace)->pool();
+   int numBinsAvailable = p->getNumEntries();
+
+   int binsToSend = desiredBinsToSend;
+   if(binsToSend > numBinsAvailable)
+     binsToSend = numBinsAvailable;
+
+   int startBin = numBinsAvailable - binsToSend;
+  
+   CkPrintf("compressNRecentSumDetail binsToSend=%d\n", binsToSend);
+
+   if (binsToSend < 1) {
+     compressedBuffer b(sizeof(int));
+     b.push<int>(0);
+     return b;
+   } else {
+     compressedBuffer b(8*(2+_numEntries) * (2+binsToSend)+100);
+
+     b.push<int>(binsToSend);
+     b.push<int>(1); // number of processors in reduction subtree. I am just one processor.
+
+     for(int i=0; i<binsToSend; i++) {
+       // Create a record for bin i
+       //  CkPrintf("Adding record for bin %d\n", i);
+       int numEntriesInRecordOffset = b.push<int>(0); // The number of entries in this record
+       
+       for(int e=0; e<_numEntries; e++) {
+        double scaledUtilization = p->getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
+        if(scaledUtilization > 250.0)
+          scaledUtilization = 250.0;
+        
+        if(scaledUtilization > 0.0) {
+          CkPrintf("Adding non-zero entry (%d,%lf) to bin %d\n", e, scaledUtilization, i);
+          b.push<int>(e);
+          b.push<float>(scaledUtilization);
+          b.increment<int>(numEntriesInRecordOffset);
+        } else{
+          
+        }
+       }
+     }
+     CkpvAccess(previouslySentBins) += binsToSend;    
+     return b;
+   }
+   
+ }
+
+
+
+/** Merge the compressed entries from the first bin in each of the srcBuf buffers.
+     
+*/
+ void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
+  // put a counter at the beginning of destBuf
+  int numEntriesInDestRecordOffset = destBuf.push<int>(0); // get reference to the item we just pushed into the buffer
+  
+  for(int i=0;i<numSrcBuf;i++){
+    for(int j=i+1;j<numSrcBuf;j++){
+      if(srcBufferArray[i].buffer() == srcBufferArray[j].buffer()){
+       CkPrintf("[%d] ERROR: srcBuf[%d]  == srcBuf[%d]    numSrcBuf=%d\n", CkMyPe(), i, j, numSrcBuf);
+       CkAbort("srcBuf[i] == srcBuf[j])");
+      }
+    }
+  }
+
+
+  // Print incoming buffers
+  for(int i=0;i<numSrcBuf;i++){
+    //   CkPrintf("MERGING ------------------------------------------------------------------\n");
+    //   printCompressedBuf(srcBufferArray[i]);
+  }
+
+
+  // Read off the number of entries for each buffer
+  int *remainingEntriesToRead = new int[numSrcBuf];
+  for(int i=0;i<numSrcBuf;i++){
+    remainingEntriesToRead[i] = srcBufferArray[i].pop<int>();
+    //   CkPrintf("Merging %d EP entries from srcBuf[%d]\n", remainingEntriesToRead[i], i);
+  }
+
+
+  int count = 0;
+  // Count remaining entries to process
+  for(int i=0;i<numSrcBuf;i++){
+    count += remainingEntriesToRead[i];
+  }
+  
+  while (count>0) {
+    // find first EP from all buffers (these are sorted by EP already)
+    int minEp = 10000;
+    for(int i=0;i<numSrcBuf;i++){
+      if(remainingEntriesToRead[i]>0){
+       int ep = srcBufferArray[i].peek<int>();
+       if(ep < minEp){
+         minEp = ep;
+       }
+      }
+    }
+    
+    destBuf.increment<int>(numEntriesInDestRecordOffset);
+    
+    //   CkPrintf("Merging:  minEp=%d \n", minEp);
+
+    // create a new entry in the output for this EP.
+    destBuf.push<int>(minEp);
+    int destVOffset = destBuf.push<float>(0.0);
+    double v = 0.0;
+
+    // Merge contributions from all buffers that list the EP
+    for(int i=0;i<numSrcBuf;i++){
+      if(remainingEntriesToRead[i]>0){
+       int ep = srcBufferArray[i].peek<int>(); 
+       //srcBufferArray[i].peekSecond<int, float>();
+       if(ep == minEp){
+         srcBufferArray[i].pop<int>();  // pop ep
+         double util = srcBufferArray[i].pop<float>();
+         v += util * numProcsRepresentedInMessage[i];
+         remainingEntriesToRead[i]--;
+         //      CkPrintf("[%d] After ep=%d srcBuf[%d]=%p\n", CkMyPe(), ep, i, srcBuf[i]);
+       }
+      }
+    }
+
+    v /= (double)totalProcsAcrossAllMessages;
+    destBuf.write<float>(v, destVOffset); 
+
+
+    // Count remaining entries to process
+    count = 0;
+    for(int i=0;i<numSrcBuf;i++){
+      count += remainingEntriesToRead[i];
+    }
+    
+  }
+
+
+  delete [] remainingEntriesToRead;
+  // CkPrintf("End of mergeCompressedBin\n");
+  
+  
+  //CkPrintf("MERGE RESULT: ------------------------------------------------------------------\n");
+  //  printCompressedBuf(destBuf);
+
+}
+
+
+
+
+
+
 #include "TraceSummary.def.h"
 
 
 /*@}*/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
index 3b298e42d24a9d1f24fc4e27bc3520cbb7c6bca1..443f8a47c3558ca9db280b1f22804f2b2af041be 100644 (file)
@@ -22,9 +22,7 @@ module TraceSummary {
     // The ccs handlers:
     entry void ccsRequestSummaryDouble(CkCcsRequestMsg *m);
     entry void ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m);
-    entry void ccsRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
     entry void ccsRequestSumDetailCompressed(CkCcsRequestMsg *m);
-    entry void ccsRequestSumDetailCompressedPE0(CkCcsRequestMsg *m);
 
     entry void collectSummaryData(double startTime, double binSize, int numBins);
     entry void summaryDataCollected(CkReductionMsg *);
index d031df1d8542f0c325270e47d8fb58ea3cb37f39..1f169a7097a7db550977c5717f8b0a8d3741a9cd 100644 (file)
@@ -219,7 +219,12 @@ class SumLogPool {
 
 
     inline double getCPUtime(unsigned int interval, unsigned int ep){
-        return cpuTime[interval*epInfoSize+ep]; }
+      if(cpuTime != NULL)
+        return cpuTime[interval*epInfoSize+ep]; 
+      else 
+       return 0.0;
+    }
+    
     inline void setCPUtime(unsigned int interval, unsigned int ep, double val){
         cpuTime[interval*epInfoSize+ep] = val; }
     inline double addToCPUtime(unsigned int interval, unsigned int ep, double val){