Adding CCS handlers that export trace information to projections as the program runs...
authorIsaac Dooley <idooley2@illinois.edu>
Sun, 10 May 2009 23:50:01 +0000 (23:50 +0000)
committerIsaac Dooley <idooley2@illinois.edu>
Sun, 10 May 2009 23:50:01 +0000 (23:50 +0000)
src/ck-perf/trace-summary.C
src/ck-perf/trace-summary.ci
src/ck-perf/trace-summary.h
src/ck-perf/trace-summaryBOC.h

index f1fc6939f2ffc42b9ee3bf6188e75b2dd47ef656..6d42cb54d83f829fa9c05c74403c3a1eb14df1df 100644 (file)
@@ -21,7 +21,8 @@
 #define INVALIDEP     -2
 #define TRACEON_EP     -3
 
-#define DefaultBinCount      10000
+// 5 minutes of run before it'll fill up:
+#define DefaultBinCount      (1000*60*5) 
 
 CkpvStaticDeclare(TraceSummary*, _trace);
 static int _numEvents = 0;
@@ -786,6 +787,13 @@ void TraceSummaryBOC::initCCS() {
     CkPrintf("Trace Summary now listening in for CCS Client\n");
     CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
                       CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequest(NULL), sumProxy[0]));
+    CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestUnsignedChar(NULL), sumProxy[0])); 
+    CcsRegisterHandler("CkPerfSumDetail uchar", 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(NULL), sumProxy[0])); 
+    CcsRegisterHandler("CkPerfSumDetail compressed", 
+                      CkCallback(CkIndex_TraceSummaryBOC::ccsClientRequestSumDetailCompressed(NULL), sumProxy[0])); 
+
     CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
                           (void *)this);
     summaryCcsStreaming = CmiTrue;
@@ -795,9 +803,7 @@ void TraceSummaryBOC::initCCS() {
 void TraceSummaryBOC::ccsClientRequest(CkCcsRequestMsg *m) {
   double *sendBuffer;
 
-  CkPrintf("[%d] Request from Client detected. Data reads as:\n", CkMyPe());
-  CkPrintf("%s\n",m->data);
-  CkPrintf("\n");
+  CkPrintf("[%d] Request from Client detected. Client message: %s\n", CkMyPe(), m->data);
 
   CkPrintf("Responding ...\n");
   int datalength = 0;
@@ -819,6 +825,150 @@ void TraceSummaryBOC::ccsClientRequest(CkCcsRequestMsg *m) {
   delete m;
 }
 
+
+void TraceSummaryBOC::ccsClientRequestUnsignedChar(CkCcsRequestMsg *m) {
+  unsigned char *sendBuffer;
+
+  CkPrintf("[%d] Request from Client detected. Client message: %s\n", CkMyPe(), m->data);
+
+  CkPrintf("Responding ...\n");
+  int datalength = 0;
+
+  if (ccsBufferedData->length() == 0) {
+    sendBuffer = new unsigned char[1];
+    sendBuffer[0] = 255;
+    datalength = sizeof(unsigned char);
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  } else {
+    double * doubleData = ccsBufferedData->getVec();
+    int numData = ccsBufferedData->length();
+    
+    // pack data into unsigned char array
+    sendBuffer = new unsigned char[numData];
+    
+    for(int i=0;i<numData;i++){
+      sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
+      int v = sendBuffer[i];
+    }    
+
+    datalength = sizeof(unsigned char) * numData;
+    
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    ccsBufferedData->free();
+    delete [] sendBuffer;
+  }
+  CkPrintf("Response Sent. Proceeding with computation.\n");
+  delete m;
+}
+
+
+
+void TraceSummaryBOC::ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m) {
+  unsigned char *sendBuffer;
+
+  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars. Client message: %s\n", CkMyPe(), m->data);
+
+  CkAssert(sumDetail);
+
+  int _numEntries=_entryTable.size();
+
+  static int previouslySentBins = 0;
+  SumLogPool * p = CkpvAccess(_trace)->pool();
+  int numBinsAvailable = p->getNumEntries();
+  int binsToSend = numBinsAvailable - previouslySentBins;
+  CkPrintf("_numEntries=%d, binsToSend=%d\n", _numEntries, binsToSend);
+
+  if (binsToSend < 1) {
+    sendBuffer = new unsigned char[1];
+    sendBuffer[0] = 255;
+    int datalength = sizeof(unsigned char);
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  } else {
+    
+    sendBuffer = new unsigned char[_numEntries * binsToSend + 1];
+    sendBuffer[0] = _numEntries;
+
+    for(int i=0; i<binsToSend; i++) {
+      for(int e=0; e<_numEntries; e++) {
+       unsigned char u= p->getUtilization(i,e) * 2.0;
+       sendBuffer[2+i*_numEntries+e] = u;
+      }
+    }
+
+    previouslySentBins += binsToSend;
+
+    int datalength = sizeof(unsigned char) * (_numEntries * binsToSend + 1);
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  }
+
+  CkPrintf("Response Sent. Proceeding with computation.\n");
+  delete m;
+}
+
+/**
+
+The data format sent by this handler is a bunch of records(one for each bin) of the following format:
+#samples (EP,utilization)* 
+
+
+ */
+void TraceSummaryBOC::ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m) {
+  unsigned char *sendBuffer;
+
+  CkPrintf("[%d] Request from Client for sum detail data as unsigned chars. Client message: %s\n", CkMyPe(), m->data);
+
+  CkAssert(sumDetail);
+
+  int _numEntries=_entryTable.size();
+
+  static int previouslySentBins = 0;
+  SumLogPool * p = CkpvAccess(_trace)->pool();
+  int numBinsAvailable = p->getNumEntries();
+  int binsToSend = numBinsAvailable - previouslySentBins;
+  CkPrintf("_numEntries=%d, binsToSend=%d\n", _numEntries, binsToSend);
+
+  if (binsToSend < 1) {
+    sendBuffer = new unsigned char[1];
+    sendBuffer[0] = 255;
+    int datalength = sizeof(unsigned char);
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  } else {
+    
+    sendBuffer = new unsigned char[3*_numEntries * binsToSend];
+    int nextPos;
+    int startOfCurrentRecord = 0;
+
+    for(int i=0; i<binsToSend; i++) {
+      // Create a record for bin i
+      sendBuffer[startOfCurrentRecord] = 0; // The number of entries in this record
+      nextPos = startOfCurrentRecord+1;
+      for(int e=0; e<_numEntries; e++) {
+       unsigned char u= p->getUtilization(i,e) * 2.0;
+       if(u > 0) {
+         sendBuffer[nextPos] = e;
+         sendBuffer[nextPos+1] = u;
+         nextPos += 2;
+         sendBuffer[startOfCurrentRecord] ++;
+       }
+      }
+      startOfCurrentRecord = nextPos;
+    }
+
+    previouslySentBins += binsToSend;
+
+    int datalength = sizeof(unsigned char) * nextPos;
+    CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
+    delete [] sendBuffer;
+  }
+
+  CkPrintf("Response Sent. Proceeding with computation.\n");
+  delete m;
+}
+
 void startCollectData(void *data, double currT) {
   CkAssert(CkMyPe() == 0);
   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
index 5aa574c84f2b7f3f35ffd0e5a3c37edf4d16a8e1..7eeb6eea15610c46451c8cd19ef9c045cd122ed2 100644 (file)
@@ -17,7 +17,13 @@ module TraceSummary {
     entry void sendSummaryBOC(CkReductionMsg *);
 
     entry void initCCS();
+
+    // The ccs handlers:
     entry void ccsClientRequest(CkCcsRequestMsg *m);
+    entry void ccsClientRequestUnsignedChar(CkCcsRequestMsg *m);
+    entry void ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
+    entry void ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m);
+
     entry void collectData(double startTime, double binSize, int numBins);
     entry void dataCollected(CkReductionMsg *);
   };
index 350adfeb47c4508c53c40812838a8fae044732fe..ab71834d3ce152fb45bf047759238677d5752792 100644 (file)
@@ -192,22 +192,6 @@ class SumLogPool {
     double *cpuTime;    //[MAX_INTERVALS * MAX_ENTRIES];
     int *numExecutions; //[MAX_INTERVALS * MAX_ENTRIES];
 
-    inline double getCPUtime(unsigned int interval, unsigned int ep){
-        return cpuTime[interval*epInfoSize+ep]; }
-    inline void setCPUtime(unsigned int interval, unsigned int ep, double val){
-        cpuTime[interval*epInfoSize+ep] = val; }
-    inline double addToCPUtime(unsigned int interval, unsigned int ep, double val){
-        cpuTime[interval*epInfoSize+ep] += val;
-        return cpuTime[interval*epInfoSize+ep]; }
-    inline int getNumExecutions(unsigned int interval, unsigned int ep){
-        return numExecutions[interval*epInfoSize+ep]; }
-    inline void setNumExecutions(unsigned int interval, unsigned int ep, unsigned int val){
-        numExecutions[interval*epInfoSize+ep] = val; }
-    inline int incNumExecutions(unsigned int interval, unsigned int ep){
-        ++numExecutions[interval*epInfoSize+ep];
-        return numExecutions[interval*epInfoSize+ep]; }
-    inline int getUtilization(int interval, int ep);
-
   public:
     SumLogPool(char *pgm);
     ~SumLogPool();
@@ -232,6 +216,25 @@ class SumLogPool {
     inline double getTime(unsigned int interval) {
       return pool[interval].time();
     }
+
+
+    inline double getCPUtime(unsigned int interval, unsigned int ep){
+        return cpuTime[interval*epInfoSize+ep]; }
+    inline void setCPUtime(unsigned int interval, unsigned int ep, double val){
+        cpuTime[interval*epInfoSize+ep] = val; }
+    inline double addToCPUtime(unsigned int interval, unsigned int ep, double val){
+        cpuTime[interval*epInfoSize+ep] += val;
+        return cpuTime[interval*epInfoSize+ep]; }
+    inline int getNumExecutions(unsigned int interval, unsigned int ep){
+        return numExecutions[interval*epInfoSize+ep]; }
+    inline void setNumExecutions(unsigned int interval, unsigned int ep, unsigned int val){
+        numExecutions[interval*epInfoSize+ep] = val; }
+    inline int incNumExecutions(unsigned int interval, unsigned int ep){
+        ++numExecutions[interval*epInfoSize+ep];
+        return numExecutions[interval*epInfoSize+ep]; }
+    inline int getUtilization(int interval, int ep);
+
+
 };
 
 /// class for recording trace summary events 
index 33cc5123146bbee27fdde456faa3d3b3012e9822..b1b6f948580ddd427fee1aae8750c0ff609a2284 100644 (file)
@@ -46,6 +46,10 @@ public:
   /* CCS support methods/entry methods */
   void initCCS();
   void ccsClientRequest(CkCcsRequestMsg *m);
+  void ccsClientRequestUnsignedChar(CkCcsRequestMsg *m);
+  void ccsClientRequestSumDetailUnsignedChar(CkCcsRequestMsg *m);
+  void ccsClientRequestSumDetailCompressed(CkCcsRequestMsg *m);
+
   void collectData(double startTime, double binSize, int numBins);
   void dataCollected(CkReductionMsg *);
 private: