26bffdef9e034475e3c472be9fbf0f44ebfd9da0
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
14 #else
15 #define DEBUGF(...)
16 #endif
17 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BIGSIM_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BIGSIM_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BIGSIM_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   char eventName[PAPI_MAX_STR_LEN];
549   for (i=0;i<NUMPAPIEVENTS;i++) {
550     PAPI_event_code_to_name(papiEvents[i], eventName);
551     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
552   }
553 #endif
554   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
555   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
556     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
557   }     
558 }
559
560 void LogPool::writeSts(TraceProjections *traceProj){
561   writeSts();
562   if (traceProj != NULL) {
563     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
564     funcIter->seekStart();
565     int numFuncs = traceProj->getFuncNumber();
566     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
567     while(funcIter->hasNext()) {
568       StrKey *key;
569       int *obj = (int *)funcIter->next((void **)&key);
570       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
571     }
572   }
573   fprintf(stsfp, "END\n");
574   fclose(stsfp);
575 }
576
577 void LogPool::writeRC(void)
578 {
579     //CkPrintf("write RC is being executed\n");
580 #ifdef PROJ_ANALYSIS  
581     CkAssert(CkMyPe() == 0);
582     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
583             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
584     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
585             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BIGSIM_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BIGSIM_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BIGSIM_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BIGSIM_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BIGSIM_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BIGSIM_CHARM
703 #if MPI_TRACE_MACHINE_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BIGSIM_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 void LogPool::modLastEntryTimestamp(double ts)
756 {
757   pool[numEntries-1].time = ts;
758   //pool[numEntries-1].cputime = ts;
759 }
760
761 // /** Constructor for a multicast log entry */
762 // 
763 //  THIS WAS MOVED TO trace-projections.h with the other constructors
764 // 
765 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
766 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
767 // {
768 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
769 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
770 //     recvTime = rt; 
771 //     numpes = numPe;
772 //     userSuppliedNote = NULL;
773 //     if (pelist != NULL) {
774 //      pes = new int[numPe];
775 //      for (int i=0; i<numPe; i++) {
776 //        pes[i] = pelist[i];
777 //      }
778 //     } else {
779 //      pes= NULL;
780 //     }
781 // }
782
783 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
784 //{
785 //#if CMK_HAS_COUNTER_PAPI
786 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
787 //#endif
788 //}
789
790
791
792 void LogEntry::pup(PUP::er &p)
793 {
794   int i;
795   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
796   char ret = '\n';
797
798   p|type;
799   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
800   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
801
802   switch (type) {
803     case USER_EVENT:
804     case USER_EVENT_PAIR:
805       p|mIdx; p|itime; p|event; p|pe;
806       break;
807     case BEGIN_IDLE:
808     case END_IDLE:
809     case BEGIN_PACK:
810     case END_PACK:
811     case BEGIN_UNPACK:
812     case END_UNPACK:
813       p|itime; p|pe; 
814       break;
815     case BEGIN_PROCESSING:
816       if (p.isPacking()) {
817         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
818         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
819       }
820       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
821       p|msglen; p|irecvtime; 
822       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
823       p|icputime;
824 #if CMK_HAS_COUNTER_PAPI
825       //p|numPapiEvents;
826       for (i=0; i<NUMPAPIEVENTS; i++) {
827         // not yet!!!
828         //      p|papiIDs[i]; 
829         p|papiValues[i];
830         
831       }
832 #else
833       //p|numPapiEvents;     // non papi version has value 0
834 #endif
835       if (p.isUnpacking()) {
836         recvTime = irecvtime/1.0e6;
837         cputime = icputime/1.0e6;
838       }
839       break;
840     case END_PROCESSING:
841       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
842       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
843 #if CMK_HAS_COUNTER_PAPI
844       //p|numPapiEvents;
845       for (i=0; i<NUMPAPIEVENTS; i++) {
846         // not yet!!!
847         //      p|papiIDs[i];
848         p|papiValues[i];
849       }
850 #else
851       //p|numPapiEvents;  // non papi version has value 0
852 #endif
853       if (p.isUnpacking()) cputime = icputime/1.0e6;
854       break;
855     case USER_SUPPLIED:
856           p|userSuppliedData;
857           p|itime;
858         break;
859     case USER_SUPPLIED_NOTE:
860           p|itime;
861           int length;
862           if (p.isPacking()) length = strlen(userSuppliedNote);
863           p | length;
864           char space;
865           space = ' ';
866           p | space;
867           if (p.isUnpacking()) {
868             userSuppliedNote = new char[length+1];
869             userSuppliedNote[length] = '\0';
870           }
871           PUParray(p,userSuppliedNote, length);
872           break;
873     case USER_SUPPLIED_BRACKETED_NOTE:
874       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
875           p|itime;
876           p|iEndTime;
877           p|event;
878           int length2;
879           if (p.isPacking()) length2 = strlen(userSuppliedNote);
880           p | length2;
881           char space2;
882           space2 = ' ';
883           p | space2;
884           if (p.isUnpacking()) {
885             userSuppliedNote = new char[length+1];
886             userSuppliedNote[length] = '\0';
887           }
888           PUParray(p,userSuppliedNote, length2);
889           break;
890     case MEMORY_USAGE_CURRENT:
891       p | memUsage;
892       p | itime;
893         break;
894     case CREATION:
895       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
896       p|mIdx; p|eIdx; p|itime;
897       p|event; p|pe; p|msglen; p|irecvtime;
898       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
899       break;
900     case CREATION_BCAST:
901       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
902       p|mIdx; p|eIdx; p|itime;
903       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
904       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
905       break;
906     case CREATION_MULTICAST:
907       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
908       p|mIdx; p|eIdx; p|itime;
909       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
910       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
911       for (i=0; i<numpes; i++) p|pes[i];
912       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
913       break;
914     case MESSAGE_RECV:
915       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
916       break;
917
918     case ENQUEUE:
919     case DEQUEUE:
920       p|mIdx; p|itime; p|event; p|pe;
921       break;
922
923     case BEGIN_INTERRUPT:
924     case END_INTERRUPT:
925       p|itime; p|event; p|pe;
926       break;
927
928       // **CW** absolute timestamps are used here to support a quick
929       // way of determining the total time of a run in projections
930       // visualization.
931     case BEGIN_COMPUTATION:
932     case END_COMPUTATION:
933     case BEGIN_TRACE:
934     case END_TRACE:
935       p|itime;
936       break;
937     case BEGIN_FUNC:
938         p | itime;
939         p | mIdx;
940         p | event;
941         if(!p.isUnpacking()){
942                 p(fName,flen-1);
943         }
944         break;
945     case END_FUNC:
946         p | itime;
947         p | mIdx;
948         break;
949     case END_PHASE:
950       p|eIdx; // FIXME: actually the phase ID
951       p|itime;
952       break;
953     default:
954       CmiError("***Internal Error*** Wierd Event %d.\n", type);
955       break;
956   }
957   if (p.isUnpacking()) time = itime/1.0e6;
958   p|ret;
959 }
960
961 TraceProjections::TraceProjections(char **argv): 
962   curevent(0), inEntry(0), computationStarted(0), 
963         converseExit(0), endTime(0.0), traceNestedEvents(0),
964         currentPhaseID(0), lastPhaseEvent(NULL)
965 {
966   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
967
968   if (CkpvAccess(traceOnPe) == 0) return;
969
970   CtvInitialize(int,curThreadEvent);
971   CkpvInitialize(CmiInt8, CtrLogBufSize);
972   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
973   CtvAccess(curThreadEvent)=0;
974   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
975                        "Log entries to buffer per I/O")) {
976     if (CkMyPe() == 0) {
977       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
978     }
979   }
980   checknested = 
981     CmiGetArgFlagDesc(argv,"+checknested",
982                       "check projections nest begin end execute events");
983   traceNestedEvents = 
984     CmiGetArgFlagDesc(argv,"+tracenested",
985               "trace projections nest begin/end execute events");
986   int binary = 
987     CmiGetArgFlagDesc(argv,"+binary-trace",
988                       "Write log files in binary format");
989
990   CmiInt8 nSubdirs = 0;
991   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
992
993
994 #if CMK_PROJECTIONS_USE_ZLIB
995   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
996 #else
997   // consume the flag so there's no confusing
998   CmiGetArgFlagDesc(argv,"+gz-trace",
999                     "Write log files pre-compressed with gzip");
1000   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1001 #endif
1002
1003   // **CW** default to non delta log encoding. The user may choose to do
1004   // create both logs (for debugging) or just the old log timestamping
1005   // (for compatibility).
1006   // Generating just the non delta log takes precedence over generating
1007   // both logs (if both arguments appear on the command line).
1008
1009   // switch to OLD log format until everything works // Gengbin
1010   nonDeltaLog = 1;
1011   deltaLog = 0;
1012   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1013                                   "Generate Delta encoded and simple timestamped log files");
1014
1015   _logPool = new LogPool(CkpvAccess(traceRoot));
1016   _logPool->setNumSubdirs(nSubdirs);
1017   _logPool->setBinary(binary);
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   _logPool->setCompressed(compressed);
1020 #endif
1021   if (CkMyPe() == 0) {
1022     _logPool->createSts();
1023     _logPool->createRC();
1024   }
1025   funcCount=1;
1026
1027 #if CMK_HAS_COUNTER_PAPI
1028   // We initialize and create the event sets for use with PAPI here.
1029   int papiRetValue;
1030   if(CkMyRank()==0){
1031     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032     if (papiRetValue != PAPI_VER_CURRENT) {
1033       CmiAbort("PAPI Library initialization failure!\n");
1034     }
1035 #if CMK_SMP
1036     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1037       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1038     }
1039 #endif
1040   }
1041
1042 #if CMK_SMP
1043   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1044   CmiNodeAllBarrier();
1045 #endif
1046   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1047   papiEventSet = PAPI_NULL; 
1048   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1049     CmiAbort("PAPI failed to create event set!\n");
1050   }
1051   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1052   if (papiRetValue != PAPI_OK) {
1053     if (papiRetValue == PAPI_ECNFLCT) {
1054       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1055     } else {
1056       CmiAbort("PAPI failed to add designated events!\n");
1057     }
1058   }
1059   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1060 #endif
1061 }
1062
1063 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1064 {
1065   OPTIMIZED_VERSION
1066   CkAssert(e==-1 || e>=0);
1067   CkAssert(evt != NULL);
1068   int event;
1069   int biggest = -1;
1070   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1071     int cur = (*CkpvAccess(usrEvents))[i]->e;
1072     if (cur == e) {
1073       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1074       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1075         return e;
1076       else
1077         CmiAbort("UserEvent double registered!");
1078     }
1079     if (cur > biggest) biggest = cur;
1080   }
1081   // if no user events have so far been registered. biggest will be -1
1082   // and hence newly assigned event numbers will begin from 0.
1083   if (e==-1) event = biggest+1;  // automatically assign new event number
1084   else event = e;
1085   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1086   return event;
1087 }
1088
1089 void TraceProjections::traceClearEps(void)
1090 {
1091   // In trace-summary, this zeros out the EP bins, to eliminate noise
1092   // from startup.  Here, this isn't useful, since we can do that in
1093   // post-processing
1094 }
1095
1096 void TraceProjections::traceWriteSts(void)
1097 {
1098   if(CkMyPe()==0)
1099     _logPool->writeSts(this);
1100 }
1101
1102 /** 
1103  * **IMPT NOTES**:
1104  *
1105  * This is called when Converse closes during ConverseCommonExit().
1106  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1107  *
1108  * Some programs bypass CkExit() (like NAMD, which eventually calls
1109  * ConverseExit()), modules like traces will have to pretend to shutdown
1110  * as if CkExit() was called but at the same time avoid making
1111  * subsequent CkExit() calls (which is usually required for allowing
1112  * other modules to shutdown).
1113  *
1114  * Note that we can only get here if CkExit() was not called, since the
1115  * trace module will un-register itself from TraceArray if it did.
1116  *
1117  */
1118 void TraceProjections::traceClose(void)
1119 {
1120 #ifdef PROJ_ANALYSIS
1121   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1122
1123   // sets the flag that tells the code not to make the CkExit call later
1124   converseExit = 1;
1125   if (CkMyPe() == 0) {
1126     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1127     bocProxy.traceProjectionsParallelShutdown(-1);
1128   }
1129   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1130     CkpvAccess(_trace)->endComputation();
1131     delete _logPool;              // will write
1132     // remove myself from traceArray so that no tracing will be called.
1133     CkpvAccess(_traces)->removeTrace(this);
1134   }
1135 #else
1136   // we've already deleted the logpool, so multiple calls to traceClose
1137   // are tolerated.
1138   if (_logPool == NULL) {
1139     return;
1140   }
1141   if(CkMyPe()==0){
1142     _logPool->writeSts(this);
1143   }
1144   CkpvAccess(_trace)->endComputation();
1145   delete _logPool;              // will write
1146   // remove myself from traceArray so that no tracing will be called.
1147   CkpvAccess(_traces)->removeTrace(this);
1148 #endif
1149 }
1150
1151 /**
1152  *  **IMPT NOTES**:
1153  *
1154  *  This is meant to be called internally by the tracing framework.
1155  *
1156  */
1157 void TraceProjections::closeTrace() {
1158   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1159   if (CkMyPe() == 0) {
1160     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1161     _logPool->writeSts(this);
1162     _logPool->writeRC();
1163     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1164   }
1165   delete _logPool;       // will write logs to file
1166 }
1167
1168 #if CMK_SMP_TRACE_COMMTHREAD
1169 void TraceProjections::traceBeginOnCommThread()
1170 {
1171   if (!computationStarted) return;
1172   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174
1175 void TraceProjections::traceEndOnCommThread()
1176 {
1177   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1178 }
1179 #endif
1180
1181 void TraceProjections::traceBegin(void)
1182 {
1183   if (!computationStarted) return;
1184   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::traceEnd(void)
1188 {
1189   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userEvent(int e)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1196 }
1197
1198 void TraceProjections::userBracketEvent(int e, double bt, double et)
1199 {
1200   if (!computationStarted) return;
1201   // two events record Begin/End of event e.
1202   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1204 }
1205
1206 void TraceProjections::userSuppliedData(int d)
1207 {
1208   if (!computationStarted) return;
1209   _logPool->addUserSupplied(d);
1210 }
1211
1212 void TraceProjections::userSuppliedNote(char *note)
1213 {
1214   if (!computationStarted) return;
1215   _logPool->addUserSuppliedNote(note);
1216 }
1217
1218
1219 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1220 {
1221   if (!computationStarted) return;
1222   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1223 }
1224
1225 void TraceProjections::memoryUsage(double m)
1226 {
1227   if (!computationStarted) return;
1228   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1229   
1230 }
1231
1232
1233 void TraceProjections::creation(envelope *e, int ep, int num)
1234 {
1235   double curTime = TraceTimer();
1236   if (e == 0) {
1237     CtvAccess(curThreadEvent) = curevent;
1238     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1239                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1240   } else {
1241     int type=e->getMsgtype();
1242     e->setEvent(curevent);
1243     if (num > 1) {
1244       _logPool->add(CREATION_BCAST, type, ep, curTime,
1245                     curevent++, CkMyPe(), e->getTotalsize(), 
1246                     NULL, 0, 0.0, num);
1247     } else {
1248       _logPool->add(CREATION, type, ep, curTime,
1249                     curevent++, CkMyPe(), e->getTotalsize(), 
1250                     NULL, 0, 0.0);
1251     }
1252   }
1253 }
1254
1255 //This function is only called from a comm thread in SMP mode. 
1256 void TraceProjections::creation(char *msg)
1257 {
1258 #if CMK_SMP_TRACE_COMMTHREAD
1259         // msg must be a charm message
1260     envelope *e = (envelope *)msg;
1261     int ep = e->getEpIdx();
1262     if(_entryTable[ep]->traceEnabled) {
1263         creation(e, ep, 1);
1264         e->setSrcPe(CkMyPe());              // pretend I am the sender
1265     }
1266 #endif
1267 }
1268
1269 void TraceProjections::traceCommSetMsgID(char *msg)
1270 {
1271 #if CMK_SMP_TRACE_COMMTHREAD
1272     // msg must be a charm message
1273     envelope *e = (envelope *)msg;
1274     int ep = e->getEpIdx();
1275     if(_entryTable[ep]->traceEnabled) {
1276         e->setSrcPe(CkMyPe());              // pretend I am the sender
1277         e->setEvent(curevent);
1278     }
1279 #endif
1280 }
1281
1282 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1283 {
1284     // msg must be a charm message
1285     *pe = *event = -1;
1286     envelope *e = (envelope *)msg;
1287     int ep = e->getEpIdx();
1288     if(_entryTable[ep]->traceEnabled) {
1289         *pe = e->getSrcPe();
1290         *event = e->getEvent();
1291     }
1292 }
1293
1294 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1295 {
1296        // msg must be a charm message
1297     envelope *e = (envelope *)msg;
1298     int ep = e->getEpIdx();
1299     if(ep<=0 || ep>=_entryTable.size()) return;
1300     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1301     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1302     if(_entryTable[ep]->traceEnabled) {
1303         e->setSrcPe(pe);
1304         e->setEvent(event);
1305     }
1306 }
1307
1308 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1309    Communication Library-specific Multicasts via new event 
1310    CREATION_MULTICAST.
1311 */
1312
1313 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1314                                          int *pelist)
1315 {
1316   double curTime = TraceTimer();
1317   if (e==0) {
1318     CtvAccess(curThreadEvent)=curevent;
1319     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1320                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1321   } else {
1322     int type=e->getMsgtype();
1323     e->setEvent(curevent);
1324     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1325                                    e->getTotalsize(), 0, 0.0, num, pelist);
1326   }
1327 }
1328
1329 void TraceProjections::creationDone(int num)
1330 {
1331   // modified the creation done time of the last num log entries
1332   // FIXME: totally a hack
1333   double curTime = TraceTimer();
1334   int idx = _logPool->numEntries-1;
1335   while (idx >=0 && num >0 ) {
1336     LogEntry &log = _logPool->pool[idx];
1337     if ((log.type == CREATION) ||
1338         (log.type == CREATION_BCAST) ||
1339         (log.type == CREATION_MULTICAST)) {
1340       log.recvTime = curTime - log.time;
1341       num --;
1342     }
1343     idx--;
1344   }
1345 }
1346
1347 void TraceProjections::beginExecute(CmiObjId *tid)
1348 {
1349 #if CMK_HAS_COUNTER_PAPI
1350   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1351     CmiAbort("PAPI failed to read at begin execute!\n");
1352   }
1353 #endif
1354   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1355   execEvent = CtvAccess(curThreadEvent);
1356   execEp = (-1);
1357   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1358                 execEvent,CkMyPe(), 0, tid);
1359 #if CMK_HAS_COUNTER_PAPI
1360   _logPool->addPapi(papiValues);
1361 #endif
1362   inEntry = 1;
1363 }
1364
1365 void TraceProjections::beginExecute(envelope *e)
1366 {
1367   if(e==0) {
1368 #if CMK_HAS_COUNTER_PAPI
1369     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1370       CmiAbort("PAPI failed to read at begin execute!\n");
1371     }
1372 #endif
1373     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1374     execEvent = CtvAccess(curThreadEvent);
1375     execEp = (-1);
1376     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1377                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1378 #if CMK_HAS_COUNTER_PAPI
1379     _logPool->addPapi(papiValues);
1380 #endif
1381     inEntry = 1;
1382   } else {
1383     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1384                  e->getSrcPe(),e->getTotalsize());
1385   }
1386 }
1387
1388 void TraceProjections::beginExecute(char *msg){
1389 #if CMK_SMP_TRACE_COMMTHREAD
1390         //This function is called from comm thread in SMP mode
1391     envelope *e = (envelope *)msg;
1392     int ep = e->getEpIdx();
1393     if(_entryTable[ep]->traceEnabled)
1394                 beginExecute(e);
1395 #endif
1396 }
1397
1398 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1399                                     int mlen, CmiObjId *idx)
1400 {
1401   if (traceNestedEvents) {
1402     if (! nestedEvents.isEmpty()) {
1403       endExecuteLocal();
1404     }
1405     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1406   }
1407   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1408 }
1409
1410 void TraceProjections::changeLastEntryTimestamp(double ts)
1411 {
1412   _logPool->modLastEntryTimestamp(ts);
1413 }
1414
1415 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1416                                     int mlen, CmiObjId *idx)
1417 {
1418 #if CMK_HAS_COUNTER_PAPI
1419   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1420     CmiAbort("PAPI failed to read at begin execute!\n");
1421   }
1422 #endif
1423   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1424   execEvent=event;
1425   execEp=ep;
1426   execPe=srcPe;
1427   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1428                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1429 #if CMK_HAS_COUNTER_PAPI
1430   _logPool->addPapi(papiValues);
1431 #endif
1432   inEntry = 1;
1433 }
1434
1435 void TraceProjections::endExecute(void)
1436 {
1437   if (traceNestedEvents) nestedEvents.deq();
1438   endExecuteLocal();
1439   if (traceNestedEvents) {
1440     if (! nestedEvents.isEmpty()) {
1441       NestedEvent &ne = nestedEvents.peek();
1442       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1443     }
1444   }
1445 }
1446
1447 void TraceProjections::endExecute(char *msg)
1448 {
1449 #if CMK_SMP_TRACE_COMMTHREAD
1450         //This function is called from comm thread in SMP mode
1451     envelope *e = (envelope *)msg;
1452     int ep = e->getEpIdx();
1453     if(_entryTable[ep]->traceEnabled)
1454                 endExecute();
1455 #endif  
1456 }
1457
1458 void TraceProjections::endExecuteLocal(void)
1459 {
1460 #if CMK_HAS_COUNTER_PAPI
1461   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1462     CmiAbort("PAPI failed to read at end execute!\n");
1463   }
1464 #endif
1465   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1466   double cputime = TraceCpuTimer();
1467   if(execEp == (-1)) {
1468     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1469                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1470   } else {
1471     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1472                   execEvent, execPe, 0, NULL, 0.0, cputime);
1473   }
1474 #if CMK_HAS_COUNTER_PAPI
1475   _logPool->addPapi(papiValues);
1476 #endif
1477   inEntry = 0;
1478 }
1479
1480 void TraceProjections::messageRecv(char *env, int pe)
1481 {
1482 #if 0
1483   envelope *e = (envelope *)env;
1484   int msgType = e->getMsgtype();
1485   int ep = e->getEpIdx();
1486 #if 0
1487   if (msgType==NewChareMsg || msgType==NewVChareMsg
1488           || msgType==ForChareMsg || msgType==ForVidMsg
1489           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1490           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1491     ep = e->getEpIdx();
1492   else
1493     ep = _threadEP;
1494 #endif
1495   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1496                 curevent++, e->getSrcPe(), e->getTotalsize());
1497 #endif
1498 }
1499
1500 void TraceProjections::beginIdle(double curWallTime)
1501 {
1502   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1503 }
1504
1505 void TraceProjections::endIdle(double curWallTime)
1506 {
1507   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1508 }
1509
1510 void TraceProjections::beginPack(void)
1511 {
1512   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1513 }
1514
1515 void TraceProjections::endPack(void)
1516 {
1517   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1518 }
1519
1520 void TraceProjections::beginUnpack(void)
1521 {
1522   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1523 }
1524
1525 void TraceProjections::endUnpack(void)
1526 {
1527   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1528 }
1529
1530 void TraceProjections::enqueue(envelope *) {}
1531
1532 void TraceProjections::dequeue(envelope *) {}
1533
1534 void TraceProjections::beginComputation(void)
1535 {
1536   computationStarted = 1;
1537
1538   // Executes the callback function provided by the machine
1539   // layer. This is the proper method to register user events in a
1540   // machine layer because projections is a charm++ module.
1541   if (CkpvAccess(traceOnPe) != 0) {
1542     void (*ptr)() = registerMachineUserEvents();
1543     if (ptr != NULL) {
1544       ptr();
1545     }
1546   }
1547 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1548 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1549   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1550 #if CMK_HAS_COUNTER_PAPI
1551   // we start the counters here
1552   if (PAPI_start(papiEventSet) != PAPI_OK) {
1553     CmiAbort("PAPI failed to start designated counters!\n");
1554   }
1555 #endif
1556 }
1557
1558 void TraceProjections::endComputation(void)
1559 {
1560 #if CMK_HAS_COUNTER_PAPI
1561   // we stop the counters here. A silent failure is alright since we
1562   // are already at the end of the program.
1563   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1564     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1565   }
1566   // NOTE: We should not do a complete close of PAPI until after the
1567   // sts writer is done.
1568 #endif
1569   endTime = TraceTimer();
1570   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1571   /*
1572   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1573            endTime*1e06);
1574   */
1575 }
1576
1577 int TraceProjections::idxRegistered(int idx)
1578 {
1579     int idxVecLen = idxVec.size();
1580     for(int i=0; i<idxVecLen; i++)
1581     {
1582         if(idx == idxVec[i])
1583             return 1;
1584     }
1585     return 0;
1586 }
1587
1588 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1589     StrKey k((char*)name,strlen(name));
1590     int num = funcHashtable.get(k);
1591     
1592     if(num!=0) {
1593         return;
1594         //as for mpi programs, the same function may be registered for several times
1595         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1596     }
1597     
1598     int isIdxExisting=0;
1599     if(idxSpecifiedByUser)
1600         isIdxExisting=idxRegistered(idx);
1601     if(isIdxExisting){
1602         return;
1603         //same reason with num!=0
1604         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1605     }
1606
1607     if(idxSpecifiedByUser) {
1608         char *st = new char[strlen(name)+1];
1609         memcpy(st,name,strlen(name)+1);
1610         StrKey *newKey = new StrKey(st,strlen(st));
1611         int &ref = funcHashtable.put(*newKey);
1612         ref=idx;
1613         funcCount++;
1614         idxVec.push_back(idx);  
1615     } else {
1616         char *st = new char[strlen(name)+1];
1617         memcpy(st,name,strlen(name)+1);
1618         StrKey *newKey = new StrKey(st,strlen(st));
1619         int &ref = funcHashtable.put(*newKey);
1620         ref=funcCount;
1621         num = funcCount;
1622         funcCount++;
1623         idx = num;
1624         idxVec.push_back(idx);
1625     }
1626 }
1627
1628 void TraceProjections::beginFunc(char *name,char *file,int line){
1629         StrKey k(name,strlen(name));    
1630         unsigned short  num = (unsigned short)funcHashtable.get(k);
1631         beginFunc(num,file,line);
1632 }
1633
1634 void TraceProjections::beginFunc(int idx,char *file,int line){
1635         if(idx <= 0){
1636                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1637         }       
1638         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1639 }
1640
1641 void TraceProjections::endFunc(char *name){
1642         StrKey k(name,strlen(name));    
1643         int num = funcHashtable.get(k);
1644         endFunc(num);   
1645 }
1646
1647 void TraceProjections::endFunc(int num){
1648         if(num <= 0){
1649                 printf("endFunc without start :O\n");
1650         }
1651         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1652 }
1653
1654 // specialized PUP:ers for handling trace projections logs
1655 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1656 {
1657   for (int i=0;i<n;i++) 
1658     switch(t) {
1659     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1660     case Tuchar:
1661     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1662     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1663     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1664     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1665     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1666     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1667     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1668     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1669     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1670 #ifdef CMK_PUP_LONG_LONG
1671     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1672     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1673 #endif
1674     default: CmiAbort("Unrecognized pup type code!");
1675     };
1676 }
1677
1678 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1679 {
1680   for (int i=0;i<n;i++) 
1681     switch(t) {
1682     case Tchar: { 
1683       char c = fgetc(f);
1684       if (c==EOF)
1685         parseError("Could not match character");
1686       else
1687         ((char *)p)[i] = c;
1688       break;
1689     }
1690     case Tuchar:
1691     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1692     case Tshort:((short *)p)[i]=(short)readInt(); break;
1693     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1694     case Tint:  ((int *)p)[i]=readInt(); break;
1695     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1696     case Tlong: ((long *)p)[i]=readInt(); break;
1697     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1698     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1699     case Tdouble:((double *)p)[i]=readDouble(); break;
1700 #ifdef CMK_PUP_LONG_LONG
1701     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1702     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1703 #endif
1704     default: CmiAbort("Unrecognized pup type code!");
1705     };
1706 }
1707
1708 #if CMK_PROJECTIONS_USE_ZLIB
1709 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1710 {
1711   for (int i=0;i<n;i++) 
1712     switch(t) {
1713     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1714     case Tuchar:
1715     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1716     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1717     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1718     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1719     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1720     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1721     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1722     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1723     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1724 #ifdef CMK_PUP_LONG_LONG
1725     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1726     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1727 #endif
1728     default: CmiAbort("Unrecognized pup type code!");
1729     };
1730 }
1731 #endif
1732
1733 void TraceProjections::endPhase() {
1734   double currentPhaseTime = TraceTimer();
1735   if (lastPhaseEvent != NULL) {
1736   } else {
1737     if (_logPool->pool != NULL) {
1738       // assumed to be BEGIN_COMPUTATION
1739     } else {
1740       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1741       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1742     }
1743   }
1744
1745   /* Insert endPhase event here. */
1746   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1747   /*        We currently "borrow" from the standard add() method. */
1748   /*        It should really be its own add() method.             */
1749   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1750   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1751   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1752   currentPhaseID++;
1753 }
1754
1755 #ifdef PROJ_ANALYSIS
1756 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1757
1758
1759 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1760
1761 void registerOutlierReduction() {
1762   outlierReductionType =
1763     CkReduction::addReducer(outlierReduction);
1764   minMaxReductionType =
1765     CkReduction::addReducer(minMaxReduction);
1766 }
1767
1768 /**
1769  * **IMPT NOTES**:
1770  *
1771  * This is the C++ code that is registered to be activated at module
1772  * shutdown. This is called exactly once on processor 0. Module shutdown
1773  * is initiated as a result of a CkExit() call by the application code
1774  * 
1775  * The exit function must ultimately call CkExit() again to
1776  * so that other module exit functions may proceed after this module is
1777  * done.
1778  *
1779  */
1780 // FIXME: WHY extern "C"???
1781 extern "C" void TraceProjectionsExitHandler()
1782 {
1783 #if CMK_TRACE_ENABLED
1784   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1785   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1786   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1787 #else
1788   CkExit();
1789 #endif
1790 }
1791
1792 // This is called once on each processor but the idiom of use appears
1793 // to be to only have processor 0 register the function.
1794 //
1795 // See initnode in trace-projections.ci
1796 void initTraceProjectionsBOC()
1797 {
1798   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1799 #ifdef __BIGSIM__
1800   if (BgNodeRank() == 0) {
1801 #else
1802     if (CkMyRank() == 0) {
1803 #endif
1804       registerExitFn(TraceProjectionsExitHandler);
1805     }
1806 #if 0
1807   } // this is so indentation does not get messed up
1808 #endif
1809 }
1810
1811 // mainchare for trace-projections BOC-operations. 
1812 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1813 // rest of its life.
1814 //
1815 // Responsible for:
1816 //   1. Handling commandline arguments
1817 //   2. Creating any objects required for proper BOC operations.
1818 //
1819 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1820   /** Options for Outlier Analysis */
1821   // defaults. Things will change with support for interactive analysis.
1822   bool findOutliers = false;
1823   bool outlierAutomatic = true;
1824   int numKSeeds = 10; 
1825
1826   int peNumKeep = CkNumPes();  // used as a default
1827   double entryThreshold = 0.0;
1828   bool outlierUsePhases = false;
1829   if (outlierAutomatic) {
1830     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1831                      "Number of cluster seeds to apply at outlier analysis.");
1832     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1833                      &peNumKeep, "Number of Processors to retain data");
1834     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1835                         "Minimum significance of entry points to be considered for clustering (%).");
1836     findOutliers =
1837       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1838     outlierUsePhases = 
1839       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1840                         "Apply automatic outlier analysis to any available phases.");
1841     if (outlierUsePhases) {
1842       // if the user wants to use an outlier feature, it is assumed outlier
1843       //    analysis is desired.
1844       findOutliers = true;
1845     }
1846   }
1847   bool findStartTime = (CmiTimerAbsolute()==1);
1848   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1849   if (findOutliers) {
1850     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1851                                         numKSeeds,
1852                                         peNumKeep,
1853                                         entryThreshold,
1854                                         outlierUsePhases);
1855   }
1856 }
1857
1858 // Called on every processor.
1859 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1860   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1861   endPe = pe;                // the pe that starts CkExit()
1862   if (CkMyPe() == 0) {
1863     analysisStartTime = CmiWallTimer();
1864   }
1865   CkpvAccess(_trace)->endComputation();
1866   // no more tracing for projections on this processor after this point. 
1867   // Note that clear must be called after remove, or bad things will happen.
1868   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1869   CkpvAccess(_traces)->clearTrace();
1870
1871   // From this point, we start multiple chains of reductions and broadcasts to
1872   // perform final online analysis activities.
1873
1874   // Start all parallel operations at once. 
1875   //   These MUST NOT modify base performance data in LogPool. If they must,
1876   //   then the parallel operations must be phased (and this code to be
1877   //   restructured as necessary)
1878   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1879   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1880   if (findOutliers) {
1881     parModulesRemaining++;
1882     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1883   }
1884   parModulesRemaining++;
1885   if (findStartTime) 
1886   bocProxy[CkMyPe()].startTimeAnalysis();
1887   else
1888   bocProxy[CkMyPe()].startEndTimeAnalysis();
1889 }
1890
1891 // Called on each processor
1892 void KMeansBOC::startKMeansAnalysis() {
1893   // Initialize all necessary structures
1894   LogPool *pool = CkpvAccess(_trace)->_logPool;
1895
1896  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1897   int flushInt = 0;
1898   if (pool->hasFlushed) {
1899     flushInt = 1;
1900   }
1901   
1902   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1903                 0, thisProxy);
1904   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1905 }
1906
1907 // Called on processor 0
1908 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1909   int someFlush = *((int *)msg->getData());
1910
1911   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1912   
1913   if (someFlush == 0) {
1914     // Data intact proceed with KMeans analysis
1915     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1916     kMeansProxy.flushCheckDone();
1917   } else {
1918     // Some processor had flushed it data at some point, abandon KMeans
1919     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1920     // terminate KMeans
1921     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1922     bocProxy[0].kMeansDone();
1923   }
1924 }
1925
1926 // Called on each processor
1927 void KMeansBOC::flushCheckDone() {
1928   // **FIXME** - more flexible metric collection scheme may be necessary
1929   //   in the future for production use.
1930   LogPool *pool = CkpvAccess(_trace)->_logPool;
1931
1932   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1933
1934   numEntryMethods = _entryTable.size();
1935   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1936
1937   // maintained across phases
1938   markedBegin = false;
1939   markedIdle = false;
1940   beginBlockTime = 0.0;
1941   beginIdleBlockTime = 0.0;
1942   lastBeginEPIdx = -1; // none
1943
1944   lastPhaseIdx = 0;
1945   currentExecTimes = NULL;
1946   currentPhase = 0;
1947   selected = false;
1948
1949   pool->initializePhases();
1950
1951   // incoming K Seeds and the per-phase filter
1952   incKSeeds = new double[numK*numMetrics];
1953   keepMetric = new bool[numMetrics];
1954
1955   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1956   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1957   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1958   thisProxy[CkMyPe()].getNextPhaseMetrics();
1959 }
1960
1961 // Called on each processor.
1962 void KMeansBOC::getNextPhaseMetrics() {
1963   // Assumes the presence of the complete logs on this processor.
1964   // Assumes first event is always BEGIN_COMPUTATION
1965   // Assumes each processor sees the same number of phases.
1966   //
1967   // In this code, we collect performance data for this processor.
1968   // All times are in seconds.
1969
1970   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1971
1972   if (usePhases) {
1973     DEBUGF("[%d] Using Phases\n", CkMyPe());
1974   } else {
1975     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1976   }
1977   
1978   if (currentExecTimes != NULL) {
1979     delete [] currentExecTimes;
1980   }
1981   currentExecTimes = new double[numMetrics];
1982   for (int i=0; i<numMetrics; i++) {
1983     currentExecTimes[i] = 0.0;
1984   }
1985
1986   int numEventMethods = _entryTable.size();
1987   LogPool *pool = CkpvAccess(_trace)->_logPool;
1988   
1989   CkAssert(pool->numEntries > lastPhaseIdx);
1990   double totalPhaseTime = 0.0;
1991   double totalActiveTime = 0.0; // entry method + idle
1992
1993   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1994     if (pool->pool[i].type == BEGIN_PROCESSING) {
1995       // check pairing
1996       if (!markedBegin) {
1997         markedBegin = true;
1998       }
1999       beginBlockTime = pool->pool[i].time;
2000       lastBeginEPIdx = pool->pool[i].eIdx;
2001     } else if (pool->pool[i].type == END_PROCESSING) {
2002       // check pairing
2003       // if End without a begin, just ignore
2004       //   this event. If a phase-boundary is crossed, the Begin
2005       //   event would be maintained in beginBlockTime, so it is 
2006       //   not a problem.
2007       if (markedBegin) {
2008         markedBegin = false;
2009         if (pool->pool[i].event < 0)
2010         {
2011           // ignore dummy events. **FIXME** as they have no eIdx?
2012           continue;
2013         }
2014         currentExecTimes[pool->pool[i].eIdx] += 
2015           pool->pool[i].time - beginBlockTime;
2016         totalActiveTime += pool->pool[i].time - beginBlockTime;
2017         lastBeginEPIdx = -1;
2018       }
2019     } else if (pool->pool[i].type == BEGIN_IDLE) {
2020       // check pairing
2021       if (!markedIdle) {
2022         markedIdle = true;
2023       }
2024       beginIdleBlockTime = pool->pool[i].time;
2025     } else if (pool->pool[i].type == END_IDLE) {
2026       // check pairing
2027       if (markedIdle) {
2028         markedIdle = false;
2029         currentExecTimes[numEventMethods] += 
2030           pool->pool[i].time - beginIdleBlockTime;
2031         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2032       }
2033     } else if (pool->pool[i].type == END_PHASE) {
2034       // ignored when not using phases
2035       if (usePhases) {
2036         // when we've not visited this node before
2037         if (i != lastPhaseIdx) { 
2038           totalPhaseTime = 
2039             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2040           // it is important that proper accounting of time take place here.
2041           // Note that END_PHASE events inevitably occur in the context of
2042           //   some entry method by the way the tracing API is designed.
2043           if (markedBegin) {
2044             CkAssert(lastBeginEPIdx >= 0);
2045             currentExecTimes[lastBeginEPIdx] += 
2046               pool->pool[i].time - beginBlockTime;
2047             totalActiveTime += pool->pool[i].time - beginBlockTime;
2048             // this is so the remainder contributes to the next phase
2049             beginBlockTime = pool->pool[i].time;
2050           }
2051           // The following is unlikely, but stranger things have happened.
2052           if (markedIdle) {
2053             currentExecTimes[numEventMethods] +=
2054               pool->pool[i].time - beginIdleBlockTime;
2055             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2056             // this is so the remainder contributes to the next phase
2057             beginIdleBlockTime = pool->pool[i].time;
2058           }
2059           if (totalActiveTime <= totalPhaseTime) {
2060             currentExecTimes[numEventMethods+1] = 
2061               totalPhaseTime - totalActiveTime;
2062           } else {
2063             currentExecTimes[numEventMethods+1] = 0.0;
2064             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2065                      CkMyPe(), currentPhase);
2066           }
2067           collectKMeansData();
2068           // end the loop (and method) and defer the work till the next call
2069           lastPhaseIdx = i;
2070           break; 
2071         }
2072       }
2073     } else if (pool->pool[i].type == END_COMPUTATION) {
2074       if (markedBegin) {
2075         CkAssert(lastBeginEPIdx >= 0);
2076         currentExecTimes[lastBeginEPIdx] += 
2077           pool->pool[i].time - beginBlockTime;
2078         totalActiveTime += pool->pool[i].time - beginBlockTime;
2079       }
2080       if (markedIdle) {
2081         currentExecTimes[numEventMethods] +=
2082           pool->pool[i].time - beginIdleBlockTime;
2083         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2084       }
2085       totalPhaseTime = 
2086         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2087       if (totalActiveTime <= totalPhaseTime) {
2088         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2089       } else {
2090         currentExecTimes[numEventMethods+1] = 0.0;
2091         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2092                  CkMyPe());
2093       }
2094       collectKMeansData();
2095     }
2096   }
2097 }
2098
2099 /**
2100  *  Through a reduction, collectKMeansData aggregates each processors' data
2101  *  in order for global properties to be determined:
2102  *  
2103  *  1. min & max to determine normalization factors.
2104  *  2. sum to determine global EP averages for possible metric reduction
2105  *       through thresholding.
2106  *  3. sum of squares to compute stddev which may be useful in the future.
2107  *
2108  *  collectKMeansData will also keep the processor's data for the current
2109  *    phase so that it may be normalized and worked on subsequently.
2110  *
2111  **/
2112 void KMeansBOC::collectKMeansData() {
2113   int minOffset = numMetrics;
2114   int maxOffset = 2*numMetrics;
2115   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2116
2117   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2118
2119   double *reductionMsg = new double[numMetrics*4];
2120
2121   for (int i=0; i<numMetrics; i++) {
2122     reductionMsg[i] = currentExecTimes[i];
2123     // duplicate the event times for max and min sections of the reduction
2124     reductionMsg[minOffset + i] = currentExecTimes[i];
2125     reductionMsg[maxOffset + i] = currentExecTimes[i];
2126     // compute squares
2127     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2128   }
2129
2130   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2131                 0, thisProxy);
2132   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2133              outlierReductionType, cb);  
2134 }
2135
2136 // The purpose is mainly to initialize the k seeds and generate
2137 //   normalization parameters for each of the metrics. The k seeds
2138 //   and normalization parameters are broadcast to all processors.
2139 //
2140 // Called on processor 0
2141 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2142   CkAssert(CkMyPe() == 0);
2143   
2144   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2145
2146   int sumOffset = 0;
2147   int minOffset = numMetrics;
2148   int maxOffset = 2*numMetrics;
2149   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2150
2151   // calculate statistics & boundaries for the k seeds for clustering
2152   KMeansStatsMessage *outmsg = 
2153     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2154   outmsg->numMetrics = numMetrics;
2155   outmsg->numKPos = numK*numMetrics;
2156   outmsg->numStats = numMetrics*4;
2157
2158   // Sum | Min | Max | Sum of Squares
2159   double *totalExecTimes = (double *)msg->getData();
2160   double totalTime = 0.0;
2161
2162   for (int i=0; i<numMetrics; i++) {
2163     DEBUGN("%lf\n", totalExecTimes[i]);
2164     totalTime += totalExecTimes[i];
2165
2166     // calculate event mean over all processors
2167     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2168
2169     // get the ranges and offsets of each metric. With this, we get
2170     //   normalization factors that can be sent back to each processor to
2171     //   be used as necessary. We reuse max for range. Min remains the offset.
2172     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2173     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2174       totalExecTimes[minOffset + i];
2175     
2176     // calculate stddev (using biased variance)
2177     outmsg->stats[sosOffset + i] = 
2178       sqrt((totalExecTimes[sosOffset + i] - 
2179             2*(outmsg->stats[i])*totalExecTimes[i] +
2180             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2181            CkNumPes());
2182   }
2183
2184   for (int i=0; i<numMetrics; i++) {
2185     // 1) if the proportion of the max value of the entry method relative to
2186     //   the average time taken over all entry methods across all processors
2187     //   is greater than the stipulated percentage threshold ...; AND
2188     // 2) if the range of values are non-zero.
2189     //
2190     // The current assumption is totalTime > 0 (what program has zero total
2191     //   time from all work?)
2192     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2193                      entryThreshold) &&
2194       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2195     if (keepMetric[i]) {
2196       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2197              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2198     } else {
2199       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2200     }
2201     outmsg->filter[i] = keepMetric[i];
2202   }
2203
2204   delete msg;
2205   
2206   // initialize k seeds for this phase
2207   kSeeds = new double[numK*numMetrics];
2208
2209   numKReported = 0;
2210   kNumMembers = new int[numK];
2211
2212   // Randomly select k processors' metric vectors for the k seeds
2213   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2214   srand(11337); // for debugging purposes
2215   for (int k=0; k<numK; k++) {
2216     DEBUGF("Seed %d | ", k);
2217     for (int m=0; m<numMetrics; m++) {
2218       double factor = totalExecTimes[maxOffset + m] - 
2219         totalExecTimes[minOffset + m];
2220       // "uniform" distribution, scaled according to the normalization
2221       //   factors
2222       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2223       // Random distribution.
2224       kSeeds[numMetrics*k + m] =
2225         ((rand()*1.0)/RAND_MAX)*factor;
2226       if (keepMetric[m]) {
2227         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2228       }
2229       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2230     }
2231     DEBUGF("\n");
2232     kNumMembers[k] = 0;
2233   }
2234
2235   // broadcast statistical values to all processors for cluster discovery
2236   thisProxy.findInitialClusters(outmsg);
2237 }
2238
2239
2240
2241 // Called on each processor.
2242 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2243
2244  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2245
2246   phaseIter = 0;
2247
2248   // Get info from stats message
2249   CkAssert(numMetrics == msg->numMetrics);
2250   for (int i=0; i<numMetrics; i++) {
2251     keepMetric[i] = msg->filter[i];
2252   }
2253
2254   // Normalize data on local processor.
2255   // **CWL** See my thesis for detailed discussion of normalization of
2256   //    performance data.
2257   // **NOTE** This might change if we want to send data based on the filter
2258   //   instead of all the data.
2259   CkAssert(numMetrics*4 == msg->numStats);
2260   for (int i=0; i<numMetrics; i++) {
2261     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2262     // **CWL** We do not normalize the range. Entry methods that exhibit
2263     //   large absolute timing variations should be allowed to contribute
2264     //   more to the Euclidean distance measure!
2265     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2266   }
2267
2268   // **NOTE** This might change if we want to send data based on the filter
2269   //   instead of all the data.
2270   CkAssert(numK*numMetrics == msg->numKPos);
2271   for (int i=0; i<msg->numKPos; i++) {
2272     incKSeeds[i] = msg->kSeedsPos[i];
2273   }
2274
2275   // Decide which KSeed this processor belongs to.
2276   minDistance = calculateDistance(0);
2277   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2278            currentPhase, phaseIter, minDistance);
2279   minK = 0;
2280   for (int i=1; i<numK; i++) {
2281     double distance = calculateDistance(i);
2282     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2283              currentPhase, phaseIter, i, distance);
2284     if (distance < minDistance) {
2285       minDistance = distance;
2286       minK = i;
2287     }
2288   }
2289
2290   // Set up a reduction with the modification vector to the root (0).
2291   //
2292   // The modification vector sends a negative value for each metric
2293   //   for the K this processor no longer belongs to and a positive
2294   //   value to the K the processor now belongs. In addition, a -1.0
2295   //   is sent to the K it is leaving and a +1.0 to the K it is 
2296   //   joining.
2297   //
2298   // The processor must still contribute a "zero returns" even if
2299   //   nothing changes. This will be the basis for determine
2300   //   convergence at the root.
2301   //
2302   // The addtional +1 is meant for the count-change that must be
2303   //   maintained for the special cases at the root when some K
2304   //   may be deprived of all processor points or go from 0 to a
2305   //   positive number of processors (see later comments).
2306   double *modVector = new double[numK*(numMetrics+1)];
2307   for (int i=0; i<numK; i++) {
2308     for (int j=0; j<numMetrics+1; j++) {
2309       modVector[i*(numMetrics+1) + j] = 0.0;
2310     }
2311   }
2312   for (int i=0; i<numMetrics; i++) {
2313     // for this initialization, only positive values need be sent.
2314     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2315   }
2316   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2317
2318   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2319                 0, thisProxy);
2320   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2321              CkReduction::sum_double, cb);  
2322 }
2323
2324 double KMeansBOC::calculateDistance(int k) {
2325   double ret = 0.0;
2326   for (int i=0; i<numMetrics; i++) {
2327     if (keepMetric[i]) {
2328       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2329              CkMyPe(), currentPhase, phaseIter, i,
2330                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2331       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2332     }
2333   }
2334   return sqrt(ret);
2335 }
2336
2337 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2338   CkAssert(CkMyPe() == 0);
2339
2340   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2341
2342   double *modVector = (double *)msg->getData();
2343   // sanity check
2344   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2345
2346   // A quick convergence test.
2347   bool hasChanges = false;
2348   for (int i=0; i<numK; i++) {
2349     hasChanges = hasChanges || 
2350       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2351   }
2352   if (!hasChanges) {
2353     delete msg;
2354     findRepresentatives();
2355   } else {
2356     int overallChange = 0;
2357     for (int i=0; i<numK; i++) {
2358       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2359       if (change != 0) {
2360         overallChange += change;
2361         // modify the k seeds based on the modification vectors coming in
2362         //
2363         // If a seed initially has no members, its contents do not matter and
2364         //   is simply set to the average of the incoming vector.
2365         // If the change causes a seed to lose all its members, do nothing.
2366         //   Its last-known location is kept to allow it to re-capture
2367         //   membership at the next iteration rather than apply the last
2368         //   changes (which snaps the point unnaturally to 0,0).
2369         // Otherwise, apply the appropriate vector changes.
2370         CkAssert((kNumMembers[i] + change >= 0) &&
2371                  (kNumMembers[i] + change <= CkNumPes()));
2372         if (kNumMembers[i] == 0) {
2373           CkAssert(change > 0);
2374           for (int j=0; j<numMetrics; j++) {
2375             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2376           }
2377         } else if (kNumMembers[i] + change == 0) {
2378           // do nothing.
2379         } else {
2380           for (int j=0; j<numMetrics; j++) {
2381             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2382             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2383             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2384           }
2385         }
2386         kNumMembers[i] += change;
2387       }
2388       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2389              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2390     }
2391     delete msg;
2392
2393     // broadcast the new seed locations.
2394     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2395     outmsg->numKPos = numK*numMetrics;
2396     for (int i=0; i<numK*numMetrics; i++) {
2397       outmsg->kSeedsPos[i] = kSeeds[i];
2398     }
2399
2400     thisProxy.updateSeedMembership(outmsg);
2401   }
2402 }
2403
2404 // Called on all processors
2405 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2406
2407   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2408
2409   phaseIter++;
2410
2411   // **NOTE** This might change if we want to send data based on the filter
2412   //   instead of all the data.
2413   CkAssert(numK*numMetrics == msg->numKPos);
2414   for (int i=0; i<msg->numKPos; i++) {
2415     incKSeeds[i] = msg->kSeedsPos[i];
2416   }
2417
2418   // Decide which KSeed this processor belongs to.
2419   lastMinK = minK;
2420   minDistance = calculateDistance(0);
2421   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2422          currentPhase, phaseIter, minDistance);
2423
2424   minK = 0;
2425   for (int i=1; i<numK; i++) {
2426     double distance = calculateDistance(i);
2427     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2428            currentPhase, phaseIter, i, distance);
2429     if (distance < minDistance) {
2430       minDistance = distance;
2431       minK = i;
2432     }
2433   }
2434
2435   double *modVector = new double[numK*(numMetrics+1)];
2436   for (int i=0; i<numK; i++) {
2437     for (int j=0; j<numMetrics+1; j++) {
2438       modVector[i*(numMetrics+1) + j] = 0.0;
2439     }
2440   }
2441
2442   if (minK != lastMinK) {
2443     for (int i=0; i<numMetrics; i++) {
2444       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2445       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2446     }
2447     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2448     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2449   }
2450
2451   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2452                 0, thisProxy);
2453   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2454              CkReduction::sum_double, cb);  
2455 }
2456
2457 void KMeansBOC::findRepresentatives() {
2458
2459   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2460
2461   int numNonEmptyClusters = 0;
2462   for (int i=0; i<numK; i++) {
2463     if (kNumMembers[i] > 0) {
2464       numNonEmptyClusters++;
2465     }
2466   }
2467
2468   int numRepresentatives = peNumKeep;
2469   // **FIXME**
2470   // This is fairly arbitrary. Next time, choose the centers of the top
2471   //   largest clusters.
2472   if (numRepresentatives < numNonEmptyClusters) {
2473     numRepresentatives = numNonEmptyClusters;
2474   }
2475
2476   int slotsRemaining = numRepresentatives;
2477
2478   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2479          numNonEmptyClusters);
2480
2481   // determine how many exemplars to select per cluster. Currently
2482   //   hardcoded to 1. Future challenge is to decide on other numbers
2483   //   or proportionality.
2484   //
2485   int exemplarsPerCluster = 1;
2486   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2487
2488   int numCandidateOutliers = CkNumPes() - 
2489     exemplarsPerCluster*numNonEmptyClusters;
2490
2491   double *remainders = new double[numK];
2492   int *assigned = new int[numK];
2493   exemplarChoicesLeft = new int[numK];
2494   outlierChoicesLeft = new int[numK];
2495
2496   for (int i=0; i<numK; i++) {
2497     assigned[i] = 0;
2498     remainders[i] = 
2499       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2500       slotsRemaining / numCandidateOutliers;
2501     if (remainders[i] >= 0.0) {
2502       assigned[i] = (int)floor(remainders[i]);
2503       remainders[i] -= assigned[i];
2504     } else {
2505       remainders[i] = 0.0;
2506     }
2507   }
2508
2509   for (int i=0; i<numK; i++) {
2510     slotsRemaining -= assigned[i];
2511   }
2512   CkAssert(slotsRemaining >= 0);
2513
2514   // find clusters to assign the loose slots to, in order of
2515   // remainder proportion
2516   while (slotsRemaining > 0) {
2517     double max = 0.0;
2518     int winner = 0;
2519     for (int i=0; i<numK; i++) {
2520       if (remainders[i] > max) {
2521         max = remainders[i];
2522         winner = i;
2523       }
2524     }
2525     assigned[winner]++;
2526     remainders[winner] = 0.0;
2527     slotsRemaining--;
2528   }
2529
2530   // set up how many reduction cycles of min/max we need to conduct to
2531   // select the representatives.
2532   numSelectionIter = exemplarsPerCluster;
2533   for (int i=0; i<numK; i++) {
2534     if (assigned[i] > numSelectionIter) {
2535       numSelectionIter = assigned[i];
2536     }
2537   }
2538   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2539
2540   for (int i=0; i<numK; i++) {
2541     if (kNumMembers[i] > 0) {
2542       exemplarChoicesLeft[i] = exemplarsPerCluster;
2543       outlierChoicesLeft[i] = assigned[i];
2544     } else {
2545       exemplarChoicesLeft[i] = 0;
2546       outlierChoicesLeft[i] = 0;
2547     }
2548     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2549            outlierChoicesLeft[i]);
2550   }
2551
2552   delete [] assigned;
2553   delete [] remainders;
2554
2555   // send out first broadcast
2556   KSelectionMessage *outmsg = NULL;
2557   if (numSelectionIter > 0) {
2558     outmsg = new (numK, numK, numK) KSelectionMessage;
2559     outmsg->numKMinIDs = numK;
2560     outmsg->numKMaxIDs = numK;
2561     for (int i=0; i<numK; i++) {
2562       outmsg->minIDs[i] = -1;
2563       outmsg->maxIDs[i] = -1;
2564     }
2565     thisProxy.collectDistances(outmsg);
2566   } else {
2567     CkPrintf("Warning: No selection iteration from the start!\n");
2568     // invoke phase completion on all processors
2569     thisProxy.phaseDone();
2570   }
2571 }
2572
2573 /*
2574  *  lastMin = array of minimum champions of the last tournament
2575  *  lastMax = array of maximum champions of the last tournament
2576  *  lastMaxVal = array of last encountered maximum values, allows previous
2577  *                 minimum winners to eliminate themselves from the next
2578  *                 minimum race.
2579  *
2580  *  Called on all processors.
2581  */
2582 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2583
2584   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2585
2586   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2587          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2588   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2589       (CkMyPe() == msg->maxIDs[lastMinK])) {
2590     CkAssert(!selected);
2591     selected = true;
2592   }
2593
2594   // build outgoing reduction structure
2595   //   format = minVal | ID | maxVal | ID
2596   double *minMaxAndIDs = NULL;
2597
2598   minMaxAndIDs = new double[numK*4];
2599   // initialize to the appropriate out-of-band values (for error checks)
2600   for (int i=0; i<numK; i++) {
2601     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2602     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2603     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2604     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2605   }
2606   // If I have not won before, I put myself back into the competition
2607   if (!selected) {
2608     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2609     minMaxAndIDs[lastMinK*4] = minDistance;
2610     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2611     minMaxAndIDs[lastMinK*4+2] = minDistance;
2612     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2613   }
2614   delete msg;
2615
2616   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2617                 0, thisProxy);
2618   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2619              minMaxReductionType, cb);  
2620 }
2621
2622 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2623   // incoming format:
2624   //   minVal | minID | maxVal | maxID
2625
2626   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2627
2628   if (numSelectionIter > 0) {
2629     double *incInfo = (double *)msg->getData();
2630     
2631     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2632     outmsg->numKMinIDs = numK;
2633     outmsg->numKMaxIDs = numK;
2634     
2635     for (int i=0; i<numK; i++) {
2636       DEBUGF("%d | %lf %d %lf %d \n", i, 
2637              incInfo[i*4], (int)incInfo[i*4+1], 
2638              incInfo[i*4+2], (int)incInfo[i*4+3]);
2639     }
2640
2641     for (int i=0; i<numK; i++) {
2642       if (exemplarChoicesLeft[i] > 0) {
2643         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2644         exemplarChoicesLeft[i]--;
2645       } else {
2646         outmsg->minIDs[i] = -1;
2647       }
2648       if (outlierChoicesLeft[i] > 0) {
2649         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2650         outlierChoicesLeft[i]--;
2651       } else {
2652         outmsg->maxIDs[i] = -1;
2653       }
2654     }
2655     thisProxy.collectDistances(outmsg);
2656     numSelectionIter--;
2657   } else {
2658     // invoke phase completion on all processors
2659     thisProxy.phaseDone();
2660   }
2661 }
2662
2663 /**
2664  *  Completion of the K-Means clustering and data selection of one phase
2665  *    of the computation.
2666  *
2667  *  Called on every processor.
2668  */
2669 void KMeansBOC::phaseDone() {
2670
2671   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2672
2673   LogPool *pool = CkpvAccess(_trace)->_logPool;
2674   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2675
2676   // now decide on what to do with the decision.
2677   if (!selected) {
2678     if (usePhases) {
2679       pool->keepPhase[currentPhase] = false;
2680     } else {
2681       // if not using phases, we're working on the whole log
2682       pool->setAllPhases(false);
2683     }
2684   }
2685
2686   // **FIXME** (?) - All processors have to agree on this, or the reduction
2687   //   will not be correct! The question is "is this enforcible?"
2688   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2689     // We're done
2690     int dummy = 0;
2691     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2692                   0, bocProxy);
2693     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2694   } else {
2695     // reset all phase-based k-means data and decisions
2696
2697     // **FIXME**!!!!!    
2698     
2699     // invoke the next K-Means computation phase.
2700     currentPhase++;
2701     thisProxy[CkMyPe()].getNextPhaseMetrics();
2702   }
2703 }
2704
2705 void TraceProjectionsBOC::startTimeAnalysis()
2706 {
2707   double startTime = 0.0;
2708   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2709      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2710   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2711   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2712 }
2713
2714 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2715 {
2716   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2717
2718   if (CkpvAccess(_trace) != NULL) {
2719     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2720     CkpvAccess(_trace)->_logPool->setNewStartTime();
2721     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2722   }
2723   delete msg;
2724   thisProxy[CkMyPe()].startEndTimeAnalysis();
2725 }
2726
2727 void TraceProjectionsBOC::startEndTimeAnalysis()
2728 {
2729  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2730
2731   endTime = CkpvAccess(_trace)->endTime;
2732   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2733
2734   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2735                 0, thisProxy);
2736   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2737 }
2738
2739 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2740 {
2741  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2742
2743   CkAssert(CkMyPe() == 0);
2744   parModulesRemaining--;
2745   if (CkpvAccess(_trace) != NULL) {
2746     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2747     // CkPrintf("End time determined to be %lf us\n",
2748     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2749   }
2750   delete msg;
2751   if (parModulesRemaining == 0) {
2752     thisProxy[CkMyPe()].finalize();
2753   }
2754 }
2755
2756 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2757
2758  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2759
2760   CkAssert(CkMyPe() == 0);
2761   parModulesRemaining--;
2762   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2763            CmiWallTimer()-analysisStartTime);
2764   delete msg;
2765   if (parModulesRemaining == 0) {
2766     thisProxy[CkMyPe()].finalize();
2767   }
2768 }
2769
2770 /**
2771  *
2772  *  This version is called (on processor 0) only if flushCheck fails.
2773  *
2774  */
2775 void TraceProjectionsBOC::kMeansDone() {
2776   CkAssert(CkMyPe() == 0);
2777   parModulesRemaining--;
2778   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2779            CmiWallTimer()-analysisStartTime);
2780   if (parModulesRemaining == 0) {
2781     thisProxy[CkMyPe()].finalize();
2782   }
2783 }
2784
2785 void TraceProjectionsBOC::finalize()
2786 {
2787   CkAssert(CkMyPe() == 0);
2788   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2789   //       CmiWallTimer()-analysisStartTime);
2790   thisProxy.closingTraces();
2791 }
2792
2793 // Called on every processor
2794 void TraceProjectionsBOC::closingTraces() {
2795   CkpvAccess(_trace)->closeTrace();
2796
2797     // subtle:  reduction needs to go to the PE which started CkExit()
2798   int pe = 0;
2799   if (endPe != -1) pe = endPe;
2800   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2801                 pe, thisProxy); 
2802   contribute(0, NULL, CkReduction::sum_int, cb);  
2803 }
2804
2805 // The sole purpose of this reduction is to decide whether or not
2806 //   Projections as a module needs to call CkExit() to get other
2807 //   modules to shutdown.
2808 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2809   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2810   delete msg;
2811   // decide if CkExit() needs to be called
2812   if (!CkpvAccess(_trace)->converseExit) {
2813     CkExit();
2814   }
2815 }
2816 /*
2817  *  Registration and definition of the Outlier Reduction callback.
2818  *  Format: Sum | Min | Max | Sum of Squares
2819  */
2820 CkReductionMsg *outlierReduction(int nMsgs,
2821                                  CkReductionMsg **msgs) {
2822   int numBytes = 0;
2823   int numMetrics = 0;
2824   double *ret = NULL;
2825
2826   if (nMsgs == 1) {
2827     // nothing to do, just pass it on
2828     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2829   }
2830
2831   if (nMsgs > 1) {
2832     numBytes = msgs[0]->getSize();
2833     // sanity checks
2834     if (numBytes%sizeof(double) != 0) {
2835       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2836     }
2837     if ((numBytes/sizeof(double))%4 != 0) {
2838       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2839     }
2840     numMetrics = (numBytes/sizeof(double))/4;
2841     ret = new double[numMetrics*4];
2842
2843     // copy the first message data into the return structure first
2844     for (int i=0; i<numMetrics*4; i++) {
2845       ret[i] = ((double *)msgs[0]->getData())[i];
2846     }
2847
2848     // Sum | Min | Max | Sum of Squares
2849     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2850       for (int i=0; i<numMetrics; i++) {
2851         // Sum
2852         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2853         // Min
2854         ret[numMetrics + i] =
2855           (ret[numMetrics + i] < 
2856            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2857           ? ret[numMetrics + i] : 
2858           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2859         // Max
2860         ret[2*numMetrics + i] = 
2861           (ret[2*numMetrics + i] >
2862            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2863           ? ret[2*numMetrics + i] :
2864           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2865         // Sum of Squares (squaring already done at leaf)
2866         ret[3*numMetrics + i] +=
2867           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2868       }
2869     }
2870   }
2871   
2872   /* apparently, we do not delete the incoming messages */
2873   return CkReductionMsg::buildNew(numBytes,ret);
2874 }
2875
2876 /*
2877  * The only reason we have a user-defined reduction is to support
2878  *   identification of the "winning" processors as well as to handle
2879  *   both the min and the max of each "tournament". A simple min/max
2880  *   discovery cannot handle ties.
2881  */
2882 CkReductionMsg *minMaxReduction(int nMsgs,
2883                                 CkReductionMsg **msgs) {
2884   CkAssert(nMsgs > 0);
2885
2886   int numBytes = msgs[0]->getSize();
2887   CkAssert(numBytes%sizeof(double) == 0);
2888   int numK = (numBytes/sizeof(double))/4;
2889
2890   double *ret = new double[numK*4];
2891   // fill with out-of-band values
2892   for (int i=0; i<numK; i++) {
2893     ret[i*4] = -1.0;
2894     ret[i*4+1] = -1.0;
2895     ret[i*4+2] = -1.0;
2896     ret[i*4+3] = -1.0;
2897   }
2898
2899   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2900   for (int i=0; i<nMsgs; i++) {
2901     double *temp = (double *)msgs[i]->getData();
2902     for (int j=0; j<numK; j++) {
2903       // no previous valid min
2904       if (ret[j*4+1] < 0) {
2905         // fill it in only if the incoming min is valid
2906         if (temp[j*4+1] >= 0) {
2907           ret[j*4] = temp[j*4];      // fill min value
2908           ret[j*4+1] = temp[j*4+1];  // fill ID
2909         }
2910       } else {
2911         // find Min, only if incoming min is valid
2912         if (temp[j*4+1] >= 0) {
2913           if (temp[j*4] < ret[j*4]) {
2914             ret[j*4] = temp[j*4];      // replace min value
2915             ret[j*4+1] = temp[j*4+1];  // replace ID
2916           }
2917         }
2918       }
2919       // no previous valid max
2920       if (ret[j*4+3] < 0) {
2921         // fill only if the incoming max is valid
2922         if (temp[j*4+3] >= 0) {
2923           ret[j*4+2] = temp[j*4+2];  // fill max value
2924           ret[j*4+3] = temp[j*4+3];  // fill ID
2925         }
2926       } else {
2927         // find Max, only if incoming max is valid
2928         if (temp[j*4+3] >= 0) {
2929           if (temp[j*4+2] > ret[j*4+2]) {
2930             ret[j*4+2] = temp[j*4+2];  // replace max value
2931             ret[j*4+3] = temp[j*4+3];  // replace ID
2932           }
2933         }
2934       }
2935     }
2936   }
2937   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2938   delete [] ret;
2939   return redmsg;
2940 }
2941
2942 #include "TraceProjections.def.h"
2943 #endif //PROJ_ANALYSIS
2944
2945 /*@}*/