17de54bccbc0015032f2cbefaa0bce4d746301a3
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 #if CMK_HAS_COUNTER_PAPI
103 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
104 #endif // CMK_HAS_COUNTER_PAPI
105
106 /**
107   For each TraceFoo module, _createTraceFoo() must be defined.
108   This function is called in _createTraces() generated in moduleInit.C
109 */
110 void _createTraceprojections(char **argv)
111 {
112   DEBUGF("%d createTraceProjections\n", CkMyPe());
113   CkpvInitialize(CkVec<char *>, usrEventlist);
114   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
115   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
116 #if CMK_BIGSIM_CHARM
117   // CthRegister does not call the constructor
118 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
119 #endif //CMK_BIGSIM_CHARM
120   CkpvInitialize(TraceProjections*, _trace);
121   CkpvAccess(_trace) = new  TraceProjections(argv);
122   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
123   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
124 }
125  
126 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
127
128 struct TraceThreadListener {
129   struct CthThreadListener base;
130   int event;
131   int msgType;
132   int ep;
133   int srcPe;
134   int ml;
135   CmiObjId idx;
136 };
137
138 extern "C"
139 void traceThreadListener_suspend(struct CthThreadListener *l)
140 {
141   TraceThreadListener *a=(TraceThreadListener *)l;
142   /* here, we activate the appropriate trace codes for the appropriate
143      registered modules */
144   traceSuspend();
145 }
146
147 extern "C"
148 void traceThreadListener_resume(struct CthThreadListener *l) 
149 {
150   TraceThreadListener *a=(TraceThreadListener *)l;
151   /* here, we activate the appropriate trace codes for the appropriate
152      registered modules */
153   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
154                                 CthGetThreadID(a->base.thread));
155   a->event=-1;
156   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
157   a->ml=0;
158 }
159
160 extern "C"
161 void traceThreadListener_free(struct CthThreadListener *l) 
162 {
163   TraceThreadListener *a=(TraceThreadListener *)l;
164   delete a;
165 }
166
167 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
168 {
169 #if CMK_TRACE_ENABLED
170   /* strip essential information from the envelope */
171   TraceThreadListener *a= new TraceThreadListener;
172   
173   a->base.suspend=traceThreadListener_suspend;
174   a->base.resume=traceThreadListener_resume;
175   a->base.free=traceThreadListener_free;
176   a->event=e->getEvent();
177   a->msgType=e->getMsgtype();
178   a->ep=e->getEpIdx();
179   a->srcPe=e->getSrcPe();
180   a->ml=e->getTotalsize();
181
182   CthAddListener(tid, (CthThreadListener *)a);
183 #endif
184 }
185
186 void LogPool::openLog(const char *mode)
187 {
188 #if CMK_PROJECTIONS_USE_ZLIB
189   if(compressed) {
190     if (nonDeltaLog) {
191       do {
192         zfp = gzopen(fname, mode);
193       } while (!zfp && (errno == EINTR || errno == EMFILE));
194       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
195     }
196     if (deltaLog) {
197       do {
198         deltazfp = gzopen(dfname, mode);
199       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
200       if (!deltazfp) 
201         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
202     }
203   } else {
204     if (nonDeltaLog) {
205       do {
206         fp = fopen(fname, mode);
207       } while (!fp && (errno == EINTR || errno == EMFILE));
208       if (!fp) {
209         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
210         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
211       }
212     }
213     if (deltaLog) {
214       do {
215         deltafp = fopen(dfname, mode);
216       } while (!deltafp && (errno == EINTR || errno == EMFILE));
217       if (!deltafp) 
218         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
219     }
220   }
221 #else
222   if (nonDeltaLog) {
223     do {
224       fp = fopen(fname, mode);
225     } while (!fp && (errno == EINTR || errno == EMFILE));
226     if (!fp) {
227       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
228       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
229     }
230   }
231   if (deltaLog) {
232     do {
233       deltafp = fopen(dfname, mode);
234     } while (!deltafp && (errno == EINTR || errno == EMFILE));
235     if(!deltafp) 
236       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
237   }
238 #endif
239 }
240
241 void LogPool::closeLog(void)
242 {
243 #if CMK_PROJECTIONS_USE_ZLIB
244   if(compressed) {
245     if (nonDeltaLog) gzclose(zfp);
246     if (deltaLog) gzclose(deltazfp);
247     return;
248   }
249 #endif
250   if (nonDeltaLog) { 
251 #if !defined(_WIN32) || defined(__CYGWIN__)
252     fsync(fileno(fp)); 
253 #endif
254     fclose(fp); 
255   }
256   if (deltaLog)  { 
257 #if !defined(_WIN32) || defined(__CYGWIN__)
258     fsync(fileno(deltafp)); 
259 #endif
260     fclose(deltafp);  
261   }
262 }
263
264 LogPool::LogPool(char *pgm) {
265   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
266   // defaults to writing data (no outlier changes)
267   writeData = true;
268   numEntries = 0;
269   // **CW** for simple delta encoding
270   prevTime = 0.0;
271   timeErr = 0.0;
272   globalStartTime = 0.0;
273   globalEndTime = 0.0;
274   headerWritten = 0;
275   numPhases = 0;
276   hasFlushed = false;
277
278   keepPhase = NULL;
279
280   fileCreated = false;
281   poolSize = CkpvAccess(CtrLogBufSize);
282   pgmname = new char[strlen(pgm)+1];
283   strcpy(pgmname, pgm);
284 }
285
286 void LogPool::createFile(const char *fix)
287 {
288   if (fileCreated) {
289     return;
290   }
291
292   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
293   char *pathPlusFilePrefix = new char[1024];
294
295   if(nSubdirs > 0){
296     int sd = CkMyPe() % nSubdirs;
297     char *subdir = new char[1024];
298     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
299     CmiMkdir(subdir);
300     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
301     delete[] subdir;
302   } else {
303     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
304   }
305
306   char pestr[10];
307   sprintf(pestr, "%d", CkMyPe());
308 #if CMK_PROJECTIONS_USE_ZLIB
309   int len;
310   if(compressed)
311     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
312   else
313     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
314 #else
315   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
316 #endif
317
318   if (nonDeltaLog) {
319     fname = new char[len];
320   }
321   if (deltaLog) {
322     dfname = new char[len];
323   }
324 #if CMK_PROJECTIONS_USE_ZLIB
325   if(compressed) {
326     if (deltaLog && nonDeltaLog) {
327       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
328       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
329     } else {
330       if (nonDeltaLog) {
331         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
332       } else {
333         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
334       }
335     }
336   } else {
337     if (deltaLog && nonDeltaLog) {
338       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
339       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
340     } else {
341       if (nonDeltaLog) {
342         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
343       } else {
344         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
345       }
346     }
347   }
348 #else
349   if (deltaLog && nonDeltaLog) {
350     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
351     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
352   } else {
353     if (nonDeltaLog) {
354       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
355     } else {
356       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
357     }
358   }
359 #endif
360   fileCreated = true;
361   delete[] pathPlusFilePrefix;
362   openLog("w+");
363   CLOSE_LOG 
364 }
365
366 void LogPool::createSts(const char *fix)
367 {
368   CkAssert(CkMyPe() == 0);
369   // create the sts file
370   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
371   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
372   do
373     {
374       stsfp = fopen(fname, "w");
375     } while (!stsfp && (errno == EINTR || errno == EMFILE));
376   if(stsfp==0){
377     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
378     CmiAbort("Error!!\n");
379   }
380   delete[] fname;
381 }  
382
383 void LogPool::createTopo(const char *fix)
384 {
385   CkAssert(CkMyPe() == 0);
386   // create the topo file
387   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
388   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
389   do
390     {
391       topofp = fopen(fname, "w");
392     } while (!stsfp && (errno == EINTR || errno == EMFILE));
393   if(stsfp==0){
394     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
395     CmiAbort("Error!!\n");
396   }
397   delete[] fname;
398 }  
399
400 void LogPool::createRC()
401 {
402   // create the projections rc file.
403   fname = 
404     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
405   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
406   do {
407     rcfp = fopen(fname, "w");
408   } while (!rcfp && (errno == EINTR || errno == EMFILE));
409   if (rcfp==0) {
410     CmiAbort("Cannot open projections configuration file for writing.\n");
411   }
412   delete[] fname;
413 }
414
415 LogPool::~LogPool() 
416 {
417   if (writeData) {
418     writeLog();
419 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
420     closeLog();
421 #endif
422   }
423
424 #if CMK_BIGSIM_CHARM
425   extern int correctTimeLog;
426   if (correctTimeLog) {
427     createFile("-bg");
428     if (CkMyPe() == 0) {
429       createSts("-bg");
430     }
431     writeHeader();
432     if (CkMyPe() == 0) writeSts(NULL);
433     postProcessLog();
434   }
435 #endif
436
437   delete[] pool;
438   delete [] fname;
439 }
440
441 void LogPool::writeHeader()
442 {
443   if (headerWritten) return;
444   headerWritten = 1;
445   if(!binary) {
446 #if CMK_PROJECTIONS_USE_ZLIB
447     if(compressed) {
448       if (nonDeltaLog) {
449         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
450       }
451       if (deltaLog) {
452         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
453       }
454     } 
455     else /* else clause is below... */
456 #endif
457     /*... may hang over from else above */ {
458       if (nonDeltaLog) {
459         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
460       }
461       if (deltaLog) {
462         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
463       }
464     }
465   }
466   else { // binary
467       if (nonDeltaLog) {
468         fwrite(&numEntries,sizeof(numEntries),1,fp);
469       }
470       if (deltaLog) {
471         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
472       }
473   }
474 }
475
476 void LogPool::writeLog(void)
477 {
478   createFile();
479   OPEN_LOG
480   writeHeader();
481   if (nonDeltaLog) write(0);
482   if (deltaLog) write(1);
483   CLOSE_LOG
484 }
485
486 void LogPool::write(int writedelta) 
487 {
488   // **CW** Simple delta encoding implementation
489   // prevTime has to be maintained as an object variable because
490   // LogPool::write may be called several times depending on the
491   // +logsize value.
492   PUP::er *p = NULL;
493   if (binary) {
494     p = new PUP::toDisk(writedelta?deltafp:fp);
495   }
496 #if CMK_PROJECTIONS_USE_ZLIB
497   else if (compressed) {
498     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
499   }
500 #endif
501   else {
502     p = new toProjectionsFile(writedelta?deltafp:fp);
503   }
504   CmiAssert(p);
505   int curPhase = 0;
506   // **FIXME** - Should probably consider a more sophisticated bounds-based
507   //   approach for selective writing instead of making multiple if-checks
508   //   for every single event.
509   for(UInt i=0; i<numEntries; i++) {
510     if (!writedelta) {
511       if (keepPhase == NULL) {
512         // default case, when no phase selection is required.
513         pool[i].pup(*p);
514       } else {
515         // **FIXME** Might be a good idea to create a "filler" event block for
516         //   all the events taken out by phase filtering.
517         if (pool[i].type == END_PHASE) {
518           // always write phase markers
519           pool[i].pup(*p);
520           curPhase++;
521         } else if (pool[i].type == BEGIN_COMPUTATION ||
522                    pool[i].type == END_COMPUTATION) {
523           // always write BEGIN and END COMPUTATION markers
524           pool[i].pup(*p);
525         } else if (keepPhase[curPhase]) {
526           pool[i].pup(*p);
527         }
528       }
529     }
530     else {      // delta
531       // **FIXME** Implement phase-selective writing for delta logs
532       //   eventually
533       double time = pool[i].time;
534       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
535       {
536         double timeDiff = (time-prevTime)*1.0e6;
537         UInt intTimeDiff = (UInt)timeDiff;
538         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
539         if (timeErr > 1.0) {
540           timeErr -= 1.0;
541           intTimeDiff++;
542         }
543         pool[i].time = intTimeDiff/1.0e6;
544       }
545       pool[i].pup(*p);
546       pool[i].time = time;      // restore time value
547       prevTime = time;
548     }
549   }
550   delete p;
551   delete [] keepPhase;
552 }
553
554 void LogPool::writeSts(void)
555 {
556   // for whining compilers
557   int i;
558   // generate an automatic unique ID for each log
559   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
560   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
561   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
562 #if CMK_HAS_COUNTER_PAPI
563   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
564   // for now, use i, next time use papiEvents[i].
565   // **CW** papi event names is a hack.
566   char eventName[PAPI_MAX_STR_LEN];
567   for (i=0;i<NUMPAPIEVENTS;i++) {
568     PAPI_event_code_to_name(papiEvents[i], eventName);
569     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
570   }
571 #endif
572   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
573   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
574     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
575   }     
576 }
577
578 void LogPool::writeSts(TraceProjections *traceProj){
579   writeSts();
580   if (traceProj != NULL) {
581     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
582     funcIter->seekStart();
583     int numFuncs = traceProj->getFuncNumber();
584     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
585     while(funcIter->hasNext()) {
586       StrKey *key;
587       int *obj = (int *)funcIter->next((void **)&key);
588       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
589     }
590   }
591   fprintf(stsfp, "END\n");
592   fclose(stsfp);
593 }
594
595 void LogPool::writeRC(void)
596 {
597     //CkPrintf("write RC is being executed\n");
598 #ifdef PROJ_ANALYSIS  
599     CkAssert(CkMyPe() == 0);
600     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
601             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
602     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
603             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
604     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
605     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
606       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
607     } else {
608       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
609     }
610     */
611 #endif //PROJ_ANALYSIS
612   fclose(rcfp);
613 }
614
615 void LogPool::writeTopo(void)
616 {
617   TopoManager tmgr;
618   tmgr.printAllocation(topofp);
619   fclose(topofp);
620 }
621
622 #if CMK_BIGSIM_CHARM
623 static void updateProjLog(void *data, double t, double recvT, void *ptr)
624 {
625   LogEntry *log = (LogEntry *)data;
626   FILE *fp = *(FILE **)ptr;
627   log->time = t;
628   log->recvTime = recvT<0.0?0:recvT;
629 //  log->write(fp);
630   toProjectionsFile p(fp);
631   log->pup(p);
632 }
633 #endif
634
635 // flush log entries to disk
636 void LogPool::flushLogBuffer()
637 {
638   if (numEntries) {
639     double writeTime = TraceTimer();
640     writeLog();
641     hasFlushed = true;
642     numEntries = 0;
643     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
644     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
645   }
646 }
647
648 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
649                   double time, int event, int pe, int ml, CmiObjId *id, 
650                   double recvT, double cpuT, int numPe)
651 {
652   new (&pool[numEntries++])
653     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
654   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
655     numPhases++;
656   }
657   if(poolSize==numEntries) {
658     flushLogBuffer();
659 #if CMK_BIGSIM_CHARM
660     extern int correctTimeLog;
661     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
662 #endif
663   }
664 #if CMK_BIGSIM_CHARM
665   switch (type) {
666     case BEGIN_PROCESSING:
667       pool[numEntries-1].recvTime = BgGetRecvTime();
668     case END_PROCESSING:
669     case BEGIN_COMPUTATION:
670     case END_COMPUTATION:
671     case CREATION:
672     case BEGIN_PACK:
673     case END_PACK:
674     case BEGIN_UNPACK:
675     case END_UNPACK:
676     case USER_EVENT_PAIR:
677       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
678   }
679 #endif
680 }
681
682 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
683 #ifndef CMK_BIGSIM_CHARM
684   new (&pool[numEntries++])
685         LogEntry(time,type,funcID,lineNum,fileName);
686   if(poolSize == numEntries){
687     flushLogBuffer();
688   }
689 #endif  
690 }
691
692
693   
694 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
695 #ifndef CMK_BIGSIM_CHARM
696   new (&pool[numEntries++])
697         LogEntry(type,time,memUsage);
698   if(poolSize == numEntries){
699     flushLogBuffer();
700   }
701 #endif  
702         
703 }  
704
705
706
707 void LogPool::addUserSupplied(int data){
708         // add an event
709         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
710
711         // set the user supplied value for the previously created event 
712         pool[numEntries-1].setUserSuppliedData(data);
713   }
714
715
716 void LogPool::addUserSuppliedNote(char *note){
717         // add an event
718         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
719
720         // set the user supplied note for the previously created event 
721         pool[numEntries-1].setUserSuppliedNote(note);
722   }
723
724 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
725   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
726 #ifndef CMK_BIGSIM_CHARM
727 #if MPI_TRACE_MACHINE_HACK
728   //This part of code is used  to combine the contiguous
729   //MPI_Test and MPI_Iprobe events to reduce the number of
730   //entries
731 #define MPI_TEST_EVENT_ID 60
732 #define MPI_IPROBE_EVENT_ID 70 
733   int lastEvent = pool[numEntries-1].event;
734   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
735     //just replace the endtime of last event
736     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
737     pool[numEntries].endTime = et;
738   }else{
739     new (&pool[numEntries++])
740       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
741   }
742 #else
743   new (&pool[numEntries++])
744     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
745 #endif
746   if(poolSize == numEntries){
747     flushLogBuffer();
748   }
749 #endif  
750 }
751
752
753 /* **CW** Not sure if this is the right thing to do. Feels more like
754    a hack than a solution to Sameer's request to add the destination
755    processor information to multicasts and broadcasts.
756
757    In the unlikely event this method is used for Broadcasts as well,
758    pelist == NULL will be used to indicate a global broadcast with 
759    num PEs.
760 */
761 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
762                                    int event, int pe, int ml, CmiObjId *id,
763                                    double recvT, int numPe, int *pelist)
764 {
765   new (&pool[numEntries++])
766     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
767   if(poolSize==numEntries) {
768     flushLogBuffer();
769   }
770 }
771
772 void LogPool::postProcessLog()
773 {
774 #if CMK_BIGSIM_CHARM
775   bgUpdateProj(1);   // event type
776 #endif
777 }
778
779 void LogPool::modLastEntryTimestamp(double ts)
780 {
781   pool[numEntries-1].time = ts;
782   //pool[numEntries-1].cputime = ts;
783 }
784
785 // /** Constructor for a multicast log entry */
786 // 
787 //  THIS WAS MOVED TO trace-projections.h with the other constructors
788 // 
789 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
790 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
791 // {
792 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
793 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
794 //     recvTime = rt; 
795 //     numpes = numPe;
796 //     userSuppliedNote = NULL;
797 //     if (pelist != NULL) {
798 //      pes = new int[numPe];
799 //      for (int i=0; i<numPe; i++) {
800 //        pes[i] = pelist[i];
801 //      }
802 //     } else {
803 //      pes= NULL;
804 //     }
805 // }
806
807 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
808 //{
809 //#if CMK_HAS_COUNTER_PAPI
810 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
811 //#endif
812 //}
813
814
815
816 void LogEntry::pup(PUP::er &p)
817 {
818   int i;
819   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
820   char ret = '\n';
821
822   p|type;
823   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
824   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
825
826   switch (type) {
827     case USER_EVENT:
828     case USER_EVENT_PAIR:
829       p|mIdx; p|itime; p|event; p|pe;
830       break;
831     case BEGIN_IDLE:
832     case END_IDLE:
833     case BEGIN_PACK:
834     case END_PACK:
835     case BEGIN_UNPACK:
836     case END_UNPACK:
837       p|itime; p|pe; 
838       break;
839     case BEGIN_PROCESSING:
840       if (p.isPacking()) {
841         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
842         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
843       }
844       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
845       p|msglen; p|irecvtime; 
846       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
847       p|icputime;
848 #if CMK_HAS_COUNTER_PAPI
849       //p|numPapiEvents;
850       for (i=0; i<NUMPAPIEVENTS; i++) {
851         // not yet!!!
852         //      p|papiIDs[i]; 
853         p|papiValues[i];
854         
855       }
856 #else
857       //p|numPapiEvents;     // non papi version has value 0
858 #endif
859       if (p.isUnpacking()) {
860         recvTime = irecvtime/1.0e6;
861         cputime = icputime/1.0e6;
862       }
863       break;
864     case END_PROCESSING:
865       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
866       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
867 #if CMK_HAS_COUNTER_PAPI
868       //p|numPapiEvents;
869       for (i=0; i<NUMPAPIEVENTS; i++) {
870         // not yet!!!
871         //      p|papiIDs[i];
872         p|papiValues[i];
873       }
874 #else
875       //p|numPapiEvents;  // non papi version has value 0
876 #endif
877       if (p.isUnpacking()) cputime = icputime/1.0e6;
878       break;
879     case USER_SUPPLIED:
880           p|userSuppliedData;
881           p|itime;
882         break;
883     case USER_SUPPLIED_NOTE:
884           p|itime;
885           int length;
886           if (p.isPacking()) length = strlen(userSuppliedNote);
887           p | length;
888           char space;
889           space = ' ';
890           p | space;
891           if (p.isUnpacking()) {
892             userSuppliedNote = new char[length+1];
893             userSuppliedNote[length] = '\0';
894           }
895           PUParray(p,userSuppliedNote, length);
896           break;
897     case USER_SUPPLIED_BRACKETED_NOTE:
898       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
899           p|itime;
900           p|iEndTime;
901           p|event;
902           int length2;
903           if (p.isPacking()) length2 = strlen(userSuppliedNote);
904           p | length2;
905           char space2;
906           space2 = ' ';
907           p | space2;
908           if (p.isUnpacking()) {
909             userSuppliedNote = new char[length+1];
910             userSuppliedNote[length] = '\0';
911           }
912           PUParray(p,userSuppliedNote, length2);
913           break;
914     case MEMORY_USAGE_CURRENT:
915       p | memUsage;
916       p | itime;
917         break;
918     case CREATION:
919       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
920       p|mIdx; p|eIdx; p|itime;
921       p|event; p|pe; p|msglen; p|irecvtime;
922       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
923       break;
924     case CREATION_BCAST:
925       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
926       p|mIdx; p|eIdx; p|itime;
927       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
928       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
929       break;
930     case CREATION_MULTICAST:
931       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
932       p|mIdx; p|eIdx; p|itime;
933       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
934       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
935       for (i=0; i<numpes; i++) p|pes[i];
936       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
937       break;
938     case MESSAGE_RECV:
939       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
940       break;
941
942     case ENQUEUE:
943     case DEQUEUE:
944       p|mIdx; p|itime; p|event; p|pe;
945       break;
946
947     case BEGIN_INTERRUPT:
948     case END_INTERRUPT:
949       p|itime; p|event; p|pe;
950       break;
951
952       // **CW** absolute timestamps are used here to support a quick
953       // way of determining the total time of a run in projections
954       // visualization.
955     case BEGIN_COMPUTATION:
956     case END_COMPUTATION:
957     case BEGIN_TRACE:
958     case END_TRACE:
959       p|itime;
960       break;
961     case BEGIN_FUNC:
962         p | itime;
963         p | mIdx;
964         p | event;
965         if(!p.isUnpacking()){
966                 p(fName,flen-1);
967         }
968         break;
969     case END_FUNC:
970         p | itime;
971         p | mIdx;
972         break;
973     case END_PHASE:
974       p|eIdx; // FIXME: actually the phase ID
975       p|itime;
976       break;
977     default:
978       CmiError("***Internal Error*** Wierd Event %d.\n", type);
979       break;
980   }
981   if (p.isUnpacking()) time = itime/1.0e6;
982   p|ret;
983 }
984
985 TraceProjections::TraceProjections(char **argv): 
986   curevent(0), inEntry(0), computationStarted(0), 
987         converseExit(0), endTime(0.0), traceNestedEvents(0),
988         currentPhaseID(0), lastPhaseEvent(NULL)
989 {
990   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
991
992   if (CkpvAccess(traceOnPe) == 0) return;
993
994   CtvInitialize(int,curThreadEvent);
995   CkpvInitialize(CmiInt8, CtrLogBufSize);
996   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
997   CtvAccess(curThreadEvent)=0;
998   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
999                        "Log entries to buffer per I/O")) {
1000     if (CkMyPe() == 0) {
1001       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1002     }
1003   }
1004   checknested = 
1005     CmiGetArgFlagDesc(argv,"+checknested",
1006                       "check projections nest begin end execute events");
1007   traceNestedEvents = 
1008     CmiGetArgFlagDesc(argv,"+tracenested",
1009               "trace projections nest begin/end execute events");
1010   int binary = 
1011     CmiGetArgFlagDesc(argv,"+binary-trace",
1012                       "Write log files in binary format");
1013
1014   CmiInt8 nSubdirs = 0;
1015   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1016
1017
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   int compressed = true;
1020   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1021   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1022   compressed = compressed && !disableCompressed;
1023 #else
1024   // consume the flag so there's no confusing
1025   CmiGetArgFlagDesc(argv,"+gz-trace",
1026                     "Write log files pre-compressed with gzip");
1027   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1028 #endif
1029
1030   // **CW** default to non delta log encoding. The user may choose to do
1031   // create both logs (for debugging) or just the old log timestamping
1032   // (for compatibility).
1033   // Generating just the non delta log takes precedence over generating
1034   // both logs (if both arguments appear on the command line).
1035
1036   // switch to OLD log format until everything works // Gengbin
1037   nonDeltaLog = 1;
1038   deltaLog = 0;
1039   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1040                                   "Generate Delta encoded and simple timestamped log files");
1041
1042   _logPool = new LogPool(CkpvAccess(traceRoot));
1043   _logPool->setNumSubdirs(nSubdirs);
1044   _logPool->setBinary(binary);
1045 #if CMK_PROJECTIONS_USE_ZLIB
1046   _logPool->setCompressed(compressed);
1047 #endif
1048   if (CkMyPe() == 0) {
1049     _logPool->createSts();
1050     _logPool->createRC();
1051     _logPool->createTopo();
1052   }
1053   funcCount=1;
1054
1055 #if CMK_HAS_COUNTER_PAPI
1056   // We initialize and create the event sets for use with PAPI here.
1057   int papiRetValue;
1058   if(CkMyRank()==0){
1059     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1060     if (papiRetValue != PAPI_VER_CURRENT) {
1061       CmiAbort("PAPI Library initialization failure!\n");
1062     }
1063 #if CMK_SMP
1064     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1065       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1066     }
1067 #endif
1068   }
1069
1070 #if CMK_SMP
1071   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1072   CmiNodeAllBarrier();
1073 #endif
1074   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1075   papiEventSet = PAPI_NULL; 
1076   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1077     CmiAbort("PAPI failed to create event set!\n");
1078   }
1079   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1080   if (papiRetValue != PAPI_OK) {
1081     if (papiRetValue == PAPI_ECNFLCT) {
1082       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1083     } else {
1084       CmiAbort("PAPI failed to add designated events!\n");
1085     }
1086   }
1087   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1088 #endif
1089 }
1090
1091 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1092 {
1093   OPTIMIZED_VERSION
1094   CkAssert(e==-1 || e>=0);
1095   CkAssert(evt != NULL);
1096   int event;
1097   int biggest = -1;
1098   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1099     int cur = (*CkpvAccess(usrEvents))[i]->e;
1100     if (cur == e) {
1101       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1102       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1103         return e;
1104       else
1105         CmiAbort("UserEvent double registered!");
1106     }
1107     if (cur > biggest) biggest = cur;
1108   }
1109   // if no user events have so far been registered. biggest will be -1
1110   // and hence newly assigned event numbers will begin from 0.
1111   if (e==-1) event = biggest+1;  // automatically assign new event number
1112   else event = e;
1113   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1114   return event;
1115 }
1116
1117 void TraceProjections::traceClearEps(void)
1118 {
1119   // In trace-summary, this zeros out the EP bins, to eliminate noise
1120   // from startup.  Here, this isn't useful, since we can do that in
1121   // post-processing
1122 }
1123
1124 void TraceProjections::traceWriteSts(void)
1125 {
1126   if(CkMyPe()==0)
1127     _logPool->writeSts(this);
1128 }
1129
1130 /** 
1131  * **IMPT NOTES**:
1132  *
1133  * This is called when Converse closes during ConverseCommonExit().
1134  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1135  *
1136  * Some programs bypass CkExit() (like NAMD, which eventually calls
1137  * ConverseExit()), modules like traces will have to pretend to shutdown
1138  * as if CkExit() was called but at the same time avoid making
1139  * subsequent CkExit() calls (which is usually required for allowing
1140  * other modules to shutdown).
1141  *
1142  * Note that we can only get here if CkExit() was not called, since the
1143  * trace module will un-register itself from TraceArray if it did.
1144  *
1145  */
1146 void TraceProjections::traceClose(void)
1147 {
1148 #ifdef PROJ_ANALYSIS
1149   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1150
1151   // sets the flag that tells the code not to make the CkExit call later
1152   converseExit = 1;
1153   if (CkMyPe() == 0) {
1154     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1155     bocProxy.traceProjectionsParallelShutdown(-1);
1156   }
1157   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1158     CkpvAccess(_trace)->endComputation();
1159     delete _logPool;              // will write
1160     // remove myself from traceArray so that no tracing will be called.
1161     CkpvAccess(_traces)->removeTrace(this);
1162   }
1163 #else
1164   // we've already deleted the logpool, so multiple calls to traceClose
1165   // are tolerated.
1166   if (_logPool == NULL) {
1167     return;
1168   }
1169   if(CkMyPe()==0){
1170     _logPool->writeSts(this);
1171   }
1172   CkpvAccess(_trace)->endComputation();
1173   delete _logPool;              // will write
1174   // remove myself from traceArray so that no tracing will be called.
1175   CkpvAccess(_traces)->removeTrace(this);
1176 #endif
1177 }
1178
1179 /**
1180  *  **IMPT NOTES**:
1181  *
1182  *  This is meant to be called internally by the tracing framework.
1183  *
1184  */
1185 void TraceProjections::closeTrace() {
1186   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1187   if (CkMyPe() == 0) {
1188     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1189     _logPool->writeSts(this);
1190     _logPool->writeRC();
1191     _logPool->writeTopo();
1192     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1193   }
1194   delete _logPool;       // will write logs to file
1195 }
1196
1197 #if CMK_SMP_TRACE_COMMTHREAD
1198 void TraceProjections::traceBeginOnCommThread()
1199 {
1200   if (!computationStarted) return;
1201   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1202 }
1203
1204 void TraceProjections::traceEndOnCommThread()
1205 {
1206   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1207 }
1208 #endif
1209
1210 void TraceProjections::traceBegin(void)
1211 {
1212   if (!computationStarted) return;
1213   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1214 }
1215
1216 void TraceProjections::traceEnd(void)
1217 {
1218   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1219 }
1220
1221 void TraceProjections::userEvent(int e)
1222 {
1223   if (!computationStarted) return;
1224   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1225 }
1226
1227 void TraceProjections::userBracketEvent(int e, double bt, double et)
1228 {
1229   if (!computationStarted) return;
1230   // two events record Begin/End of event e.
1231   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1232   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1233 }
1234
1235 void TraceProjections::userSuppliedData(int d)
1236 {
1237   if (!computationStarted) return;
1238   _logPool->addUserSupplied(d);
1239 }
1240
1241 void TraceProjections::userSuppliedNote(char *note)
1242 {
1243   if (!computationStarted) return;
1244   _logPool->addUserSuppliedNote(note);
1245 }
1246
1247
1248 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1249 {
1250   if (!computationStarted) return;
1251   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1252 }
1253
1254 void TraceProjections::memoryUsage(double m)
1255 {
1256   if (!computationStarted) return;
1257   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1258   
1259 }
1260
1261
1262 void TraceProjections::creation(envelope *e, int ep, int num)
1263 {
1264   double curTime = TraceTimer();
1265   if (e == 0) {
1266     CtvAccess(curThreadEvent) = curevent;
1267     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1268                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1269   } else {
1270     int type=e->getMsgtype();
1271     e->setEvent(curevent);
1272     if (num > 1) {
1273       _logPool->add(CREATION_BCAST, type, ep, curTime,
1274                     curevent++, CkMyPe(), e->getTotalsize(), 
1275                     NULL, 0, 0.0, num);
1276     } else {
1277       _logPool->add(CREATION, type, ep, curTime,
1278                     curevent++, CkMyPe(), e->getTotalsize(), 
1279                     NULL, 0, 0.0);
1280     }
1281   }
1282 }
1283
1284 //This function is only called from a comm thread in SMP mode. 
1285 void TraceProjections::creation(char *msg)
1286 {
1287 #if CMK_SMP_TRACE_COMMTHREAD
1288         // msg must be a charm message
1289     envelope *e = (envelope *)msg;
1290     int ep = e->getEpIdx();
1291     if(_entryTable[ep]->traceEnabled) {
1292         creation(e, ep, 1);
1293         e->setSrcPe(CkMyPe());              // pretend I am the sender
1294     }
1295 #endif
1296 }
1297
1298 void TraceProjections::traceCommSetMsgID(char *msg)
1299 {
1300 #if CMK_SMP_TRACE_COMMTHREAD
1301     // msg must be a charm message
1302     envelope *e = (envelope *)msg;
1303     int ep = e->getEpIdx();
1304     if(_entryTable[ep]->traceEnabled) {
1305         e->setSrcPe(CkMyPe());              // pretend I am the sender
1306         e->setEvent(curevent);
1307     }
1308 #endif
1309 }
1310
1311 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1312 {
1313     // msg must be a charm message
1314     *pe = *event = -1;
1315     envelope *e = (envelope *)msg;
1316     int ep = e->getEpIdx();
1317     if(_entryTable[ep]->traceEnabled) {
1318         *pe = e->getSrcPe();
1319         *event = e->getEvent();
1320     }
1321 }
1322
1323 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1324 {
1325        // msg must be a charm message
1326     envelope *e = (envelope *)msg;
1327     int ep = e->getEpIdx();
1328     if(ep<=0 || ep>=_entryTable.size()) return;
1329     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1330     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1331     if(_entryTable[ep]->traceEnabled) {
1332         e->setSrcPe(pe);
1333         e->setEvent(event);
1334     }
1335 }
1336
1337 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1338    Communication Library-specific Multicasts via new event 
1339    CREATION_MULTICAST.
1340 */
1341
1342 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1343                                          int *pelist)
1344 {
1345   double curTime = TraceTimer();
1346   if (e==0) {
1347     CtvAccess(curThreadEvent)=curevent;
1348     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1349                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1350   } else {
1351     int type=e->getMsgtype();
1352     e->setEvent(curevent);
1353     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1354                                    e->getTotalsize(), 0, 0.0, num, pelist);
1355   }
1356 }
1357
1358 void TraceProjections::creationDone(int num)
1359 {
1360   // modified the creation done time of the last num log entries
1361   // FIXME: totally a hack
1362   double curTime = TraceTimer();
1363   int idx = _logPool->numEntries-1;
1364   while (idx >=0 && num >0 ) {
1365     LogEntry &log = _logPool->pool[idx];
1366     if ((log.type == CREATION) ||
1367         (log.type == CREATION_BCAST) ||
1368         (log.type == CREATION_MULTICAST)) {
1369       log.recvTime = curTime - log.time;
1370       num --;
1371     }
1372     idx--;
1373   }
1374 }
1375
1376 void TraceProjections::beginExecute(CmiObjId *tid)
1377 {
1378 #if CMK_HAS_COUNTER_PAPI
1379   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1380     CmiAbort("PAPI failed to read at begin execute!\n");
1381   }
1382 #endif
1383   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1384   execEvent = CtvAccess(curThreadEvent);
1385   execEp = (-1);
1386   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1387                 execEvent,CkMyPe(), 0, tid);
1388 #if CMK_HAS_COUNTER_PAPI
1389   _logPool->addPapi(papiValues);
1390 #endif
1391   inEntry = 1;
1392 }
1393
1394 void TraceProjections::beginExecute(envelope *e)
1395 {
1396   if(e==0) {
1397 #if CMK_HAS_COUNTER_PAPI
1398     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1399       CmiAbort("PAPI failed to read at begin execute!\n");
1400     }
1401 #endif
1402     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1403     execEvent = CtvAccess(curThreadEvent);
1404     execEp = (-1);
1405     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1406                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1407 #if CMK_HAS_COUNTER_PAPI
1408     _logPool->addPapi(papiValues);
1409 #endif
1410     inEntry = 1;
1411   } else {
1412     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1413                  e->getSrcPe(),e->getTotalsize());
1414   }
1415 }
1416
1417 void TraceProjections::beginExecute(char *msg){
1418 #if CMK_SMP_TRACE_COMMTHREAD
1419         //This function is called from comm thread in SMP mode
1420     envelope *e = (envelope *)msg;
1421     int ep = e->getEpIdx();
1422     if(_entryTable[ep]->traceEnabled)
1423                 beginExecute(e);
1424 #endif
1425 }
1426
1427 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1428                                     int mlen, CmiObjId *idx)
1429 {
1430   if (traceNestedEvents) {
1431     if (! nestedEvents.isEmpty()) {
1432       endExecuteLocal();
1433     }
1434     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1435   }
1436   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1437 }
1438
1439 void TraceProjections::changeLastEntryTimestamp(double ts)
1440 {
1441   _logPool->modLastEntryTimestamp(ts);
1442 }
1443
1444 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1445                                     int mlen, CmiObjId *idx)
1446 {
1447 #if CMK_HAS_COUNTER_PAPI
1448   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1449     CmiAbort("PAPI failed to read at begin execute!\n");
1450   }
1451 #endif
1452   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1453   execEvent=event;
1454   execEp=ep;
1455   execPe=srcPe;
1456   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1457                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1458 #if CMK_HAS_COUNTER_PAPI
1459   _logPool->addPapi(papiValues);
1460 #endif
1461   inEntry = 1;
1462 }
1463
1464 void TraceProjections::endExecute(void)
1465 {
1466   if (traceNestedEvents) nestedEvents.deq();
1467   endExecuteLocal();
1468   if (traceNestedEvents) {
1469     if (! nestedEvents.isEmpty()) {
1470       NestedEvent &ne = nestedEvents.peek();
1471       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1472     }
1473   }
1474 }
1475
1476 void TraceProjections::endExecute(char *msg)
1477 {
1478 #if CMK_SMP_TRACE_COMMTHREAD
1479         //This function is called from comm thread in SMP mode
1480     envelope *e = (envelope *)msg;
1481     int ep = e->getEpIdx();
1482     if(_entryTable[ep]->traceEnabled)
1483                 endExecute();
1484 #endif  
1485 }
1486
1487 void TraceProjections::endExecuteLocal(void)
1488 {
1489 #if CMK_HAS_COUNTER_PAPI
1490   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1491     CmiAbort("PAPI failed to read at end execute!\n");
1492   }
1493 #endif
1494   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1495   double cputime = TraceCpuTimer();
1496   if(execEp == (-1)) {
1497     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1498                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1499   } else {
1500     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1501                   execEvent, execPe, 0, NULL, 0.0, cputime);
1502   }
1503 #if CMK_HAS_COUNTER_PAPI
1504   _logPool->addPapi(papiValues);
1505 #endif
1506   inEntry = 0;
1507 }
1508
1509 void TraceProjections::messageRecv(char *env, int pe)
1510 {
1511 #if 0
1512   envelope *e = (envelope *)env;
1513   int msgType = e->getMsgtype();
1514   int ep = e->getEpIdx();
1515 #if 0
1516   if (msgType==NewChareMsg || msgType==NewVChareMsg
1517           || msgType==ForChareMsg || msgType==ForVidMsg
1518           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1519           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1520     ep = e->getEpIdx();
1521   else
1522     ep = _threadEP;
1523 #endif
1524   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1525                 curevent++, e->getSrcPe(), e->getTotalsize());
1526 #endif
1527 }
1528
1529 void TraceProjections::beginIdle(double curWallTime)
1530 {
1531   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1532 }
1533
1534 void TraceProjections::endIdle(double curWallTime)
1535 {
1536   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1537 }
1538
1539 void TraceProjections::beginPack(void)
1540 {
1541   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1542 }
1543
1544 void TraceProjections::endPack(void)
1545 {
1546   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1547 }
1548
1549 void TraceProjections::beginUnpack(void)
1550 {
1551   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1552 }
1553
1554 void TraceProjections::endUnpack(void)
1555 {
1556   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1557 }
1558
1559 void TraceProjections::enqueue(envelope *) {}
1560
1561 void TraceProjections::dequeue(envelope *) {}
1562
1563 void TraceProjections::beginComputation(void)
1564 {
1565   computationStarted = 1;
1566
1567   // Executes the callback function provided by the machine
1568   // layer. This is the proper method to register user events in a
1569   // machine layer because projections is a charm++ module.
1570   if (CkpvAccess(traceOnPe) != 0) {
1571     void (*ptr)() = registerMachineUserEvents();
1572     if (ptr != NULL) {
1573       ptr();
1574     }
1575   }
1576 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1577 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1578   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1579 #if CMK_HAS_COUNTER_PAPI
1580   // we start the counters here
1581   if (PAPI_start(papiEventSet) != PAPI_OK) {
1582     CmiAbort("PAPI failed to start designated counters!\n");
1583   }
1584 #endif
1585 }
1586
1587 void TraceProjections::endComputation(void)
1588 {
1589 #if CMK_HAS_COUNTER_PAPI
1590   // we stop the counters here. A silent failure is alright since we
1591   // are already at the end of the program.
1592   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1593     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1594   }
1595   // NOTE: We should not do a complete close of PAPI until after the
1596   // sts writer is done.
1597 #endif
1598   endTime = TraceTimer();
1599   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1600   /*
1601   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1602            endTime*1e06);
1603   */
1604 }
1605
1606 int TraceProjections::idxRegistered(int idx)
1607 {
1608     int idxVecLen = idxVec.size();
1609     for(int i=0; i<idxVecLen; i++)
1610     {
1611         if(idx == idxVec[i])
1612             return 1;
1613     }
1614     return 0;
1615 }
1616
1617 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1618     StrKey k((char*)name,strlen(name));
1619     int num = funcHashtable.get(k);
1620     
1621     if(num!=0) {
1622         return;
1623         //as for mpi programs, the same function may be registered for several times
1624         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1625     }
1626     
1627     int isIdxExisting=0;
1628     if(idxSpecifiedByUser)
1629         isIdxExisting=idxRegistered(idx);
1630     if(isIdxExisting){
1631         return;
1632         //same reason with num!=0
1633         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1634     }
1635
1636     if(idxSpecifiedByUser) {
1637         char *st = new char[strlen(name)+1];
1638         memcpy(st,name,strlen(name)+1);
1639         StrKey *newKey = new StrKey(st,strlen(st));
1640         int &ref = funcHashtable.put(*newKey);
1641         ref=idx;
1642         funcCount++;
1643         idxVec.push_back(idx);  
1644     } else {
1645         char *st = new char[strlen(name)+1];
1646         memcpy(st,name,strlen(name)+1);
1647         StrKey *newKey = new StrKey(st,strlen(st));
1648         int &ref = funcHashtable.put(*newKey);
1649         ref=funcCount;
1650         num = funcCount;
1651         funcCount++;
1652         idx = num;
1653         idxVec.push_back(idx);
1654     }
1655 }
1656
1657 void TraceProjections::beginFunc(char *name,char *file,int line){
1658         StrKey k(name,strlen(name));    
1659         unsigned short  num = (unsigned short)funcHashtable.get(k);
1660         beginFunc(num,file,line);
1661 }
1662
1663 void TraceProjections::beginFunc(int idx,char *file,int line){
1664         if(idx <= 0){
1665                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1666         }       
1667         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1668 }
1669
1670 void TraceProjections::endFunc(char *name){
1671         StrKey k(name,strlen(name));    
1672         int num = funcHashtable.get(k);
1673         endFunc(num);   
1674 }
1675
1676 void TraceProjections::endFunc(int num){
1677         if(num <= 0){
1678                 printf("endFunc without start :O\n");
1679         }
1680         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1681 }
1682
1683 // specialized PUP:ers for handling trace projections logs
1684 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1685 {
1686   for (int i=0;i<n;i++) 
1687     switch(t) {
1688     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1689     case Tuchar:
1690     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1691     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1692     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1693     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1694     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1695     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1696     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1697     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1698     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1699 #ifdef CMK_PUP_LONG_LONG
1700     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1701     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1702 #endif
1703     default: CmiAbort("Unrecognized pup type code!");
1704     };
1705 }
1706
1707 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1708 {
1709   for (int i=0;i<n;i++) 
1710     switch(t) {
1711     case Tchar: { 
1712       char c = fgetc(f);
1713       if (c==EOF)
1714         parseError("Could not match character");
1715       else
1716         ((char *)p)[i] = c;
1717       break;
1718     }
1719     case Tuchar:
1720     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1721     case Tshort:((short *)p)[i]=(short)readInt(); break;
1722     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1723     case Tint:  ((int *)p)[i]=readInt(); break;
1724     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1725     case Tlong: ((long *)p)[i]=readInt(); break;
1726     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1727     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1728     case Tdouble:((double *)p)[i]=readDouble(); break;
1729 #ifdef CMK_PUP_LONG_LONG
1730     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1731     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1732 #endif
1733     default: CmiAbort("Unrecognized pup type code!");
1734     };
1735 }
1736
1737 #if CMK_PROJECTIONS_USE_ZLIB
1738 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1739 {
1740   for (int i=0;i<n;i++) 
1741     switch(t) {
1742     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1743     case Tuchar:
1744     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1745     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1746     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1747     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1748     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1749     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1750     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1751     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1752     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1753 #ifdef CMK_PUP_LONG_LONG
1754     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1755     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1756 #endif
1757     default: CmiAbort("Unrecognized pup type code!");
1758     };
1759 }
1760 #endif
1761
1762 void TraceProjections::endPhase() {
1763   double currentPhaseTime = TraceTimer();
1764   if (lastPhaseEvent != NULL) {
1765   } else {
1766     if (_logPool->pool != NULL) {
1767       // assumed to be BEGIN_COMPUTATION
1768     } else {
1769       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1770       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1771     }
1772   }
1773
1774   /* Insert endPhase event here. */
1775   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1776   /*        We currently "borrow" from the standard add() method. */
1777   /*        It should really be its own add() method.             */
1778   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1779   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1780   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1781   currentPhaseID++;
1782 }
1783
1784 #ifdef PROJ_ANALYSIS
1785 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1786
1787
1788 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1789
1790 void registerOutlierReduction() {
1791   outlierReductionType =
1792     CkReduction::addReducer(outlierReduction);
1793   minMaxReductionType =
1794     CkReduction::addReducer(minMaxReduction);
1795 }
1796
1797 /**
1798  * **IMPT NOTES**:
1799  *
1800  * This is the C++ code that is registered to be activated at module
1801  * shutdown. This is called exactly once on processor 0. Module shutdown
1802  * is initiated as a result of a CkExit() call by the application code
1803  * 
1804  * The exit function must ultimately call CkExit() again to
1805  * so that other module exit functions may proceed after this module is
1806  * done.
1807  *
1808  */
1809 // FIXME: WHY extern "C"???
1810 extern "C" void TraceProjectionsExitHandler()
1811 {
1812 #if CMK_TRACE_ENABLED
1813   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1814   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1815   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1816 #else
1817   CkExit();
1818 #endif
1819 }
1820
1821 // This is called once on each processor but the idiom of use appears
1822 // to be to only have processor 0 register the function.
1823 //
1824 // See initnode in trace-projections.ci
1825 void initTraceProjectionsBOC()
1826 {
1827   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1828 #ifdef __BIGSIM__
1829   if (BgNodeRank() == 0) {
1830 #else
1831     if (CkMyRank() == 0) {
1832 #endif
1833       registerExitFn(TraceProjectionsExitHandler);
1834     }
1835 #if 0
1836   } // this is so indentation does not get messed up
1837 #endif
1838 }
1839
1840 // mainchare for trace-projections BOC-operations. 
1841 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1842 // rest of its life.
1843 //
1844 // Responsible for:
1845 //   1. Handling commandline arguments
1846 //   2. Creating any objects required for proper BOC operations.
1847 //
1848 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1849   /** Options for Outlier Analysis */
1850   // defaults. Things will change with support for interactive analysis.
1851   bool findOutliers = false;
1852   bool outlierAutomatic = true;
1853   int numKSeeds = 10; 
1854
1855   int peNumKeep = CkNumPes();  // used as a default
1856   double entryThreshold = 0.0;
1857   bool outlierUsePhases = false;
1858   if (outlierAutomatic) {
1859     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1860                      "Number of cluster seeds to apply at outlier analysis.");
1861     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1862                      &peNumKeep, "Number of Processors to retain data");
1863     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1864                         "Minimum significance of entry points to be considered for clustering (%).");
1865     findOutliers =
1866       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1867     outlierUsePhases = 
1868       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1869                         "Apply automatic outlier analysis to any available phases.");
1870     if (outlierUsePhases) {
1871       // if the user wants to use an outlier feature, it is assumed outlier
1872       //    analysis is desired.
1873       findOutliers = true;
1874     }
1875   }
1876   bool findStartTime = (CmiTimerAbsolute()==1);
1877   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1878   if (findOutliers) {
1879     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1880                                         numKSeeds,
1881                                         peNumKeep,
1882                                         entryThreshold,
1883                                         outlierUsePhases);
1884   }
1885 }
1886
1887 // Called on every processor.
1888 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1889   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1890   endPe = pe;                // the pe that starts CkExit()
1891   if (CkMyPe() == 0) {
1892     analysisStartTime = CmiWallTimer();
1893   }
1894   CkpvAccess(_trace)->endComputation();
1895   // no more tracing for projections on this processor after this point. 
1896   // Note that clear must be called after remove, or bad things will happen.
1897   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1898   CkpvAccess(_traces)->clearTrace();
1899
1900   // From this point, we start multiple chains of reductions and broadcasts to
1901   // perform final online analysis activities.
1902
1903   // Start all parallel operations at once. 
1904   //   These MUST NOT modify base performance data in LogPool. If they must,
1905   //   then the parallel operations must be phased (and this code to be
1906   //   restructured as necessary)
1907   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1908   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1909   if (findOutliers) {
1910     parModulesRemaining++;
1911     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1912   }
1913   parModulesRemaining++;
1914   if (findStartTime) 
1915   bocProxy[CkMyPe()].startTimeAnalysis();
1916   else
1917   bocProxy[CkMyPe()].startEndTimeAnalysis();
1918 }
1919
1920 // Called on each processor
1921 void KMeansBOC::startKMeansAnalysis() {
1922   // Initialize all necessary structures
1923   LogPool *pool = CkpvAccess(_trace)->_logPool;
1924
1925  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1926   int flushInt = 0;
1927   if (pool->hasFlushed) {
1928     flushInt = 1;
1929   }
1930   
1931   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1932                 0, thisProxy);
1933   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1934 }
1935
1936 // Called on processor 0
1937 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1938   int someFlush = *((int *)msg->getData());
1939
1940   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1941   
1942   if (someFlush == 0) {
1943     // Data intact proceed with KMeans analysis
1944     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1945     kMeansProxy.flushCheckDone();
1946   } else {
1947     // Some processor had flushed it data at some point, abandon KMeans
1948     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1949     // terminate KMeans
1950     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1951     bocProxy[0].kMeansDone();
1952   }
1953 }
1954
1955 // Called on each processor
1956 void KMeansBOC::flushCheckDone() {
1957   // **FIXME** - more flexible metric collection scheme may be necessary
1958   //   in the future for production use.
1959   LogPool *pool = CkpvAccess(_trace)->_logPool;
1960
1961   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1962
1963   numEntryMethods = _entryTable.size();
1964   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1965
1966   // maintained across phases
1967   markedBegin = false;
1968   markedIdle = false;
1969   beginBlockTime = 0.0;
1970   beginIdleBlockTime = 0.0;
1971   lastBeginEPIdx = -1; // none
1972
1973   lastPhaseIdx = 0;
1974   currentExecTimes = NULL;
1975   currentPhase = 0;
1976   selected = false;
1977
1978   pool->initializePhases();
1979
1980   // incoming K Seeds and the per-phase filter
1981   incKSeeds = new double[numK*numMetrics];
1982   keepMetric = new bool[numMetrics];
1983
1984   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1985   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1986   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1987   thisProxy[CkMyPe()].getNextPhaseMetrics();
1988 }
1989
1990 // Called on each processor.
1991 void KMeansBOC::getNextPhaseMetrics() {
1992   // Assumes the presence of the complete logs on this processor.
1993   // Assumes first event is always BEGIN_COMPUTATION
1994   // Assumes each processor sees the same number of phases.
1995   //
1996   // In this code, we collect performance data for this processor.
1997   // All times are in seconds.
1998
1999   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2000
2001   if (usePhases) {
2002     DEBUGF("[%d] Using Phases\n", CkMyPe());
2003   } else {
2004     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2005   }
2006   
2007   if (currentExecTimes != NULL) {
2008     delete [] currentExecTimes;
2009   }
2010   currentExecTimes = new double[numMetrics];
2011   for (int i=0; i<numMetrics; i++) {
2012     currentExecTimes[i] = 0.0;
2013   }
2014
2015   int numEventMethods = _entryTable.size();
2016   LogPool *pool = CkpvAccess(_trace)->_logPool;
2017   
2018   CkAssert(pool->numEntries > lastPhaseIdx);
2019   double totalPhaseTime = 0.0;
2020   double totalActiveTime = 0.0; // entry method + idle
2021
2022   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2023     if (pool->pool[i].type == BEGIN_PROCESSING) {
2024       // check pairing
2025       if (!markedBegin) {
2026         markedBegin = true;
2027       }
2028       beginBlockTime = pool->pool[i].time;
2029       lastBeginEPIdx = pool->pool[i].eIdx;
2030     } else if (pool->pool[i].type == END_PROCESSING) {
2031       // check pairing
2032       // if End without a begin, just ignore
2033       //   this event. If a phase-boundary is crossed, the Begin
2034       //   event would be maintained in beginBlockTime, so it is 
2035       //   not a problem.
2036       if (markedBegin) {
2037         markedBegin = false;
2038         if (pool->pool[i].event < 0)
2039         {
2040           // ignore dummy events. **FIXME** as they have no eIdx?
2041           continue;
2042         }
2043         currentExecTimes[pool->pool[i].eIdx] += 
2044           pool->pool[i].time - beginBlockTime;
2045         totalActiveTime += pool->pool[i].time - beginBlockTime;
2046         lastBeginEPIdx = -1;
2047       }
2048     } else if (pool->pool[i].type == BEGIN_IDLE) {
2049       // check pairing
2050       if (!markedIdle) {
2051         markedIdle = true;
2052       }
2053       beginIdleBlockTime = pool->pool[i].time;
2054     } else if (pool->pool[i].type == END_IDLE) {
2055       // check pairing
2056       if (markedIdle) {
2057         markedIdle = false;
2058         currentExecTimes[numEventMethods] += 
2059           pool->pool[i].time - beginIdleBlockTime;
2060         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2061       }
2062     } else if (pool->pool[i].type == END_PHASE) {
2063       // ignored when not using phases
2064       if (usePhases) {
2065         // when we've not visited this node before
2066         if (i != lastPhaseIdx) { 
2067           totalPhaseTime = 
2068             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2069           // it is important that proper accounting of time take place here.
2070           // Note that END_PHASE events inevitably occur in the context of
2071           //   some entry method by the way the tracing API is designed.
2072           if (markedBegin) {
2073             CkAssert(lastBeginEPIdx >= 0);
2074             currentExecTimes[lastBeginEPIdx] += 
2075               pool->pool[i].time - beginBlockTime;
2076             totalActiveTime += pool->pool[i].time - beginBlockTime;
2077             // this is so the remainder contributes to the next phase
2078             beginBlockTime = pool->pool[i].time;
2079           }
2080           // The following is unlikely, but stranger things have happened.
2081           if (markedIdle) {
2082             currentExecTimes[numEventMethods] +=
2083               pool->pool[i].time - beginIdleBlockTime;
2084             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2085             // this is so the remainder contributes to the next phase
2086             beginIdleBlockTime = pool->pool[i].time;
2087           }
2088           if (totalActiveTime <= totalPhaseTime) {
2089             currentExecTimes[numEventMethods+1] = 
2090               totalPhaseTime - totalActiveTime;
2091           } else {
2092             currentExecTimes[numEventMethods+1] = 0.0;
2093             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2094                      CkMyPe(), currentPhase);
2095           }
2096           collectKMeansData();
2097           // end the loop (and method) and defer the work till the next call
2098           lastPhaseIdx = i;
2099           break; 
2100         }
2101       }
2102     } else if (pool->pool[i].type == END_COMPUTATION) {
2103       if (markedBegin) {
2104         CkAssert(lastBeginEPIdx >= 0);
2105         currentExecTimes[lastBeginEPIdx] += 
2106           pool->pool[i].time - beginBlockTime;
2107         totalActiveTime += pool->pool[i].time - beginBlockTime;
2108       }
2109       if (markedIdle) {
2110         currentExecTimes[numEventMethods] +=
2111           pool->pool[i].time - beginIdleBlockTime;
2112         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2113       }
2114       totalPhaseTime = 
2115         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2116       if (totalActiveTime <= totalPhaseTime) {
2117         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2118       } else {
2119         currentExecTimes[numEventMethods+1] = 0.0;
2120         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2121                  CkMyPe());
2122       }
2123       collectKMeansData();
2124     }
2125   }
2126 }
2127
2128 /**
2129  *  Through a reduction, collectKMeansData aggregates each processors' data
2130  *  in order for global properties to be determined:
2131  *  
2132  *  1. min & max to determine normalization factors.
2133  *  2. sum to determine global EP averages for possible metric reduction
2134  *       through thresholding.
2135  *  3. sum of squares to compute stddev which may be useful in the future.
2136  *
2137  *  collectKMeansData will also keep the processor's data for the current
2138  *    phase so that it may be normalized and worked on subsequently.
2139  *
2140  **/
2141 void KMeansBOC::collectKMeansData() {
2142   int minOffset = numMetrics;
2143   int maxOffset = 2*numMetrics;
2144   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2145
2146   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2147
2148   double *reductionMsg = new double[numMetrics*4];
2149
2150   for (int i=0; i<numMetrics; i++) {
2151     reductionMsg[i] = currentExecTimes[i];
2152     // duplicate the event times for max and min sections of the reduction
2153     reductionMsg[minOffset + i] = currentExecTimes[i];
2154     reductionMsg[maxOffset + i] = currentExecTimes[i];
2155     // compute squares
2156     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2157   }
2158
2159   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2160                 0, thisProxy);
2161   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2162              outlierReductionType, cb);  
2163 }
2164
2165 // The purpose is mainly to initialize the k seeds and generate
2166 //   normalization parameters for each of the metrics. The k seeds
2167 //   and normalization parameters are broadcast to all processors.
2168 //
2169 // Called on processor 0
2170 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2171   CkAssert(CkMyPe() == 0);
2172   
2173   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2174
2175   int sumOffset = 0;
2176   int minOffset = numMetrics;
2177   int maxOffset = 2*numMetrics;
2178   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2179
2180   // calculate statistics & boundaries for the k seeds for clustering
2181   KMeansStatsMessage *outmsg = 
2182     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2183   outmsg->numMetrics = numMetrics;
2184   outmsg->numKPos = numK*numMetrics;
2185   outmsg->numStats = numMetrics*4;
2186
2187   // Sum | Min | Max | Sum of Squares
2188   double *totalExecTimes = (double *)msg->getData();
2189   double totalTime = 0.0;
2190
2191   for (int i=0; i<numMetrics; i++) {
2192     DEBUGN("%lf\n", totalExecTimes[i]);
2193     totalTime += totalExecTimes[i];
2194
2195     // calculate event mean over all processors
2196     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2197
2198     // get the ranges and offsets of each metric. With this, we get
2199     //   normalization factors that can be sent back to each processor to
2200     //   be used as necessary. We reuse max for range. Min remains the offset.
2201     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2202     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2203       totalExecTimes[minOffset + i];
2204     
2205     // calculate stddev (using biased variance)
2206     outmsg->stats[sosOffset + i] = 
2207       sqrt((totalExecTimes[sosOffset + i] - 
2208             2*(outmsg->stats[i])*totalExecTimes[i] +
2209             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2210            CkNumPes());
2211   }
2212
2213   for (int i=0; i<numMetrics; i++) {
2214     // 1) if the proportion of the max value of the entry method relative to
2215     //   the average time taken over all entry methods across all processors
2216     //   is greater than the stipulated percentage threshold ...; AND
2217     // 2) if the range of values are non-zero.
2218     //
2219     // The current assumption is totalTime > 0 (what program has zero total
2220     //   time from all work?)
2221     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2222                      entryThreshold) &&
2223       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2224     if (keepMetric[i]) {
2225       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2226              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2227     } else {
2228       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2229     }
2230     outmsg->filter[i] = keepMetric[i];
2231   }
2232
2233   delete msg;
2234   
2235   // initialize k seeds for this phase
2236   kSeeds = new double[numK*numMetrics];
2237
2238   numKReported = 0;
2239   kNumMembers = new int[numK];
2240
2241   // Randomly select k processors' metric vectors for the k seeds
2242   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2243   srand(11337); // for debugging purposes
2244   for (int k=0; k<numK; k++) {
2245     DEBUGF("Seed %d | ", k);
2246     for (int m=0; m<numMetrics; m++) {
2247       double factor = totalExecTimes[maxOffset + m] - 
2248         totalExecTimes[minOffset + m];
2249       // "uniform" distribution, scaled according to the normalization
2250       //   factors
2251       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2252       // Random distribution.
2253       kSeeds[numMetrics*k + m] =
2254         ((rand()*1.0)/RAND_MAX)*factor;
2255       if (keepMetric[m]) {
2256         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2257       }
2258       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2259     }
2260     DEBUGF("\n");
2261     kNumMembers[k] = 0;
2262   }
2263
2264   // broadcast statistical values to all processors for cluster discovery
2265   thisProxy.findInitialClusters(outmsg);
2266 }
2267
2268
2269
2270 // Called on each processor.
2271 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2272
2273  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2274
2275   phaseIter = 0;
2276
2277   // Get info from stats message
2278   CkAssert(numMetrics == msg->numMetrics);
2279   for (int i=0; i<numMetrics; i++) {
2280     keepMetric[i] = msg->filter[i];
2281   }
2282
2283   // Normalize data on local processor.
2284   // **CWL** See my thesis for detailed discussion of normalization of
2285   //    performance data.
2286   // **NOTE** This might change if we want to send data based on the filter
2287   //   instead of all the data.
2288   CkAssert(numMetrics*4 == msg->numStats);
2289   for (int i=0; i<numMetrics; i++) {
2290     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2291     // **CWL** We do not normalize the range. Entry methods that exhibit
2292     //   large absolute timing variations should be allowed to contribute
2293     //   more to the Euclidean distance measure!
2294     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2295   }
2296
2297   // **NOTE** This might change if we want to send data based on the filter
2298   //   instead of all the data.
2299   CkAssert(numK*numMetrics == msg->numKPos);
2300   for (int i=0; i<msg->numKPos; i++) {
2301     incKSeeds[i] = msg->kSeedsPos[i];
2302   }
2303
2304   // Decide which KSeed this processor belongs to.
2305   minDistance = calculateDistance(0);
2306   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2307            currentPhase, phaseIter, minDistance);
2308   minK = 0;
2309   for (int i=1; i<numK; i++) {
2310     double distance = calculateDistance(i);
2311     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2312              currentPhase, phaseIter, i, distance);
2313     if (distance < minDistance) {
2314       minDistance = distance;
2315       minK = i;
2316     }
2317   }
2318
2319   // Set up a reduction with the modification vector to the root (0).
2320   //
2321   // The modification vector sends a negative value for each metric
2322   //   for the K this processor no longer belongs to and a positive
2323   //   value to the K the processor now belongs. In addition, a -1.0
2324   //   is sent to the K it is leaving and a +1.0 to the K it is 
2325   //   joining.
2326   //
2327   // The processor must still contribute a "zero returns" even if
2328   //   nothing changes. This will be the basis for determine
2329   //   convergence at the root.
2330   //
2331   // The addtional +1 is meant for the count-change that must be
2332   //   maintained for the special cases at the root when some K
2333   //   may be deprived of all processor points or go from 0 to a
2334   //   positive number of processors (see later comments).
2335   double *modVector = new double[numK*(numMetrics+1)];
2336   for (int i=0; i<numK; i++) {
2337     for (int j=0; j<numMetrics+1; j++) {
2338       modVector[i*(numMetrics+1) + j] = 0.0;
2339     }
2340   }
2341   for (int i=0; i<numMetrics; i++) {
2342     // for this initialization, only positive values need be sent.
2343     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2344   }
2345   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2346
2347   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2348                 0, thisProxy);
2349   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2350              CkReduction::sum_double, cb);  
2351 }
2352
2353 double KMeansBOC::calculateDistance(int k) {
2354   double ret = 0.0;
2355   for (int i=0; i<numMetrics; i++) {
2356     if (keepMetric[i]) {
2357       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2358              CkMyPe(), currentPhase, phaseIter, i,
2359                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2360       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2361     }
2362   }
2363   return sqrt(ret);
2364 }
2365
2366 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2367   CkAssert(CkMyPe() == 0);
2368
2369   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2370
2371   double *modVector = (double *)msg->getData();
2372   // sanity check
2373   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2374
2375   // A quick convergence test.
2376   bool hasChanges = false;
2377   for (int i=0; i<numK; i++) {
2378     hasChanges = hasChanges || 
2379       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2380   }
2381   if (!hasChanges) {
2382     delete msg;
2383     findRepresentatives();
2384   } else {
2385     int overallChange = 0;
2386     for (int i=0; i<numK; i++) {
2387       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2388       if (change != 0) {
2389         overallChange += change;
2390         // modify the k seeds based on the modification vectors coming in
2391         //
2392         // If a seed initially has no members, its contents do not matter and
2393         //   is simply set to the average of the incoming vector.
2394         // If the change causes a seed to lose all its members, do nothing.
2395         //   Its last-known location is kept to allow it to re-capture
2396         //   membership at the next iteration rather than apply the last
2397         //   changes (which snaps the point unnaturally to 0,0).
2398         // Otherwise, apply the appropriate vector changes.
2399         CkAssert((kNumMembers[i] + change >= 0) &&
2400                  (kNumMembers[i] + change <= CkNumPes()));
2401         if (kNumMembers[i] == 0) {
2402           CkAssert(change > 0);
2403           for (int j=0; j<numMetrics; j++) {
2404             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2405           }
2406         } else if (kNumMembers[i] + change == 0) {
2407           // do nothing.
2408         } else {
2409           for (int j=0; j<numMetrics; j++) {
2410             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2411             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2412             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2413           }
2414         }
2415         kNumMembers[i] += change;
2416       }
2417       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2418              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2419     }
2420     delete msg;
2421
2422     // broadcast the new seed locations.
2423     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2424     outmsg->numKPos = numK*numMetrics;
2425     for (int i=0; i<numK*numMetrics; i++) {
2426       outmsg->kSeedsPos[i] = kSeeds[i];
2427     }
2428
2429     thisProxy.updateSeedMembership(outmsg);
2430   }
2431 }
2432
2433 // Called on all processors
2434 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2435
2436   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2437
2438   phaseIter++;
2439
2440   // **NOTE** This might change if we want to send data based on the filter
2441   //   instead of all the data.
2442   CkAssert(numK*numMetrics == msg->numKPos);
2443   for (int i=0; i<msg->numKPos; i++) {
2444     incKSeeds[i] = msg->kSeedsPos[i];
2445   }
2446
2447   // Decide which KSeed this processor belongs to.
2448   lastMinK = minK;
2449   minDistance = calculateDistance(0);
2450   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2451          currentPhase, phaseIter, minDistance);
2452
2453   minK = 0;
2454   for (int i=1; i<numK; i++) {
2455     double distance = calculateDistance(i);
2456     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2457            currentPhase, phaseIter, i, distance);
2458     if (distance < minDistance) {
2459       minDistance = distance;
2460       minK = i;
2461     }
2462   }
2463
2464   double *modVector = new double[numK*(numMetrics+1)];
2465   for (int i=0; i<numK; i++) {
2466     for (int j=0; j<numMetrics+1; j++) {
2467       modVector[i*(numMetrics+1) + j] = 0.0;
2468     }
2469   }
2470
2471   if (minK != lastMinK) {
2472     for (int i=0; i<numMetrics; i++) {
2473       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2474       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2475     }
2476     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2477     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2478   }
2479
2480   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2481                 0, thisProxy);
2482   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2483              CkReduction::sum_double, cb);  
2484 }
2485
2486 void KMeansBOC::findRepresentatives() {
2487
2488   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2489
2490   int numNonEmptyClusters = 0;
2491   for (int i=0; i<numK; i++) {
2492     if (kNumMembers[i] > 0) {
2493       numNonEmptyClusters++;
2494     }
2495   }
2496
2497   int numRepresentatives = peNumKeep;
2498   // **FIXME**
2499   // This is fairly arbitrary. Next time, choose the centers of the top
2500   //   largest clusters.
2501   if (numRepresentatives < numNonEmptyClusters) {
2502     numRepresentatives = numNonEmptyClusters;
2503   }
2504
2505   int slotsRemaining = numRepresentatives;
2506
2507   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2508          numNonEmptyClusters);
2509
2510   // determine how many exemplars to select per cluster. Currently
2511   //   hardcoded to 1. Future challenge is to decide on other numbers
2512   //   or proportionality.
2513   //
2514   int exemplarsPerCluster = 1;
2515   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2516
2517   int numCandidateOutliers = CkNumPes() - 
2518     exemplarsPerCluster*numNonEmptyClusters;
2519
2520   double *remainders = new double[numK];
2521   int *assigned = new int[numK];
2522   exemplarChoicesLeft = new int[numK];
2523   outlierChoicesLeft = new int[numK];
2524
2525   for (int i=0; i<numK; i++) {
2526     assigned[i] = 0;
2527     remainders[i] = 
2528       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2529       slotsRemaining / numCandidateOutliers;
2530     if (remainders[i] >= 0.0) {
2531       assigned[i] = (int)floor(remainders[i]);
2532       remainders[i] -= assigned[i];
2533     } else {
2534       remainders[i] = 0.0;
2535     }
2536   }
2537
2538   for (int i=0; i<numK; i++) {
2539     slotsRemaining -= assigned[i];
2540   }
2541   CkAssert(slotsRemaining >= 0);
2542
2543   // find clusters to assign the loose slots to, in order of
2544   // remainder proportion
2545   while (slotsRemaining > 0) {
2546     double max = 0.0;
2547     int winner = 0;
2548     for (int i=0; i<numK; i++) {
2549       if (remainders[i] > max) {
2550         max = remainders[i];
2551         winner = i;
2552       }
2553     }
2554     assigned[winner]++;
2555     remainders[winner] = 0.0;
2556     slotsRemaining--;
2557   }
2558
2559   // set up how many reduction cycles of min/max we need to conduct to
2560   // select the representatives.
2561   numSelectionIter = exemplarsPerCluster;
2562   for (int i=0; i<numK; i++) {
2563     if (assigned[i] > numSelectionIter) {
2564       numSelectionIter = assigned[i];
2565     }
2566   }
2567   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2568
2569   for (int i=0; i<numK; i++) {
2570     if (kNumMembers[i] > 0) {
2571       exemplarChoicesLeft[i] = exemplarsPerCluster;
2572       outlierChoicesLeft[i] = assigned[i];
2573     } else {
2574       exemplarChoicesLeft[i] = 0;
2575       outlierChoicesLeft[i] = 0;
2576     }
2577     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2578            outlierChoicesLeft[i]);
2579   }
2580
2581   delete [] assigned;
2582   delete [] remainders;
2583
2584   // send out first broadcast
2585   KSelectionMessage *outmsg = NULL;
2586   if (numSelectionIter > 0) {
2587     outmsg = new (numK, numK, numK) KSelectionMessage;
2588     outmsg->numKMinIDs = numK;
2589     outmsg->numKMaxIDs = numK;
2590     for (int i=0; i<numK; i++) {
2591       outmsg->minIDs[i] = -1;
2592       outmsg->maxIDs[i] = -1;
2593     }
2594     thisProxy.collectDistances(outmsg);
2595   } else {
2596     CkPrintf("Warning: No selection iteration from the start!\n");
2597     // invoke phase completion on all processors
2598     thisProxy.phaseDone();
2599   }
2600 }
2601
2602 /*
2603  *  lastMin = array of minimum champions of the last tournament
2604  *  lastMax = array of maximum champions of the last tournament
2605  *  lastMaxVal = array of last encountered maximum values, allows previous
2606  *                 minimum winners to eliminate themselves from the next
2607  *                 minimum race.
2608  *
2609  *  Called on all processors.
2610  */
2611 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2612
2613   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2614
2615   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2616          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2617   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2618       (CkMyPe() == msg->maxIDs[lastMinK])) {
2619     CkAssert(!selected);
2620     selected = true;
2621   }
2622
2623   // build outgoing reduction structure
2624   //   format = minVal | ID | maxVal | ID
2625   double *minMaxAndIDs = NULL;
2626
2627   minMaxAndIDs = new double[numK*4];
2628   // initialize to the appropriate out-of-band values (for error checks)
2629   for (int i=0; i<numK; i++) {
2630     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2631     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2632     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2633     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2634   }
2635   // If I have not won before, I put myself back into the competition
2636   if (!selected) {
2637     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2638     minMaxAndIDs[lastMinK*4] = minDistance;
2639     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2640     minMaxAndIDs[lastMinK*4+2] = minDistance;
2641     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2642   }
2643   delete msg;
2644
2645   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2646                 0, thisProxy);
2647   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2648              minMaxReductionType, cb);  
2649 }
2650
2651 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2652   // incoming format:
2653   //   minVal | minID | maxVal | maxID
2654
2655   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2656
2657   if (numSelectionIter > 0) {
2658     double *incInfo = (double *)msg->getData();
2659     
2660     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2661     outmsg->numKMinIDs = numK;
2662     outmsg->numKMaxIDs = numK;
2663     
2664     for (int i=0; i<numK; i++) {
2665       DEBUGF("%d | %lf %d %lf %d \n", i, 
2666              incInfo[i*4], (int)incInfo[i*4+1], 
2667              incInfo[i*4+2], (int)incInfo[i*4+3]);
2668     }
2669
2670     for (int i=0; i<numK; i++) {
2671       if (exemplarChoicesLeft[i] > 0) {
2672         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2673         exemplarChoicesLeft[i]--;
2674       } else {
2675         outmsg->minIDs[i] = -1;
2676       }
2677       if (outlierChoicesLeft[i] > 0) {
2678         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2679         outlierChoicesLeft[i]--;
2680       } else {
2681         outmsg->maxIDs[i] = -1;
2682       }
2683     }
2684     thisProxy.collectDistances(outmsg);
2685     numSelectionIter--;
2686   } else {
2687     // invoke phase completion on all processors
2688     thisProxy.phaseDone();
2689   }
2690 }
2691
2692 /**
2693  *  Completion of the K-Means clustering and data selection of one phase
2694  *    of the computation.
2695  *
2696  *  Called on every processor.
2697  */
2698 void KMeansBOC::phaseDone() {
2699
2700   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2701
2702   LogPool *pool = CkpvAccess(_trace)->_logPool;
2703   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2704
2705   // now decide on what to do with the decision.
2706   if (!selected) {
2707     if (usePhases) {
2708       pool->keepPhase[currentPhase] = false;
2709     } else {
2710       // if not using phases, we're working on the whole log
2711       pool->setAllPhases(false);
2712     }
2713   }
2714
2715   // **FIXME** (?) - All processors have to agree on this, or the reduction
2716   //   will not be correct! The question is "is this enforcible?"
2717   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2718     // We're done
2719     int dummy = 0;
2720     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2721                   0, bocProxy);
2722     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2723   } else {
2724     // reset all phase-based k-means data and decisions
2725
2726     // **FIXME**!!!!!    
2727     
2728     // invoke the next K-Means computation phase.
2729     currentPhase++;
2730     thisProxy[CkMyPe()].getNextPhaseMetrics();
2731   }
2732 }
2733
2734 void TraceProjectionsBOC::startTimeAnalysis()
2735 {
2736   double startTime = 0.0;
2737   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2738      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2739   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2740   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2741 }
2742
2743 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2744 {
2745   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2746
2747   if (CkpvAccess(_trace) != NULL) {
2748     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2749     CkpvAccess(_trace)->_logPool->setNewStartTime();
2750     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2751   }
2752   delete msg;
2753   thisProxy[CkMyPe()].startEndTimeAnalysis();
2754 }
2755
2756 void TraceProjectionsBOC::startEndTimeAnalysis()
2757 {
2758  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2759
2760   endTime = CkpvAccess(_trace)->endTime;
2761   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2762
2763   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2764                 0, thisProxy);
2765   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2766 }
2767
2768 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2769 {
2770  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2771
2772   CkAssert(CkMyPe() == 0);
2773   parModulesRemaining--;
2774   if (CkpvAccess(_trace) != NULL) {
2775     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2776     // CkPrintf("End time determined to be %lf us\n",
2777     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2778   }
2779   delete msg;
2780   if (parModulesRemaining == 0) {
2781     thisProxy[CkMyPe()].finalize();
2782   }
2783 }
2784
2785 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2786
2787  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2788
2789   CkAssert(CkMyPe() == 0);
2790   parModulesRemaining--;
2791   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2792            CmiWallTimer()-analysisStartTime);
2793   delete msg;
2794   if (parModulesRemaining == 0) {
2795     thisProxy[CkMyPe()].finalize();
2796   }
2797 }
2798
2799 /**
2800  *
2801  *  This version is called (on processor 0) only if flushCheck fails.
2802  *
2803  */
2804 void TraceProjectionsBOC::kMeansDone() {
2805   CkAssert(CkMyPe() == 0);
2806   parModulesRemaining--;
2807   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2808            CmiWallTimer()-analysisStartTime);
2809   if (parModulesRemaining == 0) {
2810     thisProxy[CkMyPe()].finalize();
2811   }
2812 }
2813
2814 void TraceProjectionsBOC::finalize()
2815 {
2816   CkAssert(CkMyPe() == 0);
2817   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2818   //       CmiWallTimer()-analysisStartTime);
2819   thisProxy.closingTraces();
2820 }
2821
2822 // Called on every processor
2823 void TraceProjectionsBOC::closingTraces() {
2824   CkpvAccess(_trace)->closeTrace();
2825
2826     // subtle:  reduction needs to go to the PE which started CkExit()
2827   int pe = 0;
2828   if (endPe != -1) pe = endPe;
2829   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2830                 pe, thisProxy); 
2831   contribute(0, NULL, CkReduction::sum_int, cb);  
2832 }
2833
2834 // The sole purpose of this reduction is to decide whether or not
2835 //   Projections as a module needs to call CkExit() to get other
2836 //   modules to shutdown.
2837 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2838   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2839   delete msg;
2840   // decide if CkExit() needs to be called
2841   if (!CkpvAccess(_trace)->converseExit) {
2842     CkExit();
2843   }
2844 }
2845 /*
2846  *  Registration and definition of the Outlier Reduction callback.
2847  *  Format: Sum | Min | Max | Sum of Squares
2848  */
2849 CkReductionMsg *outlierReduction(int nMsgs,
2850                                  CkReductionMsg **msgs) {
2851   int numBytes = 0;
2852   int numMetrics = 0;
2853   double *ret = NULL;
2854
2855   if (nMsgs == 1) {
2856     // nothing to do, just pass it on
2857     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2858   }
2859
2860   if (nMsgs > 1) {
2861     numBytes = msgs[0]->getSize();
2862     // sanity checks
2863     if (numBytes%sizeof(double) != 0) {
2864       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2865     }
2866     if ((numBytes/sizeof(double))%4 != 0) {
2867       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2868     }
2869     numMetrics = (numBytes/sizeof(double))/4;
2870     ret = new double[numMetrics*4];
2871
2872     // copy the first message data into the return structure first
2873     for (int i=0; i<numMetrics*4; i++) {
2874       ret[i] = ((double *)msgs[0]->getData())[i];
2875     }
2876
2877     // Sum | Min | Max | Sum of Squares
2878     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2879       for (int i=0; i<numMetrics; i++) {
2880         // Sum
2881         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2882         // Min
2883         ret[numMetrics + i] =
2884           (ret[numMetrics + i] < 
2885            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2886           ? ret[numMetrics + i] : 
2887           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2888         // Max
2889         ret[2*numMetrics + i] = 
2890           (ret[2*numMetrics + i] >
2891            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2892           ? ret[2*numMetrics + i] :
2893           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2894         // Sum of Squares (squaring already done at leaf)
2895         ret[3*numMetrics + i] +=
2896           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2897       }
2898     }
2899   }
2900   
2901   /* apparently, we do not delete the incoming messages */
2902   return CkReductionMsg::buildNew(numBytes,ret);
2903 }
2904
2905 /*
2906  * The only reason we have a user-defined reduction is to support
2907  *   identification of the "winning" processors as well as to handle
2908  *   both the min and the max of each "tournament". A simple min/max
2909  *   discovery cannot handle ties.
2910  */
2911 CkReductionMsg *minMaxReduction(int nMsgs,
2912                                 CkReductionMsg **msgs) {
2913   CkAssert(nMsgs > 0);
2914
2915   int numBytes = msgs[0]->getSize();
2916   CkAssert(numBytes%sizeof(double) == 0);
2917   int numK = (numBytes/sizeof(double))/4;
2918
2919   double *ret = new double[numK*4];
2920   // fill with out-of-band values
2921   for (int i=0; i<numK; i++) {
2922     ret[i*4] = -1.0;
2923     ret[i*4+1] = -1.0;
2924     ret[i*4+2] = -1.0;
2925     ret[i*4+3] = -1.0;
2926   }
2927
2928   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2929   for (int i=0; i<nMsgs; i++) {
2930     double *temp = (double *)msgs[i]->getData();
2931     for (int j=0; j<numK; j++) {
2932       // no previous valid min
2933       if (ret[j*4+1] < 0) {
2934         // fill it in only if the incoming min is valid
2935         if (temp[j*4+1] >= 0) {
2936           ret[j*4] = temp[j*4];      // fill min value
2937           ret[j*4+1] = temp[j*4+1];  // fill ID
2938         }
2939       } else {
2940         // find Min, only if incoming min is valid
2941         if (temp[j*4+1] >= 0) {
2942           if (temp[j*4] < ret[j*4]) {
2943             ret[j*4] = temp[j*4];      // replace min value
2944             ret[j*4+1] = temp[j*4+1];  // replace ID
2945           }
2946         }
2947       }
2948       // no previous valid max
2949       if (ret[j*4+3] < 0) {
2950         // fill only if the incoming max is valid
2951         if (temp[j*4+3] >= 0) {
2952           ret[j*4+2] = temp[j*4+2];  // fill max value
2953           ret[j*4+3] = temp[j*4+3];  // fill ID
2954         }
2955       } else {
2956         // find Max, only if incoming max is valid
2957         if (temp[j*4+3] >= 0) {
2958           if (temp[j*4+2] > ret[j*4+2]) {
2959             ret[j*4+2] = temp[j*4+2];  // replace max value
2960             ret[j*4+3] = temp[j*4+3];  // replace ID
2961           }
2962         }
2963       }
2964     }
2965   }
2966   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2967   delete [] ret;
2968   return redmsg;
2969 }
2970
2971 #include "TraceProjections.def.h"
2972 #endif //PROJ_ANALYSIS
2973
2974 /*@}*/