Merge branch 'charm' of charmgit:charm into xiang/optChkp
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 #if CMK_HAS_COUNTER_PAPI
103 #ifdef USE_SPP_PAPI
104 int papiEvents[NUMPAPIEVENTS];
105 #else
106 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
107 #endif
108 #endif // CMK_HAS_COUNTER_PAPI
109
110 /**
111   For each TraceFoo module, _createTraceFoo() must be defined.
112   This function is called in _createTraces() generated in moduleInit.C
113 */
114 void _createTraceprojections(char **argv)
115 {
116   DEBUGF("%d createTraceProjections\n", CkMyPe());
117   CkpvInitialize(CkVec<char *>, usrEventlist);
118   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
119   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
120 #if CMK_BIGSIM_CHARM
121   // CthRegister does not call the constructor
122 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
123 #endif //CMK_BIGSIM_CHARM
124   CkpvInitialize(TraceProjections*, _trace);
125   CkpvAccess(_trace) = new  TraceProjections(argv);
126   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
127   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
128 }
129  
130 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
131
132 struct TraceThreadListener {
133   struct CthThreadListener base;
134   int event;
135   int msgType;
136   int ep;
137   int srcPe;
138   int ml;
139   CmiObjId idx;
140 };
141
142 extern "C"
143 void traceThreadListener_suspend(struct CthThreadListener *l)
144 {
145   TraceThreadListener *a=(TraceThreadListener *)l;
146   /* here, we activate the appropriate trace codes for the appropriate
147      registered modules */
148   traceSuspend();
149 }
150
151 extern "C"
152 void traceThreadListener_resume(struct CthThreadListener *l) 
153 {
154   TraceThreadListener *a=(TraceThreadListener *)l;
155   /* here, we activate the appropriate trace codes for the appropriate
156      registered modules */
157   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
158                                 CthGetThreadID(a->base.thread));
159   a->event=-1;
160   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
161   a->ml=0;
162 }
163
164 extern "C"
165 void traceThreadListener_free(struct CthThreadListener *l) 
166 {
167   TraceThreadListener *a=(TraceThreadListener *)l;
168   delete a;
169 }
170
171 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
172 {
173 #if CMK_TRACE_ENABLED
174   /* strip essential information from the envelope */
175   TraceThreadListener *a= new TraceThreadListener;
176   
177   a->base.suspend=traceThreadListener_suspend;
178   a->base.resume=traceThreadListener_resume;
179   a->base.free=traceThreadListener_free;
180   a->event=e->getEvent();
181   a->msgType=e->getMsgtype();
182   a->ep=e->getEpIdx();
183   a->srcPe=e->getSrcPe();
184   a->ml=e->getTotalsize();
185
186   CthAddListener(tid, (CthThreadListener *)a);
187 #endif
188 }
189
190 void LogPool::openLog(const char *mode)
191 {
192 #if CMK_PROJECTIONS_USE_ZLIB
193   if(compressed) {
194     if (nonDeltaLog) {
195       do {
196         zfp = gzopen(fname, mode);
197       } while (!zfp && (errno == EINTR || errno == EMFILE));
198       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
199     }
200     if (deltaLog) {
201       do {
202         deltazfp = gzopen(dfname, mode);
203       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
204       if (!deltazfp) 
205         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
206     }
207   } else {
208     if (nonDeltaLog) {
209       do {
210         fp = fopen(fname, mode);
211       } while (!fp && (errno == EINTR || errno == EMFILE));
212       if (!fp) {
213         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
214         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
215       }
216     }
217     if (deltaLog) {
218       do {
219         deltafp = fopen(dfname, mode);
220       } while (!deltafp && (errno == EINTR || errno == EMFILE));
221       if (!deltafp) 
222         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
223     }
224   }
225 #else
226   if (nonDeltaLog) {
227     do {
228       fp = fopen(fname, mode);
229     } while (!fp && (errno == EINTR || errno == EMFILE));
230     if (!fp) {
231       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
232       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
233     }
234   }
235   if (deltaLog) {
236     do {
237       deltafp = fopen(dfname, mode);
238     } while (!deltafp && (errno == EINTR || errno == EMFILE));
239     if(!deltafp) 
240       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
241   }
242 #endif
243 }
244
245 void LogPool::closeLog(void)
246 {
247 #if CMK_PROJECTIONS_USE_ZLIB
248   if(compressed) {
249     if (nonDeltaLog) gzclose(zfp);
250     if (deltaLog) gzclose(deltazfp);
251     return;
252   }
253 #endif
254   if (nonDeltaLog) { 
255 #if !defined(_WIN32) || defined(__CYGWIN__)
256     fsync(fileno(fp)); 
257 #endif
258     fclose(fp); 
259   }
260   if (deltaLog)  { 
261 #if !defined(_WIN32) || defined(__CYGWIN__)
262     fsync(fileno(deltafp)); 
263 #endif
264     fclose(deltafp);  
265   }
266 }
267
268 LogPool::LogPool(char *pgm) {
269   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
270   // defaults to writing data (no outlier changes)
271   writeData = true;
272   numEntries = 0;
273   // **CW** for simple delta encoding
274   prevTime = 0.0;
275   timeErr = 0.0;
276   globalStartTime = 0.0;
277   globalEndTime = 0.0;
278   headerWritten = 0;
279   numPhases = 0;
280   hasFlushed = false;
281
282   keepPhase = NULL;
283
284   fileCreated = false;
285   poolSize = CkpvAccess(CtrLogBufSize);
286   pgmname = new char[strlen(pgm)+1];
287   strcpy(pgmname, pgm);
288 }
289
290 void LogPool::createFile(const char *fix)
291 {
292   if (fileCreated) {
293     return;
294   }
295
296   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
297   char *pathPlusFilePrefix = new char[1024];
298
299   if(nSubdirs > 0){
300     int sd = CkMyPe() % nSubdirs;
301     char *subdir = new char[1024];
302     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
303     CmiMkdir(subdir);
304     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
305     delete[] subdir;
306   } else {
307     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
308   }
309
310   char pestr[10];
311   sprintf(pestr, "%d", CkMyPe());
312 #if CMK_PROJECTIONS_USE_ZLIB
313   int len;
314   if(compressed)
315     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
316   else
317     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
318 #else
319   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
320 #endif
321
322   if (nonDeltaLog) {
323     fname = new char[len];
324   }
325   if (deltaLog) {
326     dfname = new char[len];
327   }
328 #if CMK_PROJECTIONS_USE_ZLIB
329   if(compressed) {
330     if (deltaLog && nonDeltaLog) {
331       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
332       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333     } else {
334       if (nonDeltaLog) {
335         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
336       } else {
337         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
338       }
339     }
340   } else {
341     if (deltaLog && nonDeltaLog) {
342       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
343       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344     } else {
345       if (nonDeltaLog) {
346         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
347       } else {
348         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
349       }
350     }
351   }
352 #else
353   if (deltaLog && nonDeltaLog) {
354     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
355     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356   } else {
357     if (nonDeltaLog) {
358       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
359     } else {
360       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
361     }
362   }
363 #endif
364   fileCreated = true;
365   delete[] pathPlusFilePrefix;
366   openLog("w+");
367   CLOSE_LOG 
368 }
369
370 void LogPool::createSts(const char *fix)
371 {
372   CkAssert(CkMyPe() == 0);
373   // create the sts file
374   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
375   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
376   do
377     {
378       stsfp = fopen(fname, "w");
379     } while (!stsfp && (errno == EINTR || errno == EMFILE));
380   if(stsfp==0){
381     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
382     CmiAbort("Error!!\n");
383   }
384   delete[] fname;
385 }  
386
387 void LogPool::createTopo(const char *fix)
388 {
389   CkAssert(CkMyPe() == 0);
390   // create the topo file
391   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
392   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
393   do
394     {
395       topofp = fopen(fname, "w");
396     } while (!stsfp && (errno == EINTR || errno == EMFILE));
397   if(stsfp==0){
398     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
399     CmiAbort("Error!!\n");
400   }
401   delete[] fname;
402 }  
403
404 void LogPool::createRC()
405 {
406   // create the projections rc file.
407   fname = 
408     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
409   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
410   do {
411     rcfp = fopen(fname, "w");
412   } while (!rcfp && (errno == EINTR || errno == EMFILE));
413   if (rcfp==0) {
414     CmiAbort("Cannot open projections configuration file for writing.\n");
415   }
416   delete[] fname;
417 }
418
419 LogPool::~LogPool() 
420 {
421   if (writeData) {
422     writeLog();
423 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
424     closeLog();
425 #endif
426   }
427
428 #if CMK_BIGSIM_CHARM
429   extern int correctTimeLog;
430   if (correctTimeLog) {
431     createFile("-bg");
432     if (CkMyPe() == 0) {
433       createSts("-bg");
434     }
435     writeHeader();
436     if (CkMyPe() == 0) writeSts(NULL);
437     postProcessLog();
438   }
439 #endif
440
441   delete[] pool;
442   delete [] fname;
443 }
444
445 void LogPool::writeHeader()
446 {
447   if (headerWritten) return;
448   headerWritten = 1;
449   if(!binary) {
450 #if CMK_PROJECTIONS_USE_ZLIB
451     if(compressed) {
452       if (nonDeltaLog) {
453         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
454       }
455       if (deltaLog) {
456         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
457       }
458     } 
459     else /* else clause is below... */
460 #endif
461     /*... may hang over from else above */ {
462       if (nonDeltaLog) {
463         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
464       }
465       if (deltaLog) {
466         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
467       }
468     }
469   }
470   else { // binary
471       if (nonDeltaLog) {
472         fwrite(&numEntries,sizeof(numEntries),1,fp);
473       }
474       if (deltaLog) {
475         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
476       }
477   }
478 }
479
480 void LogPool::writeLog(void)
481 {
482   createFile();
483   OPEN_LOG
484   writeHeader();
485   if (nonDeltaLog) write(0);
486   if (deltaLog) write(1);
487   CLOSE_LOG
488 }
489
490 void LogPool::write(int writedelta) 
491 {
492   // **CW** Simple delta encoding implementation
493   // prevTime has to be maintained as an object variable because
494   // LogPool::write may be called several times depending on the
495   // +logsize value.
496   PUP::er *p = NULL;
497   if (binary) {
498     p = new PUP::toDisk(writedelta?deltafp:fp);
499   }
500 #if CMK_PROJECTIONS_USE_ZLIB
501   else if (compressed) {
502     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
503   }
504 #endif
505   else {
506     p = new toProjectionsFile(writedelta?deltafp:fp);
507   }
508   CmiAssert(p);
509   int curPhase = 0;
510   // **FIXME** - Should probably consider a more sophisticated bounds-based
511   //   approach for selective writing instead of making multiple if-checks
512   //   for every single event.
513   for(UInt i=0; i<numEntries; i++) {
514     if (!writedelta) {
515       if (keepPhase == NULL) {
516         // default case, when no phase selection is required.
517         pool[i].pup(*p);
518       } else {
519         // **FIXME** Might be a good idea to create a "filler" event block for
520         //   all the events taken out by phase filtering.
521         if (pool[i].type == END_PHASE) {
522           // always write phase markers
523           pool[i].pup(*p);
524           curPhase++;
525         } else if (pool[i].type == BEGIN_COMPUTATION ||
526                    pool[i].type == END_COMPUTATION) {
527           // always write BEGIN and END COMPUTATION markers
528           pool[i].pup(*p);
529         } else if (keepPhase[curPhase]) {
530           pool[i].pup(*p);
531         }
532       }
533     }
534     else {      // delta
535       // **FIXME** Implement phase-selective writing for delta logs
536       //   eventually
537       double time = pool[i].time;
538       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
539       {
540         double timeDiff = (time-prevTime)*1.0e6;
541         UInt intTimeDiff = (UInt)timeDiff;
542         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
543         if (timeErr > 1.0) {
544           timeErr -= 1.0;
545           intTimeDiff++;
546         }
547         pool[i].time = intTimeDiff/1.0e6;
548       }
549       pool[i].pup(*p);
550       pool[i].time = time;      // restore time value
551       prevTime = time;
552     }
553   }
554   delete p;
555   delete [] keepPhase;
556 }
557
558 void LogPool::writeSts(void)
559 {
560   // for whining compilers
561   int i;
562   // generate an automatic unique ID for each log
563   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
564   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
565   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
566 #if CMK_HAS_COUNTER_PAPI
567   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
568   // for now, use i, next time use papiEvents[i].
569   // **CW** papi event names is a hack.
570   char eventName[PAPI_MAX_STR_LEN];
571   for (i=0;i<NUMPAPIEVENTS;i++) {
572     PAPI_event_code_to_name(papiEvents[i], eventName);
573     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
574   }
575 #endif
576   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
577   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
578     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
579   }     
580 }
581
582 void LogPool::writeSts(TraceProjections *traceProj){
583   writeSts();
584   if (traceProj != NULL) {
585     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
586     funcIter->seekStart();
587     int numFuncs = traceProj->getFuncNumber();
588     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
589     while(funcIter->hasNext()) {
590       StrKey *key;
591       int *obj = (int *)funcIter->next((void **)&key);
592       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
593     }
594   }
595   fprintf(stsfp, "END\n");
596   fclose(stsfp);
597 }
598
599 void LogPool::writeRC(void)
600 {
601     //CkPrintf("write RC is being executed\n");
602 #ifdef PROJ_ANALYSIS  
603     CkAssert(CkMyPe() == 0);
604     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
605             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
606     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
607             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
608     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
609     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
610       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
611     } else {
612       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
613     }
614     */
615 #endif //PROJ_ANALYSIS
616   fclose(rcfp);
617 }
618
619 void LogPool::writeTopo(void)
620 {
621   TopoManager tmgr;
622   tmgr.printAllocation(topofp);
623   fclose(topofp);
624 }
625
626 #if CMK_BIGSIM_CHARM
627 static void updateProjLog(void *data, double t, double recvT, void *ptr)
628 {
629   LogEntry *log = (LogEntry *)data;
630   FILE *fp = *(FILE **)ptr;
631   log->time = t;
632   log->recvTime = recvT<0.0?0:recvT;
633 //  log->write(fp);
634   toProjectionsFile p(fp);
635   log->pup(p);
636 }
637 #endif
638
639 // flush log entries to disk
640 void LogPool::flushLogBuffer()
641 {
642   if (numEntries) {
643     double writeTime = TraceTimer();
644     writeLog();
645     hasFlushed = true;
646     numEntries = 0;
647     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
648     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
649     //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
650     if (!traceProjectionsGID.isZero()) {    // report flushing events to PE 0
651       CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
652       bocProxy[0].flush_warning(CkMyPe());
653     }
654   }
655 }
656
657 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
658                   double time, int event, int pe, int ml, CmiObjId *id, 
659                   double recvT, double cpuT, int numPe)
660 {
661   new (&pool[numEntries++])
662     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
663   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
664     numPhases++;
665   }
666   if(poolSize==numEntries) {
667     flushLogBuffer();
668 #if CMK_BIGSIM_CHARM
669     extern int correctTimeLog;
670     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
671 #endif
672   }
673 #if CMK_BIGSIM_CHARM
674   switch (type) {
675     case BEGIN_PROCESSING:
676       pool[numEntries-1].recvTime = BgGetRecvTime();
677     case END_PROCESSING:
678     case BEGIN_COMPUTATION:
679     case END_COMPUTATION:
680     case CREATION:
681     case BEGIN_PACK:
682     case END_PACK:
683     case BEGIN_UNPACK:
684     case END_UNPACK:
685     case USER_EVENT_PAIR:
686       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
687   }
688 #endif
689 }
690
691 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
692 #ifndef CMK_BIGSIM_CHARM
693   new (&pool[numEntries++])
694         LogEntry(time,type,funcID,lineNum,fileName);
695   if(poolSize == numEntries){
696     flushLogBuffer();
697   }
698 #endif  
699 }
700
701
702   
703 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
704 #ifndef CMK_BIGSIM_CHARM
705   new (&pool[numEntries++])
706         LogEntry(type,time,memUsage);
707   if(poolSize == numEntries){
708     flushLogBuffer();
709   }
710 #endif  
711         
712 }  
713
714
715
716 void LogPool::addUserSupplied(int data){
717         // add an event
718         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
719
720         // set the user supplied value for the previously created event 
721         pool[numEntries-1].setUserSuppliedData(data);
722   }
723
724
725 void LogPool::addUserSuppliedNote(char *note){
726         // add an event
727         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
728
729         // set the user supplied note for the previously created event 
730         pool[numEntries-1].setUserSuppliedNote(note);
731   }
732
733 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
734   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
735 #ifndef CMK_BIGSIM_CHARM
736 #if MPI_TRACE_MACHINE_HACK
737   //This part of code is used  to combine the contiguous
738   //MPI_Test and MPI_Iprobe events to reduce the number of
739   //entries
740 #define MPI_TEST_EVENT_ID 60
741 #define MPI_IPROBE_EVENT_ID 70 
742   int lastEvent = pool[numEntries-1].event;
743   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
744     //just replace the endtime of last event
745     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
746     pool[numEntries].endTime = et;
747   }else{
748     new (&pool[numEntries++])
749       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
750   }
751 #else
752   new (&pool[numEntries++])
753     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
754 #endif
755   if(poolSize == numEntries){
756     flushLogBuffer();
757   }
758 #endif  
759 }
760
761
762 /* **CW** Not sure if this is the right thing to do. Feels more like
763    a hack than a solution to Sameer's request to add the destination
764    processor information to multicasts and broadcasts.
765
766    In the unlikely event this method is used for Broadcasts as well,
767    pelist == NULL will be used to indicate a global broadcast with 
768    num PEs.
769 */
770 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
771                                    int event, int pe, int ml, CmiObjId *id,
772                                    double recvT, int numPe, int *pelist)
773 {
774   new (&pool[numEntries++])
775     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
776   if(poolSize==numEntries) {
777     flushLogBuffer();
778   }
779 }
780
781 void LogPool::postProcessLog()
782 {
783 #if CMK_BIGSIM_CHARM
784   bgUpdateProj(1);   // event type
785 #endif
786 }
787
788 void LogPool::modLastEntryTimestamp(double ts)
789 {
790   pool[numEntries-1].time = ts;
791   //pool[numEntries-1].cputime = ts;
792 }
793
794 // /** Constructor for a multicast log entry */
795 // 
796 //  THIS WAS MOVED TO trace-projections.h with the other constructors
797 // 
798 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
799 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
800 // {
801 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
802 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
803 //     recvTime = rt; 
804 //     numpes = numPe;
805 //     userSuppliedNote = NULL;
806 //     if (pelist != NULL) {
807 //      pes = new int[numPe];
808 //      for (int i=0; i<numPe; i++) {
809 //        pes[i] = pelist[i];
810 //      }
811 //     } else {
812 //      pes= NULL;
813 //     }
814 // }
815
816 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
817 //{
818 //#if CMK_HAS_COUNTER_PAPI
819 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
820 //#endif
821 //}
822
823
824
825 void LogEntry::pup(PUP::er &p)
826 {
827   int i;
828   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
829   char ret = '\n';
830
831   p|type;
832   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
833   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
834
835   switch (type) {
836     case USER_EVENT:
837     case USER_EVENT_PAIR:
838       p|mIdx; p|itime; p|event; p|pe;
839       break;
840     case BEGIN_IDLE:
841     case END_IDLE:
842     case BEGIN_PACK:
843     case END_PACK:
844     case BEGIN_UNPACK:
845     case END_UNPACK:
846       p|itime; p|pe; 
847       break;
848     case BEGIN_PROCESSING:
849       if (p.isPacking()) {
850         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
851         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
852       }
853       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
854       p|msglen; p|irecvtime; 
855       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
856       p|icputime;
857 #if CMK_HAS_COUNTER_PAPI
858       //p|numPapiEvents;
859       for (i=0; i<NUMPAPIEVENTS; i++) {
860         // not yet!!!
861         //      p|papiIDs[i]; 
862         p|papiValues[i];
863         
864       }
865 #else
866       //p|numPapiEvents;     // non papi version has value 0
867 #endif
868       if (p.isUnpacking()) {
869         recvTime = irecvtime/1.0e6;
870         cputime = icputime/1.0e6;
871       }
872       break;
873     case END_PROCESSING:
874       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
875       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
876 #if CMK_HAS_COUNTER_PAPI
877       //p|numPapiEvents;
878       for (i=0; i<NUMPAPIEVENTS; i++) {
879         // not yet!!!
880         //      p|papiIDs[i];
881         p|papiValues[i];
882       }
883 #else
884       //p|numPapiEvents;  // non papi version has value 0
885 #endif
886       if (p.isUnpacking()) cputime = icputime/1.0e6;
887       break;
888     case USER_SUPPLIED:
889           p|userSuppliedData;
890           p|itime;
891         break;
892     case USER_SUPPLIED_NOTE:
893           p|itime;
894           int length;
895           if (p.isPacking()) length = strlen(userSuppliedNote);
896           p | length;
897           char space;
898           space = ' ';
899           p | space;
900           if (p.isUnpacking()) {
901             userSuppliedNote = new char[length+1];
902             userSuppliedNote[length] = '\0';
903           }
904           PUParray(p,userSuppliedNote, length);
905           break;
906     case USER_SUPPLIED_BRACKETED_NOTE:
907       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
908           p|itime;
909           p|iEndTime;
910           p|event;
911           int length2;
912           if (p.isPacking()) length2 = strlen(userSuppliedNote);
913           p | length2;
914           char space2;
915           space2 = ' ';
916           p | space2;
917           if (p.isUnpacking()) {
918             userSuppliedNote = new char[length+1];
919             userSuppliedNote[length] = '\0';
920           }
921           PUParray(p,userSuppliedNote, length2);
922           break;
923     case MEMORY_USAGE_CURRENT:
924       p | memUsage;
925       p | itime;
926         break;
927     case CREATION:
928       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
929       p|mIdx; p|eIdx; p|itime;
930       p|event; p|pe; p|msglen; p|irecvtime;
931       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
932       break;
933     case CREATION_BCAST:
934       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
935       p|mIdx; p|eIdx; p|itime;
936       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
937       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
938       break;
939     case CREATION_MULTICAST:
940       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
941       p|mIdx; p|eIdx; p|itime;
942       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
943       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
944       for (i=0; i<numpes; i++) p|pes[i];
945       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
946       break;
947     case MESSAGE_RECV:
948       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
949       break;
950
951     case ENQUEUE:
952     case DEQUEUE:
953       p|mIdx; p|itime; p|event; p|pe;
954       break;
955
956     case BEGIN_INTERRUPT:
957     case END_INTERRUPT:
958       p|itime; p|event; p|pe;
959       break;
960
961       // **CW** absolute timestamps are used here to support a quick
962       // way of determining the total time of a run in projections
963       // visualization.
964     case BEGIN_COMPUTATION:
965     case END_COMPUTATION:
966     case BEGIN_TRACE:
967     case END_TRACE:
968       p|itime;
969       break;
970     case BEGIN_FUNC:
971         p | itime;
972         p | mIdx;
973         p | event;
974         if(!p.isUnpacking()){
975                 p(fName,flen-1);
976         }
977         break;
978     case END_FUNC:
979         p | itime;
980         p | mIdx;
981         break;
982     case END_PHASE:
983       p|eIdx; // FIXME: actually the phase ID
984       p|itime;
985       break;
986     default:
987       CmiError("***Internal Error*** Wierd Event %d.\n", type);
988       break;
989   }
990   if (p.isUnpacking()) time = itime/1.0e6;
991   p|ret;
992 }
993
994 TraceProjections::TraceProjections(char **argv): 
995   curevent(0), inEntry(0), computationStarted(0), 
996         converseExit(0), endTime(0.0), traceNestedEvents(0),
997         currentPhaseID(0), lastPhaseEvent(NULL)
998 {
999   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1000
1001   if (CkpvAccess(traceOnPe) == 0) return;
1002
1003   CtvInitialize(int,curThreadEvent);
1004   CkpvInitialize(CmiInt8, CtrLogBufSize);
1005   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1006   CtvAccess(curThreadEvent)=0;
1007   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1008                        "Log entries to buffer per I/O")) {
1009     if (CkMyPe() == 0) {
1010       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1011     }
1012   }
1013   checknested = 
1014     CmiGetArgFlagDesc(argv,"+checknested",
1015                       "check projections nest begin end execute events");
1016   traceNestedEvents = 
1017     CmiGetArgFlagDesc(argv,"+tracenested",
1018               "trace projections nest begin/end execute events");
1019   int binary = 
1020     CmiGetArgFlagDesc(argv,"+binary-trace",
1021                       "Write log files in binary format");
1022
1023   CmiInt8 nSubdirs = 0;
1024   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1025
1026
1027 #if CMK_PROJECTIONS_USE_ZLIB
1028   int compressed = true;
1029   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1030   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1031   compressed = compressed && !disableCompressed;
1032 #else
1033   // consume the flag so there's no confusing
1034   CmiGetArgFlagDesc(argv,"+gz-trace",
1035                     "Write log files pre-compressed with gzip");
1036   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1037 #endif
1038
1039   // **CW** default to non delta log encoding. The user may choose to do
1040   // create both logs (for debugging) or just the old log timestamping
1041   // (for compatibility).
1042   // Generating just the non delta log takes precedence over generating
1043   // both logs (if both arguments appear on the command line).
1044
1045   // switch to OLD log format until everything works // Gengbin
1046   nonDeltaLog = 1;
1047   deltaLog = 0;
1048   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1049                                   "Generate Delta encoded and simple timestamped log files");
1050
1051   _logPool = new LogPool(CkpvAccess(traceRoot));
1052   _logPool->setNumSubdirs(nSubdirs);
1053   _logPool->setBinary(binary);
1054 #if CMK_PROJECTIONS_USE_ZLIB
1055   _logPool->setCompressed(compressed);
1056 #endif
1057   if (CkMyPe() == 0) {
1058     _logPool->createSts();
1059     _logPool->createRC();
1060     _logPool->createTopo();
1061   }
1062   funcCount=1;
1063
1064 #if CMK_HAS_COUNTER_PAPI
1065   // We initialize and create the event sets for use with PAPI here.
1066   int papiRetValue;
1067   if(CkMyRank()==0){
1068     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1069     if (papiRetValue != PAPI_VER_CURRENT) {
1070       CmiAbort("PAPI Library initialization failure!\n");
1071     }
1072 #if CMK_SMP
1073     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1074       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1075     }
1076 #endif
1077   }
1078
1079 #if CMK_SMP
1080   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1081   #if CMK_SMP_TRACE_COMMTHREAD
1082       CmiNodeAllBarrier();
1083   #else
1084       CmiNodeBarrier();
1085   #endif
1086 #endif
1087   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1088   papiEventSet = PAPI_NULL; 
1089   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1090     CmiAbort("PAPI failed to create event set!\n");
1091   }
1092 #ifdef USE_SPP_PAPI
1093   //  CmiPrintf("Using SPP counters for PAPI\n");
1094   if(PAPI_query_event(PAPI_FP_OPS)==PAPI_OK) {
1095     papiEvents[0] = PAPI_FP_OPS;
1096   }else{
1097     if(CmiMyPe()==0){
1098       CmiAbort("WARNING: PAPI_FP_OPS doesn't exist on this platform!");
1099     }
1100   }
1101   if(PAPI_query_event(PAPI_TOT_INS)==PAPI_OK) {
1102     papiEvents[1] = PAPI_TOT_INS;
1103   }else{
1104     CmiAbort("WARNING: PAPI_TOT_INS doesn't exist on this platform!");
1105   }
1106   int EventCode;
1107   int ret;
1108   ret=PAPI_event_name_to_code("perf::PERF_COUNT_HW_CACHE_LL:MISS",&EventCode);
1109   if(PAPI_query_event(EventCode)==PAPI_OK) {
1110     papiEvents[2] = EventCode;
1111   }else{
1112     CmiAbort("WARNING: perf::PERF_COUNT_HW_CACHE_LL:MISS doesn't exist on this platform!");
1113   }
1114   ret=PAPI_event_name_to_code("DATA_PREFETCHER:ALL",&EventCode);
1115   if(PAPI_query_event(EventCode)==PAPI_OK) {
1116     papiEvents[3] = EventCode;
1117   }else{
1118     CmiAbort("WARNING: DATA_PREFETCHER:ALL doesn't exist on this platform!");
1119   }
1120   if(PAPI_query_event(PAPI_L1_DCA)==PAPI_OK) {
1121     papiEvents[4] = PAPI_L1_DCA;
1122   }else{
1123     CmiAbort("WARNING: PAPI_L1_DCA doesn't exist on this platform!");
1124   }
1125   if(PAPI_query_event(PAPI_TOT_CYC)==PAPI_OK) {
1126     papiEvents[5] = PAPI_TOT_CYC;
1127   }else{
1128     CmiAbort("WARNING: PAPI_TOT_CYC doesn't exist on this platform!");
1129   }
1130 #else
1131   // just uses { PAPI_L2_DCM, PAPI_FP_OPS } the 2 initialized PAPI_EVENTS
1132 #endif
1133   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1134   if (papiRetValue < 0) {
1135     if (papiRetValue == PAPI_ECNFLCT) {
1136       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1137     } else {
1138       char error_str[PAPI_MAX_STR_LEN];
1139       PAPI_perror(papiRetValue,error_str,PAPI_MAX_STR_LEN);
1140       CmiPrintf("PAPI failed with error %s val %d\n",error_str,papiRetValue);
1141       CmiAbort("PAPI failed to add designated events!\n");
1142     }
1143   }
1144   if(CkMyPe()==0)
1145     {
1146       CmiPrintf("Registered %d PAPI counters:",NUMPAPIEVENTS);
1147       char nameBuf[PAPI_MAX_STR_LEN];
1148       for(int i=0;i<NUMPAPIEVENTS;i++)
1149         {
1150           PAPI_event_code_to_name(papiEvents[i], nameBuf);
1151           CmiPrintf("%s ",nameBuf);
1152         }
1153       CmiPrintf("\n");
1154     }
1155   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1156 #endif
1157 }
1158
1159 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1160 {
1161   OPTIMIZED_VERSION
1162   CkAssert(e==-1 || e>=0);
1163   CkAssert(evt != NULL);
1164   int event;
1165   int biggest = -1;
1166   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1167     int cur = (*CkpvAccess(usrEvents))[i]->e;
1168     if (cur == e) {
1169       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1170       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1171         return e;
1172       else
1173         CmiAbort("UserEvent double registered!");
1174     }
1175     if (cur > biggest) biggest = cur;
1176   }
1177   // if no user events have so far been registered. biggest will be -1
1178   // and hence newly assigned event numbers will begin from 0.
1179   if (e==-1) event = biggest+1;  // automatically assign new event number
1180   else event = e;
1181   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1182   return event;
1183 }
1184
1185 void TraceProjections::traceClearEps(void)
1186 {
1187   // In trace-summary, this zeros out the EP bins, to eliminate noise
1188   // from startup.  Here, this isn't useful, since we can do that in
1189   // post-processing
1190 }
1191
1192 void TraceProjections::traceWriteSts(void)
1193 {
1194   if(CkMyPe()==0)
1195     _logPool->writeSts(this);
1196 }
1197
1198 /** 
1199  * **IMPT NOTES**:
1200  *
1201  * This is called when Converse closes during ConverseCommonExit().
1202  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1203  *
1204  * Some programs bypass CkExit() (like NAMD, which eventually calls
1205  * ConverseExit()), modules like traces will have to pretend to shutdown
1206  * as if CkExit() was called but at the same time avoid making
1207  * subsequent CkExit() calls (which is usually required for allowing
1208  * other modules to shutdown).
1209  *
1210  * Note that we can only get here if CkExit() was not called, since the
1211  * trace module will un-register itself from TraceArray if it did.
1212  *
1213  */
1214 void TraceProjections::traceClose(void)
1215 {
1216 #ifdef PROJ_ANALYSIS
1217   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1218
1219   // sets the flag that tells the code not to make the CkExit call later
1220   converseExit = 1;
1221   if (CkMyPe() == 0) {
1222     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1223     bocProxy.traceProjectionsParallelShutdown(-1);
1224   }
1225   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1226     CkpvAccess(_trace)->endComputation();
1227     delete _logPool;              // will write
1228     // remove myself from traceArray so that no tracing will be called.
1229     CkpvAccess(_traces)->removeTrace(this);
1230   }
1231 #else
1232   // we've already deleted the logpool, so multiple calls to traceClose
1233   // are tolerated.
1234   if (_logPool == NULL) {
1235     return;
1236   }
1237   if(CkMyPe()==0){
1238     _logPool->writeSts(this);
1239   }
1240   CkpvAccess(_trace)->endComputation();
1241   delete _logPool;              // will write
1242   // remove myself from traceArray so that no tracing will be called.
1243   CkpvAccess(_traces)->removeTrace(this);
1244
1245   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1246   bocProxy.ckLocalBranch()->print_warning();
1247 #endif
1248 }
1249
1250 /**
1251  *  **IMPT NOTES**:
1252  *
1253  *  This is meant to be called internally by the tracing framework.
1254  *
1255  */
1256 void TraceProjections::closeTrace() {
1257   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1258   if (CkMyPe() == 0) {
1259     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1260     _logPool->writeSts(this);
1261     _logPool->writeRC();
1262     _logPool->writeTopo();
1263     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1264
1265     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1266     bocProxy.ckLocalBranch()->print_warning();
1267   }
1268   delete _logPool;       // will write logs to file
1269
1270 }
1271
1272 #if CMK_SMP_TRACE_COMMTHREAD
1273 void TraceProjections::traceBeginOnCommThread()
1274 {
1275   if (!computationStarted) return;
1276   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1277 }
1278
1279 void TraceProjections::traceEndOnCommThread()
1280 {
1281   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1282 }
1283 #endif
1284
1285 void TraceProjections::traceBegin(void)
1286 {
1287   if (!computationStarted) return;
1288   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1289 }
1290
1291 void TraceProjections::traceEnd(void)
1292 {
1293   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1294 }
1295
1296 void TraceProjections::userEvent(int e)
1297 {
1298   if (!computationStarted) return;
1299   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1300 }
1301
1302 void TraceProjections::userBracketEvent(int e, double bt, double et)
1303 {
1304   if (!computationStarted) return;
1305   // two events record Begin/End of event e.
1306   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1307   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1308 }
1309
1310 void TraceProjections::userSuppliedData(int d)
1311 {
1312   if (!computationStarted) return;
1313   _logPool->addUserSupplied(d);
1314 }
1315
1316 void TraceProjections::userSuppliedNote(char *note)
1317 {
1318   if (!computationStarted) return;
1319   _logPool->addUserSuppliedNote(note);
1320 }
1321
1322
1323 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1324 {
1325   if (!computationStarted) return;
1326   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1327 }
1328
1329 void TraceProjections::memoryUsage(double m)
1330 {
1331   if (!computationStarted) return;
1332   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1333   
1334 }
1335
1336
1337 void TraceProjections::creation(envelope *e, int ep, int num)
1338 {
1339   double curTime = TraceTimer();
1340   if (e == 0) {
1341     CtvAccess(curThreadEvent) = curevent;
1342     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1343                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1344   } else {
1345     int type=e->getMsgtype();
1346     e->setEvent(curevent);
1347     if (num > 1) {
1348       _logPool->add(CREATION_BCAST, type, ep, curTime,
1349                     curevent++, CkMyPe(), e->getTotalsize(), 
1350                     NULL, 0, 0.0, num);
1351     } else {
1352       _logPool->add(CREATION, type, ep, curTime,
1353                     curevent++, CkMyPe(), e->getTotalsize(), 
1354                     NULL, 0, 0.0);
1355     }
1356   }
1357 }
1358
1359 //This function is only called from a comm thread in SMP mode. 
1360 void TraceProjections::creation(char *msg)
1361 {
1362 #if CMK_SMP_TRACE_COMMTHREAD
1363         // msg must be a charm message
1364     envelope *e = (envelope *)msg;
1365     int ep = e->getEpIdx();
1366     if(_entryTable[ep]->traceEnabled) {
1367         creation(e, ep, 1);
1368         e->setSrcPe(CkMyPe());              // pretend I am the sender
1369     }
1370 #endif
1371 }
1372
1373 void TraceProjections::traceCommSetMsgID(char *msg)
1374 {
1375 #if CMK_SMP_TRACE_COMMTHREAD
1376     // msg must be a charm message
1377     envelope *e = (envelope *)msg;
1378     int ep = e->getEpIdx();
1379     if(_entryTable[ep]->traceEnabled) {
1380         e->setSrcPe(CkMyPe());              // pretend I am the sender
1381         e->setEvent(curevent);
1382     }
1383 #endif
1384 }
1385
1386 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1387 {
1388     // msg must be a charm message
1389     *pe = *event = -1;
1390     envelope *e = (envelope *)msg;
1391     int ep = e->getEpIdx();
1392     if(_entryTable[ep]->traceEnabled) {
1393         *pe = e->getSrcPe();
1394         *event = e->getEvent();
1395     }
1396 }
1397
1398 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1399 {
1400        // msg must be a charm message
1401     envelope *e = (envelope *)msg;
1402     int ep = e->getEpIdx();
1403     if(ep<=0 || ep>=_entryTable.size()) return;
1404     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1405     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1406     if(_entryTable[ep]->traceEnabled) {
1407         e->setSrcPe(pe);
1408         e->setEvent(event);
1409     }
1410 }
1411
1412 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1413    Communication Library-specific Multicasts via new event 
1414    CREATION_MULTICAST.
1415 */
1416
1417 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1418                                          int *pelist)
1419 {
1420   double curTime = TraceTimer();
1421   if (e==0) {
1422     CtvAccess(curThreadEvent)=curevent;
1423     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1424                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1425   } else {
1426     int type=e->getMsgtype();
1427     e->setEvent(curevent);
1428     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1429                                    e->getTotalsize(), 0, 0.0, num, pelist);
1430   }
1431 }
1432
1433 void TraceProjections::creationDone(int num)
1434 {
1435   // modified the creation done time of the last num log entries
1436   // FIXME: totally a hack
1437   double curTime = TraceTimer();
1438   int idx = _logPool->numEntries-1;
1439   while (idx >=0 && num >0 ) {
1440     LogEntry &log = _logPool->pool[idx];
1441     if ((log.type == CREATION) ||
1442         (log.type == CREATION_BCAST) ||
1443         (log.type == CREATION_MULTICAST)) {
1444       log.recvTime = curTime - log.time;
1445       num --;
1446     }
1447     idx--;
1448   }
1449 }
1450
1451 void TraceProjections::beginExecute(CmiObjId *tid)
1452 {
1453 #if CMK_HAS_COUNTER_PAPI
1454   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1455     CmiAbort("PAPI failed to read at begin execute!\n");
1456   }
1457 #endif
1458   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1459   execEvent = CtvAccess(curThreadEvent);
1460   execEp = (-1);
1461   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1462                 execEvent,CkMyPe(), 0, tid);
1463 #if CMK_HAS_COUNTER_PAPI
1464   _logPool->addPapi(papiValues);
1465 #endif
1466   inEntry = 1;
1467 }
1468
1469 void TraceProjections::beginExecute(envelope *e)
1470 {
1471   if(e==0) {
1472 #if CMK_HAS_COUNTER_PAPI
1473     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1474       CmiAbort("PAPI failed to read at begin execute!\n");
1475     }
1476 #endif
1477     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1478     execEvent = CtvAccess(curThreadEvent);
1479     execEp = (-1);
1480     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1481                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1482 #if CMK_HAS_COUNTER_PAPI
1483     _logPool->addPapi(papiValues);
1484 #endif
1485     inEntry = 1;
1486   } else {
1487     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1488                  e->getSrcPe(),e->getTotalsize());
1489   }
1490 }
1491
1492 void TraceProjections::beginExecute(char *msg){
1493 #if CMK_SMP_TRACE_COMMTHREAD
1494         //This function is called from comm thread in SMP mode
1495     envelope *e = (envelope *)msg;
1496     int ep = e->getEpIdx();
1497     if(_entryTable[ep]->traceEnabled)
1498                 beginExecute(e);
1499 #endif
1500 }
1501
1502 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1503                                     int mlen, CmiObjId *idx)
1504 {
1505   if (traceNestedEvents) {
1506     if (! nestedEvents.isEmpty()) {
1507       endExecuteLocal();
1508     }
1509     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1510   }
1511   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1512 }
1513
1514 void TraceProjections::changeLastEntryTimestamp(double ts)
1515 {
1516   _logPool->modLastEntryTimestamp(ts);
1517 }
1518
1519 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1520                                     int mlen, CmiObjId *idx)
1521 {
1522 #if CMK_HAS_COUNTER_PAPI
1523   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1524     CmiAbort("PAPI failed to read at begin execute!\n");
1525   }
1526 #endif
1527   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1528   execEvent=event;
1529   execEp=ep;
1530   execPe=srcPe;
1531   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1532                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1533 #if CMK_HAS_COUNTER_PAPI
1534   _logPool->addPapi(papiValues);
1535 #endif
1536   inEntry = 1;
1537 }
1538
1539 void TraceProjections::endExecute(void)
1540 {
1541   if (traceNestedEvents) nestedEvents.deq();
1542   endExecuteLocal();
1543   if (traceNestedEvents) {
1544     if (! nestedEvents.isEmpty()) {
1545       NestedEvent &ne = nestedEvents.peek();
1546       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1547     }
1548   }
1549 }
1550
1551 void TraceProjections::endExecute(char *msg)
1552 {
1553 #if CMK_SMP_TRACE_COMMTHREAD
1554         //This function is called from comm thread in SMP mode
1555     envelope *e = (envelope *)msg;
1556     int ep = e->getEpIdx();
1557     if(_entryTable[ep]->traceEnabled)
1558                 endExecute();
1559 #endif  
1560 }
1561
1562 void TraceProjections::endExecuteLocal(void)
1563 {
1564 #if CMK_HAS_COUNTER_PAPI
1565   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1566     CmiAbort("PAPI failed to read at end execute!\n");
1567   }
1568 #endif
1569   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1570   double cputime = TraceCpuTimer();
1571   if(execEp == (-1)) {
1572     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1573                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1574   } else {
1575     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1576                   execEvent, execPe, 0, NULL, 0.0, cputime);
1577   }
1578 #if CMK_HAS_COUNTER_PAPI
1579   _logPool->addPapi(papiValues);
1580 #endif
1581   inEntry = 0;
1582 }
1583
1584 void TraceProjections::messageRecv(char *env, int pe)
1585 {
1586 #if 0
1587   envelope *e = (envelope *)env;
1588   int msgType = e->getMsgtype();
1589   int ep = e->getEpIdx();
1590 #if 0
1591   if (msgType==NewChareMsg || msgType==NewVChareMsg
1592           || msgType==ForChareMsg || msgType==ForVidMsg
1593           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1594           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1595     ep = e->getEpIdx();
1596   else
1597     ep = _threadEP;
1598 #endif
1599   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1600                 curevent++, e->getSrcPe(), e->getTotalsize());
1601 #endif
1602 }
1603
1604 void TraceProjections::beginIdle(double curWallTime)
1605 {
1606   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1607 }
1608
1609 void TraceProjections::endIdle(double curWallTime)
1610 {
1611   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1612 }
1613
1614 void TraceProjections::beginPack(void)
1615 {
1616   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1617 }
1618
1619 void TraceProjections::endPack(void)
1620 {
1621   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1622 }
1623
1624 void TraceProjections::beginUnpack(void)
1625 {
1626   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1627 }
1628
1629 void TraceProjections::endUnpack(void)
1630 {
1631   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1632 }
1633
1634 void TraceProjections::enqueue(envelope *) {}
1635
1636 void TraceProjections::dequeue(envelope *) {}
1637
1638 void TraceProjections::beginComputation(void)
1639 {
1640   computationStarted = 1;
1641
1642   // Executes the callback function provided by the machine
1643   // layer. This is the proper method to register user events in a
1644   // machine layer because projections is a charm++ module.
1645   if (CkpvAccess(traceOnPe) != 0) {
1646     void (*ptr)() = registerMachineUserEvents();
1647     if (ptr != NULL) {
1648       ptr();
1649     }
1650   }
1651 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1652 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1653   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1654 #if CMK_HAS_COUNTER_PAPI
1655   // we start the counters here
1656   if (PAPI_start(papiEventSet) != PAPI_OK) {
1657     CmiAbort("PAPI failed to start designated counters!\n");
1658   }
1659 #endif
1660 }
1661
1662 void TraceProjections::endComputation(void)
1663 {
1664 #if CMK_HAS_COUNTER_PAPI
1665   // we stop the counters here. A silent failure is alright since we
1666   // are already at the end of the program.
1667   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1668     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1669   }
1670   // NOTE: We should not do a complete close of PAPI until after the
1671   // sts writer is done.
1672 #endif
1673   endTime = TraceTimer();
1674   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1675   /*
1676   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1677            endTime*1e06);
1678   */
1679 }
1680
1681 int TraceProjections::idxRegistered(int idx)
1682 {
1683     int idxVecLen = idxVec.size();
1684     for(int i=0; i<idxVecLen; i++)
1685     {
1686         if(idx == idxVec[i])
1687             return 1;
1688     }
1689     return 0;
1690 }
1691
1692 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1693     StrKey k((char*)name,strlen(name));
1694     int num = funcHashtable.get(k);
1695     
1696     if(num!=0) {
1697         return;
1698         //as for mpi programs, the same function may be registered for several times
1699         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1700     }
1701     
1702     int isIdxExisting=0;
1703     if(idxSpecifiedByUser)
1704         isIdxExisting=idxRegistered(idx);
1705     if(isIdxExisting){
1706         return;
1707         //same reason with num!=0
1708         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1709     }
1710
1711     if(idxSpecifiedByUser) {
1712         char *st = new char[strlen(name)+1];
1713         memcpy(st,name,strlen(name)+1);
1714         StrKey *newKey = new StrKey(st,strlen(st));
1715         int &ref = funcHashtable.put(*newKey);
1716         ref=idx;
1717         funcCount++;
1718         idxVec.push_back(idx);  
1719     } else {
1720         char *st = new char[strlen(name)+1];
1721         memcpy(st,name,strlen(name)+1);
1722         StrKey *newKey = new StrKey(st,strlen(st));
1723         int &ref = funcHashtable.put(*newKey);
1724         ref=funcCount;
1725         num = funcCount;
1726         funcCount++;
1727         idx = num;
1728         idxVec.push_back(idx);
1729     }
1730 }
1731
1732 void TraceProjections::beginFunc(char *name,char *file,int line){
1733         StrKey k(name,strlen(name));    
1734         unsigned short  num = (unsigned short)funcHashtable.get(k);
1735         beginFunc(num,file,line);
1736 }
1737
1738 void TraceProjections::beginFunc(int idx,char *file,int line){
1739         if(idx <= 0){
1740                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1741         }       
1742         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1743 }
1744
1745 void TraceProjections::endFunc(char *name){
1746         StrKey k(name,strlen(name));    
1747         int num = funcHashtable.get(k);
1748         endFunc(num);   
1749 }
1750
1751 void TraceProjections::endFunc(int num){
1752         if(num <= 0){
1753                 printf("endFunc without start :O\n");
1754         }
1755         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1756 }
1757
1758 // specialized PUP:ers for handling trace projections logs
1759 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1760 {
1761   for (int i=0;i<n;i++) 
1762     switch(t) {
1763     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1764     case Tuchar:
1765     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1766     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1767     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1768     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1769     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1770     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1771     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1772     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1773     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1774 #ifdef CMK_PUP_LONG_LONG
1775     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1776     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1777 #endif
1778     default: CmiAbort("Unrecognized pup type code!");
1779     };
1780 }
1781
1782 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1783 {
1784   for (int i=0;i<n;i++) 
1785     switch(t) {
1786     case Tchar: { 
1787       char c = fgetc(f);
1788       if (c==EOF)
1789         parseError("Could not match character");
1790       else
1791         ((char *)p)[i] = c;
1792       break;
1793     }
1794     case Tuchar:
1795     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1796     case Tshort:((short *)p)[i]=(short)readInt(); break;
1797     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1798     case Tint:  ((int *)p)[i]=readInt(); break;
1799     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1800     case Tlong: ((long *)p)[i]=readInt(); break;
1801     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1802     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1803     case Tdouble:((double *)p)[i]=readDouble(); break;
1804 #ifdef CMK_PUP_LONG_LONG
1805     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1806     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1807 #endif
1808     default: CmiAbort("Unrecognized pup type code!");
1809     };
1810 }
1811
1812 #if CMK_PROJECTIONS_USE_ZLIB
1813 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1814 {
1815   for (int i=0;i<n;i++) 
1816     switch(t) {
1817     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1818     case Tuchar:
1819     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1820     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1821     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1822     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1823     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1824     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1825     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1826     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1827     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1828 #ifdef CMK_PUP_LONG_LONG
1829     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1830     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1831 #endif
1832     default: CmiAbort("Unrecognized pup type code!");
1833     };
1834 }
1835 #endif
1836
1837 void TraceProjections::endPhase() {
1838   double currentPhaseTime = TraceTimer();
1839   if (lastPhaseEvent != NULL) {
1840   } else {
1841     if (_logPool->pool != NULL) {
1842       // assumed to be BEGIN_COMPUTATION
1843     } else {
1844       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1845       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1846     }
1847   }
1848
1849   /* Insert endPhase event here. */
1850   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1851   /*        We currently "borrow" from the standard add() method. */
1852   /*        It should really be its own add() method.             */
1853   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1854   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1855   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1856   currentPhaseID++;
1857 }
1858
1859 #ifdef PROJ_ANALYSIS
1860 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1861
1862
1863 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1864
1865 void registerOutlierReduction() {
1866   outlierReductionType =
1867     CkReduction::addReducer(outlierReduction);
1868   minMaxReductionType =
1869     CkReduction::addReducer(minMaxReduction);
1870 }
1871
1872 /**
1873  * **IMPT NOTES**:
1874  *
1875  * This is the C++ code that is registered to be activated at module
1876  * shutdown. This is called exactly once on processor 0. Module shutdown
1877  * is initiated as a result of a CkExit() call by the application code
1878  * 
1879  * The exit function must ultimately call CkExit() again to
1880  * so that other module exit functions may proceed after this module is
1881  * done.
1882  *
1883  */
1884 // FIXME: WHY extern "C"???
1885 extern "C" void TraceProjectionsExitHandler()
1886 {
1887 #if CMK_TRACE_ENABLED
1888   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1889   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1890   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1891 #else
1892   CkExit();
1893 #endif
1894 }
1895
1896 // This is called once on each processor but the idiom of use appears
1897 // to be to only have processor 0 register the function.
1898 //
1899 // See initnode in trace-projections.ci
1900 void initTraceProjectionsBOC()
1901 {
1902   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1903 #ifdef __BIGSIM__
1904   if (BgNodeRank() == 0) {
1905 #else
1906     if (CkMyRank() == 0) {
1907 #endif
1908       registerExitFn(TraceProjectionsExitHandler);
1909     }
1910 #if 0
1911   } // this is so indentation does not get messed up
1912 #endif
1913 }
1914
1915 // mainchare for trace-projections BOC-operations. 
1916 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1917 // rest of its life.
1918 //
1919 // Responsible for:
1920 //   1. Handling commandline arguments
1921 //   2. Creating any objects required for proper BOC operations.
1922 //
1923 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1924   /** Options for Outlier Analysis */
1925   // defaults. Things will change with support for interactive analysis.
1926   bool findOutliers = false;
1927   bool outlierAutomatic = true;
1928   int numKSeeds = 10; 
1929
1930   int peNumKeep = CkNumPes();  // used as a default
1931   double entryThreshold = 0.0;
1932   bool outlierUsePhases = false;
1933   if (outlierAutomatic) {
1934     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1935                      "Number of cluster seeds to apply at outlier analysis.");
1936     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1937                      &peNumKeep, "Number of Processors to retain data");
1938     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1939                         "Minimum significance of entry points to be considered for clustering (%).");
1940     findOutliers =
1941       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1942     outlierUsePhases = 
1943       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1944                         "Apply automatic outlier analysis to any available phases.");
1945     if (outlierUsePhases) {
1946       // if the user wants to use an outlier feature, it is assumed outlier
1947       //    analysis is desired.
1948       findOutliers = true;
1949     }
1950   }
1951   bool findStartTime = (CmiTimerAbsolute()==1);
1952   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1953   if (findOutliers) {
1954     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1955                                         numKSeeds,
1956                                         peNumKeep,
1957                                         entryThreshold,
1958                                         outlierUsePhases);
1959   }
1960 }
1961
1962 // Called on every processor.
1963 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1964   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1965   endPe = pe;                // the pe that starts CkExit()
1966   if (CkMyPe() == 0) {
1967     analysisStartTime = CmiWallTimer();
1968   }
1969   CkpvAccess(_trace)->endComputation();
1970   // no more tracing for projections on this processor after this point. 
1971   // Note that clear must be called after remove, or bad things will happen.
1972   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1973   CkpvAccess(_traces)->clearTrace();
1974
1975   // From this point, we start multiple chains of reductions and broadcasts to
1976   // perform final online analysis activities.
1977
1978   // Start all parallel operations at once. 
1979   //   These MUST NOT modify base performance data in LogPool. If they must,
1980   //   then the parallel operations must be phased (and this code to be
1981   //   restructured as necessary)
1982   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1983   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1984   if (findOutliers) {
1985     parModulesRemaining++;
1986     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1987   }
1988   parModulesRemaining++;
1989   if (findStartTime) 
1990   bocProxy[CkMyPe()].startTimeAnalysis();
1991   else
1992   bocProxy[CkMyPe()].startEndTimeAnalysis();
1993 }
1994
1995 // handle flush log warnings
1996 void TraceProjectionsBOC::flush_warning(int pe) 
1997 {
1998     CmiAssert(CkMyPe() == 0);
1999     std::set<int>::iterator it;
2000     it = list.find(pe);
2001     if (it == list.end())    list.insert(pe);
2002     flush_count++;
2003 }
2004
2005 void TraceProjectionsBOC::print_warning() 
2006 {
2007     CmiAssert(CkMyPe() == 0);
2008     std::set<int>::iterator it;
2009     CkPrintf("*************************************************************\n");
2010     CkPrintf("Warning: Projections log flushed to disk %d times on %d cores: ", flush_count, list.size());
2011     for (it=list.begin(); it!=list.end(); it++)
2012       CkPrintf("%d ", *it);
2013     CkPrintf(".\n");
2014     CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program. \n");
2015     CkPrintf("*************************************************************\n");
2016 }
2017
2018 // Called on each processor
2019 void KMeansBOC::startKMeansAnalysis() {
2020   // Initialize all necessary structures
2021   LogPool *pool = CkpvAccess(_trace)->_logPool;
2022
2023  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2024   int flushInt = 0;
2025   if (pool->hasFlushed) {
2026     flushInt = 1;
2027   }
2028   
2029   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
2030                 0, thisProxy);
2031   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
2032 }
2033
2034 // Called on processor 0
2035 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2036   int someFlush = *((int *)msg->getData());
2037
2038   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2039   
2040   if (someFlush == 0) {
2041     // Data intact proceed with KMeans analysis
2042     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2043     kMeansProxy.flushCheckDone();
2044   } else {
2045     // Some processor had flushed it data at some point, abandon KMeans
2046     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2047     // terminate KMeans
2048     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2049     bocProxy[0].kMeansDone();
2050   }
2051 }
2052
2053 // Called on each processor
2054 void KMeansBOC::flushCheckDone() {
2055   // **FIXME** - more flexible metric collection scheme may be necessary
2056   //   in the future for production use.
2057   LogPool *pool = CkpvAccess(_trace)->_logPool;
2058
2059   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2060
2061   numEntryMethods = _entryTable.size();
2062   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2063
2064   // maintained across phases
2065   markedBegin = false;
2066   markedIdle = false;
2067   beginBlockTime = 0.0;
2068   beginIdleBlockTime = 0.0;
2069   lastBeginEPIdx = -1; // none
2070
2071   lastPhaseIdx = 0;
2072   currentExecTimes = NULL;
2073   currentPhase = 0;
2074   selected = false;
2075
2076   pool->initializePhases();
2077
2078   // incoming K Seeds and the per-phase filter
2079   incKSeeds = new double[numK*numMetrics];
2080   keepMetric = new bool[numMetrics];
2081
2082   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2083   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2084   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2085   thisProxy[CkMyPe()].getNextPhaseMetrics();
2086 }
2087
2088 // Called on each processor.
2089 void KMeansBOC::getNextPhaseMetrics() {
2090   // Assumes the presence of the complete logs on this processor.
2091   // Assumes first event is always BEGIN_COMPUTATION
2092   // Assumes each processor sees the same number of phases.
2093   //
2094   // In this code, we collect performance data for this processor.
2095   // All times are in seconds.
2096
2097   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2098
2099   if (usePhases) {
2100     DEBUGF("[%d] Using Phases\n", CkMyPe());
2101   } else {
2102     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2103   }
2104   
2105   if (currentExecTimes != NULL) {
2106     delete [] currentExecTimes;
2107   }
2108   currentExecTimes = new double[numMetrics];
2109   for (int i=0; i<numMetrics; i++) {
2110     currentExecTimes[i] = 0.0;
2111   }
2112
2113   int numEventMethods = _entryTable.size();
2114   LogPool *pool = CkpvAccess(_trace)->_logPool;
2115   
2116   CkAssert(pool->numEntries > lastPhaseIdx);
2117   double totalPhaseTime = 0.0;
2118   double totalActiveTime = 0.0; // entry method + idle
2119
2120   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2121     if (pool->pool[i].type == BEGIN_PROCESSING) {
2122       // check pairing
2123       if (!markedBegin) {
2124         markedBegin = true;
2125       }
2126       beginBlockTime = pool->pool[i].time;
2127       lastBeginEPIdx = pool->pool[i].eIdx;
2128     } else if (pool->pool[i].type == END_PROCESSING) {
2129       // check pairing
2130       // if End without a begin, just ignore
2131       //   this event. If a phase-boundary is crossed, the Begin
2132       //   event would be maintained in beginBlockTime, so it is 
2133       //   not a problem.
2134       if (markedBegin) {
2135         markedBegin = false;
2136         if (pool->pool[i].event < 0)
2137         {
2138           // ignore dummy events. **FIXME** as they have no eIdx?
2139           continue;
2140         }
2141         currentExecTimes[pool->pool[i].eIdx] += 
2142           pool->pool[i].time - beginBlockTime;
2143         totalActiveTime += pool->pool[i].time - beginBlockTime;
2144         lastBeginEPIdx = -1;
2145       }
2146     } else if (pool->pool[i].type == BEGIN_IDLE) {
2147       // check pairing
2148       if (!markedIdle) {
2149         markedIdle = true;
2150       }
2151       beginIdleBlockTime = pool->pool[i].time;
2152     } else if (pool->pool[i].type == END_IDLE) {
2153       // check pairing
2154       if (markedIdle) {
2155         markedIdle = false;
2156         currentExecTimes[numEventMethods] += 
2157           pool->pool[i].time - beginIdleBlockTime;
2158         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2159       }
2160     } else if (pool->pool[i].type == END_PHASE) {
2161       // ignored when not using phases
2162       if (usePhases) {
2163         // when we've not visited this node before
2164         if (i != lastPhaseIdx) { 
2165           totalPhaseTime = 
2166             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2167           // it is important that proper accounting of time take place here.
2168           // Note that END_PHASE events inevitably occur in the context of
2169           //   some entry method by the way the tracing API is designed.
2170           if (markedBegin) {
2171             CkAssert(lastBeginEPIdx >= 0);
2172             currentExecTimes[lastBeginEPIdx] += 
2173               pool->pool[i].time - beginBlockTime;
2174             totalActiveTime += pool->pool[i].time - beginBlockTime;
2175             // this is so the remainder contributes to the next phase
2176             beginBlockTime = pool->pool[i].time;
2177           }
2178           // The following is unlikely, but stranger things have happened.
2179           if (markedIdle) {
2180             currentExecTimes[numEventMethods] +=
2181               pool->pool[i].time - beginIdleBlockTime;
2182             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2183             // this is so the remainder contributes to the next phase
2184             beginIdleBlockTime = pool->pool[i].time;
2185           }
2186           if (totalActiveTime <= totalPhaseTime) {
2187             currentExecTimes[numEventMethods+1] = 
2188               totalPhaseTime - totalActiveTime;
2189           } else {
2190             currentExecTimes[numEventMethods+1] = 0.0;
2191             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2192                      CkMyPe(), currentPhase);
2193           }
2194           collectKMeansData();
2195           // end the loop (and method) and defer the work till the next call
2196           lastPhaseIdx = i;
2197           break; 
2198         }
2199       }
2200     } else if (pool->pool[i].type == END_COMPUTATION) {
2201       if (markedBegin) {
2202         CkAssert(lastBeginEPIdx >= 0);
2203         currentExecTimes[lastBeginEPIdx] += 
2204           pool->pool[i].time - beginBlockTime;
2205         totalActiveTime += pool->pool[i].time - beginBlockTime;
2206       }
2207       if (markedIdle) {
2208         currentExecTimes[numEventMethods] +=
2209           pool->pool[i].time - beginIdleBlockTime;
2210         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2211       }
2212       totalPhaseTime = 
2213         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2214       if (totalActiveTime <= totalPhaseTime) {
2215         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2216       } else {
2217         currentExecTimes[numEventMethods+1] = 0.0;
2218         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2219                  CkMyPe());
2220       }
2221       collectKMeansData();
2222     }
2223   }
2224 }
2225
2226 /**
2227  *  Through a reduction, collectKMeansData aggregates each processors' data
2228  *  in order for global properties to be determined:
2229  *  
2230  *  1. min & max to determine normalization factors.
2231  *  2. sum to determine global EP averages for possible metric reduction
2232  *       through thresholding.
2233  *  3. sum of squares to compute stddev which may be useful in the future.
2234  *
2235  *  collectKMeansData will also keep the processor's data for the current
2236  *    phase so that it may be normalized and worked on subsequently.
2237  *
2238  **/
2239 void KMeansBOC::collectKMeansData() {
2240   int minOffset = numMetrics;
2241   int maxOffset = 2*numMetrics;
2242   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2243
2244   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2245
2246   double *reductionMsg = new double[numMetrics*4];
2247
2248   for (int i=0; i<numMetrics; i++) {
2249     reductionMsg[i] = currentExecTimes[i];
2250     // duplicate the event times for max and min sections of the reduction
2251     reductionMsg[minOffset + i] = currentExecTimes[i];
2252     reductionMsg[maxOffset + i] = currentExecTimes[i];
2253     // compute squares
2254     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2255   }
2256
2257   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2258                 0, thisProxy);
2259   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2260              outlierReductionType, cb);  
2261 }
2262
2263 // The purpose is mainly to initialize the k seeds and generate
2264 //   normalization parameters for each of the metrics. The k seeds
2265 //   and normalization parameters are broadcast to all processors.
2266 //
2267 // Called on processor 0
2268 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2269   CkAssert(CkMyPe() == 0);
2270   
2271   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2272
2273   int sumOffset = 0;
2274   int minOffset = numMetrics;
2275   int maxOffset = 2*numMetrics;
2276   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2277
2278   // calculate statistics & boundaries for the k seeds for clustering
2279   KMeansStatsMessage *outmsg = 
2280     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2281   outmsg->numMetrics = numMetrics;
2282   outmsg->numKPos = numK*numMetrics;
2283   outmsg->numStats = numMetrics*4;
2284
2285   // Sum | Min | Max | Sum of Squares
2286   double *totalExecTimes = (double *)msg->getData();
2287   double totalTime = 0.0;
2288
2289   for (int i=0; i<numMetrics; i++) {
2290     DEBUGN("%lf\n", totalExecTimes[i]);
2291     totalTime += totalExecTimes[i];
2292
2293     // calculate event mean over all processors
2294     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2295
2296     // get the ranges and offsets of each metric. With this, we get
2297     //   normalization factors that can be sent back to each processor to
2298     //   be used as necessary. We reuse max for range. Min remains the offset.
2299     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2300     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2301       totalExecTimes[minOffset + i];
2302     
2303     // calculate stddev (using biased variance)
2304     outmsg->stats[sosOffset + i] = 
2305       sqrt((totalExecTimes[sosOffset + i] - 
2306             2*(outmsg->stats[i])*totalExecTimes[i] +
2307             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2308            CkNumPes());
2309   }
2310
2311   for (int i=0; i<numMetrics; i++) {
2312     // 1) if the proportion of the max value of the entry method relative to
2313     //   the average time taken over all entry methods across all processors
2314     //   is greater than the stipulated percentage threshold ...; AND
2315     // 2) if the range of values are non-zero.
2316     //
2317     // The current assumption is totalTime > 0 (what program has zero total
2318     //   time from all work?)
2319     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2320                      entryThreshold) &&
2321       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2322     if (keepMetric[i]) {
2323       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2324              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2325     } else {
2326       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2327     }
2328     outmsg->filter[i] = keepMetric[i];
2329   }
2330
2331   delete msg;
2332   
2333   // initialize k seeds for this phase
2334   kSeeds = new double[numK*numMetrics];
2335
2336   numKReported = 0;
2337   kNumMembers = new int[numK];
2338
2339   // Randomly select k processors' metric vectors for the k seeds
2340   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2341   srand(11337); // for debugging purposes
2342   for (int k=0; k<numK; k++) {
2343     DEBUGF("Seed %d | ", k);
2344     for (int m=0; m<numMetrics; m++) {
2345       double factor = totalExecTimes[maxOffset + m] - 
2346         totalExecTimes[minOffset + m];
2347       // "uniform" distribution, scaled according to the normalization
2348       //   factors
2349       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2350       // Random distribution.
2351       kSeeds[numMetrics*k + m] =
2352         ((rand()*1.0)/RAND_MAX)*factor;
2353       if (keepMetric[m]) {
2354         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2355       }
2356       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2357     }
2358     DEBUGF("\n");
2359     kNumMembers[k] = 0;
2360   }
2361
2362   // broadcast statistical values to all processors for cluster discovery
2363   thisProxy.findInitialClusters(outmsg);
2364 }
2365
2366
2367
2368 // Called on each processor.
2369 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2370
2371  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2372
2373   phaseIter = 0;
2374
2375   // Get info from stats message
2376   CkAssert(numMetrics == msg->numMetrics);
2377   for (int i=0; i<numMetrics; i++) {
2378     keepMetric[i] = msg->filter[i];
2379   }
2380
2381   // Normalize data on local processor.
2382   // **CWL** See my thesis for detailed discussion of normalization of
2383   //    performance data.
2384   // **NOTE** This might change if we want to send data based on the filter
2385   //   instead of all the data.
2386   CkAssert(numMetrics*4 == msg->numStats);
2387   for (int i=0; i<numMetrics; i++) {
2388     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2389     // **CWL** We do not normalize the range. Entry methods that exhibit
2390     //   large absolute timing variations should be allowed to contribute
2391     //   more to the Euclidean distance measure!
2392     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2393   }
2394
2395   // **NOTE** This might change if we want to send data based on the filter
2396   //   instead of all the data.
2397   CkAssert(numK*numMetrics == msg->numKPos);
2398   for (int i=0; i<msg->numKPos; i++) {
2399     incKSeeds[i] = msg->kSeedsPos[i];
2400   }
2401
2402   // Decide which KSeed this processor belongs to.
2403   minDistance = calculateDistance(0);
2404   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2405            currentPhase, phaseIter, minDistance);
2406   minK = 0;
2407   for (int i=1; i<numK; i++) {
2408     double distance = calculateDistance(i);
2409     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2410              currentPhase, phaseIter, i, distance);
2411     if (distance < minDistance) {
2412       minDistance = distance;
2413       minK = i;
2414     }
2415   }
2416
2417   // Set up a reduction with the modification vector to the root (0).
2418   //
2419   // The modification vector sends a negative value for each metric
2420   //   for the K this processor no longer belongs to and a positive
2421   //   value to the K the processor now belongs. In addition, a -1.0
2422   //   is sent to the K it is leaving and a +1.0 to the K it is 
2423   //   joining.
2424   //
2425   // The processor must still contribute a "zero returns" even if
2426   //   nothing changes. This will be the basis for determine
2427   //   convergence at the root.
2428   //
2429   // The addtional +1 is meant for the count-change that must be
2430   //   maintained for the special cases at the root when some K
2431   //   may be deprived of all processor points or go from 0 to a
2432   //   positive number of processors (see later comments).
2433   double *modVector = new double[numK*(numMetrics+1)];
2434   for (int i=0; i<numK; i++) {
2435     for (int j=0; j<numMetrics+1; j++) {
2436       modVector[i*(numMetrics+1) + j] = 0.0;
2437     }
2438   }
2439   for (int i=0; i<numMetrics; i++) {
2440     // for this initialization, only positive values need be sent.
2441     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2442   }
2443   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2444
2445   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2446                 0, thisProxy);
2447   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2448              CkReduction::sum_double, cb);  
2449 }
2450
2451 double KMeansBOC::calculateDistance(int k) {
2452   double ret = 0.0;
2453   for (int i=0; i<numMetrics; i++) {
2454     if (keepMetric[i]) {
2455       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2456              CkMyPe(), currentPhase, phaseIter, i,
2457                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2458       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2459     }
2460   }
2461   return sqrt(ret);
2462 }
2463
2464 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2465   CkAssert(CkMyPe() == 0);
2466
2467   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2468
2469   double *modVector = (double *)msg->getData();
2470   // sanity check
2471   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2472
2473   // A quick convergence test.
2474   bool hasChanges = false;
2475   for (int i=0; i<numK; i++) {
2476     hasChanges = hasChanges || 
2477       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2478   }
2479   if (!hasChanges) {
2480     delete msg;
2481     findRepresentatives();
2482   } else {
2483     int overallChange = 0;
2484     for (int i=0; i<numK; i++) {
2485       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2486       if (change != 0) {
2487         overallChange += change;
2488         // modify the k seeds based on the modification vectors coming in
2489         //
2490         // If a seed initially has no members, its contents do not matter and
2491         //   is simply set to the average of the incoming vector.
2492         // If the change causes a seed to lose all its members, do nothing.
2493         //   Its last-known location is kept to allow it to re-capture
2494         //   membership at the next iteration rather than apply the last
2495         //   changes (which snaps the point unnaturally to 0,0).
2496         // Otherwise, apply the appropriate vector changes.
2497         CkAssert((kNumMembers[i] + change >= 0) &&
2498                  (kNumMembers[i] + change <= CkNumPes()));
2499         if (kNumMembers[i] == 0) {
2500           CkAssert(change > 0);
2501           for (int j=0; j<numMetrics; j++) {
2502             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2503           }
2504         } else if (kNumMembers[i] + change == 0) {
2505           // do nothing.
2506         } else {
2507           for (int j=0; j<numMetrics; j++) {
2508             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2509             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2510             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2511           }
2512         }
2513         kNumMembers[i] += change;
2514       }
2515       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2516              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2517     }
2518     delete msg;
2519
2520     // broadcast the new seed locations.
2521     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2522     outmsg->numKPos = numK*numMetrics;
2523     for (int i=0; i<numK*numMetrics; i++) {
2524       outmsg->kSeedsPos[i] = kSeeds[i];
2525     }
2526
2527     thisProxy.updateSeedMembership(outmsg);
2528   }
2529 }
2530
2531 // Called on all processors
2532 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2533
2534   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2535
2536   phaseIter++;
2537
2538   // **NOTE** This might change if we want to send data based on the filter
2539   //   instead of all the data.
2540   CkAssert(numK*numMetrics == msg->numKPos);
2541   for (int i=0; i<msg->numKPos; i++) {
2542     incKSeeds[i] = msg->kSeedsPos[i];
2543   }
2544
2545   // Decide which KSeed this processor belongs to.
2546   lastMinK = minK;
2547   minDistance = calculateDistance(0);
2548   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2549          currentPhase, phaseIter, minDistance);
2550
2551   minK = 0;
2552   for (int i=1; i<numK; i++) {
2553     double distance = calculateDistance(i);
2554     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2555            currentPhase, phaseIter, i, distance);
2556     if (distance < minDistance) {
2557       minDistance = distance;
2558       minK = i;
2559     }
2560   }
2561
2562   double *modVector = new double[numK*(numMetrics+1)];
2563   for (int i=0; i<numK; i++) {
2564     for (int j=0; j<numMetrics+1; j++) {
2565       modVector[i*(numMetrics+1) + j] = 0.0;
2566     }
2567   }
2568
2569   if (minK != lastMinK) {
2570     for (int i=0; i<numMetrics; i++) {
2571       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2572       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2573     }
2574     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2575     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2576   }
2577
2578   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2579                 0, thisProxy);
2580   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2581              CkReduction::sum_double, cb);  
2582 }
2583
2584 void KMeansBOC::findRepresentatives() {
2585
2586   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2587
2588   int numNonEmptyClusters = 0;
2589   for (int i=0; i<numK; i++) {
2590     if (kNumMembers[i] > 0) {
2591       numNonEmptyClusters++;
2592     }
2593   }
2594
2595   int numRepresentatives = peNumKeep;
2596   // **FIXME**
2597   // This is fairly arbitrary. Next time, choose the centers of the top
2598   //   largest clusters.
2599   if (numRepresentatives < numNonEmptyClusters) {
2600     numRepresentatives = numNonEmptyClusters;
2601   }
2602
2603   int slotsRemaining = numRepresentatives;
2604
2605   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2606          numNonEmptyClusters);
2607
2608   // determine how many exemplars to select per cluster. Currently
2609   //   hardcoded to 1. Future challenge is to decide on other numbers
2610   //   or proportionality.
2611   //
2612   int exemplarsPerCluster = 1;
2613   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2614
2615   int numCandidateOutliers = CkNumPes() - 
2616     exemplarsPerCluster*numNonEmptyClusters;
2617
2618   double *remainders = new double[numK];
2619   int *assigned = new int[numK];
2620   exemplarChoicesLeft = new int[numK];
2621   outlierChoicesLeft = new int[numK];
2622
2623   for (int i=0; i<numK; i++) {
2624     assigned[i] = 0;
2625     remainders[i] = 
2626       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2627       slotsRemaining / numCandidateOutliers;
2628     if (remainders[i] >= 0.0) {
2629       assigned[i] = (int)floor(remainders[i]);
2630       remainders[i] -= assigned[i];
2631     } else {
2632       remainders[i] = 0.0;
2633     }
2634   }
2635
2636   for (int i=0; i<numK; i++) {
2637     slotsRemaining -= assigned[i];
2638   }
2639   CkAssert(slotsRemaining >= 0);
2640
2641   // find clusters to assign the loose slots to, in order of
2642   // remainder proportion
2643   while (slotsRemaining > 0) {
2644     double max = 0.0;
2645     int winner = 0;
2646     for (int i=0; i<numK; i++) {
2647       if (remainders[i] > max) {
2648         max = remainders[i];
2649         winner = i;
2650       }
2651     }
2652     assigned[winner]++;
2653     remainders[winner] = 0.0;
2654     slotsRemaining--;
2655   }
2656
2657   // set up how many reduction cycles of min/max we need to conduct to
2658   // select the representatives.
2659   numSelectionIter = exemplarsPerCluster;
2660   for (int i=0; i<numK; i++) {
2661     if (assigned[i] > numSelectionIter) {
2662       numSelectionIter = assigned[i];
2663     }
2664   }
2665   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2666
2667   for (int i=0; i<numK; i++) {
2668     if (kNumMembers[i] > 0) {
2669       exemplarChoicesLeft[i] = exemplarsPerCluster;
2670       outlierChoicesLeft[i] = assigned[i];
2671     } else {
2672       exemplarChoicesLeft[i] = 0;
2673       outlierChoicesLeft[i] = 0;
2674     }
2675     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2676            outlierChoicesLeft[i]);
2677   }
2678
2679   delete [] assigned;
2680   delete [] remainders;
2681
2682   // send out first broadcast
2683   KSelectionMessage *outmsg = NULL;
2684   if (numSelectionIter > 0) {
2685     outmsg = new (numK, numK, numK) KSelectionMessage;
2686     outmsg->numKMinIDs = numK;
2687     outmsg->numKMaxIDs = numK;
2688     for (int i=0; i<numK; i++) {
2689       outmsg->minIDs[i] = -1;
2690       outmsg->maxIDs[i] = -1;
2691     }
2692     thisProxy.collectDistances(outmsg);
2693   } else {
2694     CkPrintf("Warning: No selection iteration from the start!\n");
2695     // invoke phase completion on all processors
2696     thisProxy.phaseDone();
2697   }
2698 }
2699
2700 /*
2701  *  lastMin = array of minimum champions of the last tournament
2702  *  lastMax = array of maximum champions of the last tournament
2703  *  lastMaxVal = array of last encountered maximum values, allows previous
2704  *                 minimum winners to eliminate themselves from the next
2705  *                 minimum race.
2706  *
2707  *  Called on all processors.
2708  */
2709 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2710
2711   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2712
2713   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2714          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2715   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2716       (CkMyPe() == msg->maxIDs[lastMinK])) {
2717     CkAssert(!selected);
2718     selected = true;
2719   }
2720
2721   // build outgoing reduction structure
2722   //   format = minVal | ID | maxVal | ID
2723   double *minMaxAndIDs = NULL;
2724
2725   minMaxAndIDs = new double[numK*4];
2726   // initialize to the appropriate out-of-band values (for error checks)
2727   for (int i=0; i<numK; i++) {
2728     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2729     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2730     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2731     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2732   }
2733   // If I have not won before, I put myself back into the competition
2734   if (!selected) {
2735     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2736     minMaxAndIDs[lastMinK*4] = minDistance;
2737     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2738     minMaxAndIDs[lastMinK*4+2] = minDistance;
2739     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2740   }
2741   delete msg;
2742
2743   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2744                 0, thisProxy);
2745   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2746              minMaxReductionType, cb);  
2747 }
2748
2749 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2750   // incoming format:
2751   //   minVal | minID | maxVal | maxID
2752
2753   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2754
2755   if (numSelectionIter > 0) {
2756     double *incInfo = (double *)msg->getData();
2757     
2758     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2759     outmsg->numKMinIDs = numK;
2760     outmsg->numKMaxIDs = numK;
2761     
2762     for (int i=0; i<numK; i++) {
2763       DEBUGF("%d | %lf %d %lf %d \n", i, 
2764              incInfo[i*4], (int)incInfo[i*4+1], 
2765              incInfo[i*4+2], (int)incInfo[i*4+3]);
2766     }
2767
2768     for (int i=0; i<numK; i++) {
2769       if (exemplarChoicesLeft[i] > 0) {
2770         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2771         exemplarChoicesLeft[i]--;
2772       } else {
2773         outmsg->minIDs[i] = -1;
2774       }
2775       if (outlierChoicesLeft[i] > 0) {
2776         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2777         outlierChoicesLeft[i]--;
2778       } else {
2779         outmsg->maxIDs[i] = -1;
2780       }
2781     }
2782     thisProxy.collectDistances(outmsg);
2783     numSelectionIter--;
2784   } else {
2785     // invoke phase completion on all processors
2786     thisProxy.phaseDone();
2787   }
2788 }
2789
2790 /**
2791  *  Completion of the K-Means clustering and data selection of one phase
2792  *    of the computation.
2793  *
2794  *  Called on every processor.
2795  */
2796 void KMeansBOC::phaseDone() {
2797
2798   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2799
2800   LogPool *pool = CkpvAccess(_trace)->_logPool;
2801   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2802
2803   // now decide on what to do with the decision.
2804   if (!selected) {
2805     if (usePhases) {
2806       pool->keepPhase[currentPhase] = false;
2807     } else {
2808       // if not using phases, we're working on the whole log
2809       pool->setAllPhases(false);
2810     }
2811   }
2812
2813   // **FIXME** (?) - All processors have to agree on this, or the reduction
2814   //   will not be correct! The question is "is this enforcible?"
2815   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2816     // We're done
2817     int dummy = 0;
2818     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2819                   0, bocProxy);
2820     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2821   } else {
2822     // reset all phase-based k-means data and decisions
2823
2824     // **FIXME**!!!!!    
2825     
2826     // invoke the next K-Means computation phase.
2827     currentPhase++;
2828     thisProxy[CkMyPe()].getNextPhaseMetrics();
2829   }
2830 }
2831
2832 void TraceProjectionsBOC::startTimeAnalysis()
2833 {
2834   double startTime = 0.0;
2835   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2836      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2837   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2838   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2839 }
2840
2841 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2842 {
2843   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2844
2845   if (CkpvAccess(_trace) != NULL) {
2846     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2847     CkpvAccess(_trace)->_logPool->setNewStartTime();
2848     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2849   }
2850   delete msg;
2851   thisProxy[CkMyPe()].startEndTimeAnalysis();
2852 }
2853
2854 void TraceProjectionsBOC::startEndTimeAnalysis()
2855 {
2856  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2857
2858   endTime = CkpvAccess(_trace)->endTime;
2859   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2860
2861   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2862                 0, thisProxy);
2863   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2864 }
2865
2866 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2867 {
2868  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2869
2870   CkAssert(CkMyPe() == 0);
2871   parModulesRemaining--;
2872   if (CkpvAccess(_trace) != NULL) {
2873     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2874     // CkPrintf("End time determined to be %lf us\n",
2875     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2876   }
2877   delete msg;
2878   if (parModulesRemaining == 0) {
2879     thisProxy[CkMyPe()].finalize();
2880   }
2881 }
2882
2883 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2884
2885  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2886
2887   CkAssert(CkMyPe() == 0);
2888   parModulesRemaining--;
2889   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2890            CmiWallTimer()-analysisStartTime);
2891   delete msg;
2892   if (parModulesRemaining == 0) {
2893     thisProxy[CkMyPe()].finalize();
2894   }
2895 }
2896
2897 /**
2898  *
2899  *  This version is called (on processor 0) only if flushCheck fails.
2900  *
2901  */
2902 void TraceProjectionsBOC::kMeansDone() {
2903   CkAssert(CkMyPe() == 0);
2904   parModulesRemaining--;
2905   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2906            CmiWallTimer()-analysisStartTime);
2907   if (parModulesRemaining == 0) {
2908     thisProxy[CkMyPe()].finalize();
2909   }
2910 }
2911
2912 void TraceProjectionsBOC::finalize()
2913 {
2914   CkAssert(CkMyPe() == 0);
2915   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2916   //       CmiWallTimer()-analysisStartTime);
2917   thisProxy.closingTraces();
2918 }
2919
2920 // Called on every processor
2921 void TraceProjectionsBOC::closingTraces() {
2922   CkpvAccess(_trace)->closeTrace();
2923
2924     // subtle:  reduction needs to go to the PE which started CkExit()
2925   int pe = 0;
2926   if (endPe != -1) pe = endPe;
2927   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2928                 pe, thisProxy); 
2929   contribute(0, NULL, CkReduction::sum_int, cb);  
2930 }
2931
2932 // The sole purpose of this reduction is to decide whether or not
2933 //   Projections as a module needs to call CkExit() to get other
2934 //   modules to shutdown.
2935 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2936   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2937   delete msg;
2938   // decide if CkExit() needs to be called
2939   if (!CkpvAccess(_trace)->converseExit) {
2940     CkExit();
2941   }
2942 }
2943 /*
2944  *  Registration and definition of the Outlier Reduction callback.
2945  *  Format: Sum | Min | Max | Sum of Squares
2946  */
2947 CkReductionMsg *outlierReduction(int nMsgs,
2948                                  CkReductionMsg **msgs) {
2949   int numBytes = 0;
2950   int numMetrics = 0;
2951   double *ret = NULL;
2952
2953   if (nMsgs == 1) {
2954     // nothing to do, just pass it on
2955     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2956   }
2957
2958   if (nMsgs > 1) {
2959     numBytes = msgs[0]->getSize();
2960     // sanity checks
2961     if (numBytes%sizeof(double) != 0) {
2962       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2963     }
2964     if ((numBytes/sizeof(double))%4 != 0) {
2965       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2966     }
2967     numMetrics = (numBytes/sizeof(double))/4;
2968     ret = new double[numMetrics*4];
2969
2970     // copy the first message data into the return structure first
2971     for (int i=0; i<numMetrics*4; i++) {
2972       ret[i] = ((double *)msgs[0]->getData())[i];
2973     }
2974
2975     // Sum | Min | Max | Sum of Squares
2976     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2977       for (int i=0; i<numMetrics; i++) {
2978         // Sum
2979         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2980         // Min
2981         ret[numMetrics + i] =
2982           (ret[numMetrics + i] < 
2983            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2984           ? ret[numMetrics + i] : 
2985           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2986         // Max
2987         ret[2*numMetrics + i] = 
2988           (ret[2*numMetrics + i] >
2989            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2990           ? ret[2*numMetrics + i] :
2991           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2992         // Sum of Squares (squaring already done at leaf)
2993         ret[3*numMetrics + i] +=
2994           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2995       }
2996     }
2997   }
2998   
2999   /* apparently, we do not delete the incoming messages */
3000   return CkReductionMsg::buildNew(numBytes,ret);
3001 }
3002
3003 /*
3004  * The only reason we have a user-defined reduction is to support
3005  *   identification of the "winning" processors as well as to handle
3006  *   both the min and the max of each "tournament". A simple min/max
3007  *   discovery cannot handle ties.
3008  */
3009 CkReductionMsg *minMaxReduction(int nMsgs,
3010                                 CkReductionMsg **msgs) {
3011   CkAssert(nMsgs > 0);
3012
3013   int numBytes = msgs[0]->getSize();
3014   CkAssert(numBytes%sizeof(double) == 0);
3015   int numK = (numBytes/sizeof(double))/4;
3016
3017   double *ret = new double[numK*4];
3018   // fill with out-of-band values
3019   for (int i=0; i<numK; i++) {
3020     ret[i*4] = -1.0;
3021     ret[i*4+1] = -1.0;
3022     ret[i*4+2] = -1.0;
3023     ret[i*4+3] = -1.0;
3024   }
3025
3026   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
3027   for (int i=0; i<nMsgs; i++) {
3028     double *temp = (double *)msgs[i]->getData();
3029     for (int j=0; j<numK; j++) {
3030       // no previous valid min
3031       if (ret[j*4+1] < 0) {
3032         // fill it in only if the incoming min is valid
3033         if (temp[j*4+1] >= 0) {
3034           ret[j*4] = temp[j*4];      // fill min value
3035           ret[j*4+1] = temp[j*4+1];  // fill ID
3036         }
3037       } else {
3038         // find Min, only if incoming min is valid
3039         if (temp[j*4+1] >= 0) {
3040           if (temp[j*4] < ret[j*4]) {
3041             ret[j*4] = temp[j*4];      // replace min value
3042             ret[j*4+1] = temp[j*4+1];  // replace ID
3043           }
3044         }
3045       }
3046       // no previous valid max
3047       if (ret[j*4+3] < 0) {
3048         // fill only if the incoming max is valid
3049         if (temp[j*4+3] >= 0) {
3050           ret[j*4+2] = temp[j*4+2];  // fill max value
3051           ret[j*4+3] = temp[j*4+3];  // fill ID
3052         }
3053       } else {
3054         // find Max, only if incoming max is valid
3055         if (temp[j*4+3] >= 0) {
3056           if (temp[j*4+2] > ret[j*4+2]) {
3057             ret[j*4+2] = temp[j*4+2];  // replace max value
3058             ret[j*4+3] = temp[j*4+3];  // replace ID
3059           }
3060         }
3061       }
3062     }
3063   }
3064   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3065   delete [] ret;
3066   return redmsg;
3067 }
3068
3069 #include "TraceProjections.def.h"
3070 #endif //PROJ_ANALYSIS
3071
3072 /*@}*/