Added handling of phases in projections instrumentation.
authorChee Wai Lee <cheewai1972@gmail.com>
Thu, 10 Dec 2009 07:13:48 +0000 (01:13 -0600)
committerChee Wai Lee <cheewai1972@gmail.com>
Thu, 10 Dec 2009 07:13:48 +0000 (01:13 -0600)
Added latest version of online cluster analysis code. This has not been
fully tested for correctness, but will produce results. The correctness
of basic projections tracing and analysis has not been compromised.

src/ck-perf/trace-common.C
src/ck-perf/trace-common.h
src/ck-perf/trace-projections.C
src/ck-perf/trace-projections.ci
src/ck-perf/trace-projections.h
src/ck-perf/trace-projectionsBOC.h
src/ck-perf/trace.h
src/ck-perf/tracef_f.f90

index a128aba30929f618b2d561970cb0566766cbc537..caccfaacd711792093e7c4d4bd37d22022d169c0 100644 (file)
@@ -485,6 +485,13 @@ void traceMemoryUsage()
 #endif
 }
 
+extern "C"
+void tracePhaseEnd()
+{
+#ifndef CMK_OPTIMIZE
+  _TRACE_ONLY(CkpvAccess(_traces)->endPhase());
+#endif CMK_OPTIMIZE
+}
 
 extern "C"
 void registerMachineUserEventsFunction(void (*eventRegistrationFunc)()) {
index 14d91e4c909c33467e8bb8eecb68aa971126afca..1b717bff813f43e3cd2d18b6ba8f0924c598acca 100644 (file)
@@ -55,7 +55,9 @@
 /* Trace user supplied note (text string, with start, end times, and user event id)  */
 #define USER_SUPPLIED_BRACKETED_NOTE       29
 
-
+/* Support for Phases and time-partial logs */
+#define END_PHASE           30
+#define SURROGATE_BLOCK     31 /* inserted by cluster analysis only */
 
 #define  USER_EVENT_PAIR    100
 
index bcc9017c4e61e0b26d68647f5746de2103320b7e..11dab2662e662c268e0ffdc844631b378421f7f6 100644 (file)
@@ -1,8 +1,8 @@
 /*****************************************************************************
- * $Source$
- * $Author$
- * $Date$
- * $Revision$
+ * $Source: /cvsroot/charm/src/ck-perf/trace-projections.C,v $
+ * $Author: gioachin $
+ * $Date: 2009-08-20 01:09:41 $
+ * $Revision: 2.134 $
  *****************************************************************************/
 
 
 #include "trace-projections.h"
 #include "trace-projectionsBOC.h"
 
-#define DEBUGF(x)           // CmiPrintf x
+#if DEBUG_PROJ
+#define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
+#else
+#define DEBUGF(format, ...)           // CmiPrintf x
+#endif
+#define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
 
 #define DefaultLogBufSize      1000000
 
@@ -29,12 +34,24 @@ int nonDeltaLog;
 
 int checknested=0;             // check illegal nested begin/end execute 
 
+#ifdef PROJ_ANALYSIS
+// BOC operations readonlys
 CkGroupID traceProjectionsGID;
+CkGroupID kMeansGID;
+
+// New reduction type for Outlier Analysis purposes. This is allowed to be
+// a global variable according to the Charm++ manual.
+CkReductionMsg *outlierReduction(int nMsgs,
+                                CkReductionMsg **msgs);
+CkReductionMsg *minMaxReduction(int nMsgs,
+                               CkReductionMsg **msgs);
+CkReduction::reducerType outlierReductionType;
+CkReduction::reducerType minMaxReductionType;
+#endif // PROJ_ANALYSIS
 
 CkpvStaticDeclare(TraceProjections*, _trace);
 CtvStaticDeclare(int,curThreadEvent);
 
-CkpvDeclare(bool, useOutlierAnalysis);
 CkpvDeclare(int, CtrLogBufSize);
 
 typedef CkVec<char *>  usrEventVec;
@@ -47,8 +64,6 @@ public:
 };
 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
 
-
-
 /// Disable the outputting of the trace logs
 void disableTraceLogOutput()
 { 
@@ -59,18 +74,8 @@ void disableTraceLogOutput()
 void enableTraceLogOutput()
 {
   CkpvAccess(_trace)->setWriteData(true);
-
 }
 
-/// Force the log files to be flushed
-void flushTraceLog()
-{
-  CkpvAccess(_trace)->traceFlushLog();
-}
-
-
-
-
 #ifdef CMK_OPTIMIZE
 static int warned=0;
 #define OPTIMIZED_VERSION      \
@@ -78,7 +83,7 @@ static int warned=0;
        CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
 #else
 #define OPTIMIZED_VERSION /*empty*/
-#endif
+#endif // CMK_OPTIMIZE
 
 /*
 On T3E, we need to have file number control by open/close files only when needed.
@@ -89,13 +94,13 @@ On T3E, we need to have file number control by open/close files only when needed
 #else
   #define OPEN_LOG
   #define CLOSE_LOG
-#endif
+#endif //CMK_TRACE_LOGFILE_NUM_CONTROL
 
 #if CMK_HAS_COUNTER_PAPI
 int numPAPIEvents = 2;
 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
-#endif
+#endif // CMK_HAS_COUNTER_PAPI
 
 /**
   For each TraceFoo module, _createTraceFoo() must be defined.
@@ -103,14 +108,14 @@ char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
 */
 void _createTraceprojections(char **argv)
 {
-  DEBUGF(("%d createTraceProjections\n", CkMyPe()));
+  DEBUGF("%d createTraceProjections\n", CkMyPe());
   CkpvInitialize(CkVec<char *>, usrEventlist);
   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
 #if CMK_BLUEGENE_CHARM
   // CthRegister does not call the constructor
 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
-#endif
+#endif //CMK_BLUEGENE_CHARM
   CkpvInitialize(TraceProjections*, _trace);
   CkpvAccess(_trace) = new  TraceProjections(argv);
   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
@@ -128,7 +133,6 @@ struct TraceThreadListener {
   CmiObjId idx;
 };
 
-
 extern "C"
 void traceThreadListener_suspend(struct CthThreadListener *l)
 {
@@ -265,6 +269,11 @@ LogPool::LogPool(char *pgm) {
   timeErr = 0.0;
   globalEndTime = 0.0;
   headerWritten = 0;
+  numPhases = 0;
+  hasFlushed = false;
+
+  keepPhase = NULL;
+
   fileCreated = false;
   poolSize = CkpvAccess(CtrLogBufSize);
   pgmname = new char[strlen(pgm)+1];
@@ -454,11 +463,34 @@ void LogPool::write(int writedelta)
     p = new toProjectionsFile(writedelta?deltafp:fp);
   }
   CmiAssert(p);
+  int curPhase = 0;
+  // **FIXME** - Should probably consider a more sophisticated bounds-based
+  //   approach for selective writing instead of making multiple if-checks
+  //   for every single event.
   for(UInt i=0; i<numEntries; i++) {
     if (!writedelta) {
-      pool[i].pup(*p);
+      if (keepPhase == NULL) {
+       // default case, when no phase selection is required.
+       pool[i].pup(*p);
+      } else {
+       // **FIXME** Might be a good idea to create a "filler" event block for
+       //   all the events taken out by phase filtering.
+       if (pool[i].type == END_PHASE) {
+         // always write phase markers
+         pool[i].pup(*p);
+         curPhase++;
+       } else if (pool[i].type == BEGIN_COMPUTATION ||
+                  pool[i].type == END_COMPUTATION) {
+         // always write BEGIN and END COMPUTATION markers
+         pool[i].pup(*p);
+       } else if (keepPhase[curPhase]) {
+         pool[i].pup(*p);
+       }
+      }
     }
     else {     // delta
+      // **FIXME** Implement phase-selective writing for delta logs
+      //   eventually
       double time = pool[i].time;
       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
       {
@@ -477,6 +509,7 @@ void LogPool::write(int writedelta)
     }
   }
   delete p;
+  delete [] keepPhase;
 }
 
 void LogPool::writeSts(void)
@@ -487,6 +520,7 @@ void LogPool::writeSts(void)
   // generate an automatic unique ID for each log
   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
+  fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
 #if CMK_HAS_COUNTER_PAPI
   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
   // for now, use i, next time use papiEvents[i].
@@ -520,14 +554,16 @@ void LogPool::writeSts(TraceProjections *traceProj){
 
 void LogPool::writeRC(void)
 {
-  CkAssert(CkMyPe() == 0);
-  fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
-         (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
-  if (CkpvAccess(useOutlierAnalysis)) {
-    fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
-  } else {
-    fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
-  }      
+#ifdef PROJ_ANALYSIS  
+  //  CkAssert(CkMyPe() == 0);
+  //  fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
+  //     (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
+  //  if (CkpvAccess(_trace)->isOutlierAutomatic()) {
+  //    fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
+  //  } else {
+  //    fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
+  //  }
+#endif //PROJ_ANALYSIS
   fclose(rcfp);
 }
 
@@ -551,6 +587,7 @@ void LogPool::flushLogBuffer()
   if (numEntries) {
     double writeTime = TraceTimer();
     writeLog();
+    hasFlushed = true;
     numEntries = 0;
     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
@@ -563,6 +600,9 @@ void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
 {
   new (&pool[numEntries++])
     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
+  if ((type == END_PHASE) || (type == END_COMPUTATION)) {
+    numPhases++;
+  }
   if(poolSize==numEntries) {
     flushLogBuffer();
 #if CMK_BLUEGENE_CHARM
@@ -633,25 +673,8 @@ void LogPool::addUserSuppliedNote(char *note){
 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
 #ifndef CMK_BLUEGENE_CHARM
-#if CMK_SMP_TRACE_COMMTHREAD && MPI_SMP_TRACE_COMMTHREAD_HACK  
-//This part of code is used  to combine the contiguous
-//MPI_Test and MPI_Iprobe events to reduce the number of
-//entries
-#define MPI_TEST_EVENT_ID 60
-#define MPI_IPROBE_EVENT_ID 70 
-int lastEvent = pool[numEntries-1].event;
-if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
-    //just replace the endtime of last event
-    //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
-    pool[numEntries].endTime = et;
-}else{
   new (&pool[numEntries++])
        LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
-}
-#else
-  new (&pool[numEntries++])
-       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
-#endif
   if(poolSize == numEntries){
     flushLogBuffer();
   }
@@ -881,6 +904,10 @@ void LogEntry::pup(PUP::er &p)
        p | itime;
        p | mIdx;
        break;
+    case END_PHASE:
+      p|eIdx; // FIXME: actually the phase ID
+      p|itime;
+      break;
     default:
       CmiError("***Internal Error*** Wierd Event %d.\n", type);
       break;
@@ -891,7 +918,8 @@ void LogEntry::pup(PUP::er &p)
 
 TraceProjections::TraceProjections(char **argv): 
   curevent(0), inEntry(0), computationStarted(0), 
-  converseExit(0), endTime(0.0), traceNestedEvents(0)
+       converseExit(0), endTime(0.0), traceNestedEvents(0),
+       currentPhaseID(0), lastPhaseEvent(NULL)
 {
   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
 
@@ -899,11 +927,8 @@ TraceProjections::TraceProjections(char **argv):
 
   CtvInitialize(int,curThreadEvent);
   CkpvInitialize(int, CtrLogBufSize);
-  CkpvInitialize(bool, useOutlierAnalysis);
   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
   CtvAccess(curThreadEvent)=0;
-  CkpvAccess(useOutlierAnalysis) =
-    CmiGetArgFlagDesc(argv, "+outlier", "Perform Outlier Analysis");
   if (CmiGetArgIntDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
                       "Log entries to buffer per I/O")) {
     if (CkMyPe() == 0) {
@@ -944,7 +969,7 @@ TraceProjections::TraceProjections(char **argv):
   _logPool->setBinary(binary);
 #if CMK_PROJECTIONS_USE_ZLIB
   _logPool->setCompressed(compressed);
-#endif
+#endif CMK_PROJECTIONS_USE_ZLIB
   if (CkMyPe() == 0) {
     _logPool->createSts();
     _logPool->createRC();
@@ -972,7 +997,7 @@ TraceProjections::TraceProjections(char **argv):
   }
   papiValues = new long_long[numPAPIEvents];
   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
-#endif
+#endif CMK_HAS_COUNTER_PAPI
 }
 
 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
@@ -1014,25 +1039,32 @@ void TraceProjections::traceWriteSts(void)
     _logPool->writeSts(this);
 }
 
-// This is called when Converse closes during ConverseCommonExit()
-// 
-// Some programs bypass 
-// CkExit (like NAMD, which eventually calls ConverseExit), modules
-// like traces will have to pretend to shutdown as if CkExit was called
-// but at the same time avoid making subsequent CkExit calls (which is
-// usually required for allowing other modules to shutdown).
-//
-// Note that we can only get here if CkExit was not called, since the
-// trace module will un-register itself from TraceArray if it did.
+/** 
+ * **IMPT NOTES**:
+ *
+ * This is called when Converse closes during ConverseCommonExit().
+ * **FIXME**(?) - is this also exposed as a tracing-framework API call?
+ *
+ * Some programs bypass CkExit() (like NAMD, which eventually calls
+ * ConverseExit()), modules like traces will have to pretend to shutdown
+ * as if CkExit() was called but at the same time avoid making
+ * subsequent CkExit() calls (which is usually required for allowing
+ * other modules to shutdown).
+ *
+ * Note that we can only get here if CkExit() was not called, since the
+ * trace module will un-register itself from TraceArray if it did.
+ *
+ */
 void TraceProjections::traceClose(void)
 {
 #ifdef PROJ_ANALYSIS
   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
+
   // sets the flag that tells the code not to make the CkExit call later
   converseExit = 1;
   if (CkMyPe() == 0) {
     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
-    bocProxy.shutdownAnalysis();
+    bocProxy.traceProjectionsParallelShutdown();
   }
 #else
   // we've already deleted the logpool, so multiple calls to traceClose
@@ -1050,7 +1082,12 @@ void TraceProjections::traceClose(void)
 #endif
 }
 
-// This is meant to be called internally rather than by converse.
+/**
+ *  **IMPT NOTES**:
+ *
+ *  This is meant to be called internally by the tracing framework.
+ *
+ */
 void TraceProjections::closeTrace() {
   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
   if (CkMyPe() == 0) {
@@ -1073,19 +1110,6 @@ void TraceProjections::traceEnd(void)
   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
 }
 
-#if CMK_SMP_TRACE_COMMTHREAD
-void TraceProjections::traceBeginOnCommThread()
-{
-  if (!computationStarted) return;
-  _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
-}
-
-void TraceProjections::traceEndOnCommThread()
-{
-  _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
-}
-#endif
-
 void TraceProjections::userEvent(int e)
 {
   if (!computationStarted) return;
@@ -1548,65 +1572,290 @@ void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
 }
 #endif
 
+void TraceProjections::endPhase() {
+  double currentPhaseTime = TraceTimer();
+  double lastPhaseTime = 0.0;
+
+  if (lastPhaseEvent != NULL) {
+    lastPhaseTime = lastPhaseEvent->time;
+  } else {
+    if (_logPool->pool != NULL) {
+      // assumed to be BEGIN_COMPUTATION
+      lastPhaseTime = _logPool->pool[0].time;
+    } else {
+      CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
+      _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
+      lastPhaseTime = currentPhaseTime;
+    }
+  }
+
+  /* Insert endPhase event here. */
+  /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
+  /*        We currently "borrow" from the standard add() method. */
+  /*        It should really be its own add() method.             */
+  /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
+  lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
+  _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
+  currentPhaseID++;
+}
+
 #ifdef PROJ_ANALYSIS
-void TraceProjectionsBOC::shutdownAnalysis() {
+// ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
+
+
+// ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
+
+void registerOutlierReduction() {
+  outlierReductionType =
+    CkReduction::addReducer(outlierReduction);
+  minMaxReductionType =
+    CkReduction::addReducer(minMaxReduction);
+}
+
+/**
+ * **IMPT NOTES**:
+ *
+ * This is the C++ code that is registered to be activated at module
+ * shutdown. This is called exactly once on processor 0. Module shutdown
+ * is initiated as a result of a CkExit() call by the application code
+ * 
+ * The exit function must ultimately call CkExit() again to
+ * so that other module exit functions may proceed after this module is
+ * done.
+ *
+ */
+// FIXME: WHY extern "C"???
+extern "C" void TraceProjectionsExitHandler()
+{
+#ifndef CMK_OPTIMIZE
+  CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
+  bocProxy.traceProjectionsParallelShutdown();
+#else
+  CkExit();
+#endif CMK_OPTIMIZE
+}
+
+// This is called once on each processor but the idiom of use appears
+// to be to only have processor 0 register the function.
+//
+// See initnode in trace-projections.ci
+void initTraceProjectionsBOC()
+{
+  // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
+#ifdef __BLUEGENE__
+  if (BgNodeRank() == 0) {
+#else
+    if (CkMyRank() == 0) {
+#endif __BLUEGENE__
+      registerExitFn(TraceProjectionsExitHandler);
+    }
+#if 0
+  } // this is so indentation does not get messed up
+#endif
+}
+
+// mainchare for trace-projections BOC-operations. 
+// Instantiated at processor 0 and ONLY resides on processor 0 for the 
+// rest of its life.
+//
+// Responsible for:
+//   1. Handling commandline arguments
+//   2. Creating any objects required for proper BOC operations.
+//
+TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
+  /** Options for Outlier Analysis */
+  // defaults. Things will change with support for interactive analysis.
+  bool findOutliers = false;
+  bool outlierAutomatic = true;
+  int numKSeeds = 10; 
+
+  int peNumKeep = CkNumPes();  // used as a default
+  double entryThreshold = 0.0;
+  bool outlierUsePhases = false;
+  if (outlierAutomatic) {
+    CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
+                    "Number of cluster seeds to apply at outlier analysis.");
+    CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
+                    &peNumKeep, "Number of Processors to retain data");
+    CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
+                       "Minimum significance of entry points to be considered for clustering (%).");
+    findOutliers =
+      CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
+    outlierUsePhases = 
+      CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
+                       "Apply automatic outlier analysis to any available phases.");
+    if (outlierUsePhases) {
+      // if the user wants to use an outlier feature, it is assumed outlier
+      //    analysis is desired.
+      findOutliers = true;
+    }
+  }
+  traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers);
+  if (findOutliers) {
+    kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
+                                       numKSeeds,
+                                       peNumKeep,
+                                       entryThreshold,
+                                       outlierUsePhases);
+  }
+};
+
+// Called on every processor.
+void TraceProjectionsBOC::traceProjectionsParallelShutdown() {
   if (CkMyPe() == 0) {
     analysisStartTime = CmiWallTimer();
   }
-  //  CkPrintf("Shutdown analysis called on [%d]\n",CkMyPe());
   CkpvAccess(_trace)->endComputation();
   // no more tracing for projections on this processor after this point. 
   // Note that clear must be called after remove, or bad things will happen.
   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
   CkpvAccess(_traces)->clearTrace();
 
-  // From this point, we start a chain of reductions and broadcasts to
-  // perform final online analysis activities, ending with endTimeReduction
-  if (CkpvAccess(useOutlierAnalysis)) {
-    thisProxy[CkMyPe()].startOutlierAnalysis();
+  // From this point, we start multiple chains of reductions and broadcasts to
+  // perform final online analysis activities.
+
+  // Start all parallel operations at once. 
+  //   These MUST NOT modify base performance data in LogPool. If they must,
+  //   then the parallel operations must be phased (and this code to be
+  //   restructured as necessary)
+  CProxy_KMeansBOC kMeansProxy(kMeansGID);
+  CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
+  if (findOutliers) {
+    parModulesRemaining++;
+    kMeansProxy[CkMyPe()].startKMeansAnalysis();
+  }
+  parModulesRemaining++;
+  bocProxy[CkMyPe()].startEndTimeAnalysis();
+}
+
+// Called on each processor
+void KMeansBOC::startKMeansAnalysis() {
+  // Initialize all necessary structures
+  LogPool *pool = CkpvAccess(_trace)->_logPool;
+
+ if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
+  int flushInt = 0;
+  if (pool->hasFlushed) {
+    flushInt = 1;
+  }
+  
+  CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
+               0, thisProxy);
+  contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
+}
+
+// Called on processor 0
+void KMeansBOC::flushCheck(CkReductionMsg *msg) {
+  int someFlush = *((int *)msg->getData());
+
+  // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
+  
+  if (someFlush == 0) {
+    // Data intact proceed with KMeans analysis
+    CProxy_KMeansBOC kMeansProxy(kMeansGID);
+    kMeansProxy.flushCheckDone();
   } else {
-    thisProxy[CkMyPe()].startEndTimeAnalysis();
+    // Some processor had flushed it data at some point, abandon KMeans
+    CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
+    // terminate KMeans
+    CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
+    bocProxy[0].kMeansDone();
   }
 }
 
-void TraceProjectionsBOC::startOutlierAnalysis() {
-  // assumes the presence of the complete logs on this processor.
+// Called on each processor
+void KMeansBOC::flushCheckDone() {
+  // **FIXME** - more flexible metric collection scheme may be necessary
+  //   in the future for production use.
   LogPool *pool = CkpvAccess(_trace)->_logPool;
 
-  // this array stores entry method and idle times (hence +1) and 
-  // is twice as long to store their squares for statistical analysis
-  // when the reduction is performed.
-  int numEvents = _entryTable.size()+1;
-  execTimes = new double[numEvents*2];
-  for (int i=0; i<numEvents*2; i++) {
-    execTimes[i] = 0.0;
+  // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  numEntryMethods = _entryTable.size();
+  numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
+
+  // maintained across phases
+  markedBegin = false;
+  markedIdle = false;
+  beginBlockTime = 0.0;
+  beginIdleBlockTime = 0.0;
+  lastBeginEPIdx = -1; // none
+
+  lastPhaseIdx = 0;
+  currentExecTimes = NULL;
+  currentPhase = 0;
+  selected = false;
+
+  pool->initializePhases();
+
+  // incoming K Seeds and the per-phase filter
+  incKSeeds = new double[numK*numMetrics];
+  keepMetric = new bool[numMetrics];
+
+  //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
+  //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
+  //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
+  thisProxy[CkMyPe()].getNextPhaseMetrics();
+}
+
+// Called on each processor.
+void KMeansBOC::getNextPhaseMetrics() {
+  // Assumes the presence of the complete logs on this processor.
+  // Assumes first event is always BEGIN_COMPUTATION
+  // Assumes each processor sees the same number of phases.
+  //
+  // In this code, we collect performance data for this processor.
+  // All times are in seconds.
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
+
+  if (usePhases) {
+    DEBUGF("[%d] Using Phases\n", CkMyPe());
+  } else {
+    DEBUGF("[%d] NOT using Phases\n", CkMyPe());
+  }
+  
+  if (currentExecTimes != NULL) {
+    delete [] currentExecTimes;
+  }
+  currentExecTimes = new double[numMetrics];
+  for (int i=0; i<numMetrics; i++) {
+    currentExecTimes[i] = 0.0;
   }
 
-  bool markedBegin = false;
-  bool markedIdle = false;
-  double beginBlockTime = 0.0;
-  double beginIdleBlockTime = 0.0;
+  int numEventMethods = _entryTable.size();
+  LogPool *pool = CkpvAccess(_trace)->_logPool;
+  
+  CkAssert(pool->numEntries > lastPhaseIdx);
+  double startPhaseTime = pool->pool[lastPhaseIdx].time;
+  double totalPhaseTime = 0.0;
+  double totalActiveTime = 0.0; // entry method + idle
 
-  for (int i=0; i<pool->numEntries; i++) {
+  for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
     if (pool->pool[i].type == BEGIN_PROCESSING) {
       // check pairing
       if (!markedBegin) {
        markedBegin = true;
       }
       beginBlockTime = pool->pool[i].time;
+      lastBeginEPIdx = pool->pool[i].eIdx;
     } else if (pool->pool[i].type == END_PROCESSING) {
       // check pairing
       // if End without a begin, just ignore
-      // this event.
+      //   this event. If a phase-boundary is crossed, the Begin
+      //   event would be maintained in beginBlockTime, so it is 
+      //   not a problem.
       if (markedBegin) {
        markedBegin = false;
        if (pool->pool[i].event < 0)
        {
-         // ignore dummy events
-         break;
+         // ignore dummy events. **FIXME** as they have no eIdx?
+         continue;
        }
-       execTimes[pool->pool[i].eIdx] +=
+       currentExecTimes[pool->pool[i].eIdx] += 
          pool->pool[i].time - beginBlockTime;
+       totalActiveTime += pool->pool[i].time - beginBlockTime;
+       lastBeginEPIdx = -1;
       }
     } else if (pool->pool[i].type == BEGIN_IDLE) {
       // check pairing
@@ -1618,264 +1867,891 @@ void TraceProjectionsBOC::startOutlierAnalysis() {
       // check pairing
       if (markedIdle) {
        markedIdle = false;
-       execTimes[numEvents] +=
+       currentExecTimes[numEventMethods] += 
          pool->pool[i].time - beginIdleBlockTime;
+       totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
+      }
+    } else if (pool->pool[i].type == END_PHASE) {
+      // ignored when not using phases
+      if (usePhases) {
+       // when we've not visited this node before
+       if (i != lastPhaseIdx) { 
+         totalPhaseTime = 
+           pool->pool[i].time - pool->pool[lastPhaseIdx].time;
+         // it is important that proper accounting of time take place here.
+         // Note that END_PHASE events inevitably occur in the context of
+         //   some entry method by the way the tracing API is designed.
+         if (markedBegin) {
+           CkAssert(lastBeginEPIdx >= 0);
+           currentExecTimes[lastBeginEPIdx] += 
+             pool->pool[i].time - beginBlockTime;
+           totalActiveTime += pool->pool[i].time - beginBlockTime;
+           // this is so the remainder contributes to the next phase
+           beginBlockTime = pool->pool[i].time;
+         }
+         // The following is unlikely, but stranger things have happened.
+         if (markedIdle) {
+           currentExecTimes[numEventMethods] +=
+             pool->pool[i].time - beginIdleBlockTime;
+           totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
+           // this is so the remainder contributes to the next phase
+           beginIdleBlockTime = pool->pool[i].time;
+         }
+         if (totalActiveTime <= totalPhaseTime) {
+           currentExecTimes[numEventMethods+1] = 
+             totalPhaseTime - totalActiveTime;
+         } else {
+           currentExecTimes[numEventMethods+1] = 0.0;
+           CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
+                    CkMyPe(), currentPhase);
+         }
+         collectKMeansData();
+         // end the loop (and method) and defer the work till the next call
+         lastPhaseIdx = i;
+         break; 
+       }
       }
+    } else if (pool->pool[i].type == END_COMPUTATION) {
+      if (markedBegin) {
+       CkAssert(lastBeginEPIdx >= 0);
+       currentExecTimes[lastBeginEPIdx] += 
+         pool->pool[i].time - beginBlockTime;
+       totalActiveTime += pool->pool[i].time - beginBlockTime;
+      }
+      if (markedIdle) {
+       currentExecTimes[numEventMethods] +=
+         pool->pool[i].time - beginIdleBlockTime;
+       totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
+      }
+      totalPhaseTime = 
+       pool->pool[i].time - pool->pool[lastPhaseIdx].time;
+      if (totalActiveTime <= totalPhaseTime) {
+       currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
+      } else {
+       currentExecTimes[numEventMethods+1] = 0.0;
+       CkPrintf("[%d] Warning: Overhead found to be negative!\n",
+                CkMyPe());
+      }
+      collectKMeansData();
+    }
+  }
+}
+
+/**
+ *  Through a reduction, collectKMeansData aggregates each processors' data
+ *  in order for global properties to be determined:
+ *  
+ *  1. min & max to determine normalization factors.
+ *  2. sum to determine global EP averages for possible metric reduction
+ *       through thresholding.
+ *  3. sum of squares to compute stddev which may be useful in the future.
+ *
+ *  collectKMeansData will also keep the processor's data for the current
+ *    phase so that it may be normalized and worked on subsequently.
+ *
+ **/
+void KMeansBOC::collectKMeansData() {
+  int minOffset = numMetrics;
+  int maxOffset = 2*numMetrics;
+  int sosOffset = 3*numMetrics; // sos = Sum Of Squares
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
+
+  double *reductionMsg = new double[numMetrics*4];
+
+  for (int i=0; i<numMetrics; i++) {
+    reductionMsg[i] = currentExecTimes[i];
+    // duplicate the event times for max and min sections of the reduction
+    reductionMsg[minOffset + i] = currentExecTimes[i];
+    reductionMsg[maxOffset + i] = currentExecTimes[i];
+    // compute squares
+    reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
+  }
+
+  CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
+               0, thisProxy);
+  contribute((numMetrics*4)*sizeof(double), reductionMsg, 
+            outlierReductionType, cb);  
+}
+
+// The purpose is mainly to initialize the k seeds and generate
+//   normalization parameters for each of the metrics. The k seeds
+//   and normalization parameters are broadcast to all processors.
+//
+// Called on processor 0
+void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
+  CkAssert(CkMyPe() == 0);
+  
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  int sumOffset = 0;
+  int minOffset = numMetrics;
+  int maxOffset = 2*numMetrics;
+  int sosOffset = 3*numMetrics; // sos = Sum Of Squares
+
+  // calculate statistics & boundaries for the k seeds for clustering
+  KMeansStatsMessage *outmsg = 
+    new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
+  outmsg->numMetrics = numMetrics;
+  outmsg->numKPos = numK*numMetrics;
+  outmsg->numStats = numMetrics*4;
+
+  // Sum | Min | Max | Sum of Squares
+  double *totalExecTimes = (double *)msg->getData();
+  double totalTime = 0.0;
+
+  for (int i=0; i<numMetrics; i++) {
+    DEBUGN("%lf\n", totalExecTimes[i]);
+    totalTime += totalExecTimes[i];
+
+    // calculate event mean over all processors
+    outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
+
+    // get the ranges and offsets of each metric. With this, we get
+    //   normalization factors that can be sent back to each processor to
+    //   be used as necessary. We reuse max for range. Min remains the offset.
+    outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
+    outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
+      totalExecTimes[minOffset + i];
+    
+    // calculate stddev (using biased variance)
+    outmsg->stats[sosOffset + i] = 
+      sqrt((totalExecTimes[sosOffset + i] - 
+           2*(outmsg->stats[i])*totalExecTimes[i] +
+           (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
+          CkNumPes());
+  }
+
+  for (int i=0; i<numMetrics; i++) {
+    // 1) if the proportion of the max value of the entry method relative to
+    //   the average time taken over all entry methods across all processors
+    //   is greater than the stipulated percentage threshold ...; AND
+    // 2) if the range of values are non-zero.
+    //
+    // The current assumption is totalTime > 0 (what program has zero total
+    //   time from all work?)
+    keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
+                    entryThreshold) &&
+      (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
+    if (keepMetric[i]) {
+      DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
+            totalExecTimes[maxOffset + i], totalTime/CkNumPes());
+    } else {
+      DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
     }
+    outmsg->filter[i] = keepMetric[i];
   }
-  // convert all values to milliseconds first (else values would be too small
-  // or too big)
-  for (int i=0; i<numEvents; i++) {
-    execTimes[i] *= 1.0e3;
+
+  delete msg;
+  
+  // initialize k seeds for this phase
+  kSeeds = new double[numK*numMetrics];
+
+  numKReported = 0;
+  kNumMembers = new int[numK];
+
+  // Randomly select k processors' metric vectors for the k seeds
+  //  srand((unsigned)(CmiWallTimer()*1.0e06));
+  srand(11337); // for debugging purposes
+  for (int k=0; k<numK; k++) {
+    DEBUGF("Seed %d | ", k);
+    for (int m=0; m<numMetrics; m++) {
+      double factor = totalExecTimes[maxOffset + m] - 
+       totalExecTimes[minOffset + m];
+      // "uniform" distribution, scaled according to the normalization
+      //   factors
+      //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
+      // Random distribution.
+      kSeeds[numMetrics*k + m] =
+       ((rand()*1.0)/RAND_MAX)*factor;
+      if (keepMetric[m]) {
+       DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
+      }
+      outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
+    }
+    DEBUGF("\n");
+    kNumMembers[k] = 0;
   }
 
-  // compute squares
-  for (int i=0; i<numEvents; i++) {
-    execTimes[i+numEvents] = execTimes[i]*execTimes[i];
+  // broadcast statistical values to all processors for cluster discovery
+  thisProxy.findInitialClusters(outmsg);
+}
+
+
+
+// Called on each processor.
+void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
+
+ if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  phaseIter = 0;
+
+  // Get info from stats message
+  CkAssert(numMetrics == msg->numMetrics);
+  for (int i=0; i<numMetrics; i++) {
+    keepMetric[i] = msg->filter[i];
   }
 
-  CkCallback cb(CkIndex_TraceProjectionsBOC::outlierAverageReduction(NULL), 
+  // Normalize data on local processor.
+  // **CWL** See my thesis for detailed discussion of normalization of
+  //    performance data.
+  // **NOTE** This might change if we want to send data based on the filter
+  //   instead of all the data.
+  CkAssert(numMetrics*4 == msg->numStats);
+  for (int i=0; i<numMetrics; i++) {
+    currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
+    // **CWL** We do not normalize the range. Entry methods that exhibit
+    //   large absolute timing variations should be allowed to contribute
+    //   more to the Euclidean distance measure!
+    // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
+  }
+
+  // **NOTE** This might change if we want to send data based on the filter
+  //   instead of all the data.
+  CkAssert(numK*numMetrics == msg->numKPos);
+  for (int i=0; i<msg->numKPos; i++) {
+    incKSeeds[i] = msg->kSeedsPos[i];
+  }
+
+  // Decide which KSeed this processor belongs to.
+  minDistance = calculateDistance(0);
+  DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
+          currentPhase, phaseIter, minDistance);
+  minK = 0;
+  for (int i=1; i<numK; i++) {
+    double distance = calculateDistance(i);
+    DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
+            currentPhase, phaseIter, i, distance);
+    if (distance < minDistance) {
+      minDistance = distance;
+      minK = i;
+    }
+  }
+
+  // Set up a reduction with the modification vector to the root (0).
+  //
+  // The modification vector sends a negative value for each metric
+  //   for the K this processor no longer belongs to and a positive
+  //   value to the K the processor now belongs. In addition, a -1.0
+  //   is sent to the K it is leaving and a +1.0 to the K it is 
+  //   joining.
+  //
+  // The processor must still contribute a "zero returns" even if
+  //   nothing changes. This will be the basis for determine
+  //   convergence at the root.
+  //
+  // The addtional +1 is meant for the count-change that must be
+  //   maintained for the special cases at the root when some K
+  //   may be deprived of all processor points or go from 0 to a
+  //   positive number of processors (see later comments).
+  double *modVector = new double[numK*(numMetrics+1)];
+  for (int i=0; i<numK; i++) {
+    for (int j=0; j<numMetrics+1; j++) {
+      modVector[i*(numMetrics+1) + j] = 0.0;
+    }
+  }
+  for (int i=0; i<numMetrics; i++) {
+    // for this initialization, only positive values need be sent.
+    modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
+  }
+  modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
+
+  CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
                0, thisProxy);
-  contribute(numEvents*2*sizeof(double), execTimes
+  contribute(numK*(numMetrics+1)*sizeof(double), modVector
             CkReduction::sum_double, cb);  
 }
 
-void TraceProjectionsBOC::outlierAverageReduction(CkReductionMsg *msg) {
+double KMeansBOC::calculateDistance(int k) {
+  double ret = 0.0;
+  for (int i=0; i<numMetrics; i++) {
+    if (keepMetric[i]) {
+      DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
+            CkMyPe(), currentPhase, phaseIter, i,
+              currentExecTimes[i], incKSeeds[k*numMetrics + i]);
+      ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
+    }
+  }
+  return sqrt(ret);
+}
+
+void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
   CkAssert(CkMyPe() == 0);
-  // kinda wierd place to initialize a variable, but ...
-  encounteredWeights = 0;
-  weightArray = new double[CkNumPes()];
-  mapArray = new int[CkNumPes()];
 
-  // calculate statistics
-  int numEvents = _entryTable.size()+1;
-  OutlierStatsMessage *outmsg = new (numEvents*2) OutlierStatsMessage;
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
 
-  double *execTimes = (double *)msg->getData();
-  /*
-  for (int i=0; i<numEvents; i++) {
-    CkPrintf("EP: %d has Data: %lf and sum of squares %lf\n",
-            i,execTimes[i],execTimes[i+numEvents]);
+  double *modVector = (double *)msg->getData();
+  // sanity check
+  CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
+
+  // A quick convergence test.
+  bool hasChanges = false;
+  for (int i=0; i<numK; i++) {
+    hasChanges = hasChanges || 
+      (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
   }
-  */
-  for (int i=0; i<numEvents; i++) {
-    // calculate mean
-    outmsg->stats[i] = execTimes[i]/CkNumPes();
-    // calculate stddev (using biased variance)
-    outmsg->stats[i+numEvents] = 
-      sqrt((execTimes[i+numEvents]-2*(outmsg->stats[i])*execTimes[i]+
-           (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
-          CkNumPes());
-    // CkPrintf("EP:%d Mean:%lf Stddev:%lf\n",i,outmsg->stats[i],
-    //              outmsg->stats[i+numEvents]);
+  if (!hasChanges) {
+    delete msg;
+    findRepresentatives();
+  } else {
+    int overallChange = 0;
+    for (int i=0; i<numK; i++) {
+      int change = (int)modVector[i*(numMetrics+1) + numMetrics];
+      if (change != 0) {
+       overallChange += change;
+       // modify the k seeds based on the modification vectors coming in
+       //
+       // If a seed initially has no members, its contents do not matter and
+       //   is simply set to the average of the incoming vector.
+       // If the change causes a seed to lose all its members, do nothing.
+       //   Its last-known location is kept to allow it to re-capture
+       //   membership at the next iteration rather than apply the last
+       //   changes (which snaps the point unnaturally to 0,0).
+       // Otherwise, apply the appropriate vector changes.
+       CkAssert((kNumMembers[i] + change >= 0) &&
+                (kNumMembers[i] + change <= CkNumPes()));
+       if (kNumMembers[i] == 0) {
+         CkAssert(change > 0);
+         for (int j=0; j<numMetrics; j++) {
+           kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
+         }
+       } else if (kNumMembers[i] + change == 0) {
+         // do nothing.
+       } else {
+         for (int j=0; j<numMetrics; j++) {
+           kSeeds[i*numMetrics + j] *= kNumMembers[i];
+           kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
+           kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
+         }
+       }
+       kNumMembers[i] += change;
+      }
+      DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
+            CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
+    }
+    delete msg;
+
+    // broadcast the new seed locations.
+    KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
+    outmsg->numKPos = numK*numMetrics;
+    for (int i=0; i<numK*numMetrics; i++) {
+      outmsg->kSeedsPos[i] = kSeeds[i];
+    }
+
+    thisProxy.updateSeedMembership(outmsg);
   }
-  delete msg;
+}
 
-  // output averages to a file in microseconds. File handle to be kept
-  // open so we can write more stats to it.
-  char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".outlier")+1];
-  sprintf(fname, "%s.outlier", CkpvAccess(traceRoot));
-  do {
-    outlierfp = fopen(fname, "w");
-  } while (!outlierfp && (errno == EINTR || errno == EMFILE));
-  for (int i=0; i<numEvents; i++) {
-    fprintf(outlierfp,"%lld ",(CMK_TYPEDEF_UINT8)(outmsg->stats[i]*1.0e3));
-  }
-  fprintf(outlierfp,"\n");
-
-  // broadcast statistical values to all processors for weight calculations
-  thisProxy.calculateWeights(outmsg);
-}
-
-void TraceProjectionsBOC::calculateWeights(OutlierStatsMessage *msg) {
-  // calculate outlier "weights"
-  int numEvents = _entryTable.size()+1;
-
-  // this is silly, but do it for now. First pass for computing total
-  // time. It is also a misnomer, since this total will not include
-  // any overhead time.
-  OutlierWeightMessage *outmsg = new OutlierWeightMessage;
-  weight = 0.0;
-  outmsg->sourcePe = CkMyPe();
-  outmsg->weight = 0.0;
-  double total = 0.0;
-  for (int i=0; i<numEvents; i++) {
-    total += execTimes[i];
-  }
-  for (int i=0; i<numEvents; i++) {
-    if ((total > 0.0) &&
-       (msg->stats[i+numEvents] > 0.0)) {
-      outmsg->weight += 
-       (fabs(execTimes[i]-msg->stats[i])/msg->stats[i+numEvents]) *
-       (msg->stats[i]/total);
+// Called on all processors
+void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  phaseIter++;
+
+  // **NOTE** This might change if we want to send data based on the filter
+  //   instead of all the data.
+  CkAssert(numK*numMetrics == msg->numKPos);
+  for (int i=0; i<msg->numKPos; i++) {
+    incKSeeds[i] = msg->kSeedsPos[i];
+  }
+
+  // Decide which KSeed this processor belongs to.
+  lastMinK = minK;
+  minDistance = calculateDistance(0);
+  DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
+        currentPhase, phaseIter, minDistance);
+
+  minK = 0;
+  for (int i=1; i<numK; i++) {
+    double distance = calculateDistance(i);
+    DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
+          currentPhase, phaseIter, i, distance);
+    if (distance < minDistance) {
+      minDistance = distance;
+      minK = i;
     }
-    // CkPrintf("[%d] EP:%d Weight:%lf\n",CkMyPe(),i,outmsg->weight);
   }
-  weight = outmsg->weight;
-  delete msg;
-  
-  thisProxy[0].determineOutliers(outmsg);
+
+  double *modVector = new double[numK*(numMetrics+1)];
+  for (int i=0; i<numK; i++) {
+    for (int j=0; j<numMetrics+1; j++) {
+      modVector[i*(numMetrics+1) + j] = 0.0;
+    }
+  }
+
+  if (minK != lastMinK) {
+    for (int i=0; i<numMetrics; i++) {
+      modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
+      modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
+    }
+    modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
+    modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
+  }
+
+  CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
+               0, thisProxy);
+  contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
+            CkReduction::sum_double, cb);  
 }
 
-void TraceProjectionsBOC::determineOutliers(OutlierWeightMessage *msg) {
-  CkAssert(CkMyPe() == 0);
-  encounteredWeights++;
+void KMeansBOC::findRepresentatives() {
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  int numNonEmptyClusters = 0;
+  for (int i=0; i<numK; i++) {
+    if (kNumMembers[i] > 0) {
+      numNonEmptyClusters++;
+    }
+  }
+
+  int numRepresentatives = peNumKeep;
+  // **FIXME**
+  // This is fairly arbitrary. Next time, choose the centers of the top
+  //   largest clusters.
+  if (numRepresentatives < numNonEmptyClusters) {
+    numRepresentatives = numNonEmptyClusters;
+  }
 
-  // For now, implement a full array for sorting later. For better scaling
-  // it should really be a sorted list of maximum k entries.
-  weightArray[msg->sourcePe] = msg->weight;
+  int slotsRemaining = numRepresentatives;
 
-  if (encounteredWeights == CkNumPes()) {
-    OutlierThresholdMessage *outmsg = new OutlierThresholdMessage;
-    outmsg->threshold = 0.0;
+  DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
+        numNonEmptyClusters);
 
-    // initialize the map array
-    for (int i=0; i<CkNumPes(); i++) {
-      mapArray[i] = i;
+  // determine how many exemplars to select per cluster. Currently
+  //   hardcoded to 1. Future challenge is to decide on other numbers
+  //   or proportionality.
+  //
+  int exemplarsPerCluster = 1;
+  slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
+
+  int numCandidateOutliers = CkNumPes() - 
+    exemplarsPerCluster*numNonEmptyClusters;
+
+  double remainders[numK];
+  int assigned[numK];
+  exemplarChoicesLeft = new int[numK];
+  outlierChoicesLeft = new int[numK];
+
+  for (int i=0; i<numK; i++) {
+    assigned[i] = 0;
+    remainders[i] = 
+      (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
+      slotsRemaining / numCandidateOutliers;
+    if (remainders[i] >= 0.0) {
+      assigned[i] = (int)floor(remainders[i]);
+      remainders[i] -= assigned[i];
+    } else {
+      remainders[i] = 0.0;
     }
-    
-    // bubble sort the array
-    for (int p=CkNumPes()-1; p>0; p--) {
-      for (int i=0; i<p; i++) {
-       if (weightArray[i+1] < weightArray[i]) {
-         int tempI;
-         double temp = weightArray[i+1];
-         weightArray[i+1] = weightArray[i];
-         weightArray[i] = temp;
-         tempI = mapArray[i+1];
-         mapArray[i+1] = mapArray[i];
-         mapArray[i] = tempI;
-       }
+  }
+
+  for (int i=0; i<numK; i++) {
+    slotsRemaining -= assigned[i];
+  }
+  CkAssert(slotsRemaining >= 0);
+
+  // find clusters to assign the loose slots to, in order of
+  // remainder proportion
+  while (slotsRemaining > 0) {
+    double max = 0.0;
+    int winner = 0;
+    for (int i=0; i<numK; i++) {
+      if (remainders[i] > max) {
+       max = remainders[i];
+       winner = i;
       }
-    }    
+    }
+    assigned[winner]++;
+    remainders[winner] = 0.0;
+    slotsRemaining--;
+  }
 
-    // default threshold is applied.
-    //
-    // **CW** This needs to be changed to accept a runtime parameter
-    // so the default can be overridden. Default value considers the
-    // top 10% "different" processors as outliers.
-    int thresholdIndex;
-    thresholdIndex = (int)(CkNumPes()*0.9);
-    if (thresholdIndex == CkNumPes()) {
-      thresholdIndex--;
+  // set up how many reduction cycles of min/max we need to conduct to
+  // select the representatives.
+  numSelectionIter = exemplarsPerCluster;
+  for (int i=0; i<numK; i++) {
+    if (assigned[i] > numSelectionIter) {
+      numSelectionIter = assigned[i];
     }
+  }
+  DEBUGF("Selection Iterations = %d\n", numSelectionIter);
 
-    // output the sorted processor list to stats file
-    for (int i=thresholdIndex; i<CkNumPes(); i++) {
-      fprintf(outlierfp,"%d ",mapArray[i]);
+  for (int i=0; i<numK; i++) {
+    if (kNumMembers[i] > 0) {
+      exemplarChoicesLeft[i] = exemplarsPerCluster;
+      outlierChoicesLeft[i] = assigned[i];
+    } else {
+      exemplarChoicesLeft[i] = 0;
+      outlierChoicesLeft[i] = 0;
+    }
+    DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
+          outlierChoicesLeft[i]);
+  }
+
+  // send out first broadcast
+  KSelectionMessage *outmsg = NULL;
+  if (numSelectionIter > 0) {
+    outmsg = new (numK, numK, numK) KSelectionMessage;
+    outmsg->numKMinIDs = numK;
+    outmsg->numKMaxIDs = numK;
+    for (int i=0; i<numK; i++) {
+      outmsg->minIDs[i] = -1;
+      outmsg->maxIDs[i] = -1;
     }
-    fprintf(outlierfp,"\n");
-    fflush(outlierfp);
-    fclose(outlierfp);
+    thisProxy.collectDistances(outmsg);
+  } else {
+    CkPrintf("Warning: No selection iteration from the start!\n");
+    // invoke phase completion on all processors
+    thisProxy.phaseDone();
+  }
+}
 
-    // CkPrintf("Outlier Index determined to be %d with value %lf\n",
-    //      thresholdIndex, weightArray[thresholdIndex]);
-    outmsg->threshold = weightArray[thresholdIndex];
+/*
+ *  lastMin = array of minimum champions of the last tournament
+ *  lastMax = array of maximum champions of the last tournament
+ *  lastMaxVal = array of last encountered maximum values, allows previous
+ *                 minimum winners to eliminate themselves from the next
+ *                 minimum race.
+ *
+ *  Called on all processors.
+ */
+void KMeansBOC::collectDistances(KSelectionMessage *msg) {
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
+        lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
+  if ((CkMyPe() == msg->minIDs[lastMinK]) || 
+      (CkMyPe() == msg->maxIDs[lastMinK])) {
+    CkAssert(!selected);
+    selected = true;
+  }
+
+  // build outgoing reduction structure
+  //   format = minVal | ID | maxVal | ID
+  double *minMaxAndIDs = NULL;
+
+  minMaxAndIDs = new double[numK*4];
+  // initialize to the appropriate out-of-band values (for error checks)
+  for (int i=0; i<numK; i++) {
+    minMaxAndIDs[i*4] = -1.0; // out-of-band min value
+    minMaxAndIDs[i*4+1] = -1.0; // out of band ID
+    minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
+    minMaxAndIDs[i*4+3] = -1.0; // out of band ID
+  }
+  // If I have not won before, I put myself back into the competition
+  if (!selected) {
+    DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
+    minMaxAndIDs[lastMinK*4] = minDistance;
+    minMaxAndIDs[lastMinK*4+1] = CkMyPe();
+    minMaxAndIDs[lastMinK*4+2] = minDistance;
+    minMaxAndIDs[lastMinK*4+3] = CkMyPe();
+  }
+  delete msg;
+
+  CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
+               0, thisProxy);
+  contribute(numK*4*sizeof(double), minMaxAndIDs, 
+            minMaxReductionType, cb);  
+}
+
+void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
+  // incoming format:
+  //   minVal | minID | maxVal | maxID
+
+  // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
+
+  if (numSelectionIter > 0) {
+    double *incInfo = (double *)msg->getData();
     
-    delete msg;
-    thisProxy.setOutliers(outmsg);
+    KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
+    outmsg->numKMinIDs = numK;
+    outmsg->numKMaxIDs = numK;
+    
+    for (int i=0; i<numK; i++) {
+      DEBUGF("%d | %lf %d %lf %d \n", i, 
+            incInfo[i*4], (int)incInfo[i*4+1], 
+            incInfo[i*4+2], (int)incInfo[i*4+3]);
+    }
+
+    for (int i=0; i<numK; i++) {
+      if (exemplarChoicesLeft[i] > 0) {
+       outmsg->minIDs[i] = (int)incInfo[i*4+1];
+       exemplarChoicesLeft[i]--;
+      } else {
+       outmsg->minIDs[i] = -1;
+      }
+      if (outlierChoicesLeft[i] > 0) {
+       outmsg->maxIDs[i] = (int)incInfo[i*4+3];
+       outlierChoicesLeft[i]--;
+      } else {
+       outmsg->maxIDs[i] = -1;
+      }
+    }
+    thisProxy.collectDistances(outmsg);
+    numSelectionIter--;
   } else {
-    delete msg;
+    // invoke phase completion on all processors
+    thisProxy.phaseDone();
   }
 }
 
-void TraceProjectionsBOC::setOutliers(OutlierThresholdMessage *msg)
-{
+/**
+ *  Completion of the K-Means clustering and data selection of one phase
+ *    of the computation.
+ *
+ *  Called on every processor.
+ */
+void KMeansBOC::phaseDone() {
+
+  //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
+
   LogPool *pool = CkpvAccess(_trace)->_logPool;
-  // CkPrintf("[%d] My weight is %lf, threshold is %lf\n",CkMyPe(),weight,
-  //   msg->threshold);
-  if (weight < msg->threshold) {
-    // CkPrintf("[%d] Removing myself from output list\n");
-    pool->writeData = false;
+  CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
+
+  // now decide on what to do with the decision.
+  if (!selected) {
+    if (usePhases) {
+      pool->keepPhase[currentPhase] = false;
+    } else {
+      // if not using phases, we're working on the whole log
+      pool->setAllPhases(false);
+    }
   }
 
-  delete msg;
-  thisProxy[CkMyPe()].startEndTimeAnalysis();
+  // **FIXME** (?) - All processors have to agree on this, or the reduction
+  //   will not be correct! The question is "is this enforcible?"
+  if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
+    // We're done
+    int dummy = 0;
+    CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
+                 0, bocProxy);
+    contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
+  } else {
+    // reset all phase-based k-means data and decisions
+
+    // **FIXME**!!!!!    
+    
+    // invoke the next K-Means computation phase.
+    currentPhase++;
+    thisProxy[CkMyPe()].getNextPhaseMetrics();
+  }
 }
 
 void TraceProjectionsBOC::startEndTimeAnalysis()
 {
+ if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
+
   endTime = CkpvAccess(_trace)->endTime;
   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
 
-  CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeReduction(NULL), 
+  CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
                0, thisProxy);
   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
 }
 
-void TraceProjectionsBOC::endTimeReduction(CkReductionMsg *msg)
+void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
 {
+ if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g\n", CkMyPe(), CkWallTimer() );
+
   CkAssert(CkMyPe() == 0);
+  parModulesRemaining--;
   if (CkpvAccess(_trace) != NULL) {
     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData();
     // CkPrintf("End time determined to be %lf us\n",
     //      (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
   }
-  delete msg;  
-  CkPrintf("Total Analysis Time = %lf\n", CmiWallTimer()-analysisStartTime);
-  thisProxy.closeTrace();
+  delete msg;
+  if (parModulesRemaining == 0) {
+    thisProxy[CkMyPe()].finalize();
+  }
 }
 
-void TraceProjectionsBOC::finalReduction(CkReductionMsg *msg)
-{
+void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
+
+ if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
+
   CkAssert(CkMyPe() == 0);
-  // CkPrintf("Final Reduction called\n");
+  parModulesRemaining--;
+  CkPrintf("K-Means Analysis Time = %lf seconds\n",
+          CmiWallTimer()-analysisStartTime);
   delete msg;
-  CkExit();
+  if (parModulesRemaining == 0) {
+    thisProxy[CkMyPe()].finalize();
+  }
 }
 
-void TraceProjectionsBOC::closeTrace()
-{
-  if (CkpvAccess(_trace) == NULL) {
-    return;
+/**
+ *
+ *  This version is called (on processor 0) only if flushCheck fails.
+ *
+ */
+void TraceProjectionsBOC::kMeansDone() {
+  CkAssert(CkMyPe() == 0);
+  parModulesRemaining--;
+  CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
+          CmiWallTimer()-analysisStartTime);
+  if (parModulesRemaining == 0) {
+    thisProxy[CkMyPe()].finalize();
   }
+}
+
+void TraceProjectionsBOC::finalize()
+{
+  CkAssert(CkMyPe() == 0);
+  CkPrintf("Total Analysis Time = %lf seconds\n", 
+          CmiWallTimer()-analysisStartTime);
+  thisProxy.closingTraces();
+}
+
+// Called on every processor
+void TraceProjectionsBOC::closingTraces() {
   CkpvAccess(_trace)->closeTrace();
-  // the following section is only excuted if CkExit is called, since
-  // closing-progress is required for other modules.
+
+  int dummy = 0;
+  CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
+               0, thisProxy);
+  contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);  
+}
+
+// The sole purpose of this reduction is to decide whether or not
+//   Projections as a module needs to call CkExit() to get other
+//   modules to shutdown.
+void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
+  CkAssert(CkMyPe() == 0);
+  delete msg;
+  // decide if CkExit() needs to be called
   if (!CkpvAccess(_trace)->converseExit) {
-    // this is a dummy reduction to make sure that all the output
-    // is completed before other modules are allowed to do stuff
-    // and finally _CkExit is called.
-    CkCallback cb(CkIndex_TraceProjectionsBOC::finalReduction(NULL), 
-                 0, thisProxy);
-    contribute(sizeof(double), &dummy, CkReduction::max_double, cb);
+    CkExit();
   }
 }
+/*
+ *  Registration and definition of the Outlier Reduction callback.
+ *  Format: Sum | Min | Max | Sum of Squares
+ */
+CkReductionMsg *outlierReduction(int nMsgs,
+                                CkReductionMsg **msgs) {
+  int numBytes = 0;
+  int numMetrics = 0;
+  double *ret = NULL;
+
+  if (nMsgs == 1) {
+    // nothing to do, just pass it on
+    return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
+  }
+
+  if (nMsgs > 1) {
+    numBytes = msgs[0]->getSize();
+    // sanity checks
+    if (numBytes%sizeof(double) != 0) {
+      CkAbort("Outlier Reduction Size incompatible with doubles!\n");
+    }
+    if ((numBytes/sizeof(double))%4 != 0) {
+      CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
+    }
+    numMetrics = (numBytes/sizeof(double))/4;
+    ret = new double[numMetrics*4];
 
-// This is the C++ code that is registered to be activated at module
-// shutdown. This is called exactly once on processor 0.
-// 
-extern "C" void CombineProjections()
-{
-#ifndef CMK_OPTIMIZE
-  // CkPrintf("[%d] CombineProjections called!\n", CkMyPe());
-  CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
-  bocProxy.shutdownAnalysis();
-#else
-  CkExit();
-#endif
+    // copy the first message data into the return structure first
+    for (int i=0; i<numMetrics*4; i++) {
+      ret[i] = ((double *)msgs[0]->getData())[i];
+    }
+
+    // Sum | Min | Max | Sum of Squares
+    for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
+      for (int i=0; i<numMetrics; i++) {
+       // Sum
+       ret[i] += ((double *)msgs[msgIdx]->getData())[i];
+       // Min
+       ret[numMetrics + i] =
+         (ret[numMetrics + i] < 
+          ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
+         ? ret[numMetrics + i] : 
+         ((double *)msgs[msgIdx]->getData())[numMetrics + i];
+       // Max
+       ret[2*numMetrics + i] = 
+         (ret[2*numMetrics + i] >
+          ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
+         ? ret[2*numMetrics + i] :
+         ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
+       // Sum of Squares (squaring already done at leaf)
+       ret[3*numMetrics + i] +=
+         ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
+      }
+    }
+  }
+  
+  /* apparently, we do not delete the incoming messages */
+  return CkReductionMsg::buildNew(numBytes,ret);
 }
 
-// This method is called by module initialization to register the exit
-// function. This exit function must ultimately call CkExit() again to
-// so that other module exit functions may proceed after this module is
-// done.
-//
-// This is called once on each processor but the idiom of use appears
-// to be to only have processor 0 register the function.
-void initTraceProjectionsBOC()
-{
-  // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
-#ifdef __BLUEGENE__
-  if (BgNodeRank() == 0) {
-#else
-    if (CkMyRank() == 0) {
-#endif
-      registerExitFn(CombineProjections);
+/*
+ * The only reason we have a user-defined reduction is to support
+ *   identification of the "winning" processors as well as to handle
+ *   both the min and the max of each "tournament". A simple min/max
+ *   discovery cannot handle ties.
+ */
+CkReductionMsg *minMaxReduction(int nMsgs,
+                               CkReductionMsg **msgs) {
+  CkAssert(nMsgs > 0);
+
+  int numBytes = msgs[0]->getSize();
+  CkAssert(numBytes%sizeof(double) == 0);
+  int numK = (numBytes/sizeof(double))/4;
+
+  double ret[numK*4];
+  // fill with out-of-band values
+  for (int i=0; i<numK; i++) {
+    ret[i*4] = -1.0;
+    ret[i*4+1] = -1.0;
+    ret[i*4+2] = -1.0;
+    ret[i*4+3] = -1.0;
+  }
+
+  // incoming format K * (minVal | minIdx | maxVal | maxIdx)
+  for (int i=0; i<nMsgs; i++) {
+    double *temp = (double *)msgs[i]->getData();
+    for (int j=0; j<numK; j++) {
+      // no previous valid min
+      if (ret[j*4+1] < 0) {
+       // fill it in only if the incoming min is valid
+       if (temp[j*4+1] >= 0) {
+         ret[j*4] = temp[j*4];      // fill min value
+         ret[j*4+1] = temp[j*4+1];  // fill ID
+       }
+      } else {
+       // find Min, only if incoming min is valid
+       if (temp[j*4+1] >= 0) {
+         if (temp[j*4] < ret[j*4]) {
+           ret[j*4] = temp[j*4];      // replace min value
+           ret[j*4+1] = temp[j*4+1];  // replace ID
+         }
+       }
+      }
+      // no previous valid max
+      if (ret[j*4+3] < 0) {
+       // fill only if the incoming max is valid
+       if (temp[j*4+3] >= 0) {
+         ret[j*4+2] = temp[j*4+2];  // fill max value
+         ret[j*4+3] = temp[j*4+3];  // fill ID
+       }
+      } else {
+       // find Max, only if incoming max is valid
+       if (temp[j*4+3] >= 0) {
+         if (temp[j*4+2] > ret[j*4+2]) {
+           ret[j*4+2] = temp[j*4+2];  // replace max value
+           ret[j*4+3] = temp[j*4+3];  // replace ID
+         }
+       }
+      }
     }
   }
+  return CkReductionMsg::buildNew(numBytes, ret);
+}
 
 #include "TraceProjections.def.h"
-#endif
+#endif //PROJ_ANALYSIS
 
 /*@}*/
index 133aa4b175390dfc6f39602335e48469e48e6477..721486ab15be0822baeb9ad47b5a7d7fb27b42a6 100644 (file)
@@ -1,34 +1,66 @@
-
 module TraceProjections {
 
-  message OutlierStatsMessage {
+  message KMeansStatsMessage {
+    bool filter[];
+    double kSeedsPos[];
     double stats[];
   };
 
-  message OutlierWeightMessage;
+  message KSeedsMessage {
+    double kSeedsPos[];
+  };
 
-  message OutlierThresholdMessage;
+  message KSelectionMessage {
+    int minIDs[];
+    int maxIDs[];
+  };
 
   mainchare TraceProjectionsInit {
+    // This initialization happens at the start of the program and is
+    //   intended to allow commandline arguments to be passed into
+    //   trace-projections.
     entry TraceProjectionsInit(CkArgMsg *m);
   };
 
-  initnode void initTraceProjectionsBOC();
+  // initcalls are automatically executed by the runtime just before
+  //   computation begins. It is assumed this happens before mainchares are
+  //   initialized as it is intended for important runtime registrations.
+  // In this case, we have user-specific reductions and the registration of
+  //   an module exit function.
+  initproc void registerOutlierReduction(void);
+  initnode void initTraceProjectionsBOC(void);
   
   readonly CkGroupID traceProjectionsGID;
+  readonly CkGroupID kMeansGID;
+
+  // "shadow" BOC for TraceProjectionsBOC, specialized for KMeans computations
+  group [migratable] KMeansBOC {
+    entry KMeansBOC(bool, int, int, double, bool);
+    
+    entry void startKMeansAnalysis(void);
+    entry void flushCheck(CkReductionMsg *);
+    entry void flushCheckDone(void);
+    entry void getNextPhaseMetrics(void);
+    entry void globalMetricRefinement(CkReductionMsg *);
+    entry void findInitialClusters(KMeansStatsMessage *);
+    entry void updateKSeeds(CkReductionMsg *);
+    entry void updateSeedMembership(KSeedsMessage *);
+    entry void collectDistances(KSelectionMessage *);
+    entry void findNextMinMax(CkReductionMsg *);
+    entry void phaseDone(void);
+  };
 
   group [migratable] TraceProjectionsBOC {
-    entry TraceProjectionsBOC(void);
-    entry void startOutlierAnalysis();
-    entry void outlierAverageReduction(CkReductionMsg *);
-    entry void calculateWeights(OutlierStatsMessage *);
-    entry void determineOutliers(OutlierWeightMessage *);
-    entry void setOutliers(OutlierThresholdMessage *);
-    entry void startEndTimeAnalysis();
-    entry void endTimeReduction(CkReductionMsg *);
-    entry void finalReduction(CkReductionMsg *);
-    entry [notrace] void shutdownAnalysis(void);
-    entry void closeTrace(void);
+    entry TraceProjectionsBOC(bool);
+
+    entry void traceProjectionsParallelShutdown(void);
+    entry void startEndTimeAnalysis(void);
+    entry void endTimeDone(CkReductionMsg *);
+    entry void kMeansDone(void);
+    entry void kMeansDone(CkReductionMsg *);
+    entry void finalize(void);
+    entry void closingTraces(void);
+    entry void closeParallelShutdown(CkReductionMsg *);
   };
 
 };
index b2cc1a805b237db485abf47fd7cdcd49cd9dfa55..5cfb69389c992e192b14fd46522b9f7d830b1ad0 100644 (file)
@@ -1,8 +1,8 @@
 /*****************************************************************************
- * $Source$
- * $Author$
- * $Date$
- * $Revision$
+ * $Source: /cvsroot/charm/src/ck-perf/trace-projections.h,v $
+ * $Author: gioachin $
+ * $Date: 2009-08-20 01:09:41 $
+ * $Revision: 2.78 $
  *****************************************************************************/
 
 /**
@@ -21,8 +21,6 @@
 #include "trace-common.h"
 #include "ckhashtable.h"
 
-
-
 #if CMK_HAS_COUNTER_PAPI
 #include <papi.h>
 #endif
@@ -243,9 +241,13 @@ class TraceProjections;
 /// log pool in trace projection
 class LogPool {
   friend class TraceProjections;
+#ifdef PROJ_ANALYSIS
+  // The macro is here "just-in-case". Somehow, it seems it is not necessary
+  //   to declare friend classes ahead of time in C++.
   friend class TraceProjectionsBOC;
+  friend class KMeansBOC;
+#endif  //PROJ_ANALYSIS
   friend class controlPointManager;
-
   private:
     bool writeData;
     unsigned int poolSize;
@@ -272,6 +274,10 @@ class LogPool {
     double timeErr;
     double globalEndTime; // used at the end on Pe 0 only
 
+    int numPhases;
+    bool hasFlushed;
+    bool *keepPhase;  // one decision per phase
+
     int headerWritten;
     bool fileCreated;
     void writeHeader();
@@ -293,6 +299,19 @@ class LogPool {
     void writeSts(TraceProjections *traceProj);
     void writeRC(void);
 
+    void initializePhases() {
+      keepPhase = new bool[numPhases];
+      for (int i=0; i<numPhases; i++) {
+       keepPhase[i] = true;
+      }
+    }
+
+    void setAllPhases(bool val) {
+      for (int i=0; i<numPhases; i++) {
+       keepPhase[i] = val;
+      }
+    }
+
     void add(unsigned char type, unsigned short mIdx, unsigned short eIdx,
             double time, int event, int pe, int ml=0, CmiObjId* id=0, 
             double recvT=0.0, double cpuT=0.0, int numPe=0);
@@ -392,7 +411,12 @@ class NestedEvent {
   events descriptions will be written into .sts file.
 */
 class TraceProjections : public Trace {
+#ifdef PROJ_ANALYSIS
+  // The macro is here "just-in-case". Somehow, it seems it is not necessary
+  //   to declare friend classes ahead of time in C++.
   friend class TraceProjectionsBOC;
+  friend class KMeansBOC;
+#endif // PROJ_ANALYSIS
  private:
     LogPool* _logPool;        /**<  logpool for all events */
     int curevent;
@@ -405,9 +429,12 @@ class TraceProjections : public Trace {
     int funcCount;
     CkHashtableT<StrKey,int> funcHashtable;
 
-       int traceNestedEvents;
+    int traceNestedEvents;
     CkQ<NestedEvent> nestedEvents;
     
+    int currentPhaseID;
+    LogEntry* lastPhaseEvent;
+
     //as user now can specify the idx, it's possible that user may specify an existing idx
     //so that we need a data structure to track idx. --added by Chao Mei
     CkVec<int> idxVec;
@@ -417,7 +444,7 @@ class TraceProjections : public Trace {
     LONG_LONG_PAPI *papiValues;
 #endif
 
- public:
 public:
     int converseExit; // used for exits that bypass CkExit.
     double endTime;
 
@@ -454,12 +481,8 @@ class TraceProjections : public Trace {
     void traceClearEps();
     void traceWriteSts();
     void traceClose();
-    void traceBegin();    
+    void traceBegin();
     void traceEnd();
- #if CMK_SMP_TRACE_COMMTHREAD          
-    void traceBeginOnCommThread();
-    void traceEndOnCommThread();
-#endif         
     void traceFlushLog() { _logPool->flushLogBuffer(); }
 
     //functions that perform function tracing
@@ -471,6 +494,11 @@ class TraceProjections : public Trace {
     void endFunc(char *name);
     void endFunc(int num);
 
+    /* start recognizing phases in trace-projections */
+    /* _TRACE_END_PHASE must be called collectively on all processors */
+    /*   in order for phase numbers to match up. */
+    void endPhase();
+
     /* This is for moving projections to being a charm++ module */
     void closeTrace(void);
 
@@ -520,9 +548,6 @@ void disableTraceLogOutput();
 /// Enable the outputting of the trace logs
 void enableTraceLogOutput();
 
-/// Force the log file to be flushed
-void flushTraceLog();
-
 
 #endif
 
index f1fa5ecc72211428ec1179737a37b5ffe15b5b3c..4a213fbff5524b68c50d75f683c14e2521063399 100644 (file)
 #include "envelope.h"
 #include "register.h"
 #include "trace-common.h"
+#include "ckcallback-ccs.h"
 
 #ifndef PROJ_ANALYSIS
+// NOTE: Needed to handle the automatically-generated method so 
+//   trace-projections would build correctly while ignoring any of the 
+//   BOC-based definitions generated by parsing trace-projections.ci.
+//   Hence, we do not include TraceProjections.decl.h in this version.
+//
+//   This version of trace-projections would not permit any form of
+//   end-of-run operations and NONE of the definitions found in 
+//   trace-projections.ci would be visible to the rest of the code
+//   (trace-projections.C), so some care would be needed to ensure 
+//   PROJ_ANALYSIS encloses the correct code-fragments in 
+//   trace-projections.C
 void _registerTraceProjections() {
   // faked call that does nothing.
 }
 #else
 #include "TraceProjections.decl.h"
 
-extern CkGroupID traceProjectionsGID;
+class KMeansStatsMessage : public CMessage_KMeansStatsMessage {
+ public:
+  int numMetrics;
+  int numKPos;
+  int numStats;
+  bool *filter;
+  double *kSeedsPos;
+  double *stats;
+};
 
-class TraceProjectionsInit : public Chare {
-  public:
-  TraceProjectionsInit(CkArgMsg*) {
-    traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew();
-  }
-  TraceProjectionsInit(CkMigrateMessage *m):Chare(m) {}
+class KSeedsMessage : public CMessage_KSeedsMessage {
+ public:
+  int numKPos;
+  double *kSeedsPos;
 };
 
-class OutlierStatsMessage : public CMessage_OutlierStatsMessage {
+class TraceProjectionsInit : public Chare {
  public:
-  double *stats;
+  TraceProjectionsInit(CkArgMsg *msg);
+ TraceProjectionsInit(CkMigrateMessage *m):Chare(m) {}
 };
 
-class OutlierWeightMessage : public CMessage_OutlierWeightMessage {
+class KSelectionMessage : public CMessage_KSelectionMessage {
  public:
-  int sourcePe;
-  double weight;
+  int numKMinIDs;
+  int numKMaxIDs;
+  int *minIDs;
+  int *maxIDs;
 };
 
-class OutlierThresholdMessage : public CMessage_OutlierThresholdMessage {
+class KMeansBOC : public CBase_KMeansBOC {
+ private:
+  // commandline parameters
+  bool autoCompute;
+  int numK;
+  int peNumKeep;
+  double entryThreshold;
+  bool usePhases;
+
+  int numKReported;
+
+  // variables for correct data gathering across phases
+  bool markedBegin;
+  bool markedIdle;
+  double beginBlockTime;
+  double beginIdleBlockTime;
+  int lastBeginEPIdx;
+  int numSelectionIter;
+  bool selected;
+
+  int currentPhase;
+  int lastPhaseIdx;
+  double *currentExecTimes;
+
+  // kMeans outlier structures - ALL processors will host this data
+  int numEntryMethods;
+  int numMetrics;
+  int phaseIter;
+
+  bool *keepMetric;
+  double *incKSeeds;
+  double minDistance; // distance to the closest seed
+  int lastMinK;
+  int minK; // the seed closest to the processor
+
+  // ONLY processor 0 will host this data, the location vector of K seeds
+  // This is actually a 2-D array, we need it to be contiguous for
+  //   communication purposes.
+  double *kSeeds; 
+  int *kNumMembers;
+
+  int *exemplarChoicesLeft;
+  int *outlierChoicesLeft;
+
  public:
-  double threshold;
+ KMeansBOC(bool outlierAutomatic, int numKSeeds, int _peNumKeep,
+          double _entryThreshold, bool outlierUsePhases) :
+  autoCompute(outlierAutomatic), numK(numKSeeds), 
+    peNumKeep(_peNumKeep), entryThreshold(_entryThreshold),
+    usePhases(outlierUsePhases) {};
+ KMeansBOC(CkMigrateMessage *m):CBase_KMeansBOC(m) {};
+  
+  void startKMeansAnalysis();
+  void flushCheck(CkReductionMsg *msg);
+  void flushCheckDone();
+  void getNextPhaseMetrics();
+  void collectKMeansData(); // C++ method
+  void globalMetricRefinement(CkReductionMsg *msg);
+  void initKSeeds(); // C++ method
+  void findInitialClusters(KMeansStatsMessage *msg);
+  void updateKSeeds(CkReductionMsg *msg);
+  double calculateDistance(int k); // C++ method
+  void updateSeedMembership(KSeedsMessage *msg);
+  void findRepresentatives(); // C++ method
+  void collectDistances(KSelectionMessage *msg);
+  void findNextMinMax(CkReductionMsg *msg);
+  void phaseDone();
+
+  /*
+  void calculateWeights(KMeansStatsMessage *);
+  void determineOutliers(OutlierWeightMessage *);
+  void setOutliers(OutlierThresholdMessage *);
+  */
 };
 
 class TraceProjectionsBOC : public CBase_TraceProjectionsBOC {
-private:
+ private:
+  bool findOutliers;
+
+  int parModulesRemaining;
+
   double dummy;
   double endTime;
   double analysisStartTime;
-  double *execTimes;
-  double weight;
-  int encounteredWeights;
-  double *weightArray;
-  int *mapArray;
+ public:
+ TraceProjectionsBOC(bool _findOutliers) : findOutliers(_findOutliers) {};
+ TraceProjectionsBOC(CkMigrateMessage *m):CBase_TraceProjectionsBOC(m) {};
 
-  FILE *outlierfp;
-  
-public:
-  TraceProjectionsBOC(void) {};
-  TraceProjectionsBOC(CkMigrateMessage *m):CBase_TraceProjectionsBOC(m) {};
-  void startOutlierAnalysis();
-  void outlierAverageReduction(CkReductionMsg *);
-  void calculateWeights(OutlierStatsMessage *);
-  void determineOutliers(OutlierWeightMessage *);
-  void setOutliers(OutlierThresholdMessage *);
+  void traceProjectionsParallelShutdown();
   void startEndTimeAnalysis();
-  void endTimeReduction(CkReductionMsg *);
-  void finalReduction(CkReductionMsg *);
+  void endTimeDone(CkReductionMsg *);
+  void kMeansDone(CkReductionMsg *);
+  void kMeansDone(void);
+  void finalize(void);
   void shutdownAnalysis(void);
-  void closeTrace(void);
+  void closingTraces(void);
+  void closeParallelShutdown(CkReductionMsg *);
+
+  void ccsOutlierRequest(CkCcsRequestMsg *);
 };
 #endif
index 82189e0d33201ccd9e773f4d7017bcb8433bd54d..e5654c351aa9b50605757283f387f1a2991dcd61 100644 (file)
@@ -140,6 +140,8 @@ class Trace {
     // begin/end of execution
     virtual void beginComputation(void) {}
     virtual void endComputation(void) {}
+    // demarkation of a phase boundary
+    virtual void endPhase() {}
     // clear all data collected for entry points
     virtual void traceClearEps() {}
     // enable CCS operations if supported on the trace module
@@ -267,6 +269,9 @@ public:
     inline void endFunc(char *name){ ALLDO(endFunc(name)); }
     inline void endFunc(int idx){ ALLDO(endFunc(idx)); }
 
+    /* Phase Demarkation */
+    inline void endPhase() { ALLDO(endPhase()); }
+
     /* Memory tracing */
     inline void malloc(void *where, int size, void **stack, int stackSize){ ALLDO(malloc(where,size,stack,stackSize)); }
     inline void free(void *where, int size){ ALLDO(free(where, size)); }
@@ -318,6 +323,8 @@ extern "C" {
 #define _TRACE_ENQUEUE(env) _TRACE_ONLY(CkpvAccess(_traces)->enqueue(env))
 #define _TRACE_DEQUEUE(env) _TRACE_ONLY(CkpvAccess(_traces)->dequeue(env))
 
+#define _TRACE_END_PHASE() _TRACE_ONLY(CkpvAccess(_traces)->endPhase())
+
 /* Memory tracing */
 #define _TRACE_MALLOC(where, size, stack, stackSize) _TRACE_ONLY(CkpvAccess(_traces)->malloc(where,size,stack,stackSize))
 #define _TRACE_FREE(where, size) _TRACE_ONLY(CkpvAccess(_traces)->free(where, size))
index ec01bc89328e2941efef8906559efd938caa4d2e..fa4e9f0da924798ccbfa8f55249dc2702a3ae170 100644 (file)
@@ -29,6 +29,8 @@
         subroutine ftraceEndFunc(idx)
           integer, intent(in) :: idx
         end subroutine
+        subroutine ftracePhaseEnd()
+        end subroutine
       end interface
       end module