add API to pass application annotation time to tracing framework
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 /**
103   For each TraceFoo module, _createTraceFoo() must be defined.
104   This function is called in _createTraces() generated in moduleInit.C
105 */
106 void _createTraceprojections(char **argv)
107 {
108   DEBUGF("%d createTraceProjections\n", CkMyPe());
109   CkpvInitialize(CkVec<char *>, usrEventlist);
110   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
111   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
112 #if CMK_BIGSIM_CHARM
113   // CthRegister does not call the constructor
114 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
115 #endif //CMK_BIGSIM_CHARM
116   CkpvInitialize(TraceProjections*, _trace);
117   CkpvAccess(_trace) = new  TraceProjections(argv);
118   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
119   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
120 }
121  
122 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
123
124 struct TraceThreadListener {
125   struct CthThreadListener base;
126   int event;
127   int msgType;
128   int ep;
129   int srcPe;
130   int ml;
131   CmiObjId idx;
132 };
133
134 extern "C"
135 void traceThreadListener_suspend(struct CthThreadListener *l)
136 {
137   TraceThreadListener *a=(TraceThreadListener *)l;
138   /* here, we activate the appropriate trace codes for the appropriate
139      registered modules */
140   traceSuspend();
141 }
142
143 extern "C"
144 void traceThreadListener_resume(struct CthThreadListener *l) 
145 {
146   TraceThreadListener *a=(TraceThreadListener *)l;
147   /* here, we activate the appropriate trace codes for the appropriate
148      registered modules */
149   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
150                                 CthGetThreadID(a->base.thread));
151   a->event=-1;
152   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
153   a->ml=0;
154 }
155
156 extern "C"
157 void traceThreadListener_free(struct CthThreadListener *l) 
158 {
159   TraceThreadListener *a=(TraceThreadListener *)l;
160   delete a;
161 }
162
163 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
164 {
165 #if CMK_TRACE_ENABLED
166   /* strip essential information from the envelope */
167   TraceThreadListener *a= new TraceThreadListener;
168   
169   a->base.suspend=traceThreadListener_suspend;
170   a->base.resume=traceThreadListener_resume;
171   a->base.free=traceThreadListener_free;
172   a->event=e->getEvent();
173   a->msgType=e->getMsgtype();
174   a->ep=e->getEpIdx();
175   a->srcPe=e->getSrcPe();
176   a->ml=e->getTotalsize();
177
178   CthAddListener(tid, (CthThreadListener *)a);
179 #endif
180 }
181
182 void LogPool::openLog(const char *mode)
183 {
184 #if CMK_PROJECTIONS_USE_ZLIB
185   if(compressed) {
186     if (nonDeltaLog) {
187       do {
188         zfp = gzopen(fname, mode);
189       } while (!zfp && (errno == EINTR || errno == EMFILE));
190       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
191     }
192     if (deltaLog) {
193       do {
194         deltazfp = gzopen(dfname, mode);
195       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
196       if (!deltazfp) 
197         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
198     }
199   } else {
200     if (nonDeltaLog) {
201       do {
202         fp = fopen(fname, mode);
203       } while (!fp && (errno == EINTR || errno == EMFILE));
204       if (!fp) {
205         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
206         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
207       }
208     }
209     if (deltaLog) {
210       do {
211         deltafp = fopen(dfname, mode);
212       } while (!deltafp && (errno == EINTR || errno == EMFILE));
213       if (!deltafp) 
214         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
215     }
216   }
217 #else
218   if (nonDeltaLog) {
219     do {
220       fp = fopen(fname, mode);
221     } while (!fp && (errno == EINTR || errno == EMFILE));
222     if (!fp) {
223       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
224       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
225     }
226   }
227   if (deltaLog) {
228     do {
229       deltafp = fopen(dfname, mode);
230     } while (!deltafp && (errno == EINTR || errno == EMFILE));
231     if(!deltafp) 
232       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
233   }
234 #endif
235 }
236
237 void LogPool::closeLog(void)
238 {
239 #if CMK_PROJECTIONS_USE_ZLIB
240   if(compressed) {
241     if (nonDeltaLog) gzclose(zfp);
242     if (deltaLog) gzclose(deltazfp);
243     return;
244   }
245 #endif
246   if (nonDeltaLog) { 
247 #if !defined(_WIN32) || defined(__CYGWIN__)
248     fsync(fileno(fp)); 
249 #endif
250     fclose(fp); 
251   }
252   if (deltaLog)  { 
253 #if !defined(_WIN32) || defined(__CYGWIN__)
254     fsync(fileno(deltafp)); 
255 #endif
256     fclose(deltafp);  
257   }
258 }
259
260 LogPool::LogPool(char *pgm) {
261   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
262   // defaults to writing data (no outlier changes)
263   writeSummaryFiles = 0;
264   writeData = true;
265   numEntries = 0;
266   // **CW** for simple delta encoding
267   prevTime = 0.0;
268   timeErr = 0.0;
269   globalStartTime = 0.0;
270   globalEndTime = 0.0;
271   headerWritten = 0;
272   numPhases = 0;
273   hasFlushed = false;
274
275   keepPhase = NULL;
276
277   fileCreated = false;
278   poolSize = CkpvAccess(CtrLogBufSize);
279   pgmname = new char[strlen(pgm)+1];
280   strcpy(pgmname, pgm);
281
282   //statistic init
283   statisLastProcessTimer = 0;
284   statisLastIdleTimer = 0;
285   statisLastPackTimer = 0;
286   statisLastUnpackTimer = 0;
287   statisTotalExecutionTime  = 0;
288   statisTotalIdleTime = 0;
289   statisTotalPackTime = 0;
290   statisTotalUnpackTime = 0;
291   statisTotalCreationMsgs = 0;
292   statisTotalCreationBytes = 0;
293   statisTotalMCastMsgs = 0;
294   statisTotalMCastBytes = 0;
295   statisTotalEnqueueMsgs = 0;
296   statisTotalDequeueMsgs = 0;
297   statisTotalRecvMsgs = 0;
298   statisTotalRecvBytes = 0;
299   statisTotalMemAlloc = 0;
300   statisTotalMemFree = 0;
301
302 }
303
304 void LogPool::createFile(const char *fix)
305 {
306   if (fileCreated) {
307     return;
308   }
309   
310   if(CmiNumPartitions() > 1) {
311     CmiMkdir(CkpvAccess(partitionRoot));
312   }
313
314   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
315   char *pathPlusFilePrefix = new char[1024];
316  
317   if(nSubdirs > 0){
318     int sd = CkMyPe() % nSubdirs;
319     char *subdir = new char[1024];
320     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
321     CmiMkdir(subdir);
322     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
323     delete[] subdir;
324   } else {
325     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
326   }
327
328   char pestr[10];
329   sprintf(pestr, "%d", CkMyPe());
330 #if CMK_PROJECTIONS_USE_ZLIB
331   int len;
332   if(compressed)
333     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
334   else
335     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
336 #else
337   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
338 #endif
339
340   if (nonDeltaLog) {
341     fname = new char[len];
342   }
343   if (deltaLog) {
344     dfname = new char[len];
345   }
346 #if CMK_PROJECTIONS_USE_ZLIB
347   if(compressed) {
348     if (deltaLog && nonDeltaLog) {
349       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
350       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
351     } else {
352       if (nonDeltaLog) {
353         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
354       } else {
355         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
356       }
357     }
358   } else {
359     if (deltaLog && nonDeltaLog) {
360       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
361       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
362     } else {
363       if (nonDeltaLog) {
364         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
365       } else {
366         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
367       }
368     }
369   }
370 #else
371   if (deltaLog && nonDeltaLog) {
372     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
373     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
374   } else {
375     if (nonDeltaLog) {
376       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
377     } else {
378       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
379     }
380   }
381 #endif
382   fileCreated = true;
383   delete[] pathPlusFilePrefix;
384   openLog("w");
385   CLOSE_LOG 
386 }
387
388 void LogPool::createSts(const char *fix)
389 {
390   CkAssert(CkMyPe() == 0);
391   if(CmiNumPartitions() > 1) {
392     CmiMkdir(CkpvAccess(partitionRoot));
393   }
394
395   // create the sts file
396   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
397   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
398   do
399     {
400       stsfp = fopen(fname, "w");
401     } while (!stsfp && (errno == EINTR || errno == EMFILE));
402   if(stsfp==0){
403     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
404     CmiAbort("Error!!\n");
405   }
406   delete[] fname;
407 }  
408
409 void LogPool::createTopo(const char *fix)
410 {
411   CkAssert(CkMyPe() == 0);
412   // create the topo file
413   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
414   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
415   do
416     {
417       topofp = fopen(fname, "w");
418     } while (!stsfp && (errno == EINTR || errno == EMFILE));
419   if(stsfp==0){
420     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
421     CmiAbort("Error!!\n");
422   }
423   delete[] fname;
424 }  
425
426 void LogPool::createRC()
427 {
428   // create the projections rc file.
429   fname = 
430     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
431   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
432   do {
433     rcfp = fopen(fname, "w");
434   } while (!rcfp && (errno == EINTR || errno == EMFILE));
435   if (rcfp==0) {
436     CmiAbort("Cannot open projections configuration file for writing.\n");
437   }
438   delete[] fname;
439 }
440
441 LogPool::~LogPool() 
442 {
443   if (writeData) {
444       if(writeSummaryFiles)
445           writeStatis();
446     writeLog();
447 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
448     closeLog();
449 #endif
450   }
451
452 #if CMK_BIGSIM_CHARM
453   extern int correctTimeLog;
454   if (correctTimeLog) {
455     createFile("-bg");
456     if (CkMyPe() == 0) {
457       createSts("-bg");
458     }
459     writeHeader();
460     if (CkMyPe() == 0) writeSts(NULL);
461     postProcessLog();
462   }
463 #endif
464
465   delete[] pool;
466   delete [] fname;
467 }
468
469 void LogPool::writeHeader()
470 {
471   if (headerWritten) return;
472   headerWritten = 1;
473   if(!binary) {
474 #if CMK_PROJECTIONS_USE_ZLIB
475     if(compressed) {
476       if (nonDeltaLog) {
477         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
478       }
479       if (deltaLog) {
480         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
481       }
482     } 
483     else /* else clause is below... */
484 #endif
485     /*... may hang over from else above */ {
486       if (nonDeltaLog) {
487         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
488       }
489       if (deltaLog) {
490         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
491       }
492     }
493   }
494   else { // binary
495       if (nonDeltaLog) {
496         fwrite(&numEntries,sizeof(numEntries),1,fp);
497       }
498       if (deltaLog) {
499         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
500       }
501   }
502 }
503
504 void LogPool::writeLog(void)
505 {
506   createFile();
507   OPEN_LOG
508   writeHeader();
509   if (nonDeltaLog) write(0);
510   if (deltaLog) write(1);
511   CLOSE_LOG
512 }
513
514 void LogPool::write(int writedelta) 
515 {
516   // **CW** Simple delta encoding implementation
517   // prevTime has to be maintained as an object variable because
518   // LogPool::write may be called several times depending on the
519   // +logsize value.
520   PUP::er *p = NULL;
521   if (binary) {
522     p = new PUP::toDisk(writedelta?deltafp:fp);
523   }
524 #if CMK_PROJECTIONS_USE_ZLIB
525   else if (compressed) {
526     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
527   }
528 #endif
529   else {
530     p = new toProjectionsFile(writedelta?deltafp:fp);
531   }
532   CmiAssert(p);
533   int curPhase = 0;
534   // **FIXME** - Should probably consider a more sophisticated bounds-based
535   //   approach for selective writing instead of making multiple if-checks
536   //   for every single event.
537   for(UInt i=0; i<numEntries; i++) {
538     if (!writedelta) {
539       if (keepPhase == NULL) {
540         // default case, when no phase selection is required.
541         pool[i].pup(*p);
542       } else {
543         // **FIXME** Might be a good idea to create a "filler" event block for
544         //   all the events taken out by phase filtering.
545         if (pool[i].type == END_PHASE) {
546           // always write phase markers
547           pool[i].pup(*p);
548           curPhase++;
549         } else if (pool[i].type == BEGIN_COMPUTATION ||
550                    pool[i].type == END_COMPUTATION) {
551           // always write BEGIN and END COMPUTATION markers
552           pool[i].pup(*p);
553         } else if (keepPhase[curPhase]) {
554           pool[i].pup(*p);
555         }
556       }
557     }
558     else {      // delta
559       // **FIXME** Implement phase-selective writing for delta logs
560       //   eventually
561       double time = pool[i].time;
562       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
563       {
564         double timeDiff = (time-prevTime)*1.0e6;
565         UInt intTimeDiff = (UInt)timeDiff;
566         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
567         if (timeErr > 1.0) {
568           timeErr -= 1.0;
569           intTimeDiff++;
570         }
571         pool[i].time = intTimeDiff/1.0e6;
572       }
573       pool[i].pup(*p);
574       pool[i].time = time;      // restore time value
575       prevTime = time;
576     }
577   }
578   delete p;
579   delete [] keepPhase;
580 }
581
582 void LogPool::writeSts(void)
583 {
584   // for whining compilers
585   int i;
586   // generate an automatic unique ID for each log
587   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
588   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
589   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
590 #if CMK_HAS_COUNTER_PAPI
591   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
592   // for now, use i, next time use papiEvents[i].
593   // **CW** papi event names is a hack.
594   char eventName[PAPI_MAX_STR_LEN];
595   for (i=0;i<NUMPAPIEVENTS;i++) {
596     PAPI_event_code_to_name(papiEvents[i], eventName);
597     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
598   }
599 #endif
600   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
601   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
602     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
603   }     
604 }
605
606 void LogPool::writeSts(TraceProjections *traceProj){
607   writeSts();
608   if (traceProj != NULL) {
609     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
610     funcIter->seekStart();
611     int numFuncs = traceProj->getFuncNumber();
612     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
613     while(funcIter->hasNext()) {
614       StrKey *key;
615       int *obj = (int *)funcIter->next((void **)&key);
616       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
617     }
618   }
619   fprintf(stsfp, "END\n");
620   fclose(stsfp);
621 }
622
623 void LogPool::writeRC(void)
624 {
625     //CkPrintf("write RC is being executed\n");
626 #ifdef PROJ_ANALYSIS  
627     CkAssert(CkMyPe() == 0);
628     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
629             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
630     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
631             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
632     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
633     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
634       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
635     } else {
636       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
637     }
638     */
639 #endif //PROJ_ANALYSIS
640   fclose(rcfp);
641 }
642
643 void LogPool::writeTopo(void)
644 {
645   TopoManager tmgr;
646   tmgr.printAllocation(topofp);
647   fclose(topofp);
648 }
649
650 void LogPool::writeStatis(void)
651 {
652     
653    // create the statis file
654   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
655   sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
656   do
657   {
658       statisfp = fopen(fname, "w");
659   } while (!statisfp && (errno == EINTR || errno == EMFILE));
660   if(statisfp==0){
661       CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
662       CmiAbort("Error!!\n");
663   }
664   delete[] fname;
665   double totaltime = endComputationTime - beginComputationTime;
666   fprintf(statisfp, "time(sec) percentage\n");
667   fprintf(statisfp, "Time:    \t%f\n", totaltime);
668   fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100); 
669   fprintf(statisfp, "Overhead:    \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100); 
670   fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100); 
671   fprintf(statisfp, "Pack:     \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100); 
672   fprintf(statisfp, "Unpack:   \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100); 
673   fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs); 
674   fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs); 
675   fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs); 
676   fclose(statisfp);
677 }
678
679 #if CMK_BIGSIM_CHARM
680 static void updateProjLog(void *data, double t, double recvT, void *ptr)
681 {
682   LogEntry *log = (LogEntry *)data;
683   FILE *fp = *(FILE **)ptr;
684   log->time = t;
685   log->recvTime = recvT<0.0?0:recvT;
686 //  log->write(fp);
687   toProjectionsFile p(fp);
688   log->pup(p);
689 }
690 #endif
691
692 // flush log entries to disk
693 void LogPool::flushLogBuffer()
694 {
695   if (numEntries) {
696     double writeTime = TraceTimer();
697     writeLog();
698     hasFlushed = true;
699     numEntries = 0;
700     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
701     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
702     //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
703     if (!traceProjectionsGID.isZero()) {    // report flushing events to PE 0
704       CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
705       bocProxy[0].flush_warning(CkMyPe());
706     }
707   }
708 }
709
710 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
711                   double time, int event, int pe, int ml, CmiObjId *id, 
712                   double recvT, double cpuT, int numPe)
713 {
714     switch(type)
715     {
716         case BEGIN_COMPUTATION:
717             beginComputationTime = time;
718             break;
719         case END_COMPUTATION:
720             endComputationTime = time;
721             break;
722         case CREATION:
723             statisTotalCreationMsgs++;
724             statisTotalCreationBytes += ml;
725             break;
726         case CREATION_MULTICAST:
727             statisTotalMCastMsgs++;
728             statisTotalMCastBytes += ml;
729             break;
730         case ENQUEUE:
731             statisTotalEnqueueMsgs++;
732             break;
733         case DEQUEUE:
734             statisTotalDequeueMsgs++;
735             break;
736         case MESSAGE_RECV:
737             statisTotalRecvMsgs++;
738             statisTotalRecvBytes += ml;
739             break;
740         case MEMORY_MALLOC:
741             statisTotalMemAlloc++;
742             break;
743         case MEMORY_FREE:
744             statisTotalMemFree++;
745             break;
746         case BEGIN_PROCESSING:
747             statisLastProcessTimer = time;
748             break;
749         case BEGIN_UNPACK:
750             statisLastUnpackTimer = time;
751             break;
752         case BEGIN_PACK:
753             statisLastPackTimer = time;
754             break;
755         case BEGIN_IDLE:
756             statisLastIdleTimer = time;
757             break;
758         case END_PROCESSING:
759             statisTotalExecutionTime += (time - statisLastProcessTimer);
760             break;
761         case END_UNPACK:
762             statisTotalUnpackTime += (time - statisLastUnpackTimer);
763             break;
764         case END_PACK:
765             statisTotalPackTime += (time - statisLastPackTimer);
766             break;
767         case END_IDLE:
768             statisTotalIdleTime += (time - statisLastIdleTimer);
769             break;
770         default:
771             break;
772     }
773   new (&pool[numEntries++])
774     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
775   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
776     numPhases++;
777   }
778   if(poolSize==numEntries) {
779     flushLogBuffer();
780 #if CMK_BIGSIM_CHARM
781     extern int correctTimeLog;
782     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
783 #endif
784   }
785 #if CMK_BIGSIM_CHARM
786   switch (type) {
787     case BEGIN_PROCESSING:
788       pool[numEntries-1].recvTime = BgGetRecvTime();
789     case END_PROCESSING:
790     case BEGIN_COMPUTATION:
791     case END_COMPUTATION:
792     case CREATION:
793     case BEGIN_PACK:
794     case END_PACK:
795     case BEGIN_UNPACK:
796     case END_UNPACK:
797     case USER_EVENT_PAIR:
798       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
799   }
800 #endif
801 }
802
803 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
804 #ifndef CMK_BIGSIM_CHARM
805   new (&pool[numEntries++])
806         LogEntry(time,type,funcID,lineNum,fileName);
807   if(poolSize == numEntries){
808     flushLogBuffer();
809   }
810 #endif  
811 }
812
813
814   
815 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
816 #ifndef CMK_BIGSIM_CHARM
817   new (&pool[numEntries++])
818         LogEntry(type,time,memUsage);
819   if(poolSize == numEntries){
820     flushLogBuffer();
821   }
822 #endif  
823         
824 }  
825
826
827
828 void LogPool::addUserSupplied(int data){
829         // add an event
830         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
831
832         // set the user supplied value for the previously created event 
833         pool[numEntries-1].setUserSuppliedData(data);
834   }
835
836
837 void LogPool::addUserSuppliedNote(char *note){
838         // add an event
839         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
840
841         // set the user supplied note for the previously created event 
842         pool[numEntries-1].setUserSuppliedNote(note);
843   }
844
845 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
846   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
847 #ifndef CMK_BIGSIM_CHARM
848 #if MPI_TRACE_MACHINE_HACK
849   //This part of code is used  to combine the contiguous
850   //MPI_Test and MPI_Iprobe events to reduce the number of
851   //entries
852 #define MPI_TEST_EVENT_ID 60
853 #define MPI_IPROBE_EVENT_ID 70 
854   int lastEvent = pool[numEntries-1].event;
855   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
856     //just replace the endtime of last event
857     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
858     pool[numEntries].endTime = et;
859   }else{
860     new (&pool[numEntries++])
861       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
862   }
863 #else
864   new (&pool[numEntries++])
865     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
866 #endif
867   if(poolSize == numEntries){
868     flushLogBuffer();
869   }
870 #endif  
871 }
872
873
874 /* **CW** Not sure if this is the right thing to do. Feels more like
875    a hack than a solution to Sameer's request to add the destination
876    processor information to multicasts and broadcasts.
877
878    In the unlikely event this method is used for Broadcasts as well,
879    pelist == NULL will be used to indicate a global broadcast with 
880    num PEs.
881 */
882 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
883                                    int event, int pe, int ml, CmiObjId *id,
884                                    double recvT, int numPe, int *pelist)
885 {
886   new (&pool[numEntries++])
887     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
888   if(poolSize==numEntries) {
889     flushLogBuffer();
890   }
891 }
892
893 void LogPool::postProcessLog()
894 {
895 #if CMK_BIGSIM_CHARM
896   bgUpdateProj(1);   // event type
897 #endif
898 }
899
900 void LogPool::modLastEntryTimestamp(double ts)
901 {
902   pool[numEntries-1].time = ts;
903   //pool[numEntries-1].cputime = ts;
904 }
905
906 // /** Constructor for a multicast log entry */
907 // 
908 //  THIS WAS MOVED TO trace-projections.h with the other constructors
909 // 
910 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
911 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
912 // {
913 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
914 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
915 //     recvTime = rt; 
916 //     numpes = numPe;
917 //     userSuppliedNote = NULL;
918 //     if (pelist != NULL) {
919 //      pes = new int[numPe];
920 //      for (int i=0; i<numPe; i++) {
921 //        pes[i] = pelist[i];
922 //      }
923 //     } else {
924 //      pes= NULL;
925 //     }
926 // }
927
928 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
929 //{
930 //#if CMK_HAS_COUNTER_PAPI
931 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
932 //#endif
933 //}
934
935
936
937 void LogEntry::pup(PUP::er &p)
938 {
939   int i;
940   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
941   char ret = '\n';
942
943   p|type;
944   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
945   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
946
947   switch (type) {
948     case USER_EVENT:
949     case USER_EVENT_PAIR:
950       p|mIdx; p|itime; p|event; p|pe;
951       break;
952     case BEGIN_IDLE:
953     case END_IDLE:
954     case BEGIN_PACK:
955     case END_PACK:
956     case BEGIN_UNPACK:
957     case END_UNPACK:
958       p|itime; p|pe; 
959       break;
960     case BEGIN_PROCESSING:
961       if (p.isPacking()) {
962         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
963         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
964       }
965       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
966       p|msglen; p|irecvtime; 
967       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
968       p|icputime;
969 #if CMK_HAS_COUNTER_PAPI
970       //p|numPapiEvents;
971       for (i=0; i<NUMPAPIEVENTS; i++) {
972         // not yet!!!
973         //      p|papiIDs[i]; 
974         p|CkpvAccess(papiValues)[i];
975         
976       }
977 #else
978       //p|numPapiEvents;     // non papi version has value 0
979 #endif
980       if (p.isUnpacking()) {
981         recvTime = irecvtime/1.0e6;
982         cputime = icputime/1.0e6;
983       }
984       break;
985     case END_PROCESSING:
986       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
987       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
988 #if CMK_HAS_COUNTER_PAPI
989       //p|numPapiEvents;
990       for (i=0; i<NUMPAPIEVENTS; i++) {
991         // not yet!!!
992         //      p|papiIDs[i];
993         p|CkpvAccess(papiValues)[i];
994       }
995 #else
996       //p|numPapiEvents;  // non papi version has value 0
997 #endif
998       if (p.isUnpacking()) cputime = icputime/1.0e6;
999       break;
1000     case USER_SUPPLIED:
1001           p|userSuppliedData;
1002           p|itime;
1003         break;
1004     case USER_SUPPLIED_NOTE:
1005           p|itime;
1006           int length;
1007           if (p.isPacking()) length = strlen(userSuppliedNote);
1008           p | length;
1009           char space;
1010           space = ' ';
1011           p | space;
1012           if (p.isUnpacking()) {
1013             userSuppliedNote = new char[length+1];
1014             userSuppliedNote[length] = '\0';
1015           }
1016           PUParray(p,userSuppliedNote, length);
1017           break;
1018     case USER_SUPPLIED_BRACKETED_NOTE:
1019       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
1020           p|itime;
1021           p|iEndTime;
1022           p|event;
1023           int length2;
1024           if (p.isPacking()) length2 = strlen(userSuppliedNote);
1025           p | length2;
1026           char space2;
1027           space2 = ' ';
1028           p | space2;
1029           if (p.isUnpacking()) {
1030             userSuppliedNote = new char[length+1];
1031             userSuppliedNote[length] = '\0';
1032           }
1033           PUParray(p,userSuppliedNote, length2);
1034           break;
1035     case MEMORY_USAGE_CURRENT:
1036       p | memUsage;
1037       p | itime;
1038         break;
1039     case CREATION:
1040       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1041       p|mIdx; p|eIdx; p|itime;
1042       p|event; p|pe; p|msglen; p|irecvtime;
1043       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1044       break;
1045     case CREATION_BCAST:
1046       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1047       p|mIdx; p|eIdx; p|itime;
1048       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1049       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1050       break;
1051     case CREATION_MULTICAST:
1052       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1053       p|mIdx; p|eIdx; p|itime;
1054       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1055       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
1056       for (i=0; i<numpes; i++) p|pes[i];
1057       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1058       break;
1059     case MESSAGE_RECV:
1060       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
1061       break;
1062
1063     case ENQUEUE:
1064     case DEQUEUE:
1065       p|mIdx; p|itime; p|event; p|pe;
1066       break;
1067
1068     case BEGIN_INTERRUPT:
1069     case END_INTERRUPT:
1070       p|itime; p|event; p|pe;
1071       break;
1072
1073       // **CW** absolute timestamps are used here to support a quick
1074       // way of determining the total time of a run in projections
1075       // visualization.
1076     case BEGIN_COMPUTATION:
1077     case END_COMPUTATION:
1078     case BEGIN_TRACE:
1079     case END_TRACE:
1080       p|itime;
1081       break;
1082     case BEGIN_FUNC:
1083         p | itime;
1084         p | mIdx;
1085         p | event;
1086         if(!p.isUnpacking()){
1087                 p(fName,flen-1);
1088         }
1089         break;
1090     case END_FUNC:
1091         p | itime;
1092         p | mIdx;
1093         break;
1094     case END_PHASE:
1095       p|eIdx; // FIXME: actually the phase ID
1096       p|itime;
1097       break;
1098     default:
1099       CmiError("***Internal Error*** Wierd Event %d.\n", type);
1100       break;
1101   }
1102   if (p.isUnpacking()) time = itime/1.0e6;
1103   p|ret;
1104 }
1105
1106 TraceProjections::TraceProjections(char **argv): 
1107   curevent(0), inEntry(0), computationStarted(0), 
1108         converseExit(0), endTime(0.0), traceNestedEvents(0),
1109         currentPhaseID(0), lastPhaseEvent(NULL)
1110 {
1111   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1112
1113   if (CkpvAccess(traceOnPe) == 0) return;
1114
1115   CtvInitialize(int,curThreadEvent);
1116   CkpvInitialize(CmiInt8, CtrLogBufSize);
1117   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1118   CtvAccess(curThreadEvent)=0;
1119   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1120                        "Log entries to buffer per I/O")) {
1121     if (CkMyPe() == 0) {
1122       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1123     }
1124   }
1125   checknested = 
1126     CmiGetArgFlagDesc(argv,"+checknested",
1127                       "check projections nest begin end execute events");
1128   traceNestedEvents = 
1129     CmiGetArgFlagDesc(argv,"+tracenested",
1130               "trace projections nest begin/end execute events");
1131   int binary = 
1132     CmiGetArgFlagDesc(argv,"+binary-trace",
1133                       "Write log files in binary format");
1134
1135   CmiInt8 nSubdirs = 0;
1136   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1137
1138
1139 #if CMK_PROJECTIONS_USE_ZLIB
1140   int compressed = true;
1141   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1142   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1143   compressed = compressed && !disableCompressed;
1144 #else
1145   // consume the flag so there's no confusing
1146   CmiGetArgFlagDesc(argv,"+gz-trace",
1147                     "Write log files pre-compressed with gzip");
1148   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1149 #endif
1150
1151   int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files "); 
1152   
1153   // **CW** default to non delta log encoding. The user may choose to do
1154   // create both logs (for debugging) or just the old log timestamping
1155   // (for compatibility).
1156   // Generating just the non delta log takes precedence over generating
1157   // both logs (if both arguments appear on the command line).
1158
1159   // switch to OLD log format until everything works // Gengbin
1160   nonDeltaLog = 1;
1161   deltaLog = 0;
1162   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1163                                   "Generate Delta encoded and simple timestamped log files");
1164
1165   _logPool = new LogPool(CkpvAccess(traceRoot));
1166   _logPool->setNumSubdirs(nSubdirs);
1167   _logPool->setBinary(binary);
1168   _logPool->setWriteSummaryFiles(writeSummaryFiles);
1169 #if CMK_PROJECTIONS_USE_ZLIB
1170   _logPool->setCompressed(compressed);
1171 #endif
1172   if (CkMyPe() == 0) {
1173     _logPool->createSts();
1174     _logPool->createRC();
1175     _logPool->createTopo();
1176   }
1177   funcCount=1;
1178
1179 #if CMK_HAS_COUNTER_PAPI
1180   initPAPI();
1181 #endif
1182 }
1183
1184 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1185 {
1186   OPTIMIZED_VERSION
1187   CkAssert(e==-1 || e>=0);
1188   CkAssert(evt != NULL);
1189   int event;
1190   int biggest = -1;
1191   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1192     int cur = (*CkpvAccess(usrEvents))[i]->e;
1193     if (cur == e) {
1194       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1195       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1196         return e;
1197       else
1198         CmiAbort("UserEvent double registered!");
1199     }
1200     if (cur > biggest) biggest = cur;
1201   }
1202   // if no user events have so far been registered. biggest will be -1
1203   // and hence newly assigned event numbers will begin from 0.
1204   if (e==-1) event = biggest+1;  // automatically assign new event number
1205   else event = e;
1206   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1207   return event;
1208 }
1209
1210 void TraceProjections::traceClearEps(void)
1211 {
1212   // In trace-summary, this zeros out the EP bins, to eliminate noise
1213   // from startup.  Here, this isn't useful, since we can do that in
1214   // post-processing
1215 }
1216
1217 void TraceProjections::traceWriteSts(void)
1218 {
1219   if(CkMyPe()==0)
1220     _logPool->writeSts(this);
1221 }
1222
1223 /** 
1224  * **IMPT NOTES**:
1225  *
1226  * This is called when Converse closes during ConverseCommonExit().
1227  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1228  *
1229  * Some programs bypass CkExit() (like NAMD, which eventually calls
1230  * ConverseExit()), modules like traces will have to pretend to shutdown
1231  * as if CkExit() was called but at the same time avoid making
1232  * subsequent CkExit() calls (which is usually required for allowing
1233  * other modules to shutdown).
1234  *
1235  * Note that we can only get here if CkExit() was not called, since the
1236  * trace module will un-register itself from TraceArray if it did.
1237  *
1238  */
1239 void TraceProjections::traceClose(void)
1240 {
1241 #ifdef PROJ_ANALYSIS
1242   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1243
1244   // sets the flag that tells the code not to make the CkExit call later
1245   converseExit = 1;
1246   if (CkMyPe() == 0) {
1247     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1248     bocProxy.traceProjectionsParallelShutdown(-1);
1249   }
1250   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1251     CkpvAccess(_trace)->endComputation();
1252     delete _logPool;              // will write
1253     // remove myself from traceArray so that no tracing will be called.
1254     CkpvAccess(_traces)->removeTrace(this);
1255   }
1256 #else
1257   // we've already deleted the logpool, so multiple calls to traceClose
1258   // are tolerated.
1259   if (_logPool == NULL) {
1260     return;
1261   }
1262   if(CkMyPe()==0){
1263     _logPool->writeSts(this);
1264   }
1265   CkpvAccess(_trace)->endComputation();
1266   delete _logPool;              // will write
1267   // remove myself from traceArray so that no tracing will be called.
1268   CkpvAccess(_traces)->removeTrace(this);
1269
1270   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1271   bocProxy.ckLocalBranch()->print_warning();
1272 #endif
1273 }
1274
1275 /**
1276  *  **IMPT NOTES**:
1277  *
1278  *  This is meant to be called internally by the tracing framework.
1279  *
1280  */
1281 void TraceProjections::closeTrace() {
1282   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1283   if (CkMyPe() == 0) {
1284     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1285     _logPool->writeSts(this);
1286     _logPool->writeRC();
1287     _logPool->writeTopo();
1288     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1289
1290     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1291     bocProxy.ckLocalBranch()->print_warning();
1292   }
1293   delete _logPool;       // will write logs to file
1294
1295 }
1296
1297 #if CMK_SMP_TRACE_COMMTHREAD
1298 void TraceProjections::traceBeginOnCommThread()
1299 {
1300   if (!computationStarted) return;
1301   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1302 }
1303
1304 void TraceProjections::traceEndOnCommThread()
1305 {
1306   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1307 }
1308 #endif
1309
1310 void TraceProjections::traceBegin(void)
1311 {
1312   if (!computationStarted) return;
1313   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1314 }
1315
1316 void TraceProjections::traceEnd(void)
1317 {
1318   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1319 }
1320
1321 void TraceProjections::userEvent(int e)
1322 {
1323   if (!computationStarted) return;
1324   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1325 }
1326
1327 void TraceProjections::userBracketEvent(int e, double bt, double et)
1328 {
1329   if (!computationStarted) return;
1330   // two events record Begin/End of event e.
1331   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1332   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1333 }
1334
1335 void TraceProjections::appWork(int e, double bt, double et)
1336 {
1337 }
1338
1339 void TraceProjections::userSuppliedData(int d)
1340 {
1341   if (!computationStarted) return;
1342   _logPool->addUserSupplied(d);
1343 }
1344
1345 void TraceProjections::userSuppliedNote(char *note)
1346 {
1347   if (!computationStarted) return;
1348   _logPool->addUserSuppliedNote(note);
1349 }
1350
1351
1352 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1353 {
1354   if (!computationStarted) return;
1355   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1356 }
1357
1358 void TraceProjections::memoryUsage(double m)
1359 {
1360   if (!computationStarted) return;
1361   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1362   
1363 }
1364
1365
1366 void TraceProjections::creation(envelope *e, int ep, int num)
1367 {
1368   double curTime = TraceTimer();
1369   if (e == 0) {
1370     CtvAccess(curThreadEvent) = curevent;
1371     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1372                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1373   } else {
1374     int type=e->getMsgtype();
1375     e->setEvent(curevent);
1376     if (num > 1) {
1377       _logPool->add(CREATION_BCAST, type, ep, curTime,
1378                     curevent++, CkMyPe(), e->getTotalsize(), 
1379                     NULL, 0, 0.0, num);
1380     } else {
1381       _logPool->add(CREATION, type, ep, curTime,
1382                     curevent++, CkMyPe(), e->getTotalsize(), 
1383                     NULL, 0, 0.0);
1384     }
1385   }
1386 }
1387
1388 //This function is only called from a comm thread in SMP mode. 
1389 void TraceProjections::creation(char *msg)
1390 {
1391 #if CMK_SMP_TRACE_COMMTHREAD
1392         // msg must be a charm message
1393     envelope *e = (envelope *)msg;
1394     int ep = e->getEpIdx();
1395     if(_entryTable[ep]->traceEnabled) {
1396         creation(e, ep, 1);
1397         e->setSrcPe(CkMyPe());              // pretend I am the sender
1398     }
1399 #endif
1400 }
1401
1402 void TraceProjections::traceCommSetMsgID(char *msg)
1403 {
1404 #if CMK_SMP_TRACE_COMMTHREAD
1405     // msg must be a charm message
1406     envelope *e = (envelope *)msg;
1407     int ep = e->getEpIdx();
1408     if(_entryTable[ep]->traceEnabled) {
1409         e->setSrcPe(CkMyPe());              // pretend I am the sender
1410         e->setEvent(curevent);
1411     }
1412 #endif
1413 }
1414
1415 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1416 {
1417     // msg must be a charm message
1418     *pe = *event = -1;
1419     envelope *e = (envelope *)msg;
1420     int ep = e->getEpIdx();
1421     if(_entryTable[ep]->traceEnabled) {
1422         *pe = e->getSrcPe();
1423         *event = e->getEvent();
1424     }
1425 }
1426
1427 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1428 {
1429        // msg must be a charm message
1430     envelope *e = (envelope *)msg;
1431     int ep = e->getEpIdx();
1432     if(ep<=0 || ep>=_entryTable.size()) return;
1433     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1434     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1435     if(_entryTable[ep]->traceEnabled) {
1436         e->setSrcPe(pe);
1437         e->setEvent(event);
1438     }
1439 }
1440
1441 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1442    Communication Library-specific Multicasts via new event 
1443    CREATION_MULTICAST.
1444 */
1445
1446 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1447                                          int *pelist)
1448 {
1449   double curTime = TraceTimer();
1450   if (e==0) {
1451     CtvAccess(curThreadEvent)=curevent;
1452     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1453                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1454   } else {
1455     int type=e->getMsgtype();
1456     e->setEvent(curevent);
1457     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1458                                    e->getTotalsize(), 0, 0.0, num, pelist);
1459   }
1460 }
1461
1462 void TraceProjections::creationDone(int num)
1463 {
1464   // modified the creation done time of the last num log entries
1465   // FIXME: totally a hack
1466   double curTime = TraceTimer();
1467   int idx = _logPool->numEntries-1;
1468   while (idx >=0 && num >0 ) {
1469     LogEntry &log = _logPool->pool[idx];
1470     if ((log.type == CREATION) ||
1471         (log.type == CREATION_BCAST) ||
1472         (log.type == CREATION_MULTICAST)) {
1473       log.recvTime = curTime - log.time;
1474       num --;
1475     }
1476     idx--;
1477   }
1478 }
1479
1480 void TraceProjections::beginExecute(CmiObjId *tid)
1481 {
1482 #if CMK_HAS_COUNTER_PAPI
1483   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1484     CmiAbort("PAPI failed to read at begin execute!\n");
1485   }
1486 #endif
1487   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1488   execEvent = CtvAccess(curThreadEvent);
1489   execEp = (-1);
1490   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1491                 execEvent,CkMyPe(), 0, tid);
1492 #if CMK_HAS_COUNTER_PAPI
1493   _logPool->addPapi(CkpvAccess(papiValues));
1494 #endif
1495   inEntry = 1;
1496 }
1497
1498 void TraceProjections::beginExecute(envelope *e)
1499 {
1500   if(e==0) {
1501 #if CMK_HAS_COUNTER_PAPI
1502     if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1503       CmiAbort("PAPI failed to read at begin execute!\n");
1504     }
1505 #endif
1506     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1507     execEvent = CtvAccess(curThreadEvent);
1508     execEp = (-1);
1509     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1510                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1511 #if CMK_HAS_COUNTER_PAPI
1512     _logPool->addPapi(CkpvAccess(papiValues));
1513 #endif
1514     inEntry = 1;
1515   } else {
1516     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1517                  e->getSrcPe(),e->getTotalsize());
1518   }
1519 }
1520
1521 void TraceProjections::beginExecute(char *msg){
1522 #if CMK_SMP_TRACE_COMMTHREAD
1523         //This function is called from comm thread in SMP mode
1524     envelope *e = (envelope *)msg;
1525     int ep = e->getEpIdx();
1526     if(_entryTable[ep]->traceEnabled)
1527                 beginExecute(e);
1528 #endif
1529 }
1530
1531 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1532                                     int mlen, CmiObjId *idx)
1533 {
1534   if (traceNestedEvents) {
1535     if (! nestedEvents.isEmpty()) {
1536       endExecuteLocal();
1537     }
1538     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1539   }
1540   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1541 }
1542
1543 void TraceProjections::changeLastEntryTimestamp(double ts)
1544 {
1545   _logPool->modLastEntryTimestamp(ts);
1546 }
1547
1548 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1549                                     int mlen, CmiObjId *idx)
1550 {
1551 #if CMK_HAS_COUNTER_PAPI
1552   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1553     CmiAbort("PAPI failed to read at begin execute!\n");
1554   }
1555 #endif
1556   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1557   execEvent=event;
1558   execEp=ep;
1559   execPe=srcPe;
1560   _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
1561                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1562 #if CMK_HAS_COUNTER_PAPI
1563   _logPool->addPapi(CkpvAccess(papiValues));
1564 #endif
1565   inEntry = 1;
1566 }
1567
1568 void TraceProjections::endExecute(void)
1569 {
1570   if (traceNestedEvents) nestedEvents.deq();
1571   endExecuteLocal();
1572   if (traceNestedEvents) {
1573     if (! nestedEvents.isEmpty()) {
1574       NestedEvent &ne = nestedEvents.peek();
1575       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1576     }
1577   }
1578 }
1579
1580 void TraceProjections::endExecute(char *msg)
1581 {
1582 #if CMK_SMP_TRACE_COMMTHREAD
1583         //This function is called from comm thread in SMP mode
1584     envelope *e = (envelope *)msg;
1585     int ep = e->getEpIdx();
1586     if(_entryTable[ep]->traceEnabled)
1587                 endExecute();
1588 #endif  
1589 }
1590
1591 void TraceProjections::endExecuteLocal(void)
1592 {
1593 #if CMK_HAS_COUNTER_PAPI
1594   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1595     CmiAbort("PAPI failed to read at end execute!\n");
1596   }
1597 #endif
1598   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1599   double cputime = TraceCpuTimer();
1600   double now = TraceTimer();
1601   if(execEp == (-1)) {
1602     _logPool->add(END_PROCESSING, 0, _threadEP, now,
1603                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1604   } else {
1605     _logPool->add(END_PROCESSING, 0, execEp, now,
1606                   execEvent, execPe, 0, NULL, 0.0, cputime);
1607   }
1608 #if CMK_HAS_COUNTER_PAPI
1609   _logPool->addPapi(CkpvAccess(papiValues));
1610 #endif
1611   inEntry = 0;
1612 }
1613
1614 void TraceProjections::messageRecv(char *env, int pe)
1615 {
1616 #if 0
1617   envelope *e = (envelope *)env;
1618   int msgType = e->getMsgtype();
1619   int ep = e->getEpIdx();
1620 #if 0
1621   if (msgType==NewChareMsg || msgType==NewVChareMsg
1622           || msgType==ForChareMsg || msgType==ForVidMsg
1623           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1624           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1625     ep = e->getEpIdx();
1626   else
1627     ep = _threadEP;
1628 #endif
1629   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1630                 curevent++, e->getSrcPe(), e->getTotalsize());
1631 #endif
1632 }
1633
1634 void TraceProjections::beginIdle(double curWallTime)
1635 {
1636     _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1637 }
1638
1639 void TraceProjections::endIdle(double curWallTime)
1640 {
1641     _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1642 }
1643
1644 void TraceProjections::beginPack(void)
1645 {
1646     _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1647 }
1648
1649 void TraceProjections::endPack(void)
1650 {
1651     _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1652 }
1653
1654 void TraceProjections::beginUnpack(void)
1655 {
1656     _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1657 }
1658
1659 void TraceProjections::endUnpack(void)
1660 {
1661     _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1662 }
1663
1664 void TraceProjections::enqueue(envelope *) {}
1665
1666 void TraceProjections::dequeue(envelope *) {}
1667
1668 void TraceProjections::beginComputation(void)
1669 {
1670   computationStarted = 1;
1671   // Executes the callback function provided by the machine
1672   // layer. This is the proper method to register user events in a
1673   // machine layer because projections is a charm++ module.
1674   if (CkpvAccess(traceOnPe) != 0) {
1675     void (*ptr)() = registerMachineUserEvents();
1676     if (ptr != NULL) {
1677       ptr();
1678     }
1679   }
1680 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1681 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1682   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1683 #if CMK_HAS_COUNTER_PAPI
1684   // we start the counters here
1685   if(CkpvAccess(papiStarted) == 0)
1686   {
1687       if (PAPI_start(CkpvAccess(papiEventSet)) != PAPI_OK) {
1688           CmiAbort("PAPI failed to start designated counters!\n");
1689       }
1690       CkpvAccess(papiStarted) = 1;
1691   }
1692 #endif
1693 }
1694
1695 void TraceProjections::endComputation(void)
1696 {
1697 #if CMK_HAS_COUNTER_PAPI
1698   // we stop the counters here. A silent failure is alright since we
1699   // are already at the end of the program.
1700   if(CkpvAccess(papiStopped) == 0) {
1701       if (PAPI_stop(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1702           CkPrintf("Warning: PAPI failed to stop correctly!\n");
1703       }
1704       CkpvAccess(papiStopped) = 1;
1705   }
1706   // NOTE: We should not do a complete close of PAPI until after the
1707   // sts writer is done.
1708 #endif
1709   endTime = TraceTimer();
1710   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1711   /*
1712   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1713            endTime*1e06);
1714   */
1715 }
1716
1717 int TraceProjections::idxRegistered(int idx)
1718 {
1719     int idxVecLen = idxVec.size();
1720     for(int i=0; i<idxVecLen; i++)
1721     {
1722         if(idx == idxVec[i])
1723             return 1;
1724     }
1725     return 0;
1726 }
1727
1728 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1729     StrKey k((char*)name,strlen(name));
1730     int num = funcHashtable.get(k);
1731     
1732     if(num!=0) {
1733         return;
1734         //as for mpi programs, the same function may be registered for several times
1735         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1736     }
1737     
1738     int isIdxExisting=0;
1739     if(idxSpecifiedByUser)
1740         isIdxExisting=idxRegistered(idx);
1741     if(isIdxExisting){
1742         return;
1743         //same reason with num!=0
1744         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1745     }
1746
1747     if(idxSpecifiedByUser) {
1748         char *st = new char[strlen(name)+1];
1749         memcpy(st,name,strlen(name)+1);
1750         StrKey *newKey = new StrKey(st,strlen(st));
1751         int &ref = funcHashtable.put(*newKey);
1752         ref=idx;
1753         funcCount++;
1754         idxVec.push_back(idx);  
1755     } else {
1756         char *st = new char[strlen(name)+1];
1757         memcpy(st,name,strlen(name)+1);
1758         StrKey *newKey = new StrKey(st,strlen(st));
1759         int &ref = funcHashtable.put(*newKey);
1760         ref=funcCount;
1761         num = funcCount;
1762         funcCount++;
1763         idx = num;
1764         idxVec.push_back(idx);
1765     }
1766 }
1767
1768 void TraceProjections::beginFunc(char *name,char *file,int line){
1769         StrKey k(name,strlen(name));    
1770         unsigned short  num = (unsigned short)funcHashtable.get(k);
1771         beginFunc(num,file,line);
1772 }
1773
1774 void TraceProjections::beginFunc(int idx,char *file,int line){
1775         if(idx <= 0){
1776                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1777         }       
1778         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1779 }
1780
1781 void TraceProjections::endFunc(char *name){
1782         StrKey k(name,strlen(name));    
1783         int num = funcHashtable.get(k);
1784         endFunc(num);   
1785 }
1786
1787 void TraceProjections::endFunc(int num){
1788         if(num <= 0){
1789                 printf("endFunc without start :O\n");
1790         }
1791         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1792 }
1793
1794 // specialized PUP:ers for handling trace projections logs
1795 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1796 {
1797   for (int i=0;i<n;i++) 
1798     switch(t) {
1799     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1800     case Tuchar:
1801     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1802     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1803     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1804     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1805     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1806     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1807     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1808     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1809     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1810 #ifdef CMK_PUP_LONG_LONG
1811     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1812     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1813 #endif
1814     default: CmiAbort("Unrecognized pup type code!");
1815     };
1816 }
1817
1818 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1819 {
1820   for (int i=0;i<n;i++) 
1821     switch(t) {
1822     case Tchar: { 
1823       char c = fgetc(f);
1824       if (c==EOF)
1825         parseError("Could not match character");
1826       else
1827         ((char *)p)[i] = c;
1828       break;
1829     }
1830     case Tuchar:
1831     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1832     case Tshort:((short *)p)[i]=(short)readInt(); break;
1833     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1834     case Tint:  ((int *)p)[i]=readInt(); break;
1835     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1836     case Tlong: ((long *)p)[i]=readInt(); break;
1837     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1838     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1839     case Tdouble:((double *)p)[i]=readDouble(); break;
1840 #ifdef CMK_PUP_LONG_LONG
1841     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1842     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1843 #endif
1844     default: CmiAbort("Unrecognized pup type code!");
1845     };
1846 }
1847
1848 #if CMK_PROJECTIONS_USE_ZLIB
1849 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1850 {
1851   for (int i=0;i<n;i++) 
1852     switch(t) {
1853     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1854     case Tuchar:
1855     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1856     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1857     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1858     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1859     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1860     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1861     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1862     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1863     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1864 #ifdef CMK_PUP_LONG_LONG
1865     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1866     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1867 #endif
1868     default: CmiAbort("Unrecognized pup type code!");
1869     };
1870 }
1871 #endif
1872
1873 void TraceProjections::endPhase() {
1874   double currentPhaseTime = TraceTimer();
1875   if (lastPhaseEvent != NULL) {
1876   } else {
1877     if (_logPool->pool != NULL) {
1878       // assumed to be BEGIN_COMPUTATION
1879     } else {
1880       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1881       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1882     }
1883   }
1884
1885   /* Insert endPhase event here. */
1886   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1887   /*        We currently "borrow" from the standard add() method. */
1888   /*        It should really be its own add() method.             */
1889   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1890   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1891   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1892   currentPhaseID++;
1893 }
1894
1895 #ifdef PROJ_ANALYSIS
1896 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1897
1898
1899 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1900
1901 void registerOutlierReduction() {
1902   outlierReductionType =
1903     CkReduction::addReducer(outlierReduction);
1904   minMaxReductionType =
1905     CkReduction::addReducer(minMaxReduction);
1906 }
1907
1908 /**
1909  * **IMPT NOTES**:
1910  *
1911  * This is the C++ code that is registered to be activated at module
1912  * shutdown. This is called exactly once on processor 0. Module shutdown
1913  * is initiated as a result of a CkExit() call by the application code
1914  * 
1915  * The exit function must ultimately call CkExit() again to
1916  * so that other module exit functions may proceed after this module is
1917  * done.
1918  *
1919  */
1920 // FIXME: WHY extern "C"???
1921 extern "C" void TraceProjectionsExitHandler()
1922 {
1923 #if CMK_TRACE_ENABLED
1924   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1925   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1926   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1927 #else
1928   CkExit();
1929 #endif
1930 }
1931
1932 // This is called once on each processor but the idiom of use appears
1933 // to be to only have processor 0 register the function.
1934 //
1935 // See initnode in trace-projections.ci
1936 void initTraceProjectionsBOC()
1937 {
1938   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1939 #ifdef __BIGSIM__
1940   if (BgNodeRank() == 0) {
1941 #else
1942     if (CkMyRank() == 0) {
1943 #endif
1944       registerExitFn(TraceProjectionsExitHandler);
1945     }
1946 #if 0
1947   } // this is so indentation does not get messed up
1948 #endif
1949 }
1950
1951 // mainchare for trace-projections BOC-operations. 
1952 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1953 // rest of its life.
1954 //
1955 // Responsible for:
1956 //   1. Handling commandline arguments
1957 //   2. Creating any objects required for proper BOC operations.
1958 //
1959 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1960   /** Options for Outlier Analysis */
1961   // defaults. Things will change with support for interactive analysis.
1962   bool findOutliers = false;
1963   bool outlierAutomatic = true;
1964   int numKSeeds = 10; 
1965
1966   int peNumKeep = CkNumPes();  // used as a default
1967   double entryThreshold = 0.0;
1968   bool outlierUsePhases = false;
1969   if (outlierAutomatic) {
1970     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1971                      "Number of cluster seeds to apply at outlier analysis.");
1972     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1973                      &peNumKeep, "Number of Processors to retain data");
1974     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1975                         "Minimum significance of entry points to be considered for clustering (%).");
1976     findOutliers =
1977       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1978     outlierUsePhases = 
1979       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1980                         "Apply automatic outlier analysis to any available phases.");
1981     if (outlierUsePhases) {
1982       // if the user wants to use an outlier feature, it is assumed outlier
1983       //    analysis is desired.
1984       findOutliers = true;
1985     }
1986   }
1987   bool findStartTime = (CmiTimerAbsolute()==1);
1988   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1989   if (findOutliers) {
1990     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1991                                         numKSeeds,
1992                                         peNumKeep,
1993                                         entryThreshold,
1994                                         outlierUsePhases);
1995   }
1996
1997   delete msg;
1998 }
1999
2000 // Called on every processor.
2001 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
2002   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
2003   endPe = pe;                // the pe that starts CkExit()
2004   if (CkMyPe() == 0) {
2005     analysisStartTime = CmiWallTimer();
2006   }
2007   CkpvAccess(_trace)->endComputation();
2008   // no more tracing for projections on this processor after this point. 
2009   // Note that clear must be called after remove, or bad things will happen.
2010   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
2011   CkpvAccess(_traces)->clearTrace();
2012
2013   // From this point, we start multiple chains of reductions and broadcasts to
2014   // perform final online analysis activities.
2015
2016   // Start all parallel operations at once. 
2017   //   These MUST NOT modify base performance data in LogPool. If they must,
2018   //   then the parallel operations must be phased (and this code to be
2019   //   restructured as necessary)
2020   CProxy_KMeansBOC kMeansProxy(kMeansGID);
2021   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2022   if (findOutliers) {
2023     parModulesRemaining++;
2024     kMeansProxy[CkMyPe()].startKMeansAnalysis();
2025   }
2026   parModulesRemaining++;
2027   if (findStartTime) 
2028   bocProxy[CkMyPe()].startTimeAnalysis();
2029   else
2030   bocProxy[CkMyPe()].startEndTimeAnalysis();
2031 }
2032
2033 // handle flush log warnings
2034 void TraceProjectionsBOC::flush_warning(int pe) 
2035 {
2036     CmiAssert(CkMyPe() == 0);
2037     std::set<int>::iterator it;
2038     it = list.find(pe);
2039     if (it == list.end())    list.insert(pe);
2040     flush_count++;
2041 }
2042
2043 void TraceProjectionsBOC::print_warning() 
2044 {
2045     CmiAssert(CkMyPe() == 0);
2046     if (flush_count == 0) return;
2047     std::set<int>::iterator it;
2048     CkPrintf("*************************************************************\n");
2049     CkPrintf("Warning: Projections log flushed to disk %d times on %d cores: ", flush_count, list.size());
2050     for (it=list.begin(); it!=list.end(); it++)
2051       CkPrintf("%d ", *it);
2052     CkPrintf(".\n");
2053     CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program. \n");
2054     CkPrintf("*************************************************************\n");
2055 }
2056
2057 // Called on each processor
2058 void KMeansBOC::startKMeansAnalysis() {
2059   // Initialize all necessary structures
2060   LogPool *pool = CkpvAccess(_trace)->_logPool;
2061
2062  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2063   int flushInt = 0;
2064   if (pool->hasFlushed) {
2065     flushInt = 1;
2066   }
2067   
2068   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
2069                 0, thisProxy);
2070   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
2071 }
2072
2073 // Called on processor 0
2074 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2075   int someFlush = *((int *)msg->getData());
2076
2077   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2078   
2079   if (someFlush == 0) {
2080     // Data intact proceed with KMeans analysis
2081     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2082     kMeansProxy.flushCheckDone();
2083   } else {
2084     // Some processor had flushed it data at some point, abandon KMeans
2085     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2086     // terminate KMeans
2087     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2088     bocProxy[0].kMeansDone();
2089   }
2090 }
2091
2092 // Called on each processor
2093 void KMeansBOC::flushCheckDone() {
2094   // **FIXME** - more flexible metric collection scheme may be necessary
2095   //   in the future for production use.
2096   LogPool *pool = CkpvAccess(_trace)->_logPool;
2097
2098   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2099
2100   numEntryMethods = _entryTable.size();
2101   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2102
2103   // maintained across phases
2104   markedBegin = false;
2105   markedIdle = false;
2106   beginBlockTime = 0.0;
2107   beginIdleBlockTime = 0.0;
2108   lastBeginEPIdx = -1; // none
2109
2110   lastPhaseIdx = 0;
2111   currentExecTimes = NULL;
2112   currentPhase = 0;
2113   selected = false;
2114
2115   pool->initializePhases();
2116
2117   // incoming K Seeds and the per-phase filter
2118   incKSeeds = new double[numK*numMetrics];
2119   keepMetric = new bool[numMetrics];
2120
2121   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2122   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2123   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2124   thisProxy[CkMyPe()].getNextPhaseMetrics();
2125 }
2126
2127 // Called on each processor.
2128 void KMeansBOC::getNextPhaseMetrics() {
2129   // Assumes the presence of the complete logs on this processor.
2130   // Assumes first event is always BEGIN_COMPUTATION
2131   // Assumes each processor sees the same number of phases.
2132   //
2133   // In this code, we collect performance data for this processor.
2134   // All times are in seconds.
2135
2136   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2137
2138   if (usePhases) {
2139     DEBUGF("[%d] Using Phases\n", CkMyPe());
2140   } else {
2141     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2142   }
2143   
2144   if (currentExecTimes != NULL) {
2145     delete [] currentExecTimes;
2146   }
2147   currentExecTimes = new double[numMetrics];
2148   for (int i=0; i<numMetrics; i++) {
2149     currentExecTimes[i] = 0.0;
2150   }
2151
2152   int numEventMethods = _entryTable.size();
2153   LogPool *pool = CkpvAccess(_trace)->_logPool;
2154   
2155   CkAssert(pool->numEntries > lastPhaseIdx);
2156   double totalPhaseTime = 0.0;
2157   double totalActiveTime = 0.0; // entry method + idle
2158
2159   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2160     if (pool->pool[i].type == BEGIN_PROCESSING) {
2161       // check pairing
2162       if (!markedBegin) {
2163         markedBegin = true;
2164       }
2165       beginBlockTime = pool->pool[i].time;
2166       lastBeginEPIdx = pool->pool[i].eIdx;
2167     } else if (pool->pool[i].type == END_PROCESSING) {
2168       // check pairing
2169       // if End without a begin, just ignore
2170       //   this event. If a phase-boundary is crossed, the Begin
2171       //   event would be maintained in beginBlockTime, so it is 
2172       //   not a problem.
2173       if (markedBegin) {
2174         markedBegin = false;
2175         if (pool->pool[i].event < 0)
2176         {
2177           // ignore dummy events. **FIXME** as they have no eIdx?
2178           continue;
2179         }
2180         currentExecTimes[pool->pool[i].eIdx] += 
2181           pool->pool[i].time - beginBlockTime;
2182         totalActiveTime += pool->pool[i].time - beginBlockTime;
2183         lastBeginEPIdx = -1;
2184       }
2185     } else if (pool->pool[i].type == BEGIN_IDLE) {
2186       // check pairing
2187       if (!markedIdle) {
2188         markedIdle = true;
2189       }
2190       beginIdleBlockTime = pool->pool[i].time;
2191     } else if (pool->pool[i].type == END_IDLE) {
2192       // check pairing
2193       if (markedIdle) {
2194         markedIdle = false;
2195         currentExecTimes[numEventMethods] += 
2196           pool->pool[i].time - beginIdleBlockTime;
2197         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2198       }
2199     } else if (pool->pool[i].type == END_PHASE) {
2200       // ignored when not using phases
2201       if (usePhases) {
2202         // when we've not visited this node before
2203         if (i != lastPhaseIdx) { 
2204           totalPhaseTime = 
2205             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2206           // it is important that proper accounting of time take place here.
2207           // Note that END_PHASE events inevitably occur in the context of
2208           //   some entry method by the way the tracing API is designed.
2209           if (markedBegin) {
2210             CkAssert(lastBeginEPIdx >= 0);
2211             currentExecTimes[lastBeginEPIdx] += 
2212               pool->pool[i].time - beginBlockTime;
2213             totalActiveTime += pool->pool[i].time - beginBlockTime;
2214             // this is so the remainder contributes to the next phase
2215             beginBlockTime = pool->pool[i].time;
2216           }
2217           // The following is unlikely, but stranger things have happened.
2218           if (markedIdle) {
2219             currentExecTimes[numEventMethods] +=
2220               pool->pool[i].time - beginIdleBlockTime;
2221             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2222             // this is so the remainder contributes to the next phase
2223             beginIdleBlockTime = pool->pool[i].time;
2224           }
2225           if (totalActiveTime <= totalPhaseTime) {
2226             currentExecTimes[numEventMethods+1] = 
2227               totalPhaseTime - totalActiveTime;
2228           } else {
2229             currentExecTimes[numEventMethods+1] = 0.0;
2230             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2231                      CkMyPe(), currentPhase);
2232           }
2233           collectKMeansData();
2234           // end the loop (and method) and defer the work till the next call
2235           lastPhaseIdx = i;
2236           break; 
2237         }
2238       }
2239     } else if (pool->pool[i].type == END_COMPUTATION) {
2240       if (markedBegin) {
2241         CkAssert(lastBeginEPIdx >= 0);
2242         currentExecTimes[lastBeginEPIdx] += 
2243           pool->pool[i].time - beginBlockTime;
2244         totalActiveTime += pool->pool[i].time - beginBlockTime;
2245       }
2246       if (markedIdle) {
2247         currentExecTimes[numEventMethods] +=
2248           pool->pool[i].time - beginIdleBlockTime;
2249         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2250       }
2251       totalPhaseTime = 
2252         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2253       if (totalActiveTime <= totalPhaseTime) {
2254         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2255       } else {
2256         currentExecTimes[numEventMethods+1] = 0.0;
2257         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2258                  CkMyPe());
2259       }
2260       collectKMeansData();
2261     }
2262   }
2263 }
2264
2265 /**
2266  *  Through a reduction, collectKMeansData aggregates each processors' data
2267  *  in order for global properties to be determined:
2268  *  
2269  *  1. min & max to determine normalization factors.
2270  *  2. sum to determine global EP averages for possible metric reduction
2271  *       through thresholding.
2272  *  3. sum of squares to compute stddev which may be useful in the future.
2273  *
2274  *  collectKMeansData will also keep the processor's data for the current
2275  *    phase so that it may be normalized and worked on subsequently.
2276  *
2277  **/
2278 void KMeansBOC::collectKMeansData() {
2279   int minOffset = numMetrics;
2280   int maxOffset = 2*numMetrics;
2281   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2282
2283   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2284
2285   double *reductionMsg = new double[numMetrics*4];
2286
2287   for (int i=0; i<numMetrics; i++) {
2288     reductionMsg[i] = currentExecTimes[i];
2289     // duplicate the event times for max and min sections of the reduction
2290     reductionMsg[minOffset + i] = currentExecTimes[i];
2291     reductionMsg[maxOffset + i] = currentExecTimes[i];
2292     // compute squares
2293     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2294   }
2295
2296   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2297                 0, thisProxy);
2298   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2299              outlierReductionType, cb);  
2300 }
2301
2302 // The purpose is mainly to initialize the k seeds and generate
2303 //   normalization parameters for each of the metrics. The k seeds
2304 //   and normalization parameters are broadcast to all processors.
2305 //
2306 // Called on processor 0
2307 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2308   CkAssert(CkMyPe() == 0);
2309   
2310   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2311
2312   int sumOffset = 0;
2313   int minOffset = numMetrics;
2314   int maxOffset = 2*numMetrics;
2315   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2316
2317   // calculate statistics & boundaries for the k seeds for clustering
2318   KMeansStatsMessage *outmsg = 
2319     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2320   outmsg->numMetrics = numMetrics;
2321   outmsg->numKPos = numK*numMetrics;
2322   outmsg->numStats = numMetrics*4;
2323
2324   // Sum | Min | Max | Sum of Squares
2325   double *totalExecTimes = (double *)msg->getData();
2326   double totalTime = 0.0;
2327
2328   for (int i=0; i<numMetrics; i++) {
2329     DEBUGN("%lf\n", totalExecTimes[i]);
2330     totalTime += totalExecTimes[i];
2331
2332     // calculate event mean over all processors
2333     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2334
2335     // get the ranges and offsets of each metric. With this, we get
2336     //   normalization factors that can be sent back to each processor to
2337     //   be used as necessary. We reuse max for range. Min remains the offset.
2338     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2339     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2340       totalExecTimes[minOffset + i];
2341     
2342     // calculate stddev (using biased variance)
2343     outmsg->stats[sosOffset + i] = 
2344       sqrt((totalExecTimes[sosOffset + i] - 
2345             2*(outmsg->stats[i])*totalExecTimes[i] +
2346             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2347            CkNumPes());
2348   }
2349
2350   for (int i=0; i<numMetrics; i++) {
2351     // 1) if the proportion of the max value of the entry method relative to
2352     //   the average time taken over all entry methods across all processors
2353     //   is greater than the stipulated percentage threshold ...; AND
2354     // 2) if the range of values are non-zero.
2355     //
2356     // The current assumption is totalTime > 0 (what program has zero total
2357     //   time from all work?)
2358     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2359                      entryThreshold) &&
2360       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2361     if (keepMetric[i]) {
2362       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2363              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2364     } else {
2365       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2366     }
2367     outmsg->filter[i] = keepMetric[i];
2368   }
2369
2370   delete msg;
2371   
2372   // initialize k seeds for this phase
2373   kSeeds = new double[numK*numMetrics];
2374
2375   numKReported = 0;
2376   kNumMembers = new int[numK];
2377
2378   // Randomly select k processors' metric vectors for the k seeds
2379   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2380   srand(11337); // for debugging purposes
2381   for (int k=0; k<numK; k++) {
2382     DEBUGF("Seed %d | ", k);
2383     for (int m=0; m<numMetrics; m++) {
2384       double factor = totalExecTimes[maxOffset + m] - 
2385         totalExecTimes[minOffset + m];
2386       // "uniform" distribution, scaled according to the normalization
2387       //   factors
2388       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2389       // Random distribution.
2390       kSeeds[numMetrics*k + m] =
2391         ((rand()*1.0)/RAND_MAX)*factor;
2392       if (keepMetric[m]) {
2393         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2394       }
2395       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2396     }
2397     DEBUGF("\n");
2398     kNumMembers[k] = 0;
2399   }
2400
2401   // broadcast statistical values to all processors for cluster discovery
2402   thisProxy.findInitialClusters(outmsg);
2403 }
2404
2405
2406
2407 // Called on each processor.
2408 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2409
2410  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2411
2412   phaseIter = 0;
2413
2414   // Get info from stats message
2415   CkAssert(numMetrics == msg->numMetrics);
2416   for (int i=0; i<numMetrics; i++) {
2417     keepMetric[i] = msg->filter[i];
2418   }
2419
2420   // Normalize data on local processor.
2421   // **CWL** See my thesis for detailed discussion of normalization of
2422   //    performance data.
2423   // **NOTE** This might change if we want to send data based on the filter
2424   //   instead of all the data.
2425   CkAssert(numMetrics*4 == msg->numStats);
2426   for (int i=0; i<numMetrics; i++) {
2427     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2428     // **CWL** We do not normalize the range. Entry methods that exhibit
2429     //   large absolute timing variations should be allowed to contribute
2430     //   more to the Euclidean distance measure!
2431     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2432   }
2433
2434   // **NOTE** This might change if we want to send data based on the filter
2435   //   instead of all the data.
2436   CkAssert(numK*numMetrics == msg->numKPos);
2437   for (int i=0; i<msg->numKPos; i++) {
2438     incKSeeds[i] = msg->kSeedsPos[i];
2439   }
2440
2441   // Decide which KSeed this processor belongs to.
2442   minDistance = calculateDistance(0);
2443   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2444            currentPhase, phaseIter, minDistance);
2445   minK = 0;
2446   for (int i=1; i<numK; i++) {
2447     double distance = calculateDistance(i);
2448     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2449              currentPhase, phaseIter, i, distance);
2450     if (distance < minDistance) {
2451       minDistance = distance;
2452       minK = i;
2453     }
2454   }
2455
2456   // Set up a reduction with the modification vector to the root (0).
2457   //
2458   // The modification vector sends a negative value for each metric
2459   //   for the K this processor no longer belongs to and a positive
2460   //   value to the K the processor now belongs. In addition, a -1.0
2461   //   is sent to the K it is leaving and a +1.0 to the K it is 
2462   //   joining.
2463   //
2464   // The processor must still contribute a "zero returns" even if
2465   //   nothing changes. This will be the basis for determine
2466   //   convergence at the root.
2467   //
2468   // The addtional +1 is meant for the count-change that must be
2469   //   maintained for the special cases at the root when some K
2470   //   may be deprived of all processor points or go from 0 to a
2471   //   positive number of processors (see later comments).
2472   double *modVector = new double[numK*(numMetrics+1)];
2473   for (int i=0; i<numK; i++) {
2474     for (int j=0; j<numMetrics+1; j++) {
2475       modVector[i*(numMetrics+1) + j] = 0.0;
2476     }
2477   }
2478   for (int i=0; i<numMetrics; i++) {
2479     // for this initialization, only positive values need be sent.
2480     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2481   }
2482   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2483
2484   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2485                 0, thisProxy);
2486   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2487              CkReduction::sum_double, cb);  
2488 }
2489
2490 double KMeansBOC::calculateDistance(int k) {
2491   double ret = 0.0;
2492   for (int i=0; i<numMetrics; i++) {
2493     if (keepMetric[i]) {
2494       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2495              CkMyPe(), currentPhase, phaseIter, i,
2496                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2497       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2498     }
2499   }
2500   return sqrt(ret);
2501 }
2502
2503 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2504   CkAssert(CkMyPe() == 0);
2505
2506   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2507
2508   double *modVector = (double *)msg->getData();
2509   // sanity check
2510   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2511
2512   // A quick convergence test.
2513   bool hasChanges = false;
2514   for (int i=0; i<numK; i++) {
2515     hasChanges = hasChanges || 
2516       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2517   }
2518   if (!hasChanges) {
2519     delete msg;
2520     findRepresentatives();
2521   } else {
2522     int overallChange = 0;
2523     for (int i=0; i<numK; i++) {
2524       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2525       if (change != 0) {
2526         overallChange += change;
2527         // modify the k seeds based on the modification vectors coming in
2528         //
2529         // If a seed initially has no members, its contents do not matter and
2530         //   is simply set to the average of the incoming vector.
2531         // If the change causes a seed to lose all its members, do nothing.
2532         //   Its last-known location is kept to allow it to re-capture
2533         //   membership at the next iteration rather than apply the last
2534         //   changes (which snaps the point unnaturally to 0,0).
2535         // Otherwise, apply the appropriate vector changes.
2536         CkAssert((kNumMembers[i] + change >= 0) &&
2537                  (kNumMembers[i] + change <= CkNumPes()));
2538         if (kNumMembers[i] == 0) {
2539           CkAssert(change > 0);
2540           for (int j=0; j<numMetrics; j++) {
2541             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2542           }
2543         } else if (kNumMembers[i] + change == 0) {
2544           // do nothing.
2545         } else {
2546           for (int j=0; j<numMetrics; j++) {
2547             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2548             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2549             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2550           }
2551         }
2552         kNumMembers[i] += change;
2553       }
2554       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2555              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2556     }
2557     delete msg;
2558
2559     // broadcast the new seed locations.
2560     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2561     outmsg->numKPos = numK*numMetrics;
2562     for (int i=0; i<numK*numMetrics; i++) {
2563       outmsg->kSeedsPos[i] = kSeeds[i];
2564     }
2565
2566     thisProxy.updateSeedMembership(outmsg);
2567   }
2568 }
2569
2570 // Called on all processors
2571 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2572
2573   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2574
2575   phaseIter++;
2576
2577   // **NOTE** This might change if we want to send data based on the filter
2578   //   instead of all the data.
2579   CkAssert(numK*numMetrics == msg->numKPos);
2580   for (int i=0; i<msg->numKPos; i++) {
2581     incKSeeds[i] = msg->kSeedsPos[i];
2582   }
2583
2584   // Decide which KSeed this processor belongs to.
2585   lastMinK = minK;
2586   minDistance = calculateDistance(0);
2587   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2588          currentPhase, phaseIter, minDistance);
2589
2590   minK = 0;
2591   for (int i=1; i<numK; i++) {
2592     double distance = calculateDistance(i);
2593     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2594            currentPhase, phaseIter, i, distance);
2595     if (distance < minDistance) {
2596       minDistance = distance;
2597       minK = i;
2598     }
2599   }
2600
2601   double *modVector = new double[numK*(numMetrics+1)];
2602   for (int i=0; i<numK; i++) {
2603     for (int j=0; j<numMetrics+1; j++) {
2604       modVector[i*(numMetrics+1) + j] = 0.0;
2605     }
2606   }
2607
2608   if (minK != lastMinK) {
2609     for (int i=0; i<numMetrics; i++) {
2610       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2611       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2612     }
2613     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2614     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2615   }
2616
2617   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2618                 0, thisProxy);
2619   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2620              CkReduction::sum_double, cb);  
2621 }
2622
2623 void KMeansBOC::findRepresentatives() {
2624
2625   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2626
2627   int numNonEmptyClusters = 0;
2628   for (int i=0; i<numK; i++) {
2629     if (kNumMembers[i] > 0) {
2630       numNonEmptyClusters++;
2631     }
2632   }
2633
2634   int numRepresentatives = peNumKeep;
2635   // **FIXME**
2636   // This is fairly arbitrary. Next time, choose the centers of the top
2637   //   largest clusters.
2638   if (numRepresentatives < numNonEmptyClusters) {
2639     numRepresentatives = numNonEmptyClusters;
2640   }
2641
2642   int slotsRemaining = numRepresentatives;
2643
2644   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2645          numNonEmptyClusters);
2646
2647   // determine how many exemplars to select per cluster. Currently
2648   //   hardcoded to 1. Future challenge is to decide on other numbers
2649   //   or proportionality.
2650   //
2651   int exemplarsPerCluster = 1;
2652   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2653
2654   int numCandidateOutliers = CkNumPes() - 
2655     exemplarsPerCluster*numNonEmptyClusters;
2656
2657   double *remainders = new double[numK];
2658   int *assigned = new int[numK];
2659   exemplarChoicesLeft = new int[numK];
2660   outlierChoicesLeft = new int[numK];
2661
2662   for (int i=0; i<numK; i++) {
2663     assigned[i] = 0;
2664     remainders[i] = 
2665       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2666       slotsRemaining / numCandidateOutliers;
2667     if (remainders[i] >= 0.0) {
2668       assigned[i] = (int)floor(remainders[i]);
2669       remainders[i] -= assigned[i];
2670     } else {
2671       remainders[i] = 0.0;
2672     }
2673   }
2674
2675   for (int i=0; i<numK; i++) {
2676     slotsRemaining -= assigned[i];
2677   }
2678   CkAssert(slotsRemaining >= 0);
2679
2680   // find clusters to assign the loose slots to, in order of
2681   // remainder proportion
2682   while (slotsRemaining > 0) {
2683     double max = 0.0;
2684     int winner = 0;
2685     for (int i=0; i<numK; i++) {
2686       if (remainders[i] > max) {
2687         max = remainders[i];
2688         winner = i;
2689       }
2690     }
2691     assigned[winner]++;
2692     remainders[winner] = 0.0;
2693     slotsRemaining--;
2694   }
2695
2696   // set up how many reduction cycles of min/max we need to conduct to
2697   // select the representatives.
2698   numSelectionIter = exemplarsPerCluster;
2699   for (int i=0; i<numK; i++) {
2700     if (assigned[i] > numSelectionIter) {
2701       numSelectionIter = assigned[i];
2702     }
2703   }
2704   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2705
2706   for (int i=0; i<numK; i++) {
2707     if (kNumMembers[i] > 0) {
2708       exemplarChoicesLeft[i] = exemplarsPerCluster;
2709       outlierChoicesLeft[i] = assigned[i];
2710     } else {
2711       exemplarChoicesLeft[i] = 0;
2712       outlierChoicesLeft[i] = 0;
2713     }
2714     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2715            outlierChoicesLeft[i]);
2716   }
2717
2718   delete [] assigned;
2719   delete [] remainders;
2720
2721   // send out first broadcast
2722   KSelectionMessage *outmsg = NULL;
2723   if (numSelectionIter > 0) {
2724     outmsg = new (numK, numK, numK) KSelectionMessage;
2725     outmsg->numKMinIDs = numK;
2726     outmsg->numKMaxIDs = numK;
2727     for (int i=0; i<numK; i++) {
2728       outmsg->minIDs[i] = -1;
2729       outmsg->maxIDs[i] = -1;
2730     }
2731     thisProxy.collectDistances(outmsg);
2732   } else {
2733     CkPrintf("Warning: No selection iteration from the start!\n");
2734     // invoke phase completion on all processors
2735     thisProxy.phaseDone();
2736   }
2737 }
2738
2739 /*
2740  *  lastMin = array of minimum champions of the last tournament
2741  *  lastMax = array of maximum champions of the last tournament
2742  *  lastMaxVal = array of last encountered maximum values, allows previous
2743  *                 minimum winners to eliminate themselves from the next
2744  *                 minimum race.
2745  *
2746  *  Called on all processors.
2747  */
2748 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2749
2750   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2751
2752   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2753          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2754   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2755       (CkMyPe() == msg->maxIDs[lastMinK])) {
2756     CkAssert(!selected);
2757     selected = true;
2758   }
2759
2760   // build outgoing reduction structure
2761   //   format = minVal | ID | maxVal | ID
2762   double *minMaxAndIDs = NULL;
2763
2764   minMaxAndIDs = new double[numK*4];
2765   // initialize to the appropriate out-of-band values (for error checks)
2766   for (int i=0; i<numK; i++) {
2767     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2768     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2769     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2770     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2771   }
2772   // If I have not won before, I put myself back into the competition
2773   if (!selected) {
2774     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2775     minMaxAndIDs[lastMinK*4] = minDistance;
2776     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2777     minMaxAndIDs[lastMinK*4+2] = minDistance;
2778     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2779   }
2780   delete msg;
2781
2782   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2783                 0, thisProxy);
2784   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2785              minMaxReductionType, cb);  
2786 }
2787
2788 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2789   // incoming format:
2790   //   minVal | minID | maxVal | maxID
2791
2792   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2793
2794   if (numSelectionIter > 0) {
2795     double *incInfo = (double *)msg->getData();
2796     
2797     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2798     outmsg->numKMinIDs = numK;
2799     outmsg->numKMaxIDs = numK;
2800     
2801     for (int i=0; i<numK; i++) {
2802       DEBUGF("%d | %lf %d %lf %d \n", i, 
2803              incInfo[i*4], (int)incInfo[i*4+1], 
2804              incInfo[i*4+2], (int)incInfo[i*4+3]);
2805     }
2806
2807     for (int i=0; i<numK; i++) {
2808       if (exemplarChoicesLeft[i] > 0) {
2809         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2810         exemplarChoicesLeft[i]--;
2811       } else {
2812         outmsg->minIDs[i] = -1;
2813       }
2814       if (outlierChoicesLeft[i] > 0) {
2815         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2816         outlierChoicesLeft[i]--;
2817       } else {
2818         outmsg->maxIDs[i] = -1;
2819       }
2820     }
2821     thisProxy.collectDistances(outmsg);
2822     numSelectionIter--;
2823   } else {
2824     // invoke phase completion on all processors
2825     thisProxy.phaseDone();
2826   }
2827 }
2828
2829 /**
2830  *  Completion of the K-Means clustering and data selection of one phase
2831  *    of the computation.
2832  *
2833  *  Called on every processor.
2834  */
2835 void KMeansBOC::phaseDone() {
2836
2837   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2838
2839   LogPool *pool = CkpvAccess(_trace)->_logPool;
2840   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2841
2842   // now decide on what to do with the decision.
2843   if (!selected) {
2844     if (usePhases) {
2845       pool->keepPhase[currentPhase] = false;
2846     } else {
2847       // if not using phases, we're working on the whole log
2848       pool->setAllPhases(false);
2849     }
2850   }
2851
2852   // **FIXME** (?) - All processors have to agree on this, or the reduction
2853   //   will not be correct! The question is "is this enforcible?"
2854   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2855     // We're done
2856     int dummy = 0;
2857     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2858                   0, bocProxy);
2859     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2860   } else {
2861     // reset all phase-based k-means data and decisions
2862
2863     // **FIXME**!!!!!    
2864     
2865     // invoke the next K-Means computation phase.
2866     currentPhase++;
2867     thisProxy[CkMyPe()].getNextPhaseMetrics();
2868   }
2869 }
2870
2871 void TraceProjectionsBOC::startTimeAnalysis()
2872 {
2873   double startTime = 0.0;
2874   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2875      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2876   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2877   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2878 }
2879
2880 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2881 {
2882   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2883
2884   if (CkpvAccess(_trace) != NULL) {
2885     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2886     CkpvAccess(_trace)->_logPool->setNewStartTime();
2887     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2888   }
2889   delete msg;
2890   thisProxy[CkMyPe()].startEndTimeAnalysis();
2891 }
2892
2893 void TraceProjectionsBOC::startEndTimeAnalysis()
2894 {
2895  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2896
2897   endTime = CkpvAccess(_trace)->endTime;
2898   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2899
2900   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2901                 0, thisProxy);
2902   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2903 }
2904
2905 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2906 {
2907  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2908
2909   CkAssert(CkMyPe() == 0);
2910   parModulesRemaining--;
2911   if (CkpvAccess(_trace) != NULL) {
2912     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2913     // CkPrintf("End time determined to be %lf us\n",
2914     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2915   }
2916   delete msg;
2917   if (parModulesRemaining == 0) {
2918     thisProxy[CkMyPe()].finalize();
2919   }
2920 }
2921
2922 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2923
2924  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2925
2926   CkAssert(CkMyPe() == 0);
2927   parModulesRemaining--;
2928   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2929            CmiWallTimer()-analysisStartTime);
2930   delete msg;
2931   if (parModulesRemaining == 0) {
2932     thisProxy[CkMyPe()].finalize();
2933   }
2934 }
2935
2936 /**
2937  *
2938  *  This version is called (on processor 0) only if flushCheck fails.
2939  *
2940  */
2941 void TraceProjectionsBOC::kMeansDone() {
2942   CkAssert(CkMyPe() == 0);
2943   parModulesRemaining--;
2944   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2945            CmiWallTimer()-analysisStartTime);
2946   if (parModulesRemaining == 0) {
2947     thisProxy[CkMyPe()].finalize();
2948   }
2949 }
2950
2951 void TraceProjectionsBOC::finalize()
2952 {
2953   CkAssert(CkMyPe() == 0);
2954   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2955   //       CmiWallTimer()-analysisStartTime);
2956   thisProxy.closingTraces();
2957 }
2958
2959 // Called on every processor
2960 void TraceProjectionsBOC::closingTraces() {
2961   CkpvAccess(_trace)->closeTrace();
2962
2963     // subtle:  reduction needs to go to the PE which started CkExit()
2964   int pe = 0;
2965   if (endPe != -1) pe = endPe;
2966   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2967                 pe, thisProxy); 
2968   contribute(0, NULL, CkReduction::sum_int, cb);  
2969 }
2970
2971 // The sole purpose of this reduction is to decide whether or not
2972 //   Projections as a module needs to call CkExit() to get other
2973 //   modules to shutdown.
2974 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2975   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2976   delete msg;
2977   // decide if CkExit() needs to be called
2978   if (!CkpvAccess(_trace)->converseExit) {
2979     CkExit();
2980   }
2981 }
2982 /*
2983  *  Registration and definition of the Outlier Reduction callback.
2984  *  Format: Sum | Min | Max | Sum of Squares
2985  */
2986 CkReductionMsg *outlierReduction(int nMsgs,
2987                                  CkReductionMsg **msgs) {
2988   int numBytes = 0;
2989   int numMetrics = 0;
2990   double *ret = NULL;
2991
2992   if (nMsgs == 1) {
2993     // nothing to do, just pass it on
2994     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2995   }
2996
2997   if (nMsgs > 1) {
2998     numBytes = msgs[0]->getSize();
2999     // sanity checks
3000     if (numBytes%sizeof(double) != 0) {
3001       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
3002     }
3003     if ((numBytes/sizeof(double))%4 != 0) {
3004       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
3005     }
3006     numMetrics = (numBytes/sizeof(double))/4;
3007     ret = new double[numMetrics*4];
3008
3009     // copy the first message data into the return structure first
3010     for (int i=0; i<numMetrics*4; i++) {
3011       ret[i] = ((double *)msgs[0]->getData())[i];
3012     }
3013
3014     // Sum | Min | Max | Sum of Squares
3015     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
3016       for (int i=0; i<numMetrics; i++) {
3017         // Sum
3018         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
3019         // Min
3020         ret[numMetrics + i] =
3021           (ret[numMetrics + i] < 
3022            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
3023           ? ret[numMetrics + i] : 
3024           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
3025         // Max
3026         ret[2*numMetrics + i] = 
3027           (ret[2*numMetrics + i] >
3028            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
3029           ? ret[2*numMetrics + i] :
3030           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
3031         // Sum of Squares (squaring already done at leaf)
3032         ret[3*numMetrics + i] +=
3033           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
3034       }
3035     }
3036   }
3037   
3038   /* apparently, we do not delete the incoming messages */
3039   return CkReductionMsg::buildNew(numBytes,ret);
3040 }
3041
3042 /*
3043  * The only reason we have a user-defined reduction is to support
3044  *   identification of the "winning" processors as well as to handle
3045  *   both the min and the max of each "tournament". A simple min/max
3046  *   discovery cannot handle ties.
3047  */
3048 CkReductionMsg *minMaxReduction(int nMsgs,
3049                                 CkReductionMsg **msgs) {
3050   CkAssert(nMsgs > 0);
3051
3052   int numBytes = msgs[0]->getSize();
3053   CkAssert(numBytes%sizeof(double) == 0);
3054   int numK = (numBytes/sizeof(double))/4;
3055
3056   double *ret = new double[numK*4];
3057   // fill with out-of-band values
3058   for (int i=0; i<numK; i++) {
3059     ret[i*4] = -1.0;
3060     ret[i*4+1] = -1.0;
3061     ret[i*4+2] = -1.0;
3062     ret[i*4+3] = -1.0;
3063   }
3064
3065   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
3066   for (int i=0; i<nMsgs; i++) {
3067     double *temp = (double *)msgs[i]->getData();
3068     for (int j=0; j<numK; j++) {
3069       // no previous valid min
3070       if (ret[j*4+1] < 0) {
3071         // fill it in only if the incoming min is valid
3072         if (temp[j*4+1] >= 0) {
3073           ret[j*4] = temp[j*4];      // fill min value
3074           ret[j*4+1] = temp[j*4+1];  // fill ID
3075         }
3076       } else {
3077         // find Min, only if incoming min is valid
3078         if (temp[j*4+1] >= 0) {
3079           if (temp[j*4] < ret[j*4]) {
3080             ret[j*4] = temp[j*4];      // replace min value
3081             ret[j*4+1] = temp[j*4+1];  // replace ID
3082           }
3083         }
3084       }
3085       // no previous valid max
3086       if (ret[j*4+3] < 0) {
3087         // fill only if the incoming max is valid
3088         if (temp[j*4+3] >= 0) {
3089           ret[j*4+2] = temp[j*4+2];  // fill max value
3090           ret[j*4+3] = temp[j*4+3];  // fill ID
3091         }
3092       } else {
3093         // find Max, only if incoming max is valid
3094         if (temp[j*4+3] >= 0) {
3095           if (temp[j*4+2] > ret[j*4+2]) {
3096             ret[j*4+2] = temp[j*4+2];  // replace max value
3097             ret[j*4+3] = temp[j*4+3];  // replace ID
3098           }
3099         }
3100       }
3101     }
3102   }
3103   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3104   delete [] ret;
3105   return redmsg;
3106 }
3107
3108 #include "TraceProjections.def.h"
3109 #endif //PROJ_ANALYSIS
3110
3111 /*@}*/