Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
14 #else
15 #define DEBUGF(format, ...)           // CmiPrintf x
16 #endif
17 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int numPAPIEvents = 2;
103 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
104 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
105 #endif // CMK_HAS_COUNTER_PAPI
106
107 /**
108   For each TraceFoo module, _createTraceFoo() must be defined.
109   This function is called in _createTraces() generated in moduleInit.C
110 */
111 void _createTraceprojections(char **argv)
112 {
113   DEBUGF("%d createTraceProjections\n", CkMyPe());
114   CkpvInitialize(CkVec<char *>, usrEventlist);
115   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
116   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
117 #if CMK_BLUEGENE_CHARM
118   // CthRegister does not call the constructor
119 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
120 #endif //CMK_BLUEGENE_CHARM
121   CkpvInitialize(TraceProjections*, _trace);
122   CkpvAccess(_trace) = new  TraceProjections(argv);
123   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
124   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
125 }
126  
127 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
128
129 struct TraceThreadListener {
130   struct CthThreadListener base;
131   int event;
132   int msgType;
133   int ep;
134   int srcPe;
135   int ml;
136   CmiObjId idx;
137 };
138
139 extern "C"
140 void traceThreadListener_suspend(struct CthThreadListener *l)
141 {
142   TraceThreadListener *a=(TraceThreadListener *)l;
143   /* here, we activate the appropriate trace codes for the appropriate
144      registered modules */
145   traceSuspend();
146 }
147
148 extern "C"
149 void traceThreadListener_resume(struct CthThreadListener *l) 
150 {
151   TraceThreadListener *a=(TraceThreadListener *)l;
152   /* here, we activate the appropriate trace codes for the appropriate
153      registered modules */
154   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
155                                 CthGetThreadID(a->base.thread));
156   a->event=-1;
157   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
158   a->ml=0;
159 }
160
161 extern "C"
162 void traceThreadListener_free(struct CthThreadListener *l) 
163 {
164   TraceThreadListener *a=(TraceThreadListener *)l;
165   delete a;
166 }
167
168 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
169 {
170 #if CMK_TRACE_ENABLED
171   /* strip essential information from the envelope */
172   TraceThreadListener *a= new TraceThreadListener;
173   
174   a->base.suspend=traceThreadListener_suspend;
175   a->base.resume=traceThreadListener_resume;
176   a->base.free=traceThreadListener_free;
177   a->event=e->getEvent();
178   a->msgType=e->getMsgtype();
179   a->ep=e->getEpIdx();
180   a->srcPe=e->getSrcPe();
181   a->ml=e->getTotalsize();
182
183   CthAddListener(tid, (CthThreadListener *)a);
184 #endif
185 }
186
187 void LogPool::openLog(const char *mode)
188 {
189 #if CMK_PROJECTIONS_USE_ZLIB
190   if(compressed) {
191     if (nonDeltaLog) {
192       do {
193         zfp = gzopen(fname, mode);
194       } while (!zfp && (errno == EINTR || errno == EMFILE));
195       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
196     }
197     if (deltaLog) {
198       do {
199         deltazfp = gzopen(dfname, mode);
200       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
201       if (!deltazfp) 
202         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
203     }
204   } else {
205     if (nonDeltaLog) {
206       do {
207         fp = fopen(fname, mode);
208       } while (!fp && (errno == EINTR || errno == EMFILE));
209       if (!fp) {
210         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
211         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
212       }
213     }
214     if (deltaLog) {
215       do {
216         deltafp = fopen(dfname, mode);
217       } while (!deltafp && (errno == EINTR || errno == EMFILE));
218       if (!deltafp) 
219         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
220     }
221   }
222 #else
223   if (nonDeltaLog) {
224     do {
225       fp = fopen(fname, mode);
226     } while (!fp && (errno == EINTR || errno == EMFILE));
227     if (!fp) {
228       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
229       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
230     }
231   }
232   if (deltaLog) {
233     do {
234       deltafp = fopen(dfname, mode);
235     } while (!deltafp && (errno == EINTR || errno == EMFILE));
236     if(!deltafp) 
237       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
238   }
239 #endif
240 }
241
242 void LogPool::closeLog(void)
243 {
244 #if CMK_PROJECTIONS_USE_ZLIB
245   if(compressed) {
246     if (nonDeltaLog) gzclose(zfp);
247     if (deltaLog) gzclose(deltazfp);
248     return;
249   }
250 #endif
251   if (nonDeltaLog) { 
252 #if !defined(_WIN32) || defined(__CYGWIN__)
253     fsync(fileno(fp)); 
254 #endif
255     fclose(fp); 
256   }
257   if (deltaLog)  { 
258 #if !defined(_WIN32) || defined(__CYGWIN__)
259     fsync(fileno(deltafp)); 
260 #endif
261     fclose(deltafp);  
262   }
263 }
264
265 LogPool::LogPool(char *pgm) {
266   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
267   // defaults to writing data (no outlier changes)
268   writeData = true;
269   numEntries = 0;
270   // **CW** for simple delta encoding
271   prevTime = 0.0;
272   timeErr = 0.0;
273   globalStartTime = 0.0;
274   globalEndTime = 0.0;
275   headerWritten = 0;
276   numPhases = 0;
277   hasFlushed = false;
278
279   keepPhase = NULL;
280
281   fileCreated = false;
282   poolSize = CkpvAccess(CtrLogBufSize);
283   pgmname = new char[strlen(pgm)+1];
284   strcpy(pgmname, pgm);
285 }
286
287 void LogPool::createFile(const char *fix)
288 {
289   if (fileCreated) {
290     return;
291   }
292
293   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
294   char *pathPlusFilePrefix = new char[1024];
295
296   if(nSubdirs > 0){
297     int sd = CkMyPe() % nSubdirs;
298     char *subdir = new char[1024];
299     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
300     CmiMkdir(subdir);
301     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
302     delete[] subdir;
303   } else {
304     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
305   }
306
307   char pestr[10];
308   sprintf(pestr, "%d", CkMyPe());
309 #if CMK_PROJECTIONS_USE_ZLIB
310   int len;
311   if(compressed)
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
313   else
314     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #else
316   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
317 #endif
318
319   if (nonDeltaLog) {
320     fname = new char[len];
321   }
322   if (deltaLog) {
323     dfname = new char[len];
324   }
325 #if CMK_PROJECTIONS_USE_ZLIB
326   if(compressed) {
327     if (deltaLog && nonDeltaLog) {
328       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
329       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
330     } else {
331       if (nonDeltaLog) {
332         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
333       } else {
334         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
335       }
336     }
337   } else {
338     if (deltaLog && nonDeltaLog) {
339       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
340       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
341     } else {
342       if (nonDeltaLog) {
343         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       } else {
345         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
346       }
347     }
348   }
349 #else
350   if (deltaLog && nonDeltaLog) {
351     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
352     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
353   } else {
354     if (nonDeltaLog) {
355       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     } else {
357       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
358     }
359   }
360 #endif
361   fileCreated = true;
362   delete[] pathPlusFilePrefix;
363   openLog("w+");
364   CLOSE_LOG 
365 }
366
367 void LogPool::createSts(const char *fix)
368 {
369   CkAssert(CkMyPe() == 0);
370   // create the sts file
371   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
372   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
373   do
374     {
375       stsfp = fopen(fname, "w");
376     } while (!stsfp && (errno == EINTR || errno == EMFILE));
377   if(stsfp==0)
378     CmiAbort("Cannot open projections sts file for writing.\n");
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BLUEGENE_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   char name[30];
541   // generate an automatic unique ID for each log
542   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
543   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
544   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
545 #if CMK_HAS_COUNTER_PAPI
546   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
547   // for now, use i, next time use papiEvents[i].
548   // **CW** papi event names is a hack.
549   for (i=0;i<numPAPIEvents;i++) {
550     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, papiEventNames[i]);
551   }
552 #endif
553   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
554   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
555     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
556   }     
557 }
558
559 void LogPool::writeSts(TraceProjections *traceProj){
560   writeSts();
561   if (traceProj != NULL) {
562     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
563     funcIter->seekStart();
564     int numFuncs = traceProj->getFuncNumber();
565     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
566     while(funcIter->hasNext()) {
567       StrKey *key;
568       int *obj = (int *)funcIter->next((void **)&key);
569       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
570     }
571   }
572   fprintf(stsfp, "END\n");
573   fclose(stsfp);
574 }
575
576 void LogPool::writeRC(void)
577 {
578     //CkPrintf("write RC is being executed\n");
579 #ifdef PROJ_ANALYSIS  
580     CkAssert(CkMyPe() == 0);
581     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
582           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
583     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
584           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
585     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
586     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
587       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
588     } else {
589       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
590     }
591     */
592 #endif //PROJ_ANALYSIS
593   fclose(rcfp);
594 }
595
596
597 #if CMK_BLUEGENE_CHARM
598 static void updateProjLog(void *data, double t, double recvT, void *ptr)
599 {
600   LogEntry *log = (LogEntry *)data;
601   FILE *fp = *(FILE **)ptr;
602   log->time = t;
603   log->recvTime = recvT<0.0?0:recvT;
604 //  log->write(fp);
605   toProjectionsFile p(fp);
606   log->pup(p);
607 }
608 #endif
609
610 // flush log entries to disk
611 void LogPool::flushLogBuffer()
612 {
613   if (numEntries) {
614     double writeTime = TraceTimer();
615     writeLog();
616     hasFlushed = true;
617     numEntries = 0;
618     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
619     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
620   }
621 }
622
623 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
624                   double time, int event, int pe, int ml, CmiObjId *id, 
625                   double recvT, double cpuT, int numPe)
626 {
627   new (&pool[numEntries++])
628     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
629   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
630     numPhases++;
631   }
632   if(poolSize==numEntries) {
633     flushLogBuffer();
634 #if CMK_BLUEGENE_CHARM
635     extern int correctTimeLog;
636     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
637 #endif
638   }
639 #if CMK_BLUEGENE_CHARM
640   switch (type) {
641     case BEGIN_PROCESSING:
642       pool[numEntries-1].recvTime = BgGetRecvTime();
643     case END_PROCESSING:
644     case BEGIN_COMPUTATION:
645     case END_COMPUTATION:
646     case CREATION:
647     case BEGIN_PACK:
648     case END_PACK:
649     case BEGIN_UNPACK:
650     case END_UNPACK:
651     case USER_EVENT_PAIR:
652       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
653   }
654 #endif
655 }
656
657 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
658 #ifndef CMK_BLUEGENE_CHARM
659   new (&pool[numEntries++])
660         LogEntry(time,type,funcID,lineNum,fileName);
661   if(poolSize == numEntries){
662     flushLogBuffer();
663   }
664 #endif  
665 }
666
667
668   
669 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
670 #ifndef CMK_BLUEGENE_CHARM
671   new (&pool[numEntries++])
672         LogEntry(type,time,memUsage);
673   if(poolSize == numEntries){
674     flushLogBuffer();
675   }
676 #endif  
677         
678 }  
679
680
681
682 void LogPool::addUserSupplied(int data){
683         // add an event
684         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
685
686         // set the user supplied value for the previously created event 
687         pool[numEntries-1].setUserSuppliedData(data);
688   }
689
690
691 void LogPool::addUserSuppliedNote(char *note){
692         // add an event
693         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
694
695         // set the user supplied note for the previously created event 
696         pool[numEntries-1].setUserSuppliedNote(note);
697   }
698
699 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
700   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
701 #ifndef CMK_BLUEGENE_CHARM
702 #if MPI_TRACE_MACHINE_HACK
703   //This part of code is used  to combine the contiguous
704   //MPI_Test and MPI_Iprobe events to reduce the number of
705   //entries
706 #define MPI_TEST_EVENT_ID 60
707 #define MPI_IPROBE_EVENT_ID 70 
708   int lastEvent = pool[numEntries-1].event;
709   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
710     //just replace the endtime of last event
711     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
712     pool[numEntries].endTime = et;
713   }else{
714     new (&pool[numEntries++])
715       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
716   }
717 #else
718   new (&pool[numEntries++])
719     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
720 #endif
721   if(poolSize == numEntries){
722     flushLogBuffer();
723   }
724 #endif  
725 }
726
727
728 /* **CW** Not sure if this is the right thing to do. Feels more like
729    a hack than a solution to Sameer's request to add the destination
730    processor information to multicasts and broadcasts.
731
732    In the unlikely event this method is used for Broadcasts as well,
733    pelist == NULL will be used to indicate a global broadcast with 
734    num PEs.
735 */
736 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
737                                    int event, int pe, int ml, CmiObjId *id,
738                                    double recvT, int numPe, int *pelist)
739 {
740   new (&pool[numEntries++])
741     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
742   if(poolSize==numEntries) {
743     flushLogBuffer();
744   }
745 }
746
747 void LogPool::postProcessLog()
748 {
749 #if CMK_BLUEGENE_CHARM
750   bgUpdateProj(1);   // event type
751 #endif
752 }
753
754 void LogPool::modLastEntryTimestamp(double ts)
755 {
756   pool[numEntries-1].time = ts;
757   //pool[numEntries-1].cputime = ts;
758 }
759
760 // /** Constructor for a multicast log entry */
761 // 
762 //  THIS WAS MOVED TO trace-projections.h with the other constructors
763 // 
764 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
765 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
766 // {
767 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
768 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
769 //     recvTime = rt; 
770 //     numpes = numPe;
771 //     userSuppliedNote = NULL;
772 //     if (pelist != NULL) {
773 //      pes = new int[numPe];
774 //      for (int i=0; i<numPe; i++) {
775 //        pes[i] = pelist[i];
776 //      }
777 //     } else {
778 //      pes= NULL;
779 //     }
780 // }
781
782 void LogEntry::addPapi(int numPapiEvts, int *papi_ids, LONG_LONG_PAPI *papiVals)
783 {
784 #if CMK_HAS_COUNTER_PAPI
785   numPapiEvents = numPapiEvts;
786   if (papiVals != NULL) {
787     papiIDs = new int[numPapiEvents];
788     papiValues = new LONG_LONG_PAPI[numPapiEvents];
789     for (int i=0; i<numPapiEvents; i++) {
790       papiIDs[i] = papi_ids[i];
791       papiValues[i] = papiVals[i];
792     }
793   }
794 #endif
795 }
796
797
798
799 void LogEntry::pup(PUP::er &p)
800 {
801   int i;
802   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
803   char ret = '\n';
804
805   p|type;
806   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
807   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
808
809   switch (type) {
810     case USER_EVENT:
811     case USER_EVENT_PAIR:
812       p|mIdx; p|itime; p|event; p|pe;
813       break;
814     case BEGIN_IDLE:
815     case END_IDLE:
816     case BEGIN_PACK:
817     case END_PACK:
818     case BEGIN_UNPACK:
819     case END_UNPACK:
820       p|itime; p|pe; 
821       break;
822     case BEGIN_PROCESSING:
823       if (p.isPacking()) {
824         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
825         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
826       }
827       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
828       p|msglen; p|irecvtime; 
829       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
830       p|icputime;
831 #if CMK_HAS_COUNTER_PAPI
832       p|numPapiEvents;
833       for (i=0; i<numPapiEvents; i++) {
834         // not yet!!!
835         //      p|papiIDs[i]; 
836         p|papiValues[i];
837         
838       }
839 #else
840       p|numPapiEvents;     // non papi version has value 0
841 #endif
842       if (p.isUnpacking()) {
843         recvTime = irecvtime/1.0e6;
844         cputime = icputime/1.0e6;
845       }
846       break;
847     case END_PROCESSING:
848       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
849       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
850 #if CMK_HAS_COUNTER_PAPI
851       p|numPapiEvents;
852       for (i=0; i<numPapiEvents; i++) {
853         // not yet!!!
854         //      p|papiIDs[i];
855         p|papiValues[i];
856       }
857 #else
858       p|numPapiEvents;  // non papi version has value 0
859 #endif
860       if (p.isUnpacking()) cputime = icputime/1.0e6;
861       break;
862     case USER_SUPPLIED:
863           p|userSuppliedData;
864           p|itime;
865         break;
866     case USER_SUPPLIED_NOTE:
867           p|itime;
868           int length;
869           if (p.isPacking()) length = strlen(userSuppliedNote);
870           p | length;
871           char space;
872           space = ' ';
873           p | space;
874           if (p.isUnpacking()) {
875             userSuppliedNote = new char[length+1];
876             userSuppliedNote[length] = '\0';
877           }
878           PUParray(p,userSuppliedNote, length);
879           break;
880     case USER_SUPPLIED_BRACKETED_NOTE:
881       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
882           p|itime;
883           p|iEndTime;
884           p|event;
885           int length2;
886           if (p.isPacking()) length2 = strlen(userSuppliedNote);
887           p | length2;
888           char space2;
889           space2 = ' ';
890           p | space2;
891           if (p.isUnpacking()) {
892             userSuppliedNote = new char[length+1];
893             userSuppliedNote[length] = '\0';
894           }
895           PUParray(p,userSuppliedNote, length2);
896           break;
897     case MEMORY_USAGE_CURRENT:
898       p | memUsage;
899       p | itime;
900         break;
901     case CREATION:
902       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
903       p|mIdx; p|eIdx; p|itime;
904       p|event; p|pe; p|msglen; p|irecvtime;
905       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
906       break;
907     case CREATION_BCAST:
908       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
909       p|mIdx; p|eIdx; p|itime;
910       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
911       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
912       break;
913     case CREATION_MULTICAST:
914       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
915       p|mIdx; p|eIdx; p|itime;
916       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
917       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
918       for (i=0; i<numpes; i++) p|pes[i];
919       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
920       break;
921     case MESSAGE_RECV:
922       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
923       break;
924
925     case ENQUEUE:
926     case DEQUEUE:
927       p|mIdx; p|itime; p|event; p|pe;
928       break;
929
930     case BEGIN_INTERRUPT:
931     case END_INTERRUPT:
932       p|itime; p|event; p|pe;
933       break;
934
935       // **CW** absolute timestamps are used here to support a quick
936       // way of determining the total time of a run in projections
937       // visualization.
938     case BEGIN_COMPUTATION:
939     case END_COMPUTATION:
940     case BEGIN_TRACE:
941     case END_TRACE:
942       p|itime;
943       break;
944     case BEGIN_FUNC:
945         p | itime;
946         p | mIdx;
947         p | event;
948         if(!p.isUnpacking()){
949                 p(fName,flen-1);
950         }
951         break;
952     case END_FUNC:
953         p | itime;
954         p | mIdx;
955         break;
956     case END_PHASE:
957       p|eIdx; // FIXME: actually the phase ID
958       p|itime;
959       break;
960     default:
961       CmiError("***Internal Error*** Wierd Event %d.\n", type);
962       break;
963   }
964   if (p.isUnpacking()) time = itime/1.0e6;
965   p|ret;
966 }
967
968 TraceProjections::TraceProjections(char **argv): 
969   curevent(0), inEntry(0), computationStarted(0), 
970         converseExit(0), endTime(0.0), traceNestedEvents(0),
971         currentPhaseID(0), lastPhaseEvent(NULL)
972 {
973   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
974
975   if (CkpvAccess(traceOnPe) == 0) return;
976
977   CtvInitialize(int,curThreadEvent);
978   CkpvInitialize(CmiInt8, CtrLogBufSize);
979   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
980   CtvAccess(curThreadEvent)=0;
981   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
982                        "Log entries to buffer per I/O")) {
983     if (CkMyPe() == 0) {
984       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
985     }
986   }
987   checknested = 
988     CmiGetArgFlagDesc(argv,"+checknested",
989                       "check projections nest begin end execute events");
990   traceNestedEvents = 
991     CmiGetArgFlagDesc(argv,"+tracenested",
992               "trace projections nest begin/end execute events");
993   int binary = 
994     CmiGetArgFlagDesc(argv,"+binary-trace",
995                       "Write log files in binary format");
996
997   CmiInt8 nSubdirs = 0;
998   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
999
1000
1001 #if CMK_PROJECTIONS_USE_ZLIB
1002   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1003 #else
1004   // consume the flag so there's no confusing
1005   CmiGetArgFlagDesc(argv,"+gz-trace",
1006                     "Write log files pre-compressed with gzip");
1007   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1008 #endif
1009
1010   // **CW** default to non delta log encoding. The user may choose to do
1011   // create both logs (for debugging) or just the old log timestamping
1012   // (for compatibility).
1013   // Generating just the non delta log takes precedence over generating
1014   // both logs (if both arguments appear on the command line).
1015
1016   // switch to OLD log format until everything works // Gengbin
1017   nonDeltaLog = 1;
1018   deltaLog = 0;
1019   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1020                                   "Generate Delta encoded and simple timestamped log files");
1021
1022   _logPool = new LogPool(CkpvAccess(traceRoot));
1023   _logPool->setNumSubdirs(nSubdirs);
1024   _logPool->setBinary(binary);
1025 #if CMK_PROJECTIONS_USE_ZLIB
1026   _logPool->setCompressed(compressed);
1027 #endif
1028   if (CkMyPe() == 0) {
1029     _logPool->createSts();
1030     _logPool->createRC();
1031   }
1032   funcCount=1;
1033
1034 #if CMK_HAS_COUNTER_PAPI
1035   // We initialize and create the event sets for use with PAPI here.
1036   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1037   if (papiRetValue != PAPI_VER_CURRENT) {
1038     CmiAbort("PAPI Library initialization failure!\n");
1039   }
1040   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1041   papiEventSet = PAPI_NULL; 
1042   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1043     CmiAbort("PAPI failed to create event set!\n");
1044   }
1045   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, numPAPIEvents);
1046   if (papiRetValue != PAPI_OK) {
1047     if (papiRetValue == PAPI_ECNFLCT) {
1048       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1049     } else {
1050       CmiAbort("PAPI failed to add designated events!\n");
1051     }
1052   }
1053   papiValues = new long_long[numPAPIEvents];
1054   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
1055 #endif
1056 }
1057
1058 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1059 {
1060   OPTIMIZED_VERSION
1061   CkAssert(e==-1 || e>=0);
1062   CkAssert(evt != NULL);
1063   int event;
1064   int biggest = -1;
1065   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1066     int cur = (*CkpvAccess(usrEvents))[i]->e;
1067     if (cur == e) {
1068       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1069       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1070         return e;
1071       else
1072         CmiAbort("UserEvent double registered!");
1073     }
1074     if (cur > biggest) biggest = cur;
1075   }
1076   // if no user events have so far been registered. biggest will be -1
1077   // and hence newly assigned event numbers will begin from 0.
1078   if (e==-1) event = biggest+1;  // automatically assign new event number
1079   else event = e;
1080   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1081   return event;
1082 }
1083
1084 void TraceProjections::traceClearEps(void)
1085 {
1086   // In trace-summary, this zeros out the EP bins, to eliminate noise
1087   // from startup.  Here, this isn't useful, since we can do that in
1088   // post-processing
1089 }
1090
1091 void TraceProjections::traceWriteSts(void)
1092 {
1093   if(CkMyPe()==0)
1094     _logPool->writeSts(this);
1095 }
1096
1097 /** 
1098  * **IMPT NOTES**:
1099  *
1100  * This is called when Converse closes during ConverseCommonExit().
1101  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1102  *
1103  * Some programs bypass CkExit() (like NAMD, which eventually calls
1104  * ConverseExit()), modules like traces will have to pretend to shutdown
1105  * as if CkExit() was called but at the same time avoid making
1106  * subsequent CkExit() calls (which is usually required for allowing
1107  * other modules to shutdown).
1108  *
1109  * Note that we can only get here if CkExit() was not called, since the
1110  * trace module will un-register itself from TraceArray if it did.
1111  *
1112  */
1113 void TraceProjections::traceClose(void)
1114 {
1115 #ifdef PROJ_ANALYSIS
1116   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1117
1118   // sets the flag that tells the code not to make the CkExit call later
1119   converseExit = 1;
1120   if (CkMyPe() == 0) {
1121     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1122     bocProxy.traceProjectionsParallelShutdown(-1);
1123   }
1124   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1125     CkpvAccess(_trace)->endComputation();
1126     delete _logPool;              // will write
1127     // remove myself from traceArray so that no tracing will be called.
1128     CkpvAccess(_traces)->removeTrace(this);
1129   }
1130 #else
1131   // we've already deleted the logpool, so multiple calls to traceClose
1132   // are tolerated.
1133   if (_logPool == NULL) {
1134     return;
1135   }
1136   if(CkMyPe()==0){
1137     _logPool->writeSts(this);
1138   }
1139   CkpvAccess(_trace)->endComputation();
1140   delete _logPool;              // will write
1141   // remove myself from traceArray so that no tracing will be called.
1142   CkpvAccess(_traces)->removeTrace(this);
1143 #endif
1144 }
1145
1146 /**
1147  *  **IMPT NOTES**:
1148  *
1149  *  This is meant to be called internally by the tracing framework.
1150  *
1151  */
1152 void TraceProjections::closeTrace() {
1153   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1154   if (CkMyPe() == 0) {
1155     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1156     _logPool->writeSts(this);
1157     _logPool->writeRC();
1158     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1159   }
1160   delete _logPool;       // will write logs to file
1161 }
1162
1163 #if CMK_SMP_TRACE_COMMTHREAD
1164 void TraceProjections::traceBeginOnCommThread()
1165 {
1166   if (!computationStarted) return;
1167   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1168 }
1169
1170 void TraceProjections::traceEndOnCommThread()
1171 {
1172   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174 #endif
1175
1176 void TraceProjections::traceBegin(void)
1177 {
1178   if (!computationStarted) return;
1179   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1180 }
1181
1182 void TraceProjections::traceEnd(void)
1183 {
1184   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::userEvent(int e)
1188 {
1189   if (!computationStarted) return;
1190   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1191 }
1192
1193 void TraceProjections::userBracketEvent(int e, double bt, double et)
1194 {
1195   if (!computationStarted) return;
1196   // two events record Begin/End of event e.
1197   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1198   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1199 }
1200
1201 void TraceProjections::userSuppliedData(int d)
1202 {
1203   if (!computationStarted) return;
1204   _logPool->addUserSupplied(d);
1205 }
1206
1207 void TraceProjections::userSuppliedNote(char *note)
1208 {
1209   if (!computationStarted) return;
1210   _logPool->addUserSuppliedNote(note);
1211 }
1212
1213
1214 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1215 {
1216   if (!computationStarted) return;
1217   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1218 }
1219
1220 void TraceProjections::memoryUsage(double m)
1221 {
1222   if (!computationStarted) return;
1223   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1224   
1225 }
1226
1227
1228 void TraceProjections::creation(envelope *e, int ep, int num)
1229 {
1230   double curTime = TraceTimer();
1231   if (e == 0) {
1232     CtvAccess(curThreadEvent) = curevent;
1233     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1234                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1235   } else {
1236     int type=e->getMsgtype();
1237     e->setEvent(curevent);
1238     if (num > 1) {
1239       _logPool->add(CREATION_BCAST, type, ep, curTime,
1240                     curevent++, CkMyPe(), e->getTotalsize(), 
1241                     NULL, 0, 0.0, num);
1242     } else {
1243       _logPool->add(CREATION, type, ep, curTime,
1244                     curevent++, CkMyPe(), e->getTotalsize(), 
1245                     NULL, 0, 0.0);
1246     }
1247   }
1248 }
1249
1250 void TraceProjections::creation(char *msg)
1251 {
1252 #if CMK_SMP_TRACE_COMMTHREAD
1253         //This function is only called from a comm thread
1254         //in SMP mode. So, it is possible the msg is not
1255         //a charm msg that contains an envelope, ep idx.
1256         envelope *e = (envelope *)msg;
1257         int ep = e->getEpIdx();
1258         int num = _entryTable.size();
1259         if(ep<num && ep>=0 && _entryTable[ep]->traceEnabled)
1260                 creation(e, ep, 1);
1261 #endif
1262 }
1263
1264
1265 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1266    Communication Library-specific Multicasts via new event 
1267    CREATION_MULTICAST.
1268 */
1269
1270 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1271                                          int *pelist)
1272 {
1273   double curTime = TraceTimer();
1274   if (e==0) {
1275     CtvAccess(curThreadEvent)=curevent;
1276     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1277                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1278   } else {
1279     int type=e->getMsgtype();
1280     e->setEvent(curevent);
1281     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1282                                    e->getTotalsize(), 0, 0.0, num, pelist);
1283   }
1284 }
1285
1286 void TraceProjections::creationDone(int num)
1287 {
1288   // modified the creation done time of the last num log entries
1289   // FIXME: totally a hack
1290   double curTime = TraceTimer();
1291   int idx = _logPool->numEntries-1;
1292   while (idx >=0 && num >0 ) {
1293     LogEntry &log = _logPool->pool[idx];
1294     if ((log.type == CREATION) ||
1295         (log.type == CREATION_BCAST) ||
1296         (log.type == CREATION_MULTICAST)) {
1297       log.recvTime = curTime - log.time;
1298       num --;
1299     }
1300     idx--;
1301   }
1302 }
1303
1304 void TraceProjections::beginExecute(CmiObjId *tid)
1305 {
1306 #if CMK_HAS_COUNTER_PAPI
1307   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1308     CmiAbort("PAPI failed to read at begin execute!\n");
1309   }
1310 #endif
1311   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1312   execEvent = CtvAccess(curThreadEvent);
1313   execEp = (-1);
1314   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1315                 execEvent,CkMyPe(), 0, tid);
1316 #if CMK_HAS_COUNTER_PAPI
1317   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1318 #endif
1319   inEntry = 1;
1320 }
1321
1322 void TraceProjections::beginExecute(envelope *e)
1323 {
1324   if(e==0) {
1325 #if CMK_HAS_COUNTER_PAPI
1326     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1327       CmiAbort("PAPI failed to read at begin execute!\n");
1328     }
1329 #endif
1330     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1331     execEvent = CtvAccess(curThreadEvent);
1332     execEp = (-1);
1333     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1334                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1335 #if CMK_HAS_COUNTER_PAPI
1336     _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1337 #endif
1338     inEntry = 1;
1339   } else {
1340     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1341                  e->getSrcPe(),e->getTotalsize());
1342   }
1343 }
1344
1345 void TraceProjections::beginExecute(char *msg){
1346 #if CMK_SMP_TRACE_COMMTHREAD
1347         //This function is called from comm thread in SMP mode
1348     envelope *e = (envelope *)msg;
1349     int num = _entryTable.size();
1350     int ep = e->getEpIdx();
1351     if(ep<0 || ep>=num) return;
1352     if(_entryTable[ep]->traceEnabled)
1353                 beginExecute(e);
1354 #endif
1355 }
1356
1357 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1358                                     int mlen, CmiObjId *idx)
1359 {
1360   if (traceNestedEvents) {
1361     if (! nestedEvents.isEmpty()) {
1362       endExecuteLocal();
1363     }
1364     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1365   }
1366   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1367 }
1368
1369 void TraceProjections::changeLastEntryTimestamp(double ts)
1370 {
1371   _logPool->modLastEntryTimestamp(ts);
1372 }
1373
1374 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1375                                     int mlen, CmiObjId *idx)
1376 {
1377 #if CMK_HAS_COUNTER_PAPI
1378   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1379     CmiAbort("PAPI failed to read at begin execute!\n");
1380   }
1381 #endif
1382   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1383   execEvent=event;
1384   execEp=ep;
1385   execPe=srcPe;
1386   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1387                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1388 #if CMK_HAS_COUNTER_PAPI
1389   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1390 #endif
1391   inEntry = 1;
1392 }
1393
1394 void TraceProjections::endExecute(void)
1395 {
1396   if (traceNestedEvents) nestedEvents.deq();
1397   endExecuteLocal();
1398   if (traceNestedEvents) {
1399     if (! nestedEvents.isEmpty()) {
1400       NestedEvent &ne = nestedEvents.peek();
1401       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1402     }
1403   }
1404 }
1405
1406 void TraceProjections::endExecute(char *msg)
1407 {
1408 #if CMK_SMP_TRACE_COMMTHREAD
1409         //This function is called from comm thread in SMP mode
1410     envelope *e = (envelope *)msg;
1411     int num = _entryTable.size();
1412     int ep = e->getEpIdx();
1413     if(ep<0 || ep>=num) return;
1414     if(_entryTable[ep]->traceEnabled)
1415                 endExecute();
1416 #endif  
1417 }
1418
1419 void TraceProjections::endExecuteLocal(void)
1420 {
1421 #if CMK_HAS_COUNTER_PAPI
1422   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1423     CmiAbort("PAPI failed to read at end execute!\n");
1424   }
1425 #endif
1426   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1427   double cputime = TraceCpuTimer();
1428   if(execEp == (-1)) {
1429     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1430                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1431   } else {
1432     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1433                   execEvent, execPe, 0, NULL, 0.0, cputime);
1434   }
1435 #if CMK_HAS_COUNTER_PAPI
1436   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1437 #endif
1438   inEntry = 0;
1439 }
1440
1441 void TraceProjections::messageRecv(char *env, int pe)
1442 {
1443 #if 0
1444   envelope *e = (envelope *)env;
1445   int msgType = e->getMsgtype();
1446   int ep = e->getEpIdx();
1447 #if 0
1448   if (msgType==NewChareMsg || msgType==NewVChareMsg
1449           || msgType==ForChareMsg || msgType==ForVidMsg
1450           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1451           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1452     ep = e->getEpIdx();
1453   else
1454     ep = _threadEP;
1455 #endif
1456   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1457                 curevent++, e->getSrcPe(), e->getTotalsize());
1458 #endif
1459 }
1460
1461 void TraceProjections::beginIdle(double curWallTime)
1462 {
1463   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1464 }
1465
1466 void TraceProjections::endIdle(double curWallTime)
1467 {
1468   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1469 }
1470
1471 void TraceProjections::beginPack(void)
1472 {
1473   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1474 }
1475
1476 void TraceProjections::endPack(void)
1477 {
1478   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1479 }
1480
1481 void TraceProjections::beginUnpack(void)
1482 {
1483   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1484 }
1485
1486 void TraceProjections::endUnpack(void)
1487 {
1488   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1489 }
1490
1491 void TraceProjections::enqueue(envelope *) {}
1492
1493 void TraceProjections::dequeue(envelope *) {}
1494
1495 void TraceProjections::beginComputation(void)
1496 {
1497   computationStarted = 1;
1498
1499   // Executes the callback function provided by the machine
1500   // layer. This is the proper method to register user events in a
1501   // machine layer because projections is a charm++ module.
1502   if (CkpvAccess(traceOnPe) != 0) {
1503     void (*ptr)() = registerMachineUserEvents();
1504     if (ptr != NULL) {
1505       ptr();
1506     }
1507   }
1508 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1509 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1510   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1511 #if CMK_HAS_COUNTER_PAPI
1512   // we start the counters here
1513   if (PAPI_start(papiEventSet) != PAPI_OK) {
1514     CmiAbort("PAPI failed to start designated counters!\n");
1515   }
1516 #endif
1517 }
1518
1519 void TraceProjections::endComputation(void)
1520 {
1521 #if CMK_HAS_COUNTER_PAPI
1522   // we stop the counters here. A silent failure is alright since we
1523   // are already at the end of the program.
1524   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1525     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1526   }
1527   // NOTE: We should not do a complete close of PAPI until after the
1528   // sts writer is done.
1529 #endif
1530   endTime = TraceTimer();
1531   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1532   /*
1533   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1534            endTime*1e06);
1535   */
1536 }
1537
1538 int TraceProjections::idxRegistered(int idx)
1539 {
1540     int idxVecLen = idxVec.size();
1541     for(int i=0; i<idxVecLen; i++)
1542     {
1543         if(idx == idxVec[i])
1544             return 1;
1545     }
1546     return 0;
1547 }
1548
1549 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1550     StrKey k((char*)name,strlen(name));
1551     int num = funcHashtable.get(k);
1552     
1553     if(num!=0) {
1554         return;
1555         //as for mpi programs, the same function may be registered for several times
1556         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1557     }
1558     
1559     int isIdxExisting=0;
1560     if(idxSpecifiedByUser)
1561         isIdxExisting=idxRegistered(idx);
1562     if(isIdxExisting){
1563         return;
1564         //same reason with num!=0
1565         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1566     }
1567
1568     if(idxSpecifiedByUser) {
1569         char *st = new char[strlen(name)+1];
1570         memcpy(st,name,strlen(name)+1);
1571         StrKey *newKey = new StrKey(st,strlen(st));
1572         int &ref = funcHashtable.put(*newKey);
1573         ref=idx;
1574         funcCount++;
1575         idxVec.push_back(idx);  
1576     } else {
1577         char *st = new char[strlen(name)+1];
1578         memcpy(st,name,strlen(name)+1);
1579         StrKey *newKey = new StrKey(st,strlen(st));
1580         int &ref = funcHashtable.put(*newKey);
1581         ref=funcCount;
1582         num = funcCount;
1583         funcCount++;
1584         idx = num;
1585         idxVec.push_back(idx);
1586     }
1587 }
1588
1589 void TraceProjections::beginFunc(char *name,char *file,int line){
1590         StrKey k(name,strlen(name));    
1591         unsigned short  num = (unsigned short)funcHashtable.get(k);
1592         beginFunc(num,file,line);
1593 }
1594
1595 void TraceProjections::beginFunc(int idx,char *file,int line){
1596         if(idx <= 0){
1597                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1598         }       
1599         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1600 }
1601
1602 void TraceProjections::endFunc(char *name){
1603         StrKey k(name,strlen(name));    
1604         int num = funcHashtable.get(k);
1605         endFunc(num);   
1606 }
1607
1608 void TraceProjections::endFunc(int num){
1609         if(num <= 0){
1610                 printf("endFunc without start :O\n");
1611         }
1612         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1613 }
1614
1615 // specialized PUP:ers for handling trace projections logs
1616 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1617 {
1618   for (int i=0;i<n;i++) 
1619     switch(t) {
1620     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1621     case Tuchar:
1622     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1623     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1624     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1625     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1626     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1627     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1628     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1629     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1630     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1631 #ifdef CMK_PUP_LONG_LONG
1632     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1633     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1634 #endif
1635     default: CmiAbort("Unrecognized pup type code!");
1636     };
1637 }
1638
1639 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1640 {
1641   for (int i=0;i<n;i++) 
1642     switch(t) {
1643     case Tchar: { 
1644       char c = fgetc(f);
1645       if (c==EOF)
1646         parseError("Could not match character");
1647       else
1648         ((char *)p)[i] = c;
1649       break;
1650     }
1651     case Tuchar:
1652     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1653     case Tshort:((short *)p)[i]=(short)readInt(); break;
1654     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1655     case Tint:  ((int *)p)[i]=readInt(); break;
1656     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1657     case Tlong: ((long *)p)[i]=readInt(); break;
1658     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1659     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1660     case Tdouble:((double *)p)[i]=readDouble(); break;
1661 #ifdef CMK_PUP_LONG_LONG
1662     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1663     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1664 #endif
1665     default: CmiAbort("Unrecognized pup type code!");
1666     };
1667 }
1668
1669 #if CMK_PROJECTIONS_USE_ZLIB
1670 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1671 {
1672   for (int i=0;i<n;i++) 
1673     switch(t) {
1674     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1675     case Tuchar:
1676     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1677     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1678     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1679     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1680     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1681     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1682     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1683     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1684     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1685 #ifdef CMK_PUP_LONG_LONG
1686     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1687     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1688 #endif
1689     default: CmiAbort("Unrecognized pup type code!");
1690     };
1691 }
1692 #endif
1693
1694 void TraceProjections::endPhase() {
1695   double currentPhaseTime = TraceTimer();
1696   double lastPhaseTime = 0.0;
1697
1698   if (lastPhaseEvent != NULL) {
1699     lastPhaseTime = lastPhaseEvent->time;
1700   } else {
1701     if (_logPool->pool != NULL) {
1702       // assumed to be BEGIN_COMPUTATION
1703       lastPhaseTime = _logPool->pool[0].time;
1704     } else {
1705       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1706       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1707       lastPhaseTime = currentPhaseTime;
1708     }
1709   }
1710
1711   /* Insert endPhase event here. */
1712   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1713   /*        We currently "borrow" from the standard add() method. */
1714   /*        It should really be its own add() method.             */
1715   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1716   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1717   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1718   currentPhaseID++;
1719 }
1720
1721 #ifdef PROJ_ANALYSIS
1722 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1723
1724
1725 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1726
1727 void registerOutlierReduction() {
1728   outlierReductionType =
1729     CkReduction::addReducer(outlierReduction);
1730   minMaxReductionType =
1731     CkReduction::addReducer(minMaxReduction);
1732 }
1733
1734 /**
1735  * **IMPT NOTES**:
1736  *
1737  * This is the C++ code that is registered to be activated at module
1738  * shutdown. This is called exactly once on processor 0. Module shutdown
1739  * is initiated as a result of a CkExit() call by the application code
1740  * 
1741  * The exit function must ultimately call CkExit() again to
1742  * so that other module exit functions may proceed after this module is
1743  * done.
1744  *
1745  */
1746 // FIXME: WHY extern "C"???
1747 extern "C" void TraceProjectionsExitHandler()
1748 {
1749 #if CMK_TRACE_ENABLED
1750   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1751   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1752   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1753 #else
1754   CkExit();
1755 #endif
1756 }
1757
1758 // This is called once on each processor but the idiom of use appears
1759 // to be to only have processor 0 register the function.
1760 //
1761 // See initnode in trace-projections.ci
1762 void initTraceProjectionsBOC()
1763 {
1764   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1765 #ifdef __BLUEGENE__
1766   if (BgNodeRank() == 0) {
1767 #else
1768     if (CkMyRank() == 0) {
1769 #endif
1770       registerExitFn(TraceProjectionsExitHandler);
1771     }
1772 #if 0
1773   } // this is so indentation does not get messed up
1774 #endif
1775 }
1776
1777 // mainchare for trace-projections BOC-operations. 
1778 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1779 // rest of its life.
1780 //
1781 // Responsible for:
1782 //   1. Handling commandline arguments
1783 //   2. Creating any objects required for proper BOC operations.
1784 //
1785 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1786   /** Options for Outlier Analysis */
1787   // defaults. Things will change with support for interactive analysis.
1788   bool findOutliers = false;
1789   bool outlierAutomatic = true;
1790   int numKSeeds = 10; 
1791
1792   int peNumKeep = CkNumPes();  // used as a default
1793   double entryThreshold = 0.0;
1794   bool outlierUsePhases = false;
1795   if (outlierAutomatic) {
1796     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1797                      "Number of cluster seeds to apply at outlier analysis.");
1798     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1799                      &peNumKeep, "Number of Processors to retain data");
1800     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1801                         "Minimum significance of entry points to be considered for clustering (%).");
1802     findOutliers =
1803       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1804     outlierUsePhases = 
1805       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1806                         "Apply automatic outlier analysis to any available phases.");
1807     if (outlierUsePhases) {
1808       // if the user wants to use an outlier feature, it is assumed outlier
1809       //    analysis is desired.
1810       findOutliers = true;
1811     }
1812   }
1813   bool findStartTime = (CmiTimerAbsolute()==1);
1814   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1815   if (findOutliers) {
1816     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1817                                         numKSeeds,
1818                                         peNumKeep,
1819                                         entryThreshold,
1820                                         outlierUsePhases);
1821   }
1822 }
1823
1824 // Called on every processor.
1825 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1826   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1827   endPe = pe;                // the pe that starts CkExit()
1828   if (CkMyPe() == 0) {
1829     analysisStartTime = CmiWallTimer();
1830   }
1831   CkpvAccess(_trace)->endComputation();
1832   // no more tracing for projections on this processor after this point. 
1833   // Note that clear must be called after remove, or bad things will happen.
1834   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1835   CkpvAccess(_traces)->clearTrace();
1836
1837   // From this point, we start multiple chains of reductions and broadcasts to
1838   // perform final online analysis activities.
1839
1840   // Start all parallel operations at once. 
1841   //   These MUST NOT modify base performance data in LogPool. If they must,
1842   //   then the parallel operations must be phased (and this code to be
1843   //   restructured as necessary)
1844   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1845   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1846   if (findOutliers) {
1847     parModulesRemaining++;
1848     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1849   }
1850   parModulesRemaining++;
1851   if (findStartTime) 
1852   bocProxy[CkMyPe()].startTimeAnalysis();
1853   else
1854   bocProxy[CkMyPe()].startEndTimeAnalysis();
1855 }
1856
1857 // Called on each processor
1858 void KMeansBOC::startKMeansAnalysis() {
1859   // Initialize all necessary structures
1860   LogPool *pool = CkpvAccess(_trace)->_logPool;
1861
1862  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1863   int flushInt = 0;
1864   if (pool->hasFlushed) {
1865     flushInt = 1;
1866   }
1867   
1868   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1869                 0, thisProxy);
1870   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1871 }
1872
1873 // Called on processor 0
1874 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1875   int someFlush = *((int *)msg->getData());
1876
1877   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1878   
1879   if (someFlush == 0) {
1880     // Data intact proceed with KMeans analysis
1881     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1882     kMeansProxy.flushCheckDone();
1883   } else {
1884     // Some processor had flushed it data at some point, abandon KMeans
1885     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1886     // terminate KMeans
1887     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1888     bocProxy[0].kMeansDone();
1889   }
1890 }
1891
1892 // Called on each processor
1893 void KMeansBOC::flushCheckDone() {
1894   // **FIXME** - more flexible metric collection scheme may be necessary
1895   //   in the future for production use.
1896   LogPool *pool = CkpvAccess(_trace)->_logPool;
1897
1898   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1899
1900   numEntryMethods = _entryTable.size();
1901   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1902
1903   // maintained across phases
1904   markedBegin = false;
1905   markedIdle = false;
1906   beginBlockTime = 0.0;
1907   beginIdleBlockTime = 0.0;
1908   lastBeginEPIdx = -1; // none
1909
1910   lastPhaseIdx = 0;
1911   currentExecTimes = NULL;
1912   currentPhase = 0;
1913   selected = false;
1914
1915   pool->initializePhases();
1916
1917   // incoming K Seeds and the per-phase filter
1918   incKSeeds = new double[numK*numMetrics];
1919   keepMetric = new bool[numMetrics];
1920
1921   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1922   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1923   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1924   thisProxy[CkMyPe()].getNextPhaseMetrics();
1925 }
1926
1927 // Called on each processor.
1928 void KMeansBOC::getNextPhaseMetrics() {
1929   // Assumes the presence of the complete logs on this processor.
1930   // Assumes first event is always BEGIN_COMPUTATION
1931   // Assumes each processor sees the same number of phases.
1932   //
1933   // In this code, we collect performance data for this processor.
1934   // All times are in seconds.
1935
1936   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1937
1938   if (usePhases) {
1939     DEBUGF("[%d] Using Phases\n", CkMyPe());
1940   } else {
1941     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1942   }
1943   
1944   if (currentExecTimes != NULL) {
1945     delete [] currentExecTimes;
1946   }
1947   currentExecTimes = new double[numMetrics];
1948   for (int i=0; i<numMetrics; i++) {
1949     currentExecTimes[i] = 0.0;
1950   }
1951
1952   int numEventMethods = _entryTable.size();
1953   LogPool *pool = CkpvAccess(_trace)->_logPool;
1954   
1955   CkAssert(pool->numEntries > lastPhaseIdx);
1956   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1957   double totalPhaseTime = 0.0;
1958   double totalActiveTime = 0.0; // entry method + idle
1959
1960   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1961     if (pool->pool[i].type == BEGIN_PROCESSING) {
1962       // check pairing
1963       if (!markedBegin) {
1964         markedBegin = true;
1965       }
1966       beginBlockTime = pool->pool[i].time;
1967       lastBeginEPIdx = pool->pool[i].eIdx;
1968     } else if (pool->pool[i].type == END_PROCESSING) {
1969       // check pairing
1970       // if End without a begin, just ignore
1971       //   this event. If a phase-boundary is crossed, the Begin
1972       //   event would be maintained in beginBlockTime, so it is 
1973       //   not a problem.
1974       if (markedBegin) {
1975         markedBegin = false;
1976         if (pool->pool[i].event < 0)
1977         {
1978           // ignore dummy events. **FIXME** as they have no eIdx?
1979           continue;
1980         }
1981         currentExecTimes[pool->pool[i].eIdx] += 
1982           pool->pool[i].time - beginBlockTime;
1983         totalActiveTime += pool->pool[i].time - beginBlockTime;
1984         lastBeginEPIdx = -1;
1985       }
1986     } else if (pool->pool[i].type == BEGIN_IDLE) {
1987       // check pairing
1988       if (!markedIdle) {
1989         markedIdle = true;
1990       }
1991       beginIdleBlockTime = pool->pool[i].time;
1992     } else if (pool->pool[i].type == END_IDLE) {
1993       // check pairing
1994       if (markedIdle) {
1995         markedIdle = false;
1996         currentExecTimes[numEventMethods] += 
1997           pool->pool[i].time - beginIdleBlockTime;
1998         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1999       }
2000     } else if (pool->pool[i].type == END_PHASE) {
2001       // ignored when not using phases
2002       if (usePhases) {
2003         // when we've not visited this node before
2004         if (i != lastPhaseIdx) { 
2005           totalPhaseTime = 
2006             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2007           // it is important that proper accounting of time take place here.
2008           // Note that END_PHASE events inevitably occur in the context of
2009           //   some entry method by the way the tracing API is designed.
2010           if (markedBegin) {
2011             CkAssert(lastBeginEPIdx >= 0);
2012             currentExecTimes[lastBeginEPIdx] += 
2013               pool->pool[i].time - beginBlockTime;
2014             totalActiveTime += pool->pool[i].time - beginBlockTime;
2015             // this is so the remainder contributes to the next phase
2016             beginBlockTime = pool->pool[i].time;
2017           }
2018           // The following is unlikely, but stranger things have happened.
2019           if (markedIdle) {
2020             currentExecTimes[numEventMethods] +=
2021               pool->pool[i].time - beginIdleBlockTime;
2022             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2023             // this is so the remainder contributes to the next phase
2024             beginIdleBlockTime = pool->pool[i].time;
2025           }
2026           if (totalActiveTime <= totalPhaseTime) {
2027             currentExecTimes[numEventMethods+1] = 
2028               totalPhaseTime - totalActiveTime;
2029           } else {
2030             currentExecTimes[numEventMethods+1] = 0.0;
2031             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2032                      CkMyPe(), currentPhase);
2033           }
2034           collectKMeansData();
2035           // end the loop (and method) and defer the work till the next call
2036           lastPhaseIdx = i;
2037           break; 
2038         }
2039       }
2040     } else if (pool->pool[i].type == END_COMPUTATION) {
2041       if (markedBegin) {
2042         CkAssert(lastBeginEPIdx >= 0);
2043         currentExecTimes[lastBeginEPIdx] += 
2044           pool->pool[i].time - beginBlockTime;
2045         totalActiveTime += pool->pool[i].time - beginBlockTime;
2046       }
2047       if (markedIdle) {
2048         currentExecTimes[numEventMethods] +=
2049           pool->pool[i].time - beginIdleBlockTime;
2050         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2051       }
2052       totalPhaseTime = 
2053         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2054       if (totalActiveTime <= totalPhaseTime) {
2055         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2056       } else {
2057         currentExecTimes[numEventMethods+1] = 0.0;
2058         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2059                  CkMyPe());
2060       }
2061       collectKMeansData();
2062     }
2063   }
2064 }
2065
2066 /**
2067  *  Through a reduction, collectKMeansData aggregates each processors' data
2068  *  in order for global properties to be determined:
2069  *  
2070  *  1. min & max to determine normalization factors.
2071  *  2. sum to determine global EP averages for possible metric reduction
2072  *       through thresholding.
2073  *  3. sum of squares to compute stddev which may be useful in the future.
2074  *
2075  *  collectKMeansData will also keep the processor's data for the current
2076  *    phase so that it may be normalized and worked on subsequently.
2077  *
2078  **/
2079 void KMeansBOC::collectKMeansData() {
2080   int minOffset = numMetrics;
2081   int maxOffset = 2*numMetrics;
2082   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2083
2084   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2085
2086   double *reductionMsg = new double[numMetrics*4];
2087
2088   for (int i=0; i<numMetrics; i++) {
2089     reductionMsg[i] = currentExecTimes[i];
2090     // duplicate the event times for max and min sections of the reduction
2091     reductionMsg[minOffset + i] = currentExecTimes[i];
2092     reductionMsg[maxOffset + i] = currentExecTimes[i];
2093     // compute squares
2094     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2095   }
2096
2097   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2098                 0, thisProxy);
2099   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2100              outlierReductionType, cb);  
2101 }
2102
2103 // The purpose is mainly to initialize the k seeds and generate
2104 //   normalization parameters for each of the metrics. The k seeds
2105 //   and normalization parameters are broadcast to all processors.
2106 //
2107 // Called on processor 0
2108 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2109   CkAssert(CkMyPe() == 0);
2110   
2111   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2112
2113   int sumOffset = 0;
2114   int minOffset = numMetrics;
2115   int maxOffset = 2*numMetrics;
2116   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2117
2118   // calculate statistics & boundaries for the k seeds for clustering
2119   KMeansStatsMessage *outmsg = 
2120     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2121   outmsg->numMetrics = numMetrics;
2122   outmsg->numKPos = numK*numMetrics;
2123   outmsg->numStats = numMetrics*4;
2124
2125   // Sum | Min | Max | Sum of Squares
2126   double *totalExecTimes = (double *)msg->getData();
2127   double totalTime = 0.0;
2128
2129   for (int i=0; i<numMetrics; i++) {
2130     DEBUGN("%lf\n", totalExecTimes[i]);
2131     totalTime += totalExecTimes[i];
2132
2133     // calculate event mean over all processors
2134     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2135
2136     // get the ranges and offsets of each metric. With this, we get
2137     //   normalization factors that can be sent back to each processor to
2138     //   be used as necessary. We reuse max for range. Min remains the offset.
2139     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2140     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2141       totalExecTimes[minOffset + i];
2142     
2143     // calculate stddev (using biased variance)
2144     outmsg->stats[sosOffset + i] = 
2145       sqrt((totalExecTimes[sosOffset + i] - 
2146             2*(outmsg->stats[i])*totalExecTimes[i] +
2147             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2148            CkNumPes());
2149   }
2150
2151   for (int i=0; i<numMetrics; i++) {
2152     // 1) if the proportion of the max value of the entry method relative to
2153     //   the average time taken over all entry methods across all processors
2154     //   is greater than the stipulated percentage threshold ...; AND
2155     // 2) if the range of values are non-zero.
2156     //
2157     // The current assumption is totalTime > 0 (what program has zero total
2158     //   time from all work?)
2159     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2160                      entryThreshold) &&
2161       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2162     if (keepMetric[i]) {
2163       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2164              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2165     } else {
2166       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2167     }
2168     outmsg->filter[i] = keepMetric[i];
2169   }
2170
2171   delete msg;
2172   
2173   // initialize k seeds for this phase
2174   kSeeds = new double[numK*numMetrics];
2175
2176   numKReported = 0;
2177   kNumMembers = new int[numK];
2178
2179   // Randomly select k processors' metric vectors for the k seeds
2180   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2181   srand(11337); // for debugging purposes
2182   for (int k=0; k<numK; k++) {
2183     DEBUGF("Seed %d | ", k);
2184     for (int m=0; m<numMetrics; m++) {
2185       double factor = totalExecTimes[maxOffset + m] - 
2186         totalExecTimes[minOffset + m];
2187       // "uniform" distribution, scaled according to the normalization
2188       //   factors
2189       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2190       // Random distribution.
2191       kSeeds[numMetrics*k + m] =
2192         ((rand()*1.0)/RAND_MAX)*factor;
2193       if (keepMetric[m]) {
2194         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2195       }
2196       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2197     }
2198     DEBUGF("\n");
2199     kNumMembers[k] = 0;
2200   }
2201
2202   // broadcast statistical values to all processors for cluster discovery
2203   thisProxy.findInitialClusters(outmsg);
2204 }
2205
2206
2207
2208 // Called on each processor.
2209 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2210
2211  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2212
2213   phaseIter = 0;
2214
2215   // Get info from stats message
2216   CkAssert(numMetrics == msg->numMetrics);
2217   for (int i=0; i<numMetrics; i++) {
2218     keepMetric[i] = msg->filter[i];
2219   }
2220
2221   // Normalize data on local processor.
2222   // **CWL** See my thesis for detailed discussion of normalization of
2223   //    performance data.
2224   // **NOTE** This might change if we want to send data based on the filter
2225   //   instead of all the data.
2226   CkAssert(numMetrics*4 == msg->numStats);
2227   for (int i=0; i<numMetrics; i++) {
2228     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2229     // **CWL** We do not normalize the range. Entry methods that exhibit
2230     //   large absolute timing variations should be allowed to contribute
2231     //   more to the Euclidean distance measure!
2232     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2233   }
2234
2235   // **NOTE** This might change if we want to send data based on the filter
2236   //   instead of all the data.
2237   CkAssert(numK*numMetrics == msg->numKPos);
2238   for (int i=0; i<msg->numKPos; i++) {
2239     incKSeeds[i] = msg->kSeedsPos[i];
2240   }
2241
2242   // Decide which KSeed this processor belongs to.
2243   minDistance = calculateDistance(0);
2244   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2245            currentPhase, phaseIter, minDistance);
2246   minK = 0;
2247   for (int i=1; i<numK; i++) {
2248     double distance = calculateDistance(i);
2249     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2250              currentPhase, phaseIter, i, distance);
2251     if (distance < minDistance) {
2252       minDistance = distance;
2253       minK = i;
2254     }
2255   }
2256
2257   // Set up a reduction with the modification vector to the root (0).
2258   //
2259   // The modification vector sends a negative value for each metric
2260   //   for the K this processor no longer belongs to and a positive
2261   //   value to the K the processor now belongs. In addition, a -1.0
2262   //   is sent to the K it is leaving and a +1.0 to the K it is 
2263   //   joining.
2264   //
2265   // The processor must still contribute a "zero returns" even if
2266   //   nothing changes. This will be the basis for determine
2267   //   convergence at the root.
2268   //
2269   // The addtional +1 is meant for the count-change that must be
2270   //   maintained for the special cases at the root when some K
2271   //   may be deprived of all processor points or go from 0 to a
2272   //   positive number of processors (see later comments).
2273   double *modVector = new double[numK*(numMetrics+1)];
2274   for (int i=0; i<numK; i++) {
2275     for (int j=0; j<numMetrics+1; j++) {
2276       modVector[i*(numMetrics+1) + j] = 0.0;
2277     }
2278   }
2279   for (int i=0; i<numMetrics; i++) {
2280     // for this initialization, only positive values need be sent.
2281     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2282   }
2283   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2284
2285   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2286                 0, thisProxy);
2287   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2288              CkReduction::sum_double, cb);  
2289 }
2290
2291 double KMeansBOC::calculateDistance(int k) {
2292   double ret = 0.0;
2293   for (int i=0; i<numMetrics; i++) {
2294     if (keepMetric[i]) {
2295       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2296              CkMyPe(), currentPhase, phaseIter, i,
2297                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2298       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2299     }
2300   }
2301   return sqrt(ret);
2302 }
2303
2304 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2305   CkAssert(CkMyPe() == 0);
2306
2307   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2308
2309   double *modVector = (double *)msg->getData();
2310   // sanity check
2311   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2312
2313   // A quick convergence test.
2314   bool hasChanges = false;
2315   for (int i=0; i<numK; i++) {
2316     hasChanges = hasChanges || 
2317       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2318   }
2319   if (!hasChanges) {
2320     delete msg;
2321     findRepresentatives();
2322   } else {
2323     int overallChange = 0;
2324     for (int i=0; i<numK; i++) {
2325       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2326       if (change != 0) {
2327         overallChange += change;
2328         // modify the k seeds based on the modification vectors coming in
2329         //
2330         // If a seed initially has no members, its contents do not matter and
2331         //   is simply set to the average of the incoming vector.
2332         // If the change causes a seed to lose all its members, do nothing.
2333         //   Its last-known location is kept to allow it to re-capture
2334         //   membership at the next iteration rather than apply the last
2335         //   changes (which snaps the point unnaturally to 0,0).
2336         // Otherwise, apply the appropriate vector changes.
2337         CkAssert((kNumMembers[i] + change >= 0) &&
2338                  (kNumMembers[i] + change <= CkNumPes()));
2339         if (kNumMembers[i] == 0) {
2340           CkAssert(change > 0);
2341           for (int j=0; j<numMetrics; j++) {
2342             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2343           }
2344         } else if (kNumMembers[i] + change == 0) {
2345           // do nothing.
2346         } else {
2347           for (int j=0; j<numMetrics; j++) {
2348             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2349             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2350             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2351           }
2352         }
2353         kNumMembers[i] += change;
2354       }
2355       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2356              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2357     }
2358     delete msg;
2359
2360     // broadcast the new seed locations.
2361     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2362     outmsg->numKPos = numK*numMetrics;
2363     for (int i=0; i<numK*numMetrics; i++) {
2364       outmsg->kSeedsPos[i] = kSeeds[i];
2365     }
2366
2367     thisProxy.updateSeedMembership(outmsg);
2368   }
2369 }
2370
2371 // Called on all processors
2372 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2373
2374   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2375
2376   phaseIter++;
2377
2378   // **NOTE** This might change if we want to send data based on the filter
2379   //   instead of all the data.
2380   CkAssert(numK*numMetrics == msg->numKPos);
2381   for (int i=0; i<msg->numKPos; i++) {
2382     incKSeeds[i] = msg->kSeedsPos[i];
2383   }
2384
2385   // Decide which KSeed this processor belongs to.
2386   lastMinK = minK;
2387   minDistance = calculateDistance(0);
2388   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2389          currentPhase, phaseIter, minDistance);
2390
2391   minK = 0;
2392   for (int i=1; i<numK; i++) {
2393     double distance = calculateDistance(i);
2394     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2395            currentPhase, phaseIter, i, distance);
2396     if (distance < minDistance) {
2397       minDistance = distance;
2398       minK = i;
2399     }
2400   }
2401
2402   double *modVector = new double[numK*(numMetrics+1)];
2403   for (int i=0; i<numK; i++) {
2404     for (int j=0; j<numMetrics+1; j++) {
2405       modVector[i*(numMetrics+1) + j] = 0.0;
2406     }
2407   }
2408
2409   if (minK != lastMinK) {
2410     for (int i=0; i<numMetrics; i++) {
2411       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2412       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2413     }
2414     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2415     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2416   }
2417
2418   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2419                 0, thisProxy);
2420   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2421              CkReduction::sum_double, cb);  
2422 }
2423
2424 void KMeansBOC::findRepresentatives() {
2425
2426   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2427
2428   int numNonEmptyClusters = 0;
2429   for (int i=0; i<numK; i++) {
2430     if (kNumMembers[i] > 0) {
2431       numNonEmptyClusters++;
2432     }
2433   }
2434
2435   int numRepresentatives = peNumKeep;
2436   // **FIXME**
2437   // This is fairly arbitrary. Next time, choose the centers of the top
2438   //   largest clusters.
2439   if (numRepresentatives < numNonEmptyClusters) {
2440     numRepresentatives = numNonEmptyClusters;
2441   }
2442
2443   int slotsRemaining = numRepresentatives;
2444
2445   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2446          numNonEmptyClusters);
2447
2448   // determine how many exemplars to select per cluster. Currently
2449   //   hardcoded to 1. Future challenge is to decide on other numbers
2450   //   or proportionality.
2451   //
2452   int exemplarsPerCluster = 1;
2453   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2454
2455   int numCandidateOutliers = CkNumPes() - 
2456     exemplarsPerCluster*numNonEmptyClusters;
2457
2458   double *remainders = new double[numK];
2459   int *assigned = new int[numK];
2460   exemplarChoicesLeft = new int[numK];
2461   outlierChoicesLeft = new int[numK];
2462
2463   for (int i=0; i<numK; i++) {
2464     assigned[i] = 0;
2465     remainders[i] = 
2466       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2467       slotsRemaining / numCandidateOutliers;
2468     if (remainders[i] >= 0.0) {
2469       assigned[i] = (int)floor(remainders[i]);
2470       remainders[i] -= assigned[i];
2471     } else {
2472       remainders[i] = 0.0;
2473     }
2474   }
2475
2476   for (int i=0; i<numK; i++) {
2477     slotsRemaining -= assigned[i];
2478   }
2479   CkAssert(slotsRemaining >= 0);
2480
2481   // find clusters to assign the loose slots to, in order of
2482   // remainder proportion
2483   while (slotsRemaining > 0) {
2484     double max = 0.0;
2485     int winner = 0;
2486     for (int i=0; i<numK; i++) {
2487       if (remainders[i] > max) {
2488         max = remainders[i];
2489         winner = i;
2490       }
2491     }
2492     assigned[winner]++;
2493     remainders[winner] = 0.0;
2494     slotsRemaining--;
2495   }
2496
2497   // set up how many reduction cycles of min/max we need to conduct to
2498   // select the representatives.
2499   numSelectionIter = exemplarsPerCluster;
2500   for (int i=0; i<numK; i++) {
2501     if (assigned[i] > numSelectionIter) {
2502       numSelectionIter = assigned[i];
2503     }
2504   }
2505   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2506
2507   for (int i=0; i<numK; i++) {
2508     if (kNumMembers[i] > 0) {
2509       exemplarChoicesLeft[i] = exemplarsPerCluster;
2510       outlierChoicesLeft[i] = assigned[i];
2511     } else {
2512       exemplarChoicesLeft[i] = 0;
2513       outlierChoicesLeft[i] = 0;
2514     }
2515     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2516            outlierChoicesLeft[i]);
2517   }
2518
2519   delete [] assigned;
2520   delete [] remainders;
2521
2522   // send out first broadcast
2523   KSelectionMessage *outmsg = NULL;
2524   if (numSelectionIter > 0) {
2525     outmsg = new (numK, numK, numK) KSelectionMessage;
2526     outmsg->numKMinIDs = numK;
2527     outmsg->numKMaxIDs = numK;
2528     for (int i=0; i<numK; i++) {
2529       outmsg->minIDs[i] = -1;
2530       outmsg->maxIDs[i] = -1;
2531     }
2532     thisProxy.collectDistances(outmsg);
2533   } else {
2534     CkPrintf("Warning: No selection iteration from the start!\n");
2535     // invoke phase completion on all processors
2536     thisProxy.phaseDone();
2537   }
2538 }
2539
2540 /*
2541  *  lastMin = array of minimum champions of the last tournament
2542  *  lastMax = array of maximum champions of the last tournament
2543  *  lastMaxVal = array of last encountered maximum values, allows previous
2544  *                 minimum winners to eliminate themselves from the next
2545  *                 minimum race.
2546  *
2547  *  Called on all processors.
2548  */
2549 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2550
2551   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2552
2553   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2554          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2555   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2556       (CkMyPe() == msg->maxIDs[lastMinK])) {
2557     CkAssert(!selected);
2558     selected = true;
2559   }
2560
2561   // build outgoing reduction structure
2562   //   format = minVal | ID | maxVal | ID
2563   double *minMaxAndIDs = NULL;
2564
2565   minMaxAndIDs = new double[numK*4];
2566   // initialize to the appropriate out-of-band values (for error checks)
2567   for (int i=0; i<numK; i++) {
2568     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2569     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2570     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2571     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2572   }
2573   // If I have not won before, I put myself back into the competition
2574   if (!selected) {
2575     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2576     minMaxAndIDs[lastMinK*4] = minDistance;
2577     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2578     minMaxAndIDs[lastMinK*4+2] = minDistance;
2579     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2580   }
2581   delete msg;
2582
2583   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2584                 0, thisProxy);
2585   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2586              minMaxReductionType, cb);  
2587 }
2588
2589 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2590   // incoming format:
2591   //   minVal | minID | maxVal | maxID
2592
2593   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2594
2595   if (numSelectionIter > 0) {
2596     double *incInfo = (double *)msg->getData();
2597     
2598     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2599     outmsg->numKMinIDs = numK;
2600     outmsg->numKMaxIDs = numK;
2601     
2602     for (int i=0; i<numK; i++) {
2603       DEBUGF("%d | %lf %d %lf %d \n", i, 
2604              incInfo[i*4], (int)incInfo[i*4+1], 
2605              incInfo[i*4+2], (int)incInfo[i*4+3]);
2606     }
2607
2608     for (int i=0; i<numK; i++) {
2609       if (exemplarChoicesLeft[i] > 0) {
2610         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2611         exemplarChoicesLeft[i]--;
2612       } else {
2613         outmsg->minIDs[i] = -1;
2614       }
2615       if (outlierChoicesLeft[i] > 0) {
2616         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2617         outlierChoicesLeft[i]--;
2618       } else {
2619         outmsg->maxIDs[i] = -1;
2620       }
2621     }
2622     thisProxy.collectDistances(outmsg);
2623     numSelectionIter--;
2624   } else {
2625     // invoke phase completion on all processors
2626     thisProxy.phaseDone();
2627   }
2628 }
2629
2630 /**
2631  *  Completion of the K-Means clustering and data selection of one phase
2632  *    of the computation.
2633  *
2634  *  Called on every processor.
2635  */
2636 void KMeansBOC::phaseDone() {
2637
2638   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2639
2640   LogPool *pool = CkpvAccess(_trace)->_logPool;
2641   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2642
2643   // now decide on what to do with the decision.
2644   if (!selected) {
2645     if (usePhases) {
2646       pool->keepPhase[currentPhase] = false;
2647     } else {
2648       // if not using phases, we're working on the whole log
2649       pool->setAllPhases(false);
2650     }
2651   }
2652
2653   // **FIXME** (?) - All processors have to agree on this, or the reduction
2654   //   will not be correct! The question is "is this enforcible?"
2655   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2656     // We're done
2657     int dummy = 0;
2658     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2659                   0, bocProxy);
2660     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2661   } else {
2662     // reset all phase-based k-means data and decisions
2663
2664     // **FIXME**!!!!!    
2665     
2666     // invoke the next K-Means computation phase.
2667     currentPhase++;
2668     thisProxy[CkMyPe()].getNextPhaseMetrics();
2669   }
2670 }
2671
2672 void TraceProjectionsBOC::startTimeAnalysis()
2673 {
2674   double startTime = 0.0;
2675   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2676      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2677   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2678   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2679 }
2680
2681 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2682 {
2683   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2684
2685   if (CkpvAccess(_trace) != NULL) {
2686     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2687     CkpvAccess(_trace)->_logPool->setNewStartTime();
2688     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2689   }
2690   delete msg;
2691   thisProxy[CkMyPe()].startEndTimeAnalysis();
2692 }
2693
2694 void TraceProjectionsBOC::startEndTimeAnalysis()
2695 {
2696  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2697
2698   endTime = CkpvAccess(_trace)->endTime;
2699   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2700
2701   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2702                 0, thisProxy);
2703   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2704 }
2705
2706 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2707 {
2708  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2709
2710   CkAssert(CkMyPe() == 0);
2711   parModulesRemaining--;
2712   if (CkpvAccess(_trace) != NULL) {
2713     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2714     // CkPrintf("End time determined to be %lf us\n",
2715     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2716   }
2717   delete msg;
2718   if (parModulesRemaining == 0) {
2719     thisProxy[CkMyPe()].finalize();
2720   }
2721 }
2722
2723 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2724
2725  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2726
2727   CkAssert(CkMyPe() == 0);
2728   parModulesRemaining--;
2729   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2730            CmiWallTimer()-analysisStartTime);
2731   delete msg;
2732   if (parModulesRemaining == 0) {
2733     thisProxy[CkMyPe()].finalize();
2734   }
2735 }
2736
2737 /**
2738  *
2739  *  This version is called (on processor 0) only if flushCheck fails.
2740  *
2741  */
2742 void TraceProjectionsBOC::kMeansDone() {
2743   CkAssert(CkMyPe() == 0);
2744   parModulesRemaining--;
2745   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2746            CmiWallTimer()-analysisStartTime);
2747   if (parModulesRemaining == 0) {
2748     thisProxy[CkMyPe()].finalize();
2749   }
2750 }
2751
2752 void TraceProjectionsBOC::finalize()
2753 {
2754   CkAssert(CkMyPe() == 0);
2755   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2756   //       CmiWallTimer()-analysisStartTime);
2757   thisProxy.closingTraces();
2758 }
2759
2760 // Called on every processor
2761 void TraceProjectionsBOC::closingTraces() {
2762   CkpvAccess(_trace)->closeTrace();
2763
2764     // subtle:  reduction needs to go to the PE which started CkExit()
2765   int pe = 0;
2766   if (endPe != -1) pe = endPe;
2767   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2768                 pe, thisProxy); 
2769   contribute(0, NULL, CkReduction::sum_int, cb);  
2770 }
2771
2772 // The sole purpose of this reduction is to decide whether or not
2773 //   Projections as a module needs to call CkExit() to get other
2774 //   modules to shutdown.
2775 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2776   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2777   delete msg;
2778   // decide if CkExit() needs to be called
2779   if (!CkpvAccess(_trace)->converseExit) {
2780     CkExit();
2781   }
2782 }
2783 /*
2784  *  Registration and definition of the Outlier Reduction callback.
2785  *  Format: Sum | Min | Max | Sum of Squares
2786  */
2787 CkReductionMsg *outlierReduction(int nMsgs,
2788                                  CkReductionMsg **msgs) {
2789   int numBytes = 0;
2790   int numMetrics = 0;
2791   double *ret = NULL;
2792
2793   if (nMsgs == 1) {
2794     // nothing to do, just pass it on
2795     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2796   }
2797
2798   if (nMsgs > 1) {
2799     numBytes = msgs[0]->getSize();
2800     // sanity checks
2801     if (numBytes%sizeof(double) != 0) {
2802       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2803     }
2804     if ((numBytes/sizeof(double))%4 != 0) {
2805       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2806     }
2807     numMetrics = (numBytes/sizeof(double))/4;
2808     ret = new double[numMetrics*4];
2809
2810     // copy the first message data into the return structure first
2811     for (int i=0; i<numMetrics*4; i++) {
2812       ret[i] = ((double *)msgs[0]->getData())[i];
2813     }
2814
2815     // Sum | Min | Max | Sum of Squares
2816     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2817       for (int i=0; i<numMetrics; i++) {
2818         // Sum
2819         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2820         // Min
2821         ret[numMetrics + i] =
2822           (ret[numMetrics + i] < 
2823            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2824           ? ret[numMetrics + i] : 
2825           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2826         // Max
2827         ret[2*numMetrics + i] = 
2828           (ret[2*numMetrics + i] >
2829            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2830           ? ret[2*numMetrics + i] :
2831           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2832         // Sum of Squares (squaring already done at leaf)
2833         ret[3*numMetrics + i] +=
2834           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2835       }
2836     }
2837   }
2838   
2839   /* apparently, we do not delete the incoming messages */
2840   return CkReductionMsg::buildNew(numBytes,ret);
2841 }
2842
2843 /*
2844  * The only reason we have a user-defined reduction is to support
2845  *   identification of the "winning" processors as well as to handle
2846  *   both the min and the max of each "tournament". A simple min/max
2847  *   discovery cannot handle ties.
2848  */
2849 CkReductionMsg *minMaxReduction(int nMsgs,
2850                                 CkReductionMsg **msgs) {
2851   CkAssert(nMsgs > 0);
2852
2853   int numBytes = msgs[0]->getSize();
2854   CkAssert(numBytes%sizeof(double) == 0);
2855   int numK = (numBytes/sizeof(double))/4;
2856
2857   double *ret = new double[numK*4];
2858   // fill with out-of-band values
2859   for (int i=0; i<numK; i++) {
2860     ret[i*4] = -1.0;
2861     ret[i*4+1] = -1.0;
2862     ret[i*4+2] = -1.0;
2863     ret[i*4+3] = -1.0;
2864   }
2865
2866   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2867   for (int i=0; i<nMsgs; i++) {
2868     double *temp = (double *)msgs[i]->getData();
2869     for (int j=0; j<numK; j++) {
2870       // no previous valid min
2871       if (ret[j*4+1] < 0) {
2872         // fill it in only if the incoming min is valid
2873         if (temp[j*4+1] >= 0) {
2874           ret[j*4] = temp[j*4];      // fill min value
2875           ret[j*4+1] = temp[j*4+1];  // fill ID
2876         }
2877       } else {
2878         // find Min, only if incoming min is valid
2879         if (temp[j*4+1] >= 0) {
2880           if (temp[j*4] < ret[j*4]) {
2881             ret[j*4] = temp[j*4];      // replace min value
2882             ret[j*4+1] = temp[j*4+1];  // replace ID
2883           }
2884         }
2885       }
2886       // no previous valid max
2887       if (ret[j*4+3] < 0) {
2888         // fill only if the incoming max is valid
2889         if (temp[j*4+3] >= 0) {
2890           ret[j*4+2] = temp[j*4+2];  // fill max value
2891           ret[j*4+3] = temp[j*4+3];  // fill ID
2892         }
2893       } else {
2894         // find Max, only if incoming max is valid
2895         if (temp[j*4+3] >= 0) {
2896           if (temp[j*4+2] > ret[j*4+2]) {
2897             ret[j*4+2] = temp[j*4+2];  // replace max value
2898             ret[j*4+3] = temp[j*4+3];  // replace ID
2899           }
2900         }
2901       }
2902     }
2903   }
2904   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2905   delete [] ret;
2906   return redmsg;
2907 }
2908
2909 #include "TraceProjections.def.h"
2910 #endif //PROJ_ANALYSIS
2911
2912 /*@}*/