Added ability for trace summary to use CCS to stream performance data to
[charm.git] / src / ck-perf / trace-summaryBOC.h
1 #include "ckcallback-ccs.h"
2 #include "TraceSummary.decl.h"
3
4 extern CkGroupID traceSummaryGID;
5 extern CProxy_TraceSummaryInit initProxy;
6 extern bool summaryCcsStreaming;
7
8 void masterCollectData(void *data, double currT);
9
10 class TraceSummaryInit : public Chare {
11  public:
12   int lastRequestedIndexBlock;
13   int indicesPerBlock;
14   double collectionGranularity;
15   CkVec<double> *ccsBufferedData;
16   int nBufferedBins;
17  public:
18   TraceSummaryInit(CkArgMsg *m) {
19     lastRequestedIndexBlock = 0;
20     indicesPerBlock = 1000;
21     collectionGranularity = 0.001; // time in seconds
22     nBufferedBins = 0;
23     traceSummaryGID = CProxy_TraceSummaryBOC::ckNew();
24     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
25     initProxy = thishandle;
26     CkCallback *cb = new CkCallback(CkIndex_TraceSummaryBOC::sendSummaryBOC(NULL), 0, sumProxy);
27     CProxy_TraceSummaryBOC(traceSummaryGID).ckSetReductionClient(cb);
28     if (CmiGetArgFlagDesc(m->argv,"+sumCCS",
29                           "CCS client connected to Trace Summary")) {
30       ccsBufferedData = new CkVec<double>();
31       CkPrintf("Trace Summary listening in for CCS Client\n");
32       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
33                          CkCallback(CkIndex_TraceSummaryInit::ccsClientRequest(NULL), thishandle));
34       CcdCallOnConditionKeep(CcdPERIODIC_1second, masterCollectData, 
35                              (void *)this);
36       summaryCcsStreaming = CmiTrue;
37     } else {
38       summaryCcsStreaming = CmiFalse;
39     }
40   }
41   TraceSummaryInit(CkMigrateMessage *m):Chare(m) {}
42   void dataCollected(CkReductionMsg *);  
43   void ccsClientRequest(CkCcsRequestMsg *m);
44 };
45
46 void masterCollectData(void *data, double currT) {
47   // CkPrintf("collectData called\n");
48   TraceSummaryInit *sumInitObj = (TraceSummaryInit *)data;
49
50   double startTime = sumInitObj->lastRequestedIndexBlock*
51     sumInitObj->collectionGranularity*sumInitObj->indicesPerBlock;
52   int numIndicesToGet = (int)floor((currT - startTime)/
53                                    sumInitObj->collectionGranularity);
54   int numBlocksToGet = numIndicesToGet/sumInitObj->indicesPerBlock;
55   // **TODO** consider limiting the total number of blocks each collection
56   //   request will pick up. This is to limit the amount of perturbation
57   //   if it proves to be a problem.
58   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
59   sumProxy.collectData(startTime, 
60                        sumInitObj->collectionGranularity,
61                        numBlocksToGet*sumInitObj->indicesPerBlock);
62   // assume success
63   sumInitObj->lastRequestedIndexBlock += numBlocksToGet; 
64 }
65
66 class TraceSummaryBOC : public CBase_TraceSummaryBOC {
67 private:
68   int count;
69   BinEntry *bins;
70   int  nBins;
71   int nTracedPEs;
72
73   int nextBinIndexCcs;
74 public:
75   TraceSummaryBOC(void): count(0), bins(NULL), nBins(0), 
76     nTracedPEs(0), nextBinIndexCcs(0) {};
77   TraceSummaryBOC(CkMigrateMessage *m):CBase_TraceSummaryBOC(m) {};
78   void startSumOnly();
79   void askSummary(int size);
80   void sendSummaryBOC(CkReductionMsg *);
81
82   void collectData(double startTime, double binSize, int numBins);
83 private:
84   void write();
85 };
86
87