revise app interation API for better interface
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
12
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
19
20 #define DefaultLogBufSize      1000000
21
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
25 int deltaLog;
26 int nonDeltaLog;
27
28 int checknested=0;              // check illegal nested begin/end execute 
29
30 #ifdef PROJ_ANALYSIS
31 // BOC operations readonlys
32 CkGroupID traceProjectionsGID;
33 CkGroupID kMeansGID;
34
35 // New reduction type for Outlier Analysis purposes. This is allowed to be
36 // a global variable according to the Charm++ manual.
37 CkReductionMsg *outlierReduction(int nMsgs,
38                                  CkReductionMsg **msgs);
39 CkReductionMsg *minMaxReduction(int nMsgs,
40                                 CkReductionMsg **msgs);
41 CkReduction::reducerType outlierReductionType;
42 CkReduction::reducerType minMaxReductionType;
43 #endif // PROJ_ANALYSIS
44
45 CkpvStaticDeclare(TraceProjections*, _trace);
46 CtvStaticDeclare(int,curThreadEvent);
47
48 CkpvDeclare(CmiInt8, CtrLogBufSize);
49
50 typedef CkVec<char *>  usrEventVec;
51 CkpvStaticDeclare(usrEventVec, usrEventlist);
52 class UsrEvent {
53 public:
54   int e;
55   char *str;
56   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
57 };
58 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
59
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
67 }
68
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
71 {
72   CkpvAccess(_trace)->setWriteData(true);
73 }
74
75 /// Force the log files to be flushed
76 void flushTraceLog()
77 {
78   CkpvAccess(_trace)->traceFlushLog();
79 }
80 #endif
81
82 #if ! CMK_TRACE_ENABLED
83 static int warned=0;
84 #define OPTIMIZED_VERSION       \
85         if (!warned) { warned=1;        \
86         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
87 #else
88 #define OPTIMIZED_VERSION /*empty*/
89 #endif // CMK_TRACE_ENABLED
90
91 /*
92 On T3E, we need to have file number control by open/close files only when needed.
93 */
94 #if CMK_TRACE_LOGFILE_NUM_CONTROL
95   #define OPEN_LOG openLog("a");
96   #define CLOSE_LOG closeLog();
97 #else
98   #define OPEN_LOG
99   #define CLOSE_LOG
100 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
101
102 /**
103   For each TraceFoo module, _createTraceFoo() must be defined.
104   This function is called in _createTraces() generated in moduleInit.C
105 */
106 void _createTraceprojections(char **argv)
107 {
108   DEBUGF("%d createTraceProjections\n", CkMyPe());
109   CkpvInitialize(CkVec<char *>, usrEventlist);
110   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
111   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
112 #if CMK_BIGSIM_CHARM
113   // CthRegister does not call the constructor
114 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
115 #endif //CMK_BIGSIM_CHARM
116   CkpvInitialize(TraceProjections*, _trace);
117   CkpvAccess(_trace) = new  TraceProjections(argv);
118   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
119   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
120 }
121  
122 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
123
124 struct TraceThreadListener {
125   struct CthThreadListener base;
126   int event;
127   int msgType;
128   int ep;
129   int srcPe;
130   int ml;
131   CmiObjId idx;
132 };
133
134 extern "C"
135 void traceThreadListener_suspend(struct CthThreadListener *l)
136 {
137   TraceThreadListener *a=(TraceThreadListener *)l;
138   /* here, we activate the appropriate trace codes for the appropriate
139      registered modules */
140   traceSuspend();
141 }
142
143 extern "C"
144 void traceThreadListener_resume(struct CthThreadListener *l) 
145 {
146   TraceThreadListener *a=(TraceThreadListener *)l;
147   /* here, we activate the appropriate trace codes for the appropriate
148      registered modules */
149   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
150                                 CthGetThreadID(a->base.thread));
151   a->event=-1;
152   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
153   a->ml=0;
154 }
155
156 extern "C"
157 void traceThreadListener_free(struct CthThreadListener *l) 
158 {
159   TraceThreadListener *a=(TraceThreadListener *)l;
160   delete a;
161 }
162
163 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
164 {
165 #if CMK_TRACE_ENABLED
166   /* strip essential information from the envelope */
167   TraceThreadListener *a= new TraceThreadListener;
168   
169   a->base.suspend=traceThreadListener_suspend;
170   a->base.resume=traceThreadListener_resume;
171   a->base.free=traceThreadListener_free;
172   a->event=e->getEvent();
173   a->msgType=e->getMsgtype();
174   a->ep=e->getEpIdx();
175   a->srcPe=e->getSrcPe();
176   a->ml=e->getTotalsize();
177
178   CthAddListener(tid, (CthThreadListener *)a);
179 #endif
180 }
181
182 void LogPool::openLog(const char *mode)
183 {
184 #if CMK_PROJECTIONS_USE_ZLIB
185   if(compressed) {
186     if (nonDeltaLog) {
187       do {
188         zfp = gzopen(fname, mode);
189       } while (!zfp && (errno == EINTR || errno == EMFILE));
190       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
191     }
192     if (deltaLog) {
193       do {
194         deltazfp = gzopen(dfname, mode);
195       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
196       if (!deltazfp) 
197         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
198     }
199   } else {
200     if (nonDeltaLog) {
201       do {
202         fp = fopen(fname, mode);
203       } while (!fp && (errno == EINTR || errno == EMFILE));
204       if (!fp) {
205         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
206         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
207       }
208     }
209     if (deltaLog) {
210       do {
211         deltafp = fopen(dfname, mode);
212       } while (!deltafp && (errno == EINTR || errno == EMFILE));
213       if (!deltafp) 
214         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
215     }
216   }
217 #else
218   if (nonDeltaLog) {
219     do {
220       fp = fopen(fname, mode);
221     } while (!fp && (errno == EINTR || errno == EMFILE));
222     if (!fp) {
223       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
224       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
225     }
226   }
227   if (deltaLog) {
228     do {
229       deltafp = fopen(dfname, mode);
230     } while (!deltafp && (errno == EINTR || errno == EMFILE));
231     if(!deltafp) 
232       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
233   }
234 #endif
235 }
236
237 void LogPool::closeLog(void)
238 {
239 #if CMK_PROJECTIONS_USE_ZLIB
240   if(compressed) {
241     if (nonDeltaLog) gzclose(zfp);
242     if (deltaLog) gzclose(deltazfp);
243     return;
244   }
245 #endif
246   if (nonDeltaLog) { 
247 #if !defined(_WIN32) || defined(__CYGWIN__)
248     fsync(fileno(fp)); 
249 #endif
250     fclose(fp); 
251   }
252   if (deltaLog)  { 
253 #if !defined(_WIN32) || defined(__CYGWIN__)
254     fsync(fileno(deltafp)); 
255 #endif
256     fclose(deltafp);  
257   }
258 }
259
260 LogPool::LogPool(char *pgm) {
261   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
262   // defaults to writing data (no outlier changes)
263   writeSummaryFiles = 0;
264   writeData = true;
265   numEntries = 0;
266   // **CW** for simple delta encoding
267   prevTime = 0.0;
268   timeErr = 0.0;
269   globalStartTime = 0.0;
270   globalEndTime = 0.0;
271   headerWritten = 0;
272   numPhases = 0;
273   hasFlushed = false;
274
275   keepPhase = NULL;
276
277   fileCreated = false;
278   poolSize = CkpvAccess(CtrLogBufSize);
279   pgmname = new char[strlen(pgm)+1];
280   strcpy(pgmname, pgm);
281
282   //statistic init
283   statisLastProcessTimer = 0;
284   statisLastIdleTimer = 0;
285   statisLastPackTimer = 0;
286   statisLastUnpackTimer = 0;
287   statisTotalExecutionTime  = 0;
288   statisTotalIdleTime = 0;
289   statisTotalPackTime = 0;
290   statisTotalUnpackTime = 0;
291   statisTotalCreationMsgs = 0;
292   statisTotalCreationBytes = 0;
293   statisTotalMCastMsgs = 0;
294   statisTotalMCastBytes = 0;
295   statisTotalEnqueueMsgs = 0;
296   statisTotalDequeueMsgs = 0;
297   statisTotalRecvMsgs = 0;
298   statisTotalRecvBytes = 0;
299   statisTotalMemAlloc = 0;
300   statisTotalMemFree = 0;
301
302 }
303
304 void LogPool::createFile(const char *fix)
305 {
306   if (fileCreated) {
307     return;
308   }
309   
310   if(CmiNumPartitions() > 1) {
311     CmiMkdir(CkpvAccess(partitionRoot));
312   }
313
314   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
315   char *pathPlusFilePrefix = new char[1024];
316  
317   if(nSubdirs > 0){
318     int sd = CkMyPe() % nSubdirs;
319     char *subdir = new char[1024];
320     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
321     CmiMkdir(subdir);
322     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
323     delete[] subdir;
324   } else {
325     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
326   }
327
328   char pestr[10];
329   sprintf(pestr, "%d", CkMyPe());
330 #if CMK_PROJECTIONS_USE_ZLIB
331   int len;
332   if(compressed)
333     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
334   else
335     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
336 #else
337   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
338 #endif
339
340   if (nonDeltaLog) {
341     fname = new char[len];
342   }
343   if (deltaLog) {
344     dfname = new char[len];
345   }
346 #if CMK_PROJECTIONS_USE_ZLIB
347   if(compressed) {
348     if (deltaLog && nonDeltaLog) {
349       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
350       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
351     } else {
352       if (nonDeltaLog) {
353         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
354       } else {
355         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
356       }
357     }
358   } else {
359     if (deltaLog && nonDeltaLog) {
360       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
361       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
362     } else {
363       if (nonDeltaLog) {
364         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
365       } else {
366         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
367       }
368     }
369   }
370 #else
371   if (deltaLog && nonDeltaLog) {
372     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
373     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
374   } else {
375     if (nonDeltaLog) {
376       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
377     } else {
378       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
379     }
380   }
381 #endif
382   fileCreated = true;
383   delete[] pathPlusFilePrefix;
384   openLog("w");
385   CLOSE_LOG 
386 }
387
388 void LogPool::createSts(const char *fix)
389 {
390   CkAssert(CkMyPe() == 0);
391   if(CmiNumPartitions() > 1) {
392     CmiMkdir(CkpvAccess(partitionRoot));
393   }
394
395   // create the sts file
396   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
397   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
398   do
399     {
400       stsfp = fopen(fname, "w");
401     } while (!stsfp && (errno == EINTR || errno == EMFILE));
402   if(stsfp==0){
403     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
404     CmiAbort("Error!!\n");
405   }
406   delete[] fname;
407 }  
408
409 void LogPool::createTopo(const char *fix)
410 {
411   CkAssert(CkMyPe() == 0);
412   // create the topo file
413   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".topo")+2];
414   sprintf(fname, "%s%s.topo", CkpvAccess(traceRoot), fix);
415   do
416     {
417       topofp = fopen(fname, "w");
418     } while (!stsfp && (errno == EINTR || errno == EMFILE));
419   if(stsfp==0){
420     CmiPrintf("Cannot open projections topo file for writing due to %s\n", strerror(errno));
421     CmiAbort("Error!!\n");
422   }
423   delete[] fname;
424 }  
425
426 void LogPool::createRC()
427 {
428   // create the projections rc file.
429   fname = 
430     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
431   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
432   do {
433     rcfp = fopen(fname, "w");
434   } while (!rcfp && (errno == EINTR || errno == EMFILE));
435   if (rcfp==0) {
436     CmiAbort("Cannot open projections configuration file for writing.\n");
437   }
438   delete[] fname;
439 }
440
441 LogPool::~LogPool() 
442 {
443   if (writeData) {
444       if(writeSummaryFiles)
445           writeStatis();
446     writeLog();
447 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
448     closeLog();
449 #endif
450   }
451
452 #if CMK_BIGSIM_CHARM
453   extern int correctTimeLog;
454   if (correctTimeLog) {
455     createFile("-bg");
456     if (CkMyPe() == 0) {
457       createSts("-bg");
458     }
459     writeHeader();
460     if (CkMyPe() == 0) writeSts(NULL);
461     postProcessLog();
462   }
463 #endif
464
465   delete[] pool;
466   delete [] fname;
467 }
468
469 void LogPool::writeHeader()
470 {
471   if (headerWritten) return;
472   headerWritten = 1;
473   if(!binary) {
474 #if CMK_PROJECTIONS_USE_ZLIB
475     if(compressed) {
476       if (nonDeltaLog) {
477         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
478       }
479       if (deltaLog) {
480         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
481       }
482     } 
483     else /* else clause is below... */
484 #endif
485     /*... may hang over from else above */ {
486       if (nonDeltaLog) {
487         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
488       }
489       if (deltaLog) {
490         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
491       }
492     }
493   }
494   else { // binary
495       if (nonDeltaLog) {
496         fwrite(&numEntries,sizeof(numEntries),1,fp);
497       }
498       if (deltaLog) {
499         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
500       }
501   }
502 }
503
504 void LogPool::writeLog(void)
505 {
506   createFile();
507   OPEN_LOG
508   writeHeader();
509   if (nonDeltaLog) write(0);
510   if (deltaLog) write(1);
511   CLOSE_LOG
512 }
513
514 void LogPool::write(int writedelta) 
515 {
516   // **CW** Simple delta encoding implementation
517   // prevTime has to be maintained as an object variable because
518   // LogPool::write may be called several times depending on the
519   // +logsize value.
520   PUP::er *p = NULL;
521   if (binary) {
522     p = new PUP::toDisk(writedelta?deltafp:fp);
523   }
524 #if CMK_PROJECTIONS_USE_ZLIB
525   else if (compressed) {
526     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
527   }
528 #endif
529   else {
530     p = new toProjectionsFile(writedelta?deltafp:fp);
531   }
532   CmiAssert(p);
533   int curPhase = 0;
534   // **FIXME** - Should probably consider a more sophisticated bounds-based
535   //   approach for selective writing instead of making multiple if-checks
536   //   for every single event.
537   for(UInt i=0; i<numEntries; i++) {
538     if (!writedelta) {
539       if (keepPhase == NULL) {
540         // default case, when no phase selection is required.
541         pool[i].pup(*p);
542       } else {
543         // **FIXME** Might be a good idea to create a "filler" event block for
544         //   all the events taken out by phase filtering.
545         if (pool[i].type == END_PHASE) {
546           // always write phase markers
547           pool[i].pup(*p);
548           curPhase++;
549         } else if (pool[i].type == BEGIN_COMPUTATION ||
550                    pool[i].type == END_COMPUTATION) {
551           // always write BEGIN and END COMPUTATION markers
552           pool[i].pup(*p);
553         } else if (keepPhase[curPhase]) {
554           pool[i].pup(*p);
555         }
556       }
557     }
558     else {      // delta
559       // **FIXME** Implement phase-selective writing for delta logs
560       //   eventually
561       double time = pool[i].time;
562       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
563       {
564         double timeDiff = (time-prevTime)*1.0e6;
565         UInt intTimeDiff = (UInt)timeDiff;
566         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
567         if (timeErr > 1.0) {
568           timeErr -= 1.0;
569           intTimeDiff++;
570         }
571         pool[i].time = intTimeDiff/1.0e6;
572       }
573       pool[i].pup(*p);
574       pool[i].time = time;      // restore time value
575       prevTime = time;
576     }
577   }
578   delete p;
579   delete [] keepPhase;
580 }
581
582 void LogPool::writeSts(void)
583 {
584   // for whining compilers
585   int i;
586   // generate an automatic unique ID for each log
587   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
588   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
589   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
590 #if CMK_HAS_COUNTER_PAPI
591   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
592   // for now, use i, next time use papiEvents[i].
593   // **CW** papi event names is a hack.
594   char eventName[PAPI_MAX_STR_LEN];
595   for (i=0;i<NUMPAPIEVENTS;i++) {
596     PAPI_event_code_to_name(papiEvents[i], eventName);
597     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
598   }
599 #endif
600   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
601   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
602     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
603   }     
604 }
605
606 void LogPool::writeSts(TraceProjections *traceProj){
607   writeSts();
608   if (traceProj != NULL) {
609     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
610     funcIter->seekStart();
611     int numFuncs = traceProj->getFuncNumber();
612     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
613     while(funcIter->hasNext()) {
614       StrKey *key;
615       int *obj = (int *)funcIter->next((void **)&key);
616       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
617     }
618   }
619   fprintf(stsfp, "END\n");
620   fclose(stsfp);
621 }
622
623 void LogPool::writeRC(void)
624 {
625     //CkPrintf("write RC is being executed\n");
626 #ifdef PROJ_ANALYSIS  
627     CkAssert(CkMyPe() == 0);
628     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
629             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
630     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
631             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
632     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
633     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
634       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
635     } else {
636       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
637     }
638     */
639 #endif //PROJ_ANALYSIS
640   fclose(rcfp);
641 }
642
643 void LogPool::writeTopo(void)
644 {
645   TopoManager tmgr;
646   tmgr.printAllocation(topofp);
647   fclose(topofp);
648 }
649
650 void LogPool::writeStatis(void)
651 {
652     
653    // create the statis file
654   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
655   sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
656   do
657   {
658       statisfp = fopen(fname, "w");
659   } while (!statisfp && (errno == EINTR || errno == EMFILE));
660   if(statisfp==0){
661       CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
662       CmiAbort("Error!!\n");
663   }
664   delete[] fname;
665   double totaltime = endComputationTime - beginComputationTime;
666   fprintf(statisfp, "time(sec) percentage\n");
667   fprintf(statisfp, "Time:    \t%f\n", totaltime);
668   fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100); 
669   fprintf(statisfp, "Overhead:    \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100); 
670   fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100); 
671   fprintf(statisfp, "Pack:     \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100); 
672   fprintf(statisfp, "Unpack:   \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100); 
673   fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs); 
674   fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs); 
675   fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs); 
676   fclose(statisfp);
677 }
678
679 #if CMK_BIGSIM_CHARM
680 static void updateProjLog(void *data, double t, double recvT, void *ptr)
681 {
682   LogEntry *log = (LogEntry *)data;
683   FILE *fp = *(FILE **)ptr;
684   log->time = t;
685   log->recvTime = recvT<0.0?0:recvT;
686 //  log->write(fp);
687   toProjectionsFile p(fp);
688   log->pup(p);
689 }
690 #endif
691
692 // flush log entries to disk
693 void LogPool::flushLogBuffer()
694 {
695   if (numEntries) {
696     double writeTime = TraceTimer();
697     writeLog();
698     hasFlushed = true;
699     numEntries = 0;
700     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
701     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
702     //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
703     if (!traceProjectionsGID.isZero()) {    // report flushing events to PE 0
704       CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
705       bocProxy[0].flush_warning(CkMyPe());
706     }
707   }
708 }
709
710 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
711                   double time, int event, int pe, int ml, CmiObjId *id, 
712                   double recvT, double cpuT, int numPe)
713 {
714     switch(type)
715     {
716         case BEGIN_COMPUTATION:
717             beginComputationTime = time;
718             break;
719         case END_COMPUTATION:
720             endComputationTime = time;
721             break;
722         case CREATION:
723             statisTotalCreationMsgs++;
724             statisTotalCreationBytes += ml;
725             break;
726         case CREATION_MULTICAST:
727             statisTotalMCastMsgs++;
728             statisTotalMCastBytes += ml;
729             break;
730         case ENQUEUE:
731             statisTotalEnqueueMsgs++;
732             break;
733         case DEQUEUE:
734             statisTotalDequeueMsgs++;
735             break;
736         case MESSAGE_RECV:
737             statisTotalRecvMsgs++;
738             statisTotalRecvBytes += ml;
739             break;
740         case MEMORY_MALLOC:
741             statisTotalMemAlloc++;
742             break;
743         case MEMORY_FREE:
744             statisTotalMemFree++;
745             break;
746         case BEGIN_PROCESSING:
747             statisLastProcessTimer = time;
748             break;
749         case BEGIN_UNPACK:
750             statisLastUnpackTimer = time;
751             break;
752         case BEGIN_PACK:
753             statisLastPackTimer = time;
754             break;
755         case BEGIN_IDLE:
756             statisLastIdleTimer = time;
757             break;
758         case END_PROCESSING:
759             statisTotalExecutionTime += (time - statisLastProcessTimer);
760             break;
761         case END_UNPACK:
762             statisTotalUnpackTime += (time - statisLastUnpackTimer);
763             break;
764         case END_PACK:
765             statisTotalPackTime += (time - statisLastPackTimer);
766             break;
767         case END_IDLE:
768             statisTotalIdleTime += (time - statisLastIdleTimer);
769             break;
770         default:
771             break;
772     }
773   new (&pool[numEntries++])
774     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
775   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
776     numPhases++;
777   }
778   if(poolSize==numEntries) {
779     flushLogBuffer();
780 #if CMK_BIGSIM_CHARM
781     extern int correctTimeLog;
782     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
783 #endif
784   }
785 #if CMK_BIGSIM_CHARM
786   switch (type) {
787     case BEGIN_PROCESSING:
788       pool[numEntries-1].recvTime = BgGetRecvTime();
789     case END_PROCESSING:
790     case BEGIN_COMPUTATION:
791     case END_COMPUTATION:
792     case CREATION:
793     case BEGIN_PACK:
794     case END_PACK:
795     case BEGIN_UNPACK:
796     case END_UNPACK:
797     case USER_EVENT_PAIR:
798       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
799   }
800 #endif
801 }
802
803 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
804 #ifndef CMK_BIGSIM_CHARM
805   new (&pool[numEntries++])
806         LogEntry(time,type,funcID,lineNum,fileName);
807   if(poolSize == numEntries){
808     flushLogBuffer();
809   }
810 #endif  
811 }
812
813
814   
815 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
816 #ifndef CMK_BIGSIM_CHARM
817   new (&pool[numEntries++])
818         LogEntry(type,time,memUsage);
819   if(poolSize == numEntries){
820     flushLogBuffer();
821   }
822 #endif  
823         
824 }  
825
826
827
828 void LogPool::addUserSupplied(int data){
829         // add an event
830         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
831
832         // set the user supplied value for the previously created event 
833         pool[numEntries-1].setUserSuppliedData(data);
834   }
835
836
837 void LogPool::addUserSuppliedNote(char *note){
838         // add an event
839         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
840
841         // set the user supplied note for the previously created event 
842         pool[numEntries-1].setUserSuppliedNote(note);
843   }
844
845 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
846   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
847 #ifndef CMK_BIGSIM_CHARM
848 #if MPI_TRACE_MACHINE_HACK
849   //This part of code is used  to combine the contiguous
850   //MPI_Test and MPI_Iprobe events to reduce the number of
851   //entries
852 #define MPI_TEST_EVENT_ID 60
853 #define MPI_IPROBE_EVENT_ID 70 
854   int lastEvent = pool[numEntries-1].event;
855   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
856     //just replace the endtime of last event
857     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
858     pool[numEntries].endTime = et;
859   }else{
860     new (&pool[numEntries++])
861       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
862   }
863 #else
864   new (&pool[numEntries++])
865     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
866 #endif
867   if(poolSize == numEntries){
868     flushLogBuffer();
869   }
870 #endif  
871 }
872
873
874 /* **CW** Not sure if this is the right thing to do. Feels more like
875    a hack than a solution to Sameer's request to add the destination
876    processor information to multicasts and broadcasts.
877
878    In the unlikely event this method is used for Broadcasts as well,
879    pelist == NULL will be used to indicate a global broadcast with 
880    num PEs.
881 */
882 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
883                                    int event, int pe, int ml, CmiObjId *id,
884                                    double recvT, int numPe, int *pelist)
885 {
886   new (&pool[numEntries++])
887     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
888   if(poolSize==numEntries) {
889     flushLogBuffer();
890   }
891 }
892
893 void LogPool::postProcessLog()
894 {
895 #if CMK_BIGSIM_CHARM
896   bgUpdateProj(1);   // event type
897 #endif
898 }
899
900 void LogPool::modLastEntryTimestamp(double ts)
901 {
902   pool[numEntries-1].time = ts;
903   //pool[numEntries-1].cputime = ts;
904 }
905
906 // /** Constructor for a multicast log entry */
907 // 
908 //  THIS WAS MOVED TO trace-projections.h with the other constructors
909 // 
910 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
911 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
912 // {
913 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
914 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
915 //     recvTime = rt; 
916 //     numpes = numPe;
917 //     userSuppliedNote = NULL;
918 //     if (pelist != NULL) {
919 //      pes = new int[numPe];
920 //      for (int i=0; i<numPe; i++) {
921 //        pes[i] = pelist[i];
922 //      }
923 //     } else {
924 //      pes= NULL;
925 //     }
926 // }
927
928 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
929 //{
930 //#if CMK_HAS_COUNTER_PAPI
931 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
932 //#endif
933 //}
934
935
936
937 void LogEntry::pup(PUP::er &p)
938 {
939   int i;
940   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
941   char ret = '\n';
942
943   p|type;
944   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
945   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
946
947   switch (type) {
948     case USER_EVENT:
949     case USER_EVENT_PAIR:
950       p|mIdx; p|itime; p|event; p|pe;
951       break;
952     case BEGIN_IDLE:
953     case END_IDLE:
954     case BEGIN_PACK:
955     case END_PACK:
956     case BEGIN_UNPACK:
957     case END_UNPACK:
958       p|itime; p|pe; 
959       break;
960     case BEGIN_PROCESSING:
961       if (p.isPacking()) {
962         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
963         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
964       }
965       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
966       p|msglen; p|irecvtime; 
967       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
968       p|icputime;
969 #if CMK_HAS_COUNTER_PAPI
970       //p|numPapiEvents;
971       for (i=0; i<NUMPAPIEVENTS; i++) {
972         // not yet!!!
973         //      p|papiIDs[i]; 
974         p|CkpvAccess(papiValues)[i];
975         
976       }
977 #else
978       //p|numPapiEvents;     // non papi version has value 0
979 #endif
980       if (p.isUnpacking()) {
981         recvTime = irecvtime/1.0e6;
982         cputime = icputime/1.0e6;
983       }
984       break;
985     case END_PROCESSING:
986       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
987       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
988 #if CMK_HAS_COUNTER_PAPI
989       //p|numPapiEvents;
990       for (i=0; i<NUMPAPIEVENTS; i++) {
991         // not yet!!!
992         //      p|papiIDs[i];
993         p|CkpvAccess(papiValues)[i];
994       }
995 #else
996       //p|numPapiEvents;  // non papi version has value 0
997 #endif
998       if (p.isUnpacking()) cputime = icputime/1.0e6;
999       break;
1000     case USER_SUPPLIED:
1001           p|userSuppliedData;
1002           p|itime;
1003         break;
1004     case USER_SUPPLIED_NOTE:
1005           p|itime;
1006           int length;
1007           if (p.isPacking()) length = strlen(userSuppliedNote);
1008           p | length;
1009           char space;
1010           space = ' ';
1011           p | space;
1012           if (p.isUnpacking()) {
1013             userSuppliedNote = new char[length+1];
1014             userSuppliedNote[length] = '\0';
1015           }
1016           PUParray(p,userSuppliedNote, length);
1017           break;
1018     case USER_SUPPLIED_BRACKETED_NOTE:
1019       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
1020           p|itime;
1021           p|iEndTime;
1022           p|event;
1023           int length2;
1024           if (p.isPacking()) length2 = strlen(userSuppliedNote);
1025           p | length2;
1026           char space2;
1027           space2 = ' ';
1028           p | space2;
1029           if (p.isUnpacking()) {
1030             userSuppliedNote = new char[length+1];
1031             userSuppliedNote[length] = '\0';
1032           }
1033           PUParray(p,userSuppliedNote, length2);
1034           break;
1035     case MEMORY_USAGE_CURRENT:
1036       p | memUsage;
1037       p | itime;
1038         break;
1039     case CREATION:
1040       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1041       p|mIdx; p|eIdx; p|itime;
1042       p|event; p|pe; p|msglen; p|irecvtime;
1043       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1044       break;
1045     case CREATION_BCAST:
1046       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1047       p|mIdx; p|eIdx; p|itime;
1048       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1049       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1050       break;
1051     case CREATION_MULTICAST:
1052       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
1053       p|mIdx; p|eIdx; p|itime;
1054       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
1055       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
1056       for (i=0; i<numpes; i++) p|pes[i];
1057       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
1058       break;
1059     case MESSAGE_RECV:
1060       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
1061       break;
1062
1063     case ENQUEUE:
1064     case DEQUEUE:
1065       p|mIdx; p|itime; p|event; p|pe;
1066       break;
1067
1068     case BEGIN_INTERRUPT:
1069     case END_INTERRUPT:
1070       p|itime; p|event; p|pe;
1071       break;
1072
1073       // **CW** absolute timestamps are used here to support a quick
1074       // way of determining the total time of a run in projections
1075       // visualization.
1076     case BEGIN_COMPUTATION:
1077     case END_COMPUTATION:
1078     case BEGIN_TRACE:
1079     case END_TRACE:
1080       p|itime;
1081       break;
1082     case BEGIN_FUNC:
1083         p | itime;
1084         p | mIdx;
1085         p | event;
1086         if(!p.isUnpacking()){
1087                 p(fName,flen-1);
1088         }
1089         break;
1090     case END_FUNC:
1091         p | itime;
1092         p | mIdx;
1093         break;
1094     case END_PHASE:
1095       p|eIdx; // FIXME: actually the phase ID
1096       p|itime;
1097       break;
1098     default:
1099       CmiError("***Internal Error*** Wierd Event %d.\n", type);
1100       break;
1101   }
1102   if (p.isUnpacking()) time = itime/1.0e6;
1103   p|ret;
1104 }
1105
1106 TraceProjections::TraceProjections(char **argv): 
1107   curevent(0), inEntry(0), computationStarted(0), 
1108         converseExit(0), endTime(0.0), traceNestedEvents(0),
1109         currentPhaseID(0), lastPhaseEvent(NULL)
1110 {
1111   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1112
1113   if (CkpvAccess(traceOnPe) == 0) return;
1114
1115   CtvInitialize(int,curThreadEvent);
1116   CkpvInitialize(CmiInt8, CtrLogBufSize);
1117   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1118   CtvAccess(curThreadEvent)=0;
1119   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1120                        "Log entries to buffer per I/O")) {
1121     if (CkMyPe() == 0) {
1122       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1123     }
1124   }
1125   checknested = 
1126     CmiGetArgFlagDesc(argv,"+checknested",
1127                       "check projections nest begin end execute events");
1128   traceNestedEvents = 
1129     CmiGetArgFlagDesc(argv,"+tracenested",
1130               "trace projections nest begin/end execute events");
1131   int binary = 
1132     CmiGetArgFlagDesc(argv,"+binary-trace",
1133                       "Write log files in binary format");
1134
1135   CmiInt8 nSubdirs = 0;
1136   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1137
1138
1139 #if CMK_PROJECTIONS_USE_ZLIB
1140   int compressed = true;
1141   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1142   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1143   compressed = compressed && !disableCompressed;
1144 #else
1145   // consume the flag so there's no confusing
1146   CmiGetArgFlagDesc(argv,"+gz-trace",
1147                     "Write log files pre-compressed with gzip");
1148   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1149 #endif
1150
1151   int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files "); 
1152   
1153   // **CW** default to non delta log encoding. The user may choose to do
1154   // create both logs (for debugging) or just the old log timestamping
1155   // (for compatibility).
1156   // Generating just the non delta log takes precedence over generating
1157   // both logs (if both arguments appear on the command line).
1158
1159   // switch to OLD log format until everything works // Gengbin
1160   nonDeltaLog = 1;
1161   deltaLog = 0;
1162   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1163                                   "Generate Delta encoded and simple timestamped log files");
1164
1165   _logPool = new LogPool(CkpvAccess(traceRoot));
1166   _logPool->setNumSubdirs(nSubdirs);
1167   _logPool->setBinary(binary);
1168   _logPool->setWriteSummaryFiles(writeSummaryFiles);
1169 #if CMK_PROJECTIONS_USE_ZLIB
1170   _logPool->setCompressed(compressed);
1171 #endif
1172   if (CkMyPe() == 0) {
1173     _logPool->createSts();
1174     _logPool->createRC();
1175     _logPool->createTopo();
1176   }
1177   funcCount=1;
1178
1179 #if CMK_HAS_COUNTER_PAPI
1180   initPAPI();
1181 #endif
1182 }
1183
1184 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1185 {
1186   OPTIMIZED_VERSION
1187   CkAssert(e==-1 || e>=0);
1188   CkAssert(evt != NULL);
1189   int event;
1190   int biggest = -1;
1191   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1192     int cur = (*CkpvAccess(usrEvents))[i]->e;
1193     if (cur == e) {
1194       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1195       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1196         return e;
1197       else
1198         CmiAbort("UserEvent double registered!");
1199     }
1200     if (cur > biggest) biggest = cur;
1201   }
1202   // if no user events have so far been registered. biggest will be -1
1203   // and hence newly assigned event numbers will begin from 0.
1204   if (e==-1) event = biggest+1;  // automatically assign new event number
1205   else event = e;
1206   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1207   return event;
1208 }
1209
1210 void TraceProjections::traceClearEps(void)
1211 {
1212   // In trace-summary, this zeros out the EP bins, to eliminate noise
1213   // from startup.  Here, this isn't useful, since we can do that in
1214   // post-processing
1215 }
1216
1217 void TraceProjections::traceWriteSts(void)
1218 {
1219   if(CkMyPe()==0)
1220     _logPool->writeSts(this);
1221 }
1222
1223 /** 
1224  * **IMPT NOTES**:
1225  *
1226  * This is called when Converse closes during ConverseCommonExit().
1227  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1228  *
1229  * Some programs bypass CkExit() (like NAMD, which eventually calls
1230  * ConverseExit()), modules like traces will have to pretend to shutdown
1231  * as if CkExit() was called but at the same time avoid making
1232  * subsequent CkExit() calls (which is usually required for allowing
1233  * other modules to shutdown).
1234  *
1235  * Note that we can only get here if CkExit() was not called, since the
1236  * trace module will un-register itself from TraceArray if it did.
1237  *
1238  */
1239 void TraceProjections::traceClose(void)
1240 {
1241 #ifdef PROJ_ANALYSIS
1242   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1243
1244   // sets the flag that tells the code not to make the CkExit call later
1245   converseExit = 1;
1246   if (CkMyPe() == 0) {
1247     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1248     bocProxy.traceProjectionsParallelShutdown(-1);
1249   }
1250   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1251     CkpvAccess(_trace)->endComputation();
1252     delete _logPool;              // will write
1253     // remove myself from traceArray so that no tracing will be called.
1254     CkpvAccess(_traces)->removeTrace(this);
1255   }
1256 #else
1257   // we've already deleted the logpool, so multiple calls to traceClose
1258   // are tolerated.
1259   if (_logPool == NULL) {
1260     return;
1261   }
1262   if(CkMyPe()==0){
1263     _logPool->writeSts(this);
1264   }
1265   CkpvAccess(_trace)->endComputation();
1266   delete _logPool;              // will write
1267   // remove myself from traceArray so that no tracing will be called.
1268   CkpvAccess(_traces)->removeTrace(this);
1269
1270   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1271   bocProxy.ckLocalBranch()->print_warning();
1272 #endif
1273 }
1274
1275 /**
1276  *  **IMPT NOTES**:
1277  *
1278  *  This is meant to be called internally by the tracing framework.
1279  *
1280  */
1281 void TraceProjections::closeTrace() {
1282   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1283   if (CkMyPe() == 0) {
1284     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1285     _logPool->writeSts(this);
1286     _logPool->writeRC();
1287     _logPool->writeTopo();
1288     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1289
1290     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1291     bocProxy.ckLocalBranch()->print_warning();
1292   }
1293   delete _logPool;       // will write logs to file
1294
1295 }
1296
1297 #if CMK_SMP_TRACE_COMMTHREAD
1298 void TraceProjections::traceBeginOnCommThread()
1299 {
1300   if (!computationStarted) return;
1301   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1302 }
1303
1304 void TraceProjections::traceEndOnCommThread()
1305 {
1306   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1307 }
1308 #endif
1309
1310 void TraceProjections::traceBegin(void)
1311 {
1312   if (!computationStarted) return;
1313   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1314 }
1315
1316 void TraceProjections::traceEnd(void)
1317 {
1318   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1319 }
1320
1321 void TraceProjections::userEvent(int e)
1322 {
1323   if (!computationStarted) return;
1324   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1325 }
1326
1327 void TraceProjections::userBracketEvent(int e, double bt, double et)
1328 {
1329   if (!computationStarted) return;
1330   // two events record Begin/End of event e.
1331   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1332   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1333 }
1334
1335 void TraceProjections::userSuppliedData(int d)
1336 {
1337   if (!computationStarted) return;
1338   _logPool->addUserSupplied(d);
1339 }
1340
1341 void TraceProjections::userSuppliedNote(char *note)
1342 {
1343   if (!computationStarted) return;
1344   _logPool->addUserSuppliedNote(note);
1345 }
1346
1347
1348 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1349 {
1350   if (!computationStarted) return;
1351   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1352 }
1353
1354 void TraceProjections::memoryUsage(double m)
1355 {
1356   if (!computationStarted) return;
1357   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1358   
1359 }
1360
1361
1362 void TraceProjections::creation(envelope *e, int ep, int num)
1363 {
1364   double curTime = TraceTimer();
1365   if (e == 0) {
1366     CtvAccess(curThreadEvent) = curevent;
1367     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1368                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1369   } else {
1370     int type=e->getMsgtype();
1371     e->setEvent(curevent);
1372     if (num > 1) {
1373       _logPool->add(CREATION_BCAST, type, ep, curTime,
1374                     curevent++, CkMyPe(), e->getTotalsize(), 
1375                     NULL, 0, 0.0, num);
1376     } else {
1377       _logPool->add(CREATION, type, ep, curTime,
1378                     curevent++, CkMyPe(), e->getTotalsize(), 
1379                     NULL, 0, 0.0);
1380     }
1381   }
1382 }
1383
1384 //This function is only called from a comm thread in SMP mode. 
1385 void TraceProjections::creation(char *msg)
1386 {
1387 #if CMK_SMP_TRACE_COMMTHREAD
1388         // msg must be a charm message
1389     envelope *e = (envelope *)msg;
1390     int ep = e->getEpIdx();
1391     if(_entryTable[ep]->traceEnabled) {
1392         creation(e, ep, 1);
1393         e->setSrcPe(CkMyPe());              // pretend I am the sender
1394     }
1395 #endif
1396 }
1397
1398 void TraceProjections::traceCommSetMsgID(char *msg)
1399 {
1400 #if CMK_SMP_TRACE_COMMTHREAD
1401     // msg must be a charm message
1402     envelope *e = (envelope *)msg;
1403     int ep = e->getEpIdx();
1404     if(_entryTable[ep]->traceEnabled) {
1405         e->setSrcPe(CkMyPe());              // pretend I am the sender
1406         e->setEvent(curevent);
1407     }
1408 #endif
1409 }
1410
1411 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1412 {
1413     // msg must be a charm message
1414     *pe = *event = -1;
1415     envelope *e = (envelope *)msg;
1416     int ep = e->getEpIdx();
1417     if(_entryTable[ep]->traceEnabled) {
1418         *pe = e->getSrcPe();
1419         *event = e->getEvent();
1420     }
1421 }
1422
1423 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1424 {
1425        // msg must be a charm message
1426     envelope *e = (envelope *)msg;
1427     int ep = e->getEpIdx();
1428     if(ep<=0 || ep>=_entryTable.size()) return;
1429     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1430     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1431     if(_entryTable[ep]->traceEnabled) {
1432         e->setSrcPe(pe);
1433         e->setEvent(event);
1434     }
1435 }
1436
1437 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1438    Communication Library-specific Multicasts via new event 
1439    CREATION_MULTICAST.
1440 */
1441
1442 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1443                                          int *pelist)
1444 {
1445   double curTime = TraceTimer();
1446   if (e==0) {
1447     CtvAccess(curThreadEvent)=curevent;
1448     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1449                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1450   } else {
1451     int type=e->getMsgtype();
1452     e->setEvent(curevent);
1453     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1454                                    e->getTotalsize(), 0, 0.0, num, pelist);
1455   }
1456 }
1457
1458 void TraceProjections::creationDone(int num)
1459 {
1460   // modified the creation done time of the last num log entries
1461   // FIXME: totally a hack
1462   double curTime = TraceTimer();
1463   int idx = _logPool->numEntries-1;
1464   while (idx >=0 && num >0 ) {
1465     LogEntry &log = _logPool->pool[idx];
1466     if ((log.type == CREATION) ||
1467         (log.type == CREATION_BCAST) ||
1468         (log.type == CREATION_MULTICAST)) {
1469       log.recvTime = curTime - log.time;
1470       num --;
1471     }
1472     idx--;
1473   }
1474 }
1475
1476 void TraceProjections::beginExecute(CmiObjId *tid)
1477 {
1478 #if CMK_HAS_COUNTER_PAPI
1479   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1480     CmiAbort("PAPI failed to read at begin execute!\n");
1481   }
1482 #endif
1483   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1484   execEvent = CtvAccess(curThreadEvent);
1485   execEp = (-1);
1486   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1487                 execEvent,CkMyPe(), 0, tid);
1488 #if CMK_HAS_COUNTER_PAPI
1489   _logPool->addPapi(CkpvAccess(papiValues));
1490 #endif
1491   inEntry = 1;
1492 }
1493
1494 void TraceProjections::beginExecute(envelope *e)
1495 {
1496   if(e==0) {
1497 #if CMK_HAS_COUNTER_PAPI
1498     if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1499       CmiAbort("PAPI failed to read at begin execute!\n");
1500     }
1501 #endif
1502     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1503     execEvent = CtvAccess(curThreadEvent);
1504     execEp = (-1);
1505     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1506                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1507 #if CMK_HAS_COUNTER_PAPI
1508     _logPool->addPapi(CkpvAccess(papiValues));
1509 #endif
1510     inEntry = 1;
1511   } else {
1512     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1513                  e->getSrcPe(),e->getTotalsize());
1514   }
1515 }
1516
1517 void TraceProjections::beginExecute(char *msg){
1518 #if CMK_SMP_TRACE_COMMTHREAD
1519         //This function is called from comm thread in SMP mode
1520     envelope *e = (envelope *)msg;
1521     int ep = e->getEpIdx();
1522     if(_entryTable[ep]->traceEnabled)
1523                 beginExecute(e);
1524 #endif
1525 }
1526
1527 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1528                                     int mlen, CmiObjId *idx)
1529 {
1530   if (traceNestedEvents) {
1531     if (! nestedEvents.isEmpty()) {
1532       endExecuteLocal();
1533     }
1534     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1535   }
1536   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1537 }
1538
1539 void TraceProjections::changeLastEntryTimestamp(double ts)
1540 {
1541   _logPool->modLastEntryTimestamp(ts);
1542 }
1543
1544 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1545                                     int mlen, CmiObjId *idx)
1546 {
1547 #if CMK_HAS_COUNTER_PAPI
1548   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1549     CmiAbort("PAPI failed to read at begin execute!\n");
1550   }
1551 #endif
1552   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1553   execEvent=event;
1554   execEp=ep;
1555   execPe=srcPe;
1556   _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
1557                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1558 #if CMK_HAS_COUNTER_PAPI
1559   _logPool->addPapi(CkpvAccess(papiValues));
1560 #endif
1561   inEntry = 1;
1562 }
1563
1564 void TraceProjections::endExecute(void)
1565 {
1566   if (traceNestedEvents) nestedEvents.deq();
1567   endExecuteLocal();
1568   if (traceNestedEvents) {
1569     if (! nestedEvents.isEmpty()) {
1570       NestedEvent &ne = nestedEvents.peek();
1571       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1572     }
1573   }
1574 }
1575
1576 void TraceProjections::endExecute(char *msg)
1577 {
1578 #if CMK_SMP_TRACE_COMMTHREAD
1579         //This function is called from comm thread in SMP mode
1580     envelope *e = (envelope *)msg;
1581     int ep = e->getEpIdx();
1582     if(_entryTable[ep]->traceEnabled)
1583                 endExecute();
1584 #endif  
1585 }
1586
1587 void TraceProjections::endExecuteLocal(void)
1588 {
1589 #if CMK_HAS_COUNTER_PAPI
1590   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1591     CmiAbort("PAPI failed to read at end execute!\n");
1592   }
1593 #endif
1594   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1595   double cputime = TraceCpuTimer();
1596   double now = TraceTimer();
1597   if(execEp == (-1)) {
1598     _logPool->add(END_PROCESSING, 0, _threadEP, now,
1599                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1600   } else {
1601     _logPool->add(END_PROCESSING, 0, execEp, now,
1602                   execEvent, execPe, 0, NULL, 0.0, cputime);
1603   }
1604 #if CMK_HAS_COUNTER_PAPI
1605   _logPool->addPapi(CkpvAccess(papiValues));
1606 #endif
1607   inEntry = 0;
1608 }
1609
1610 void TraceProjections::messageRecv(char *env, int pe)
1611 {
1612 #if 0
1613   envelope *e = (envelope *)env;
1614   int msgType = e->getMsgtype();
1615   int ep = e->getEpIdx();
1616 #if 0
1617   if (msgType==NewChareMsg || msgType==NewVChareMsg
1618           || msgType==ForChareMsg || msgType==ForVidMsg
1619           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1620           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1621     ep = e->getEpIdx();
1622   else
1623     ep = _threadEP;
1624 #endif
1625   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1626                 curevent++, e->getSrcPe(), e->getTotalsize());
1627 #endif
1628 }
1629
1630 void TraceProjections::beginIdle(double curWallTime)
1631 {
1632     _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1633 }
1634
1635 void TraceProjections::endIdle(double curWallTime)
1636 {
1637     _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1638 }
1639
1640 void TraceProjections::beginPack(void)
1641 {
1642     _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1643 }
1644
1645 void TraceProjections::endPack(void)
1646 {
1647     _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1648 }
1649
1650 void TraceProjections::beginUnpack(void)
1651 {
1652     _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1653 }
1654
1655 void TraceProjections::endUnpack(void)
1656 {
1657     _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1658 }
1659
1660 void TraceProjections::enqueue(envelope *) {}
1661
1662 void TraceProjections::dequeue(envelope *) {}
1663
1664 void TraceProjections::beginComputation(void)
1665 {
1666   computationStarted = 1;
1667   // Executes the callback function provided by the machine
1668   // layer. This is the proper method to register user events in a
1669   // machine layer because projections is a charm++ module.
1670   if (CkpvAccess(traceOnPe) != 0) {
1671     void (*ptr)() = registerMachineUserEvents();
1672     if (ptr != NULL) {
1673       ptr();
1674     }
1675   }
1676 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1677 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1678   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1679 #if CMK_HAS_COUNTER_PAPI
1680   // we start the counters here
1681   if(CkpvAccess(papiStarted) == 0)
1682   {
1683       if (PAPI_start(CkpvAccess(papiEventSet)) != PAPI_OK) {
1684           CmiAbort("PAPI failed to start designated counters!\n");
1685       }
1686       CkpvAccess(papiStarted) = 1;
1687   }
1688 #endif
1689 }
1690
1691 void TraceProjections::endComputation(void)
1692 {
1693 #if CMK_HAS_COUNTER_PAPI
1694   // we stop the counters here. A silent failure is alright since we
1695   // are already at the end of the program.
1696   if(CkpvAccess(papiStopped) == 0) {
1697       if (PAPI_stop(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1698           CkPrintf("Warning: PAPI failed to stop correctly!\n");
1699       }
1700       CkpvAccess(papiStopped) = 1;
1701   }
1702   // NOTE: We should not do a complete close of PAPI until after the
1703   // sts writer is done.
1704 #endif
1705   endTime = TraceTimer();
1706   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1707   /*
1708   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1709            endTime*1e06);
1710   */
1711 }
1712
1713 int TraceProjections::idxRegistered(int idx)
1714 {
1715     int idxVecLen = idxVec.size();
1716     for(int i=0; i<idxVecLen; i++)
1717     {
1718         if(idx == idxVec[i])
1719             return 1;
1720     }
1721     return 0;
1722 }
1723
1724 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1725     StrKey k((char*)name,strlen(name));
1726     int num = funcHashtable.get(k);
1727     
1728     if(num!=0) {
1729         return;
1730         //as for mpi programs, the same function may be registered for several times
1731         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1732     }
1733     
1734     int isIdxExisting=0;
1735     if(idxSpecifiedByUser)
1736         isIdxExisting=idxRegistered(idx);
1737     if(isIdxExisting){
1738         return;
1739         //same reason with num!=0
1740         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1741     }
1742
1743     if(idxSpecifiedByUser) {
1744         char *st = new char[strlen(name)+1];
1745         memcpy(st,name,strlen(name)+1);
1746         StrKey *newKey = new StrKey(st,strlen(st));
1747         int &ref = funcHashtable.put(*newKey);
1748         ref=idx;
1749         funcCount++;
1750         idxVec.push_back(idx);  
1751     } else {
1752         char *st = new char[strlen(name)+1];
1753         memcpy(st,name,strlen(name)+1);
1754         StrKey *newKey = new StrKey(st,strlen(st));
1755         int &ref = funcHashtable.put(*newKey);
1756         ref=funcCount;
1757         num = funcCount;
1758         funcCount++;
1759         idx = num;
1760         idxVec.push_back(idx);
1761     }
1762 }
1763
1764 void TraceProjections::beginFunc(char *name,char *file,int line){
1765         StrKey k(name,strlen(name));    
1766         unsigned short  num = (unsigned short)funcHashtable.get(k);
1767         beginFunc(num,file,line);
1768 }
1769
1770 void TraceProjections::beginFunc(int idx,char *file,int line){
1771         if(idx <= 0){
1772                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1773         }       
1774         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1775 }
1776
1777 void TraceProjections::endFunc(char *name){
1778         StrKey k(name,strlen(name));    
1779         int num = funcHashtable.get(k);
1780         endFunc(num);   
1781 }
1782
1783 void TraceProjections::endFunc(int num){
1784         if(num <= 0){
1785                 printf("endFunc without start :O\n");
1786         }
1787         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1788 }
1789
1790 // specialized PUP:ers for handling trace projections logs
1791 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1792 {
1793   for (int i=0;i<n;i++) 
1794     switch(t) {
1795     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1796     case Tuchar:
1797     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1798     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1799     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1800     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1801     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1802     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1803     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1804     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1805     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1806 #ifdef CMK_PUP_LONG_LONG
1807     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1808     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1809 #endif
1810     default: CmiAbort("Unrecognized pup type code!");
1811     };
1812 }
1813
1814 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1815 {
1816   for (int i=0;i<n;i++) 
1817     switch(t) {
1818     case Tchar: { 
1819       char c = fgetc(f);
1820       if (c==EOF)
1821         parseError("Could not match character");
1822       else
1823         ((char *)p)[i] = c;
1824       break;
1825     }
1826     case Tuchar:
1827     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1828     case Tshort:((short *)p)[i]=(short)readInt(); break;
1829     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1830     case Tint:  ((int *)p)[i]=readInt(); break;
1831     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1832     case Tlong: ((long *)p)[i]=readInt(); break;
1833     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1834     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1835     case Tdouble:((double *)p)[i]=readDouble(); break;
1836 #ifdef CMK_PUP_LONG_LONG
1837     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1838     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1839 #endif
1840     default: CmiAbort("Unrecognized pup type code!");
1841     };
1842 }
1843
1844 #if CMK_PROJECTIONS_USE_ZLIB
1845 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1846 {
1847   for (int i=0;i<n;i++) 
1848     switch(t) {
1849     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1850     case Tuchar:
1851     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1852     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1853     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1854     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1855     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1856     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1857     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1858     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1859     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1860 #ifdef CMK_PUP_LONG_LONG
1861     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1862     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1863 #endif
1864     default: CmiAbort("Unrecognized pup type code!");
1865     };
1866 }
1867 #endif
1868
1869 void TraceProjections::endPhase() {
1870   double currentPhaseTime = TraceTimer();
1871   if (lastPhaseEvent != NULL) {
1872   } else {
1873     if (_logPool->pool != NULL) {
1874       // assumed to be BEGIN_COMPUTATION
1875     } else {
1876       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1877       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1878     }
1879   }
1880
1881   /* Insert endPhase event here. */
1882   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1883   /*        We currently "borrow" from the standard add() method. */
1884   /*        It should really be its own add() method.             */
1885   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1886   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1887   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1888   currentPhaseID++;
1889 }
1890
1891 #ifdef PROJ_ANALYSIS
1892 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1893
1894
1895 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1896
1897 void registerOutlierReduction() {
1898   outlierReductionType =
1899     CkReduction::addReducer(outlierReduction);
1900   minMaxReductionType =
1901     CkReduction::addReducer(minMaxReduction);
1902 }
1903
1904 /**
1905  * **IMPT NOTES**:
1906  *
1907  * This is the C++ code that is registered to be activated at module
1908  * shutdown. This is called exactly once on processor 0. Module shutdown
1909  * is initiated as a result of a CkExit() call by the application code
1910  * 
1911  * The exit function must ultimately call CkExit() again to
1912  * so that other module exit functions may proceed after this module is
1913  * done.
1914  *
1915  */
1916 // FIXME: WHY extern "C"???
1917 extern "C" void TraceProjectionsExitHandler()
1918 {
1919 #if CMK_TRACE_ENABLED
1920   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1921   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1922   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1923 #else
1924   CkExit();
1925 #endif
1926 }
1927
1928 // This is called once on each processor but the idiom of use appears
1929 // to be to only have processor 0 register the function.
1930 //
1931 // See initnode in trace-projections.ci
1932 void initTraceProjectionsBOC()
1933 {
1934   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1935 #ifdef __BIGSIM__
1936   if (BgNodeRank() == 0) {
1937 #else
1938     if (CkMyRank() == 0) {
1939 #endif
1940       registerExitFn(TraceProjectionsExitHandler);
1941     }
1942 #if 0
1943   } // this is so indentation does not get messed up
1944 #endif
1945 }
1946
1947 // mainchare for trace-projections BOC-operations. 
1948 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1949 // rest of its life.
1950 //
1951 // Responsible for:
1952 //   1. Handling commandline arguments
1953 //   2. Creating any objects required for proper BOC operations.
1954 //
1955 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1956   /** Options for Outlier Analysis */
1957   // defaults. Things will change with support for interactive analysis.
1958   bool findOutliers = false;
1959   bool outlierAutomatic = true;
1960   int numKSeeds = 10; 
1961
1962   int peNumKeep = CkNumPes();  // used as a default
1963   double entryThreshold = 0.0;
1964   bool outlierUsePhases = false;
1965   if (outlierAutomatic) {
1966     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1967                      "Number of cluster seeds to apply at outlier analysis.");
1968     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1969                      &peNumKeep, "Number of Processors to retain data");
1970     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1971                         "Minimum significance of entry points to be considered for clustering (%).");
1972     findOutliers =
1973       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1974     outlierUsePhases = 
1975       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1976                         "Apply automatic outlier analysis to any available phases.");
1977     if (outlierUsePhases) {
1978       // if the user wants to use an outlier feature, it is assumed outlier
1979       //    analysis is desired.
1980       findOutliers = true;
1981     }
1982   }
1983   bool findStartTime = (CmiTimerAbsolute()==1);
1984   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1985   if (findOutliers) {
1986     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1987                                         numKSeeds,
1988                                         peNumKeep,
1989                                         entryThreshold,
1990                                         outlierUsePhases);
1991   }
1992
1993   delete msg;
1994 }
1995
1996 // Called on every processor.
1997 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1998   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1999   endPe = pe;                // the pe that starts CkExit()
2000   if (CkMyPe() == 0) {
2001     analysisStartTime = CmiWallTimer();
2002   }
2003   CkpvAccess(_trace)->endComputation();
2004   // no more tracing for projections on this processor after this point. 
2005   // Note that clear must be called after remove, or bad things will happen.
2006   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
2007   CkpvAccess(_traces)->clearTrace();
2008
2009   // From this point, we start multiple chains of reductions and broadcasts to
2010   // perform final online analysis activities.
2011
2012   // Start all parallel operations at once. 
2013   //   These MUST NOT modify base performance data in LogPool. If they must,
2014   //   then the parallel operations must be phased (and this code to be
2015   //   restructured as necessary)
2016   CProxy_KMeansBOC kMeansProxy(kMeansGID);
2017   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2018   if (findOutliers) {
2019     parModulesRemaining++;
2020     kMeansProxy[CkMyPe()].startKMeansAnalysis();
2021   }
2022   parModulesRemaining++;
2023   if (findStartTime) 
2024   bocProxy[CkMyPe()].startTimeAnalysis();
2025   else
2026   bocProxy[CkMyPe()].startEndTimeAnalysis();
2027 }
2028
2029 // handle flush log warnings
2030 void TraceProjectionsBOC::flush_warning(int pe) 
2031 {
2032     CmiAssert(CkMyPe() == 0);
2033     std::set<int>::iterator it;
2034     it = list.find(pe);
2035     if (it == list.end())    list.insert(pe);
2036     flush_count++;
2037 }
2038
2039 void TraceProjectionsBOC::print_warning() 
2040 {
2041     CmiAssert(CkMyPe() == 0);
2042     if (flush_count == 0) return;
2043     std::set<int>::iterator it;
2044     CkPrintf("*************************************************************\n");
2045     CkPrintf("Warning: Projections log flushed to disk %d times on %d cores: ", flush_count, list.size());
2046     for (it=list.begin(); it!=list.end(); it++)
2047       CkPrintf("%d ", *it);
2048     CkPrintf(".\n");
2049     CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program. \n");
2050     CkPrintf("*************************************************************\n");
2051 }
2052
2053 // Called on each processor
2054 void KMeansBOC::startKMeansAnalysis() {
2055   // Initialize all necessary structures
2056   LogPool *pool = CkpvAccess(_trace)->_logPool;
2057
2058  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2059   int flushInt = 0;
2060   if (pool->hasFlushed) {
2061     flushInt = 1;
2062   }
2063   
2064   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
2065                 0, thisProxy);
2066   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
2067 }
2068
2069 // Called on processor 0
2070 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
2071   int someFlush = *((int *)msg->getData());
2072
2073   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
2074   
2075   if (someFlush == 0) {
2076     // Data intact proceed with KMeans analysis
2077     CProxy_KMeansBOC kMeansProxy(kMeansGID);
2078     kMeansProxy.flushCheckDone();
2079   } else {
2080     // Some processor had flushed it data at some point, abandon KMeans
2081     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
2082     // terminate KMeans
2083     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2084     bocProxy[0].kMeansDone();
2085   }
2086 }
2087
2088 // Called on each processor
2089 void KMeansBOC::flushCheckDone() {
2090   // **FIXME** - more flexible metric collection scheme may be necessary
2091   //   in the future for production use.
2092   LogPool *pool = CkpvAccess(_trace)->_logPool;
2093
2094   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2095
2096   numEntryMethods = _entryTable.size();
2097   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
2098
2099   // maintained across phases
2100   markedBegin = false;
2101   markedIdle = false;
2102   beginBlockTime = 0.0;
2103   beginIdleBlockTime = 0.0;
2104   lastBeginEPIdx = -1; // none
2105
2106   lastPhaseIdx = 0;
2107   currentExecTimes = NULL;
2108   currentPhase = 0;
2109   selected = false;
2110
2111   pool->initializePhases();
2112
2113   // incoming K Seeds and the per-phase filter
2114   incKSeeds = new double[numK*numMetrics];
2115   keepMetric = new bool[numMetrics];
2116
2117   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2118   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2119   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2120   thisProxy[CkMyPe()].getNextPhaseMetrics();
2121 }
2122
2123 // Called on each processor.
2124 void KMeansBOC::getNextPhaseMetrics() {
2125   // Assumes the presence of the complete logs on this processor.
2126   // Assumes first event is always BEGIN_COMPUTATION
2127   // Assumes each processor sees the same number of phases.
2128   //
2129   // In this code, we collect performance data for this processor.
2130   // All times are in seconds.
2131
2132   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2133
2134   if (usePhases) {
2135     DEBUGF("[%d] Using Phases\n", CkMyPe());
2136   } else {
2137     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2138   }
2139   
2140   if (currentExecTimes != NULL) {
2141     delete [] currentExecTimes;
2142   }
2143   currentExecTimes = new double[numMetrics];
2144   for (int i=0; i<numMetrics; i++) {
2145     currentExecTimes[i] = 0.0;
2146   }
2147
2148   int numEventMethods = _entryTable.size();
2149   LogPool *pool = CkpvAccess(_trace)->_logPool;
2150   
2151   CkAssert(pool->numEntries > lastPhaseIdx);
2152   double totalPhaseTime = 0.0;
2153   double totalActiveTime = 0.0; // entry method + idle
2154
2155   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2156     if (pool->pool[i].type == BEGIN_PROCESSING) {
2157       // check pairing
2158       if (!markedBegin) {
2159         markedBegin = true;
2160       }
2161       beginBlockTime = pool->pool[i].time;
2162       lastBeginEPIdx = pool->pool[i].eIdx;
2163     } else if (pool->pool[i].type == END_PROCESSING) {
2164       // check pairing
2165       // if End without a begin, just ignore
2166       //   this event. If a phase-boundary is crossed, the Begin
2167       //   event would be maintained in beginBlockTime, so it is 
2168       //   not a problem.
2169       if (markedBegin) {
2170         markedBegin = false;
2171         if (pool->pool[i].event < 0)
2172         {
2173           // ignore dummy events. **FIXME** as they have no eIdx?
2174           continue;
2175         }
2176         currentExecTimes[pool->pool[i].eIdx] += 
2177           pool->pool[i].time - beginBlockTime;
2178         totalActiveTime += pool->pool[i].time - beginBlockTime;
2179         lastBeginEPIdx = -1;
2180       }
2181     } else if (pool->pool[i].type == BEGIN_IDLE) {
2182       // check pairing
2183       if (!markedIdle) {
2184         markedIdle = true;
2185       }
2186       beginIdleBlockTime = pool->pool[i].time;
2187     } else if (pool->pool[i].type == END_IDLE) {
2188       // check pairing
2189       if (markedIdle) {
2190         markedIdle = false;
2191         currentExecTimes[numEventMethods] += 
2192           pool->pool[i].time - beginIdleBlockTime;
2193         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2194       }
2195     } else if (pool->pool[i].type == END_PHASE) {
2196       // ignored when not using phases
2197       if (usePhases) {
2198         // when we've not visited this node before
2199         if (i != lastPhaseIdx) { 
2200           totalPhaseTime = 
2201             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2202           // it is important that proper accounting of time take place here.
2203           // Note that END_PHASE events inevitably occur in the context of
2204           //   some entry method by the way the tracing API is designed.
2205           if (markedBegin) {
2206             CkAssert(lastBeginEPIdx >= 0);
2207             currentExecTimes[lastBeginEPIdx] += 
2208               pool->pool[i].time - beginBlockTime;
2209             totalActiveTime += pool->pool[i].time - beginBlockTime;
2210             // this is so the remainder contributes to the next phase
2211             beginBlockTime = pool->pool[i].time;
2212           }
2213           // The following is unlikely, but stranger things have happened.
2214           if (markedIdle) {
2215             currentExecTimes[numEventMethods] +=
2216               pool->pool[i].time - beginIdleBlockTime;
2217             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2218             // this is so the remainder contributes to the next phase
2219             beginIdleBlockTime = pool->pool[i].time;
2220           }
2221           if (totalActiveTime <= totalPhaseTime) {
2222             currentExecTimes[numEventMethods+1] = 
2223               totalPhaseTime - totalActiveTime;
2224           } else {
2225             currentExecTimes[numEventMethods+1] = 0.0;
2226             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2227                      CkMyPe(), currentPhase);
2228           }
2229           collectKMeansData();
2230           // end the loop (and method) and defer the work till the next call
2231           lastPhaseIdx = i;
2232           break; 
2233         }
2234       }
2235     } else if (pool->pool[i].type == END_COMPUTATION) {
2236       if (markedBegin) {
2237         CkAssert(lastBeginEPIdx >= 0);
2238         currentExecTimes[lastBeginEPIdx] += 
2239           pool->pool[i].time - beginBlockTime;
2240         totalActiveTime += pool->pool[i].time - beginBlockTime;
2241       }
2242       if (markedIdle) {
2243         currentExecTimes[numEventMethods] +=
2244           pool->pool[i].time - beginIdleBlockTime;
2245         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2246       }
2247       totalPhaseTime = 
2248         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2249       if (totalActiveTime <= totalPhaseTime) {
2250         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2251       } else {
2252         currentExecTimes[numEventMethods+1] = 0.0;
2253         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2254                  CkMyPe());
2255       }
2256       collectKMeansData();
2257     }
2258   }
2259 }
2260
2261 /**
2262  *  Through a reduction, collectKMeansData aggregates each processors' data
2263  *  in order for global properties to be determined:
2264  *  
2265  *  1. min & max to determine normalization factors.
2266  *  2. sum to determine global EP averages for possible metric reduction
2267  *       through thresholding.
2268  *  3. sum of squares to compute stddev which may be useful in the future.
2269  *
2270  *  collectKMeansData will also keep the processor's data for the current
2271  *    phase so that it may be normalized and worked on subsequently.
2272  *
2273  **/
2274 void KMeansBOC::collectKMeansData() {
2275   int minOffset = numMetrics;
2276   int maxOffset = 2*numMetrics;
2277   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2278
2279   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2280
2281   double *reductionMsg = new double[numMetrics*4];
2282
2283   for (int i=0; i<numMetrics; i++) {
2284     reductionMsg[i] = currentExecTimes[i];
2285     // duplicate the event times for max and min sections of the reduction
2286     reductionMsg[minOffset + i] = currentExecTimes[i];
2287     reductionMsg[maxOffset + i] = currentExecTimes[i];
2288     // compute squares
2289     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2290   }
2291
2292   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2293                 0, thisProxy);
2294   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2295              outlierReductionType, cb);  
2296 }
2297
2298 // The purpose is mainly to initialize the k seeds and generate
2299 //   normalization parameters for each of the metrics. The k seeds
2300 //   and normalization parameters are broadcast to all processors.
2301 //
2302 // Called on processor 0
2303 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2304   CkAssert(CkMyPe() == 0);
2305   
2306   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2307
2308   int sumOffset = 0;
2309   int minOffset = numMetrics;
2310   int maxOffset = 2*numMetrics;
2311   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2312
2313   // calculate statistics & boundaries for the k seeds for clustering
2314   KMeansStatsMessage *outmsg = 
2315     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2316   outmsg->numMetrics = numMetrics;
2317   outmsg->numKPos = numK*numMetrics;
2318   outmsg->numStats = numMetrics*4;
2319
2320   // Sum | Min | Max | Sum of Squares
2321   double *totalExecTimes = (double *)msg->getData();
2322   double totalTime = 0.0;
2323
2324   for (int i=0; i<numMetrics; i++) {
2325     DEBUGN("%lf\n", totalExecTimes[i]);
2326     totalTime += totalExecTimes[i];
2327
2328     // calculate event mean over all processors
2329     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2330
2331     // get the ranges and offsets of each metric. With this, we get
2332     //   normalization factors that can be sent back to each processor to
2333     //   be used as necessary. We reuse max for range. Min remains the offset.
2334     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2335     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2336       totalExecTimes[minOffset + i];
2337     
2338     // calculate stddev (using biased variance)
2339     outmsg->stats[sosOffset + i] = 
2340       sqrt((totalExecTimes[sosOffset + i] - 
2341             2*(outmsg->stats[i])*totalExecTimes[i] +
2342             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2343            CkNumPes());
2344   }
2345
2346   for (int i=0; i<numMetrics; i++) {
2347     // 1) if the proportion of the max value of the entry method relative to
2348     //   the average time taken over all entry methods across all processors
2349     //   is greater than the stipulated percentage threshold ...; AND
2350     // 2) if the range of values are non-zero.
2351     //
2352     // The current assumption is totalTime > 0 (what program has zero total
2353     //   time from all work?)
2354     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2355                      entryThreshold) &&
2356       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2357     if (keepMetric[i]) {
2358       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2359              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2360     } else {
2361       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2362     }
2363     outmsg->filter[i] = keepMetric[i];
2364   }
2365
2366   delete msg;
2367   
2368   // initialize k seeds for this phase
2369   kSeeds = new double[numK*numMetrics];
2370
2371   numKReported = 0;
2372   kNumMembers = new int[numK];
2373
2374   // Randomly select k processors' metric vectors for the k seeds
2375   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2376   srand(11337); // for debugging purposes
2377   for (int k=0; k<numK; k++) {
2378     DEBUGF("Seed %d | ", k);
2379     for (int m=0; m<numMetrics; m++) {
2380       double factor = totalExecTimes[maxOffset + m] - 
2381         totalExecTimes[minOffset + m];
2382       // "uniform" distribution, scaled according to the normalization
2383       //   factors
2384       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2385       // Random distribution.
2386       kSeeds[numMetrics*k + m] =
2387         ((rand()*1.0)/RAND_MAX)*factor;
2388       if (keepMetric[m]) {
2389         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2390       }
2391       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2392     }
2393     DEBUGF("\n");
2394     kNumMembers[k] = 0;
2395   }
2396
2397   // broadcast statistical values to all processors for cluster discovery
2398   thisProxy.findInitialClusters(outmsg);
2399 }
2400
2401
2402
2403 // Called on each processor.
2404 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2405
2406  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2407
2408   phaseIter = 0;
2409
2410   // Get info from stats message
2411   CkAssert(numMetrics == msg->numMetrics);
2412   for (int i=0; i<numMetrics; i++) {
2413     keepMetric[i] = msg->filter[i];
2414   }
2415
2416   // Normalize data on local processor.
2417   // **CWL** See my thesis for detailed discussion of normalization of
2418   //    performance data.
2419   // **NOTE** This might change if we want to send data based on the filter
2420   //   instead of all the data.
2421   CkAssert(numMetrics*4 == msg->numStats);
2422   for (int i=0; i<numMetrics; i++) {
2423     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2424     // **CWL** We do not normalize the range. Entry methods that exhibit
2425     //   large absolute timing variations should be allowed to contribute
2426     //   more to the Euclidean distance measure!
2427     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2428   }
2429
2430   // **NOTE** This might change if we want to send data based on the filter
2431   //   instead of all the data.
2432   CkAssert(numK*numMetrics == msg->numKPos);
2433   for (int i=0; i<msg->numKPos; i++) {
2434     incKSeeds[i] = msg->kSeedsPos[i];
2435   }
2436
2437   // Decide which KSeed this processor belongs to.
2438   minDistance = calculateDistance(0);
2439   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2440            currentPhase, phaseIter, minDistance);
2441   minK = 0;
2442   for (int i=1; i<numK; i++) {
2443     double distance = calculateDistance(i);
2444     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2445              currentPhase, phaseIter, i, distance);
2446     if (distance < minDistance) {
2447       minDistance = distance;
2448       minK = i;
2449     }
2450   }
2451
2452   // Set up a reduction with the modification vector to the root (0).
2453   //
2454   // The modification vector sends a negative value for each metric
2455   //   for the K this processor no longer belongs to and a positive
2456   //   value to the K the processor now belongs. In addition, a -1.0
2457   //   is sent to the K it is leaving and a +1.0 to the K it is 
2458   //   joining.
2459   //
2460   // The processor must still contribute a "zero returns" even if
2461   //   nothing changes. This will be the basis for determine
2462   //   convergence at the root.
2463   //
2464   // The addtional +1 is meant for the count-change that must be
2465   //   maintained for the special cases at the root when some K
2466   //   may be deprived of all processor points or go from 0 to a
2467   //   positive number of processors (see later comments).
2468   double *modVector = new double[numK*(numMetrics+1)];
2469   for (int i=0; i<numK; i++) {
2470     for (int j=0; j<numMetrics+1; j++) {
2471       modVector[i*(numMetrics+1) + j] = 0.0;
2472     }
2473   }
2474   for (int i=0; i<numMetrics; i++) {
2475     // for this initialization, only positive values need be sent.
2476     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2477   }
2478   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2479
2480   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2481                 0, thisProxy);
2482   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2483              CkReduction::sum_double, cb);  
2484 }
2485
2486 double KMeansBOC::calculateDistance(int k) {
2487   double ret = 0.0;
2488   for (int i=0; i<numMetrics; i++) {
2489     if (keepMetric[i]) {
2490       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2491              CkMyPe(), currentPhase, phaseIter, i,
2492                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2493       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2494     }
2495   }
2496   return sqrt(ret);
2497 }
2498
2499 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2500   CkAssert(CkMyPe() == 0);
2501
2502   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2503
2504   double *modVector = (double *)msg->getData();
2505   // sanity check
2506   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2507
2508   // A quick convergence test.
2509   bool hasChanges = false;
2510   for (int i=0; i<numK; i++) {
2511     hasChanges = hasChanges || 
2512       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2513   }
2514   if (!hasChanges) {
2515     delete msg;
2516     findRepresentatives();
2517   } else {
2518     int overallChange = 0;
2519     for (int i=0; i<numK; i++) {
2520       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2521       if (change != 0) {
2522         overallChange += change;
2523         // modify the k seeds based on the modification vectors coming in
2524         //
2525         // If a seed initially has no members, its contents do not matter and
2526         //   is simply set to the average of the incoming vector.
2527         // If the change causes a seed to lose all its members, do nothing.
2528         //   Its last-known location is kept to allow it to re-capture
2529         //   membership at the next iteration rather than apply the last
2530         //   changes (which snaps the point unnaturally to 0,0).
2531         // Otherwise, apply the appropriate vector changes.
2532         CkAssert((kNumMembers[i] + change >= 0) &&
2533                  (kNumMembers[i] + change <= CkNumPes()));
2534         if (kNumMembers[i] == 0) {
2535           CkAssert(change > 0);
2536           for (int j=0; j<numMetrics; j++) {
2537             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2538           }
2539         } else if (kNumMembers[i] + change == 0) {
2540           // do nothing.
2541         } else {
2542           for (int j=0; j<numMetrics; j++) {
2543             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2544             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2545             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2546           }
2547         }
2548         kNumMembers[i] += change;
2549       }
2550       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2551              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2552     }
2553     delete msg;
2554
2555     // broadcast the new seed locations.
2556     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2557     outmsg->numKPos = numK*numMetrics;
2558     for (int i=0; i<numK*numMetrics; i++) {
2559       outmsg->kSeedsPos[i] = kSeeds[i];
2560     }
2561
2562     thisProxy.updateSeedMembership(outmsg);
2563   }
2564 }
2565
2566 // Called on all processors
2567 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2568
2569   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2570
2571   phaseIter++;
2572
2573   // **NOTE** This might change if we want to send data based on the filter
2574   //   instead of all the data.
2575   CkAssert(numK*numMetrics == msg->numKPos);
2576   for (int i=0; i<msg->numKPos; i++) {
2577     incKSeeds[i] = msg->kSeedsPos[i];
2578   }
2579
2580   // Decide which KSeed this processor belongs to.
2581   lastMinK = minK;
2582   minDistance = calculateDistance(0);
2583   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2584          currentPhase, phaseIter, minDistance);
2585
2586   minK = 0;
2587   for (int i=1; i<numK; i++) {
2588     double distance = calculateDistance(i);
2589     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2590            currentPhase, phaseIter, i, distance);
2591     if (distance < minDistance) {
2592       minDistance = distance;
2593       minK = i;
2594     }
2595   }
2596
2597   double *modVector = new double[numK*(numMetrics+1)];
2598   for (int i=0; i<numK; i++) {
2599     for (int j=0; j<numMetrics+1; j++) {
2600       modVector[i*(numMetrics+1) + j] = 0.0;
2601     }
2602   }
2603
2604   if (minK != lastMinK) {
2605     for (int i=0; i<numMetrics; i++) {
2606       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2607       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2608     }
2609     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2610     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2611   }
2612
2613   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2614                 0, thisProxy);
2615   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2616              CkReduction::sum_double, cb);  
2617 }
2618
2619 void KMeansBOC::findRepresentatives() {
2620
2621   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2622
2623   int numNonEmptyClusters = 0;
2624   for (int i=0; i<numK; i++) {
2625     if (kNumMembers[i] > 0) {
2626       numNonEmptyClusters++;
2627     }
2628   }
2629
2630   int numRepresentatives = peNumKeep;
2631   // **FIXME**
2632   // This is fairly arbitrary. Next time, choose the centers of the top
2633   //   largest clusters.
2634   if (numRepresentatives < numNonEmptyClusters) {
2635     numRepresentatives = numNonEmptyClusters;
2636   }
2637
2638   int slotsRemaining = numRepresentatives;
2639
2640   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2641          numNonEmptyClusters);
2642
2643   // determine how many exemplars to select per cluster. Currently
2644   //   hardcoded to 1. Future challenge is to decide on other numbers
2645   //   or proportionality.
2646   //
2647   int exemplarsPerCluster = 1;
2648   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2649
2650   int numCandidateOutliers = CkNumPes() - 
2651     exemplarsPerCluster*numNonEmptyClusters;
2652
2653   double *remainders = new double[numK];
2654   int *assigned = new int[numK];
2655   exemplarChoicesLeft = new int[numK];
2656   outlierChoicesLeft = new int[numK];
2657
2658   for (int i=0; i<numK; i++) {
2659     assigned[i] = 0;
2660     remainders[i] = 
2661       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2662       slotsRemaining / numCandidateOutliers;
2663     if (remainders[i] >= 0.0) {
2664       assigned[i] = (int)floor(remainders[i]);
2665       remainders[i] -= assigned[i];
2666     } else {
2667       remainders[i] = 0.0;
2668     }
2669   }
2670
2671   for (int i=0; i<numK; i++) {
2672     slotsRemaining -= assigned[i];
2673   }
2674   CkAssert(slotsRemaining >= 0);
2675
2676   // find clusters to assign the loose slots to, in order of
2677   // remainder proportion
2678   while (slotsRemaining > 0) {
2679     double max = 0.0;
2680     int winner = 0;
2681     for (int i=0; i<numK; i++) {
2682       if (remainders[i] > max) {
2683         max = remainders[i];
2684         winner = i;
2685       }
2686     }
2687     assigned[winner]++;
2688     remainders[winner] = 0.0;
2689     slotsRemaining--;
2690   }
2691
2692   // set up how many reduction cycles of min/max we need to conduct to
2693   // select the representatives.
2694   numSelectionIter = exemplarsPerCluster;
2695   for (int i=0; i<numK; i++) {
2696     if (assigned[i] > numSelectionIter) {
2697       numSelectionIter = assigned[i];
2698     }
2699   }
2700   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2701
2702   for (int i=0; i<numK; i++) {
2703     if (kNumMembers[i] > 0) {
2704       exemplarChoicesLeft[i] = exemplarsPerCluster;
2705       outlierChoicesLeft[i] = assigned[i];
2706     } else {
2707       exemplarChoicesLeft[i] = 0;
2708       outlierChoicesLeft[i] = 0;
2709     }
2710     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2711            outlierChoicesLeft[i]);
2712   }
2713
2714   delete [] assigned;
2715   delete [] remainders;
2716
2717   // send out first broadcast
2718   KSelectionMessage *outmsg = NULL;
2719   if (numSelectionIter > 0) {
2720     outmsg = new (numK, numK, numK) KSelectionMessage;
2721     outmsg->numKMinIDs = numK;
2722     outmsg->numKMaxIDs = numK;
2723     for (int i=0; i<numK; i++) {
2724       outmsg->minIDs[i] = -1;
2725       outmsg->maxIDs[i] = -1;
2726     }
2727     thisProxy.collectDistances(outmsg);
2728   } else {
2729     CkPrintf("Warning: No selection iteration from the start!\n");
2730     // invoke phase completion on all processors
2731     thisProxy.phaseDone();
2732   }
2733 }
2734
2735 /*
2736  *  lastMin = array of minimum champions of the last tournament
2737  *  lastMax = array of maximum champions of the last tournament
2738  *  lastMaxVal = array of last encountered maximum values, allows previous
2739  *                 minimum winners to eliminate themselves from the next
2740  *                 minimum race.
2741  *
2742  *  Called on all processors.
2743  */
2744 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2745
2746   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2747
2748   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2749          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2750   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2751       (CkMyPe() == msg->maxIDs[lastMinK])) {
2752     CkAssert(!selected);
2753     selected = true;
2754   }
2755
2756   // build outgoing reduction structure
2757   //   format = minVal | ID | maxVal | ID
2758   double *minMaxAndIDs = NULL;
2759
2760   minMaxAndIDs = new double[numK*4];
2761   // initialize to the appropriate out-of-band values (for error checks)
2762   for (int i=0; i<numK; i++) {
2763     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2764     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2765     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2766     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2767   }
2768   // If I have not won before, I put myself back into the competition
2769   if (!selected) {
2770     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2771     minMaxAndIDs[lastMinK*4] = minDistance;
2772     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2773     minMaxAndIDs[lastMinK*4+2] = minDistance;
2774     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2775   }
2776   delete msg;
2777
2778   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2779                 0, thisProxy);
2780   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2781              minMaxReductionType, cb);  
2782 }
2783
2784 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2785   // incoming format:
2786   //   minVal | minID | maxVal | maxID
2787
2788   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2789
2790   if (numSelectionIter > 0) {
2791     double *incInfo = (double *)msg->getData();
2792     
2793     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2794     outmsg->numKMinIDs = numK;
2795     outmsg->numKMaxIDs = numK;
2796     
2797     for (int i=0; i<numK; i++) {
2798       DEBUGF("%d | %lf %d %lf %d \n", i, 
2799              incInfo[i*4], (int)incInfo[i*4+1], 
2800              incInfo[i*4+2], (int)incInfo[i*4+3]);
2801     }
2802
2803     for (int i=0; i<numK; i++) {
2804       if (exemplarChoicesLeft[i] > 0) {
2805         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2806         exemplarChoicesLeft[i]--;
2807       } else {
2808         outmsg->minIDs[i] = -1;
2809       }
2810       if (outlierChoicesLeft[i] > 0) {
2811         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2812         outlierChoicesLeft[i]--;
2813       } else {
2814         outmsg->maxIDs[i] = -1;
2815       }
2816     }
2817     thisProxy.collectDistances(outmsg);
2818     numSelectionIter--;
2819   } else {
2820     // invoke phase completion on all processors
2821     thisProxy.phaseDone();
2822   }
2823 }
2824
2825 /**
2826  *  Completion of the K-Means clustering and data selection of one phase
2827  *    of the computation.
2828  *
2829  *  Called on every processor.
2830  */
2831 void KMeansBOC::phaseDone() {
2832
2833   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2834
2835   LogPool *pool = CkpvAccess(_trace)->_logPool;
2836   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2837
2838   // now decide on what to do with the decision.
2839   if (!selected) {
2840     if (usePhases) {
2841       pool->keepPhase[currentPhase] = false;
2842     } else {
2843       // if not using phases, we're working on the whole log
2844       pool->setAllPhases(false);
2845     }
2846   }
2847
2848   // **FIXME** (?) - All processors have to agree on this, or the reduction
2849   //   will not be correct! The question is "is this enforcible?"
2850   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2851     // We're done
2852     int dummy = 0;
2853     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2854                   0, bocProxy);
2855     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2856   } else {
2857     // reset all phase-based k-means data and decisions
2858
2859     // **FIXME**!!!!!    
2860     
2861     // invoke the next K-Means computation phase.
2862     currentPhase++;
2863     thisProxy[CkMyPe()].getNextPhaseMetrics();
2864   }
2865 }
2866
2867 void TraceProjectionsBOC::startTimeAnalysis()
2868 {
2869   double startTime = 0.0;
2870   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2871      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2872   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2873   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2874 }
2875
2876 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2877 {
2878   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2879
2880   if (CkpvAccess(_trace) != NULL) {
2881     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2882     CkpvAccess(_trace)->_logPool->setNewStartTime();
2883     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2884   }
2885   delete msg;
2886   thisProxy[CkMyPe()].startEndTimeAnalysis();
2887 }
2888
2889 void TraceProjectionsBOC::startEndTimeAnalysis()
2890 {
2891  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2892
2893   endTime = CkpvAccess(_trace)->endTime;
2894   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2895
2896   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2897                 0, thisProxy);
2898   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2899 }
2900
2901 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2902 {
2903  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2904
2905   CkAssert(CkMyPe() == 0);
2906   parModulesRemaining--;
2907   if (CkpvAccess(_trace) != NULL) {
2908     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2909     // CkPrintf("End time determined to be %lf us\n",
2910     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2911   }
2912   delete msg;
2913   if (parModulesRemaining == 0) {
2914     thisProxy[CkMyPe()].finalize();
2915   }
2916 }
2917
2918 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2919
2920  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2921
2922   CkAssert(CkMyPe() == 0);
2923   parModulesRemaining--;
2924   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2925            CmiWallTimer()-analysisStartTime);
2926   delete msg;
2927   if (parModulesRemaining == 0) {
2928     thisProxy[CkMyPe()].finalize();
2929   }
2930 }
2931
2932 /**
2933  *
2934  *  This version is called (on processor 0) only if flushCheck fails.
2935  *
2936  */
2937 void TraceProjectionsBOC::kMeansDone() {
2938   CkAssert(CkMyPe() == 0);
2939   parModulesRemaining--;
2940   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2941            CmiWallTimer()-analysisStartTime);
2942   if (parModulesRemaining == 0) {
2943     thisProxy[CkMyPe()].finalize();
2944   }
2945 }
2946
2947 void TraceProjectionsBOC::finalize()
2948 {
2949   CkAssert(CkMyPe() == 0);
2950   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2951   //       CmiWallTimer()-analysisStartTime);
2952   thisProxy.closingTraces();
2953 }
2954
2955 // Called on every processor
2956 void TraceProjectionsBOC::closingTraces() {
2957   CkpvAccess(_trace)->closeTrace();
2958
2959     // subtle:  reduction needs to go to the PE which started CkExit()
2960   int pe = 0;
2961   if (endPe != -1) pe = endPe;
2962   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2963                 pe, thisProxy); 
2964   contribute(0, NULL, CkReduction::sum_int, cb);  
2965 }
2966
2967 // The sole purpose of this reduction is to decide whether or not
2968 //   Projections as a module needs to call CkExit() to get other
2969 //   modules to shutdown.
2970 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2971   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2972   delete msg;
2973   // decide if CkExit() needs to be called
2974   if (!CkpvAccess(_trace)->converseExit) {
2975     CkExit();
2976   }
2977 }
2978 /*
2979  *  Registration and definition of the Outlier Reduction callback.
2980  *  Format: Sum | Min | Max | Sum of Squares
2981  */
2982 CkReductionMsg *outlierReduction(int nMsgs,
2983                                  CkReductionMsg **msgs) {
2984   int numBytes = 0;
2985   int numMetrics = 0;
2986   double *ret = NULL;
2987
2988   if (nMsgs == 1) {
2989     // nothing to do, just pass it on
2990     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2991   }
2992
2993   if (nMsgs > 1) {
2994     numBytes = msgs[0]->getSize();
2995     // sanity checks
2996     if (numBytes%sizeof(double) != 0) {
2997       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2998     }
2999     if ((numBytes/sizeof(double))%4 != 0) {
3000       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
3001     }
3002     numMetrics = (numBytes/sizeof(double))/4;
3003     ret = new double[numMetrics*4];
3004
3005     // copy the first message data into the return structure first
3006     for (int i=0; i<numMetrics*4; i++) {
3007       ret[i] = ((double *)msgs[0]->getData())[i];
3008     }
3009
3010     // Sum | Min | Max | Sum of Squares
3011     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
3012       for (int i=0; i<numMetrics; i++) {
3013         // Sum
3014         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
3015         // Min
3016         ret[numMetrics + i] =
3017           (ret[numMetrics + i] < 
3018            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
3019           ? ret[numMetrics + i] : 
3020           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
3021         // Max
3022         ret[2*numMetrics + i] = 
3023           (ret[2*numMetrics + i] >
3024            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
3025           ? ret[2*numMetrics + i] :
3026           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
3027         // Sum of Squares (squaring already done at leaf)
3028         ret[3*numMetrics + i] +=
3029           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
3030       }
3031     }
3032   }
3033   
3034   /* apparently, we do not delete the incoming messages */
3035   return CkReductionMsg::buildNew(numBytes,ret);
3036 }
3037
3038 /*
3039  * The only reason we have a user-defined reduction is to support
3040  *   identification of the "winning" processors as well as to handle
3041  *   both the min and the max of each "tournament". A simple min/max
3042  *   discovery cannot handle ties.
3043  */
3044 CkReductionMsg *minMaxReduction(int nMsgs,
3045                                 CkReductionMsg **msgs) {
3046   CkAssert(nMsgs > 0);
3047
3048   int numBytes = msgs[0]->getSize();
3049   CkAssert(numBytes%sizeof(double) == 0);
3050   int numK = (numBytes/sizeof(double))/4;
3051
3052   double *ret = new double[numK*4];
3053   // fill with out-of-band values
3054   for (int i=0; i<numK; i++) {
3055     ret[i*4] = -1.0;
3056     ret[i*4+1] = -1.0;
3057     ret[i*4+2] = -1.0;
3058     ret[i*4+3] = -1.0;
3059   }
3060
3061   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
3062   for (int i=0; i<nMsgs; i++) {
3063     double *temp = (double *)msgs[i]->getData();
3064     for (int j=0; j<numK; j++) {
3065       // no previous valid min
3066       if (ret[j*4+1] < 0) {
3067         // fill it in only if the incoming min is valid
3068         if (temp[j*4+1] >= 0) {
3069           ret[j*4] = temp[j*4];      // fill min value
3070           ret[j*4+1] = temp[j*4+1];  // fill ID
3071         }
3072       } else {
3073         // find Min, only if incoming min is valid
3074         if (temp[j*4+1] >= 0) {
3075           if (temp[j*4] < ret[j*4]) {
3076             ret[j*4] = temp[j*4];      // replace min value
3077             ret[j*4+1] = temp[j*4+1];  // replace ID
3078           }
3079         }
3080       }
3081       // no previous valid max
3082       if (ret[j*4+3] < 0) {
3083         // fill only if the incoming max is valid
3084         if (temp[j*4+3] >= 0) {
3085           ret[j*4+2] = temp[j*4+2];  // fill max value
3086           ret[j*4+3] = temp[j*4+3];  // fill ID
3087         }
3088       } else {
3089         // find Max, only if incoming max is valid
3090         if (temp[j*4+3] >= 0) {
3091           if (temp[j*4+2] > ret[j*4+2]) {
3092             ret[j*4+2] = temp[j*4+2];  // replace max value
3093             ret[j*4+3] = temp[j*4+3];  // replace ID
3094           }
3095         }
3096       }
3097     }
3098   }
3099   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
3100   delete [] ret;
3101   return redmsg;
3102 }
3103
3104 #include "TraceProjections.def.h"
3105 #endif //PROJ_ANALYSIS
3106
3107 /*@}*/