5d9681666bd4525602897d79436f53523572b057
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 #if CMK_HAS_COUNTER_PAPI
103 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
104 #endif // CMK_HAS_COUNTER_PAPI
105
106 /**
107   For each TraceFoo module, _createTraceFoo() must be defined.
108   This function is called in _createTraces() generated in moduleInit.C
109 */
110 void _createTraceprojections(char **argv)
111 {
112   DEBUGF("%d createTraceProjections\n", CkMyPe());
113   CkpvInitialize(CkVec<char *>, usrEventlist);
114   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
115   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
116 #if CMK_BIGSIM_CHARM
117   // CthRegister does not call the constructor
118 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
119 #endif //CMK_BIGSIM_CHARM
120   CkpvInitialize(TraceProjections*, _trace);
121   CkpvAccess(_trace) = new  TraceProjections(argv);
122   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
123   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
124 }
125  
126 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
127
128 struct TraceThreadListener {
129   struct CthThreadListener base;
130   int event;
131   int msgType;
132   int ep;
133   int srcPe;
134   int ml;
135   CmiObjId idx;
136 };
137
138 extern "C"
139 void traceThreadListener_suspend(struct CthThreadListener *l)
140 {
141   TraceThreadListener *a=(TraceThreadListener *)l;
142   /* here, we activate the appropriate trace codes for the appropriate
143      registered modules */
144   traceSuspend();
145 }
146
147 extern "C"
148 void traceThreadListener_resume(struct CthThreadListener *l) 
149 {
150   TraceThreadListener *a=(TraceThreadListener *)l;
151   /* here, we activate the appropriate trace codes for the appropriate
152      registered modules */
153   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
154                                 CthGetThreadID(a->base.thread));
155   a->event=-1;
156   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
157   a->ml=0;
158 }
159
160 extern "C"
161 void traceThreadListener_free(struct CthThreadListener *l) 
162 {
163   TraceThreadListener *a=(TraceThreadListener *)l;
164   delete a;
165 }
166
167 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
168 {
169 #if CMK_TRACE_ENABLED
170   /* strip essential information from the envelope */
171   TraceThreadListener *a= new TraceThreadListener;
172   
173   a->base.suspend=traceThreadListener_suspend;
174   a->base.resume=traceThreadListener_resume;
175   a->base.free=traceThreadListener_free;
176   a->event=e->getEvent();
177   a->msgType=e->getMsgtype();
178   a->ep=e->getEpIdx();
179   a->srcPe=e->getSrcPe();
180   a->ml=e->getTotalsize();
181
182   CthAddListener(tid, (CthThreadListener *)a);
183 #endif
184 }
185
186 void LogPool::openLog(const char *mode)
187 {
188 #if CMK_PROJECTIONS_USE_ZLIB
189   if(compressed) {
190     if (nonDeltaLog) {
191       do {
192         zfp = gzopen(fname, mode);
193       } while (!zfp && (errno == EINTR || errno == EMFILE));
194       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
195     }
196     if (deltaLog) {
197       do {
198         deltazfp = gzopen(dfname, mode);
199       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
200       if (!deltazfp) 
201         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
202     }
203   } else {
204     if (nonDeltaLog) {
205       do {
206         fp = fopen(fname, mode);
207       } while (!fp && (errno == EINTR || errno == EMFILE));
208       if (!fp) {
209         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
210         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
211       }
212     }
213     if (deltaLog) {
214       do {
215         deltafp = fopen(dfname, mode);
216       } while (!deltafp && (errno == EINTR || errno == EMFILE));
217       if (!deltafp) 
218         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
219     }
220   }
221 #else
222   if (nonDeltaLog) {
223     do {
224       fp = fopen(fname, mode);
225     } while (!fp && (errno == EINTR || errno == EMFILE));
226     if (!fp) {
227       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
228       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
229     }
230   }
231   if (deltaLog) {
232     do {
233       deltafp = fopen(dfname, mode);
234     } while (!deltafp && (errno == EINTR || errno == EMFILE));
235     if(!deltafp) 
236       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
237   }
238 #endif
239 }
240
241 void LogPool::closeLog(void)
242 {
243 #if CMK_PROJECTIONS_USE_ZLIB
244   if(compressed) {
245     if (nonDeltaLog) gzclose(zfp);
246     if (deltaLog) gzclose(deltazfp);
247     return;
248   }
249 #endif
250   if (nonDeltaLog) { 
251 #if !defined(_WIN32) || defined(__CYGWIN__)
252     fsync(fileno(fp)); 
253 #endif
254     fclose(fp); 
255   }
256   if (deltaLog)  { 
257 #if !defined(_WIN32) || defined(__CYGWIN__)
258     fsync(fileno(deltafp)); 
259 #endif
260     fclose(deltafp);  
261   }
262 }
263
264 LogPool::LogPool(char *pgm) {
265   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
266   // defaults to writing data (no outlier changes)
267   writeData = true;
268   numEntries = 0;
269   // **CW** for simple delta encoding
270   prevTime = 0.0;
271   timeErr = 0.0;
272   globalStartTime = 0.0;
273   globalEndTime = 0.0;
274   headerWritten = 0;
275   numPhases = 0;
276   hasFlushed = false;
277
278   keepPhase = NULL;
279
280   fileCreated = false;
281   poolSize = CkpvAccess(CtrLogBufSize);
282   pgmname = new char[strlen(pgm)+1];
283   strcpy(pgmname, pgm);
284 }
285
286 void LogPool::createFile(const char *fix)
287 {
288   if (fileCreated) {
289     return;
290   }
291
292   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
293   char *pathPlusFilePrefix = new char[1024];
294
295   if(nSubdirs > 0){
296     int sd = CkMyPe() % nSubdirs;
297     char *subdir = new char[1024];
298     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
299     CmiMkdir(subdir);
300     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
301     delete[] subdir;
302   } else {
303     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
304   }
305
306   char pestr[10];
307   sprintf(pestr, "%d", CkMyPe());
308 #if CMK_PROJECTIONS_USE_ZLIB
309   int len;
310   if(compressed)
311     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
312   else
313     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
314 #else
315   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
316 #endif
317
318   if (nonDeltaLog) {
319     fname = new char[len];
320   }
321   if (deltaLog) {
322     dfname = new char[len];
323   }
324 #if CMK_PROJECTIONS_USE_ZLIB
325   if(compressed) {
326     if (deltaLog && nonDeltaLog) {
327       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
328       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
329     } else {
330       if (nonDeltaLog) {
331         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
332       } else {
333         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
334       }
335     }
336   } else {
337     if (deltaLog && nonDeltaLog) {
338       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
339       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
340     } else {
341       if (nonDeltaLog) {
342         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
343       } else {
344         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
345       }
346     }
347   }
348 #else
349   if (deltaLog && nonDeltaLog) {
350     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
351     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
352   } else {
353     if (nonDeltaLog) {
354       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
355     } else {
356       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
357     }
358   }
359 #endif
360   fileCreated = true;
361   delete[] pathPlusFilePrefix;
362   openLog("w+");
363   CLOSE_LOG 
364 }
365
366 void LogPool::createSts(const char *fix)
367 {
368   CkAssert(CkMyPe() == 0);
369   // create the sts file
370   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
371   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
372   do
373     {
374       stsfp = fopen(fname, "w");
375     } while (!stsfp && (errno == EINTR || errno == EMFILE));
376   if(stsfp==0){
377     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
378     CmiAbort("Error!!\n");
379   }
380   delete[] fname;
381 }  
382
383 void LogPool::createTopo(const char *fix)
384 {
385   CkAssert(CkMyPe() == 0);
386   // create the topo file
387   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
388   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
389   do
390     {
391       topofp = fopen(fname, "w");
392     } while (!stsfp && (errno == EINTR || errno == EMFILE));
393   if(stsfp==0){
394     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
395     CmiAbort("Error!!\n");
396   }
397   delete[] fname;
398 }  
399
400 void LogPool::createRC()
401 {
402   // create the projections rc file.
403   fname = 
404     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
405   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
406   do {
407     rcfp = fopen(fname, "w");
408   } while (!rcfp && (errno == EINTR || errno == EMFILE));
409   if (rcfp==0) {
410     CmiAbort("Cannot open projections configuration file for writing.\n");
411   }
412   delete[] fname;
413 }
414
415 LogPool::~LogPool() 
416 {
417   if (writeData) {
418     writeLog();
419 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
420     closeLog();
421 #endif
422   }
423
424 #if CMK_BIGSIM_CHARM
425   extern int correctTimeLog;
426   if (correctTimeLog) {
427     createFile("-bg");
428     if (CkMyPe() == 0) {
429       createSts("-bg");
430     }
431     writeHeader();
432     if (CkMyPe() == 0) writeSts(NULL);
433     postProcessLog();
434   }
435 #endif
436
437   delete[] pool;
438   delete [] fname;
439 }
440
441 void LogPool::writeHeader()
442 {
443   if (headerWritten) return;
444   headerWritten = 1;
445   if(!binary) {
446 #if CMK_PROJECTIONS_USE_ZLIB
447     if(compressed) {
448       if (nonDeltaLog) {
449         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
450       }
451       if (deltaLog) {
452         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
453       }
454     } 
455     else /* else clause is below... */
456 #endif
457     /*... may hang over from else above */ {
458       if (nonDeltaLog) {
459         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
460       }
461       if (deltaLog) {
462         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
463       }
464     }
465   }
466   else { // binary
467       if (nonDeltaLog) {
468         fwrite(&numEntries,sizeof(numEntries),1,fp);
469       }
470       if (deltaLog) {
471         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
472       }
473   }
474 }
475
476 void LogPool::writeLog(void)
477 {
478   createFile();
479   OPEN_LOG
480   writeHeader();
481   if (nonDeltaLog) write(0);
482   if (deltaLog) write(1);
483   CLOSE_LOG
484 }
485
486 void LogPool::write(int writedelta) 
487 {
488   // **CW** Simple delta encoding implementation
489   // prevTime has to be maintained as an object variable because
490   // LogPool::write may be called several times depending on the
491   // +logsize value.
492   PUP::er *p = NULL;
493   if (binary) {
494     p = new PUP::toDisk(writedelta?deltafp:fp);
495   }
496 #if CMK_PROJECTIONS_USE_ZLIB
497   else if (compressed) {
498     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
499   }
500 #endif
501   else {
502     p = new toProjectionsFile(writedelta?deltafp:fp);
503   }
504   CmiAssert(p);
505   int curPhase = 0;
506   // **FIXME** - Should probably consider a more sophisticated bounds-based
507   //   approach for selective writing instead of making multiple if-checks
508   //   for every single event.
509   for(UInt i=0; i<numEntries; i++) {
510     if (!writedelta) {
511       if (keepPhase == NULL) {
512         // default case, when no phase selection is required.
513         pool[i].pup(*p);
514       } else {
515         // **FIXME** Might be a good idea to create a "filler" event block for
516         //   all the events taken out by phase filtering.
517         if (pool[i].type == END_PHASE) {
518           // always write phase markers
519           pool[i].pup(*p);
520           curPhase++;
521         } else if (pool[i].type == BEGIN_COMPUTATION ||
522                    pool[i].type == END_COMPUTATION) {
523           // always write BEGIN and END COMPUTATION markers
524           pool[i].pup(*p);
525         } else if (keepPhase[curPhase]) {
526           pool[i].pup(*p);
527         }
528       }
529     }
530     else {      // delta
531       // **FIXME** Implement phase-selective writing for delta logs
532       //   eventually
533       double time = pool[i].time;
534       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
535       {
536         double timeDiff = (time-prevTime)*1.0e6;
537         UInt intTimeDiff = (UInt)timeDiff;
538         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
539         if (timeErr > 1.0) {
540           timeErr -= 1.0;
541           intTimeDiff++;
542         }
543         pool[i].time = intTimeDiff/1.0e6;
544       }
545       pool[i].pup(*p);
546       pool[i].time = time;      // restore time value
547       prevTime = time;
548     }
549   }
550   delete p;
551   delete [] keepPhase;
552 }
553
554 void LogPool::writeSts(void)
555 {
556   // for whining compilers
557   int i;
558   // generate an automatic unique ID for each log
559   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
560   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
561   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
562 #if CMK_HAS_COUNTER_PAPI
563   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
564   // for now, use i, next time use papiEvents[i].
565   // **CW** papi event names is a hack.
566   char eventName[PAPI_MAX_STR_LEN];
567   for (i=0;i<NUMPAPIEVENTS;i++) {
568     PAPI_event_code_to_name(papiEvents[i], eventName);
569     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
570   }
571 #endif
572   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
573   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
574     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
575   }     
576 }
577
578 void LogPool::writeSts(TraceProjections *traceProj){
579   writeSts();
580   if (traceProj != NULL) {
581     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
582     funcIter->seekStart();
583     int numFuncs = traceProj->getFuncNumber();
584     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
585     while(funcIter->hasNext()) {
586       StrKey *key;
587       int *obj = (int *)funcIter->next((void **)&key);
588       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
589     }
590   }
591   fprintf(stsfp, "END\n");
592   fclose(stsfp);
593 }
594
595 void LogPool::writeRC(void)
596 {
597     //CkPrintf("write RC is being executed\n");
598 #ifdef PROJ_ANALYSIS  
599     CkAssert(CkMyPe() == 0);
600     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
601             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
602     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
603             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
604     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
605     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
606       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
607     } else {
608       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
609     }
610     */
611 #endif //PROJ_ANALYSIS
612   fclose(rcfp);
613 }
614
615 void LogPool::writeTopo(void)
616 {
617   TopoManager tmgr;
618   tmgr.printAllocation(topofp);
619   fclose(topofp);
620 }
621
622 #if CMK_BIGSIM_CHARM
623 static void updateProjLog(void *data, double t, double recvT, void *ptr)
624 {
625   LogEntry *log = (LogEntry *)data;
626   FILE *fp = *(FILE **)ptr;
627   log->time = t;
628   log->recvTime = recvT<0.0?0:recvT;
629 //  log->write(fp);
630   toProjectionsFile p(fp);
631   log->pup(p);
632 }
633 #endif
634
635 // flush log entries to disk
636 void LogPool::flushLogBuffer()
637 {
638   if (numEntries) {
639     double writeTime = TraceTimer();
640     writeLog();
641     hasFlushed = true;
642     numEntries = 0;
643     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
644     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
645   }
646 }
647
648 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
649                   double time, int event, int pe, int ml, CmiObjId *id, 
650                   double recvT, double cpuT, int numPe)
651 {
652   new (&pool[numEntries++])
653     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
654   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
655     numPhases++;
656   }
657   if(poolSize==numEntries) {
658     flushLogBuffer();
659 #if CMK_BIGSIM_CHARM
660     extern int correctTimeLog;
661     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
662 #endif
663   }
664 #if CMK_BIGSIM_CHARM
665   switch (type) {
666     case BEGIN_PROCESSING:
667       pool[numEntries-1].recvTime = BgGetRecvTime();
668     case END_PROCESSING:
669     case BEGIN_COMPUTATION:
670     case END_COMPUTATION:
671     case CREATION:
672     case BEGIN_PACK:
673     case END_PACK:
674     case BEGIN_UNPACK:
675     case END_UNPACK:
676     case USER_EVENT_PAIR:
677       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
678   }
679 #endif
680 }
681
682 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
683 #ifndef CMK_BIGSIM_CHARM
684   new (&pool[numEntries++])
685         LogEntry(time,type,funcID,lineNum,fileName);
686   if(poolSize == numEntries){
687     flushLogBuffer();
688   }
689 #endif  
690 }
691
692
693   
694 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
695 #ifndef CMK_BIGSIM_CHARM
696   new (&pool[numEntries++])
697         LogEntry(type,time,memUsage);
698   if(poolSize == numEntries){
699     flushLogBuffer();
700   }
701 #endif  
702         
703 }  
704
705
706
707 void LogPool::addUserSupplied(int data){
708         // add an event
709         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
710
711         // set the user supplied value for the previously created event 
712         pool[numEntries-1].setUserSuppliedData(data);
713   }
714
715
716 void LogPool::addUserSuppliedNote(char *note){
717         // add an event
718         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
719
720         // set the user supplied note for the previously created event 
721         pool[numEntries-1].setUserSuppliedNote(note);
722   }
723
724 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
725   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
726 #ifndef CMK_BIGSIM_CHARM
727 #if MPI_TRACE_MACHINE_HACK
728   //This part of code is used  to combine the contiguous
729   //MPI_Test and MPI_Iprobe events to reduce the number of
730   //entries
731 #define MPI_TEST_EVENT_ID 60
732 #define MPI_IPROBE_EVENT_ID 70 
733   int lastEvent = pool[numEntries-1].event;
734   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
735     //just replace the endtime of last event
736     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
737     pool[numEntries].endTime = et;
738   }else{
739     new (&pool[numEntries++])
740       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
741   }
742 #else
743   new (&pool[numEntries++])
744     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
745 #endif
746   if(poolSize == numEntries){
747     flushLogBuffer();
748   }
749 #endif  
750 }
751
752
753 /* **CW** Not sure if this is the right thing to do. Feels more like
754    a hack than a solution to Sameer's request to add the destination
755    processor information to multicasts and broadcasts.
756
757    In the unlikely event this method is used for Broadcasts as well,
758    pelist == NULL will be used to indicate a global broadcast with 
759    num PEs.
760 */
761 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
762                                    int event, int pe, int ml, CmiObjId *id,
763                                    double recvT, int numPe, int *pelist)
764 {
765   new (&pool[numEntries++])
766     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
767   if(poolSize==numEntries) {
768     flushLogBuffer();
769   }
770 }
771
772 void LogPool::postProcessLog()
773 {
774 #if CMK_BIGSIM_CHARM
775   bgUpdateProj(1);   // event type
776 #endif
777 }
778
779 void LogPool::modLastEntryTimestamp(double ts)
780 {
781   pool[numEntries-1].time = ts;
782   //pool[numEntries-1].cputime = ts;
783 }
784
785 // /** Constructor for a multicast log entry */
786 // 
787 //  THIS WAS MOVED TO trace-projections.h with the other constructors
788 // 
789 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
790 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
791 // {
792 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
793 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
794 //     recvTime = rt; 
795 //     numpes = numPe;
796 //     userSuppliedNote = NULL;
797 //     if (pelist != NULL) {
798 //      pes = new int[numPe];
799 //      for (int i=0; i<numPe; i++) {
800 //        pes[i] = pelist[i];
801 //      }
802 //     } else {
803 //      pes= NULL;
804 //     }
805 // }
806
807 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
808 //{
809 //#if CMK_HAS_COUNTER_PAPI
810 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
811 //#endif
812 //}
813
814
815
816 void LogEntry::pup(PUP::er &p)
817 {
818   int i;
819   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
820   char ret = '\n';
821
822   p|type;
823   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
824   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
825
826   switch (type) {
827     case USER_EVENT:
828     case USER_EVENT_PAIR:
829       p|mIdx; p|itime; p|event; p|pe;
830       break;
831     case BEGIN_IDLE:
832     case END_IDLE:
833     case BEGIN_PACK:
834     case END_PACK:
835     case BEGIN_UNPACK:
836     case END_UNPACK:
837       p|itime; p|pe; 
838       break;
839     case BEGIN_PROCESSING:
840       if (p.isPacking()) {
841         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
842         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
843       }
844       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
845       p|msglen; p|irecvtime; 
846       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
847       p|icputime;
848 #if CMK_HAS_COUNTER_PAPI
849       //p|numPapiEvents;
850       for (i=0; i<NUMPAPIEVENTS; i++) {
851         // not yet!!!
852         //      p|papiIDs[i]; 
853         p|papiValues[i];
854         
855       }
856 #else
857       //p|numPapiEvents;     // non papi version has value 0
858 #endif
859       if (p.isUnpacking()) {
860         recvTime = irecvtime/1.0e6;
861         cputime = icputime/1.0e6;
862       }
863       break;
864     case END_PROCESSING:
865       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
866       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
867 #if CMK_HAS_COUNTER_PAPI
868       //p|numPapiEvents;
869       for (i=0; i<NUMPAPIEVENTS; i++) {
870         // not yet!!!
871         //      p|papiIDs[i];
872         p|papiValues[i];
873       }
874 #else
875       //p|numPapiEvents;  // non papi version has value 0
876 #endif
877       if (p.isUnpacking()) cputime = icputime/1.0e6;
878       break;
879     case USER_SUPPLIED:
880           p|userSuppliedData;
881           p|itime;
882         break;
883     case USER_SUPPLIED_NOTE:
884           p|itime;
885           int length;
886           if (p.isPacking()) length = strlen(userSuppliedNote);
887           p | length;
888           char space;
889           space = ' ';
890           p | space;
891           if (p.isUnpacking()) {
892             userSuppliedNote = new char[length+1];
893             userSuppliedNote[length] = '\0';
894           }
895           PUParray(p,userSuppliedNote, length);
896           break;
897     case USER_SUPPLIED_BRACKETED_NOTE:
898       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
899           p|itime;
900           p|iEndTime;
901           p|event;
902           int length2;
903           if (p.isPacking()) length2 = strlen(userSuppliedNote);
904           p | length2;
905           char space2;
906           space2 = ' ';
907           p | space2;
908           if (p.isUnpacking()) {
909             userSuppliedNote = new char[length+1];
910             userSuppliedNote[length] = '\0';
911           }
912           PUParray(p,userSuppliedNote, length2);
913           break;
914     case MEMORY_USAGE_CURRENT:
915       p | memUsage;
916       p | itime;
917         break;
918     case CREATION:
919       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
920       p|mIdx; p|eIdx; p|itime;
921       p|event; p|pe; p|msglen; p|irecvtime;
922       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
923       break;
924     case CREATION_BCAST:
925       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
926       p|mIdx; p|eIdx; p|itime;
927       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
928       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
929       break;
930     case CREATION_MULTICAST:
931       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
932       p|mIdx; p|eIdx; p|itime;
933       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
934       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
935       for (i=0; i<numpes; i++) p|pes[i];
936       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
937       break;
938     case MESSAGE_RECV:
939       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
940       break;
941
942     case ENQUEUE:
943     case DEQUEUE:
944       p|mIdx; p|itime; p|event; p|pe;
945       break;
946
947     case BEGIN_INTERRUPT:
948     case END_INTERRUPT:
949       p|itime; p|event; p|pe;
950       break;
951
952       // **CW** absolute timestamps are used here to support a quick
953       // way of determining the total time of a run in projections
954       // visualization.
955     case BEGIN_COMPUTATION:
956     case END_COMPUTATION:
957     case BEGIN_TRACE:
958     case END_TRACE:
959       p|itime;
960       break;
961     case BEGIN_FUNC:
962         p | itime;
963         p | mIdx;
964         p | event;
965         if(!p.isUnpacking()){
966                 p(fName,flen-1);
967         }
968         break;
969     case END_FUNC:
970         p | itime;
971         p | mIdx;
972         break;
973     case END_PHASE:
974       p|eIdx; // FIXME: actually the phase ID
975       p|itime;
976       break;
977     default:
978       CmiError("***Internal Error*** Wierd Event %d.\n", type);
979       break;
980   }
981   if (p.isUnpacking()) time = itime/1.0e6;
982   p|ret;
983 }
984
985 TraceProjections::TraceProjections(char **argv): 
986   curevent(0), inEntry(0), computationStarted(0), 
987         converseExit(0), endTime(0.0), traceNestedEvents(0),
988         currentPhaseID(0), lastPhaseEvent(NULL)
989 {
990   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
991
992   if (CkpvAccess(traceOnPe) == 0) return;
993
994   CtvInitialize(int,curThreadEvent);
995   CkpvInitialize(CmiInt8, CtrLogBufSize);
996   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
997   CtvAccess(curThreadEvent)=0;
998   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
999                        "Log entries to buffer per I/O")) {
1000     if (CkMyPe() == 0) {
1001       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1002     }
1003   }
1004   checknested = 
1005     CmiGetArgFlagDesc(argv,"+checknested",
1006                       "check projections nest begin end execute events");
1007   traceNestedEvents = 
1008     CmiGetArgFlagDesc(argv,"+tracenested",
1009               "trace projections nest begin/end execute events");
1010   int binary = 
1011     CmiGetArgFlagDesc(argv,"+binary-trace",
1012                       "Write log files in binary format");
1013
1014   CmiInt8 nSubdirs = 0;
1015   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1016
1017
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1020 #else
1021   // consume the flag so there's no confusing
1022   CmiGetArgFlagDesc(argv,"+gz-trace",
1023                     "Write log files pre-compressed with gzip");
1024   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1025 #endif
1026
1027   // **CW** default to non delta log encoding. The user may choose to do
1028   // create both logs (for debugging) or just the old log timestamping
1029   // (for compatibility).
1030   // Generating just the non delta log takes precedence over generating
1031   // both logs (if both arguments appear on the command line).
1032
1033   // switch to OLD log format until everything works // Gengbin
1034   nonDeltaLog = 1;
1035   deltaLog = 0;
1036   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1037                                   "Generate Delta encoded and simple timestamped log files");
1038
1039   _logPool = new LogPool(CkpvAccess(traceRoot));
1040   _logPool->setNumSubdirs(nSubdirs);
1041   _logPool->setBinary(binary);
1042 #if CMK_PROJECTIONS_USE_ZLIB
1043   _logPool->setCompressed(compressed);
1044 #endif
1045   if (CkMyPe() == 0) {
1046     _logPool->createSts();
1047     _logPool->createRC();
1048     _logPool->createTopo();
1049   }
1050   funcCount=1;
1051
1052 #if CMK_HAS_COUNTER_PAPI
1053   // We initialize and create the event sets for use with PAPI here.
1054   int papiRetValue;
1055   if(CkMyRank()==0){
1056     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1057     if (papiRetValue != PAPI_VER_CURRENT) {
1058       CmiAbort("PAPI Library initialization failure!\n");
1059     }
1060 #if CMK_SMP
1061     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1062       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1063     }
1064 #endif
1065   }
1066
1067 #if CMK_SMP
1068   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1069   CmiNodeAllBarrier();
1070 #endif
1071   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1072   papiEventSet = PAPI_NULL; 
1073   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1074     CmiAbort("PAPI failed to create event set!\n");
1075   }
1076   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1077   if (papiRetValue != PAPI_OK) {
1078     if (papiRetValue == PAPI_ECNFLCT) {
1079       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1080     } else {
1081       CmiAbort("PAPI failed to add designated events!\n");
1082     }
1083   }
1084   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1085 #endif
1086 }
1087
1088 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1089 {
1090   OPTIMIZED_VERSION
1091   CkAssert(e==-1 || e>=0);
1092   CkAssert(evt != NULL);
1093   int event;
1094   int biggest = -1;
1095   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1096     int cur = (*CkpvAccess(usrEvents))[i]->e;
1097     if (cur == e) {
1098       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1099       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1100         return e;
1101       else
1102         CmiAbort("UserEvent double registered!");
1103     }
1104     if (cur > biggest) biggest = cur;
1105   }
1106   // if no user events have so far been registered. biggest will be -1
1107   // and hence newly assigned event numbers will begin from 0.
1108   if (e==-1) event = biggest+1;  // automatically assign new event number
1109   else event = e;
1110   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1111   return event;
1112 }
1113
1114 void TraceProjections::traceClearEps(void)
1115 {
1116   // In trace-summary, this zeros out the EP bins, to eliminate noise
1117   // from startup.  Here, this isn't useful, since we can do that in
1118   // post-processing
1119 }
1120
1121 void TraceProjections::traceWriteSts(void)
1122 {
1123   if(CkMyPe()==0)
1124     _logPool->writeSts(this);
1125 }
1126
1127 /** 
1128  * **IMPT NOTES**:
1129  *
1130  * This is called when Converse closes during ConverseCommonExit().
1131  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1132  *
1133  * Some programs bypass CkExit() (like NAMD, which eventually calls
1134  * ConverseExit()), modules like traces will have to pretend to shutdown
1135  * as if CkExit() was called but at the same time avoid making
1136  * subsequent CkExit() calls (which is usually required for allowing
1137  * other modules to shutdown).
1138  *
1139  * Note that we can only get here if CkExit() was not called, since the
1140  * trace module will un-register itself from TraceArray if it did.
1141  *
1142  */
1143 void TraceProjections::traceClose(void)
1144 {
1145 #ifdef PROJ_ANALYSIS
1146   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1147
1148   // sets the flag that tells the code not to make the CkExit call later
1149   converseExit = 1;
1150   if (CkMyPe() == 0) {
1151     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1152     bocProxy.traceProjectionsParallelShutdown(-1);
1153   }
1154   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1155     CkpvAccess(_trace)->endComputation();
1156     delete _logPool;              // will write
1157     // remove myself from traceArray so that no tracing will be called.
1158     CkpvAccess(_traces)->removeTrace(this);
1159   }
1160 #else
1161   // we've already deleted the logpool, so multiple calls to traceClose
1162   // are tolerated.
1163   if (_logPool == NULL) {
1164     return;
1165   }
1166   if(CkMyPe()==0){
1167     _logPool->writeSts(this);
1168   }
1169   CkpvAccess(_trace)->endComputation();
1170   delete _logPool;              // will write
1171   // remove myself from traceArray so that no tracing will be called.
1172   CkpvAccess(_traces)->removeTrace(this);
1173 #endif
1174 }
1175
1176 /**
1177  *  **IMPT NOTES**:
1178  *
1179  *  This is meant to be called internally by the tracing framework.
1180  *
1181  */
1182 void TraceProjections::closeTrace() {
1183   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1184   if (CkMyPe() == 0) {
1185     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1186     _logPool->writeSts(this);
1187     _logPool->writeRC();
1188     _logPool->writeTopo();
1189     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1190   }
1191   delete _logPool;       // will write logs to file
1192 }
1193
1194 #if CMK_SMP_TRACE_COMMTHREAD
1195 void TraceProjections::traceBeginOnCommThread()
1196 {
1197   if (!computationStarted) return;
1198   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1199 }
1200
1201 void TraceProjections::traceEndOnCommThread()
1202 {
1203   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1204 }
1205 #endif
1206
1207 void TraceProjections::traceBegin(void)
1208 {
1209   if (!computationStarted) return;
1210   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1211 }
1212
1213 void TraceProjections::traceEnd(void)
1214 {
1215   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1216 }
1217
1218 void TraceProjections::userEvent(int e)
1219 {
1220   if (!computationStarted) return;
1221   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1222 }
1223
1224 void TraceProjections::userBracketEvent(int e, double bt, double et)
1225 {
1226   if (!computationStarted) return;
1227   // two events record Begin/End of event e.
1228   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1229   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1230 }
1231
1232 void TraceProjections::userSuppliedData(int d)
1233 {
1234   if (!computationStarted) return;
1235   _logPool->addUserSupplied(d);
1236 }
1237
1238 void TraceProjections::userSuppliedNote(char *note)
1239 {
1240   if (!computationStarted) return;
1241   _logPool->addUserSuppliedNote(note);
1242 }
1243
1244
1245 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1246 {
1247   if (!computationStarted) return;
1248   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1249 }
1250
1251 void TraceProjections::memoryUsage(double m)
1252 {
1253   if (!computationStarted) return;
1254   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1255   
1256 }
1257
1258
1259 void TraceProjections::creation(envelope *e, int ep, int num)
1260 {
1261   double curTime = TraceTimer();
1262   if (e == 0) {
1263     CtvAccess(curThreadEvent) = curevent;
1264     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1265                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1266   } else {
1267     int type=e->getMsgtype();
1268     e->setEvent(curevent);
1269     if (num > 1) {
1270       _logPool->add(CREATION_BCAST, type, ep, curTime,
1271                     curevent++, CkMyPe(), e->getTotalsize(), 
1272                     NULL, 0, 0.0, num);
1273     } else {
1274       _logPool->add(CREATION, type, ep, curTime,
1275                     curevent++, CkMyPe(), e->getTotalsize(), 
1276                     NULL, 0, 0.0);
1277     }
1278   }
1279 }
1280
1281 //This function is only called from a comm thread in SMP mode. 
1282 void TraceProjections::creation(char *msg)
1283 {
1284 #if CMK_SMP_TRACE_COMMTHREAD
1285         // msg must be a charm message
1286     envelope *e = (envelope *)msg;
1287     int ep = e->getEpIdx();
1288     if(_entryTable[ep]->traceEnabled) {
1289         creation(e, ep, 1);
1290         e->setSrcPe(CkMyPe());              // pretend I am the sender
1291     }
1292 #endif
1293 }
1294
1295 void TraceProjections::traceCommSetMsgID(char *msg)
1296 {
1297 #if CMK_SMP_TRACE_COMMTHREAD
1298     // msg must be a charm message
1299     envelope *e = (envelope *)msg;
1300     int ep = e->getEpIdx();
1301     if(_entryTable[ep]->traceEnabled) {
1302         e->setSrcPe(CkMyPe());              // pretend I am the sender
1303         e->setEvent(curevent);
1304     }
1305 #endif
1306 }
1307
1308 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1309 {
1310     // msg must be a charm message
1311     *pe = *event = -1;
1312     envelope *e = (envelope *)msg;
1313     int ep = e->getEpIdx();
1314     if(_entryTable[ep]->traceEnabled) {
1315         *pe = e->getSrcPe();
1316         *event = e->getEvent();
1317     }
1318 }
1319
1320 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1321 {
1322        // msg must be a charm message
1323     envelope *e = (envelope *)msg;
1324     int ep = e->getEpIdx();
1325     if(ep<=0 || ep>=_entryTable.size()) return;
1326     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1327     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1328     if(_entryTable[ep]->traceEnabled) {
1329         e->setSrcPe(pe);
1330         e->setEvent(event);
1331     }
1332 }
1333
1334 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1335    Communication Library-specific Multicasts via new event 
1336    CREATION_MULTICAST.
1337 */
1338
1339 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1340                                          int *pelist)
1341 {
1342   double curTime = TraceTimer();
1343   if (e==0) {
1344     CtvAccess(curThreadEvent)=curevent;
1345     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1346                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1347   } else {
1348     int type=e->getMsgtype();
1349     e->setEvent(curevent);
1350     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1351                                    e->getTotalsize(), 0, 0.0, num, pelist);
1352   }
1353 }
1354
1355 void TraceProjections::creationDone(int num)
1356 {
1357   // modified the creation done time of the last num log entries
1358   // FIXME: totally a hack
1359   double curTime = TraceTimer();
1360   int idx = _logPool->numEntries-1;
1361   while (idx >=0 && num >0 ) {
1362     LogEntry &log = _logPool->pool[idx];
1363     if ((log.type == CREATION) ||
1364         (log.type == CREATION_BCAST) ||
1365         (log.type == CREATION_MULTICAST)) {
1366       log.recvTime = curTime - log.time;
1367       num --;
1368     }
1369     idx--;
1370   }
1371 }
1372
1373 void TraceProjections::beginExecute(CmiObjId *tid)
1374 {
1375 #if CMK_HAS_COUNTER_PAPI
1376   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1377     CmiAbort("PAPI failed to read at begin execute!\n");
1378   }
1379 #endif
1380   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1381   execEvent = CtvAccess(curThreadEvent);
1382   execEp = (-1);
1383   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1384                 execEvent,CkMyPe(), 0, tid);
1385 #if CMK_HAS_COUNTER_PAPI
1386   _logPool->addPapi(papiValues);
1387 #endif
1388   inEntry = 1;
1389 }
1390
1391 void TraceProjections::beginExecute(envelope *e)
1392 {
1393   if(e==0) {
1394 #if CMK_HAS_COUNTER_PAPI
1395     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1396       CmiAbort("PAPI failed to read at begin execute!\n");
1397     }
1398 #endif
1399     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1400     execEvent = CtvAccess(curThreadEvent);
1401     execEp = (-1);
1402     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1403                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1404 #if CMK_HAS_COUNTER_PAPI
1405     _logPool->addPapi(papiValues);
1406 #endif
1407     inEntry = 1;
1408   } else {
1409     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1410                  e->getSrcPe(),e->getTotalsize());
1411   }
1412 }
1413
1414 void TraceProjections::beginExecute(char *msg){
1415 #if CMK_SMP_TRACE_COMMTHREAD
1416         //This function is called from comm thread in SMP mode
1417     envelope *e = (envelope *)msg;
1418     int ep = e->getEpIdx();
1419     if(_entryTable[ep]->traceEnabled)
1420                 beginExecute(e);
1421 #endif
1422 }
1423
1424 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1425                                     int mlen, CmiObjId *idx)
1426 {
1427   if (traceNestedEvents) {
1428     if (! nestedEvents.isEmpty()) {
1429       endExecuteLocal();
1430     }
1431     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1432   }
1433   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1434 }
1435
1436 void TraceProjections::changeLastEntryTimestamp(double ts)
1437 {
1438   _logPool->modLastEntryTimestamp(ts);
1439 }
1440
1441 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1442                                     int mlen, CmiObjId *idx)
1443 {
1444 #if CMK_HAS_COUNTER_PAPI
1445   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1446     CmiAbort("PAPI failed to read at begin execute!\n");
1447   }
1448 #endif
1449   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1450   execEvent=event;
1451   execEp=ep;
1452   execPe=srcPe;
1453   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1454                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1455 #if CMK_HAS_COUNTER_PAPI
1456   _logPool->addPapi(papiValues);
1457 #endif
1458   inEntry = 1;
1459 }
1460
1461 void TraceProjections::endExecute(void)
1462 {
1463   if (traceNestedEvents) nestedEvents.deq();
1464   endExecuteLocal();
1465   if (traceNestedEvents) {
1466     if (! nestedEvents.isEmpty()) {
1467       NestedEvent &ne = nestedEvents.peek();
1468       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1469     }
1470   }
1471 }
1472
1473 void TraceProjections::endExecute(char *msg)
1474 {
1475 #if CMK_SMP_TRACE_COMMTHREAD
1476         //This function is called from comm thread in SMP mode
1477     envelope *e = (envelope *)msg;
1478     int ep = e->getEpIdx();
1479     if(_entryTable[ep]->traceEnabled)
1480                 endExecute();
1481 #endif  
1482 }
1483
1484 void TraceProjections::endExecuteLocal(void)
1485 {
1486 #if CMK_HAS_COUNTER_PAPI
1487   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1488     CmiAbort("PAPI failed to read at end execute!\n");
1489   }
1490 #endif
1491   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1492   double cputime = TraceCpuTimer();
1493   if(execEp == (-1)) {
1494     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1495                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1496   } else {
1497     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1498                   execEvent, execPe, 0, NULL, 0.0, cputime);
1499   }
1500 #if CMK_HAS_COUNTER_PAPI
1501   _logPool->addPapi(papiValues);
1502 #endif
1503   inEntry = 0;
1504 }
1505
1506 void TraceProjections::messageRecv(char *env, int pe)
1507 {
1508 #if 0
1509   envelope *e = (envelope *)env;
1510   int msgType = e->getMsgtype();
1511   int ep = e->getEpIdx();
1512 #if 0
1513   if (msgType==NewChareMsg || msgType==NewVChareMsg
1514           || msgType==ForChareMsg || msgType==ForVidMsg
1515           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1516           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1517     ep = e->getEpIdx();
1518   else
1519     ep = _threadEP;
1520 #endif
1521   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1522                 curevent++, e->getSrcPe(), e->getTotalsize());
1523 #endif
1524 }
1525
1526 void TraceProjections::beginIdle(double curWallTime)
1527 {
1528   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1529 }
1530
1531 void TraceProjections::endIdle(double curWallTime)
1532 {
1533   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1534 }
1535
1536 void TraceProjections::beginPack(void)
1537 {
1538   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1539 }
1540
1541 void TraceProjections::endPack(void)
1542 {
1543   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1544 }
1545
1546 void TraceProjections::beginUnpack(void)
1547 {
1548   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1549 }
1550
1551 void TraceProjections::endUnpack(void)
1552 {
1553   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1554 }
1555
1556 void TraceProjections::enqueue(envelope *) {}
1557
1558 void TraceProjections::dequeue(envelope *) {}
1559
1560 void TraceProjections::beginComputation(void)
1561 {
1562   computationStarted = 1;
1563
1564   // Executes the callback function provided by the machine
1565   // layer. This is the proper method to register user events in a
1566   // machine layer because projections is a charm++ module.
1567   if (CkpvAccess(traceOnPe) != 0) {
1568     void (*ptr)() = registerMachineUserEvents();
1569     if (ptr != NULL) {
1570       ptr();
1571     }
1572   }
1573 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1574 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1575   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1576 #if CMK_HAS_COUNTER_PAPI
1577   // we start the counters here
1578   if (PAPI_start(papiEventSet) != PAPI_OK) {
1579     CmiAbort("PAPI failed to start designated counters!\n");
1580   }
1581 #endif
1582 }
1583
1584 void TraceProjections::endComputation(void)
1585 {
1586 #if CMK_HAS_COUNTER_PAPI
1587   // we stop the counters here. A silent failure is alright since we
1588   // are already at the end of the program.
1589   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1590     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1591   }
1592   // NOTE: We should not do a complete close of PAPI until after the
1593   // sts writer is done.
1594 #endif
1595   endTime = TraceTimer();
1596   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1597   /*
1598   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1599            endTime*1e06);
1600   */
1601 }
1602
1603 int TraceProjections::idxRegistered(int idx)
1604 {
1605     int idxVecLen = idxVec.size();
1606     for(int i=0; i<idxVecLen; i++)
1607     {
1608         if(idx == idxVec[i])
1609             return 1;
1610     }
1611     return 0;
1612 }
1613
1614 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1615     StrKey k((char*)name,strlen(name));
1616     int num = funcHashtable.get(k);
1617     
1618     if(num!=0) {
1619         return;
1620         //as for mpi programs, the same function may be registered for several times
1621         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1622     }
1623     
1624     int isIdxExisting=0;
1625     if(idxSpecifiedByUser)
1626         isIdxExisting=idxRegistered(idx);
1627     if(isIdxExisting){
1628         return;
1629         //same reason with num!=0
1630         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1631     }
1632
1633     if(idxSpecifiedByUser) {
1634         char *st = new char[strlen(name)+1];
1635         memcpy(st,name,strlen(name)+1);
1636         StrKey *newKey = new StrKey(st,strlen(st));
1637         int &ref = funcHashtable.put(*newKey);
1638         ref=idx;
1639         funcCount++;
1640         idxVec.push_back(idx);  
1641     } else {
1642         char *st = new char[strlen(name)+1];
1643         memcpy(st,name,strlen(name)+1);
1644         StrKey *newKey = new StrKey(st,strlen(st));
1645         int &ref = funcHashtable.put(*newKey);
1646         ref=funcCount;
1647         num = funcCount;
1648         funcCount++;
1649         idx = num;
1650         idxVec.push_back(idx);
1651     }
1652 }
1653
1654 void TraceProjections::beginFunc(char *name,char *file,int line){
1655         StrKey k(name,strlen(name));    
1656         unsigned short  num = (unsigned short)funcHashtable.get(k);
1657         beginFunc(num,file,line);
1658 }
1659
1660 void TraceProjections::beginFunc(int idx,char *file,int line){
1661         if(idx <= 0){
1662                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1663         }       
1664         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1665 }
1666
1667 void TraceProjections::endFunc(char *name){
1668         StrKey k(name,strlen(name));    
1669         int num = funcHashtable.get(k);
1670         endFunc(num);   
1671 }
1672
1673 void TraceProjections::endFunc(int num){
1674         if(num <= 0){
1675                 printf("endFunc without start :O\n");
1676         }
1677         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1678 }
1679
1680 // specialized PUP:ers for handling trace projections logs
1681 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1682 {
1683   for (int i=0;i<n;i++) 
1684     switch(t) {
1685     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1686     case Tuchar:
1687     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1688     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1689     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1690     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1691     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1692     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1693     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1694     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1695     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1696 #ifdef CMK_PUP_LONG_LONG
1697     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1698     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1699 #endif
1700     default: CmiAbort("Unrecognized pup type code!");
1701     };
1702 }
1703
1704 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1705 {
1706   for (int i=0;i<n;i++) 
1707     switch(t) {
1708     case Tchar: { 
1709       char c = fgetc(f);
1710       if (c==EOF)
1711         parseError("Could not match character");
1712       else
1713         ((char *)p)[i] = c;
1714       break;
1715     }
1716     case Tuchar:
1717     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1718     case Tshort:((short *)p)[i]=(short)readInt(); break;
1719     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1720     case Tint:  ((int *)p)[i]=readInt(); break;
1721     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1722     case Tlong: ((long *)p)[i]=readInt(); break;
1723     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1724     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1725     case Tdouble:((double *)p)[i]=readDouble(); break;
1726 #ifdef CMK_PUP_LONG_LONG
1727     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1728     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1729 #endif
1730     default: CmiAbort("Unrecognized pup type code!");
1731     };
1732 }
1733
1734 #if CMK_PROJECTIONS_USE_ZLIB
1735 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1736 {
1737   for (int i=0;i<n;i++) 
1738     switch(t) {
1739     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1740     case Tuchar:
1741     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1742     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1743     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1744     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1745     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1746     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1747     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1748     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1749     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1750 #ifdef CMK_PUP_LONG_LONG
1751     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1752     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1753 #endif
1754     default: CmiAbort("Unrecognized pup type code!");
1755     };
1756 }
1757 #endif
1758
1759 void TraceProjections::endPhase() {
1760   double currentPhaseTime = TraceTimer();
1761   if (lastPhaseEvent != NULL) {
1762   } else {
1763     if (_logPool->pool != NULL) {
1764       // assumed to be BEGIN_COMPUTATION
1765     } else {
1766       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1767       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1768     }
1769   }
1770
1771   /* Insert endPhase event here. */
1772   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1773   /*        We currently "borrow" from the standard add() method. */
1774   /*        It should really be its own add() method.             */
1775   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1776   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1777   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1778   currentPhaseID++;
1779 }
1780
1781 #ifdef PROJ_ANALYSIS
1782 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1783
1784
1785 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1786
1787 void registerOutlierReduction() {
1788   outlierReductionType =
1789     CkReduction::addReducer(outlierReduction);
1790   minMaxReductionType =
1791     CkReduction::addReducer(minMaxReduction);
1792 }
1793
1794 /**
1795  * **IMPT NOTES**:
1796  *
1797  * This is the C++ code that is registered to be activated at module
1798  * shutdown. This is called exactly once on processor 0. Module shutdown
1799  * is initiated as a result of a CkExit() call by the application code
1800  * 
1801  * The exit function must ultimately call CkExit() again to
1802  * so that other module exit functions may proceed after this module is
1803  * done.
1804  *
1805  */
1806 // FIXME: WHY extern "C"???
1807 extern "C" void TraceProjectionsExitHandler()
1808 {
1809 #if CMK_TRACE_ENABLED
1810   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1811   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1812   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1813 #else
1814   CkExit();
1815 #endif
1816 }
1817
1818 // This is called once on each processor but the idiom of use appears
1819 // to be to only have processor 0 register the function.
1820 //
1821 // See initnode in trace-projections.ci
1822 void initTraceProjectionsBOC()
1823 {
1824   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1825 #ifdef __BIGSIM__
1826   if (BgNodeRank() == 0) {
1827 #else
1828     if (CkMyRank() == 0) {
1829 #endif
1830       registerExitFn(TraceProjectionsExitHandler);
1831     }
1832 #if 0
1833   } // this is so indentation does not get messed up
1834 #endif
1835 }
1836
1837 // mainchare for trace-projections BOC-operations. 
1838 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1839 // rest of its life.
1840 //
1841 // Responsible for:
1842 //   1. Handling commandline arguments
1843 //   2. Creating any objects required for proper BOC operations.
1844 //
1845 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1846   /** Options for Outlier Analysis */
1847   // defaults. Things will change with support for interactive analysis.
1848   bool findOutliers = false;
1849   bool outlierAutomatic = true;
1850   int numKSeeds = 10; 
1851
1852   int peNumKeep = CkNumPes();  // used as a default
1853   double entryThreshold = 0.0;
1854   bool outlierUsePhases = false;
1855   if (outlierAutomatic) {
1856     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1857                      "Number of cluster seeds to apply at outlier analysis.");
1858     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1859                      &peNumKeep, "Number of Processors to retain data");
1860     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1861                         "Minimum significance of entry points to be considered for clustering (%).");
1862     findOutliers =
1863       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1864     outlierUsePhases = 
1865       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1866                         "Apply automatic outlier analysis to any available phases.");
1867     if (outlierUsePhases) {
1868       // if the user wants to use an outlier feature, it is assumed outlier
1869       //    analysis is desired.
1870       findOutliers = true;
1871     }
1872   }
1873   bool findStartTime = (CmiTimerAbsolute()==1);
1874   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1875   if (findOutliers) {
1876     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1877                                         numKSeeds,
1878                                         peNumKeep,
1879                                         entryThreshold,
1880                                         outlierUsePhases);
1881   }
1882 }
1883
1884 // Called on every processor.
1885 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1886   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1887   endPe = pe;                // the pe that starts CkExit()
1888   if (CkMyPe() == 0) {
1889     analysisStartTime = CmiWallTimer();
1890   }
1891   CkpvAccess(_trace)->endComputation();
1892   // no more tracing for projections on this processor after this point. 
1893   // Note that clear must be called after remove, or bad things will happen.
1894   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1895   CkpvAccess(_traces)->clearTrace();
1896
1897   // From this point, we start multiple chains of reductions and broadcasts to
1898   // perform final online analysis activities.
1899
1900   // Start all parallel operations at once. 
1901   //   These MUST NOT modify base performance data in LogPool. If they must,
1902   //   then the parallel operations must be phased (and this code to be
1903   //   restructured as necessary)
1904   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1905   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1906   if (findOutliers) {
1907     parModulesRemaining++;
1908     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1909   }
1910   parModulesRemaining++;
1911   if (findStartTime) 
1912   bocProxy[CkMyPe()].startTimeAnalysis();
1913   else
1914   bocProxy[CkMyPe()].startEndTimeAnalysis();
1915 }
1916
1917 // Called on each processor
1918 void KMeansBOC::startKMeansAnalysis() {
1919   // Initialize all necessary structures
1920   LogPool *pool = CkpvAccess(_trace)->_logPool;
1921
1922  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1923   int flushInt = 0;
1924   if (pool->hasFlushed) {
1925     flushInt = 1;
1926   }
1927   
1928   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1929                 0, thisProxy);
1930   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1931 }
1932
1933 // Called on processor 0
1934 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1935   int someFlush = *((int *)msg->getData());
1936
1937   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1938   
1939   if (someFlush == 0) {
1940     // Data intact proceed with KMeans analysis
1941     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1942     kMeansProxy.flushCheckDone();
1943   } else {
1944     // Some processor had flushed it data at some point, abandon KMeans
1945     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1946     // terminate KMeans
1947     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1948     bocProxy[0].kMeansDone();
1949   }
1950 }
1951
1952 // Called on each processor
1953 void KMeansBOC::flushCheckDone() {
1954   // **FIXME** - more flexible metric collection scheme may be necessary
1955   //   in the future for production use.
1956   LogPool *pool = CkpvAccess(_trace)->_logPool;
1957
1958   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1959
1960   numEntryMethods = _entryTable.size();
1961   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1962
1963   // maintained across phases
1964   markedBegin = false;
1965   markedIdle = false;
1966   beginBlockTime = 0.0;
1967   beginIdleBlockTime = 0.0;
1968   lastBeginEPIdx = -1; // none
1969
1970   lastPhaseIdx = 0;
1971   currentExecTimes = NULL;
1972   currentPhase = 0;
1973   selected = false;
1974
1975   pool->initializePhases();
1976
1977   // incoming K Seeds and the per-phase filter
1978   incKSeeds = new double[numK*numMetrics];
1979   keepMetric = new bool[numMetrics];
1980
1981   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1982   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1983   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1984   thisProxy[CkMyPe()].getNextPhaseMetrics();
1985 }
1986
1987 // Called on each processor.
1988 void KMeansBOC::getNextPhaseMetrics() {
1989   // Assumes the presence of the complete logs on this processor.
1990   // Assumes first event is always BEGIN_COMPUTATION
1991   // Assumes each processor sees the same number of phases.
1992   //
1993   // In this code, we collect performance data for this processor.
1994   // All times are in seconds.
1995
1996   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1997
1998   if (usePhases) {
1999     DEBUGF("[%d] Using Phases\n", CkMyPe());
2000   } else {
2001     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2002   }
2003   
2004   if (currentExecTimes != NULL) {
2005     delete [] currentExecTimes;
2006   }
2007   currentExecTimes = new double[numMetrics];
2008   for (int i=0; i<numMetrics; i++) {
2009     currentExecTimes[i] = 0.0;
2010   }
2011
2012   int numEventMethods = _entryTable.size();
2013   LogPool *pool = CkpvAccess(_trace)->_logPool;
2014   
2015   CkAssert(pool->numEntries > lastPhaseIdx);
2016   double totalPhaseTime = 0.0;
2017   double totalActiveTime = 0.0; // entry method + idle
2018
2019   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2020     if (pool->pool[i].type == BEGIN_PROCESSING) {
2021       // check pairing
2022       if (!markedBegin) {
2023         markedBegin = true;
2024       }
2025       beginBlockTime = pool->pool[i].time;
2026       lastBeginEPIdx = pool->pool[i].eIdx;
2027     } else if (pool->pool[i].type == END_PROCESSING) {
2028       // check pairing
2029       // if End without a begin, just ignore
2030       //   this event. If a phase-boundary is crossed, the Begin
2031       //   event would be maintained in beginBlockTime, so it is 
2032       //   not a problem.
2033       if (markedBegin) {
2034         markedBegin = false;
2035         if (pool->pool[i].event < 0)
2036         {
2037           // ignore dummy events. **FIXME** as they have no eIdx?
2038           continue;
2039         }
2040         currentExecTimes[pool->pool[i].eIdx] += 
2041           pool->pool[i].time - beginBlockTime;
2042         totalActiveTime += pool->pool[i].time - beginBlockTime;
2043         lastBeginEPIdx = -1;
2044       }
2045     } else if (pool->pool[i].type == BEGIN_IDLE) {
2046       // check pairing
2047       if (!markedIdle) {
2048         markedIdle = true;
2049       }
2050       beginIdleBlockTime = pool->pool[i].time;
2051     } else if (pool->pool[i].type == END_IDLE) {
2052       // check pairing
2053       if (markedIdle) {
2054         markedIdle = false;
2055         currentExecTimes[numEventMethods] += 
2056           pool->pool[i].time - beginIdleBlockTime;
2057         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2058       }
2059     } else if (pool->pool[i].type == END_PHASE) {
2060       // ignored when not using phases
2061       if (usePhases) {
2062         // when we've not visited this node before
2063         if (i != lastPhaseIdx) { 
2064           totalPhaseTime = 
2065             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2066           // it is important that proper accounting of time take place here.
2067           // Note that END_PHASE events inevitably occur in the context of
2068           //   some entry method by the way the tracing API is designed.
2069           if (markedBegin) {
2070             CkAssert(lastBeginEPIdx >= 0);
2071             currentExecTimes[lastBeginEPIdx] += 
2072               pool->pool[i].time - beginBlockTime;
2073             totalActiveTime += pool->pool[i].time - beginBlockTime;
2074             // this is so the remainder contributes to the next phase
2075             beginBlockTime = pool->pool[i].time;
2076           }
2077           // The following is unlikely, but stranger things have happened.
2078           if (markedIdle) {
2079             currentExecTimes[numEventMethods] +=
2080               pool->pool[i].time - beginIdleBlockTime;
2081             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2082             // this is so the remainder contributes to the next phase
2083             beginIdleBlockTime = pool->pool[i].time;
2084           }
2085           if (totalActiveTime <= totalPhaseTime) {
2086             currentExecTimes[numEventMethods+1] = 
2087               totalPhaseTime - totalActiveTime;
2088           } else {
2089             currentExecTimes[numEventMethods+1] = 0.0;
2090             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2091                      CkMyPe(), currentPhase);
2092           }
2093           collectKMeansData();
2094           // end the loop (and method) and defer the work till the next call
2095           lastPhaseIdx = i;
2096           break; 
2097         }
2098       }
2099     } else if (pool->pool[i].type == END_COMPUTATION) {
2100       if (markedBegin) {
2101         CkAssert(lastBeginEPIdx >= 0);
2102         currentExecTimes[lastBeginEPIdx] += 
2103           pool->pool[i].time - beginBlockTime;
2104         totalActiveTime += pool->pool[i].time - beginBlockTime;
2105       }
2106       if (markedIdle) {
2107         currentExecTimes[numEventMethods] +=
2108           pool->pool[i].time - beginIdleBlockTime;
2109         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2110       }
2111       totalPhaseTime = 
2112         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2113       if (totalActiveTime <= totalPhaseTime) {
2114         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2115       } else {
2116         currentExecTimes[numEventMethods+1] = 0.0;
2117         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2118                  CkMyPe());
2119       }
2120       collectKMeansData();
2121     }
2122   }
2123 }
2124
2125 /**
2126  *  Through a reduction, collectKMeansData aggregates each processors' data
2127  *  in order for global properties to be determined:
2128  *  
2129  *  1. min & max to determine normalization factors.
2130  *  2. sum to determine global EP averages for possible metric reduction
2131  *       through thresholding.
2132  *  3. sum of squares to compute stddev which may be useful in the future.
2133  *
2134  *  collectKMeansData will also keep the processor's data for the current
2135  *    phase so that it may be normalized and worked on subsequently.
2136  *
2137  **/
2138 void KMeansBOC::collectKMeansData() {
2139   int minOffset = numMetrics;
2140   int maxOffset = 2*numMetrics;
2141   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2142
2143   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2144
2145   double *reductionMsg = new double[numMetrics*4];
2146
2147   for (int i=0; i<numMetrics; i++) {
2148     reductionMsg[i] = currentExecTimes[i];
2149     // duplicate the event times for max and min sections of the reduction
2150     reductionMsg[minOffset + i] = currentExecTimes[i];
2151     reductionMsg[maxOffset + i] = currentExecTimes[i];
2152     // compute squares
2153     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2154   }
2155
2156   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2157                 0, thisProxy);
2158   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2159              outlierReductionType, cb);  
2160 }
2161
2162 // The purpose is mainly to initialize the k seeds and generate
2163 //   normalization parameters for each of the metrics. The k seeds
2164 //   and normalization parameters are broadcast to all processors.
2165 //
2166 // Called on processor 0
2167 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2168   CkAssert(CkMyPe() == 0);
2169   
2170   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2171
2172   int sumOffset = 0;
2173   int minOffset = numMetrics;
2174   int maxOffset = 2*numMetrics;
2175   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2176
2177   // calculate statistics & boundaries for the k seeds for clustering
2178   KMeansStatsMessage *outmsg = 
2179     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2180   outmsg->numMetrics = numMetrics;
2181   outmsg->numKPos = numK*numMetrics;
2182   outmsg->numStats = numMetrics*4;
2183
2184   // Sum | Min | Max | Sum of Squares
2185   double *totalExecTimes = (double *)msg->getData();
2186   double totalTime = 0.0;
2187
2188   for (int i=0; i<numMetrics; i++) {
2189     DEBUGN("%lf\n", totalExecTimes[i]);
2190     totalTime += totalExecTimes[i];
2191
2192     // calculate event mean over all processors
2193     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2194
2195     // get the ranges and offsets of each metric. With this, we get
2196     //   normalization factors that can be sent back to each processor to
2197     //   be used as necessary. We reuse max for range. Min remains the offset.
2198     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2199     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2200       totalExecTimes[minOffset + i];
2201     
2202     // calculate stddev (using biased variance)
2203     outmsg->stats[sosOffset + i] = 
2204       sqrt((totalExecTimes[sosOffset + i] - 
2205             2*(outmsg->stats[i])*totalExecTimes[i] +
2206             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2207            CkNumPes());
2208   }
2209
2210   for (int i=0; i<numMetrics; i++) {
2211     // 1) if the proportion of the max value of the entry method relative to
2212     //   the average time taken over all entry methods across all processors
2213     //   is greater than the stipulated percentage threshold ...; AND
2214     // 2) if the range of values are non-zero.
2215     //
2216     // The current assumption is totalTime > 0 (what program has zero total
2217     //   time from all work?)
2218     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2219                      entryThreshold) &&
2220       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2221     if (keepMetric[i]) {
2222       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2223              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2224     } else {
2225       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2226     }
2227     outmsg->filter[i] = keepMetric[i];
2228   }
2229
2230   delete msg;
2231   
2232   // initialize k seeds for this phase
2233   kSeeds = new double[numK*numMetrics];
2234
2235   numKReported = 0;
2236   kNumMembers = new int[numK];
2237
2238   // Randomly select k processors' metric vectors for the k seeds
2239   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2240   srand(11337); // for debugging purposes
2241   for (int k=0; k<numK; k++) {
2242     DEBUGF("Seed %d | ", k);
2243     for (int m=0; m<numMetrics; m++) {
2244       double factor = totalExecTimes[maxOffset + m] - 
2245         totalExecTimes[minOffset + m];
2246       // "uniform" distribution, scaled according to the normalization
2247       //   factors
2248       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2249       // Random distribution.
2250       kSeeds[numMetrics*k + m] =
2251         ((rand()*1.0)/RAND_MAX)*factor;
2252       if (keepMetric[m]) {
2253         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2254       }
2255       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2256     }
2257     DEBUGF("\n");
2258     kNumMembers[k] = 0;
2259   }
2260
2261   // broadcast statistical values to all processors for cluster discovery
2262   thisProxy.findInitialClusters(outmsg);
2263 }
2264
2265
2266
2267 // Called on each processor.
2268 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2269
2270  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2271
2272   phaseIter = 0;
2273
2274   // Get info from stats message
2275   CkAssert(numMetrics == msg->numMetrics);
2276   for (int i=0; i<numMetrics; i++) {
2277     keepMetric[i] = msg->filter[i];
2278   }
2279
2280   // Normalize data on local processor.
2281   // **CWL** See my thesis for detailed discussion of normalization of
2282   //    performance data.
2283   // **NOTE** This might change if we want to send data based on the filter
2284   //   instead of all the data.
2285   CkAssert(numMetrics*4 == msg->numStats);
2286   for (int i=0; i<numMetrics; i++) {
2287     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2288     // **CWL** We do not normalize the range. Entry methods that exhibit
2289     //   large absolute timing variations should be allowed to contribute
2290     //   more to the Euclidean distance measure!
2291     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2292   }
2293
2294   // **NOTE** This might change if we want to send data based on the filter
2295   //   instead of all the data.
2296   CkAssert(numK*numMetrics == msg->numKPos);
2297   for (int i=0; i<msg->numKPos; i++) {
2298     incKSeeds[i] = msg->kSeedsPos[i];
2299   }
2300
2301   // Decide which KSeed this processor belongs to.
2302   minDistance = calculateDistance(0);
2303   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2304            currentPhase, phaseIter, minDistance);
2305   minK = 0;
2306   for (int i=1; i<numK; i++) {
2307     double distance = calculateDistance(i);
2308     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2309              currentPhase, phaseIter, i, distance);
2310     if (distance < minDistance) {
2311       minDistance = distance;
2312       minK = i;
2313     }
2314   }
2315
2316   // Set up a reduction with the modification vector to the root (0).
2317   //
2318   // The modification vector sends a negative value for each metric
2319   //   for the K this processor no longer belongs to and a positive
2320   //   value to the K the processor now belongs. In addition, a -1.0
2321   //   is sent to the K it is leaving and a +1.0 to the K it is 
2322   //   joining.
2323   //
2324   // The processor must still contribute a "zero returns" even if
2325   //   nothing changes. This will be the basis for determine
2326   //   convergence at the root.
2327   //
2328   // The addtional +1 is meant for the count-change that must be
2329   //   maintained for the special cases at the root when some K
2330   //   may be deprived of all processor points or go from 0 to a
2331   //   positive number of processors (see later comments).
2332   double *modVector = new double[numK*(numMetrics+1)];
2333   for (int i=0; i<numK; i++) {
2334     for (int j=0; j<numMetrics+1; j++) {
2335       modVector[i*(numMetrics+1) + j] = 0.0;
2336     }
2337   }
2338   for (int i=0; i<numMetrics; i++) {
2339     // for this initialization, only positive values need be sent.
2340     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2341   }
2342   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2343
2344   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2345                 0, thisProxy);
2346   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2347              CkReduction::sum_double, cb);  
2348 }
2349
2350 double KMeansBOC::calculateDistance(int k) {
2351   double ret = 0.0;
2352   for (int i=0; i<numMetrics; i++) {
2353     if (keepMetric[i]) {
2354       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2355              CkMyPe(), currentPhase, phaseIter, i,
2356                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2357       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2358     }
2359   }
2360   return sqrt(ret);
2361 }
2362
2363 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2364   CkAssert(CkMyPe() == 0);
2365
2366   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2367
2368   double *modVector = (double *)msg->getData();
2369   // sanity check
2370   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2371
2372   // A quick convergence test.
2373   bool hasChanges = false;
2374   for (int i=0; i<numK; i++) {
2375     hasChanges = hasChanges || 
2376       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2377   }
2378   if (!hasChanges) {
2379     delete msg;
2380     findRepresentatives();
2381   } else {
2382     int overallChange = 0;
2383     for (int i=0; i<numK; i++) {
2384       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2385       if (change != 0) {
2386         overallChange += change;
2387         // modify the k seeds based on the modification vectors coming in
2388         //
2389         // If a seed initially has no members, its contents do not matter and
2390         //   is simply set to the average of the incoming vector.
2391         // If the change causes a seed to lose all its members, do nothing.
2392         //   Its last-known location is kept to allow it to re-capture
2393         //   membership at the next iteration rather than apply the last
2394         //   changes (which snaps the point unnaturally to 0,0).
2395         // Otherwise, apply the appropriate vector changes.
2396         CkAssert((kNumMembers[i] + change >= 0) &&
2397                  (kNumMembers[i] + change <= CkNumPes()));
2398         if (kNumMembers[i] == 0) {
2399           CkAssert(change > 0);
2400           for (int j=0; j<numMetrics; j++) {
2401             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2402           }
2403         } else if (kNumMembers[i] + change == 0) {
2404           // do nothing.
2405         } else {
2406           for (int j=0; j<numMetrics; j++) {
2407             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2408             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2409             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2410           }
2411         }
2412         kNumMembers[i] += change;
2413       }
2414       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2415              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2416     }
2417     delete msg;
2418
2419     // broadcast the new seed locations.
2420     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2421     outmsg->numKPos = numK*numMetrics;
2422     for (int i=0; i<numK*numMetrics; i++) {
2423       outmsg->kSeedsPos[i] = kSeeds[i];
2424     }
2425
2426     thisProxy.updateSeedMembership(outmsg);
2427   }
2428 }
2429
2430 // Called on all processors
2431 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2432
2433   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2434
2435   phaseIter++;
2436
2437   // **NOTE** This might change if we want to send data based on the filter
2438   //   instead of all the data.
2439   CkAssert(numK*numMetrics == msg->numKPos);
2440   for (int i=0; i<msg->numKPos; i++) {
2441     incKSeeds[i] = msg->kSeedsPos[i];
2442   }
2443
2444   // Decide which KSeed this processor belongs to.
2445   lastMinK = minK;
2446   minDistance = calculateDistance(0);
2447   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2448          currentPhase, phaseIter, minDistance);
2449
2450   minK = 0;
2451   for (int i=1; i<numK; i++) {
2452     double distance = calculateDistance(i);
2453     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2454            currentPhase, phaseIter, i, distance);
2455     if (distance < minDistance) {
2456       minDistance = distance;
2457       minK = i;
2458     }
2459   }
2460
2461   double *modVector = new double[numK*(numMetrics+1)];
2462   for (int i=0; i<numK; i++) {
2463     for (int j=0; j<numMetrics+1; j++) {
2464       modVector[i*(numMetrics+1) + j] = 0.0;
2465     }
2466   }
2467
2468   if (minK != lastMinK) {
2469     for (int i=0; i<numMetrics; i++) {
2470       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2471       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2472     }
2473     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2474     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2475   }
2476
2477   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2478                 0, thisProxy);
2479   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2480              CkReduction::sum_double, cb);  
2481 }
2482
2483 void KMeansBOC::findRepresentatives() {
2484
2485   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2486
2487   int numNonEmptyClusters = 0;
2488   for (int i=0; i<numK; i++) {
2489     if (kNumMembers[i] > 0) {
2490       numNonEmptyClusters++;
2491     }
2492   }
2493
2494   int numRepresentatives = peNumKeep;
2495   // **FIXME**
2496   // This is fairly arbitrary. Next time, choose the centers of the top
2497   //   largest clusters.
2498   if (numRepresentatives < numNonEmptyClusters) {
2499     numRepresentatives = numNonEmptyClusters;
2500   }
2501
2502   int slotsRemaining = numRepresentatives;
2503
2504   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2505          numNonEmptyClusters);
2506
2507   // determine how many exemplars to select per cluster. Currently
2508   //   hardcoded to 1. Future challenge is to decide on other numbers
2509   //   or proportionality.
2510   //
2511   int exemplarsPerCluster = 1;
2512   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2513
2514   int numCandidateOutliers = CkNumPes() - 
2515     exemplarsPerCluster*numNonEmptyClusters;
2516
2517   double *remainders = new double[numK];
2518   int *assigned = new int[numK];
2519   exemplarChoicesLeft = new int[numK];
2520   outlierChoicesLeft = new int[numK];
2521
2522   for (int i=0; i<numK; i++) {
2523     assigned[i] = 0;
2524     remainders[i] = 
2525       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2526       slotsRemaining / numCandidateOutliers;
2527     if (remainders[i] >= 0.0) {
2528       assigned[i] = (int)floor(remainders[i]);
2529       remainders[i] -= assigned[i];
2530     } else {
2531       remainders[i] = 0.0;
2532     }
2533   }
2534
2535   for (int i=0; i<numK; i++) {
2536     slotsRemaining -= assigned[i];
2537   }
2538   CkAssert(slotsRemaining >= 0);
2539
2540   // find clusters to assign the loose slots to, in order of
2541   // remainder proportion
2542   while (slotsRemaining > 0) {
2543     double max = 0.0;
2544     int winner = 0;
2545     for (int i=0; i<numK; i++) {
2546       if (remainders[i] > max) {
2547         max = remainders[i];
2548         winner = i;
2549       }
2550     }
2551     assigned[winner]++;
2552     remainders[winner] = 0.0;
2553     slotsRemaining--;
2554   }
2555
2556   // set up how many reduction cycles of min/max we need to conduct to
2557   // select the representatives.
2558   numSelectionIter = exemplarsPerCluster;
2559   for (int i=0; i<numK; i++) {
2560     if (assigned[i] > numSelectionIter) {
2561       numSelectionIter = assigned[i];
2562     }
2563   }
2564   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2565
2566   for (int i=0; i<numK; i++) {
2567     if (kNumMembers[i] > 0) {
2568       exemplarChoicesLeft[i] = exemplarsPerCluster;
2569       outlierChoicesLeft[i] = assigned[i];
2570     } else {
2571       exemplarChoicesLeft[i] = 0;
2572       outlierChoicesLeft[i] = 0;
2573     }
2574     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2575            outlierChoicesLeft[i]);
2576   }
2577
2578   delete [] assigned;
2579   delete [] remainders;
2580
2581   // send out first broadcast
2582   KSelectionMessage *outmsg = NULL;
2583   if (numSelectionIter > 0) {
2584     outmsg = new (numK, numK, numK) KSelectionMessage;
2585     outmsg->numKMinIDs = numK;
2586     outmsg->numKMaxIDs = numK;
2587     for (int i=0; i<numK; i++) {
2588       outmsg->minIDs[i] = -1;
2589       outmsg->maxIDs[i] = -1;
2590     }
2591     thisProxy.collectDistances(outmsg);
2592   } else {
2593     CkPrintf("Warning: No selection iteration from the start!\n");
2594     // invoke phase completion on all processors
2595     thisProxy.phaseDone();
2596   }
2597 }
2598
2599 /*
2600  *  lastMin = array of minimum champions of the last tournament
2601  *  lastMax = array of maximum champions of the last tournament
2602  *  lastMaxVal = array of last encountered maximum values, allows previous
2603  *                 minimum winners to eliminate themselves from the next
2604  *                 minimum race.
2605  *
2606  *  Called on all processors.
2607  */
2608 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2609
2610   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2611
2612   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2613          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2614   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2615       (CkMyPe() == msg->maxIDs[lastMinK])) {
2616     CkAssert(!selected);
2617     selected = true;
2618   }
2619
2620   // build outgoing reduction structure
2621   //   format = minVal | ID | maxVal | ID
2622   double *minMaxAndIDs = NULL;
2623
2624   minMaxAndIDs = new double[numK*4];
2625   // initialize to the appropriate out-of-band values (for error checks)
2626   for (int i=0; i<numK; i++) {
2627     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2628     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2629     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2630     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2631   }
2632   // If I have not won before, I put myself back into the competition
2633   if (!selected) {
2634     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2635     minMaxAndIDs[lastMinK*4] = minDistance;
2636     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2637     minMaxAndIDs[lastMinK*4+2] = minDistance;
2638     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2639   }
2640   delete msg;
2641
2642   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2643                 0, thisProxy);
2644   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2645              minMaxReductionType, cb);  
2646 }
2647
2648 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2649   // incoming format:
2650   //   minVal | minID | maxVal | maxID
2651
2652   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2653
2654   if (numSelectionIter > 0) {
2655     double *incInfo = (double *)msg->getData();
2656     
2657     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2658     outmsg->numKMinIDs = numK;
2659     outmsg->numKMaxIDs = numK;
2660     
2661     for (int i=0; i<numK; i++) {
2662       DEBUGF("%d | %lf %d %lf %d \n", i, 
2663              incInfo[i*4], (int)incInfo[i*4+1], 
2664              incInfo[i*4+2], (int)incInfo[i*4+3]);
2665     }
2666
2667     for (int i=0; i<numK; i++) {
2668       if (exemplarChoicesLeft[i] > 0) {
2669         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2670         exemplarChoicesLeft[i]--;
2671       } else {
2672         outmsg->minIDs[i] = -1;
2673       }
2674       if (outlierChoicesLeft[i] > 0) {
2675         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2676         outlierChoicesLeft[i]--;
2677       } else {
2678         outmsg->maxIDs[i] = -1;
2679       }
2680     }
2681     thisProxy.collectDistances(outmsg);
2682     numSelectionIter--;
2683   } else {
2684     // invoke phase completion on all processors
2685     thisProxy.phaseDone();
2686   }
2687 }
2688
2689 /**
2690  *  Completion of the K-Means clustering and data selection of one phase
2691  *    of the computation.
2692  *
2693  *  Called on every processor.
2694  */
2695 void KMeansBOC::phaseDone() {
2696
2697   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2698
2699   LogPool *pool = CkpvAccess(_trace)->_logPool;
2700   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2701
2702   // now decide on what to do with the decision.
2703   if (!selected) {
2704     if (usePhases) {
2705       pool->keepPhase[currentPhase] = false;
2706     } else {
2707       // if not using phases, we're working on the whole log
2708       pool->setAllPhases(false);
2709     }
2710   }
2711
2712   // **FIXME** (?) - All processors have to agree on this, or the reduction
2713   //   will not be correct! The question is "is this enforcible?"
2714   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2715     // We're done
2716     int dummy = 0;
2717     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2718                   0, bocProxy);
2719     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2720   } else {
2721     // reset all phase-based k-means data and decisions
2722
2723     // **FIXME**!!!!!    
2724     
2725     // invoke the next K-Means computation phase.
2726     currentPhase++;
2727     thisProxy[CkMyPe()].getNextPhaseMetrics();
2728   }
2729 }
2730
2731 void TraceProjectionsBOC::startTimeAnalysis()
2732 {
2733   double startTime = 0.0;
2734   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2735      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2736   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2737   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2738 }
2739
2740 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2741 {
2742   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2743
2744   if (CkpvAccess(_trace) != NULL) {
2745     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2746     CkpvAccess(_trace)->_logPool->setNewStartTime();
2747     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2748   }
2749   delete msg;
2750   thisProxy[CkMyPe()].startEndTimeAnalysis();
2751 }
2752
2753 void TraceProjectionsBOC::startEndTimeAnalysis()
2754 {
2755  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2756
2757   endTime = CkpvAccess(_trace)->endTime;
2758   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2759
2760   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2761                 0, thisProxy);
2762   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2763 }
2764
2765 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2766 {
2767  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2768
2769   CkAssert(CkMyPe() == 0);
2770   parModulesRemaining--;
2771   if (CkpvAccess(_trace) != NULL) {
2772     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2773     // CkPrintf("End time determined to be %lf us\n",
2774     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2775   }
2776   delete msg;
2777   if (parModulesRemaining == 0) {
2778     thisProxy[CkMyPe()].finalize();
2779   }
2780 }
2781
2782 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2783
2784  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2785
2786   CkAssert(CkMyPe() == 0);
2787   parModulesRemaining--;
2788   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2789            CmiWallTimer()-analysisStartTime);
2790   delete msg;
2791   if (parModulesRemaining == 0) {
2792     thisProxy[CkMyPe()].finalize();
2793   }
2794 }
2795
2796 /**
2797  *
2798  *  This version is called (on processor 0) only if flushCheck fails.
2799  *
2800  */
2801 void TraceProjectionsBOC::kMeansDone() {
2802   CkAssert(CkMyPe() == 0);
2803   parModulesRemaining--;
2804   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2805            CmiWallTimer()-analysisStartTime);
2806   if (parModulesRemaining == 0) {
2807     thisProxy[CkMyPe()].finalize();
2808   }
2809 }
2810
2811 void TraceProjectionsBOC::finalize()
2812 {
2813   CkAssert(CkMyPe() == 0);
2814   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2815   //       CmiWallTimer()-analysisStartTime);
2816   thisProxy.closingTraces();
2817 }
2818
2819 // Called on every processor
2820 void TraceProjectionsBOC::closingTraces() {
2821   CkpvAccess(_trace)->closeTrace();
2822
2823     // subtle:  reduction needs to go to the PE which started CkExit()
2824   int pe = 0;
2825   if (endPe != -1) pe = endPe;
2826   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2827                 pe, thisProxy); 
2828   contribute(0, NULL, CkReduction::sum_int, cb);  
2829 }
2830
2831 // The sole purpose of this reduction is to decide whether or not
2832 //   Projections as a module needs to call CkExit() to get other
2833 //   modules to shutdown.
2834 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2835   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2836   delete msg;
2837   // decide if CkExit() needs to be called
2838   if (!CkpvAccess(_trace)->converseExit) {
2839     CkExit();
2840   }
2841 }
2842 /*
2843  *  Registration and definition of the Outlier Reduction callback.
2844  *  Format: Sum | Min | Max | Sum of Squares
2845  */
2846 CkReductionMsg *outlierReduction(int nMsgs,
2847                                  CkReductionMsg **msgs) {
2848   int numBytes = 0;
2849   int numMetrics = 0;
2850   double *ret = NULL;
2851
2852   if (nMsgs == 1) {
2853     // nothing to do, just pass it on
2854     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2855   }
2856
2857   if (nMsgs > 1) {
2858     numBytes = msgs[0]->getSize();
2859     // sanity checks
2860     if (numBytes%sizeof(double) != 0) {
2861       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2862     }
2863     if ((numBytes/sizeof(double))%4 != 0) {
2864       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2865     }
2866     numMetrics = (numBytes/sizeof(double))/4;
2867     ret = new double[numMetrics*4];
2868
2869     // copy the first message data into the return structure first
2870     for (int i=0; i<numMetrics*4; i++) {
2871       ret[i] = ((double *)msgs[0]->getData())[i];
2872     }
2873
2874     // Sum | Min | Max | Sum of Squares
2875     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2876       for (int i=0; i<numMetrics; i++) {
2877         // Sum
2878         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2879         // Min
2880         ret[numMetrics + i] =
2881           (ret[numMetrics + i] < 
2882            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2883           ? ret[numMetrics + i] : 
2884           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2885         // Max
2886         ret[2*numMetrics + i] = 
2887           (ret[2*numMetrics + i] >
2888            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2889           ? ret[2*numMetrics + i] :
2890           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2891         // Sum of Squares (squaring already done at leaf)
2892         ret[3*numMetrics + i] +=
2893           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2894       }
2895     }
2896   }
2897   
2898   /* apparently, we do not delete the incoming messages */
2899   return CkReductionMsg::buildNew(numBytes,ret);
2900 }
2901
2902 /*
2903  * The only reason we have a user-defined reduction is to support
2904  *   identification of the "winning" processors as well as to handle
2905  *   both the min and the max of each "tournament". A simple min/max
2906  *   discovery cannot handle ties.
2907  */
2908 CkReductionMsg *minMaxReduction(int nMsgs,
2909                                 CkReductionMsg **msgs) {
2910   CkAssert(nMsgs > 0);
2911
2912   int numBytes = msgs[0]->getSize();
2913   CkAssert(numBytes%sizeof(double) == 0);
2914   int numK = (numBytes/sizeof(double))/4;
2915
2916   double *ret = new double[numK*4];
2917   // fill with out-of-band values
2918   for (int i=0; i<numK; i++) {
2919     ret[i*4] = -1.0;
2920     ret[i*4+1] = -1.0;
2921     ret[i*4+2] = -1.0;
2922     ret[i*4+3] = -1.0;
2923   }
2924
2925   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2926   for (int i=0; i<nMsgs; i++) {
2927     double *temp = (double *)msgs[i]->getData();
2928     for (int j=0; j<numK; j++) {
2929       // no previous valid min
2930       if (ret[j*4+1] < 0) {
2931         // fill it in only if the incoming min is valid
2932         if (temp[j*4+1] >= 0) {
2933           ret[j*4] = temp[j*4];      // fill min value
2934           ret[j*4+1] = temp[j*4+1];  // fill ID
2935         }
2936       } else {
2937         // find Min, only if incoming min is valid
2938         if (temp[j*4+1] >= 0) {
2939           if (temp[j*4] < ret[j*4]) {
2940             ret[j*4] = temp[j*4];      // replace min value
2941             ret[j*4+1] = temp[j*4+1];  // replace ID
2942           }
2943         }
2944       }
2945       // no previous valid max
2946       if (ret[j*4+3] < 0) {
2947         // fill only if the incoming max is valid
2948         if (temp[j*4+3] >= 0) {
2949           ret[j*4+2] = temp[j*4+2];  // fill max value
2950           ret[j*4+3] = temp[j*4+3];  // fill ID
2951         }
2952       } else {
2953         // find Max, only if incoming max is valid
2954         if (temp[j*4+3] >= 0) {
2955           if (temp[j*4+2] > ret[j*4+2]) {
2956             ret[j*4+2] = temp[j*4+2];  // replace max value
2957             ret[j*4+3] = temp[j*4+3];  // replace ID
2958           }
2959         }
2960       }
2961     }
2962   }
2963   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2964   delete [] ret;
2965   return redmsg;
2966 }
2967
2968 #include "TraceProjections.def.h"
2969 #endif //PROJ_ANALYSIS
2970
2971 /*@}*/