Added ability for trace summary to use CCS to stream performance data to
authorChee Wai Lee <cheelee@illinois.edu>
Tue, 13 Jan 2009 01:37:14 +0000 (01:37 +0000)
committerChee Wai Lee <cheelee@illinois.edu>
Tue, 13 Jan 2009 01:37:14 +0000 (01:37 +0000)
an external client via the +sumCCS runtime option. Current changes are
not production-quality (many assumptions made for CCS capabilities and
summary data collection) but should not impact non-CCS runs making use of
summary, sumonly or sumdetail.

src/ck-perf/trace-summary.C
src/ck-perf/trace-summary.ci
src/ck-perf/trace-summary.h
src/ck-perf/trace-summaryBOC.h

index 4f33ff1a4415c5fdbcc90fbbf1d6ab81fb729610..3c07cdf8c52c621f489ee6ac6bcfa52f5dc2cb24 100644 (file)
@@ -30,6 +30,10 @@ CkpvDeclare(int, binCount);
 CkpvDeclare(double, binSize);
 CkpvDeclare(double, version);
 
+// Global Readonly
+CProxy_TraceSummaryInit initProxy;
+bool summaryCcsStreaming;
+
 int sumonly = 0;
 int sumDetail = 0;
 
@@ -362,7 +366,9 @@ void SumLogPool::writeSts(void)
 void SumLogPool::add(double time, int pe) 
 {
   new (&pool[numBins++]) BinEntry(time);
-  if(poolSize==numBins) shrink();
+  if (poolSize==numBins) {
+    shrink();
+  }
 }
 
 // Called once per run of an EP
@@ -394,9 +400,9 @@ void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime
        while (endingBinIdx >= poolSize) {
          shrink();
          CmiAssert(CkpvAccess(binSize) > binSz);
-          binSz = CkpvAccess(binSize);
+         binSz = CkpvAccess(binSize);
          startingBinIdx = (int)(startTime/binSz);
-          endingBinIdx = (int)(endTime/binSz);
+         endingBinIdx = (int)(endTime/binSz);
        }
 
         if (startingBinIdx == endingBinIdx) {
@@ -453,6 +459,36 @@ void BinEntry::write(FILE* fp)
   writeU(fp, getU());
 }
 
+void TraceSummaryInit::ccsClientRequest(CkCcsRequestMsg *m) {
+  char *sendBuffer;
+
+  CkPrintf("[%d] Request from Client detected. Data reads as:\n", CkMyPe());
+  CkPrintf("%s\n",m->data);
+  CkPrintf("\n");
+
+  CkPrintf("Responding ...\n");
+  sendBuffer = (char *)ccsBufferedData->getVec();
+  CcsSendDelayedReply(m->reply, ccsBufferedData->length()*sizeof(double), 
+                     sendBuffer);
+  // clear the buffer
+  ccsBufferedData->removeAll();
+  CkPrintf("Response Sent. Proceeding with computation.\n");
+}
+
+void TraceSummaryInit::dataCollected(CkReductionMsg *msg) {
+  // **CWL** No memory management for the ccs buffer for now.
+
+  // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
+  double *recvData = (double *)msg->getData();
+  int numBins = msg->getSize()/sizeof(double);
+
+  // if there's an easier way to append a data block to a CkVec, I'll take it
+  for (int i=0; i<numBins; i++) {
+    ccsBufferedData->insertAtEnd(recvData[i]);
+  }
+  delete msg;
+}
+
 TraceSummary::TraceSummary(char **argv):binStart(0.0),bin(0.0),msgNum(0)
 {
   if (CkpvAccess(traceOnPe) == 0) return;
@@ -650,6 +686,29 @@ void TraceSummary::startPhase(int phase)
    _logPool->startPhase(phase);
 }
 
+void TraceSummary::fillData(double *buffer, double reqStartTime, 
+                           double reqBinSize, int reqNumBins) {
+  // buffer has to be pre-allocated by the requester and must be an array of
+  // size reqNumBins.
+  //
+  // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
+  //              need a number of these assumptions dropped:
+  //              1) reqBinSize == binSize (unrealistic)
+  //              2) bins boundary aligned (ok even under normal circumstances)
+  //              3) bins are "factor"-aligned (where reqBinSize != binSize)
+  //              4) bins are always available (true unless flush)
+  //              5) bins always starts from 0 (unrealistic)
+
+  // works only because of 1)
+  // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
+  //           "starting" at all!
+  int binOffset = (int)(reqStartTime/reqBinSize); 
+  for (int i=binOffset; i<binOffset + reqNumBins; i++) {
+    // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
+    buffer[i-binOffset] = pool()->getTime(i);
+  }
+}
+
 
 /// for TraceSummaryBOC
 
@@ -758,6 +817,28 @@ void TraceSummaryBOC::write(void)
 
 }
 
+void TraceSummaryBOC::collectData(double startTime, double binSize,
+                                 int numBins) {
+  // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
+
+  double *contribution = new double[numBins];
+  for (int i=0; i<numBins; i++) {
+    contribution[i] = 0.0;
+  }
+
+  CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
+
+  // DEBUG - print out pe 0's contribution.
+  /*
+  for (int i=0; i<numBins; i++) {
+    CkPrintf("[%d] %f\n", i, contribution[i]);
+  }
+  */
+  CkCallback cb(CkIndex_TraceSummaryInit::dataCollected(NULL), initProxy);
+  contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
+            cb);
+}
+
 extern "C" void CombineSummary()
 {
 #ifndef CMK_OPTIMIZE
index cbed2e9d556316619cb29ca0e1f93239a6860e69..cde051263f4d4db1d99bb7e3cc1dd82b8622cbc7 100644 (file)
@@ -1,19 +1,26 @@
 
 module TraceSummary {
 
+  readonly CkGroupID traceSummaryGID;
+  readonly CProxy_TraceSummaryInit initProxy;
+
+  readonly bool summaryCcsStreaming;
+
   mainchare TraceSummaryInit {
     entry TraceSummaryInit(CkArgMsg *m);
+    entry void ccsClientRequest(CkCcsRequestMsg *m);
+    entry void dataCollected(CkReductionMsg *);
   };
 
   initnode void initTraceSummaryBOC();
   
-  readonly CkGroupID traceSummaryGID;
-
   group [migratable] TraceSummaryBOC {
     entry TraceSummaryBOC(void);
     entry void startSumOnly();
     entry void askSummary(int size);
     entry void sendSummaryBOC(CkReductionMsg *);
+
+    entry void collectData(double startTime, double binSize, int numBins);
   };
 
 };
index c75091234a7f1ad06604a65cf2ab2e3c022cb985..50dbf4f346ff41d6f3b87f34e331c1bf9c0dca0a 100644 (file)
@@ -218,12 +218,17 @@ class SumLogPool {
        epInfo[i].clear();
       }
     }
-    void shrink(void) ;
+    void shrink(void);
     void addEventType(int eventType, double time);
     void startPhase(int phase) { phaseTab.startPhase(phase); }
     BinEntry *bins() { return pool; }
     int getNumEntries() { return numBins; }
     void updateSummaryDetail(int epIdx, double startTime, double endTime);
+
+    // accessors to normal summary data
+    inline double getTime(unsigned int interval) {
+      return pool[interval].time();
+    }
 };
 
 /// class for recording trace summary events 
@@ -273,6 +278,8 @@ class TraceSummary : public Trace {
        query utilities
     */
     SumLogPool *pool() { return _logPool; }
+    void fillData(double *buffer, double reqStartTime, 
+                 double reqBinSize, int reqNumBins);
 };
 
 #endif
index 0ba7fbfbf4deae2099d534d81b2814f91d077748..9b6051fffba6075714ca78b13517b4446e2fa847 100644 (file)
@@ -1,31 +1,85 @@
-
+#include "ckcallback-ccs.h"
 #include "TraceSummary.decl.h"
 
 extern CkGroupID traceSummaryGID;
+extern CProxy_TraceSummaryInit initProxy;
+extern bool summaryCcsStreaming;
+
+void masterCollectData(void *data, double currT);
 
 class TraceSummaryInit : public Chare {
-  public:
-  TraceSummaryInit(CkArgMsg*) {
+ public:
+  int lastRequestedIndexBlock;
+  int indicesPerBlock;
+  double collectionGranularity;
+  CkVec<double> *ccsBufferedData;
+  int nBufferedBins;
+ public:
+  TraceSummaryInit(CkArgMsg *m) {
+    lastRequestedIndexBlock = 0;
+    indicesPerBlock = 1000;
+    collectionGranularity = 0.001; // time in seconds
+    nBufferedBins = 0;
     traceSummaryGID = CProxy_TraceSummaryBOC::ckNew();
     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
+    initProxy = thishandle;
     CkCallback *cb = new CkCallback(CkIndex_TraceSummaryBOC::sendSummaryBOC(NULL), 0, sumProxy);
     CProxy_TraceSummaryBOC(traceSummaryGID).ckSetReductionClient(cb);
+    if (CmiGetArgFlagDesc(m->argv,"+sumCCS",
+                         "CCS client connected to Trace Summary")) {
+      ccsBufferedData = new CkVec<double>();
+      CkPrintf("Trace Summary listening in for CCS Client\n");
+      CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
+                        CkCallback(CkIndex_TraceSummaryInit::ccsClientRequest(NULL), thishandle));
+      CcdCallOnConditionKeep(CcdPERIODIC_1second, masterCollectData, 
+                            (void *)this);
+      summaryCcsStreaming = CmiTrue;
+    } else {
+      summaryCcsStreaming = CmiFalse;
+    }
   }
   TraceSummaryInit(CkMigrateMessage *m):Chare(m) {}
+  void dataCollected(CkReductionMsg *);  
+  void ccsClientRequest(CkCcsRequestMsg *m);
 };
 
+void masterCollectData(void *data, double currT) {
+  // CkPrintf("collectData called\n");
+  TraceSummaryInit *sumInitObj = (TraceSummaryInit *)data;
+
+  double startTime = sumInitObj->lastRequestedIndexBlock*
+    sumInitObj->collectionGranularity*sumInitObj->indicesPerBlock;
+  int numIndicesToGet = (int)floor((currT - startTime)/
+                                  sumInitObj->collectionGranularity);
+  int numBlocksToGet = numIndicesToGet/sumInitObj->indicesPerBlock;
+  // **TODO** consider limiting the total number of blocks each collection
+  //   request will pick up. This is to limit the amount of perturbation
+  //   if it proves to be a problem.
+  CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
+  sumProxy.collectData(startTime, 
+                      sumInitObj->collectionGranularity,
+                      numBlocksToGet*sumInitObj->indicesPerBlock);
+  // assume success
+  sumInitObj->lastRequestedIndexBlock += numBlocksToGet; 
+}
+
 class TraceSummaryBOC : public CBase_TraceSummaryBOC {
 private:
   int count;
   BinEntry *bins;
   int  nBins;
   int nTracedPEs;
+
+  int nextBinIndexCcs;
 public:
-  TraceSummaryBOC(void): count(0), bins(NULL), nBins(0), nTracedPEs(0) {};
+  TraceSummaryBOC(void): count(0), bins(NULL), nBins(0), 
+    nTracedPEs(0), nextBinIndexCcs(0) {};
   TraceSummaryBOC(CkMigrateMessage *m):CBase_TraceSummaryBOC(m) {};
   void startSumOnly();
   void askSummary(int size);
   void sendSummaryBOC(CkReductionMsg *);
+
+  void collectData(double startTime, double binSize, int numBins);
 private:
   void write();
 };