Add support for PAPI projections tracing on Cray XE
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 #if CMK_HAS_COUNTER_PAPI
103 #ifdef USE_SPP_PAPI
104 int papiEvents[NUMPAPIEVENTS];
105 #else
106 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
107 #endif
108 #endif // CMK_HAS_COUNTER_PAPI
109
110 /**
111   For each TraceFoo module, _createTraceFoo() must be defined.
112   This function is called in _createTraces() generated in moduleInit.C
113 */
114 void _createTraceprojections(char **argv)
115 {
116   DEBUGF("%d createTraceProjections\n", CkMyPe());
117   CkpvInitialize(CkVec<char *>, usrEventlist);
118   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
119   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
120 #if CMK_BIGSIM_CHARM
121   // CthRegister does not call the constructor
122 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
123 #endif //CMK_BIGSIM_CHARM
124   CkpvInitialize(TraceProjections*, _trace);
125   CkpvAccess(_trace) = new  TraceProjections(argv);
126   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
127   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
128 }
129  
130 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
131
132 struct TraceThreadListener {
133   struct CthThreadListener base;
134   int event;
135   int msgType;
136   int ep;
137   int srcPe;
138   int ml;
139   CmiObjId idx;
140 };
141
142 extern "C"
143 void traceThreadListener_suspend(struct CthThreadListener *l)
144 {
145   TraceThreadListener *a=(TraceThreadListener *)l;
146   /* here, we activate the appropriate trace codes for the appropriate
147      registered modules */
148   traceSuspend();
149 }
150
151 extern "C"
152 void traceThreadListener_resume(struct CthThreadListener *l) 
153 {
154   TraceThreadListener *a=(TraceThreadListener *)l;
155   /* here, we activate the appropriate trace codes for the appropriate
156      registered modules */
157   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
158                                 CthGetThreadID(a->base.thread));
159   a->event=-1;
160   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
161   a->ml=0;
162 }
163
164 extern "C"
165 void traceThreadListener_free(struct CthThreadListener *l) 
166 {
167   TraceThreadListener *a=(TraceThreadListener *)l;
168   delete a;
169 }
170
171 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
172 {
173 #if CMK_TRACE_ENABLED
174   /* strip essential information from the envelope */
175   TraceThreadListener *a= new TraceThreadListener;
176   
177   a->base.suspend=traceThreadListener_suspend;
178   a->base.resume=traceThreadListener_resume;
179   a->base.free=traceThreadListener_free;
180   a->event=e->getEvent();
181   a->msgType=e->getMsgtype();
182   a->ep=e->getEpIdx();
183   a->srcPe=e->getSrcPe();
184   a->ml=e->getTotalsize();
185
186   CthAddListener(tid, (CthThreadListener *)a);
187 #endif
188 }
189
190 void LogPool::openLog(const char *mode)
191 {
192 #if CMK_PROJECTIONS_USE_ZLIB
193   if(compressed) {
194     if (nonDeltaLog) {
195       do {
196         zfp = gzopen(fname, mode);
197       } while (!zfp && (errno == EINTR || errno == EMFILE));
198       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
199     }
200     if (deltaLog) {
201       do {
202         deltazfp = gzopen(dfname, mode);
203       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
204       if (!deltazfp) 
205         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
206     }
207   } else {
208     if (nonDeltaLog) {
209       do {
210         fp = fopen(fname, mode);
211       } while (!fp && (errno == EINTR || errno == EMFILE));
212       if (!fp) {
213         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
214         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
215       }
216     }
217     if (deltaLog) {
218       do {
219         deltafp = fopen(dfname, mode);
220       } while (!deltafp && (errno == EINTR || errno == EMFILE));
221       if (!deltafp) 
222         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
223     }
224   }
225 #else
226   if (nonDeltaLog) {
227     do {
228       fp = fopen(fname, mode);
229     } while (!fp && (errno == EINTR || errno == EMFILE));
230     if (!fp) {
231       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
232       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
233     }
234   }
235   if (deltaLog) {
236     do {
237       deltafp = fopen(dfname, mode);
238     } while (!deltafp && (errno == EINTR || errno == EMFILE));
239     if(!deltafp) 
240       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
241   }
242 #endif
243 }
244
245 void LogPool::closeLog(void)
246 {
247 #if CMK_PROJECTIONS_USE_ZLIB
248   if(compressed) {
249     if (nonDeltaLog) gzclose(zfp);
250     if (deltaLog) gzclose(deltazfp);
251     return;
252   }
253 #endif
254   if (nonDeltaLog) { 
255 #if !defined(_WIN32) || defined(__CYGWIN__)
256     fsync(fileno(fp)); 
257 #endif
258     fclose(fp); 
259   }
260   if (deltaLog)  { 
261 #if !defined(_WIN32) || defined(__CYGWIN__)
262     fsync(fileno(deltafp)); 
263 #endif
264     fclose(deltafp);  
265   }
266 }
267
268 LogPool::LogPool(char *pgm) {
269   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
270   // defaults to writing data (no outlier changes)
271   writeData = true;
272   numEntries = 0;
273   // **CW** for simple delta encoding
274   prevTime = 0.0;
275   timeErr = 0.0;
276   globalStartTime = 0.0;
277   globalEndTime = 0.0;
278   headerWritten = 0;
279   numPhases = 0;
280   hasFlushed = false;
281
282   keepPhase = NULL;
283
284   fileCreated = false;
285   poolSize = CkpvAccess(CtrLogBufSize);
286   pgmname = new char[strlen(pgm)+1];
287   strcpy(pgmname, pgm);
288 }
289
290 void LogPool::createFile(const char *fix)
291 {
292   if (fileCreated) {
293     return;
294   }
295
296   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
297   char *pathPlusFilePrefix = new char[1024];
298
299   if(nSubdirs > 0){
300     int sd = CkMyPe() % nSubdirs;
301     char *subdir = new char[1024];
302     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
303     CmiMkdir(subdir);
304     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
305     delete[] subdir;
306   } else {
307     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
308   }
309
310   char pestr[10];
311   sprintf(pestr, "%d", CkMyPe());
312 #if CMK_PROJECTIONS_USE_ZLIB
313   int len;
314   if(compressed)
315     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
316   else
317     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
318 #else
319   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
320 #endif
321
322   if (nonDeltaLog) {
323     fname = new char[len];
324   }
325   if (deltaLog) {
326     dfname = new char[len];
327   }
328 #if CMK_PROJECTIONS_USE_ZLIB
329   if(compressed) {
330     if (deltaLog && nonDeltaLog) {
331       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
332       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333     } else {
334       if (nonDeltaLog) {
335         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
336       } else {
337         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
338       }
339     }
340   } else {
341     if (deltaLog && nonDeltaLog) {
342       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
343       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344     } else {
345       if (nonDeltaLog) {
346         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
347       } else {
348         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
349       }
350     }
351   }
352 #else
353   if (deltaLog && nonDeltaLog) {
354     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
355     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356   } else {
357     if (nonDeltaLog) {
358       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
359     } else {
360       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
361     }
362   }
363 #endif
364   fileCreated = true;
365   delete[] pathPlusFilePrefix;
366   openLog("w+");
367   CLOSE_LOG 
368 }
369
370 void LogPool::createSts(const char *fix)
371 {
372   CkAssert(CkMyPe() == 0);
373   // create the sts file
374   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
375   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
376   do
377     {
378       stsfp = fopen(fname, "w");
379     } while (!stsfp && (errno == EINTR || errno == EMFILE));
380   if(stsfp==0){
381     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
382     CmiAbort("Error!!\n");
383   }
384   delete[] fname;
385 }  
386
387 void LogPool::createTopo(const char *fix)
388 {
389   CkAssert(CkMyPe() == 0);
390   // create the topo file
391   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
392   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
393   do
394     {
395       topofp = fopen(fname, "w");
396     } while (!stsfp && (errno == EINTR || errno == EMFILE));
397   if(stsfp==0){
398     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
399     CmiAbort("Error!!\n");
400   }
401   delete[] fname;
402 }  
403
404 void LogPool::createRC()
405 {
406   // create the projections rc file.
407   fname = 
408     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
409   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
410   do {
411     rcfp = fopen(fname, "w");
412   } while (!rcfp && (errno == EINTR || errno == EMFILE));
413   if (rcfp==0) {
414     CmiAbort("Cannot open projections configuration file for writing.\n");
415   }
416   delete[] fname;
417 }
418
419 LogPool::~LogPool() 
420 {
421   if (writeData) {
422     writeLog();
423 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
424     closeLog();
425 #endif
426   }
427
428 #if CMK_BIGSIM_CHARM
429   extern int correctTimeLog;
430   if (correctTimeLog) {
431     createFile("-bg");
432     if (CkMyPe() == 0) {
433       createSts("-bg");
434     }
435     writeHeader();
436     if (CkMyPe() == 0) writeSts(NULL);
437     postProcessLog();
438   }
439 #endif
440
441   delete[] pool;
442   delete [] fname;
443 }
444
445 void LogPool::writeHeader()
446 {
447   if (headerWritten) return;
448   headerWritten = 1;
449   if(!binary) {
450 #if CMK_PROJECTIONS_USE_ZLIB
451     if(compressed) {
452       if (nonDeltaLog) {
453         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
454       }
455       if (deltaLog) {
456         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
457       }
458     } 
459     else /* else clause is below... */
460 #endif
461     /*... may hang over from else above */ {
462       if (nonDeltaLog) {
463         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
464       }
465       if (deltaLog) {
466         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
467       }
468     }
469   }
470   else { // binary
471       if (nonDeltaLog) {
472         fwrite(&numEntries,sizeof(numEntries),1,fp);
473       }
474       if (deltaLog) {
475         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
476       }
477   }
478 }
479
480 void LogPool::writeLog(void)
481 {
482   createFile();
483   OPEN_LOG
484   writeHeader();
485   if (nonDeltaLog) write(0);
486   if (deltaLog) write(1);
487   CLOSE_LOG
488 }
489
490 void LogPool::write(int writedelta) 
491 {
492   // **CW** Simple delta encoding implementation
493   // prevTime has to be maintained as an object variable because
494   // LogPool::write may be called several times depending on the
495   // +logsize value.
496   PUP::er *p = NULL;
497   if (binary) {
498     p = new PUP::toDisk(writedelta?deltafp:fp);
499   }
500 #if CMK_PROJECTIONS_USE_ZLIB
501   else if (compressed) {
502     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
503   }
504 #endif
505   else {
506     p = new toProjectionsFile(writedelta?deltafp:fp);
507   }
508   CmiAssert(p);
509   int curPhase = 0;
510   // **FIXME** - Should probably consider a more sophisticated bounds-based
511   //   approach for selective writing instead of making multiple if-checks
512   //   for every single event.
513   for(UInt i=0; i<numEntries; i++) {
514     if (!writedelta) {
515       if (keepPhase == NULL) {
516         // default case, when no phase selection is required.
517         pool[i].pup(*p);
518       } else {
519         // **FIXME** Might be a good idea to create a "filler" event block for
520         //   all the events taken out by phase filtering.
521         if (pool[i].type == END_PHASE) {
522           // always write phase markers
523           pool[i].pup(*p);
524           curPhase++;
525         } else if (pool[i].type == BEGIN_COMPUTATION ||
526                    pool[i].type == END_COMPUTATION) {
527           // always write BEGIN and END COMPUTATION markers
528           pool[i].pup(*p);
529         } else if (keepPhase[curPhase]) {
530           pool[i].pup(*p);
531         }
532       }
533     }
534     else {      // delta
535       // **FIXME** Implement phase-selective writing for delta logs
536       //   eventually
537       double time = pool[i].time;
538       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
539       {
540         double timeDiff = (time-prevTime)*1.0e6;
541         UInt intTimeDiff = (UInt)timeDiff;
542         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
543         if (timeErr > 1.0) {
544           timeErr -= 1.0;
545           intTimeDiff++;
546         }
547         pool[i].time = intTimeDiff/1.0e6;
548       }
549       pool[i].pup(*p);
550       pool[i].time = time;      // restore time value
551       prevTime = time;
552     }
553   }
554   delete p;
555   delete [] keepPhase;
556 }
557
558 void LogPool::writeSts(void)
559 {
560   // for whining compilers
561   int i;
562   // generate an automatic unique ID for each log
563   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
564   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
565   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
566 #if CMK_HAS_COUNTER_PAPI
567   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
568   // for now, use i, next time use papiEvents[i].
569   // **CW** papi event names is a hack.
570   char eventName[PAPI_MAX_STR_LEN];
571   for (i=0;i<NUMPAPIEVENTS;i++) {
572     PAPI_event_code_to_name(papiEvents[i], eventName);
573     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
574   }
575 #endif
576   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
577   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
578     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
579   }     
580 }
581
582 void LogPool::writeSts(TraceProjections *traceProj){
583   writeSts();
584   if (traceProj != NULL) {
585     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
586     funcIter->seekStart();
587     int numFuncs = traceProj->getFuncNumber();
588     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
589     while(funcIter->hasNext()) {
590       StrKey *key;
591       int *obj = (int *)funcIter->next((void **)&key);
592       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
593     }
594   }
595   fprintf(stsfp, "END\n");
596   fclose(stsfp);
597 }
598
599 void LogPool::writeRC(void)
600 {
601     //CkPrintf("write RC is being executed\n");
602 #ifdef PROJ_ANALYSIS  
603     CkAssert(CkMyPe() == 0);
604     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
605             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
606     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
607             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
608     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
609     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
610       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
611     } else {
612       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
613     }
614     */
615 #endif //PROJ_ANALYSIS
616   fclose(rcfp);
617 }
618
619 void LogPool::writeTopo(void)
620 {
621   TopoManager tmgr;
622   tmgr.printAllocation(topofp);
623   fclose(topofp);
624 }
625
626 #if CMK_BIGSIM_CHARM
627 static void updateProjLog(void *data, double t, double recvT, void *ptr)
628 {
629   LogEntry *log = (LogEntry *)data;
630   FILE *fp = *(FILE **)ptr;
631   log->time = t;
632   log->recvTime = recvT<0.0?0:recvT;
633 //  log->write(fp);
634   toProjectionsFile p(fp);
635   log->pup(p);
636 }
637 #endif
638
639 // flush log entries to disk
640 void LogPool::flushLogBuffer()
641 {
642   if (numEntries) {
643     double writeTime = TraceTimer();
644     writeLog();
645     hasFlushed = true;
646     numEntries = 0;
647     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
648     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
649   }
650 }
651
652 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
653                   double time, int event, int pe, int ml, CmiObjId *id, 
654                   double recvT, double cpuT, int numPe)
655 {
656   new (&pool[numEntries++])
657     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
658   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
659     numPhases++;
660   }
661   if(poolSize==numEntries) {
662     flushLogBuffer();
663 #if CMK_BIGSIM_CHARM
664     extern int correctTimeLog;
665     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
666 #endif
667   }
668 #if CMK_BIGSIM_CHARM
669   switch (type) {
670     case BEGIN_PROCESSING:
671       pool[numEntries-1].recvTime = BgGetRecvTime();
672     case END_PROCESSING:
673     case BEGIN_COMPUTATION:
674     case END_COMPUTATION:
675     case CREATION:
676     case BEGIN_PACK:
677     case END_PACK:
678     case BEGIN_UNPACK:
679     case END_UNPACK:
680     case USER_EVENT_PAIR:
681       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
682   }
683 #endif
684 }
685
686 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
687 #ifndef CMK_BIGSIM_CHARM
688   new (&pool[numEntries++])
689         LogEntry(time,type,funcID,lineNum,fileName);
690   if(poolSize == numEntries){
691     flushLogBuffer();
692   }
693 #endif  
694 }
695
696
697   
698 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
699 #ifndef CMK_BIGSIM_CHARM
700   new (&pool[numEntries++])
701         LogEntry(type,time,memUsage);
702   if(poolSize == numEntries){
703     flushLogBuffer();
704   }
705 #endif  
706         
707 }  
708
709
710
711 void LogPool::addUserSupplied(int data){
712         // add an event
713         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
714
715         // set the user supplied value for the previously created event 
716         pool[numEntries-1].setUserSuppliedData(data);
717   }
718
719
720 void LogPool::addUserSuppliedNote(char *note){
721         // add an event
722         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
723
724         // set the user supplied note for the previously created event 
725         pool[numEntries-1].setUserSuppliedNote(note);
726   }
727
728 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
729   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
730 #ifndef CMK_BIGSIM_CHARM
731 #if MPI_TRACE_MACHINE_HACK
732   //This part of code is used  to combine the contiguous
733   //MPI_Test and MPI_Iprobe events to reduce the number of
734   //entries
735 #define MPI_TEST_EVENT_ID 60
736 #define MPI_IPROBE_EVENT_ID 70 
737   int lastEvent = pool[numEntries-1].event;
738   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
739     //just replace the endtime of last event
740     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
741     pool[numEntries].endTime = et;
742   }else{
743     new (&pool[numEntries++])
744       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
745   }
746 #else
747   new (&pool[numEntries++])
748     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
749 #endif
750   if(poolSize == numEntries){
751     flushLogBuffer();
752   }
753 #endif  
754 }
755
756
757 /* **CW** Not sure if this is the right thing to do. Feels more like
758    a hack than a solution to Sameer's request to add the destination
759    processor information to multicasts and broadcasts.
760
761    In the unlikely event this method is used for Broadcasts as well,
762    pelist == NULL will be used to indicate a global broadcast with 
763    num PEs.
764 */
765 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
766                                    int event, int pe, int ml, CmiObjId *id,
767                                    double recvT, int numPe, int *pelist)
768 {
769   new (&pool[numEntries++])
770     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
771   if(poolSize==numEntries) {
772     flushLogBuffer();
773   }
774 }
775
776 void LogPool::postProcessLog()
777 {
778 #if CMK_BIGSIM_CHARM
779   bgUpdateProj(1);   // event type
780 #endif
781 }
782
783 void LogPool::modLastEntryTimestamp(double ts)
784 {
785   pool[numEntries-1].time = ts;
786   //pool[numEntries-1].cputime = ts;
787 }
788
789 // /** Constructor for a multicast log entry */
790 // 
791 //  THIS WAS MOVED TO trace-projections.h with the other constructors
792 // 
793 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
794 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
795 // {
796 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
797 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
798 //     recvTime = rt; 
799 //     numpes = numPe;
800 //     userSuppliedNote = NULL;
801 //     if (pelist != NULL) {
802 //      pes = new int[numPe];
803 //      for (int i=0; i<numPe; i++) {
804 //        pes[i] = pelist[i];
805 //      }
806 //     } else {
807 //      pes= NULL;
808 //     }
809 // }
810
811 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
812 //{
813 //#if CMK_HAS_COUNTER_PAPI
814 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
815 //#endif
816 //}
817
818
819
820 void LogEntry::pup(PUP::er &p)
821 {
822   int i;
823   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
824   char ret = '\n';
825
826   p|type;
827   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
828   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
829
830   switch (type) {
831     case USER_EVENT:
832     case USER_EVENT_PAIR:
833       p|mIdx; p|itime; p|event; p|pe;
834       break;
835     case BEGIN_IDLE:
836     case END_IDLE:
837     case BEGIN_PACK:
838     case END_PACK:
839     case BEGIN_UNPACK:
840     case END_UNPACK:
841       p|itime; p|pe; 
842       break;
843     case BEGIN_PROCESSING:
844       if (p.isPacking()) {
845         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
846         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
847       }
848       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
849       p|msglen; p|irecvtime; 
850       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
851       p|icputime;
852 #if CMK_HAS_COUNTER_PAPI
853       //p|numPapiEvents;
854       for (i=0; i<NUMPAPIEVENTS; i++) {
855         // not yet!!!
856         //      p|papiIDs[i]; 
857         p|papiValues[i];
858         
859       }
860 #else
861       //p|numPapiEvents;     // non papi version has value 0
862 #endif
863       if (p.isUnpacking()) {
864         recvTime = irecvtime/1.0e6;
865         cputime = icputime/1.0e6;
866       }
867       break;
868     case END_PROCESSING:
869       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
870       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
871 #if CMK_HAS_COUNTER_PAPI
872       //p|numPapiEvents;
873       for (i=0; i<NUMPAPIEVENTS; i++) {
874         // not yet!!!
875         //      p|papiIDs[i];
876         p|papiValues[i];
877       }
878 #else
879       //p|numPapiEvents;  // non papi version has value 0
880 #endif
881       if (p.isUnpacking()) cputime = icputime/1.0e6;
882       break;
883     case USER_SUPPLIED:
884           p|userSuppliedData;
885           p|itime;
886         break;
887     case USER_SUPPLIED_NOTE:
888           p|itime;
889           int length;
890           if (p.isPacking()) length = strlen(userSuppliedNote);
891           p | length;
892           char space;
893           space = ' ';
894           p | space;
895           if (p.isUnpacking()) {
896             userSuppliedNote = new char[length+1];
897             userSuppliedNote[length] = '\0';
898           }
899           PUParray(p,userSuppliedNote, length);
900           break;
901     case USER_SUPPLIED_BRACKETED_NOTE:
902       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
903           p|itime;
904           p|iEndTime;
905           p|event;
906           int length2;
907           if (p.isPacking()) length2 = strlen(userSuppliedNote);
908           p | length2;
909           char space2;
910           space2 = ' ';
911           p | space2;
912           if (p.isUnpacking()) {
913             userSuppliedNote = new char[length+1];
914             userSuppliedNote[length] = '\0';
915           }
916           PUParray(p,userSuppliedNote, length2);
917           break;
918     case MEMORY_USAGE_CURRENT:
919       p | memUsage;
920       p | itime;
921         break;
922     case CREATION:
923       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
924       p|mIdx; p|eIdx; p|itime;
925       p|event; p|pe; p|msglen; p|irecvtime;
926       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
927       break;
928     case CREATION_BCAST:
929       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
930       p|mIdx; p|eIdx; p|itime;
931       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
932       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
933       break;
934     case CREATION_MULTICAST:
935       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
936       p|mIdx; p|eIdx; p|itime;
937       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
938       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
939       for (i=0; i<numpes; i++) p|pes[i];
940       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
941       break;
942     case MESSAGE_RECV:
943       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
944       break;
945
946     case ENQUEUE:
947     case DEQUEUE:
948       p|mIdx; p|itime; p|event; p|pe;
949       break;
950
951     case BEGIN_INTERRUPT:
952     case END_INTERRUPT:
953       p|itime; p|event; p|pe;
954       break;
955
956       // **CW** absolute timestamps are used here to support a quick
957       // way of determining the total time of a run in projections
958       // visualization.
959     case BEGIN_COMPUTATION:
960     case END_COMPUTATION:
961     case BEGIN_TRACE:
962     case END_TRACE:
963       p|itime;
964       break;
965     case BEGIN_FUNC:
966         p | itime;
967         p | mIdx;
968         p | event;
969         if(!p.isUnpacking()){
970                 p(fName,flen-1);
971         }
972         break;
973     case END_FUNC:
974         p | itime;
975         p | mIdx;
976         break;
977     case END_PHASE:
978       p|eIdx; // FIXME: actually the phase ID
979       p|itime;
980       break;
981     default:
982       CmiError("***Internal Error*** Wierd Event %d.\n", type);
983       break;
984   }
985   if (p.isUnpacking()) time = itime/1.0e6;
986   p|ret;
987 }
988
989 TraceProjections::TraceProjections(char **argv): 
990   curevent(0), inEntry(0), computationStarted(0), 
991         converseExit(0), endTime(0.0), traceNestedEvents(0),
992         currentPhaseID(0), lastPhaseEvent(NULL)
993 {
994   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
995
996   if (CkpvAccess(traceOnPe) == 0) return;
997
998   CtvInitialize(int,curThreadEvent);
999   CkpvInitialize(CmiInt8, CtrLogBufSize);
1000   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1001   CtvAccess(curThreadEvent)=0;
1002   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1003                        "Log entries to buffer per I/O")) {
1004     if (CkMyPe() == 0) {
1005       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1006     }
1007   }
1008   checknested = 
1009     CmiGetArgFlagDesc(argv,"+checknested",
1010                       "check projections nest begin end execute events");
1011   traceNestedEvents = 
1012     CmiGetArgFlagDesc(argv,"+tracenested",
1013               "trace projections nest begin/end execute events");
1014   int binary = 
1015     CmiGetArgFlagDesc(argv,"+binary-trace",
1016                       "Write log files in binary format");
1017
1018   CmiInt8 nSubdirs = 0;
1019   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1020
1021
1022 #if CMK_PROJECTIONS_USE_ZLIB
1023   int compressed = true;
1024   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1025   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1026   compressed = compressed && !disableCompressed;
1027 #else
1028   // consume the flag so there's no confusing
1029   CmiGetArgFlagDesc(argv,"+gz-trace",
1030                     "Write log files pre-compressed with gzip");
1031   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1032 #endif
1033
1034   // **CW** default to non delta log encoding. The user may choose to do
1035   // create both logs (for debugging) or just the old log timestamping
1036   // (for compatibility).
1037   // Generating just the non delta log takes precedence over generating
1038   // both logs (if both arguments appear on the command line).
1039
1040   // switch to OLD log format until everything works // Gengbin
1041   nonDeltaLog = 1;
1042   deltaLog = 0;
1043   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1044                                   "Generate Delta encoded and simple timestamped log files");
1045
1046   _logPool = new LogPool(CkpvAccess(traceRoot));
1047   _logPool->setNumSubdirs(nSubdirs);
1048   _logPool->setBinary(binary);
1049 #if CMK_PROJECTIONS_USE_ZLIB
1050   _logPool->setCompressed(compressed);
1051 #endif
1052   if (CkMyPe() == 0) {
1053     _logPool->createSts();
1054     _logPool->createRC();
1055     _logPool->createTopo();
1056   }
1057   funcCount=1;
1058
1059 #if CMK_HAS_COUNTER_PAPI
1060   // We initialize and create the event sets for use with PAPI here.
1061   int papiRetValue;
1062   if(CkMyRank()==0){
1063     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1064     if (papiRetValue != PAPI_VER_CURRENT) {
1065       CmiAbort("PAPI Library initialization failure!\n");
1066     }
1067 #if CMK_SMP
1068     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1069       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1070     }
1071 #endif
1072   }
1073
1074 #if CMK_SMP
1075   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1076   #if CMK_SMP_TRACE_COMMTHREAD
1077       CmiNodeAllBarrier();
1078   #else
1079       CmiNodeBarrier();
1080   #endif
1081 #endif
1082   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1083   papiEventSet = PAPI_NULL; 
1084   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1085     CmiAbort("PAPI failed to create event set!\n");
1086   }
1087 #ifdef USE_SPP_PAPI
1088   //  CmiPrintf("Using SPP counters for PAPI\n");
1089   if(PAPI_query_event(PAPI_FP_OPS)==PAPI_OK) {
1090     papiEvents[0] = PAPI_FP_OPS;
1091   }else{
1092     if(CmiMyPe()==0){
1093       CmiAbort("WARNING: PAPI_FP_OPS doesn't exist on this platform!");
1094     }
1095   }
1096   if(PAPI_query_event(PAPI_TOT_INS)==PAPI_OK) {
1097     papiEvents[1] = PAPI_TOT_INS;
1098   }else{
1099     CmiAbort("WARNING: PAPI_TOT_INS doesn't exist on this platform!");
1100   }
1101   int EventCode;
1102   int ret;
1103   ret=PAPI_event_name_to_code("perf::PERF_COUNT_HW_CACHE_LL:MISS",&EventCode);
1104   if(PAPI_query_event(EventCode)==PAPI_OK) {
1105     papiEvents[2] = EventCode;
1106   }else{
1107     CmiAbort("WARNING: perf::PERF_COUNT_HW_CACHE_LL:MISS doesn't exist on this platform!");
1108   }
1109   ret=PAPI_event_name_to_code("DATA_PREFETCHER:ALL",&EventCode);
1110   if(PAPI_query_event(EventCode)==PAPI_OK) {
1111     papiEvents[3] = EventCode;
1112   }else{
1113     CmiAbort("WARNING: DATA_PREFETCHER:ALL doesn't exist on this platform!");
1114   }
1115   if(PAPI_query_event(PAPI_L1_DCA)==PAPI_OK) {
1116     papiEvents[4] = PAPI_L1_DCA;
1117   }else{
1118     CmiAbort("WARNING: PAPI_L1_DCA doesn't exist on this platform!");
1119   }
1120   if(PAPI_query_event(PAPI_TOT_CYC)==PAPI_OK) {
1121     papiEvents[5] = PAPI_TOT_CYC;
1122   }else{
1123     CmiAbort("WARNING: PAPI_TOT_CYC doesn't exist on this platform!");
1124   }
1125 #else
1126   // just uses { PAPI_L2_DCM, PAPI_FP_OPS } the 2 initialized PAPI_EVENTS
1127 #endif
1128   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1129   if (papiRetValue < 0) {
1130     if (papiRetValue == PAPI_ECNFLCT) {
1131       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1132     } else {
1133       char error_str[PAPI_MAX_STR_LEN];
1134       PAPI_perror(papiRetValue,error_str,PAPI_MAX_STR_LEN);
1135       CmiPrintf("PAPI failed with error %s val %d\n",error_str,papiRetValue);
1136       CmiAbort("PAPI failed to add designated events!\n");
1137     }
1138   }
1139   if(CkMyPe()==0)
1140     {
1141       CmiPrintf("Registered %d PAPI counters:",NUMPAPIEVENTS);
1142       char nameBuf[PAPI_MAX_STR_LEN];
1143       for(int i=0;i<NUMPAPIEVENTS;i++)
1144         {
1145           PAPI_event_code_to_name(papiEvents[i], nameBuf);
1146           CmiPrintf("%s ",nameBuf);
1147         }
1148       CmiPrintf("\n");
1149     }
1150   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1151 #endif
1152 }
1153
1154 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1155 {
1156   OPTIMIZED_VERSION
1157   CkAssert(e==-1 || e>=0);
1158   CkAssert(evt != NULL);
1159   int event;
1160   int biggest = -1;
1161   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1162     int cur = (*CkpvAccess(usrEvents))[i]->e;
1163     if (cur == e) {
1164       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1165       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1166         return e;
1167       else
1168         CmiAbort("UserEvent double registered!");
1169     }
1170     if (cur > biggest) biggest = cur;
1171   }
1172   // if no user events have so far been registered. biggest will be -1
1173   // and hence newly assigned event numbers will begin from 0.
1174   if (e==-1) event = biggest+1;  // automatically assign new event number
1175   else event = e;
1176   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1177   return event;
1178 }
1179
1180 void TraceProjections::traceClearEps(void)
1181 {
1182   // In trace-summary, this zeros out the EP bins, to eliminate noise
1183   // from startup.  Here, this isn't useful, since we can do that in
1184   // post-processing
1185 }
1186
1187 void TraceProjections::traceWriteSts(void)
1188 {
1189   if(CkMyPe()==0)
1190     _logPool->writeSts(this);
1191 }
1192
1193 /** 
1194  * **IMPT NOTES**:
1195  *
1196  * This is called when Converse closes during ConverseCommonExit().
1197  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1198  *
1199  * Some programs bypass CkExit() (like NAMD, which eventually calls
1200  * ConverseExit()), modules like traces will have to pretend to shutdown
1201  * as if CkExit() was called but at the same time avoid making
1202  * subsequent CkExit() calls (which is usually required for allowing
1203  * other modules to shutdown).
1204  *
1205  * Note that we can only get here if CkExit() was not called, since the
1206  * trace module will un-register itself from TraceArray if it did.
1207  *
1208  */
1209 void TraceProjections::traceClose(void)
1210 {
1211 #ifdef PROJ_ANALYSIS
1212   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1213
1214   // sets the flag that tells the code not to make the CkExit call later
1215   converseExit = 1;
1216   if (CkMyPe() == 0) {
1217     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1218     bocProxy.traceProjectionsParallelShutdown(-1);
1219   }
1220   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1221     CkpvAccess(_trace)->endComputation();
1222     delete _logPool;              // will write
1223     // remove myself from traceArray so that no tracing will be called.
1224     CkpvAccess(_traces)->removeTrace(this);
1225   }
1226 #else
1227   // we've already deleted the logpool, so multiple calls to traceClose
1228   // are tolerated.
1229   if (_logPool == NULL) {
1230     return;
1231   }
1232   if(CkMyPe()==0){
1233     _logPool->writeSts(this);
1234   }
1235   CkpvAccess(_trace)->endComputation();
1236   delete _logPool;              // will write
1237   // remove myself from traceArray so that no tracing will be called.
1238   CkpvAccess(_traces)->removeTrace(this);
1239 #endif
1240 }
1241
1242 /**
1243  *  **IMPT NOTES**:
1244  *
1245  *  This is meant to be called internally by the tracing framework.
1246  *
1247  */
1248 void TraceProjections::closeTrace() {
1249   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1250   if (CkMyPe() == 0) {
1251     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1252     _logPool->writeSts(this);
1253     _logPool->writeRC();
1254     _logPool->writeTopo();
1255     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1256   }
1257   delete _logPool;       // will write logs to file
1258 }
1259
1260 #if CMK_SMP_TRACE_COMMTHREAD
1261 void TraceProjections::traceBeginOnCommThread()
1262 {
1263   if (!computationStarted) return;
1264   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1265 }
1266
1267 void TraceProjections::traceEndOnCommThread()
1268 {
1269   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1270 }
1271 #endif
1272
1273 void TraceProjections::traceBegin(void)
1274 {
1275   if (!computationStarted) return;
1276   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1277 }
1278
1279 void TraceProjections::traceEnd(void)
1280 {
1281   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1282 }
1283
1284 void TraceProjections::userEvent(int e)
1285 {
1286   if (!computationStarted) return;
1287   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1288 }
1289
1290 void TraceProjections::userBracketEvent(int e, double bt, double et)
1291 {
1292   if (!computationStarted) return;
1293   // two events record Begin/End of event e.
1294   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1295   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1296 }
1297
1298 void TraceProjections::userSuppliedData(int d)
1299 {
1300   if (!computationStarted) return;
1301   _logPool->addUserSupplied(d);
1302 }
1303
1304 void TraceProjections::userSuppliedNote(char *note)
1305 {
1306   if (!computationStarted) return;
1307   _logPool->addUserSuppliedNote(note);
1308 }
1309
1310
1311 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1312 {
1313   if (!computationStarted) return;
1314   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1315 }
1316
1317 void TraceProjections::memoryUsage(double m)
1318 {
1319   if (!computationStarted) return;
1320   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1321   
1322 }
1323
1324
1325 void TraceProjections::creation(envelope *e, int ep, int num)
1326 {
1327   double curTime = TraceTimer();
1328   if (e == 0) {
1329     CtvAccess(curThreadEvent) = curevent;
1330     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1331                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1332   } else {
1333     int type=e->getMsgtype();
1334     e->setEvent(curevent);
1335     if (num > 1) {
1336       _logPool->add(CREATION_BCAST, type, ep, curTime,
1337                     curevent++, CkMyPe(), e->getTotalsize(), 
1338                     NULL, 0, 0.0, num);
1339     } else {
1340       _logPool->add(CREATION, type, ep, curTime,
1341                     curevent++, CkMyPe(), e->getTotalsize(), 
1342                     NULL, 0, 0.0);
1343     }
1344   }
1345 }
1346
1347 //This function is only called from a comm thread in SMP mode. 
1348 void TraceProjections::creation(char *msg)
1349 {
1350 #if CMK_SMP_TRACE_COMMTHREAD
1351         // msg must be a charm message
1352     envelope *e = (envelope *)msg;
1353     int ep = e->getEpIdx();
1354     if(_entryTable[ep]->traceEnabled) {
1355         creation(e, ep, 1);
1356         e->setSrcPe(CkMyPe());              // pretend I am the sender
1357     }
1358 #endif
1359 }
1360
1361 void TraceProjections::traceCommSetMsgID(char *msg)
1362 {
1363 #if CMK_SMP_TRACE_COMMTHREAD
1364     // msg must be a charm message
1365     envelope *e = (envelope *)msg;
1366     int ep = e->getEpIdx();
1367     if(_entryTable[ep]->traceEnabled) {
1368         e->setSrcPe(CkMyPe());              // pretend I am the sender
1369         e->setEvent(curevent);
1370     }
1371 #endif
1372 }
1373
1374 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1375 {
1376     // msg must be a charm message
1377     *pe = *event = -1;
1378     envelope *e = (envelope *)msg;
1379     int ep = e->getEpIdx();
1380     if(_entryTable[ep]->traceEnabled) {
1381         *pe = e->getSrcPe();
1382         *event = e->getEvent();
1383     }
1384 }
1385
1386 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1387 {
1388        // msg must be a charm message
1389     envelope *e = (envelope *)msg;
1390     int ep = e->getEpIdx();
1391     if(ep<=0 || ep>=_entryTable.size()) return;
1392     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1393     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1394     if(_entryTable[ep]->traceEnabled) {
1395         e->setSrcPe(pe);
1396         e->setEvent(event);
1397     }
1398 }
1399
1400 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1401    Communication Library-specific Multicasts via new event 
1402    CREATION_MULTICAST.
1403 */
1404
1405 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1406                                          int *pelist)
1407 {
1408   double curTime = TraceTimer();
1409   if (e==0) {
1410     CtvAccess(curThreadEvent)=curevent;
1411     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1412                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1413   } else {
1414     int type=e->getMsgtype();
1415     e->setEvent(curevent);
1416     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1417                                    e->getTotalsize(), 0, 0.0, num, pelist);
1418   }
1419 }
1420
1421 void TraceProjections::creationDone(int num)
1422 {
1423   // modified the creation done time of the last num log entries
1424   // FIXME: totally a hack
1425   double curTime = TraceTimer();
1426   int idx = _logPool->numEntries-1;
1427   while (idx >=0 && num >0 ) {
1428     LogEntry &log = _logPool->pool[idx];
1429     if ((log.type == CREATION) ||
1430         (log.type == CREATION_BCAST) ||
1431         (log.type == CREATION_MULTICAST)) {
1432       log.recvTime = curTime - log.time;
1433       num --;
1434     }
1435     idx--;
1436   }
1437 }
1438
1439 void TraceProjections::beginExecute(CmiObjId *tid)
1440 {
1441 #if CMK_HAS_COUNTER_PAPI
1442   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1443     CmiAbort("PAPI failed to read at begin execute!\n");
1444   }
1445 #endif
1446   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1447   execEvent = CtvAccess(curThreadEvent);
1448   execEp = (-1);
1449   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1450                 execEvent,CkMyPe(), 0, tid);
1451 #if CMK_HAS_COUNTER_PAPI
1452   _logPool->addPapi(papiValues);
1453 #endif
1454   inEntry = 1;
1455 }
1456
1457 void TraceProjections::beginExecute(envelope *e)
1458 {
1459   if(e==0) {
1460 #if CMK_HAS_COUNTER_PAPI
1461     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1462       CmiAbort("PAPI failed to read at begin execute!\n");
1463     }
1464 #endif
1465     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1466     execEvent = CtvAccess(curThreadEvent);
1467     execEp = (-1);
1468     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1469                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1470 #if CMK_HAS_COUNTER_PAPI
1471     _logPool->addPapi(papiValues);
1472 #endif
1473     inEntry = 1;
1474   } else {
1475     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1476                  e->getSrcPe(),e->getTotalsize());
1477   }
1478 }
1479
1480 void TraceProjections::beginExecute(char *msg){
1481 #if CMK_SMP_TRACE_COMMTHREAD
1482         //This function is called from comm thread in SMP mode
1483     envelope *e = (envelope *)msg;
1484     int ep = e->getEpIdx();
1485     if(_entryTable[ep]->traceEnabled)
1486                 beginExecute(e);
1487 #endif
1488 }
1489
1490 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1491                                     int mlen, CmiObjId *idx)
1492 {
1493   if (traceNestedEvents) {
1494     if (! nestedEvents.isEmpty()) {
1495       endExecuteLocal();
1496     }
1497     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1498   }
1499   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1500 }
1501
1502 void TraceProjections::changeLastEntryTimestamp(double ts)
1503 {
1504   _logPool->modLastEntryTimestamp(ts);
1505 }
1506
1507 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1508                                     int mlen, CmiObjId *idx)
1509 {
1510 #if CMK_HAS_COUNTER_PAPI
1511   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1512     CmiAbort("PAPI failed to read at begin execute!\n");
1513   }
1514 #endif
1515   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1516   execEvent=event;
1517   execEp=ep;
1518   execPe=srcPe;
1519   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1520                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1521 #if CMK_HAS_COUNTER_PAPI
1522   _logPool->addPapi(papiValues);
1523 #endif
1524   inEntry = 1;
1525 }
1526
1527 void TraceProjections::endExecute(void)
1528 {
1529   if (traceNestedEvents) nestedEvents.deq();
1530   endExecuteLocal();
1531   if (traceNestedEvents) {
1532     if (! nestedEvents.isEmpty()) {
1533       NestedEvent &ne = nestedEvents.peek();
1534       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1535     }
1536   }
1537 }
1538
1539 void TraceProjections::endExecute(char *msg)
1540 {
1541 #if CMK_SMP_TRACE_COMMTHREAD
1542         //This function is called from comm thread in SMP mode
1543     envelope *e = (envelope *)msg;
1544     int ep = e->getEpIdx();
1545     if(_entryTable[ep]->traceEnabled)
1546                 endExecute();
1547 #endif  
1548 }
1549
1550 void TraceProjections::endExecuteLocal(void)
1551 {
1552 #if CMK_HAS_COUNTER_PAPI
1553   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1554     CmiAbort("PAPI failed to read at end execute!\n");
1555   }
1556 #endif
1557   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1558   double cputime = TraceCpuTimer();
1559   if(execEp == (-1)) {
1560     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1561                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1562   } else {
1563     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1564                   execEvent, execPe, 0, NULL, 0.0, cputime);
1565   }
1566 #if CMK_HAS_COUNTER_PAPI
1567   _logPool->addPapi(papiValues);
1568 #endif
1569   inEntry = 0;
1570 }
1571
1572 void TraceProjections::messageRecv(char *env, int pe)
1573 {
1574 #if 0
1575   envelope *e = (envelope *)env;
1576   int msgType = e->getMsgtype();
1577   int ep = e->getEpIdx();
1578 #if 0
1579   if (msgType==NewChareMsg || msgType==NewVChareMsg
1580           || msgType==ForChareMsg || msgType==ForVidMsg
1581           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1582           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1583     ep = e->getEpIdx();
1584   else
1585     ep = _threadEP;
1586 #endif
1587   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1588                 curevent++, e->getSrcPe(), e->getTotalsize());
1589 #endif
1590 }
1591
1592 void TraceProjections::beginIdle(double curWallTime)
1593 {
1594   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1595 }
1596
1597 void TraceProjections::endIdle(double curWallTime)
1598 {
1599   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1600 }
1601
1602 void TraceProjections::beginPack(void)
1603 {
1604   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1605 }
1606
1607 void TraceProjections::endPack(void)
1608 {
1609   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1610 }
1611
1612 void TraceProjections::beginUnpack(void)
1613 {
1614   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1615 }
1616
1617 void TraceProjections::endUnpack(void)
1618 {
1619   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1620 }
1621
1622 void TraceProjections::enqueue(envelope *) {}
1623
1624 void TraceProjections::dequeue(envelope *) {}
1625
1626 void TraceProjections::beginComputation(void)
1627 {
1628   computationStarted = 1;
1629
1630   // Executes the callback function provided by the machine
1631   // layer. This is the proper method to register user events in a
1632   // machine layer because projections is a charm++ module.
1633   if (CkpvAccess(traceOnPe) != 0) {
1634     void (*ptr)() = registerMachineUserEvents();
1635     if (ptr != NULL) {
1636       ptr();
1637     }
1638   }
1639 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1640 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1641   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1642 #if CMK_HAS_COUNTER_PAPI
1643   // we start the counters here
1644   if (PAPI_start(papiEventSet) != PAPI_OK) {
1645     CmiAbort("PAPI failed to start designated counters!\n");
1646   }
1647 #endif
1648 }
1649
1650 void TraceProjections::endComputation(void)
1651 {
1652 #if CMK_HAS_COUNTER_PAPI
1653   // we stop the counters here. A silent failure is alright since we
1654   // are already at the end of the program.
1655   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1656     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1657   }
1658   // NOTE: We should not do a complete close of PAPI until after the
1659   // sts writer is done.
1660 #endif
1661   endTime = TraceTimer();
1662   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1663   /*
1664   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1665            endTime*1e06);
1666   */
1667 }
1668
1669 int TraceProjections::idxRegistered(int idx)
1670 {
1671     int idxVecLen = idxVec.size();
1672     for(int i=0; i<idxVecLen; i++)
1673     {
1674         if(idx == idxVec[i])
1675             return 1;
1676     }
1677     return 0;
1678 }
1679
1680 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1681     StrKey k((char*)name,strlen(name));
1682     int num = funcHashtable.get(k);
1683     
1684     if(num!=0) {
1685         return;
1686         //as for mpi programs, the same function may be registered for several times
1687         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1688     }
1689     
1690     int isIdxExisting=0;
1691     if(idxSpecifiedByUser)
1692         isIdxExisting=idxRegistered(idx);
1693     if(isIdxExisting){
1694         return;
1695         //same reason with num!=0
1696         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1697     }
1698
1699     if(idxSpecifiedByUser) {
1700         char *st = new char[strlen(name)+1];
1701         memcpy(st,name,strlen(name)+1);
1702         StrKey *newKey = new StrKey(st,strlen(st));
1703         int &ref = funcHashtable.put(*newKey);
1704         ref=idx;
1705         funcCount++;
1706         idxVec.push_back(idx);  
1707     } else {
1708         char *st = new char[strlen(name)+1];
1709         memcpy(st,name,strlen(name)+1);
1710         StrKey *newKey = new StrKey(st,strlen(st));
1711         int &ref = funcHashtable.put(*newKey);
1712         ref=funcCount;
1713         num = funcCount;
1714         funcCount++;
1715         idx = num;
1716         idxVec.push_back(idx);
1717     }
1718 }
1719
1720 void TraceProjections::beginFunc(char *name,char *file,int line){
1721         StrKey k(name,strlen(name));    
1722         unsigned short  num = (unsigned short)funcHashtable.get(k);
1723         beginFunc(num,file,line);
1724 }
1725
1726 void TraceProjections::beginFunc(int idx,char *file,int line){
1727         if(idx <= 0){
1728                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1729         }       
1730         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1731 }
1732
1733 void TraceProjections::endFunc(char *name){
1734         StrKey k(name,strlen(name));    
1735         int num = funcHashtable.get(k);
1736         endFunc(num);   
1737 }
1738
1739 void TraceProjections::endFunc(int num){
1740         if(num <= 0){
1741                 printf("endFunc without start :O\n");
1742         }
1743         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1744 }
1745
1746 // specialized PUP:ers for handling trace projections logs
1747 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1748 {
1749   for (int i=0;i<n;i++) 
1750     switch(t) {
1751     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1752     case Tuchar:
1753     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1754     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1755     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1756     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1757     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1758     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1759     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1760     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1761     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1762 #ifdef CMK_PUP_LONG_LONG
1763     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1764     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1765 #endif
1766     default: CmiAbort("Unrecognized pup type code!");
1767     };
1768 }
1769
1770 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1771 {
1772   for (int i=0;i<n;i++) 
1773     switch(t) {
1774     case Tchar: { 
1775       char c = fgetc(f);
1776       if (c==EOF)
1777         parseError("Could not match character");
1778       else
1779         ((char *)p)[i] = c;
1780       break;
1781     }
1782     case Tuchar:
1783     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1784     case Tshort:((short *)p)[i]=(short)readInt(); break;
1785     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1786     case Tint:  ((int *)p)[i]=readInt(); break;
1787     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1788     case Tlong: ((long *)p)[i]=readInt(); break;
1789     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1790     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1791     case Tdouble:((double *)p)[i]=readDouble(); break;
1792 #ifdef CMK_PUP_LONG_LONG
1793     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1794     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1795 #endif
1796     default: CmiAbort("Unrecognized pup type code!");
1797     };
1798 }
1799
1800 #if CMK_PROJECTIONS_USE_ZLIB
1801 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1802 {
1803   for (int i=0;i<n;i++) 
1804     switch(t) {
1805     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1806     case Tuchar:
1807     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1808     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1809     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1810     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1811     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1812     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1813     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1814     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1815     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1816 #ifdef CMK_PUP_LONG_LONG
1817     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1818     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1819 #endif
1820     default: CmiAbort("Unrecognized pup type code!");
1821     };
1822 }
1823 #endif
1824
1825 void TraceProjections::endPhase() {
1826   double currentPhaseTime = TraceTimer();
1827   if (lastPhaseEvent != NULL) {
1828   } else {
1829     if (_logPool->pool != NULL) {
1830       // assumed to be BEGIN_COMPUTATION
1831     } else {
1832       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1833       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1834     }
1835   }
1836
1837   /* Insert endPhase event here. */
1838   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1839   /*        We currently "borrow" from the standard add() method. */
1840   /*        It should really be its own add() method.             */
1841   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1842   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1843   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1844   currentPhaseID++;
1845 }
1846
1847 #ifdef PROJ_ANALYSIS
1848 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1849
1850
1851 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1852
1853 void registerOutlierReduction() {
1854   outlierReductionType =
1855     CkReduction::addReducer(outlierReduction);
1856   minMaxReductionType =
1857     CkReduction::addReducer(minMaxReduction);
1858 }
1859
1860 /**
1861  * **IMPT NOTES**:
1862  *
1863  * This is the C++ code that is registered to be activated at module
1864  * shutdown. This is called exactly once on processor 0. Module shutdown
1865  * is initiated as a result of a CkExit() call by the application code
1866  * 
1867  * The exit function must ultimately call CkExit() again to
1868  * so that other module exit functions may proceed after this module is
1869  * done.
1870  *
1871  */
1872 // FIXME: WHY extern "C"???
1873 extern "C" void TraceProjectionsExitHandler()
1874 {
1875 #if CMK_TRACE_ENABLED
1876   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1877   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1878   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1879 #else
1880   CkExit();
1881 #endif
1882 }
1883
1884 // This is called once on each processor but the idiom of use appears
1885 // to be to only have processor 0 register the function.
1886 //
1887 // See initnode in trace-projections.ci
1888 void initTraceProjectionsBOC()
1889 {
1890   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1891 #ifdef __BIGSIM__
1892   if (BgNodeRank() == 0) {
1893 #else
1894     if (CkMyRank() == 0) {
1895 #endif
1896       registerExitFn(TraceProjectionsExitHandler);
1897     }
1898 #if 0
1899   } // this is so indentation does not get messed up
1900 #endif
1901 }
1902
1903 // mainchare for trace-projections BOC-operations. 
1904 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1905 // rest of its life.
1906 //
1907 // Responsible for:
1908 //   1. Handling commandline arguments
1909 //   2. Creating any objects required for proper BOC operations.
1910 //
1911 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1912   /** Options for Outlier Analysis */
1913   // defaults. Things will change with support for interactive analysis.
1914   bool findOutliers = false;
1915   bool outlierAutomatic = true;
1916   int numKSeeds = 10; 
1917
1918   int peNumKeep = CkNumPes();  // used as a default
1919   double entryThreshold = 0.0;
1920   bool outlierUsePhases = false;
1921   if (outlierAutomatic) {
1922     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1923                      "Number of cluster seeds to apply at outlier analysis.");
1924     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1925                      &peNumKeep, "Number of Processors to retain data");
1926     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1927                         "Minimum significance of entry points to be considered for clustering (%).");
1928     findOutliers =
1929       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1930     outlierUsePhases = 
1931       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1932                         "Apply automatic outlier analysis to any available phases.");
1933     if (outlierUsePhases) {
1934       // if the user wants to use an outlier feature, it is assumed outlier
1935       //    analysis is desired.
1936       findOutliers = true;
1937     }
1938   }
1939   bool findStartTime = (CmiTimerAbsolute()==1);
1940   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1941   if (findOutliers) {
1942     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1943                                         numKSeeds,
1944                                         peNumKeep,
1945                                         entryThreshold,
1946                                         outlierUsePhases);
1947   }
1948 }
1949
1950 // Called on every processor.
1951 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1952   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1953   endPe = pe;                // the pe that starts CkExit()
1954   if (CkMyPe() == 0) {
1955     analysisStartTime = CmiWallTimer();
1956   }
1957   CkpvAccess(_trace)->endComputation();
1958   // no more tracing for projections on this processor after this point. 
1959   // Note that clear must be called after remove, or bad things will happen.
1960   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1961   CkpvAccess(_traces)->clearTrace();
1962
1963   // From this point, we start multiple chains of reductions and broadcasts to
1964   // perform final online analysis activities.
1965
1966   // Start all parallel operations at once. 
1967   //   These MUST NOT modify base performance data in LogPool. If they must,
1968   //   then the parallel operations must be phased (and this code to be
1969   //   restructured as necessary)
1970   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1971   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1972   if (findOutliers) {
1973     parModulesRemaining++;
1974     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1975   }
1976   parModulesRemaining++;
1977   if (findStartTime) 
1978   bocProxy[CkMyPe()].startTimeAnalysis();
1979   else
1980   bocProxy[CkMyPe()].startEndTimeAnalysis();
1981 }
1982
1983 // Called on each processor
1984 void KMeansBOC::startKMeansAnalysis() {
1985   // Initialize all necessary structures
1986   LogPool *pool = CkpvAccess(_trace)->_logPool;
1987
1988  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1989   int flushInt = 0;
1990   if (pool->hasFlushed) {
1991     flushInt = 1;
1992   }
1993   
1994   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1995                 0, thisProxy);
1996   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1997 }
1998
1999 // Called on processor 0
2000 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2001   int someFlush = *((int *)msg->getData());
2002
2003   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2004   
2005   if (someFlush == 0) {
2006     // Data intact proceed with KMeans analysis
2007     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2008     kMeansProxy.flushCheckDone();
2009   } else {
2010     // Some processor had flushed it data at some point, abandon KMeans
2011     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2012     // terminate KMeans
2013     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2014     bocProxy[0].kMeansDone();
2015   }
2016 }
2017
2018 // Called on each processor
2019 void KMeansBOC::flushCheckDone() {
2020   // **FIXME** - more flexible metric collection scheme may be necessary
2021   //   in the future for production use.
2022   LogPool *pool = CkpvAccess(_trace)->_logPool;
2023
2024   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2025
2026   numEntryMethods = _entryTable.size();
2027   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2028
2029   // maintained across phases
2030   markedBegin = false;
2031   markedIdle = false;
2032   beginBlockTime = 0.0;
2033   beginIdleBlockTime = 0.0;
2034   lastBeginEPIdx = -1; // none
2035
2036   lastPhaseIdx = 0;
2037   currentExecTimes = NULL;
2038   currentPhase = 0;
2039   selected = false;
2040
2041   pool->initializePhases();
2042
2043   // incoming K Seeds and the per-phase filter
2044   incKSeeds = new double[numK*numMetrics];
2045   keepMetric = new bool[numMetrics];
2046
2047   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2048   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2049   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2050   thisProxy[CkMyPe()].getNextPhaseMetrics();
2051 }
2052
2053 // Called on each processor.
2054 void KMeansBOC::getNextPhaseMetrics() {
2055   // Assumes the presence of the complete logs on this processor.
2056   // Assumes first event is always BEGIN_COMPUTATION
2057   // Assumes each processor sees the same number of phases.
2058   //
2059   // In this code, we collect performance data for this processor.
2060   // All times are in seconds.
2061
2062   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2063
2064   if (usePhases) {
2065     DEBUGF("[%d] Using Phases\n", CkMyPe());
2066   } else {
2067     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2068   }
2069   
2070   if (currentExecTimes != NULL) {
2071     delete [] currentExecTimes;
2072   }
2073   currentExecTimes = new double[numMetrics];
2074   for (int i=0; i<numMetrics; i++) {
2075     currentExecTimes[i] = 0.0;
2076   }
2077
2078   int numEventMethods = _entryTable.size();
2079   LogPool *pool = CkpvAccess(_trace)->_logPool;
2080   
2081   CkAssert(pool->numEntries > lastPhaseIdx);
2082   double totalPhaseTime = 0.0;
2083   double totalActiveTime = 0.0; // entry method + idle
2084
2085   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2086     if (pool->pool[i].type == BEGIN_PROCESSING) {
2087       // check pairing
2088       if (!markedBegin) {
2089         markedBegin = true;
2090       }
2091       beginBlockTime = pool->pool[i].time;
2092       lastBeginEPIdx = pool->pool[i].eIdx;
2093     } else if (pool->pool[i].type == END_PROCESSING) {
2094       // check pairing
2095       // if End without a begin, just ignore
2096       //   this event. If a phase-boundary is crossed, the Begin
2097       //   event would be maintained in beginBlockTime, so it is 
2098       //   not a problem.
2099       if (markedBegin) {
2100         markedBegin = false;
2101         if (pool->pool[i].event < 0)
2102         {
2103           // ignore dummy events. **FIXME** as they have no eIdx?
2104           continue;
2105         }
2106         currentExecTimes[pool->pool[i].eIdx] += 
2107           pool->pool[i].time - beginBlockTime;
2108         totalActiveTime += pool->pool[i].time - beginBlockTime;
2109         lastBeginEPIdx = -1;
2110       }
2111     } else if (pool->pool[i].type == BEGIN_IDLE) {
2112       // check pairing
2113       if (!markedIdle) {
2114         markedIdle = true;
2115       }
2116       beginIdleBlockTime = pool->pool[i].time;
2117     } else if (pool->pool[i].type == END_IDLE) {
2118       // check pairing
2119       if (markedIdle) {
2120         markedIdle = false;
2121         currentExecTimes[numEventMethods] += 
2122           pool->pool[i].time - beginIdleBlockTime;
2123         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2124       }
2125     } else if (pool->pool[i].type == END_PHASE) {
2126       // ignored when not using phases
2127       if (usePhases) {
2128         // when we've not visited this node before
2129         if (i != lastPhaseIdx) { 
2130           totalPhaseTime = 
2131             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2132           // it is important that proper accounting of time take place here.
2133           // Note that END_PHASE events inevitably occur in the context of
2134           //   some entry method by the way the tracing API is designed.
2135           if (markedBegin) {
2136             CkAssert(lastBeginEPIdx >= 0);
2137             currentExecTimes[lastBeginEPIdx] += 
2138               pool->pool[i].time - beginBlockTime;
2139             totalActiveTime += pool->pool[i].time - beginBlockTime;
2140             // this is so the remainder contributes to the next phase
2141             beginBlockTime = pool->pool[i].time;
2142           }
2143           // The following is unlikely, but stranger things have happened.
2144           if (markedIdle) {
2145             currentExecTimes[numEventMethods] +=
2146               pool->pool[i].time - beginIdleBlockTime;
2147             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2148             // this is so the remainder contributes to the next phase
2149             beginIdleBlockTime = pool->pool[i].time;
2150           }
2151           if (totalActiveTime <= totalPhaseTime) {
2152             currentExecTimes[numEventMethods+1] = 
2153               totalPhaseTime - totalActiveTime;
2154           } else {
2155             currentExecTimes[numEventMethods+1] = 0.0;
2156             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2157                      CkMyPe(), currentPhase);
2158           }
2159           collectKMeansData();
2160           // end the loop (and method) and defer the work till the next call
2161           lastPhaseIdx = i;
2162           break; 
2163         }
2164       }
2165     } else if (pool->pool[i].type == END_COMPUTATION) {
2166       if (markedBegin) {
2167         CkAssert(lastBeginEPIdx >= 0);
2168         currentExecTimes[lastBeginEPIdx] += 
2169           pool->pool[i].time - beginBlockTime;
2170         totalActiveTime += pool->pool[i].time - beginBlockTime;
2171       }
2172       if (markedIdle) {
2173         currentExecTimes[numEventMethods] +=
2174           pool->pool[i].time - beginIdleBlockTime;
2175         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2176       }
2177       totalPhaseTime = 
2178         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2179       if (totalActiveTime <= totalPhaseTime) {
2180         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2181       } else {
2182         currentExecTimes[numEventMethods+1] = 0.0;
2183         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2184                  CkMyPe());
2185       }
2186       collectKMeansData();
2187     }
2188   }
2189 }
2190
2191 /**
2192  *  Through a reduction, collectKMeansData aggregates each processors' data
2193  *  in order for global properties to be determined:
2194  *  
2195  *  1. min & max to determine normalization factors.
2196  *  2. sum to determine global EP averages for possible metric reduction
2197  *       through thresholding.
2198  *  3. sum of squares to compute stddev which may be useful in the future.
2199  *
2200  *  collectKMeansData will also keep the processor's data for the current
2201  *    phase so that it may be normalized and worked on subsequently.
2202  *
2203  **/
2204 void KMeansBOC::collectKMeansData() {
2205   int minOffset = numMetrics;
2206   int maxOffset = 2*numMetrics;
2207   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2208
2209   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2210
2211   double *reductionMsg = new double[numMetrics*4];
2212
2213   for (int i=0; i<numMetrics; i++) {
2214     reductionMsg[i] = currentExecTimes[i];
2215     // duplicate the event times for max and min sections of the reduction
2216     reductionMsg[minOffset + i] = currentExecTimes[i];
2217     reductionMsg[maxOffset + i] = currentExecTimes[i];
2218     // compute squares
2219     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2220   }
2221
2222   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2223                 0, thisProxy);
2224   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2225              outlierReductionType, cb);  
2226 }
2227
2228 // The purpose is mainly to initialize the k seeds and generate
2229 //   normalization parameters for each of the metrics. The k seeds
2230 //   and normalization parameters are broadcast to all processors.
2231 //
2232 // Called on processor 0
2233 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2234   CkAssert(CkMyPe() == 0);
2235   
2236   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2237
2238   int sumOffset = 0;
2239   int minOffset = numMetrics;
2240   int maxOffset = 2*numMetrics;
2241   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2242
2243   // calculate statistics & boundaries for the k seeds for clustering
2244   KMeansStatsMessage *outmsg = 
2245     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2246   outmsg->numMetrics = numMetrics;
2247   outmsg->numKPos = numK*numMetrics;
2248   outmsg->numStats = numMetrics*4;
2249
2250   // Sum | Min | Max | Sum of Squares
2251   double *totalExecTimes = (double *)msg->getData();
2252   double totalTime = 0.0;
2253
2254   for (int i=0; i<numMetrics; i++) {
2255     DEBUGN("%lf\n", totalExecTimes[i]);
2256     totalTime += totalExecTimes[i];
2257
2258     // calculate event mean over all processors
2259     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2260
2261     // get the ranges and offsets of each metric. With this, we get
2262     //   normalization factors that can be sent back to each processor to
2263     //   be used as necessary. We reuse max for range. Min remains the offset.
2264     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2265     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2266       totalExecTimes[minOffset + i];
2267     
2268     // calculate stddev (using biased variance)
2269     outmsg->stats[sosOffset + i] = 
2270       sqrt((totalExecTimes[sosOffset + i] - 
2271             2*(outmsg->stats[i])*totalExecTimes[i] +
2272             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2273            CkNumPes());
2274   }
2275
2276   for (int i=0; i<numMetrics; i++) {
2277     // 1) if the proportion of the max value of the entry method relative to
2278     //   the average time taken over all entry methods across all processors
2279     //   is greater than the stipulated percentage threshold ...; AND
2280     // 2) if the range of values are non-zero.
2281     //
2282     // The current assumption is totalTime > 0 (what program has zero total
2283     //   time from all work?)
2284     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2285                      entryThreshold) &&
2286       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2287     if (keepMetric[i]) {
2288       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2289              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2290     } else {
2291       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2292     }
2293     outmsg->filter[i] = keepMetric[i];
2294   }
2295
2296   delete msg;
2297   
2298   // initialize k seeds for this phase
2299   kSeeds = new double[numK*numMetrics];
2300
2301   numKReported = 0;
2302   kNumMembers = new int[numK];
2303
2304   // Randomly select k processors' metric vectors for the k seeds
2305   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2306   srand(11337); // for debugging purposes
2307   for (int k=0; k<numK; k++) {
2308     DEBUGF("Seed %d | ", k);
2309     for (int m=0; m<numMetrics; m++) {
2310       double factor = totalExecTimes[maxOffset + m] - 
2311         totalExecTimes[minOffset + m];
2312       // "uniform" distribution, scaled according to the normalization
2313       //   factors
2314       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2315       // Random distribution.
2316       kSeeds[numMetrics*k + m] =
2317         ((rand()*1.0)/RAND_MAX)*factor;
2318       if (keepMetric[m]) {
2319         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2320       }
2321       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2322     }
2323     DEBUGF("\n");
2324     kNumMembers[k] = 0;
2325   }
2326
2327   // broadcast statistical values to all processors for cluster discovery
2328   thisProxy.findInitialClusters(outmsg);
2329 }
2330
2331
2332
2333 // Called on each processor.
2334 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2335
2336  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2337
2338   phaseIter = 0;
2339
2340   // Get info from stats message
2341   CkAssert(numMetrics == msg->numMetrics);
2342   for (int i=0; i<numMetrics; i++) {
2343     keepMetric[i] = msg->filter[i];
2344   }
2345
2346   // Normalize data on local processor.
2347   // **CWL** See my thesis for detailed discussion of normalization of
2348   //    performance data.
2349   // **NOTE** This might change if we want to send data based on the filter
2350   //   instead of all the data.
2351   CkAssert(numMetrics*4 == msg->numStats);
2352   for (int i=0; i<numMetrics; i++) {
2353     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2354     // **CWL** We do not normalize the range. Entry methods that exhibit
2355     //   large absolute timing variations should be allowed to contribute
2356     //   more to the Euclidean distance measure!
2357     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2358   }
2359
2360   // **NOTE** This might change if we want to send data based on the filter
2361   //   instead of all the data.
2362   CkAssert(numK*numMetrics == msg->numKPos);
2363   for (int i=0; i<msg->numKPos; i++) {
2364     incKSeeds[i] = msg->kSeedsPos[i];
2365   }
2366
2367   // Decide which KSeed this processor belongs to.
2368   minDistance = calculateDistance(0);
2369   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2370            currentPhase, phaseIter, minDistance);
2371   minK = 0;
2372   for (int i=1; i<numK; i++) {
2373     double distance = calculateDistance(i);
2374     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2375              currentPhase, phaseIter, i, distance);
2376     if (distance < minDistance) {
2377       minDistance = distance;
2378       minK = i;
2379     }
2380   }
2381
2382   // Set up a reduction with the modification vector to the root (0).
2383   //
2384   // The modification vector sends a negative value for each metric
2385   //   for the K this processor no longer belongs to and a positive
2386   //   value to the K the processor now belongs. In addition, a -1.0
2387   //   is sent to the K it is leaving and a +1.0 to the K it is 
2388   //   joining.
2389   //
2390   // The processor must still contribute a "zero returns" even if
2391   //   nothing changes. This will be the basis for determine
2392   //   convergence at the root.
2393   //
2394   // The addtional +1 is meant for the count-change that must be
2395   //   maintained for the special cases at the root when some K
2396   //   may be deprived of all processor points or go from 0 to a
2397   //   positive number of processors (see later comments).
2398   double *modVector = new double[numK*(numMetrics+1)];
2399   for (int i=0; i<numK; i++) {
2400     for (int j=0; j<numMetrics+1; j++) {
2401       modVector[i*(numMetrics+1) + j] = 0.0;
2402     }
2403   }
2404   for (int i=0; i<numMetrics; i++) {
2405     // for this initialization, only positive values need be sent.
2406     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2407   }
2408   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2409
2410   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2411                 0, thisProxy);
2412   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2413              CkReduction::sum_double, cb);  
2414 }
2415
2416 double KMeansBOC::calculateDistance(int k) {
2417   double ret = 0.0;
2418   for (int i=0; i<numMetrics; i++) {
2419     if (keepMetric[i]) {
2420       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2421              CkMyPe(), currentPhase, phaseIter, i,
2422                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2423       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2424     }
2425   }
2426   return sqrt(ret);
2427 }
2428
2429 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2430   CkAssert(CkMyPe() == 0);
2431
2432   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2433
2434   double *modVector = (double *)msg->getData();
2435   // sanity check
2436   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2437
2438   // A quick convergence test.
2439   bool hasChanges = false;
2440   for (int i=0; i<numK; i++) {
2441     hasChanges = hasChanges || 
2442       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2443   }
2444   if (!hasChanges) {
2445     delete msg;
2446     findRepresentatives();
2447   } else {
2448     int overallChange = 0;
2449     for (int i=0; i<numK; i++) {
2450       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2451       if (change != 0) {
2452         overallChange += change;
2453         // modify the k seeds based on the modification vectors coming in
2454         //
2455         // If a seed initially has no members, its contents do not matter and
2456         //   is simply set to the average of the incoming vector.
2457         // If the change causes a seed to lose all its members, do nothing.
2458         //   Its last-known location is kept to allow it to re-capture
2459         //   membership at the next iteration rather than apply the last
2460         //   changes (which snaps the point unnaturally to 0,0).
2461         // Otherwise, apply the appropriate vector changes.
2462         CkAssert((kNumMembers[i] + change >= 0) &&
2463                  (kNumMembers[i] + change <= CkNumPes()));
2464         if (kNumMembers[i] == 0) {
2465           CkAssert(change > 0);
2466           for (int j=0; j<numMetrics; j++) {
2467             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2468           }
2469         } else if (kNumMembers[i] + change == 0) {
2470           // do nothing.
2471         } else {
2472           for (int j=0; j<numMetrics; j++) {
2473             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2474             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2475             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2476           }
2477         }
2478         kNumMembers[i] += change;
2479       }
2480       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2481              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2482     }
2483     delete msg;
2484
2485     // broadcast the new seed locations.
2486     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2487     outmsg->numKPos = numK*numMetrics;
2488     for (int i=0; i<numK*numMetrics; i++) {
2489       outmsg->kSeedsPos[i] = kSeeds[i];
2490     }
2491
2492     thisProxy.updateSeedMembership(outmsg);
2493   }
2494 }
2495
2496 // Called on all processors
2497 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2498
2499   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2500
2501   phaseIter++;
2502
2503   // **NOTE** This might change if we want to send data based on the filter
2504   //   instead of all the data.
2505   CkAssert(numK*numMetrics == msg->numKPos);
2506   for (int i=0; i<msg->numKPos; i++) {
2507     incKSeeds[i] = msg->kSeedsPos[i];
2508   }
2509
2510   // Decide which KSeed this processor belongs to.
2511   lastMinK = minK;
2512   minDistance = calculateDistance(0);
2513   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2514          currentPhase, phaseIter, minDistance);
2515
2516   minK = 0;
2517   for (int i=1; i<numK; i++) {
2518     double distance = calculateDistance(i);
2519     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2520            currentPhase, phaseIter, i, distance);
2521     if (distance < minDistance) {
2522       minDistance = distance;
2523       minK = i;
2524     }
2525   }
2526
2527   double *modVector = new double[numK*(numMetrics+1)];
2528   for (int i=0; i<numK; i++) {
2529     for (int j=0; j<numMetrics+1; j++) {
2530       modVector[i*(numMetrics+1) + j] = 0.0;
2531     }
2532   }
2533
2534   if (minK != lastMinK) {
2535     for (int i=0; i<numMetrics; i++) {
2536       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2537       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2538     }
2539     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2540     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2541   }
2542
2543   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2544                 0, thisProxy);
2545   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2546              CkReduction::sum_double, cb);  
2547 }
2548
2549 void KMeansBOC::findRepresentatives() {
2550
2551   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2552
2553   int numNonEmptyClusters = 0;
2554   for (int i=0; i<numK; i++) {
2555     if (kNumMembers[i] > 0) {
2556       numNonEmptyClusters++;
2557     }
2558   }
2559
2560   int numRepresentatives = peNumKeep;
2561   // **FIXME**
2562   // This is fairly arbitrary. Next time, choose the centers of the top
2563   //   largest clusters.
2564   if (numRepresentatives < numNonEmptyClusters) {
2565     numRepresentatives = numNonEmptyClusters;
2566   }
2567
2568   int slotsRemaining = numRepresentatives;
2569
2570   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2571          numNonEmptyClusters);
2572
2573   // determine how many exemplars to select per cluster. Currently
2574   //   hardcoded to 1. Future challenge is to decide on other numbers
2575   //   or proportionality.
2576   //
2577   int exemplarsPerCluster = 1;
2578   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2579
2580   int numCandidateOutliers = CkNumPes() - 
2581     exemplarsPerCluster*numNonEmptyClusters;
2582
2583   double *remainders = new double[numK];
2584   int *assigned = new int[numK];
2585   exemplarChoicesLeft = new int[numK];
2586   outlierChoicesLeft = new int[numK];
2587
2588   for (int i=0; i<numK; i++) {
2589     assigned[i] = 0;
2590     remainders[i] = 
2591       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2592       slotsRemaining / numCandidateOutliers;
2593     if (remainders[i] >= 0.0) {
2594       assigned[i] = (int)floor(remainders[i]);
2595       remainders[i] -= assigned[i];
2596     } else {
2597       remainders[i] = 0.0;
2598     }
2599   }
2600
2601   for (int i=0; i<numK; i++) {
2602     slotsRemaining -= assigned[i];
2603   }
2604   CkAssert(slotsRemaining >= 0);
2605
2606   // find clusters to assign the loose slots to, in order of
2607   // remainder proportion
2608   while (slotsRemaining > 0) {
2609     double max = 0.0;
2610     int winner = 0;
2611     for (int i=0; i<numK; i++) {
2612       if (remainders[i] > max) {
2613         max = remainders[i];
2614         winner = i;
2615       }
2616     }
2617     assigned[winner]++;
2618     remainders[winner] = 0.0;
2619     slotsRemaining--;
2620   }
2621
2622   // set up how many reduction cycles of min/max we need to conduct to
2623   // select the representatives.
2624   numSelectionIter = exemplarsPerCluster;
2625   for (int i=0; i<numK; i++) {
2626     if (assigned[i] > numSelectionIter) {
2627       numSelectionIter = assigned[i];
2628     }
2629   }
2630   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2631
2632   for (int i=0; i<numK; i++) {
2633     if (kNumMembers[i] > 0) {
2634       exemplarChoicesLeft[i] = exemplarsPerCluster;
2635       outlierChoicesLeft[i] = assigned[i];
2636     } else {
2637       exemplarChoicesLeft[i] = 0;
2638       outlierChoicesLeft[i] = 0;
2639     }
2640     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2641            outlierChoicesLeft[i]);
2642   }
2643
2644   delete [] assigned;
2645   delete [] remainders;
2646
2647   // send out first broadcast
2648   KSelectionMessage *outmsg = NULL;
2649   if (numSelectionIter > 0) {
2650     outmsg = new (numK, numK, numK) KSelectionMessage;
2651     outmsg->numKMinIDs = numK;
2652     outmsg->numKMaxIDs = numK;
2653     for (int i=0; i<numK; i++) {
2654       outmsg->minIDs[i] = -1;
2655       outmsg->maxIDs[i] = -1;
2656     }
2657     thisProxy.collectDistances(outmsg);
2658   } else {
2659     CkPrintf("Warning: No selection iteration from the start!\n");
2660     // invoke phase completion on all processors
2661     thisProxy.phaseDone();
2662   }
2663 }
2664
2665 /*
2666  *  lastMin = array of minimum champions of the last tournament
2667  *  lastMax = array of maximum champions of the last tournament
2668  *  lastMaxVal = array of last encountered maximum values, allows previous
2669  *                 minimum winners to eliminate themselves from the next
2670  *                 minimum race.
2671  *
2672  *  Called on all processors.
2673  */
2674 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2675
2676   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2677
2678   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2679          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2680   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2681       (CkMyPe() == msg->maxIDs[lastMinK])) {
2682     CkAssert(!selected);
2683     selected = true;
2684   }
2685
2686   // build outgoing reduction structure
2687   //   format = minVal | ID | maxVal | ID
2688   double *minMaxAndIDs = NULL;
2689
2690   minMaxAndIDs = new double[numK*4];
2691   // initialize to the appropriate out-of-band values (for error checks)
2692   for (int i=0; i<numK; i++) {
2693     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2694     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2695     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2696     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2697   }
2698   // If I have not won before, I put myself back into the competition
2699   if (!selected) {
2700     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2701     minMaxAndIDs[lastMinK*4] = minDistance;
2702     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2703     minMaxAndIDs[lastMinK*4+2] = minDistance;
2704     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2705   }
2706   delete msg;
2707
2708   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2709                 0, thisProxy);
2710   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2711              minMaxReductionType, cb);  
2712 }
2713
2714 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2715   // incoming format:
2716   //   minVal | minID | maxVal | maxID
2717
2718   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2719
2720   if (numSelectionIter > 0) {
2721     double *incInfo = (double *)msg->getData();
2722     
2723     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2724     outmsg->numKMinIDs = numK;
2725     outmsg->numKMaxIDs = numK;
2726     
2727     for (int i=0; i<numK; i++) {
2728       DEBUGF("%d | %lf %d %lf %d \n", i, 
2729              incInfo[i*4], (int)incInfo[i*4+1], 
2730              incInfo[i*4+2], (int)incInfo[i*4+3]);
2731     }
2732
2733     for (int i=0; i<numK; i++) {
2734       if (exemplarChoicesLeft[i] > 0) {
2735         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2736         exemplarChoicesLeft[i]--;
2737       } else {
2738         outmsg->minIDs[i] = -1;
2739       }
2740       if (outlierChoicesLeft[i] > 0) {
2741         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2742         outlierChoicesLeft[i]--;
2743       } else {
2744         outmsg->maxIDs[i] = -1;
2745       }
2746     }
2747     thisProxy.collectDistances(outmsg);
2748     numSelectionIter--;
2749   } else {
2750     // invoke phase completion on all processors
2751     thisProxy.phaseDone();
2752   }
2753 }
2754
2755 /**
2756  *  Completion of the K-Means clustering and data selection of one phase
2757  *    of the computation.
2758  *
2759  *  Called on every processor.
2760  */
2761 void KMeansBOC::phaseDone() {
2762
2763   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2764
2765   LogPool *pool = CkpvAccess(_trace)->_logPool;
2766   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2767
2768   // now decide on what to do with the decision.
2769   if (!selected) {
2770     if (usePhases) {
2771       pool->keepPhase[currentPhase] = false;
2772     } else {
2773       // if not using phases, we're working on the whole log
2774       pool->setAllPhases(false);
2775     }
2776   }
2777
2778   // **FIXME** (?) - All processors have to agree on this, or the reduction
2779   //   will not be correct! The question is "is this enforcible?"
2780   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2781     // We're done
2782     int dummy = 0;
2783     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2784                   0, bocProxy);
2785     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2786   } else {
2787     // reset all phase-based k-means data and decisions
2788
2789     // **FIXME**!!!!!    
2790     
2791     // invoke the next K-Means computation phase.
2792     currentPhase++;
2793     thisProxy[CkMyPe()].getNextPhaseMetrics();
2794   }
2795 }
2796
2797 void TraceProjectionsBOC::startTimeAnalysis()
2798 {
2799   double startTime = 0.0;
2800   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2801      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2802   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2803   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2804 }
2805
2806 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2807 {
2808   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2809
2810   if (CkpvAccess(_trace) != NULL) {
2811     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2812     CkpvAccess(_trace)->_logPool->setNewStartTime();
2813     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2814   }
2815   delete msg;
2816   thisProxy[CkMyPe()].startEndTimeAnalysis();
2817 }
2818
2819 void TraceProjectionsBOC::startEndTimeAnalysis()
2820 {
2821  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2822
2823   endTime = CkpvAccess(_trace)->endTime;
2824   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2825
2826   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2827                 0, thisProxy);
2828   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2829 }
2830
2831 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2832 {
2833  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2834
2835   CkAssert(CkMyPe() == 0);
2836   parModulesRemaining--;
2837   if (CkpvAccess(_trace) != NULL) {
2838     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2839     // CkPrintf("End time determined to be %lf us\n",
2840     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2841   }
2842   delete msg;
2843   if (parModulesRemaining == 0) {
2844     thisProxy[CkMyPe()].finalize();
2845   }
2846 }
2847
2848 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2849
2850  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2851
2852   CkAssert(CkMyPe() == 0);
2853   parModulesRemaining--;
2854   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2855            CmiWallTimer()-analysisStartTime);
2856   delete msg;
2857   if (parModulesRemaining == 0) {
2858     thisProxy[CkMyPe()].finalize();
2859   }
2860 }
2861
2862 /**
2863  *
2864  *  This version is called (on processor 0) only if flushCheck fails.
2865  *
2866  */
2867 void TraceProjectionsBOC::kMeansDone() {
2868   CkAssert(CkMyPe() == 0);
2869   parModulesRemaining--;
2870   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2871            CmiWallTimer()-analysisStartTime);
2872   if (parModulesRemaining == 0) {
2873     thisProxy[CkMyPe()].finalize();
2874   }
2875 }
2876
2877 void TraceProjectionsBOC::finalize()
2878 {
2879   CkAssert(CkMyPe() == 0);
2880   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2881   //       CmiWallTimer()-analysisStartTime);
2882   thisProxy.closingTraces();
2883 }
2884
2885 // Called on every processor
2886 void TraceProjectionsBOC::closingTraces() {
2887   CkpvAccess(_trace)->closeTrace();
2888
2889     // subtle:  reduction needs to go to the PE which started CkExit()
2890   int pe = 0;
2891   if (endPe != -1) pe = endPe;
2892   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2893                 pe, thisProxy); 
2894   contribute(0, NULL, CkReduction::sum_int, cb);  
2895 }
2896
2897 // The sole purpose of this reduction is to decide whether or not
2898 //   Projections as a module needs to call CkExit() to get other
2899 //   modules to shutdown.
2900 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2901   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2902   delete msg;
2903   // decide if CkExit() needs to be called
2904   if (!CkpvAccess(_trace)->converseExit) {
2905     CkExit();
2906   }
2907 }
2908 /*
2909  *  Registration and definition of the Outlier Reduction callback.
2910  *  Format: Sum | Min | Max | Sum of Squares
2911  */
2912 CkReductionMsg *outlierReduction(int nMsgs,
2913                                  CkReductionMsg **msgs) {
2914   int numBytes = 0;
2915   int numMetrics = 0;
2916   double *ret = NULL;
2917
2918   if (nMsgs == 1) {
2919     // nothing to do, just pass it on
2920     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2921   }
2922
2923   if (nMsgs > 1) {
2924     numBytes = msgs[0]->getSize();
2925     // sanity checks
2926     if (numBytes%sizeof(double) != 0) {
2927       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2928     }
2929     if ((numBytes/sizeof(double))%4 != 0) {
2930       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2931     }
2932     numMetrics = (numBytes/sizeof(double))/4;
2933     ret = new double[numMetrics*4];
2934
2935     // copy the first message data into the return structure first
2936     for (int i=0; i<numMetrics*4; i++) {
2937       ret[i] = ((double *)msgs[0]->getData())[i];
2938     }
2939
2940     // Sum | Min | Max | Sum of Squares
2941     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2942       for (int i=0; i<numMetrics; i++) {
2943         // Sum
2944         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2945         // Min
2946         ret[numMetrics + i] =
2947           (ret[numMetrics + i] < 
2948            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2949           ? ret[numMetrics + i] : 
2950           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2951         // Max
2952         ret[2*numMetrics + i] = 
2953           (ret[2*numMetrics + i] >
2954            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2955           ? ret[2*numMetrics + i] :
2956           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2957         // Sum of Squares (squaring already done at leaf)
2958         ret[3*numMetrics + i] +=
2959           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2960       }
2961     }
2962   }
2963   
2964   /* apparently, we do not delete the incoming messages */
2965   return CkReductionMsg::buildNew(numBytes,ret);
2966 }
2967
2968 /*
2969  * The only reason we have a user-defined reduction is to support
2970  *   identification of the "winning" processors as well as to handle
2971  *   both the min and the max of each "tournament". A simple min/max
2972  *   discovery cannot handle ties.
2973  */
2974 CkReductionMsg *minMaxReduction(int nMsgs,
2975                                 CkReductionMsg **msgs) {
2976   CkAssert(nMsgs > 0);
2977
2978   int numBytes = msgs[0]->getSize();
2979   CkAssert(numBytes%sizeof(double) == 0);
2980   int numK = (numBytes/sizeof(double))/4;
2981
2982   double *ret = new double[numK*4];
2983   // fill with out-of-band values
2984   for (int i=0; i<numK; i++) {
2985     ret[i*4] = -1.0;
2986     ret[i*4+1] = -1.0;
2987     ret[i*4+2] = -1.0;
2988     ret[i*4+3] = -1.0;
2989   }
2990
2991   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2992   for (int i=0; i<nMsgs; i++) {
2993     double *temp = (double *)msgs[i]->getData();
2994     for (int j=0; j<numK; j++) {
2995       // no previous valid min
2996       if (ret[j*4+1] < 0) {
2997         // fill it in only if the incoming min is valid
2998         if (temp[j*4+1] >= 0) {
2999           ret[j*4] = temp[j*4];      // fill min value
3000           ret[j*4+1] = temp[j*4+1];  // fill ID
3001         }
3002       } else {
3003         // find Min, only if incoming min is valid
3004         if (temp[j*4+1] >= 0) {
3005           if (temp[j*4] < ret[j*4]) {
3006             ret[j*4] = temp[j*4];      // replace min value
3007             ret[j*4+1] = temp[j*4+1];  // replace ID
3008           }
3009         }
3010       }
3011       // no previous valid max
3012       if (ret[j*4+3] < 0) {
3013         // fill only if the incoming max is valid
3014         if (temp[j*4+3] >= 0) {
3015           ret[j*4+2] = temp[j*4+2];  // fill max value
3016           ret[j*4+3] = temp[j*4+3];  // fill ID
3017         }
3018       } else {
3019         // find Max, only if incoming max is valid
3020         if (temp[j*4+3] >= 0) {
3021           if (temp[j*4+2] > ret[j*4+2]) {
3022             ret[j*4+2] = temp[j*4+2];  // replace max value
3023             ret[j*4+3] = temp[j*4+3];  // replace ID
3024           }
3025         }
3026       }
3027     }
3028   }
3029   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3030   delete [] ret;
3031   return redmsg;
3032 }
3033
3034 #include "TraceProjections.def.h"
3035 #endif //PROJ_ANALYSIS
3036
3037 /*@}*/