warning when projections flushing logs.
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 #if CMK_HAS_COUNTER_PAPI
103 #ifdef USE_SPP_PAPI
104 int papiEvents[NUMPAPIEVENTS];
105 #else
106 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
107 #endif
108 #endif // CMK_HAS_COUNTER_PAPI
109
110 /**
111   For each TraceFoo module, _createTraceFoo() must be defined.
112   This function is called in _createTraces() generated in moduleInit.C
113 */
114 void _createTraceprojections(char **argv)
115 {
116   DEBUGF("%d createTraceProjections\n", CkMyPe());
117   CkpvInitialize(CkVec<char *>, usrEventlist);
118   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
119   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
120 #if CMK_BIGSIM_CHARM
121   // CthRegister does not call the constructor
122 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
123 #endif //CMK_BIGSIM_CHARM
124   CkpvInitialize(TraceProjections*, _trace);
125   CkpvAccess(_trace) = new  TraceProjections(argv);
126   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
127   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
128 }
129  
130 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
131
132 struct TraceThreadListener {
133   struct CthThreadListener base;
134   int event;
135   int msgType;
136   int ep;
137   int srcPe;
138   int ml;
139   CmiObjId idx;
140 };
141
142 extern "C"
143 void traceThreadListener_suspend(struct CthThreadListener *l)
144 {
145   TraceThreadListener *a=(TraceThreadListener *)l;
146   /* here, we activate the appropriate trace codes for the appropriate
147      registered modules */
148   traceSuspend();
149 }
150
151 extern "C"
152 void traceThreadListener_resume(struct CthThreadListener *l) 
153 {
154   TraceThreadListener *a=(TraceThreadListener *)l;
155   /* here, we activate the appropriate trace codes for the appropriate
156      registered modules */
157   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
158                                 CthGetThreadID(a->base.thread));
159   a->event=-1;
160   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
161   a->ml=0;
162 }
163
164 extern "C"
165 void traceThreadListener_free(struct CthThreadListener *l) 
166 {
167   TraceThreadListener *a=(TraceThreadListener *)l;
168   delete a;
169 }
170
171 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
172 {
173 #if CMK_TRACE_ENABLED
174   /* strip essential information from the envelope */
175   TraceThreadListener *a= new TraceThreadListener;
176   
177   a->base.suspend=traceThreadListener_suspend;
178   a->base.resume=traceThreadListener_resume;
179   a->base.free=traceThreadListener_free;
180   a->event=e->getEvent();
181   a->msgType=e->getMsgtype();
182   a->ep=e->getEpIdx();
183   a->srcPe=e->getSrcPe();
184   a->ml=e->getTotalsize();
185
186   CthAddListener(tid, (CthThreadListener *)a);
187 #endif
188 }
189
190 void LogPool::openLog(const char *mode)
191 {
192 #if CMK_PROJECTIONS_USE_ZLIB
193   if(compressed) {
194     if (nonDeltaLog) {
195       do {
196         zfp = gzopen(fname, mode);
197       } while (!zfp && (errno == EINTR || errno == EMFILE));
198       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
199     }
200     if (deltaLog) {
201       do {
202         deltazfp = gzopen(dfname, mode);
203       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
204       if (!deltazfp) 
205         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
206     }
207   } else {
208     if (nonDeltaLog) {
209       do {
210         fp = fopen(fname, mode);
211       } while (!fp && (errno == EINTR || errno == EMFILE));
212       if (!fp) {
213         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
214         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
215       }
216     }
217     if (deltaLog) {
218       do {
219         deltafp = fopen(dfname, mode);
220       } while (!deltafp && (errno == EINTR || errno == EMFILE));
221       if (!deltafp) 
222         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
223     }
224   }
225 #else
226   if (nonDeltaLog) {
227     do {
228       fp = fopen(fname, mode);
229     } while (!fp && (errno == EINTR || errno == EMFILE));
230     if (!fp) {
231       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
232       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
233     }
234   }
235   if (deltaLog) {
236     do {
237       deltafp = fopen(dfname, mode);
238     } while (!deltafp && (errno == EINTR || errno == EMFILE));
239     if(!deltafp) 
240       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
241   }
242 #endif
243 }
244
245 void LogPool::closeLog(void)
246 {
247 #if CMK_PROJECTIONS_USE_ZLIB
248   if(compressed) {
249     if (nonDeltaLog) gzclose(zfp);
250     if (deltaLog) gzclose(deltazfp);
251     return;
252   }
253 #endif
254   if (nonDeltaLog) { 
255 #if !defined(_WIN32) || defined(__CYGWIN__)
256     fsync(fileno(fp)); 
257 #endif
258     fclose(fp); 
259   }
260   if (deltaLog)  { 
261 #if !defined(_WIN32) || defined(__CYGWIN__)
262     fsync(fileno(deltafp)); 
263 #endif
264     fclose(deltafp);  
265   }
266 }
267
268 LogPool::LogPool(char *pgm) {
269   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
270   // defaults to writing data (no outlier changes)
271   writeData = true;
272   numEntries = 0;
273   // **CW** for simple delta encoding
274   prevTime = 0.0;
275   timeErr = 0.0;
276   globalStartTime = 0.0;
277   globalEndTime = 0.0;
278   headerWritten = 0;
279   numPhases = 0;
280   hasFlushed = false;
281
282   keepPhase = NULL;
283
284   fileCreated = false;
285   poolSize = CkpvAccess(CtrLogBufSize);
286   pgmname = new char[strlen(pgm)+1];
287   strcpy(pgmname, pgm);
288 }
289
290 void LogPool::createFile(const char *fix)
291 {
292   if (fileCreated) {
293     return;
294   }
295
296   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
297   char *pathPlusFilePrefix = new char[1024];
298
299   if(nSubdirs > 0){
300     int sd = CkMyPe() % nSubdirs;
301     char *subdir = new char[1024];
302     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
303     CmiMkdir(subdir);
304     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
305     delete[] subdir;
306   } else {
307     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
308   }
309
310   char pestr[10];
311   sprintf(pestr, "%d", CkMyPe());
312 #if CMK_PROJECTIONS_USE_ZLIB
313   int len;
314   if(compressed)
315     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
316   else
317     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
318 #else
319   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
320 #endif
321
322   if (nonDeltaLog) {
323     fname = new char[len];
324   }
325   if (deltaLog) {
326     dfname = new char[len];
327   }
328 #if CMK_PROJECTIONS_USE_ZLIB
329   if(compressed) {
330     if (deltaLog && nonDeltaLog) {
331       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
332       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333     } else {
334       if (nonDeltaLog) {
335         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
336       } else {
337         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
338       }
339     }
340   } else {
341     if (deltaLog && nonDeltaLog) {
342       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
343       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344     } else {
345       if (nonDeltaLog) {
346         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
347       } else {
348         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
349       }
350     }
351   }
352 #else
353   if (deltaLog && nonDeltaLog) {
354     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
355     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356   } else {
357     if (nonDeltaLog) {
358       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
359     } else {
360       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
361     }
362   }
363 #endif
364   fileCreated = true;
365   delete[] pathPlusFilePrefix;
366   openLog("w+");
367   CLOSE_LOG 
368 }
369
370 void LogPool::createSts(const char *fix)
371 {
372   CkAssert(CkMyPe() == 0);
373   // create the sts file
374   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
375   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
376   do
377     {
378       stsfp = fopen(fname, "w");
379     } while (!stsfp && (errno == EINTR || errno == EMFILE));
380   if(stsfp==0){
381     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
382     CmiAbort("Error!!\n");
383   }
384   delete[] fname;
385 }  
386
387 void LogPool::createTopo(const char *fix)
388 {
389   CkAssert(CkMyPe() == 0);
390   // create the topo file
391   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
392   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
393   do
394     {
395       topofp = fopen(fname, "w");
396     } while (!stsfp && (errno == EINTR || errno == EMFILE));
397   if(stsfp==0){
398     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
399     CmiAbort("Error!!\n");
400   }
401   delete[] fname;
402 }  
403
404 void LogPool::createRC()
405 {
406   // create the projections rc file.
407   fname = 
408     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
409   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
410   do {
411     rcfp = fopen(fname, "w");
412   } while (!rcfp && (errno == EINTR || errno == EMFILE));
413   if (rcfp==0) {
414     CmiAbort("Cannot open projections configuration file for writing.\n");
415   }
416   delete[] fname;
417 }
418
419 LogPool::~LogPool() 
420 {
421   if (writeData) {
422     writeLog();
423 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
424     closeLog();
425 #endif
426   }
427
428 #if CMK_BIGSIM_CHARM
429   extern int correctTimeLog;
430   if (correctTimeLog) {
431     createFile("-bg");
432     if (CkMyPe() == 0) {
433       createSts("-bg");
434     }
435     writeHeader();
436     if (CkMyPe() == 0) writeSts(NULL);
437     postProcessLog();
438   }
439 #endif
440
441   delete[] pool;
442   delete [] fname;
443 }
444
445 void LogPool::writeHeader()
446 {
447   if (headerWritten) return;
448   headerWritten = 1;
449   if(!binary) {
450 #if CMK_PROJECTIONS_USE_ZLIB
451     if(compressed) {
452       if (nonDeltaLog) {
453         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
454       }
455       if (deltaLog) {
456         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
457       }
458     } 
459     else /* else clause is below... */
460 #endif
461     /*... may hang over from else above */ {
462       if (nonDeltaLog) {
463         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
464       }
465       if (deltaLog) {
466         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
467       }
468     }
469   }
470   else { // binary
471       if (nonDeltaLog) {
472         fwrite(&numEntries,sizeof(numEntries),1,fp);
473       }
474       if (deltaLog) {
475         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
476       }
477   }
478 }
479
480 void LogPool::writeLog(void)
481 {
482   createFile();
483   OPEN_LOG
484   writeHeader();
485   if (nonDeltaLog) write(0);
486   if (deltaLog) write(1);
487   CLOSE_LOG
488 }
489
490 void LogPool::write(int writedelta) 
491 {
492   // **CW** Simple delta encoding implementation
493   // prevTime has to be maintained as an object variable because
494   // LogPool::write may be called several times depending on the
495   // +logsize value.
496   PUP::er *p = NULL;
497   if (binary) {
498     p = new PUP::toDisk(writedelta?deltafp:fp);
499   }
500 #if CMK_PROJECTIONS_USE_ZLIB
501   else if (compressed) {
502     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
503   }
504 #endif
505   else {
506     p = new toProjectionsFile(writedelta?deltafp:fp);
507   }
508   CmiAssert(p);
509   int curPhase = 0;
510   // **FIXME** - Should probably consider a more sophisticated bounds-based
511   //   approach for selective writing instead of making multiple if-checks
512   //   for every single event.
513   for(UInt i=0; i<numEntries; i++) {
514     if (!writedelta) {
515       if (keepPhase == NULL) {
516         // default case, when no phase selection is required.
517         pool[i].pup(*p);
518       } else {
519         // **FIXME** Might be a good idea to create a "filler" event block for
520         //   all the events taken out by phase filtering.
521         if (pool[i].type == END_PHASE) {
522           // always write phase markers
523           pool[i].pup(*p);
524           curPhase++;
525         } else if (pool[i].type == BEGIN_COMPUTATION ||
526                    pool[i].type == END_COMPUTATION) {
527           // always write BEGIN and END COMPUTATION markers
528           pool[i].pup(*p);
529         } else if (keepPhase[curPhase]) {
530           pool[i].pup(*p);
531         }
532       }
533     }
534     else {      // delta
535       // **FIXME** Implement phase-selective writing for delta logs
536       //   eventually
537       double time = pool[i].time;
538       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
539       {
540         double timeDiff = (time-prevTime)*1.0e6;
541         UInt intTimeDiff = (UInt)timeDiff;
542         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
543         if (timeErr > 1.0) {
544           timeErr -= 1.0;
545           intTimeDiff++;
546         }
547         pool[i].time = intTimeDiff/1.0e6;
548       }
549       pool[i].pup(*p);
550       pool[i].time = time;      // restore time value
551       prevTime = time;
552     }
553   }
554   delete p;
555   delete [] keepPhase;
556 }
557
558 void LogPool::writeSts(void)
559 {
560   // for whining compilers
561   int i;
562   // generate an automatic unique ID for each log
563   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
564   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
565   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
566 #if CMK_HAS_COUNTER_PAPI
567   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
568   // for now, use i, next time use papiEvents[i].
569   // **CW** papi event names is a hack.
570   char eventName[PAPI_MAX_STR_LEN];
571   for (i=0;i<NUMPAPIEVENTS;i++) {
572     PAPI_event_code_to_name(papiEvents[i], eventName);
573     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
574   }
575 #endif
576   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
577   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
578     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
579   }     
580 }
581
582 void LogPool::writeSts(TraceProjections *traceProj){
583   writeSts();
584   if (traceProj != NULL) {
585     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
586     funcIter->seekStart();
587     int numFuncs = traceProj->getFuncNumber();
588     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
589     while(funcIter->hasNext()) {
590       StrKey *key;
591       int *obj = (int *)funcIter->next((void **)&key);
592       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
593     }
594   }
595   fprintf(stsfp, "END\n");
596   fclose(stsfp);
597 }
598
599 void LogPool::writeRC(void)
600 {
601     //CkPrintf("write RC is being executed\n");
602 #ifdef PROJ_ANALYSIS  
603     CkAssert(CkMyPe() == 0);
604     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
605             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
606     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
607             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
608     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
609     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
610       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
611     } else {
612       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
613     }
614     */
615 #endif //PROJ_ANALYSIS
616   fclose(rcfp);
617 }
618
619 void LogPool::writeTopo(void)
620 {
621   TopoManager tmgr;
622   tmgr.printAllocation(topofp);
623   fclose(topofp);
624 }
625
626 #if CMK_BIGSIM_CHARM
627 static void updateProjLog(void *data, double t, double recvT, void *ptr)
628 {
629   LogEntry *log = (LogEntry *)data;
630   FILE *fp = *(FILE **)ptr;
631   log->time = t;
632   log->recvTime = recvT<0.0?0:recvT;
633 //  log->write(fp);
634   toProjectionsFile p(fp);
635   log->pup(p);
636 }
637 #endif
638
639 // flush log entries to disk
640 void LogPool::flushLogBuffer()
641 {
642   if (numEntries) {
643     double writeTime = TraceTimer();
644     writeLog();
645     hasFlushed = true;
646     numEntries = 0;
647     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
648     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
649     CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
650   }
651 }
652
653 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
654                   double time, int event, int pe, int ml, CmiObjId *id, 
655                   double recvT, double cpuT, int numPe)
656 {
657   new (&pool[numEntries++])
658     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
659   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
660     numPhases++;
661   }
662   if(poolSize==numEntries) {
663     flushLogBuffer();
664 #if CMK_BIGSIM_CHARM
665     extern int correctTimeLog;
666     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
667 #endif
668   }
669 #if CMK_BIGSIM_CHARM
670   switch (type) {
671     case BEGIN_PROCESSING:
672       pool[numEntries-1].recvTime = BgGetRecvTime();
673     case END_PROCESSING:
674     case BEGIN_COMPUTATION:
675     case END_COMPUTATION:
676     case CREATION:
677     case BEGIN_PACK:
678     case END_PACK:
679     case BEGIN_UNPACK:
680     case END_UNPACK:
681     case USER_EVENT_PAIR:
682       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
683   }
684 #endif
685 }
686
687 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
688 #ifndef CMK_BIGSIM_CHARM
689   new (&pool[numEntries++])
690         LogEntry(time,type,funcID,lineNum,fileName);
691   if(poolSize == numEntries){
692     flushLogBuffer();
693   }
694 #endif  
695 }
696
697
698   
699 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
700 #ifndef CMK_BIGSIM_CHARM
701   new (&pool[numEntries++])
702         LogEntry(type,time,memUsage);
703   if(poolSize == numEntries){
704     flushLogBuffer();
705   }
706 #endif  
707         
708 }  
709
710
711
712 void LogPool::addUserSupplied(int data){
713         // add an event
714         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
715
716         // set the user supplied value for the previously created event 
717         pool[numEntries-1].setUserSuppliedData(data);
718   }
719
720
721 void LogPool::addUserSuppliedNote(char *note){
722         // add an event
723         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
724
725         // set the user supplied note for the previously created event 
726         pool[numEntries-1].setUserSuppliedNote(note);
727   }
728
729 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
730   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
731 #ifndef CMK_BIGSIM_CHARM
732 #if MPI_TRACE_MACHINE_HACK
733   //This part of code is used  to combine the contiguous
734   //MPI_Test and MPI_Iprobe events to reduce the number of
735   //entries
736 #define MPI_TEST_EVENT_ID 60
737 #define MPI_IPROBE_EVENT_ID 70 
738   int lastEvent = pool[numEntries-1].event;
739   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
740     //just replace the endtime of last event
741     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
742     pool[numEntries].endTime = et;
743   }else{
744     new (&pool[numEntries++])
745       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
746   }
747 #else
748   new (&pool[numEntries++])
749     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
750 #endif
751   if(poolSize == numEntries){
752     flushLogBuffer();
753   }
754 #endif  
755 }
756
757
758 /* **CW** Not sure if this is the right thing to do. Feels more like
759    a hack than a solution to Sameer's request to add the destination
760    processor information to multicasts and broadcasts.
761
762    In the unlikely event this method is used for Broadcasts as well,
763    pelist == NULL will be used to indicate a global broadcast with 
764    num PEs.
765 */
766 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
767                                    int event, int pe, int ml, CmiObjId *id,
768                                    double recvT, int numPe, int *pelist)
769 {
770   new (&pool[numEntries++])
771     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
772   if(poolSize==numEntries) {
773     flushLogBuffer();
774   }
775 }
776
777 void LogPool::postProcessLog()
778 {
779 #if CMK_BIGSIM_CHARM
780   bgUpdateProj(1);   // event type
781 #endif
782 }
783
784 void LogPool::modLastEntryTimestamp(double ts)
785 {
786   pool[numEntries-1].time = ts;
787   //pool[numEntries-1].cputime = ts;
788 }
789
790 // /** Constructor for a multicast log entry */
791 // 
792 //  THIS WAS MOVED TO trace-projections.h with the other constructors
793 // 
794 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
795 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
796 // {
797 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
798 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
799 //     recvTime = rt; 
800 //     numpes = numPe;
801 //     userSuppliedNote = NULL;
802 //     if (pelist != NULL) {
803 //      pes = new int[numPe];
804 //      for (int i=0; i<numPe; i++) {
805 //        pes[i] = pelist[i];
806 //      }
807 //     } else {
808 //      pes= NULL;
809 //     }
810 // }
811
812 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
813 //{
814 //#if CMK_HAS_COUNTER_PAPI
815 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
816 //#endif
817 //}
818
819
820
821 void LogEntry::pup(PUP::er &p)
822 {
823   int i;
824   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
825   char ret = '\n';
826
827   p|type;
828   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
829   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
830
831   switch (type) {
832     case USER_EVENT:
833     case USER_EVENT_PAIR:
834       p|mIdx; p|itime; p|event; p|pe;
835       break;
836     case BEGIN_IDLE:
837     case END_IDLE:
838     case BEGIN_PACK:
839     case END_PACK:
840     case BEGIN_UNPACK:
841     case END_UNPACK:
842       p|itime; p|pe; 
843       break;
844     case BEGIN_PROCESSING:
845       if (p.isPacking()) {
846         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
847         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
848       }
849       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
850       p|msglen; p|irecvtime; 
851       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
852       p|icputime;
853 #if CMK_HAS_COUNTER_PAPI
854       //p|numPapiEvents;
855       for (i=0; i<NUMPAPIEVENTS; i++) {
856         // not yet!!!
857         //      p|papiIDs[i]; 
858         p|papiValues[i];
859         
860       }
861 #else
862       //p|numPapiEvents;     // non papi version has value 0
863 #endif
864       if (p.isUnpacking()) {
865         recvTime = irecvtime/1.0e6;
866         cputime = icputime/1.0e6;
867       }
868       break;
869     case END_PROCESSING:
870       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
871       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
872 #if CMK_HAS_COUNTER_PAPI
873       //p|numPapiEvents;
874       for (i=0; i<NUMPAPIEVENTS; i++) {
875         // not yet!!!
876         //      p|papiIDs[i];
877         p|papiValues[i];
878       }
879 #else
880       //p|numPapiEvents;  // non papi version has value 0
881 #endif
882       if (p.isUnpacking()) cputime = icputime/1.0e6;
883       break;
884     case USER_SUPPLIED:
885           p|userSuppliedData;
886           p|itime;
887         break;
888     case USER_SUPPLIED_NOTE:
889           p|itime;
890           int length;
891           if (p.isPacking()) length = strlen(userSuppliedNote);
892           p | length;
893           char space;
894           space = ' ';
895           p | space;
896           if (p.isUnpacking()) {
897             userSuppliedNote = new char[length+1];
898             userSuppliedNote[length] = '\0';
899           }
900           PUParray(p,userSuppliedNote, length);
901           break;
902     case USER_SUPPLIED_BRACKETED_NOTE:
903       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
904           p|itime;
905           p|iEndTime;
906           p|event;
907           int length2;
908           if (p.isPacking()) length2 = strlen(userSuppliedNote);
909           p | length2;
910           char space2;
911           space2 = ' ';
912           p | space2;
913           if (p.isUnpacking()) {
914             userSuppliedNote = new char[length+1];
915             userSuppliedNote[length] = '\0';
916           }
917           PUParray(p,userSuppliedNote, length2);
918           break;
919     case MEMORY_USAGE_CURRENT:
920       p | memUsage;
921       p | itime;
922         break;
923     case CREATION:
924       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
925       p|mIdx; p|eIdx; p|itime;
926       p|event; p|pe; p|msglen; p|irecvtime;
927       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
928       break;
929     case CREATION_BCAST:
930       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
931       p|mIdx; p|eIdx; p|itime;
932       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
933       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
934       break;
935     case CREATION_MULTICAST:
936       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
937       p|mIdx; p|eIdx; p|itime;
938       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
939       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
940       for (i=0; i<numpes; i++) p|pes[i];
941       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
942       break;
943     case MESSAGE_RECV:
944       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
945       break;
946
947     case ENQUEUE:
948     case DEQUEUE:
949       p|mIdx; p|itime; p|event; p|pe;
950       break;
951
952     case BEGIN_INTERRUPT:
953     case END_INTERRUPT:
954       p|itime; p|event; p|pe;
955       break;
956
957       // **CW** absolute timestamps are used here to support a quick
958       // way of determining the total time of a run in projections
959       // visualization.
960     case BEGIN_COMPUTATION:
961     case END_COMPUTATION:
962     case BEGIN_TRACE:
963     case END_TRACE:
964       p|itime;
965       break;
966     case BEGIN_FUNC:
967         p | itime;
968         p | mIdx;
969         p | event;
970         if(!p.isUnpacking()){
971                 p(fName,flen-1);
972         }
973         break;
974     case END_FUNC:
975         p | itime;
976         p | mIdx;
977         break;
978     case END_PHASE:
979       p|eIdx; // FIXME: actually the phase ID
980       p|itime;
981       break;
982     default:
983       CmiError("***Internal Error*** Wierd Event %d.\n", type);
984       break;
985   }
986   if (p.isUnpacking()) time = itime/1.0e6;
987   p|ret;
988 }
989
990 TraceProjections::TraceProjections(char **argv): 
991   curevent(0), inEntry(0), computationStarted(0), 
992         converseExit(0), endTime(0.0), traceNestedEvents(0),
993         currentPhaseID(0), lastPhaseEvent(NULL)
994 {
995   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
996
997   if (CkpvAccess(traceOnPe) == 0) return;
998
999   CtvInitialize(int,curThreadEvent);
1000   CkpvInitialize(CmiInt8, CtrLogBufSize);
1001   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1002   CtvAccess(curThreadEvent)=0;
1003   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1004                        "Log entries to buffer per I/O")) {
1005     if (CkMyPe() == 0) {
1006       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1007     }
1008   }
1009   checknested = 
1010     CmiGetArgFlagDesc(argv,"+checknested",
1011                       "check projections nest begin end execute events");
1012   traceNestedEvents = 
1013     CmiGetArgFlagDesc(argv,"+tracenested",
1014               "trace projections nest begin/end execute events");
1015   int binary = 
1016     CmiGetArgFlagDesc(argv,"+binary-trace",
1017                       "Write log files in binary format");
1018
1019   CmiInt8 nSubdirs = 0;
1020   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1021
1022
1023 #if CMK_PROJECTIONS_USE_ZLIB
1024   int compressed = true;
1025   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1026   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1027   compressed = compressed && !disableCompressed;
1028 #else
1029   // consume the flag so there's no confusing
1030   CmiGetArgFlagDesc(argv,"+gz-trace",
1031                     "Write log files pre-compressed with gzip");
1032   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1033 #endif
1034
1035   // **CW** default to non delta log encoding. The user may choose to do
1036   // create both logs (for debugging) or just the old log timestamping
1037   // (for compatibility).
1038   // Generating just the non delta log takes precedence over generating
1039   // both logs (if both arguments appear on the command line).
1040
1041   // switch to OLD log format until everything works // Gengbin
1042   nonDeltaLog = 1;
1043   deltaLog = 0;
1044   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1045                                   "Generate Delta encoded and simple timestamped log files");
1046
1047   _logPool = new LogPool(CkpvAccess(traceRoot));
1048   _logPool->setNumSubdirs(nSubdirs);
1049   _logPool->setBinary(binary);
1050 #if CMK_PROJECTIONS_USE_ZLIB
1051   _logPool->setCompressed(compressed);
1052 #endif
1053   if (CkMyPe() == 0) {
1054     _logPool->createSts();
1055     _logPool->createRC();
1056     _logPool->createTopo();
1057   }
1058   funcCount=1;
1059
1060 #if CMK_HAS_COUNTER_PAPI
1061   // We initialize and create the event sets for use with PAPI here.
1062   int papiRetValue;
1063   if(CkMyRank()==0){
1064     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1065     if (papiRetValue != PAPI_VER_CURRENT) {
1066       CmiAbort("PAPI Library initialization failure!\n");
1067     }
1068 #if CMK_SMP
1069     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1070       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1071     }
1072 #endif
1073   }
1074
1075 #if CMK_SMP
1076   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1077   #if CMK_SMP_TRACE_COMMTHREAD
1078       CmiNodeAllBarrier();
1079   #else
1080       CmiNodeBarrier();
1081   #endif
1082 #endif
1083   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1084   papiEventSet = PAPI_NULL; 
1085   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1086     CmiAbort("PAPI failed to create event set!\n");
1087   }
1088 #ifdef USE_SPP_PAPI
1089   //  CmiPrintf("Using SPP counters for PAPI\n");
1090   if(PAPI_query_event(PAPI_FP_OPS)==PAPI_OK) {
1091     papiEvents[0] = PAPI_FP_OPS;
1092   }else{
1093     if(CmiMyPe()==0){
1094       CmiAbort("WARNING: PAPI_FP_OPS doesn't exist on this platform!");
1095     }
1096   }
1097   if(PAPI_query_event(PAPI_TOT_INS)==PAPI_OK) {
1098     papiEvents[1] = PAPI_TOT_INS;
1099   }else{
1100     CmiAbort("WARNING: PAPI_TOT_INS doesn't exist on this platform!");
1101   }
1102   int EventCode;
1103   int ret;
1104   ret=PAPI_event_name_to_code("perf::PERF_COUNT_HW_CACHE_LL:MISS",&EventCode);
1105   if(PAPI_query_event(EventCode)==PAPI_OK) {
1106     papiEvents[2] = EventCode;
1107   }else{
1108     CmiAbort("WARNING: perf::PERF_COUNT_HW_CACHE_LL:MISS doesn't exist on this platform!");
1109   }
1110   ret=PAPI_event_name_to_code("DATA_PREFETCHER:ALL",&EventCode);
1111   if(PAPI_query_event(EventCode)==PAPI_OK) {
1112     papiEvents[3] = EventCode;
1113   }else{
1114     CmiAbort("WARNING: DATA_PREFETCHER:ALL doesn't exist on this platform!");
1115   }
1116   if(PAPI_query_event(PAPI_L1_DCA)==PAPI_OK) {
1117     papiEvents[4] = PAPI_L1_DCA;
1118   }else{
1119     CmiAbort("WARNING: PAPI_L1_DCA doesn't exist on this platform!");
1120   }
1121   if(PAPI_query_event(PAPI_TOT_CYC)==PAPI_OK) {
1122     papiEvents[5] = PAPI_TOT_CYC;
1123   }else{
1124     CmiAbort("WARNING: PAPI_TOT_CYC doesn't exist on this platform!");
1125   }
1126 #else
1127   // just uses { PAPI_L2_DCM, PAPI_FP_OPS } the 2 initialized PAPI_EVENTS
1128 #endif
1129   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1130   if (papiRetValue < 0) {
1131     if (papiRetValue == PAPI_ECNFLCT) {
1132       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1133     } else {
1134       char error_str[PAPI_MAX_STR_LEN];
1135       PAPI_perror(papiRetValue,error_str,PAPI_MAX_STR_LEN);
1136       CmiPrintf("PAPI failed with error %s val %d\n",error_str,papiRetValue);
1137       CmiAbort("PAPI failed to add designated events!\n");
1138     }
1139   }
1140   if(CkMyPe()==0)
1141     {
1142       CmiPrintf("Registered %d PAPI counters:",NUMPAPIEVENTS);
1143       char nameBuf[PAPI_MAX_STR_LEN];
1144       for(int i=0;i<NUMPAPIEVENTS;i++)
1145         {
1146           PAPI_event_code_to_name(papiEvents[i], nameBuf);
1147           CmiPrintf("%s ",nameBuf);
1148         }
1149       CmiPrintf("\n");
1150     }
1151   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1152 #endif
1153 }
1154
1155 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1156 {
1157   OPTIMIZED_VERSION
1158   CkAssert(e==-1 || e>=0);
1159   CkAssert(evt != NULL);
1160   int event;
1161   int biggest = -1;
1162   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1163     int cur = (*CkpvAccess(usrEvents))[i]->e;
1164     if (cur == e) {
1165       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1166       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1167         return e;
1168       else
1169         CmiAbort("UserEvent double registered!");
1170     }
1171     if (cur > biggest) biggest = cur;
1172   }
1173   // if no user events have so far been registered. biggest will be -1
1174   // and hence newly assigned event numbers will begin from 0.
1175   if (e==-1) event = biggest+1;  // automatically assign new event number
1176   else event = e;
1177   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1178   return event;
1179 }
1180
1181 void TraceProjections::traceClearEps(void)
1182 {
1183   // In trace-summary, this zeros out the EP bins, to eliminate noise
1184   // from startup.  Here, this isn't useful, since we can do that in
1185   // post-processing
1186 }
1187
1188 void TraceProjections::traceWriteSts(void)
1189 {
1190   if(CkMyPe()==0)
1191     _logPool->writeSts(this);
1192 }
1193
1194 /** 
1195  * **IMPT NOTES**:
1196  *
1197  * This is called when Converse closes during ConverseCommonExit().
1198  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1199  *
1200  * Some programs bypass CkExit() (like NAMD, which eventually calls
1201  * ConverseExit()), modules like traces will have to pretend to shutdown
1202  * as if CkExit() was called but at the same time avoid making
1203  * subsequent CkExit() calls (which is usually required for allowing
1204  * other modules to shutdown).
1205  *
1206  * Note that we can only get here if CkExit() was not called, since the
1207  * trace module will un-register itself from TraceArray if it did.
1208  *
1209  */
1210 void TraceProjections::traceClose(void)
1211 {
1212 #ifdef PROJ_ANALYSIS
1213   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1214
1215   // sets the flag that tells the code not to make the CkExit call later
1216   converseExit = 1;
1217   if (CkMyPe() == 0) {
1218     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1219     bocProxy.traceProjectionsParallelShutdown(-1);
1220   }
1221   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1222     CkpvAccess(_trace)->endComputation();
1223     delete _logPool;              // will write
1224     // remove myself from traceArray so that no tracing will be called.
1225     CkpvAccess(_traces)->removeTrace(this);
1226   }
1227 #else
1228   // we've already deleted the logpool, so multiple calls to traceClose
1229   // are tolerated.
1230   if (_logPool == NULL) {
1231     return;
1232   }
1233   if(CkMyPe()==0){
1234     _logPool->writeSts(this);
1235   }
1236   CkpvAccess(_trace)->endComputation();
1237   delete _logPool;              // will write
1238   // remove myself from traceArray so that no tracing will be called.
1239   CkpvAccess(_traces)->removeTrace(this);
1240 #endif
1241 }
1242
1243 /**
1244  *  **IMPT NOTES**:
1245  *
1246  *  This is meant to be called internally by the tracing framework.
1247  *
1248  */
1249 void TraceProjections::closeTrace() {
1250   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1251   if (CkMyPe() == 0) {
1252     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1253     _logPool->writeSts(this);
1254     _logPool->writeRC();
1255     _logPool->writeTopo();
1256     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1257   }
1258   delete _logPool;       // will write logs to file
1259 }
1260
1261 #if CMK_SMP_TRACE_COMMTHREAD
1262 void TraceProjections::traceBeginOnCommThread()
1263 {
1264   if (!computationStarted) return;
1265   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1266 }
1267
1268 void TraceProjections::traceEndOnCommThread()
1269 {
1270   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1271 }
1272 #endif
1273
1274 void TraceProjections::traceBegin(void)
1275 {
1276   if (!computationStarted) return;
1277   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1278 }
1279
1280 void TraceProjections::traceEnd(void)
1281 {
1282   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1283 }
1284
1285 void TraceProjections::userEvent(int e)
1286 {
1287   if (!computationStarted) return;
1288   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1289 }
1290
1291 void TraceProjections::userBracketEvent(int e, double bt, double et)
1292 {
1293   if (!computationStarted) return;
1294   // two events record Begin/End of event e.
1295   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1296   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1297 }
1298
1299 void TraceProjections::userSuppliedData(int d)
1300 {
1301   if (!computationStarted) return;
1302   _logPool->addUserSupplied(d);
1303 }
1304
1305 void TraceProjections::userSuppliedNote(char *note)
1306 {
1307   if (!computationStarted) return;
1308   _logPool->addUserSuppliedNote(note);
1309 }
1310
1311
1312 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1313 {
1314   if (!computationStarted) return;
1315   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1316 }
1317
1318 void TraceProjections::memoryUsage(double m)
1319 {
1320   if (!computationStarted) return;
1321   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1322   
1323 }
1324
1325
1326 void TraceProjections::creation(envelope *e, int ep, int num)
1327 {
1328   double curTime = TraceTimer();
1329   if (e == 0) {
1330     CtvAccess(curThreadEvent) = curevent;
1331     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1332                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1333   } else {
1334     int type=e->getMsgtype();
1335     e->setEvent(curevent);
1336     if (num > 1) {
1337       _logPool->add(CREATION_BCAST, type, ep, curTime,
1338                     curevent++, CkMyPe(), e->getTotalsize(), 
1339                     NULL, 0, 0.0, num);
1340     } else {
1341       _logPool->add(CREATION, type, ep, curTime,
1342                     curevent++, CkMyPe(), e->getTotalsize(), 
1343                     NULL, 0, 0.0);
1344     }
1345   }
1346 }
1347
1348 //This function is only called from a comm thread in SMP mode. 
1349 void TraceProjections::creation(char *msg)
1350 {
1351 #if CMK_SMP_TRACE_COMMTHREAD
1352         // msg must be a charm message
1353     envelope *e = (envelope *)msg;
1354     int ep = e->getEpIdx();
1355     if(_entryTable[ep]->traceEnabled) {
1356         creation(e, ep, 1);
1357         e->setSrcPe(CkMyPe());              // pretend I am the sender
1358     }
1359 #endif
1360 }
1361
1362 void TraceProjections::traceCommSetMsgID(char *msg)
1363 {
1364 #if CMK_SMP_TRACE_COMMTHREAD
1365     // msg must be a charm message
1366     envelope *e = (envelope *)msg;
1367     int ep = e->getEpIdx();
1368     if(_entryTable[ep]->traceEnabled) {
1369         e->setSrcPe(CkMyPe());              // pretend I am the sender
1370         e->setEvent(curevent);
1371     }
1372 #endif
1373 }
1374
1375 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1376 {
1377     // msg must be a charm message
1378     *pe = *event = -1;
1379     envelope *e = (envelope *)msg;
1380     int ep = e->getEpIdx();
1381     if(_entryTable[ep]->traceEnabled) {
1382         *pe = e->getSrcPe();
1383         *event = e->getEvent();
1384     }
1385 }
1386
1387 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1388 {
1389        // msg must be a charm message
1390     envelope *e = (envelope *)msg;
1391     int ep = e->getEpIdx();
1392     if(ep<=0 || ep>=_entryTable.size()) return;
1393     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1394     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1395     if(_entryTable[ep]->traceEnabled) {
1396         e->setSrcPe(pe);
1397         e->setEvent(event);
1398     }
1399 }
1400
1401 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1402    Communication Library-specific Multicasts via new event 
1403    CREATION_MULTICAST.
1404 */
1405
1406 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1407                                          int *pelist)
1408 {
1409   double curTime = TraceTimer();
1410   if (e==0) {
1411     CtvAccess(curThreadEvent)=curevent;
1412     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1413                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1414   } else {
1415     int type=e->getMsgtype();
1416     e->setEvent(curevent);
1417     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1418                                    e->getTotalsize(), 0, 0.0, num, pelist);
1419   }
1420 }
1421
1422 void TraceProjections::creationDone(int num)
1423 {
1424   // modified the creation done time of the last num log entries
1425   // FIXME: totally a hack
1426   double curTime = TraceTimer();
1427   int idx = _logPool->numEntries-1;
1428   while (idx >=0 && num >0 ) {
1429     LogEntry &log = _logPool->pool[idx];
1430     if ((log.type == CREATION) ||
1431         (log.type == CREATION_BCAST) ||
1432         (log.type == CREATION_MULTICAST)) {
1433       log.recvTime = curTime - log.time;
1434       num --;
1435     }
1436     idx--;
1437   }
1438 }
1439
1440 void TraceProjections::beginExecute(CmiObjId *tid)
1441 {
1442 #if CMK_HAS_COUNTER_PAPI
1443   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1444     CmiAbort("PAPI failed to read at begin execute!\n");
1445   }
1446 #endif
1447   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1448   execEvent = CtvAccess(curThreadEvent);
1449   execEp = (-1);
1450   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1451                 execEvent,CkMyPe(), 0, tid);
1452 #if CMK_HAS_COUNTER_PAPI
1453   _logPool->addPapi(papiValues);
1454 #endif
1455   inEntry = 1;
1456 }
1457
1458 void TraceProjections::beginExecute(envelope *e)
1459 {
1460   if(e==0) {
1461 #if CMK_HAS_COUNTER_PAPI
1462     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1463       CmiAbort("PAPI failed to read at begin execute!\n");
1464     }
1465 #endif
1466     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1467     execEvent = CtvAccess(curThreadEvent);
1468     execEp = (-1);
1469     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1470                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1471 #if CMK_HAS_COUNTER_PAPI
1472     _logPool->addPapi(papiValues);
1473 #endif
1474     inEntry = 1;
1475   } else {
1476     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1477                  e->getSrcPe(),e->getTotalsize());
1478   }
1479 }
1480
1481 void TraceProjections::beginExecute(char *msg){
1482 #if CMK_SMP_TRACE_COMMTHREAD
1483         //This function is called from comm thread in SMP mode
1484     envelope *e = (envelope *)msg;
1485     int ep = e->getEpIdx();
1486     if(_entryTable[ep]->traceEnabled)
1487                 beginExecute(e);
1488 #endif
1489 }
1490
1491 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1492                                     int mlen, CmiObjId *idx)
1493 {
1494   if (traceNestedEvents) {
1495     if (! nestedEvents.isEmpty()) {
1496       endExecuteLocal();
1497     }
1498     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1499   }
1500   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1501 }
1502
1503 void TraceProjections::changeLastEntryTimestamp(double ts)
1504 {
1505   _logPool->modLastEntryTimestamp(ts);
1506 }
1507
1508 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1509                                     int mlen, CmiObjId *idx)
1510 {
1511 #if CMK_HAS_COUNTER_PAPI
1512   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1513     CmiAbort("PAPI failed to read at begin execute!\n");
1514   }
1515 #endif
1516   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1517   execEvent=event;
1518   execEp=ep;
1519   execPe=srcPe;
1520   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1521                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1522 #if CMK_HAS_COUNTER_PAPI
1523   _logPool->addPapi(papiValues);
1524 #endif
1525   inEntry = 1;
1526 }
1527
1528 void TraceProjections::endExecute(void)
1529 {
1530   if (traceNestedEvents) nestedEvents.deq();
1531   endExecuteLocal();
1532   if (traceNestedEvents) {
1533     if (! nestedEvents.isEmpty()) {
1534       NestedEvent &ne = nestedEvents.peek();
1535       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1536     }
1537   }
1538 }
1539
1540 void TraceProjections::endExecute(char *msg)
1541 {
1542 #if CMK_SMP_TRACE_COMMTHREAD
1543         //This function is called from comm thread in SMP mode
1544     envelope *e = (envelope *)msg;
1545     int ep = e->getEpIdx();
1546     if(_entryTable[ep]->traceEnabled)
1547                 endExecute();
1548 #endif  
1549 }
1550
1551 void TraceProjections::endExecuteLocal(void)
1552 {
1553 #if CMK_HAS_COUNTER_PAPI
1554   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1555     CmiAbort("PAPI failed to read at end execute!\n");
1556   }
1557 #endif
1558   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1559   double cputime = TraceCpuTimer();
1560   if(execEp == (-1)) {
1561     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1562                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1563   } else {
1564     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1565                   execEvent, execPe, 0, NULL, 0.0, cputime);
1566   }
1567 #if CMK_HAS_COUNTER_PAPI
1568   _logPool->addPapi(papiValues);
1569 #endif
1570   inEntry = 0;
1571 }
1572
1573 void TraceProjections::messageRecv(char *env, int pe)
1574 {
1575 #if 0
1576   envelope *e = (envelope *)env;
1577   int msgType = e->getMsgtype();
1578   int ep = e->getEpIdx();
1579 #if 0
1580   if (msgType==NewChareMsg || msgType==NewVChareMsg
1581           || msgType==ForChareMsg || msgType==ForVidMsg
1582           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1583           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1584     ep = e->getEpIdx();
1585   else
1586     ep = _threadEP;
1587 #endif
1588   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1589                 curevent++, e->getSrcPe(), e->getTotalsize());
1590 #endif
1591 }
1592
1593 void TraceProjections::beginIdle(double curWallTime)
1594 {
1595   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1596 }
1597
1598 void TraceProjections::endIdle(double curWallTime)
1599 {
1600   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1601 }
1602
1603 void TraceProjections::beginPack(void)
1604 {
1605   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1606 }
1607
1608 void TraceProjections::endPack(void)
1609 {
1610   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1611 }
1612
1613 void TraceProjections::beginUnpack(void)
1614 {
1615   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1616 }
1617
1618 void TraceProjections::endUnpack(void)
1619 {
1620   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1621 }
1622
1623 void TraceProjections::enqueue(envelope *) {}
1624
1625 void TraceProjections::dequeue(envelope *) {}
1626
1627 void TraceProjections::beginComputation(void)
1628 {
1629   computationStarted = 1;
1630
1631   // Executes the callback function provided by the machine
1632   // layer. This is the proper method to register user events in a
1633   // machine layer because projections is a charm++ module.
1634   if (CkpvAccess(traceOnPe) != 0) {
1635     void (*ptr)() = registerMachineUserEvents();
1636     if (ptr != NULL) {
1637       ptr();
1638     }
1639   }
1640 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1641 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1642   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1643 #if CMK_HAS_COUNTER_PAPI
1644   // we start the counters here
1645   if (PAPI_start(papiEventSet) != PAPI_OK) {
1646     CmiAbort("PAPI failed to start designated counters!\n");
1647   }
1648 #endif
1649 }
1650
1651 void TraceProjections::endComputation(void)
1652 {
1653 #if CMK_HAS_COUNTER_PAPI
1654   // we stop the counters here. A silent failure is alright since we
1655   // are already at the end of the program.
1656   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1657     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1658   }
1659   // NOTE: We should not do a complete close of PAPI until after the
1660   // sts writer is done.
1661 #endif
1662   endTime = TraceTimer();
1663   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1664   /*
1665   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1666            endTime*1e06);
1667   */
1668 }
1669
1670 int TraceProjections::idxRegistered(int idx)
1671 {
1672     int idxVecLen = idxVec.size();
1673     for(int i=0; i<idxVecLen; i++)
1674     {
1675         if(idx == idxVec[i])
1676             return 1;
1677     }
1678     return 0;
1679 }
1680
1681 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1682     StrKey k((char*)name,strlen(name));
1683     int num = funcHashtable.get(k);
1684     
1685     if(num!=0) {
1686         return;
1687         //as for mpi programs, the same function may be registered for several times
1688         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1689     }
1690     
1691     int isIdxExisting=0;
1692     if(idxSpecifiedByUser)
1693         isIdxExisting=idxRegistered(idx);
1694     if(isIdxExisting){
1695         return;
1696         //same reason with num!=0
1697         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1698     }
1699
1700     if(idxSpecifiedByUser) {
1701         char *st = new char[strlen(name)+1];
1702         memcpy(st,name,strlen(name)+1);
1703         StrKey *newKey = new StrKey(st,strlen(st));
1704         int &ref = funcHashtable.put(*newKey);
1705         ref=idx;
1706         funcCount++;
1707         idxVec.push_back(idx);  
1708     } else {
1709         char *st = new char[strlen(name)+1];
1710         memcpy(st,name,strlen(name)+1);
1711         StrKey *newKey = new StrKey(st,strlen(st));
1712         int &ref = funcHashtable.put(*newKey);
1713         ref=funcCount;
1714         num = funcCount;
1715         funcCount++;
1716         idx = num;
1717         idxVec.push_back(idx);
1718     }
1719 }
1720
1721 void TraceProjections::beginFunc(char *name,char *file,int line){
1722         StrKey k(name,strlen(name));    
1723         unsigned short  num = (unsigned short)funcHashtable.get(k);
1724         beginFunc(num,file,line);
1725 }
1726
1727 void TraceProjections::beginFunc(int idx,char *file,int line){
1728         if(idx <= 0){
1729                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1730         }       
1731         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1732 }
1733
1734 void TraceProjections::endFunc(char *name){
1735         StrKey k(name,strlen(name));    
1736         int num = funcHashtable.get(k);
1737         endFunc(num);   
1738 }
1739
1740 void TraceProjections::endFunc(int num){
1741         if(num <= 0){
1742                 printf("endFunc without start :O\n");
1743         }
1744         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1745 }
1746
1747 // specialized PUP:ers for handling trace projections logs
1748 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1749 {
1750   for (int i=0;i<n;i++) 
1751     switch(t) {
1752     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1753     case Tuchar:
1754     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1755     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1756     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1757     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1758     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1759     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1760     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1761     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1762     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1763 #ifdef CMK_PUP_LONG_LONG
1764     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1765     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1766 #endif
1767     default: CmiAbort("Unrecognized pup type code!");
1768     };
1769 }
1770
1771 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1772 {
1773   for (int i=0;i<n;i++) 
1774     switch(t) {
1775     case Tchar: { 
1776       char c = fgetc(f);
1777       if (c==EOF)
1778         parseError("Could not match character");
1779       else
1780         ((char *)p)[i] = c;
1781       break;
1782     }
1783     case Tuchar:
1784     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1785     case Tshort:((short *)p)[i]=(short)readInt(); break;
1786     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1787     case Tint:  ((int *)p)[i]=readInt(); break;
1788     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1789     case Tlong: ((long *)p)[i]=readInt(); break;
1790     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1791     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1792     case Tdouble:((double *)p)[i]=readDouble(); break;
1793 #ifdef CMK_PUP_LONG_LONG
1794     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1795     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1796 #endif
1797     default: CmiAbort("Unrecognized pup type code!");
1798     };
1799 }
1800
1801 #if CMK_PROJECTIONS_USE_ZLIB
1802 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1803 {
1804   for (int i=0;i<n;i++) 
1805     switch(t) {
1806     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1807     case Tuchar:
1808     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1809     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1810     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1811     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1812     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1813     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1814     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1815     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1816     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1817 #ifdef CMK_PUP_LONG_LONG
1818     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1819     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1820 #endif
1821     default: CmiAbort("Unrecognized pup type code!");
1822     };
1823 }
1824 #endif
1825
1826 void TraceProjections::endPhase() {
1827   double currentPhaseTime = TraceTimer();
1828   if (lastPhaseEvent != NULL) {
1829   } else {
1830     if (_logPool->pool != NULL) {
1831       // assumed to be BEGIN_COMPUTATION
1832     } else {
1833       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1834       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1835     }
1836   }
1837
1838   /* Insert endPhase event here. */
1839   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1840   /*        We currently "borrow" from the standard add() method. */
1841   /*        It should really be its own add() method.             */
1842   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1843   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1844   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1845   currentPhaseID++;
1846 }
1847
1848 #ifdef PROJ_ANALYSIS
1849 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1850
1851
1852 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1853
1854 void registerOutlierReduction() {
1855   outlierReductionType =
1856     CkReduction::addReducer(outlierReduction);
1857   minMaxReductionType =
1858     CkReduction::addReducer(minMaxReduction);
1859 }
1860
1861 /**
1862  * **IMPT NOTES**:
1863  *
1864  * This is the C++ code that is registered to be activated at module
1865  * shutdown. This is called exactly once on processor 0. Module shutdown
1866  * is initiated as a result of a CkExit() call by the application code
1867  * 
1868  * The exit function must ultimately call CkExit() again to
1869  * so that other module exit functions may proceed after this module is
1870  * done.
1871  *
1872  */
1873 // FIXME: WHY extern "C"???
1874 extern "C" void TraceProjectionsExitHandler()
1875 {
1876 #if CMK_TRACE_ENABLED
1877   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1878   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1879   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1880 #else
1881   CkExit();
1882 #endif
1883 }
1884
1885 // This is called once on each processor but the idiom of use appears
1886 // to be to only have processor 0 register the function.
1887 //
1888 // See initnode in trace-projections.ci
1889 void initTraceProjectionsBOC()
1890 {
1891   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1892 #ifdef __BIGSIM__
1893   if (BgNodeRank() == 0) {
1894 #else
1895     if (CkMyRank() == 0) {
1896 #endif
1897       registerExitFn(TraceProjectionsExitHandler);
1898     }
1899 #if 0
1900   } // this is so indentation does not get messed up
1901 #endif
1902 }
1903
1904 // mainchare for trace-projections BOC-operations. 
1905 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1906 // rest of its life.
1907 //
1908 // Responsible for:
1909 //   1. Handling commandline arguments
1910 //   2. Creating any objects required for proper BOC operations.
1911 //
1912 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1913   /** Options for Outlier Analysis */
1914   // defaults. Things will change with support for interactive analysis.
1915   bool findOutliers = false;
1916   bool outlierAutomatic = true;
1917   int numKSeeds = 10; 
1918
1919   int peNumKeep = CkNumPes();  // used as a default
1920   double entryThreshold = 0.0;
1921   bool outlierUsePhases = false;
1922   if (outlierAutomatic) {
1923     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1924                      "Number of cluster seeds to apply at outlier analysis.");
1925     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1926                      &peNumKeep, "Number of Processors to retain data");
1927     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1928                         "Minimum significance of entry points to be considered for clustering (%).");
1929     findOutliers =
1930       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1931     outlierUsePhases = 
1932       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1933                         "Apply automatic outlier analysis to any available phases.");
1934     if (outlierUsePhases) {
1935       // if the user wants to use an outlier feature, it is assumed outlier
1936       //    analysis is desired.
1937       findOutliers = true;
1938     }
1939   }
1940   bool findStartTime = (CmiTimerAbsolute()==1);
1941   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1942   if (findOutliers) {
1943     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1944                                         numKSeeds,
1945                                         peNumKeep,
1946                                         entryThreshold,
1947                                         outlierUsePhases);
1948   }
1949 }
1950
1951 // Called on every processor.
1952 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1953   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1954   endPe = pe;                // the pe that starts CkExit()
1955   if (CkMyPe() == 0) {
1956     analysisStartTime = CmiWallTimer();
1957   }
1958   CkpvAccess(_trace)->endComputation();
1959   // no more tracing for projections on this processor after this point. 
1960   // Note that clear must be called after remove, or bad things will happen.
1961   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1962   CkpvAccess(_traces)->clearTrace();
1963
1964   // From this point, we start multiple chains of reductions and broadcasts to
1965   // perform final online analysis activities.
1966
1967   // Start all parallel operations at once. 
1968   //   These MUST NOT modify base performance data in LogPool. If they must,
1969   //   then the parallel operations must be phased (and this code to be
1970   //   restructured as necessary)
1971   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1972   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1973   if (findOutliers) {
1974     parModulesRemaining++;
1975     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1976   }
1977   parModulesRemaining++;
1978   if (findStartTime) 
1979   bocProxy[CkMyPe()].startTimeAnalysis();
1980   else
1981   bocProxy[CkMyPe()].startEndTimeAnalysis();
1982 }
1983
1984 // Called on each processor
1985 void KMeansBOC::startKMeansAnalysis() {
1986   // Initialize all necessary structures
1987   LogPool *pool = CkpvAccess(_trace)->_logPool;
1988
1989  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1990   int flushInt = 0;
1991   if (pool->hasFlushed) {
1992     flushInt = 1;
1993   }
1994   
1995   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1996                 0, thisProxy);
1997   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1998 }
1999
2000 // Called on processor 0
2001 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2002   int someFlush = *((int *)msg->getData());
2003
2004   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2005   
2006   if (someFlush == 0) {
2007     // Data intact proceed with KMeans analysis
2008     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2009     kMeansProxy.flushCheckDone();
2010   } else {
2011     // Some processor had flushed it data at some point, abandon KMeans
2012     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2013     // terminate KMeans
2014     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2015     bocProxy[0].kMeansDone();
2016   }
2017 }
2018
2019 // Called on each processor
2020 void KMeansBOC::flushCheckDone() {
2021   // **FIXME** - more flexible metric collection scheme may be necessary
2022   //   in the future for production use.
2023   LogPool *pool = CkpvAccess(_trace)->_logPool;
2024
2025   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2026
2027   numEntryMethods = _entryTable.size();
2028   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2029
2030   // maintained across phases
2031   markedBegin = false;
2032   markedIdle = false;
2033   beginBlockTime = 0.0;
2034   beginIdleBlockTime = 0.0;
2035   lastBeginEPIdx = -1; // none
2036
2037   lastPhaseIdx = 0;
2038   currentExecTimes = NULL;
2039   currentPhase = 0;
2040   selected = false;
2041
2042   pool->initializePhases();
2043
2044   // incoming K Seeds and the per-phase filter
2045   incKSeeds = new double[numK*numMetrics];
2046   keepMetric = new bool[numMetrics];
2047
2048   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2049   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2050   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2051   thisProxy[CkMyPe()].getNextPhaseMetrics();
2052 }
2053
2054 // Called on each processor.
2055 void KMeansBOC::getNextPhaseMetrics() {
2056   // Assumes the presence of the complete logs on this processor.
2057   // Assumes first event is always BEGIN_COMPUTATION
2058   // Assumes each processor sees the same number of phases.
2059   //
2060   // In this code, we collect performance data for this processor.
2061   // All times are in seconds.
2062
2063   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2064
2065   if (usePhases) {
2066     DEBUGF("[%d] Using Phases\n", CkMyPe());
2067   } else {
2068     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2069   }
2070   
2071   if (currentExecTimes != NULL) {
2072     delete [] currentExecTimes;
2073   }
2074   currentExecTimes = new double[numMetrics];
2075   for (int i=0; i<numMetrics; i++) {
2076     currentExecTimes[i] = 0.0;
2077   }
2078
2079   int numEventMethods = _entryTable.size();
2080   LogPool *pool = CkpvAccess(_trace)->_logPool;
2081   
2082   CkAssert(pool->numEntries > lastPhaseIdx);
2083   double totalPhaseTime = 0.0;
2084   double totalActiveTime = 0.0; // entry method + idle
2085
2086   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2087     if (pool->pool[i].type == BEGIN_PROCESSING) {
2088       // check pairing
2089       if (!markedBegin) {
2090         markedBegin = true;
2091       }
2092       beginBlockTime = pool->pool[i].time;
2093       lastBeginEPIdx = pool->pool[i].eIdx;
2094     } else if (pool->pool[i].type == END_PROCESSING) {
2095       // check pairing
2096       // if End without a begin, just ignore
2097       //   this event. If a phase-boundary is crossed, the Begin
2098       //   event would be maintained in beginBlockTime, so it is 
2099       //   not a problem.
2100       if (markedBegin) {
2101         markedBegin = false;
2102         if (pool->pool[i].event < 0)
2103         {
2104           // ignore dummy events. **FIXME** as they have no eIdx?
2105           continue;
2106         }
2107         currentExecTimes[pool->pool[i].eIdx] += 
2108           pool->pool[i].time - beginBlockTime;
2109         totalActiveTime += pool->pool[i].time - beginBlockTime;
2110         lastBeginEPIdx = -1;
2111       }
2112     } else if (pool->pool[i].type == BEGIN_IDLE) {
2113       // check pairing
2114       if (!markedIdle) {
2115         markedIdle = true;
2116       }
2117       beginIdleBlockTime = pool->pool[i].time;
2118     } else if (pool->pool[i].type == END_IDLE) {
2119       // check pairing
2120       if (markedIdle) {
2121         markedIdle = false;
2122         currentExecTimes[numEventMethods] += 
2123           pool->pool[i].time - beginIdleBlockTime;
2124         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2125       }
2126     } else if (pool->pool[i].type == END_PHASE) {
2127       // ignored when not using phases
2128       if (usePhases) {
2129         // when we've not visited this node before
2130         if (i != lastPhaseIdx) { 
2131           totalPhaseTime = 
2132             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2133           // it is important that proper accounting of time take place here.
2134           // Note that END_PHASE events inevitably occur in the context of
2135           //   some entry method by the way the tracing API is designed.
2136           if (markedBegin) {
2137             CkAssert(lastBeginEPIdx >= 0);
2138             currentExecTimes[lastBeginEPIdx] += 
2139               pool->pool[i].time - beginBlockTime;
2140             totalActiveTime += pool->pool[i].time - beginBlockTime;
2141             // this is so the remainder contributes to the next phase
2142             beginBlockTime = pool->pool[i].time;
2143           }
2144           // The following is unlikely, but stranger things have happened.
2145           if (markedIdle) {
2146             currentExecTimes[numEventMethods] +=
2147               pool->pool[i].time - beginIdleBlockTime;
2148             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2149             // this is so the remainder contributes to the next phase
2150             beginIdleBlockTime = pool->pool[i].time;
2151           }
2152           if (totalActiveTime <= totalPhaseTime) {
2153             currentExecTimes[numEventMethods+1] = 
2154               totalPhaseTime - totalActiveTime;
2155           } else {
2156             currentExecTimes[numEventMethods+1] = 0.0;
2157             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2158                      CkMyPe(), currentPhase);
2159           }
2160           collectKMeansData();
2161           // end the loop (and method) and defer the work till the next call
2162           lastPhaseIdx = i;
2163           break; 
2164         }
2165       }
2166     } else if (pool->pool[i].type == END_COMPUTATION) {
2167       if (markedBegin) {
2168         CkAssert(lastBeginEPIdx >= 0);
2169         currentExecTimes[lastBeginEPIdx] += 
2170           pool->pool[i].time - beginBlockTime;
2171         totalActiveTime += pool->pool[i].time - beginBlockTime;
2172       }
2173       if (markedIdle) {
2174         currentExecTimes[numEventMethods] +=
2175           pool->pool[i].time - beginIdleBlockTime;
2176         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2177       }
2178       totalPhaseTime = 
2179         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2180       if (totalActiveTime <= totalPhaseTime) {
2181         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2182       } else {
2183         currentExecTimes[numEventMethods+1] = 0.0;
2184         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2185                  CkMyPe());
2186       }
2187       collectKMeansData();
2188     }
2189   }
2190 }
2191
2192 /**
2193  *  Through a reduction, collectKMeansData aggregates each processors' data
2194  *  in order for global properties to be determined:
2195  *  
2196  *  1. min & max to determine normalization factors.
2197  *  2. sum to determine global EP averages for possible metric reduction
2198  *       through thresholding.
2199  *  3. sum of squares to compute stddev which may be useful in the future.
2200  *
2201  *  collectKMeansData will also keep the processor's data for the current
2202  *    phase so that it may be normalized and worked on subsequently.
2203  *
2204  **/
2205 void KMeansBOC::collectKMeansData() {
2206   int minOffset = numMetrics;
2207   int maxOffset = 2*numMetrics;
2208   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2209
2210   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2211
2212   double *reductionMsg = new double[numMetrics*4];
2213
2214   for (int i=0; i<numMetrics; i++) {
2215     reductionMsg[i] = currentExecTimes[i];
2216     // duplicate the event times for max and min sections of the reduction
2217     reductionMsg[minOffset + i] = currentExecTimes[i];
2218     reductionMsg[maxOffset + i] = currentExecTimes[i];
2219     // compute squares
2220     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2221   }
2222
2223   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2224                 0, thisProxy);
2225   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2226              outlierReductionType, cb);  
2227 }
2228
2229 // The purpose is mainly to initialize the k seeds and generate
2230 //   normalization parameters for each of the metrics. The k seeds
2231 //   and normalization parameters are broadcast to all processors.
2232 //
2233 // Called on processor 0
2234 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2235   CkAssert(CkMyPe() == 0);
2236   
2237   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2238
2239   int sumOffset = 0;
2240   int minOffset = numMetrics;
2241   int maxOffset = 2*numMetrics;
2242   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2243
2244   // calculate statistics & boundaries for the k seeds for clustering
2245   KMeansStatsMessage *outmsg = 
2246     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2247   outmsg->numMetrics = numMetrics;
2248   outmsg->numKPos = numK*numMetrics;
2249   outmsg->numStats = numMetrics*4;
2250
2251   // Sum | Min | Max | Sum of Squares
2252   double *totalExecTimes = (double *)msg->getData();
2253   double totalTime = 0.0;
2254
2255   for (int i=0; i<numMetrics; i++) {
2256     DEBUGN("%lf\n", totalExecTimes[i]);
2257     totalTime += totalExecTimes[i];
2258
2259     // calculate event mean over all processors
2260     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2261
2262     // get the ranges and offsets of each metric. With this, we get
2263     //   normalization factors that can be sent back to each processor to
2264     //   be used as necessary. We reuse max for range. Min remains the offset.
2265     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2266     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2267       totalExecTimes[minOffset + i];
2268     
2269     // calculate stddev (using biased variance)
2270     outmsg->stats[sosOffset + i] = 
2271       sqrt((totalExecTimes[sosOffset + i] - 
2272             2*(outmsg->stats[i])*totalExecTimes[i] +
2273             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2274            CkNumPes());
2275   }
2276
2277   for (int i=0; i<numMetrics; i++) {
2278     // 1) if the proportion of the max value of the entry method relative to
2279     //   the average time taken over all entry methods across all processors
2280     //   is greater than the stipulated percentage threshold ...; AND
2281     // 2) if the range of values are non-zero.
2282     //
2283     // The current assumption is totalTime > 0 (what program has zero total
2284     //   time from all work?)
2285     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2286                      entryThreshold) &&
2287       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2288     if (keepMetric[i]) {
2289       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2290              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2291     } else {
2292       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2293     }
2294     outmsg->filter[i] = keepMetric[i];
2295   }
2296
2297   delete msg;
2298   
2299   // initialize k seeds for this phase
2300   kSeeds = new double[numK*numMetrics];
2301
2302   numKReported = 0;
2303   kNumMembers = new int[numK];
2304
2305   // Randomly select k processors' metric vectors for the k seeds
2306   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2307   srand(11337); // for debugging purposes
2308   for (int k=0; k<numK; k++) {
2309     DEBUGF("Seed %d | ", k);
2310     for (int m=0; m<numMetrics; m++) {
2311       double factor = totalExecTimes[maxOffset + m] - 
2312         totalExecTimes[minOffset + m];
2313       // "uniform" distribution, scaled according to the normalization
2314       //   factors
2315       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2316       // Random distribution.
2317       kSeeds[numMetrics*k + m] =
2318         ((rand()*1.0)/RAND_MAX)*factor;
2319       if (keepMetric[m]) {
2320         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2321       }
2322       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2323     }
2324     DEBUGF("\n");
2325     kNumMembers[k] = 0;
2326   }
2327
2328   // broadcast statistical values to all processors for cluster discovery
2329   thisProxy.findInitialClusters(outmsg);
2330 }
2331
2332
2333
2334 // Called on each processor.
2335 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2336
2337  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2338
2339   phaseIter = 0;
2340
2341   // Get info from stats message
2342   CkAssert(numMetrics == msg->numMetrics);
2343   for (int i=0; i<numMetrics; i++) {
2344     keepMetric[i] = msg->filter[i];
2345   }
2346
2347   // Normalize data on local processor.
2348   // **CWL** See my thesis for detailed discussion of normalization of
2349   //    performance data.
2350   // **NOTE** This might change if we want to send data based on the filter
2351   //   instead of all the data.
2352   CkAssert(numMetrics*4 == msg->numStats);
2353   for (int i=0; i<numMetrics; i++) {
2354     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2355     // **CWL** We do not normalize the range. Entry methods that exhibit
2356     //   large absolute timing variations should be allowed to contribute
2357     //   more to the Euclidean distance measure!
2358     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2359   }
2360
2361   // **NOTE** This might change if we want to send data based on the filter
2362   //   instead of all the data.
2363   CkAssert(numK*numMetrics == msg->numKPos);
2364   for (int i=0; i<msg->numKPos; i++) {
2365     incKSeeds[i] = msg->kSeedsPos[i];
2366   }
2367
2368   // Decide which KSeed this processor belongs to.
2369   minDistance = calculateDistance(0);
2370   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2371            currentPhase, phaseIter, minDistance);
2372   minK = 0;
2373   for (int i=1; i<numK; i++) {
2374     double distance = calculateDistance(i);
2375     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2376              currentPhase, phaseIter, i, distance);
2377     if (distance < minDistance) {
2378       minDistance = distance;
2379       minK = i;
2380     }
2381   }
2382
2383   // Set up a reduction with the modification vector to the root (0).
2384   //
2385   // The modification vector sends a negative value for each metric
2386   //   for the K this processor no longer belongs to and a positive
2387   //   value to the K the processor now belongs. In addition, a -1.0
2388   //   is sent to the K it is leaving and a +1.0 to the K it is 
2389   //   joining.
2390   //
2391   // The processor must still contribute a "zero returns" even if
2392   //   nothing changes. This will be the basis for determine
2393   //   convergence at the root.
2394   //
2395   // The addtional +1 is meant for the count-change that must be
2396   //   maintained for the special cases at the root when some K
2397   //   may be deprived of all processor points or go from 0 to a
2398   //   positive number of processors (see later comments).
2399   double *modVector = new double[numK*(numMetrics+1)];
2400   for (int i=0; i<numK; i++) {
2401     for (int j=0; j<numMetrics+1; j++) {
2402       modVector[i*(numMetrics+1) + j] = 0.0;
2403     }
2404   }
2405   for (int i=0; i<numMetrics; i++) {
2406     // for this initialization, only positive values need be sent.
2407     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2408   }
2409   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2410
2411   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2412                 0, thisProxy);
2413   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2414              CkReduction::sum_double, cb);  
2415 }
2416
2417 double KMeansBOC::calculateDistance(int k) {
2418   double ret = 0.0;
2419   for (int i=0; i<numMetrics; i++) {
2420     if (keepMetric[i]) {
2421       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2422              CkMyPe(), currentPhase, phaseIter, i,
2423                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2424       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2425     }
2426   }
2427   return sqrt(ret);
2428 }
2429
2430 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2431   CkAssert(CkMyPe() == 0);
2432
2433   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2434
2435   double *modVector = (double *)msg->getData();
2436   // sanity check
2437   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2438
2439   // A quick convergence test.
2440   bool hasChanges = false;
2441   for (int i=0; i<numK; i++) {
2442     hasChanges = hasChanges || 
2443       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2444   }
2445   if (!hasChanges) {
2446     delete msg;
2447     findRepresentatives();
2448   } else {
2449     int overallChange = 0;
2450     for (int i=0; i<numK; i++) {
2451       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2452       if (change != 0) {
2453         overallChange += change;
2454         // modify the k seeds based on the modification vectors coming in
2455         //
2456         // If a seed initially has no members, its contents do not matter and
2457         //   is simply set to the average of the incoming vector.
2458         // If the change causes a seed to lose all its members, do nothing.
2459         //   Its last-known location is kept to allow it to re-capture
2460         //   membership at the next iteration rather than apply the last
2461         //   changes (which snaps the point unnaturally to 0,0).
2462         // Otherwise, apply the appropriate vector changes.
2463         CkAssert((kNumMembers[i] + change >= 0) &&
2464                  (kNumMembers[i] + change <= CkNumPes()));
2465         if (kNumMembers[i] == 0) {
2466           CkAssert(change > 0);
2467           for (int j=0; j<numMetrics; j++) {
2468             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2469           }
2470         } else if (kNumMembers[i] + change == 0) {
2471           // do nothing.
2472         } else {
2473           for (int j=0; j<numMetrics; j++) {
2474             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2475             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2476             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2477           }
2478         }
2479         kNumMembers[i] += change;
2480       }
2481       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2482              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2483     }
2484     delete msg;
2485
2486     // broadcast the new seed locations.
2487     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2488     outmsg->numKPos = numK*numMetrics;
2489     for (int i=0; i<numK*numMetrics; i++) {
2490       outmsg->kSeedsPos[i] = kSeeds[i];
2491     }
2492
2493     thisProxy.updateSeedMembership(outmsg);
2494   }
2495 }
2496
2497 // Called on all processors
2498 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2499
2500   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2501
2502   phaseIter++;
2503
2504   // **NOTE** This might change if we want to send data based on the filter
2505   //   instead of all the data.
2506   CkAssert(numK*numMetrics == msg->numKPos);
2507   for (int i=0; i<msg->numKPos; i++) {
2508     incKSeeds[i] = msg->kSeedsPos[i];
2509   }
2510
2511   // Decide which KSeed this processor belongs to.
2512   lastMinK = minK;
2513   minDistance = calculateDistance(0);
2514   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2515          currentPhase, phaseIter, minDistance);
2516
2517   minK = 0;
2518   for (int i=1; i<numK; i++) {
2519     double distance = calculateDistance(i);
2520     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2521            currentPhase, phaseIter, i, distance);
2522     if (distance < minDistance) {
2523       minDistance = distance;
2524       minK = i;
2525     }
2526   }
2527
2528   double *modVector = new double[numK*(numMetrics+1)];
2529   for (int i=0; i<numK; i++) {
2530     for (int j=0; j<numMetrics+1; j++) {
2531       modVector[i*(numMetrics+1) + j] = 0.0;
2532     }
2533   }
2534
2535   if (minK != lastMinK) {
2536     for (int i=0; i<numMetrics; i++) {
2537       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2538       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2539     }
2540     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2541     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2542   }
2543
2544   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2545                 0, thisProxy);
2546   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2547              CkReduction::sum_double, cb);  
2548 }
2549
2550 void KMeansBOC::findRepresentatives() {
2551
2552   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2553
2554   int numNonEmptyClusters = 0;
2555   for (int i=0; i<numK; i++) {
2556     if (kNumMembers[i] > 0) {
2557       numNonEmptyClusters++;
2558     }
2559   }
2560
2561   int numRepresentatives = peNumKeep;
2562   // **FIXME**
2563   // This is fairly arbitrary. Next time, choose the centers of the top
2564   //   largest clusters.
2565   if (numRepresentatives < numNonEmptyClusters) {
2566     numRepresentatives = numNonEmptyClusters;
2567   }
2568
2569   int slotsRemaining = numRepresentatives;
2570
2571   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2572          numNonEmptyClusters);
2573
2574   // determine how many exemplars to select per cluster. Currently
2575   //   hardcoded to 1. Future challenge is to decide on other numbers
2576   //   or proportionality.
2577   //
2578   int exemplarsPerCluster = 1;
2579   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2580
2581   int numCandidateOutliers = CkNumPes() - 
2582     exemplarsPerCluster*numNonEmptyClusters;
2583
2584   double *remainders = new double[numK];
2585   int *assigned = new int[numK];
2586   exemplarChoicesLeft = new int[numK];
2587   outlierChoicesLeft = new int[numK];
2588
2589   for (int i=0; i<numK; i++) {
2590     assigned[i] = 0;
2591     remainders[i] = 
2592       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2593       slotsRemaining / numCandidateOutliers;
2594     if (remainders[i] >= 0.0) {
2595       assigned[i] = (int)floor(remainders[i]);
2596       remainders[i] -= assigned[i];
2597     } else {
2598       remainders[i] = 0.0;
2599     }
2600   }
2601
2602   for (int i=0; i<numK; i++) {
2603     slotsRemaining -= assigned[i];
2604   }
2605   CkAssert(slotsRemaining >= 0);
2606
2607   // find clusters to assign the loose slots to, in order of
2608   // remainder proportion
2609   while (slotsRemaining > 0) {
2610     double max = 0.0;
2611     int winner = 0;
2612     for (int i=0; i<numK; i++) {
2613       if (remainders[i] > max) {
2614         max = remainders[i];
2615         winner = i;
2616       }
2617     }
2618     assigned[winner]++;
2619     remainders[winner] = 0.0;
2620     slotsRemaining--;
2621   }
2622
2623   // set up how many reduction cycles of min/max we need to conduct to
2624   // select the representatives.
2625   numSelectionIter = exemplarsPerCluster;
2626   for (int i=0; i<numK; i++) {
2627     if (assigned[i] > numSelectionIter) {
2628       numSelectionIter = assigned[i];
2629     }
2630   }
2631   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2632
2633   for (int i=0; i<numK; i++) {
2634     if (kNumMembers[i] > 0) {
2635       exemplarChoicesLeft[i] = exemplarsPerCluster;
2636       outlierChoicesLeft[i] = assigned[i];
2637     } else {
2638       exemplarChoicesLeft[i] = 0;
2639       outlierChoicesLeft[i] = 0;
2640     }
2641     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2642            outlierChoicesLeft[i]);
2643   }
2644
2645   delete [] assigned;
2646   delete [] remainders;
2647
2648   // send out first broadcast
2649   KSelectionMessage *outmsg = NULL;
2650   if (numSelectionIter > 0) {
2651     outmsg = new (numK, numK, numK) KSelectionMessage;
2652     outmsg->numKMinIDs = numK;
2653     outmsg->numKMaxIDs = numK;
2654     for (int i=0; i<numK; i++) {
2655       outmsg->minIDs[i] = -1;
2656       outmsg->maxIDs[i] = -1;
2657     }
2658     thisProxy.collectDistances(outmsg);
2659   } else {
2660     CkPrintf("Warning: No selection iteration from the start!\n");
2661     // invoke phase completion on all processors
2662     thisProxy.phaseDone();
2663   }
2664 }
2665
2666 /*
2667  *  lastMin = array of minimum champions of the last tournament
2668  *  lastMax = array of maximum champions of the last tournament
2669  *  lastMaxVal = array of last encountered maximum values, allows previous
2670  *                 minimum winners to eliminate themselves from the next
2671  *                 minimum race.
2672  *
2673  *  Called on all processors.
2674  */
2675 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2676
2677   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2678
2679   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2680          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2681   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2682       (CkMyPe() == msg->maxIDs[lastMinK])) {
2683     CkAssert(!selected);
2684     selected = true;
2685   }
2686
2687   // build outgoing reduction structure
2688   //   format = minVal | ID | maxVal | ID
2689   double *minMaxAndIDs = NULL;
2690
2691   minMaxAndIDs = new double[numK*4];
2692   // initialize to the appropriate out-of-band values (for error checks)
2693   for (int i=0; i<numK; i++) {
2694     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2695     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2696     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2697     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2698   }
2699   // If I have not won before, I put myself back into the competition
2700   if (!selected) {
2701     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2702     minMaxAndIDs[lastMinK*4] = minDistance;
2703     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2704     minMaxAndIDs[lastMinK*4+2] = minDistance;
2705     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2706   }
2707   delete msg;
2708
2709   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2710                 0, thisProxy);
2711   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2712              minMaxReductionType, cb);  
2713 }
2714
2715 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2716   // incoming format:
2717   //   minVal | minID | maxVal | maxID
2718
2719   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2720
2721   if (numSelectionIter > 0) {
2722     double *incInfo = (double *)msg->getData();
2723     
2724     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2725     outmsg->numKMinIDs = numK;
2726     outmsg->numKMaxIDs = numK;
2727     
2728     for (int i=0; i<numK; i++) {
2729       DEBUGF("%d | %lf %d %lf %d \n", i, 
2730              incInfo[i*4], (int)incInfo[i*4+1], 
2731              incInfo[i*4+2], (int)incInfo[i*4+3]);
2732     }
2733
2734     for (int i=0; i<numK; i++) {
2735       if (exemplarChoicesLeft[i] > 0) {
2736         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2737         exemplarChoicesLeft[i]--;
2738       } else {
2739         outmsg->minIDs[i] = -1;
2740       }
2741       if (outlierChoicesLeft[i] > 0) {
2742         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2743         outlierChoicesLeft[i]--;
2744       } else {
2745         outmsg->maxIDs[i] = -1;
2746       }
2747     }
2748     thisProxy.collectDistances(outmsg);
2749     numSelectionIter--;
2750   } else {
2751     // invoke phase completion on all processors
2752     thisProxy.phaseDone();
2753   }
2754 }
2755
2756 /**
2757  *  Completion of the K-Means clustering and data selection of one phase
2758  *    of the computation.
2759  *
2760  *  Called on every processor.
2761  */
2762 void KMeansBOC::phaseDone() {
2763
2764   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2765
2766   LogPool *pool = CkpvAccess(_trace)->_logPool;
2767   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2768
2769   // now decide on what to do with the decision.
2770   if (!selected) {
2771     if (usePhases) {
2772       pool->keepPhase[currentPhase] = false;
2773     } else {
2774       // if not using phases, we're working on the whole log
2775       pool->setAllPhases(false);
2776     }
2777   }
2778
2779   // **FIXME** (?) - All processors have to agree on this, or the reduction
2780   //   will not be correct! The question is "is this enforcible?"
2781   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2782     // We're done
2783     int dummy = 0;
2784     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2785                   0, bocProxy);
2786     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2787   } else {
2788     // reset all phase-based k-means data and decisions
2789
2790     // **FIXME**!!!!!    
2791     
2792     // invoke the next K-Means computation phase.
2793     currentPhase++;
2794     thisProxy[CkMyPe()].getNextPhaseMetrics();
2795   }
2796 }
2797
2798 void TraceProjectionsBOC::startTimeAnalysis()
2799 {
2800   double startTime = 0.0;
2801   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2802      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2803   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2804   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2805 }
2806
2807 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2808 {
2809   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2810
2811   if (CkpvAccess(_trace) != NULL) {
2812     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2813     CkpvAccess(_trace)->_logPool->setNewStartTime();
2814     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2815   }
2816   delete msg;
2817   thisProxy[CkMyPe()].startEndTimeAnalysis();
2818 }
2819
2820 void TraceProjectionsBOC::startEndTimeAnalysis()
2821 {
2822  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2823
2824   endTime = CkpvAccess(_trace)->endTime;
2825   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2826
2827   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2828                 0, thisProxy);
2829   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2830 }
2831
2832 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2833 {
2834  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2835
2836   CkAssert(CkMyPe() == 0);
2837   parModulesRemaining--;
2838   if (CkpvAccess(_trace) != NULL) {
2839     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2840     // CkPrintf("End time determined to be %lf us\n",
2841     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2842   }
2843   delete msg;
2844   if (parModulesRemaining == 0) {
2845     thisProxy[CkMyPe()].finalize();
2846   }
2847 }
2848
2849 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2850
2851  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2852
2853   CkAssert(CkMyPe() == 0);
2854   parModulesRemaining--;
2855   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2856            CmiWallTimer()-analysisStartTime);
2857   delete msg;
2858   if (parModulesRemaining == 0) {
2859     thisProxy[CkMyPe()].finalize();
2860   }
2861 }
2862
2863 /**
2864  *
2865  *  This version is called (on processor 0) only if flushCheck fails.
2866  *
2867  */
2868 void TraceProjectionsBOC::kMeansDone() {
2869   CkAssert(CkMyPe() == 0);
2870   parModulesRemaining--;
2871   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2872            CmiWallTimer()-analysisStartTime);
2873   if (parModulesRemaining == 0) {
2874     thisProxy[CkMyPe()].finalize();
2875   }
2876 }
2877
2878 void TraceProjectionsBOC::finalize()
2879 {
2880   CkAssert(CkMyPe() == 0);
2881   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2882   //       CmiWallTimer()-analysisStartTime);
2883   thisProxy.closingTraces();
2884 }
2885
2886 // Called on every processor
2887 void TraceProjectionsBOC::closingTraces() {
2888   CkpvAccess(_trace)->closeTrace();
2889
2890     // subtle:  reduction needs to go to the PE which started CkExit()
2891   int pe = 0;
2892   if (endPe != -1) pe = endPe;
2893   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2894                 pe, thisProxy); 
2895   contribute(0, NULL, CkReduction::sum_int, cb);  
2896 }
2897
2898 // The sole purpose of this reduction is to decide whether or not
2899 //   Projections as a module needs to call CkExit() to get other
2900 //   modules to shutdown.
2901 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2902   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2903   delete msg;
2904   // decide if CkExit() needs to be called
2905   if (!CkpvAccess(_trace)->converseExit) {
2906     CkExit();
2907   }
2908 }
2909 /*
2910  *  Registration and definition of the Outlier Reduction callback.
2911  *  Format: Sum | Min | Max | Sum of Squares
2912  */
2913 CkReductionMsg *outlierReduction(int nMsgs,
2914                                  CkReductionMsg **msgs) {
2915   int numBytes = 0;
2916   int numMetrics = 0;
2917   double *ret = NULL;
2918
2919   if (nMsgs == 1) {
2920     // nothing to do, just pass it on
2921     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2922   }
2923
2924   if (nMsgs > 1) {
2925     numBytes = msgs[0]->getSize();
2926     // sanity checks
2927     if (numBytes%sizeof(double) != 0) {
2928       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2929     }
2930     if ((numBytes/sizeof(double))%4 != 0) {
2931       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2932     }
2933     numMetrics = (numBytes/sizeof(double))/4;
2934     ret = new double[numMetrics*4];
2935
2936     // copy the first message data into the return structure first
2937     for (int i=0; i<numMetrics*4; i++) {
2938       ret[i] = ((double *)msgs[0]->getData())[i];
2939     }
2940
2941     // Sum | Min | Max | Sum of Squares
2942     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2943       for (int i=0; i<numMetrics; i++) {
2944         // Sum
2945         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2946         // Min
2947         ret[numMetrics + i] =
2948           (ret[numMetrics + i] < 
2949            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2950           ? ret[numMetrics + i] : 
2951           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2952         // Max
2953         ret[2*numMetrics + i] = 
2954           (ret[2*numMetrics + i] >
2955            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2956           ? ret[2*numMetrics + i] :
2957           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2958         // Sum of Squares (squaring already done at leaf)
2959         ret[3*numMetrics + i] +=
2960           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2961       }
2962     }
2963   }
2964   
2965   /* apparently, we do not delete the incoming messages */
2966   return CkReductionMsg::buildNew(numBytes,ret);
2967 }
2968
2969 /*
2970  * The only reason we have a user-defined reduction is to support
2971  *   identification of the "winning" processors as well as to handle
2972  *   both the min and the max of each "tournament". A simple min/max
2973  *   discovery cannot handle ties.
2974  */
2975 CkReductionMsg *minMaxReduction(int nMsgs,
2976                                 CkReductionMsg **msgs) {
2977   CkAssert(nMsgs > 0);
2978
2979   int numBytes = msgs[0]->getSize();
2980   CkAssert(numBytes%sizeof(double) == 0);
2981   int numK = (numBytes/sizeof(double))/4;
2982
2983   double *ret = new double[numK*4];
2984   // fill with out-of-band values
2985   for (int i=0; i<numK; i++) {
2986     ret[i*4] = -1.0;
2987     ret[i*4+1] = -1.0;
2988     ret[i*4+2] = -1.0;
2989     ret[i*4+3] = -1.0;
2990   }
2991
2992   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2993   for (int i=0; i<nMsgs; i++) {
2994     double *temp = (double *)msgs[i]->getData();
2995     for (int j=0; j<numK; j++) {
2996       // no previous valid min
2997       if (ret[j*4+1] < 0) {
2998         // fill it in only if the incoming min is valid
2999         if (temp[j*4+1] >= 0) {
3000           ret[j*4] = temp[j*4];      // fill min value
3001           ret[j*4+1] = temp[j*4+1];  // fill ID
3002         }
3003       } else {
3004         // find Min, only if incoming min is valid
3005         if (temp[j*4+1] >= 0) {
3006           if (temp[j*4] < ret[j*4]) {
3007             ret[j*4] = temp[j*4];      // replace min value
3008             ret[j*4+1] = temp[j*4+1];  // replace ID
3009           }
3010         }
3011       }
3012       // no previous valid max
3013       if (ret[j*4+3] < 0) {
3014         // fill only if the incoming max is valid
3015         if (temp[j*4+3] >= 0) {
3016           ret[j*4+2] = temp[j*4+2];  // fill max value
3017           ret[j*4+3] = temp[j*4+3];  // fill ID
3018         }
3019       } else {
3020         // find Max, only if incoming max is valid
3021         if (temp[j*4+3] >= 0) {
3022           if (temp[j*4+2] > ret[j*4+2]) {
3023             ret[j*4+2] = temp[j*4+2];  // replace max value
3024             ret[j*4+3] = temp[j*4+3];  // replace ID
3025           }
3026         }
3027       }
3028     }
3029   }
3030   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3031   delete [] ret;
3032   return redmsg;
3033 }
3034
3035 #include "TraceProjections.def.h"
3036 #endif //PROJ_ANALYSIS
3037
3038 /*@}*/