merge projections and autoperf to use same PAPI init functions
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 /**
103   For each TraceFoo module, _createTraceFoo() must be defined.
104   This function is called in _createTraces() generated in moduleInit.C
105 */
106 void _createTraceprojections(char **argv)
107 {
108   DEBUGF("%d createTraceProjections\n", CkMyPe());
109   CkpvInitialize(CkVec<char *>, usrEventlist);
110   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
111   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
112 #if CMK_BIGSIM_CHARM
113   // CthRegister does not call the constructor
114 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
115 #endif //CMK_BIGSIM_CHARM
116   CkpvInitialize(TraceProjections*, _trace);
117   CkpvAccess(_trace) = new  TraceProjections(argv);
118   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
119   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
120 }
121  
122 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
123
124 struct TraceThreadListener {
125   struct CthThreadListener base;
126   int event;
127   int msgType;
128   int ep;
129   int srcPe;
130   int ml;
131   CmiObjId idx;
132 };
133
134 extern "C"
135 void traceThreadListener_suspend(struct CthThreadListener *l)
136 {
137   TraceThreadListener *a=(TraceThreadListener *)l;
138   /* here, we activate the appropriate trace codes for the appropriate
139      registered modules */
140   traceSuspend();
141 }
142
143 extern "C"
144 void traceThreadListener_resume(struct CthThreadListener *l) 
145 {
146   TraceThreadListener *a=(TraceThreadListener *)l;
147   /* here, we activate the appropriate trace codes for the appropriate
148      registered modules */
149   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
150                                 CthGetThreadID(a->base.thread));
151   a->event=-1;
152   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
153   a->ml=0;
154 }
155
156 extern "C"
157 void traceThreadListener_free(struct CthThreadListener *l) 
158 {
159   TraceThreadListener *a=(TraceThreadListener *)l;
160   delete a;
161 }
162
163 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
164 {
165 #if CMK_TRACE_ENABLED
166   /* strip essential information from the envelope */
167   TraceThreadListener *a= new TraceThreadListener;
168   
169   a->base.suspend=traceThreadListener_suspend;
170   a->base.resume=traceThreadListener_resume;
171   a->base.free=traceThreadListener_free;
172   a->event=e->getEvent();
173   a->msgType=e->getMsgtype();
174   a->ep=e->getEpIdx();
175   a->srcPe=e->getSrcPe();
176   a->ml=e->getTotalsize();
177
178   CthAddListener(tid, (CthThreadListener *)a);
179 #endif
180 }
181
182 void LogPool::openLog(const char *mode)
183 {
184 #if CMK_PROJECTIONS_USE_ZLIB
185   if(compressed) {
186     if (nonDeltaLog) {
187       do {
188         zfp = gzopen(fname, mode);
189       } while (!zfp && (errno == EINTR || errno == EMFILE));
190       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
191     }
192     if (deltaLog) {
193       do {
194         deltazfp = gzopen(dfname, mode);
195       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
196       if (!deltazfp) 
197         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
198     }
199   } else {
200     if (nonDeltaLog) {
201       do {
202         fp = fopen(fname, mode);
203       } while (!fp && (errno == EINTR || errno == EMFILE));
204       if (!fp) {
205         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
206         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
207       }
208     }
209     if (deltaLog) {
210       do {
211         deltafp = fopen(dfname, mode);
212       } while (!deltafp && (errno == EINTR || errno == EMFILE));
213       if (!deltafp) 
214         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
215     }
216   }
217 #else
218   if (nonDeltaLog) {
219     do {
220       fp = fopen(fname, mode);
221     } while (!fp && (errno == EINTR || errno == EMFILE));
222     if (!fp) {
223       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
224       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
225     }
226   }
227   if (deltaLog) {
228     do {
229       deltafp = fopen(dfname, mode);
230     } while (!deltafp && (errno == EINTR || errno == EMFILE));
231     if(!deltafp) 
232       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
233   }
234 #endif
235 }
236
237 void LogPool::closeLog(void)
238 {
239 #if CMK_PROJECTIONS_USE_ZLIB
240   if(compressed) {
241     if (nonDeltaLog) gzclose(zfp);
242     if (deltaLog) gzclose(deltazfp);
243     return;
244   }
245 #endif
246   if (nonDeltaLog) { 
247 #if !defined(_WIN32) || defined(__CYGWIN__)
248     fsync(fileno(fp)); 
249 #endif
250     fclose(fp); 
251   }
252   if (deltaLog)  { 
253 #if !defined(_WIN32) || defined(__CYGWIN__)
254     fsync(fileno(deltafp)); 
255 #endif
256     fclose(deltafp);  
257   }
258 }
259
260 LogPool::LogPool(char *pgm) {
261   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
262   // defaults to writing data (no outlier changes)
263   writeSummaryFiles = 0;
264   writeData = true;
265   numEntries = 0;
266   // **CW** for simple delta encoding
267   prevTime = 0.0;
268   timeErr = 0.0;
269   globalStartTime = 0.0;
270   globalEndTime = 0.0;
271   headerWritten = 0;
272   numPhases = 0;
273   hasFlushed = false;
274
275   keepPhase = NULL;
276
277   fileCreated = false;
278   poolSize = CkpvAccess(CtrLogBufSize);
279   pgmname = new char[strlen(pgm)+1];
280   strcpy(pgmname, pgm);
281
282   //statistic init
283   statisLastProcessTimer = 0;
284   statisLastIdleTimer = 0;
285   statisLastPackTimer = 0;
286   statisLastUnpackTimer = 0;
287   statisTotalExecutionTime  = 0;
288   statisTotalIdleTime = 0;
289   statisTotalPackTime = 0;
290   statisTotalUnpackTime = 0;
291   statisTotalCreationMsgs = 0;
292   statisTotalCreationBytes = 0;
293   statisTotalMCastMsgs = 0;
294   statisTotalMCastBytes = 0;
295   statisTotalEnqueueMsgs = 0;
296   statisTotalDequeueMsgs = 0;
297   statisTotalRecvMsgs = 0;
298   statisTotalRecvBytes = 0;
299   statisTotalMemAlloc = 0;
300   statisTotalMemFree = 0;
301
302 }
303
304 void LogPool::createFile(const char *fix)
305 {
306   if (fileCreated) {
307     return;
308   }
309   
310   if(CmiNumPartitions() > 1) {
311     CmiMkdir(CkpvAccess(partitionRoot));
312   }
313
314   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
315   char *pathPlusFilePrefix = new char[1024];
316  
317   if(nSubdirs > 0){
318     int sd = CkMyPe() % nSubdirs;
319     char *subdir = new char[1024];
320     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
321     CmiMkdir(subdir);
322     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
323     delete[] subdir;
324   } else {
325     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
326   }
327
328   char pestr[10];
329   sprintf(pestr, "%d", CkMyPe());
330 #if CMK_PROJECTIONS_USE_ZLIB
331   int len;
332   if(compressed)
333     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
334   else
335     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
336 #else
337   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
338 #endif
339
340   if (nonDeltaLog) {
341     fname = new char[len];
342   }
343   if (deltaLog) {
344     dfname = new char[len];
345   }
346 #if CMK_PROJECTIONS_USE_ZLIB
347   if(compressed) {
348     if (deltaLog && nonDeltaLog) {
349       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
350       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
351     } else {
352       if (nonDeltaLog) {
353         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
354       } else {
355         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
356       }
357     }
358   } else {
359     if (deltaLog && nonDeltaLog) {
360       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
361       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
362     } else {
363       if (nonDeltaLog) {
364         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
365       } else {
366         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
367       }
368     }
369   }
370 #else
371   if (deltaLog && nonDeltaLog) {
372     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
373     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
374   } else {
375     if (nonDeltaLog) {
376       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
377     } else {
378       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
379     }
380   }
381 #endif
382   fileCreated = true;
383   delete[] pathPlusFilePrefix;
384   openLog("w");
385   CLOSE_LOG 
386 }
387
388 void LogPool::createSts(const char *fix)
389 {
390   CkAssert(CkMyPe() == 0);
391   if(CmiNumPartitions() > 1) {
392     CmiMkdir(CkpvAccess(partitionRoot));
393   }
394
395   // create the sts file
396   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
397   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
398   do
399     {
400       stsfp = fopen(fname, "w");
401     } while (!stsfp && (errno == EINTR || errno == EMFILE));
402   if(stsfp==0){
403     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
404     CmiAbort("Error!!\n");
405   }
406   delete[] fname;
407 }  
408
409 void LogPool::createTopo(const char *fix)
410 {
411   CkAssert(CkMyPe() == 0);
412   // create the topo file
413   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
414   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
415   do
416     {
417       topofp = fopen(fname, "w");
418     } while (!stsfp && (errno == EINTR || errno == EMFILE));
419   if(stsfp==0){
420     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
421     CmiAbort("Error!!\n");
422   }
423   delete[] fname;
424 }  
425
426 void LogPool::createRC()
427 {
428   // create the projections rc file.
429   fname = 
430     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
431   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
432   do {
433     rcfp = fopen(fname, "w");
434   } while (!rcfp && (errno == EINTR || errno == EMFILE));
435   if (rcfp==0) {
436     CmiAbort("Cannot open projections configuration file for writing.\n");
437   }
438   delete[] fname;
439 }
440
441 LogPool::~LogPool() 
442 {
443   if (writeData) {
444       if(writeSummaryFiles)
445           writeStatis();
446     writeLog();
447 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
448     closeLog();
449 #endif
450   }
451
452 #if CMK_BIGSIM_CHARM
453   extern int correctTimeLog;
454   if (correctTimeLog) {
455     createFile("-bg");
456     if (CkMyPe() == 0) {
457       createSts("-bg");
458     }
459     writeHeader();
460     if (CkMyPe() == 0) writeSts(NULL);
461     postProcessLog();
462   }
463 #endif
464
465   delete[] pool;
466   delete [] fname;
467 }
468
469 void LogPool::writeHeader()
470 {
471   if (headerWritten) return;
472   headerWritten = 1;
473   if(!binary) {
474 #if CMK_PROJECTIONS_USE_ZLIB
475     if(compressed) {
476       if (nonDeltaLog) {
477         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
478       }
479       if (deltaLog) {
480         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
481       }
482     } 
483     else /* else clause is below... */
484 #endif
485     /*... may hang over from else above */ {
486       if (nonDeltaLog) {
487         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
488       }
489       if (deltaLog) {
490         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
491       }
492     }
493   }
494   else { // binary
495       if (nonDeltaLog) {
496         fwrite(&numEntries,sizeof(numEntries),1,fp);
497       }
498       if (deltaLog) {
499         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
500       }
501   }
502 }
503
504 void LogPool::writeLog(void)
505 {
506   createFile();
507   OPEN_LOG
508   writeHeader();
509   if (nonDeltaLog) write(0);
510   if (deltaLog) write(1);
511   CLOSE_LOG
512 }
513
514 void LogPool::write(int writedelta) 
515 {
516   // **CW** Simple delta encoding implementation
517   // prevTime has to be maintained as an object variable because
518   // LogPool::write may be called several times depending on the
519   // +logsize value.
520   PUP::er *p = NULL;
521   if (binary) {
522     p = new PUP::toDisk(writedelta?deltafp:fp);
523   }
524 #if CMK_PROJECTIONS_USE_ZLIB
525   else if (compressed) {
526     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
527   }
528 #endif
529   else {
530     p = new toProjectionsFile(writedelta?deltafp:fp);
531   }
532   CmiAssert(p);
533   int curPhase = 0;
534   // **FIXME** - Should probably consider a more sophisticated bounds-based
535   //   approach for selective writing instead of making multiple if-checks
536   //   for every single event.
537   for(UInt i=0; i<numEntries; i++) {
538     if (!writedelta) {
539       if (keepPhase == NULL) {
540         // default case, when no phase selection is required.
541         pool[i].pup(*p);
542       } else {
543         // **FIXME** Might be a good idea to create a "filler" event block for
544         //   all the events taken out by phase filtering.
545         if (pool[i].type == END_PHASE) {
546           // always write phase markers
547           pool[i].pup(*p);
548           curPhase++;
549         } else if (pool[i].type == BEGIN_COMPUTATION ||
550                    pool[i].type == END_COMPUTATION) {
551           // always write BEGIN and END COMPUTATION markers
552           pool[i].pup(*p);
553         } else if (keepPhase[curPhase]) {
554           pool[i].pup(*p);
555         }
556       }
557     }
558     else {      // delta
559       // **FIXME** Implement phase-selective writing for delta logs
560       //   eventually
561       double time = pool[i].time;
562       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
563       {
564         double timeDiff = (time-prevTime)*1.0e6;
565         UInt intTimeDiff = (UInt)timeDiff;
566         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
567         if (timeErr > 1.0) {
568           timeErr -= 1.0;
569           intTimeDiff++;
570         }
571         pool[i].time = intTimeDiff/1.0e6;
572       }
573       pool[i].pup(*p);
574       pool[i].time = time;      // restore time value
575       prevTime = time;
576     }
577   }
578   delete p;
579   delete [] keepPhase;
580 }
581
582 void LogPool::writeSts(void)
583 {
584   // for whining compilers
585   int i;
586   // generate an automatic unique ID for each log
587   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
588   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
589   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
590 #if CMK_HAS_COUNTER_PAPI
591   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
592   // for now, use i, next time use papiEvents[i].
593   // **CW** papi event names is a hack.
594   char eventName[PAPI_MAX_STR_LEN];
595   for (i=0;i<NUMPAPIEVENTS;i++) {
596     PAPI_event_code_to_name(papiEvents[i], eventName);
597     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
598   }
599 #endif
600   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
601   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
602     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
603   }     
604 }
605
606 void LogPool::writeSts(TraceProjections *traceProj){
607   writeSts();
608   if (traceProj != NULL) {
609     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
610     funcIter->seekStart();
611     int numFuncs = traceProj->getFuncNumber();
612     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
613     while(funcIter->hasNext()) {
614       StrKey *key;
615       int *obj = (int *)funcIter->next((void **)&key);
616       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
617     }
618   }
619   fprintf(stsfp, "END\n");
620   fclose(stsfp);
621 }
622
623 void LogPool::writeRC(void)
624 {
625     //CkPrintf("write RC is being executed\n");
626 #ifdef PROJ_ANALYSIS  
627     CkAssert(CkMyPe() == 0);
628     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
629             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
630     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
631             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
632     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
633     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
634       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
635     } else {
636       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
637     }
638     */
639 #endif //PROJ_ANALYSIS
640   fclose(rcfp);
641 }
642
643 void LogPool::writeTopo(void)
644 {
645   TopoManager tmgr;
646   tmgr.printAllocation(topofp);
647   fclose(topofp);
648 }
649
650 void LogPool::writeStatis(void)
651 {
652     
653    // create the statis file
654   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
655   sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
656   do
657   {
658       statisfp = fopen(fname, "w");
659   } while (!statisfp && (errno == EINTR || errno == EMFILE));
660   if(statisfp==0){
661       CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
662       CmiAbort("Error!!\n");
663   }
664   delete[] fname;
665   double totaltime = endComputationTime - beginComputationTime;
666   fprintf(statisfp, "time(sec) percentage\n");
667   fprintf(statisfp, "Time:    \t%f\n", totaltime);
668   fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100); 
669   fprintf(statisfp, "Overhead:    \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100); 
670   fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100); 
671   fprintf(statisfp, "Pack:     \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100); 
672   fprintf(statisfp, "Unpack:   \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100); 
673   fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs); 
674   fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs); 
675   fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs); 
676   fclose(statisfp);
677 }
678
679 #if CMK_BIGSIM_CHARM
680 static void updateProjLog(void *data, double t, double recvT, void *ptr)
681 {
682   LogEntry *log = (LogEntry *)data;
683   FILE *fp = *(FILE **)ptr;
684   log->time = t;
685   log->recvTime = recvT<0.0?0:recvT;
686 //  log->write(fp);
687   toProjectionsFile p(fp);
688   log->pup(p);
689 }
690 #endif
691
692 // flush log entries to disk
693 void LogPool::flushLogBuffer()
694 {
695   if (numEntries) {
696     double writeTime = TraceTimer();
697     writeLog();
698     hasFlushed = true;
699     numEntries = 0;
700     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
701     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
702     //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
703     if (!traceProjectionsGID.isZero()) {    // report flushing events to PE 0
704       CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
705       bocProxy[0].flush_warning(CkMyPe());
706     }
707   }
708 }
709
710 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
711                   double time, int event, int pe, int ml, CmiObjId *id, 
712                   double recvT, double cpuT, int numPe)
713 {
714     switch(type)
715     {
716         case BEGIN_COMPUTATION:
717             beginComputationTime = time;
718             break;
719         case END_COMPUTATION:
720             endComputationTime = time;
721             break;
722         case CREATION:
723             statisTotalCreationMsgs++;
724             statisTotalCreationBytes += ml;
725             break;
726         case CREATION_MULTICAST:
727             statisTotalMCastMsgs++;
728             statisTotalMCastBytes += ml;
729             break;
730         case ENQUEUE:
731             statisTotalEnqueueMsgs++;
732             break;
733         case DEQUEUE:
734             statisTotalDequeueMsgs++;
735             break;
736         case MESSAGE_RECV:
737             statisTotalRecvMsgs++;
738             statisTotalRecvBytes += ml;
739             break;
740         case MEMORY_MALLOC:
741             statisTotalMemAlloc++;
742             break;
743         case MEMORY_FREE:
744             statisTotalMemFree++;
745             break;
746         case BEGIN_PROCESSING:
747             statisLastProcessTimer = time;
748             break;
749         case BEGIN_UNPACK:
750             statisLastUnpackTimer = time;
751             break;
752         case BEGIN_PACK:
753             statisLastPackTimer = time;
754             break;
755         case BEGIN_IDLE:
756             statisLastIdleTimer = time;
757             break;
758         case END_PROCESSING:
759             statisTotalExecutionTime += (time - statisLastProcessTimer);
760             break;
761         case END_UNPACK:
762             statisTotalUnpackTime += (time - statisLastUnpackTimer);
763             break;
764         case END_PACK:
765             statisTotalPackTime += (time - statisLastPackTimer);
766             break;
767         case END_IDLE:
768             statisTotalIdleTime += (time - statisLastIdleTimer);
769             break;
770         default:
771             break;
772     }
773   new (&pool[numEntries++])
774     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
775   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
776     numPhases++;
777   }
778   if(poolSize==numEntries) {
779     flushLogBuffer();
780 #if CMK_BIGSIM_CHARM
781     extern int correctTimeLog;
782     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
783 #endif
784   }
785 #if CMK_BIGSIM_CHARM
786   switch (type) {
787     case BEGIN_PROCESSING:
788       pool[numEntries-1].recvTime = BgGetRecvTime();
789     case END_PROCESSING:
790     case BEGIN_COMPUTATION:
791     case END_COMPUTATION:
792     case CREATION:
793     case BEGIN_PACK:
794     case END_PACK:
795     case BEGIN_UNPACK:
796     case END_UNPACK:
797     case USER_EVENT_PAIR:
798       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
799   }
800 #endif
801 }
802
803 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
804 #ifndef CMK_BIGSIM_CHARM
805   new (&pool[numEntries++])
806         LogEntry(time,type,funcID,lineNum,fileName);
807   if(poolSize == numEntries){
808     flushLogBuffer();
809   }
810 #endif  
811 }
812
813
814   
815 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
816 #ifndef CMK_BIGSIM_CHARM
817   new (&pool[numEntries++])
818         LogEntry(type,time,memUsage);
819   if(poolSize == numEntries){
820     flushLogBuffer();
821   }
822 #endif  
823         
824 }  
825
826
827
828 void LogPool::addUserSupplied(int data){
829         // add an event
830         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
831
832         // set the user supplied value for the previously created event 
833         pool[numEntries-1].setUserSuppliedData(data);
834   }
835
836
837 void LogPool::addUserSuppliedNote(char *note){
838         // add an event
839         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
840
841         // set the user supplied note for the previously created event 
842         pool[numEntries-1].setUserSuppliedNote(note);
843   }
844
845 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
846   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
847 #ifndef CMK_BIGSIM_CHARM
848 #if MPI_TRACE_MACHINE_HACK
849   //This part of code is used  to combine the contiguous
850   //MPI_Test and MPI_Iprobe events to reduce the number of
851   //entries
852 #define MPI_TEST_EVENT_ID 60
853 #define MPI_IPROBE_EVENT_ID 70 
854   int lastEvent = pool[numEntries-1].event;
855   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
856     //just replace the endtime of last event
857     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
858     pool[numEntries].endTime = et;
859   }else{
860     new (&pool[numEntries++])
861       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
862   }
863 #else
864   new (&pool[numEntries++])
865     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
866 #endif
867   if(poolSize == numEntries){
868     flushLogBuffer();
869   }
870 #endif  
871 }
872
873
874 /* **CW** Not sure if this is the right thing to do. Feels more like
875    a hack than a solution to Sameer's request to add the destination
876    processor information to multicasts and broadcasts.
877
878    In the unlikely event this method is used for Broadcasts as well,
879    pelist == NULL will be used to indicate a global broadcast with 
880    num PEs.
881 */
882 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
883                                    int event, int pe, int ml, CmiObjId *id,
884                                    double recvT, int numPe, int *pelist)
885 {
886   new (&pool[numEntries++])
887     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
888   if(poolSize==numEntries) {
889     flushLogBuffer();
890   }
891 }
892
893 void LogPool::postProcessLog()
894 {
895 #if CMK_BIGSIM_CHARM
896   bgUpdateProj(1);   // event type
897 #endif
898 }
899
900 void LogPool::modLastEntryTimestamp(double ts)
901 {
902   pool[numEntries-1].time = ts;
903   //pool[numEntries-1].cputime = ts;
904 }
905
906 // /** Constructor for a multicast log entry */
907 // 
908 //  THIS WAS MOVED TO trace-projections.h with the other constructors
909 // 
910 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
911 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
912 // {
913 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
914 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
915 //     recvTime = rt; 
916 //     numpes = numPe;
917 //     userSuppliedNote = NULL;
918 //     if (pelist != NULL) {
919 //      pes = new int[numPe];
920 //      for (int i=0; i<numPe; i++) {
921 //        pes[i] = pelist[i];
922 //      }
923 //     } else {
924 //      pes= NULL;
925 //     }
926 // }
927
928 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
929 //{
930 //#if CMK_HAS_COUNTER_PAPI
931 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
932 //#endif
933 //}
934
935
936
937 void LogEntry::pup(PUP::er &p)
938 {
939   int i;
940   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
941   char ret = '\n';
942
943   p|type;
944   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
945   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
946
947   switch (type) {
948     case USER_EVENT:
949     case USER_EVENT_PAIR:
950       p|mIdx; p|itime; p|event; p|pe;
951       break;
952     case BEGIN_IDLE:
953     case END_IDLE:
954     case BEGIN_PACK:
955     case END_PACK:
956     case BEGIN_UNPACK:
957     case END_UNPACK:
958       p|itime; p|pe; 
959       break;
960     case BEGIN_PROCESSING:
961       if (p.isPacking()) {
962         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
963         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
964       }
965       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
966       p|msglen; p|irecvtime; 
967       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
968       p|icputime;
969 #if CMK_HAS_COUNTER_PAPI
970       //p|numPapiEvents;
971       for (i=0; i<NUMPAPIEVENTS; i++) {
972         // not yet!!!
973         //      p|papiIDs[i]; 
974         p|papiValues[i];
975         
976       }
977 #else
978       //p|numPapiEvents;     // non papi version has value 0
979 #endif
980       if (p.isUnpacking()) {
981         recvTime = irecvtime/1.0e6;
982         cputime = icputime/1.0e6;
983       }
984       break;
985     case END_PROCESSING:
986       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
987       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
988 #if CMK_HAS_COUNTER_PAPI
989       //p|numPapiEvents;
990       for (i=0; i<NUMPAPIEVENTS; i++) {
991         // not yet!!!
992         //      p|papiIDs[i];
993         p|papiValues[i];
994       }
995 #else
996       //p|numPapiEvents;  // non papi version has value 0
997 #endif
998       if (p.isUnpacking()) cputime = icputime/1.0e6;
999       break;
1000     case USER_SUPPLIED:
1001           p|userSuppliedData;
1002           p|itime;
1003         break;
1004     case USER_SUPPLIED_NOTE:
1005           p|itime;
1006           int length;
1007           if (p.isPacking()) length = strlen(userSuppliedNote);
1008           p | length;
1009           char space;
1010           space = ' ';
1011           p | space;
1012           if (p.isUnpacking()) {
1013             userSuppliedNote = new char[length+1];
1014             userSuppliedNote[length] = '\0';
1015           }
1016           PUParray(p,userSuppliedNote, length);
1017           break;
1018     case USER_SUPPLIED_BRACKETED_NOTE:
1019       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
1020           p|itime;
1021           p|iEndTime;
1022           p|event;
1023           int length2;
1024           if (p.isPacking()) length2 = strlen(userSuppliedNote);
1025           p | length2;
1026           char space2;
1027           space2 = ' ';
1028           p | space2;
1029           if (p.isUnpacking()) {
1030             userSuppliedNote = new char[length+1];
1031             userSuppliedNote[length] = '\0';
1032           }
1033           PUParray(p,userSuppliedNote, length2);
1034           break;
1035     case MEMORY_USAGE_CURRENT:
1036       p | memUsage;
1037       p | itime;
1038         break;
1039     case CREATION:
1040       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1041       p|mIdx; p|eIdx; p|itime;
1042       p|event; p|pe; p|msglen; p|irecvtime;
1043       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1044       break;
1045     case CREATION_BCAST:
1046       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1047       p|mIdx; p|eIdx; p|itime;
1048       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1049       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1050       break;
1051     case CREATION_MULTICAST:
1052       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1053       p|mIdx; p|eIdx; p|itime;
1054       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1055       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
1056       for (i=0; i<numpes; i++) p|pes[i];
1057       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1058       break;
1059     case MESSAGE_RECV:
1060       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
1061       break;
1062
1063     case ENQUEUE:
1064     case DEQUEUE:
1065       p|mIdx; p|itime; p|event; p|pe;
1066       break;
1067
1068     case BEGIN_INTERRUPT:
1069     case END_INTERRUPT:
1070       p|itime; p|event; p|pe;
1071       break;
1072
1073       // **CW** absolute timestamps are used here to support a quick
1074       // way of determining the total time of a run in projections
1075       // visualization.
1076     case BEGIN_COMPUTATION:
1077     case END_COMPUTATION:
1078     case BEGIN_TRACE:
1079     case END_TRACE:
1080       p|itime;
1081       break;
1082     case BEGIN_FUNC:
1083         p | itime;
1084         p | mIdx;
1085         p | event;
1086         if(!p.isUnpacking()){
1087                 p(fName,flen-1);
1088         }
1089         break;
1090     case END_FUNC:
1091         p | itime;
1092         p | mIdx;
1093         break;
1094     case END_PHASE:
1095       p|eIdx; // FIXME: actually the phase ID
1096       p|itime;
1097       break;
1098     default:
1099       CmiError("***Internal Error*** Wierd Event %d.\n", type);
1100       break;
1101   }
1102   if (p.isUnpacking()) time = itime/1.0e6;
1103   p|ret;
1104 }
1105
1106 TraceProjections::TraceProjections(char **argv): 
1107   curevent(0), inEntry(0), computationStarted(0), 
1108         converseExit(0), endTime(0.0), traceNestedEvents(0),
1109         currentPhaseID(0), lastPhaseEvent(NULL)
1110 {
1111   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1112
1113   if (CkpvAccess(traceOnPe) == 0) return;
1114
1115   CtvInitialize(int,curThreadEvent);
1116   CkpvInitialize(CmiInt8, CtrLogBufSize);
1117   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1118   CtvAccess(curThreadEvent)=0;
1119   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1120                        "Log entries to buffer per I/O")) {
1121     if (CkMyPe() == 0) {
1122       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1123     }
1124   }
1125   checknested = 
1126     CmiGetArgFlagDesc(argv,"+checknested",
1127                       "check projections nest begin end execute events");
1128   traceNestedEvents = 
1129     CmiGetArgFlagDesc(argv,"+tracenested",
1130               "trace projections nest begin/end execute events");
1131   int binary = 
1132     CmiGetArgFlagDesc(argv,"+binary-trace",
1133                       "Write log files in binary format");
1134
1135   CmiInt8 nSubdirs = 0;
1136   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1137
1138
1139 #if CMK_PROJECTIONS_USE_ZLIB
1140   int compressed = true;
1141   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1142   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1143   compressed = compressed && !disableCompressed;
1144 #else
1145   // consume the flag so there's no confusing
1146   CmiGetArgFlagDesc(argv,"+gz-trace",
1147                     "Write log files pre-compressed with gzip");
1148   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1149 #endif
1150
1151   int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files "); 
1152   
1153   // **CW** default to non delta log encoding. The user may choose to do
1154   // create both logs (for debugging) or just the old log timestamping
1155   // (for compatibility).
1156   // Generating just the non delta log takes precedence over generating
1157   // both logs (if both arguments appear on the command line).
1158
1159   // switch to OLD log format until everything works // Gengbin
1160   nonDeltaLog = 1;
1161   deltaLog = 0;
1162   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1163                                   "Generate Delta encoded and simple timestamped log files");
1164
1165   _logPool = new LogPool(CkpvAccess(traceRoot));
1166   _logPool->setNumSubdirs(nSubdirs);
1167   _logPool->setBinary(binary);
1168   _logPool->setWriteSummaryFiles(writeSummaryFiles);
1169 #if CMK_PROJECTIONS_USE_ZLIB
1170   _logPool->setCompressed(compressed);
1171 #endif
1172   if (CkMyPe() == 0) {
1173     _logPool->createSts();
1174     _logPool->createRC();
1175     _logPool->createTopo();
1176   }
1177   funcCount=1;
1178
1179 #if CMK_HAS_COUNTER_PAPI
1180   initPAPI();
1181 #endif
1182 }
1183
1184 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1185 {
1186   OPTIMIZED_VERSION
1187   CkAssert(e==-1 || e>=0);
1188   CkAssert(evt != NULL);
1189   int event;
1190   int biggest = -1;
1191   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1192     int cur = (*CkpvAccess(usrEvents))[i]->e;
1193     if (cur == e) {
1194       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1195       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1196         return e;
1197       else
1198         CmiAbort("UserEvent double registered!");
1199     }
1200     if (cur > biggest) biggest = cur;
1201   }
1202   // if no user events have so far been registered. biggest will be -1
1203   // and hence newly assigned event numbers will begin from 0.
1204   if (e==-1) event = biggest+1;  // automatically assign new event number
1205   else event = e;
1206   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1207   return event;
1208 }
1209
1210 void TraceProjections::traceClearEps(void)
1211 {
1212   // In trace-summary, this zeros out the EP bins, to eliminate noise
1213   // from startup.  Here, this isn't useful, since we can do that in
1214   // post-processing
1215 }
1216
1217 void TraceProjections::traceWriteSts(void)
1218 {
1219   if(CkMyPe()==0)
1220     _logPool->writeSts(this);
1221 }
1222
1223 /** 
1224  * **IMPT NOTES**:
1225  *
1226  * This is called when Converse closes during ConverseCommonExit().
1227  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1228  *
1229  * Some programs bypass CkExit() (like NAMD, which eventually calls
1230  * ConverseExit()), modules like traces will have to pretend to shutdown
1231  * as if CkExit() was called but at the same time avoid making
1232  * subsequent CkExit() calls (which is usually required for allowing
1233  * other modules to shutdown).
1234  *
1235  * Note that we can only get here if CkExit() was not called, since the
1236  * trace module will un-register itself from TraceArray if it did.
1237  *
1238  */
1239 void TraceProjections::traceClose(void)
1240 {
1241 #ifdef PROJ_ANALYSIS
1242   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1243
1244   // sets the flag that tells the code not to make the CkExit call later
1245   converseExit = 1;
1246   if (CkMyPe() == 0) {
1247     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1248     bocProxy.traceProjectionsParallelShutdown(-1);
1249   }
1250   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1251     CkpvAccess(_trace)->endComputation();
1252     delete _logPool;              // will write
1253     // remove myself from traceArray so that no tracing will be called.
1254     CkpvAccess(_traces)->removeTrace(this);
1255   }
1256 #else
1257   // we've already deleted the logpool, so multiple calls to traceClose
1258   // are tolerated.
1259   if (_logPool == NULL) {
1260     return;
1261   }
1262   if(CkMyPe()==0){
1263     _logPool->writeSts(this);
1264   }
1265   CkpvAccess(_trace)->endComputation();
1266   delete _logPool;              // will write
1267   // remove myself from traceArray so that no tracing will be called.
1268   CkpvAccess(_traces)->removeTrace(this);
1269
1270   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1271   bocProxy.ckLocalBranch()->print_warning();
1272 #endif
1273 }
1274
1275 /**
1276  *  **IMPT NOTES**:
1277  *
1278  *  This is meant to be called internally by the tracing framework.
1279  *
1280  */
1281 void TraceProjections::closeTrace() {
1282   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1283   if (CkMyPe() == 0) {
1284     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1285     _logPool->writeSts(this);
1286     _logPool->writeRC();
1287     _logPool->writeTopo();
1288     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1289
1290     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1291     bocProxy.ckLocalBranch()->print_warning();
1292   }
1293   delete _logPool;       // will write logs to file
1294
1295 }
1296
1297 #if CMK_SMP_TRACE_COMMTHREAD
1298 void TraceProjections::traceBeginOnCommThread()
1299 {
1300   if (!computationStarted) return;
1301   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1302 }
1303
1304 void TraceProjections::traceEndOnCommThread()
1305 {
1306   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1307 }
1308 #endif
1309
1310 void TraceProjections::traceBegin(void)
1311 {
1312   if (!computationStarted) return;
1313   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1314 }
1315
1316 void TraceProjections::traceEnd(void)
1317 {
1318   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1319 }
1320
1321 void TraceProjections::userEvent(int e)
1322 {
1323   if (!computationStarted) return;
1324   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1325 }
1326
1327 void TraceProjections::userBracketEvent(int e, double bt, double et)
1328 {
1329   if (!computationStarted) return;
1330   // two events record Begin/End of event e.
1331   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1332   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1333 }
1334
1335 void TraceProjections::userSuppliedData(int d)
1336 {
1337   if (!computationStarted) return;
1338   _logPool->addUserSupplied(d);
1339 }
1340
1341 void TraceProjections::userSuppliedNote(char *note)
1342 {
1343   if (!computationStarted) return;
1344   _logPool->addUserSuppliedNote(note);
1345 }
1346
1347
1348 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1349 {
1350   if (!computationStarted) return;
1351   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1352 }
1353
1354 void TraceProjections::memoryUsage(double m)
1355 {
1356   if (!computationStarted) return;
1357   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1358   
1359 }
1360
1361
1362 void TraceProjections::creation(envelope *e, int ep, int num)
1363 {
1364   double curTime = TraceTimer();
1365   if (e == 0) {
1366     CtvAccess(curThreadEvent) = curevent;
1367     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1368                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1369   } else {
1370     int type=e->getMsgtype();
1371     e->setEvent(curevent);
1372     if (num > 1) {
1373       _logPool->add(CREATION_BCAST, type, ep, curTime,
1374                     curevent++, CkMyPe(), e->getTotalsize(), 
1375                     NULL, 0, 0.0, num);
1376     } else {
1377       _logPool->add(CREATION, type, ep, curTime,
1378                     curevent++, CkMyPe(), e->getTotalsize(), 
1379                     NULL, 0, 0.0);
1380     }
1381   }
1382 }
1383
1384 //This function is only called from a comm thread in SMP mode. 
1385 void TraceProjections::creation(char *msg)
1386 {
1387 #if CMK_SMP_TRACE_COMMTHREAD
1388         // msg must be a charm message
1389     envelope *e = (envelope *)msg;
1390     int ep = e->getEpIdx();
1391     if(_entryTable[ep]->traceEnabled) {
1392         creation(e, ep, 1);
1393         e->setSrcPe(CkMyPe());              // pretend I am the sender
1394     }
1395 #endif
1396 }
1397
1398 void TraceProjections::traceCommSetMsgID(char *msg)
1399 {
1400 #if CMK_SMP_TRACE_COMMTHREAD
1401     // msg must be a charm message
1402     envelope *e = (envelope *)msg;
1403     int ep = e->getEpIdx();
1404     if(_entryTable[ep]->traceEnabled) {
1405         e->setSrcPe(CkMyPe());              // pretend I am the sender
1406         e->setEvent(curevent);
1407     }
1408 #endif
1409 }
1410
1411 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1412 {
1413     // msg must be a charm message
1414     *pe = *event = -1;
1415     envelope *e = (envelope *)msg;
1416     int ep = e->getEpIdx();
1417     if(_entryTable[ep]->traceEnabled) {
1418         *pe = e->getSrcPe();
1419         *event = e->getEvent();
1420     }
1421 }
1422
1423 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1424 {
1425        // msg must be a charm message
1426     envelope *e = (envelope *)msg;
1427     int ep = e->getEpIdx();
1428     if(ep<=0 || ep>=_entryTable.size()) return;
1429     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1430     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1431     if(_entryTable[ep]->traceEnabled) {
1432         e->setSrcPe(pe);
1433         e->setEvent(event);
1434     }
1435 }
1436
1437 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1438    Communication Library-specific Multicasts via new event 
1439    CREATION_MULTICAST.
1440 */
1441
1442 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1443                                          int *pelist)
1444 {
1445   double curTime = TraceTimer();
1446   if (e==0) {
1447     CtvAccess(curThreadEvent)=curevent;
1448     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1449                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1450   } else {
1451     int type=e->getMsgtype();
1452     e->setEvent(curevent);
1453     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1454                                    e->getTotalsize(), 0, 0.0, num, pelist);
1455   }
1456 }
1457
1458 void TraceProjections::creationDone(int num)
1459 {
1460   // modified the creation done time of the last num log entries
1461   // FIXME: totally a hack
1462   double curTime = TraceTimer();
1463   int idx = _logPool->numEntries-1;
1464   while (idx >=0 && num >0 ) {
1465     LogEntry &log = _logPool->pool[idx];
1466     if ((log.type == CREATION) ||
1467         (log.type == CREATION_BCAST) ||
1468         (log.type == CREATION_MULTICAST)) {
1469       log.recvTime = curTime - log.time;
1470       num --;
1471     }
1472     idx--;
1473   }
1474 }
1475
1476 void TraceProjections::beginExecute(CmiObjId *tid)
1477 {
1478 #if CMK_HAS_COUNTER_PAPI
1479   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1480     CmiAbort("PAPI failed to read at begin execute!\n");
1481   }
1482 #endif
1483   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1484   execEvent = CtvAccess(curThreadEvent);
1485   execEp = (-1);
1486   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1487                 execEvent,CkMyPe(), 0, tid);
1488 #if CMK_HAS_COUNTER_PAPI
1489   _logPool->addPapi(papiValues);
1490 #endif
1491   inEntry = 1;
1492 }
1493
1494 void TraceProjections::beginExecute(envelope *e)
1495 {
1496   if(e==0) {
1497 #if CMK_HAS_COUNTER_PAPI
1498     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1499       CmiAbort("PAPI failed to read at begin execute!\n");
1500     }
1501 #endif
1502     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1503     execEvent = CtvAccess(curThreadEvent);
1504     execEp = (-1);
1505     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1506                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1507 #if CMK_HAS_COUNTER_PAPI
1508     _logPool->addPapi(papiValues);
1509 #endif
1510     inEntry = 1;
1511   } else {
1512     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1513                  e->getSrcPe(),e->getTotalsize());
1514   }
1515 }
1516
1517 void TraceProjections::beginExecute(char *msg){
1518 #if CMK_SMP_TRACE_COMMTHREAD
1519         //This function is called from comm thread in SMP mode
1520     envelope *e = (envelope *)msg;
1521     int ep = e->getEpIdx();
1522     if(_entryTable[ep]->traceEnabled)
1523                 beginExecute(e);
1524 #endif
1525 }
1526
1527 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1528                                     int mlen, CmiObjId *idx)
1529 {
1530   if (traceNestedEvents) {
1531     if (! nestedEvents.isEmpty()) {
1532       endExecuteLocal();
1533     }
1534     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1535   }
1536   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1537 }
1538
1539 void TraceProjections::changeLastEntryTimestamp(double ts)
1540 {
1541   _logPool->modLastEntryTimestamp(ts);
1542 }
1543
1544 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1545                                     int mlen, CmiObjId *idx)
1546 {
1547 #if CMK_HAS_COUNTER_PAPI
1548   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1549     CmiAbort("PAPI failed to read at begin execute!\n");
1550   }
1551 #endif
1552   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1553   execEvent=event;
1554   execEp=ep;
1555   execPe=srcPe;
1556   _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
1557                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1558 #if CMK_HAS_COUNTER_PAPI
1559   _logPool->addPapi(papiValues);
1560 #endif
1561   inEntry = 1;
1562 }
1563
1564 void TraceProjections::endExecute(void)
1565 {
1566   if (traceNestedEvents) nestedEvents.deq();
1567   endExecuteLocal();
1568   if (traceNestedEvents) {
1569     if (! nestedEvents.isEmpty()) {
1570       NestedEvent &ne = nestedEvents.peek();
1571       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1572     }
1573   }
1574 }
1575
1576 void TraceProjections::endExecute(char *msg)
1577 {
1578 #if CMK_SMP_TRACE_COMMTHREAD
1579         //This function is called from comm thread in SMP mode
1580     envelope *e = (envelope *)msg;
1581     int ep = e->getEpIdx();
1582     if(_entryTable[ep]->traceEnabled)
1583                 endExecute();
1584 #endif  
1585 }
1586
1587 void TraceProjections::endExecuteLocal(void)
1588 {
1589 #if CMK_HAS_COUNTER_PAPI
1590   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1591     CmiAbort("PAPI failed to read at end execute!\n");
1592   }
1593 #endif
1594   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1595   double cputime = TraceCpuTimer();
1596   double now = TraceTimer();
1597   if(execEp == (-1)) {
1598     _logPool->add(END_PROCESSING, 0, _threadEP, now,
1599                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1600   } else {
1601     _logPool->add(END_PROCESSING, 0, execEp, now,
1602                   execEvent, execPe, 0, NULL, 0.0, cputime);
1603   }
1604 #if CMK_HAS_COUNTER_PAPI
1605   _logPool->addPapi(papiValues);
1606 #endif
1607   inEntry = 0;
1608 }
1609
1610 void TraceProjections::messageRecv(char *env, int pe)
1611 {
1612 #if 0
1613   envelope *e = (envelope *)env;
1614   int msgType = e->getMsgtype();
1615   int ep = e->getEpIdx();
1616 #if 0
1617   if (msgType==NewChareMsg || msgType==NewVChareMsg
1618           || msgType==ForChareMsg || msgType==ForVidMsg
1619           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1620           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1621     ep = e->getEpIdx();
1622   else
1623     ep = _threadEP;
1624 #endif
1625   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1626                 curevent++, e->getSrcPe(), e->getTotalsize());
1627 #endif
1628 }
1629
1630 void TraceProjections::beginIdle(double curWallTime)
1631 {
1632     _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1633 }
1634
1635 void TraceProjections::endIdle(double curWallTime)
1636 {
1637     _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1638 }
1639
1640 void TraceProjections::beginPack(void)
1641 {
1642     _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1643 }
1644
1645 void TraceProjections::endPack(void)
1646 {
1647     _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1648 }
1649
1650 void TraceProjections::beginUnpack(void)
1651 {
1652     _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1653 }
1654
1655 void TraceProjections::endUnpack(void)
1656 {
1657     _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1658 }
1659
1660 void TraceProjections::enqueue(envelope *) {}
1661
1662 void TraceProjections::dequeue(envelope *) {}
1663
1664 void TraceProjections::beginComputation(void)
1665 {
1666   computationStarted = 1;
1667   // Executes the callback function provided by the machine
1668   // layer. This is the proper method to register user events in a
1669   // machine layer because projections is a charm++ module.
1670   if (CkpvAccess(traceOnPe) != 0) {
1671     void (*ptr)() = registerMachineUserEvents();
1672     if (ptr != NULL) {
1673       ptr();
1674     }
1675   }
1676 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1677 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1678   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1679 #if CMK_HAS_COUNTER_PAPI
1680   // we start the counters here
1681   if (PAPI_start(papiEventSet) != PAPI_OK) {
1682     CmiAbort("PAPI failed to start designated counters!\n");
1683   }
1684 #endif
1685 }
1686
1687 void TraceProjections::endComputation(void)
1688 {
1689 #if CMK_HAS_COUNTER_PAPI
1690   // we stop the counters here. A silent failure is alright since we
1691   // are already at the end of the program.
1692   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1693     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1694   }
1695   // NOTE: We should not do a complete close of PAPI until after the
1696   // sts writer is done.
1697 #endif
1698   endTime = TraceTimer();
1699   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1700   /*
1701   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1702            endTime*1e06);
1703   */
1704 }
1705
1706 int TraceProjections::idxRegistered(int idx)
1707 {
1708     int idxVecLen = idxVec.size();
1709     for(int i=0; i<idxVecLen; i++)
1710     {
1711         if(idx == idxVec[i])
1712             return 1;
1713     }
1714     return 0;
1715 }
1716
1717 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1718     StrKey k((char*)name,strlen(name));
1719     int num = funcHashtable.get(k);
1720     
1721     if(num!=0) {
1722         return;
1723         //as for mpi programs, the same function may be registered for several times
1724         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1725     }
1726     
1727     int isIdxExisting=0;
1728     if(idxSpecifiedByUser)
1729         isIdxExisting=idxRegistered(idx);
1730     if(isIdxExisting){
1731         return;
1732         //same reason with num!=0
1733         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1734     }
1735
1736     if(idxSpecifiedByUser) {
1737         char *st = new char[strlen(name)+1];
1738         memcpy(st,name,strlen(name)+1);
1739         StrKey *newKey = new StrKey(st,strlen(st));
1740         int &ref = funcHashtable.put(*newKey);
1741         ref=idx;
1742         funcCount++;
1743         idxVec.push_back(idx);  
1744     } else {
1745         char *st = new char[strlen(name)+1];
1746         memcpy(st,name,strlen(name)+1);
1747         StrKey *newKey = new StrKey(st,strlen(st));
1748         int &ref = funcHashtable.put(*newKey);
1749         ref=funcCount;
1750         num = funcCount;
1751         funcCount++;
1752         idx = num;
1753         idxVec.push_back(idx);
1754     }
1755 }
1756
1757 void TraceProjections::beginFunc(char *name,char *file,int line){
1758         StrKey k(name,strlen(name));    
1759         unsigned short  num = (unsigned short)funcHashtable.get(k);
1760         beginFunc(num,file,line);
1761 }
1762
1763 void TraceProjections::beginFunc(int idx,char *file,int line){
1764         if(idx <= 0){
1765                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1766         }       
1767         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1768 }
1769
1770 void TraceProjections::endFunc(char *name){
1771         StrKey k(name,strlen(name));    
1772         int num = funcHashtable.get(k);
1773         endFunc(num);   
1774 }
1775
1776 void TraceProjections::endFunc(int num){
1777         if(num <= 0){
1778                 printf("endFunc without start :O\n");
1779         }
1780         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1781 }
1782
1783 // specialized PUP:ers for handling trace projections logs
1784 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1785 {
1786   for (int i=0;i<n;i++) 
1787     switch(t) {
1788     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1789     case Tuchar:
1790     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1791     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1792     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1793     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1794     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1795     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1796     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1797     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1798     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1799 #ifdef CMK_PUP_LONG_LONG
1800     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1801     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1802 #endif
1803     default: CmiAbort("Unrecognized pup type code!");
1804     };
1805 }
1806
1807 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1808 {
1809   for (int i=0;i<n;i++) 
1810     switch(t) {
1811     case Tchar: { 
1812       char c = fgetc(f);
1813       if (c==EOF)
1814         parseError("Could not match character");
1815       else
1816         ((char *)p)[i] = c;
1817       break;
1818     }
1819     case Tuchar:
1820     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1821     case Tshort:((short *)p)[i]=(short)readInt(); break;
1822     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1823     case Tint:  ((int *)p)[i]=readInt(); break;
1824     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1825     case Tlong: ((long *)p)[i]=readInt(); break;
1826     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1827     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1828     case Tdouble:((double *)p)[i]=readDouble(); break;
1829 #ifdef CMK_PUP_LONG_LONG
1830     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1831     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1832 #endif
1833     default: CmiAbort("Unrecognized pup type code!");
1834     };
1835 }
1836
1837 #if CMK_PROJECTIONS_USE_ZLIB
1838 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1839 {
1840   for (int i=0;i<n;i++) 
1841     switch(t) {
1842     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1843     case Tuchar:
1844     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1845     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1846     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1847     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1848     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1849     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1850     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1851     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1852     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1853 #ifdef CMK_PUP_LONG_LONG
1854     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1855     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1856 #endif
1857     default: CmiAbort("Unrecognized pup type code!");
1858     };
1859 }
1860 #endif
1861
1862 void TraceProjections::endPhase() {
1863   double currentPhaseTime = TraceTimer();
1864   if (lastPhaseEvent != NULL) {
1865   } else {
1866     if (_logPool->pool != NULL) {
1867       // assumed to be BEGIN_COMPUTATION
1868     } else {
1869       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1870       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1871     }
1872   }
1873
1874   /* Insert endPhase event here. */
1875   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1876   /*        We currently "borrow" from the standard add() method. */
1877   /*        It should really be its own add() method.             */
1878   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1879   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1880   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1881   currentPhaseID++;
1882 }
1883
1884 #ifdef PROJ_ANALYSIS
1885 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1886
1887
1888 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1889
1890 void registerOutlierReduction() {
1891   outlierReductionType =
1892     CkReduction::addReducer(outlierReduction);
1893   minMaxReductionType =
1894     CkReduction::addReducer(minMaxReduction);
1895 }
1896
1897 /**
1898  * **IMPT NOTES**:
1899  *
1900  * This is the C++ code that is registered to be activated at module
1901  * shutdown. This is called exactly once on processor 0. Module shutdown
1902  * is initiated as a result of a CkExit() call by the application code
1903  * 
1904  * The exit function must ultimately call CkExit() again to
1905  * so that other module exit functions may proceed after this module is
1906  * done.
1907  *
1908  */
1909 // FIXME: WHY extern "C"???
1910 extern "C" void TraceProjectionsExitHandler()
1911 {
1912 #if CMK_TRACE_ENABLED
1913   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1914   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1915   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1916 #else
1917   CkExit();
1918 #endif
1919 }
1920
1921 // This is called once on each processor but the idiom of use appears
1922 // to be to only have processor 0 register the function.
1923 //
1924 // See initnode in trace-projections.ci
1925 void initTraceProjectionsBOC()
1926 {
1927   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1928 #ifdef __BIGSIM__
1929   if (BgNodeRank() == 0) {
1930 #else
1931     if (CkMyRank() == 0) {
1932 #endif
1933       registerExitFn(TraceProjectionsExitHandler);
1934     }
1935 #if 0
1936   } // this is so indentation does not get messed up
1937 #endif
1938 }
1939
1940 // mainchare for trace-projections BOC-operations. 
1941 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1942 // rest of its life.
1943 //
1944 // Responsible for:
1945 //   1. Handling commandline arguments
1946 //   2. Creating any objects required for proper BOC operations.
1947 //
1948 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1949   /** Options for Outlier Analysis */
1950   // defaults. Things will change with support for interactive analysis.
1951   bool findOutliers = false;
1952   bool outlierAutomatic = true;
1953   int numKSeeds = 10; 
1954
1955   int peNumKeep = CkNumPes();  // used as a default
1956   double entryThreshold = 0.0;
1957   bool outlierUsePhases = false;
1958   if (outlierAutomatic) {
1959     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1960                      "Number of cluster seeds to apply at outlier analysis.");
1961     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1962                      &peNumKeep, "Number of Processors to retain data");
1963     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1964                         "Minimum significance of entry points to be considered for clustering (%).");
1965     findOutliers =
1966       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1967     outlierUsePhases = 
1968       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1969                         "Apply automatic outlier analysis to any available phases.");
1970     if (outlierUsePhases) {
1971       // if the user wants to use an outlier feature, it is assumed outlier
1972       //    analysis is desired.
1973       findOutliers = true;
1974     }
1975   }
1976   bool findStartTime = (CmiTimerAbsolute()==1);
1977   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1978   if (findOutliers) {
1979     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1980                                         numKSeeds,
1981                                         peNumKeep,
1982                                         entryThreshold,
1983                                         outlierUsePhases);
1984   }
1985
1986   delete msg;
1987 }
1988
1989 // Called on every processor.
1990 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1991   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1992   endPe = pe;                // the pe that starts CkExit()
1993   if (CkMyPe() == 0) {
1994     analysisStartTime = CmiWallTimer();
1995   }
1996   CkpvAccess(_trace)->endComputation();
1997   // no more tracing for projections on this processor after this point. 
1998   // Note that clear must be called after remove, or bad things will happen.
1999   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
2000   CkpvAccess(_traces)->clearTrace();
2001
2002   // From this point, we start multiple chains of reductions and broadcasts to
2003   // perform final online analysis activities.
2004
2005   // Start all parallel operations at once. 
2006   //   These MUST NOT modify base performance data in LogPool. If they must,
2007   //   then the parallel operations must be phased (and this code to be
2008   //   restructured as necessary)
2009   CProxy_KMeansBOC kMeansProxy(kMeansGID);
2010   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2011   if (findOutliers) {
2012     parModulesRemaining++;
2013     kMeansProxy[CkMyPe()].startKMeansAnalysis();
2014   }
2015   parModulesRemaining++;
2016   if (findStartTime) 
2017   bocProxy[CkMyPe()].startTimeAnalysis();
2018   else
2019   bocProxy[CkMyPe()].startEndTimeAnalysis();
2020 }
2021
2022 // handle flush log warnings
2023 void TraceProjectionsBOC::flush_warning(int pe) 
2024 {
2025     CmiAssert(CkMyPe() == 0);
2026     std::set<int>::iterator it;
2027     it = list.find(pe);
2028     if (it == list.end())    list.insert(pe);
2029     flush_count++;
2030 }
2031
2032 void TraceProjectionsBOC::print_warning() 
2033 {
2034     CmiAssert(CkMyPe() == 0);
2035     if (flush_count == 0) return;
2036     std::set<int>::iterator it;
2037     CkPrintf("*************************************************************\n");
2038     CkPrintf("Warning: Projections log flushed to disk %d times on %d cores: ", flush_count, list.size());
2039     for (it=list.begin(); it!=list.end(); it++)
2040       CkPrintf("%d ", *it);
2041     CkPrintf(".\n");
2042     CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program. \n");
2043     CkPrintf("*************************************************************\n");
2044 }
2045
2046 // Called on each processor
2047 void KMeansBOC::startKMeansAnalysis() {
2048   // Initialize all necessary structures
2049   LogPool *pool = CkpvAccess(_trace)->_logPool;
2050
2051  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2052   int flushInt = 0;
2053   if (pool->hasFlushed) {
2054     flushInt = 1;
2055   }
2056   
2057   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
2058                 0, thisProxy);
2059   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
2060 }
2061
2062 // Called on processor 0
2063 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2064   int someFlush = *((int *)msg->getData());
2065
2066   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2067   
2068   if (someFlush == 0) {
2069     // Data intact proceed with KMeans analysis
2070     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2071     kMeansProxy.flushCheckDone();
2072   } else {
2073     // Some processor had flushed it data at some point, abandon KMeans
2074     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2075     // terminate KMeans
2076     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2077     bocProxy[0].kMeansDone();
2078   }
2079 }
2080
2081 // Called on each processor
2082 void KMeansBOC::flushCheckDone() {
2083   // **FIXME** - more flexible metric collection scheme may be necessary
2084   //   in the future for production use.
2085   LogPool *pool = CkpvAccess(_trace)->_logPool;
2086
2087   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2088
2089   numEntryMethods = _entryTable.size();
2090   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2091
2092   // maintained across phases
2093   markedBegin = false;
2094   markedIdle = false;
2095   beginBlockTime = 0.0;
2096   beginIdleBlockTime = 0.0;
2097   lastBeginEPIdx = -1; // none
2098
2099   lastPhaseIdx = 0;
2100   currentExecTimes = NULL;
2101   currentPhase = 0;
2102   selected = false;
2103
2104   pool->initializePhases();
2105
2106   // incoming K Seeds and the per-phase filter
2107   incKSeeds = new double[numK*numMetrics];
2108   keepMetric = new bool[numMetrics];
2109
2110   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2111   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2112   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2113   thisProxy[CkMyPe()].getNextPhaseMetrics();
2114 }
2115
2116 // Called on each processor.
2117 void KMeansBOC::getNextPhaseMetrics() {
2118   // Assumes the presence of the complete logs on this processor.
2119   // Assumes first event is always BEGIN_COMPUTATION
2120   // Assumes each processor sees the same number of phases.
2121   //
2122   // In this code, we collect performance data for this processor.
2123   // All times are in seconds.
2124
2125   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2126
2127   if (usePhases) {
2128     DEBUGF("[%d] Using Phases\n", CkMyPe());
2129   } else {
2130     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2131   }
2132   
2133   if (currentExecTimes != NULL) {
2134     delete [] currentExecTimes;
2135   }
2136   currentExecTimes = new double[numMetrics];
2137   for (int i=0; i<numMetrics; i++) {
2138     currentExecTimes[i] = 0.0;
2139   }
2140
2141   int numEventMethods = _entryTable.size();
2142   LogPool *pool = CkpvAccess(_trace)->_logPool;
2143   
2144   CkAssert(pool->numEntries > lastPhaseIdx);
2145   double totalPhaseTime = 0.0;
2146   double totalActiveTime = 0.0; // entry method + idle
2147
2148   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2149     if (pool->pool[i].type == BEGIN_PROCESSING) {
2150       // check pairing
2151       if (!markedBegin) {
2152         markedBegin = true;
2153       }
2154       beginBlockTime = pool->pool[i].time;
2155       lastBeginEPIdx = pool->pool[i].eIdx;
2156     } else if (pool->pool[i].type == END_PROCESSING) {
2157       // check pairing
2158       // if End without a begin, just ignore
2159       //   this event. If a phase-boundary is crossed, the Begin
2160       //   event would be maintained in beginBlockTime, so it is 
2161       //   not a problem.
2162       if (markedBegin) {
2163         markedBegin = false;
2164         if (pool->pool[i].event < 0)
2165         {
2166           // ignore dummy events. **FIXME** as they have no eIdx?
2167           continue;
2168         }
2169         currentExecTimes[pool->pool[i].eIdx] += 
2170           pool->pool[i].time - beginBlockTime;
2171         totalActiveTime += pool->pool[i].time - beginBlockTime;
2172         lastBeginEPIdx = -1;
2173       }
2174     } else if (pool->pool[i].type == BEGIN_IDLE) {
2175       // check pairing
2176       if (!markedIdle) {
2177         markedIdle = true;
2178       }
2179       beginIdleBlockTime = pool->pool[i].time;
2180     } else if (pool->pool[i].type == END_IDLE) {
2181       // check pairing
2182       if (markedIdle) {
2183         markedIdle = false;
2184         currentExecTimes[numEventMethods] += 
2185           pool->pool[i].time - beginIdleBlockTime;
2186         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2187       }
2188     } else if (pool->pool[i].type == END_PHASE) {
2189       // ignored when not using phases
2190       if (usePhases) {
2191         // when we've not visited this node before
2192         if (i != lastPhaseIdx) { 
2193           totalPhaseTime = 
2194             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2195           // it is important that proper accounting of time take place here.
2196           // Note that END_PHASE events inevitably occur in the context of
2197           //   some entry method by the way the tracing API is designed.
2198           if (markedBegin) {
2199             CkAssert(lastBeginEPIdx >= 0);
2200             currentExecTimes[lastBeginEPIdx] += 
2201               pool->pool[i].time - beginBlockTime;
2202             totalActiveTime += pool->pool[i].time - beginBlockTime;
2203             // this is so the remainder contributes to the next phase
2204             beginBlockTime = pool->pool[i].time;
2205           }
2206           // The following is unlikely, but stranger things have happened.
2207           if (markedIdle) {
2208             currentExecTimes[numEventMethods] +=
2209               pool->pool[i].time - beginIdleBlockTime;
2210             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2211             // this is so the remainder contributes to the next phase
2212             beginIdleBlockTime = pool->pool[i].time;
2213           }
2214           if (totalActiveTime <= totalPhaseTime) {
2215             currentExecTimes[numEventMethods+1] = 
2216               totalPhaseTime - totalActiveTime;
2217           } else {
2218             currentExecTimes[numEventMethods+1] = 0.0;
2219             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2220                      CkMyPe(), currentPhase);
2221           }
2222           collectKMeansData();
2223           // end the loop (and method) and defer the work till the next call
2224           lastPhaseIdx = i;
2225           break; 
2226         }
2227       }
2228     } else if (pool->pool[i].type == END_COMPUTATION) {
2229       if (markedBegin) {
2230         CkAssert(lastBeginEPIdx >= 0);
2231         currentExecTimes[lastBeginEPIdx] += 
2232           pool->pool[i].time - beginBlockTime;
2233         totalActiveTime += pool->pool[i].time - beginBlockTime;
2234       }
2235       if (markedIdle) {
2236         currentExecTimes[numEventMethods] +=
2237           pool->pool[i].time - beginIdleBlockTime;
2238         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2239       }
2240       totalPhaseTime = 
2241         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2242       if (totalActiveTime <= totalPhaseTime) {
2243         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2244       } else {
2245         currentExecTimes[numEventMethods+1] = 0.0;
2246         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2247                  CkMyPe());
2248       }
2249       collectKMeansData();
2250     }
2251   }
2252 }
2253
2254 /**
2255  *  Through a reduction, collectKMeansData aggregates each processors' data
2256  *  in order for global properties to be determined:
2257  *  
2258  *  1. min & max to determine normalization factors.
2259  *  2. sum to determine global EP averages for possible metric reduction
2260  *       through thresholding.
2261  *  3. sum of squares to compute stddev which may be useful in the future.
2262  *
2263  *  collectKMeansData will also keep the processor's data for the current
2264  *    phase so that it may be normalized and worked on subsequently.
2265  *
2266  **/
2267 void KMeansBOC::collectKMeansData() {
2268   int minOffset = numMetrics;
2269   int maxOffset = 2*numMetrics;
2270   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2271
2272   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2273
2274   double *reductionMsg = new double[numMetrics*4];
2275
2276   for (int i=0; i<numMetrics; i++) {
2277     reductionMsg[i] = currentExecTimes[i];
2278     // duplicate the event times for max and min sections of the reduction
2279     reductionMsg[minOffset + i] = currentExecTimes[i];
2280     reductionMsg[maxOffset + i] = currentExecTimes[i];
2281     // compute squares
2282     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2283   }
2284
2285   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2286                 0, thisProxy);
2287   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2288              outlierReductionType, cb);  
2289 }
2290
2291 // The purpose is mainly to initialize the k seeds and generate
2292 //   normalization parameters for each of the metrics. The k seeds
2293 //   and normalization parameters are broadcast to all processors.
2294 //
2295 // Called on processor 0
2296 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2297   CkAssert(CkMyPe() == 0);
2298   
2299   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2300
2301   int sumOffset = 0;
2302   int minOffset = numMetrics;
2303   int maxOffset = 2*numMetrics;
2304   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2305
2306   // calculate statistics & boundaries for the k seeds for clustering
2307   KMeansStatsMessage *outmsg = 
2308     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2309   outmsg->numMetrics = numMetrics;
2310   outmsg->numKPos = numK*numMetrics;
2311   outmsg->numStats = numMetrics*4;
2312
2313   // Sum | Min | Max | Sum of Squares
2314   double *totalExecTimes = (double *)msg->getData();
2315   double totalTime = 0.0;
2316
2317   for (int i=0; i<numMetrics; i++) {
2318     DEBUGN("%lf\n", totalExecTimes[i]);
2319     totalTime += totalExecTimes[i];
2320
2321     // calculate event mean over all processors
2322     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2323
2324     // get the ranges and offsets of each metric. With this, we get
2325     //   normalization factors that can be sent back to each processor to
2326     //   be used as necessary. We reuse max for range. Min remains the offset.
2327     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2328     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2329       totalExecTimes[minOffset + i];
2330     
2331     // calculate stddev (using biased variance)
2332     outmsg->stats[sosOffset + i] = 
2333       sqrt((totalExecTimes[sosOffset + i] - 
2334             2*(outmsg->stats[i])*totalExecTimes[i] +
2335             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2336            CkNumPes());
2337   }
2338
2339   for (int i=0; i<numMetrics; i++) {
2340     // 1) if the proportion of the max value of the entry method relative to
2341     //   the average time taken over all entry methods across all processors
2342     //   is greater than the stipulated percentage threshold ...; AND
2343     // 2) if the range of values are non-zero.
2344     //
2345     // The current assumption is totalTime > 0 (what program has zero total
2346     //   time from all work?)
2347     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2348                      entryThreshold) &&
2349       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2350     if (keepMetric[i]) {
2351       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2352              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2353     } else {
2354       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2355     }
2356     outmsg->filter[i] = keepMetric[i];
2357   }
2358
2359   delete msg;
2360   
2361   // initialize k seeds for this phase
2362   kSeeds = new double[numK*numMetrics];
2363
2364   numKReported = 0;
2365   kNumMembers = new int[numK];
2366
2367   // Randomly select k processors' metric vectors for the k seeds
2368   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2369   srand(11337); // for debugging purposes
2370   for (int k=0; k<numK; k++) {
2371     DEBUGF("Seed %d | ", k);
2372     for (int m=0; m<numMetrics; m++) {
2373       double factor = totalExecTimes[maxOffset + m] - 
2374         totalExecTimes[minOffset + m];
2375       // "uniform" distribution, scaled according to the normalization
2376       //   factors
2377       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2378       // Random distribution.
2379       kSeeds[numMetrics*k + m] =
2380         ((rand()*1.0)/RAND_MAX)*factor;
2381       if (keepMetric[m]) {
2382         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2383       }
2384       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2385     }
2386     DEBUGF("\n");
2387     kNumMembers[k] = 0;
2388   }
2389
2390   // broadcast statistical values to all processors for cluster discovery
2391   thisProxy.findInitialClusters(outmsg);
2392 }
2393
2394
2395
2396 // Called on each processor.
2397 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2398
2399  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2400
2401   phaseIter = 0;
2402
2403   // Get info from stats message
2404   CkAssert(numMetrics == msg->numMetrics);
2405   for (int i=0; i<numMetrics; i++) {
2406     keepMetric[i] = msg->filter[i];
2407   }
2408
2409   // Normalize data on local processor.
2410   // **CWL** See my thesis for detailed discussion of normalization of
2411   //    performance data.
2412   // **NOTE** This might change if we want to send data based on the filter
2413   //   instead of all the data.
2414   CkAssert(numMetrics*4 == msg->numStats);
2415   for (int i=0; i<numMetrics; i++) {
2416     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2417     // **CWL** We do not normalize the range. Entry methods that exhibit
2418     //   large absolute timing variations should be allowed to contribute
2419     //   more to the Euclidean distance measure!
2420     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2421   }
2422
2423   // **NOTE** This might change if we want to send data based on the filter
2424   //   instead of all the data.
2425   CkAssert(numK*numMetrics == msg->numKPos);
2426   for (int i=0; i<msg->numKPos; i++) {
2427     incKSeeds[i] = msg->kSeedsPos[i];
2428   }
2429
2430   // Decide which KSeed this processor belongs to.
2431   minDistance = calculateDistance(0);
2432   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2433            currentPhase, phaseIter, minDistance);
2434   minK = 0;
2435   for (int i=1; i<numK; i++) {
2436     double distance = calculateDistance(i);
2437     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2438              currentPhase, phaseIter, i, distance);
2439     if (distance < minDistance) {
2440       minDistance = distance;
2441       minK = i;
2442     }
2443   }
2444
2445   // Set up a reduction with the modification vector to the root (0).
2446   //
2447   // The modification vector sends a negative value for each metric
2448   //   for the K this processor no longer belongs to and a positive
2449   //   value to the K the processor now belongs. In addition, a -1.0
2450   //   is sent to the K it is leaving and a +1.0 to the K it is 
2451   //   joining.
2452   //
2453   // The processor must still contribute a "zero returns" even if
2454   //   nothing changes. This will be the basis for determine
2455   //   convergence at the root.
2456   //
2457   // The addtional +1 is meant for the count-change that must be
2458   //   maintained for the special cases at the root when some K
2459   //   may be deprived of all processor points or go from 0 to a
2460   //   positive number of processors (see later comments).
2461   double *modVector = new double[numK*(numMetrics+1)];
2462   for (int i=0; i<numK; i++) {
2463     for (int j=0; j<numMetrics+1; j++) {
2464       modVector[i*(numMetrics+1) + j] = 0.0;
2465     }
2466   }
2467   for (int i=0; i<numMetrics; i++) {
2468     // for this initialization, only positive values need be sent.
2469     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2470   }
2471   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2472
2473   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2474                 0, thisProxy);
2475   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2476              CkReduction::sum_double, cb);  
2477 }
2478
2479 double KMeansBOC::calculateDistance(int k) {
2480   double ret = 0.0;
2481   for (int i=0; i<numMetrics; i++) {
2482     if (keepMetric[i]) {
2483       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2484              CkMyPe(), currentPhase, phaseIter, i,
2485                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2486       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2487     }
2488   }
2489   return sqrt(ret);
2490 }
2491
2492 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2493   CkAssert(CkMyPe() == 0);
2494
2495   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2496
2497   double *modVector = (double *)msg->getData();
2498   // sanity check
2499   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2500
2501   // A quick convergence test.
2502   bool hasChanges = false;
2503   for (int i=0; i<numK; i++) {
2504     hasChanges = hasChanges || 
2505       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2506   }
2507   if (!hasChanges) {
2508     delete msg;
2509     findRepresentatives();
2510   } else {
2511     int overallChange = 0;
2512     for (int i=0; i<numK; i++) {
2513       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2514       if (change != 0) {
2515         overallChange += change;
2516         // modify the k seeds based on the modification vectors coming in
2517         //
2518         // If a seed initially has no members, its contents do not matter and
2519         //   is simply set to the average of the incoming vector.
2520         // If the change causes a seed to lose all its members, do nothing.
2521         //   Its last-known location is kept to allow it to re-capture
2522         //   membership at the next iteration rather than apply the last
2523         //   changes (which snaps the point unnaturally to 0,0).
2524         // Otherwise, apply the appropriate vector changes.
2525         CkAssert((kNumMembers[i] + change >= 0) &&
2526                  (kNumMembers[i] + change <= CkNumPes()));
2527         if (kNumMembers[i] == 0) {
2528           CkAssert(change > 0);
2529           for (int j=0; j<numMetrics; j++) {
2530             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2531           }
2532         } else if (kNumMembers[i] + change == 0) {
2533           // do nothing.
2534         } else {
2535           for (int j=0; j<numMetrics; j++) {
2536             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2537             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2538             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2539           }
2540         }
2541         kNumMembers[i] += change;
2542       }
2543       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2544              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2545     }
2546     delete msg;
2547
2548     // broadcast the new seed locations.
2549     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2550     outmsg->numKPos = numK*numMetrics;
2551     for (int i=0; i<numK*numMetrics; i++) {
2552       outmsg->kSeedsPos[i] = kSeeds[i];
2553     }
2554
2555     thisProxy.updateSeedMembership(outmsg);
2556   }
2557 }
2558
2559 // Called on all processors
2560 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2561
2562   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2563
2564   phaseIter++;
2565
2566   // **NOTE** This might change if we want to send data based on the filter
2567   //   instead of all the data.
2568   CkAssert(numK*numMetrics == msg->numKPos);
2569   for (int i=0; i<msg->numKPos; i++) {
2570     incKSeeds[i] = msg->kSeedsPos[i];
2571   }
2572
2573   // Decide which KSeed this processor belongs to.
2574   lastMinK = minK;
2575   minDistance = calculateDistance(0);
2576   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2577          currentPhase, phaseIter, minDistance);
2578
2579   minK = 0;
2580   for (int i=1; i<numK; i++) {
2581     double distance = calculateDistance(i);
2582     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2583            currentPhase, phaseIter, i, distance);
2584     if (distance < minDistance) {
2585       minDistance = distance;
2586       minK = i;
2587     }
2588   }
2589
2590   double *modVector = new double[numK*(numMetrics+1)];
2591   for (int i=0; i<numK; i++) {
2592     for (int j=0; j<numMetrics+1; j++) {
2593       modVector[i*(numMetrics+1) + j] = 0.0;
2594     }
2595   }
2596
2597   if (minK != lastMinK) {
2598     for (int i=0; i<numMetrics; i++) {
2599       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2600       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2601     }
2602     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2603     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2604   }
2605
2606   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2607                 0, thisProxy);
2608   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2609              CkReduction::sum_double, cb);  
2610 }
2611
2612 void KMeansBOC::findRepresentatives() {
2613
2614   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2615
2616   int numNonEmptyClusters = 0;
2617   for (int i=0; i<numK; i++) {
2618     if (kNumMembers[i] > 0) {
2619       numNonEmptyClusters++;
2620     }
2621   }
2622
2623   int numRepresentatives = peNumKeep;
2624   // **FIXME**
2625   // This is fairly arbitrary. Next time, choose the centers of the top
2626   //   largest clusters.
2627   if (numRepresentatives < numNonEmptyClusters) {
2628     numRepresentatives = numNonEmptyClusters;
2629   }
2630
2631   int slotsRemaining = numRepresentatives;
2632
2633   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2634          numNonEmptyClusters);
2635
2636   // determine how many exemplars to select per cluster. Currently
2637   //   hardcoded to 1. Future challenge is to decide on other numbers
2638   //   or proportionality.
2639   //
2640   int exemplarsPerCluster = 1;
2641   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2642
2643   int numCandidateOutliers = CkNumPes() - 
2644     exemplarsPerCluster*numNonEmptyClusters;
2645
2646   double *remainders = new double[numK];
2647   int *assigned = new int[numK];
2648   exemplarChoicesLeft = new int[numK];
2649   outlierChoicesLeft = new int[numK];
2650
2651   for (int i=0; i<numK; i++) {
2652     assigned[i] = 0;
2653     remainders[i] = 
2654       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2655       slotsRemaining / numCandidateOutliers;
2656     if (remainders[i] >= 0.0) {
2657       assigned[i] = (int)floor(remainders[i]);
2658       remainders[i] -= assigned[i];
2659     } else {
2660       remainders[i] = 0.0;
2661     }
2662   }
2663
2664   for (int i=0; i<numK; i++) {
2665     slotsRemaining -= assigned[i];
2666   }
2667   CkAssert(slotsRemaining >= 0);
2668
2669   // find clusters to assign the loose slots to, in order of
2670   // remainder proportion
2671   while (slotsRemaining > 0) {
2672     double max = 0.0;
2673     int winner = 0;
2674     for (int i=0; i<numK; i++) {
2675       if (remainders[i] > max) {
2676         max = remainders[i];
2677         winner = i;
2678       }
2679     }
2680     assigned[winner]++;
2681     remainders[winner] = 0.0;
2682     slotsRemaining--;
2683   }
2684
2685   // set up how many reduction cycles of min/max we need to conduct to
2686   // select the representatives.
2687   numSelectionIter = exemplarsPerCluster;
2688   for (int i=0; i<numK; i++) {
2689     if (assigned[i] > numSelectionIter) {
2690       numSelectionIter = assigned[i];
2691     }
2692   }
2693   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2694
2695   for (int i=0; i<numK; i++) {
2696     if (kNumMembers[i] > 0) {
2697       exemplarChoicesLeft[i] = exemplarsPerCluster;
2698       outlierChoicesLeft[i] = assigned[i];
2699     } else {
2700       exemplarChoicesLeft[i] = 0;
2701       outlierChoicesLeft[i] = 0;
2702     }
2703     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2704            outlierChoicesLeft[i]);
2705   }
2706
2707   delete [] assigned;
2708   delete [] remainders;
2709
2710   // send out first broadcast
2711   KSelectionMessage *outmsg = NULL;
2712   if (numSelectionIter > 0) {
2713     outmsg = new (numK, numK, numK) KSelectionMessage;
2714     outmsg->numKMinIDs = numK;
2715     outmsg->numKMaxIDs = numK;
2716     for (int i=0; i<numK; i++) {
2717       outmsg->minIDs[i] = -1;
2718       outmsg->maxIDs[i] = -1;
2719     }
2720     thisProxy.collectDistances(outmsg);
2721   } else {
2722     CkPrintf("Warning: No selection iteration from the start!\n");
2723     // invoke phase completion on all processors
2724     thisProxy.phaseDone();
2725   }
2726 }
2727
2728 /*
2729  *  lastMin = array of minimum champions of the last tournament
2730  *  lastMax = array of maximum champions of the last tournament
2731  *  lastMaxVal = array of last encountered maximum values, allows previous
2732  *                 minimum winners to eliminate themselves from the next
2733  *                 minimum race.
2734  *
2735  *  Called on all processors.
2736  */
2737 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2738
2739   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2740
2741   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2742          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2743   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2744       (CkMyPe() == msg->maxIDs[lastMinK])) {
2745     CkAssert(!selected);
2746     selected = true;
2747   }
2748
2749   // build outgoing reduction structure
2750   //   format = minVal | ID | maxVal | ID
2751   double *minMaxAndIDs = NULL;
2752
2753   minMaxAndIDs = new double[numK*4];
2754   // initialize to the appropriate out-of-band values (for error checks)
2755   for (int i=0; i<numK; i++) {
2756     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2757     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2758     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2759     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2760   }
2761   // If I have not won before, I put myself back into the competition
2762   if (!selected) {
2763     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2764     minMaxAndIDs[lastMinK*4] = minDistance;
2765     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2766     minMaxAndIDs[lastMinK*4+2] = minDistance;
2767     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2768   }
2769   delete msg;
2770
2771   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2772                 0, thisProxy);
2773   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2774              minMaxReductionType, cb);  
2775 }
2776
2777 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2778   // incoming format:
2779   //   minVal | minID | maxVal | maxID
2780
2781   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2782
2783   if (numSelectionIter > 0) {
2784     double *incInfo = (double *)msg->getData();
2785     
2786     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2787     outmsg->numKMinIDs = numK;
2788     outmsg->numKMaxIDs = numK;
2789     
2790     for (int i=0; i<numK; i++) {
2791       DEBUGF("%d | %lf %d %lf %d \n", i, 
2792              incInfo[i*4], (int)incInfo[i*4+1], 
2793              incInfo[i*4+2], (int)incInfo[i*4+3]);
2794     }
2795
2796     for (int i=0; i<numK; i++) {
2797       if (exemplarChoicesLeft[i] > 0) {
2798         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2799         exemplarChoicesLeft[i]--;
2800       } else {
2801         outmsg->minIDs[i] = -1;
2802       }
2803       if (outlierChoicesLeft[i] > 0) {
2804         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2805         outlierChoicesLeft[i]--;
2806       } else {
2807         outmsg->maxIDs[i] = -1;
2808       }
2809     }
2810     thisProxy.collectDistances(outmsg);
2811     numSelectionIter--;
2812   } else {
2813     // invoke phase completion on all processors
2814     thisProxy.phaseDone();
2815   }
2816 }
2817
2818 /**
2819  *  Completion of the K-Means clustering and data selection of one phase
2820  *    of the computation.
2821  *
2822  *  Called on every processor.
2823  */
2824 void KMeansBOC::phaseDone() {
2825
2826   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2827
2828   LogPool *pool = CkpvAccess(_trace)->_logPool;
2829   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2830
2831   // now decide on what to do with the decision.
2832   if (!selected) {
2833     if (usePhases) {
2834       pool->keepPhase[currentPhase] = false;
2835     } else {
2836       // if not using phases, we're working on the whole log
2837       pool->setAllPhases(false);
2838     }
2839   }
2840
2841   // **FIXME** (?) - All processors have to agree on this, or the reduction
2842   //   will not be correct! The question is "is this enforcible?"
2843   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2844     // We're done
2845     int dummy = 0;
2846     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2847                   0, bocProxy);
2848     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2849   } else {
2850     // reset all phase-based k-means data and decisions
2851
2852     // **FIXME**!!!!!    
2853     
2854     // invoke the next K-Means computation phase.
2855     currentPhase++;
2856     thisProxy[CkMyPe()].getNextPhaseMetrics();
2857   }
2858 }
2859
2860 void TraceProjectionsBOC::startTimeAnalysis()
2861 {
2862   double startTime = 0.0;
2863   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2864      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2865   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2866   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2867 }
2868
2869 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2870 {
2871   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2872
2873   if (CkpvAccess(_trace) != NULL) {
2874     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2875     CkpvAccess(_trace)->_logPool->setNewStartTime();
2876     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2877   }
2878   delete msg;
2879   thisProxy[CkMyPe()].startEndTimeAnalysis();
2880 }
2881
2882 void TraceProjectionsBOC::startEndTimeAnalysis()
2883 {
2884  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2885
2886   endTime = CkpvAccess(_trace)->endTime;
2887   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2888
2889   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2890                 0, thisProxy);
2891   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2892 }
2893
2894 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2895 {
2896  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2897
2898   CkAssert(CkMyPe() == 0);
2899   parModulesRemaining--;
2900   if (CkpvAccess(_trace) != NULL) {
2901     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2902     // CkPrintf("End time determined to be %lf us\n",
2903     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2904   }
2905   delete msg;
2906   if (parModulesRemaining == 0) {
2907     thisProxy[CkMyPe()].finalize();
2908   }
2909 }
2910
2911 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2912
2913  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2914
2915   CkAssert(CkMyPe() == 0);
2916   parModulesRemaining--;
2917   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2918            CmiWallTimer()-analysisStartTime);
2919   delete msg;
2920   if (parModulesRemaining == 0) {
2921     thisProxy[CkMyPe()].finalize();
2922   }
2923 }
2924
2925 /**
2926  *
2927  *  This version is called (on processor 0) only if flushCheck fails.
2928  *
2929  */
2930 void TraceProjectionsBOC::kMeansDone() {
2931   CkAssert(CkMyPe() == 0);
2932   parModulesRemaining--;
2933   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2934            CmiWallTimer()-analysisStartTime);
2935   if (parModulesRemaining == 0) {
2936     thisProxy[CkMyPe()].finalize();
2937   }
2938 }
2939
2940 void TraceProjectionsBOC::finalize()
2941 {
2942   CkAssert(CkMyPe() == 0);
2943   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2944   //       CmiWallTimer()-analysisStartTime);
2945   thisProxy.closingTraces();
2946 }
2947
2948 // Called on every processor
2949 void TraceProjectionsBOC::closingTraces() {
2950   CkpvAccess(_trace)->closeTrace();
2951
2952     // subtle:  reduction needs to go to the PE which started CkExit()
2953   int pe = 0;
2954   if (endPe != -1) pe = endPe;
2955   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2956                 pe, thisProxy); 
2957   contribute(0, NULL, CkReduction::sum_int, cb);  
2958 }
2959
2960 // The sole purpose of this reduction is to decide whether or not
2961 //   Projections as a module needs to call CkExit() to get other
2962 //   modules to shutdown.
2963 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2964   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2965   delete msg;
2966   // decide if CkExit() needs to be called
2967   if (!CkpvAccess(_trace)->converseExit) {
2968     CkExit();
2969   }
2970 }
2971 /*
2972  *  Registration and definition of the Outlier Reduction callback.
2973  *  Format: Sum | Min | Max | Sum of Squares
2974  */
2975 CkReductionMsg *outlierReduction(int nMsgs,
2976                                  CkReductionMsg **msgs) {
2977   int numBytes = 0;
2978   int numMetrics = 0;
2979   double *ret = NULL;
2980
2981   if (nMsgs == 1) {
2982     // nothing to do, just pass it on
2983     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2984   }
2985
2986   if (nMsgs > 1) {
2987     numBytes = msgs[0]->getSize();
2988     // sanity checks
2989     if (numBytes%sizeof(double) != 0) {
2990       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2991     }
2992     if ((numBytes/sizeof(double))%4 != 0) {
2993       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2994     }
2995     numMetrics = (numBytes/sizeof(double))/4;
2996     ret = new double[numMetrics*4];
2997
2998     // copy the first message data into the return structure first
2999     for (int i=0; i<numMetrics*4; i++) {
3000       ret[i] = ((double *)msgs[0]->getData())[i];
3001     }
3002
3003     // Sum | Min | Max | Sum of Squares
3004     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
3005       for (int i=0; i<numMetrics; i++) {
3006         // Sum
3007         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
3008         // Min
3009         ret[numMetrics + i] =
3010           (ret[numMetrics + i] < 
3011            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
3012           ? ret[numMetrics + i] : 
3013           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
3014         // Max
3015         ret[2*numMetrics + i] = 
3016           (ret[2*numMetrics + i] >
3017            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
3018           ? ret[2*numMetrics + i] :
3019           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
3020         // Sum of Squares (squaring already done at leaf)
3021         ret[3*numMetrics + i] +=
3022           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
3023       }
3024     }
3025   }
3026   
3027   /* apparently, we do not delete the incoming messages */
3028   return CkReductionMsg::buildNew(numBytes,ret);
3029 }
3030
3031 /*
3032  * The only reason we have a user-defined reduction is to support
3033  *   identification of the "winning" processors as well as to handle
3034  *   both the min and the max of each "tournament". A simple min/max
3035  *   discovery cannot handle ties.
3036  */
3037 CkReductionMsg *minMaxReduction(int nMsgs,
3038                                 CkReductionMsg **msgs) {
3039   CkAssert(nMsgs > 0);
3040
3041   int numBytes = msgs[0]->getSize();
3042   CkAssert(numBytes%sizeof(double) == 0);
3043   int numK = (numBytes/sizeof(double))/4;
3044
3045   double *ret = new double[numK*4];
3046   // fill with out-of-band values
3047   for (int i=0; i<numK; i++) {
3048     ret[i*4] = -1.0;
3049     ret[i*4+1] = -1.0;
3050     ret[i*4+2] = -1.0;
3051     ret[i*4+3] = -1.0;
3052   }
3053
3054   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
3055   for (int i=0; i<nMsgs; i++) {
3056     double *temp = (double *)msgs[i]->getData();
3057     for (int j=0; j<numK; j++) {
3058       // no previous valid min
3059       if (ret[j*4+1] < 0) {
3060         // fill it in only if the incoming min is valid
3061         if (temp[j*4+1] >= 0) {
3062           ret[j*4] = temp[j*4];      // fill min value
3063           ret[j*4+1] = temp[j*4+1];  // fill ID
3064         }
3065       } else {
3066         // find Min, only if incoming min is valid
3067         if (temp[j*4+1] >= 0) {
3068           if (temp[j*4] < ret[j*4]) {
3069             ret[j*4] = temp[j*4];      // replace min value
3070             ret[j*4+1] = temp[j*4+1];  // replace ID
3071           }
3072         }
3073       }
3074       // no previous valid max
3075       if (ret[j*4+3] < 0) {
3076         // fill only if the incoming max is valid
3077         if (temp[j*4+3] >= 0) {
3078           ret[j*4+2] = temp[j*4+2];  // fill max value
3079           ret[j*4+3] = temp[j*4+3];  // fill ID
3080         }
3081       } else {
3082         // find Max, only if incoming max is valid
3083         if (temp[j*4+3] >= 0) {
3084           if (temp[j*4+2] > ret[j*4+2]) {
3085             ret[j*4+2] = temp[j*4+2];  // replace max value
3086             ret[j*4+3] = temp[j*4+3];  // replace ID
3087           }
3088         }
3089       }
3090     }
3091   }
3092   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3093   delete [] ret;
3094   return redmsg;
3095 }
3096
3097 #include "TraceProjections.def.h"
3098 #endif //PROJ_ANALYSIS
3099
3100 /*@}*/