fix comm thread tracing for small and large messages.
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
14 #else
15 #define DEBUGF(...)
16 #endif
17 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BIGSIM_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BIGSIM_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BIGSIM_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   char eventName[PAPI_MAX_STR_LEN];
549   for (i=0;i<NUMPAPIEVENTS;i++) {
550     PAPI_event_code_to_name(papiEvents[i], eventName);
551     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
552   }
553 #endif
554   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
555   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
556     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
557   }     
558 }
559
560 void LogPool::writeSts(TraceProjections *traceProj){
561   writeSts();
562   if (traceProj != NULL) {
563     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
564     funcIter->seekStart();
565     int numFuncs = traceProj->getFuncNumber();
566     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
567     while(funcIter->hasNext()) {
568       StrKey *key;
569       int *obj = (int *)funcIter->next((void **)&key);
570       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
571     }
572   }
573   fprintf(stsfp, "END\n");
574   fclose(stsfp);
575 }
576
577 void LogPool::writeRC(void)
578 {
579     //CkPrintf("write RC is being executed\n");
580 #ifdef PROJ_ANALYSIS  
581     CkAssert(CkMyPe() == 0);
582     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
583           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
584     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
585           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BIGSIM_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BIGSIM_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BIGSIM_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BIGSIM_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BIGSIM_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BIGSIM_CHARM
703 #if MPI_TRACE_MACHINE_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BIGSIM_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 void LogPool::modLastEntryTimestamp(double ts)
756 {
757   pool[numEntries-1].time = ts;
758   //pool[numEntries-1].cputime = ts;
759 }
760
761 // /** Constructor for a multicast log entry */
762 // 
763 //  THIS WAS MOVED TO trace-projections.h with the other constructors
764 // 
765 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
766 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
767 // {
768 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
769 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
770 //     recvTime = rt; 
771 //     numpes = numPe;
772 //     userSuppliedNote = NULL;
773 //     if (pelist != NULL) {
774 //      pes = new int[numPe];
775 //      for (int i=0; i<numPe; i++) {
776 //        pes[i] = pelist[i];
777 //      }
778 //     } else {
779 //      pes= NULL;
780 //     }
781 // }
782
783 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
784 //{
785 //#if CMK_HAS_COUNTER_PAPI
786 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
787 //#endif
788 //}
789
790
791
792 void LogEntry::pup(PUP::er &p)
793 {
794   int i;
795   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
796   char ret = '\n';
797
798   p|type;
799   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
800   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
801
802   switch (type) {
803     case USER_EVENT:
804     case USER_EVENT_PAIR:
805       p|mIdx; p|itime; p|event; p|pe;
806       break;
807     case BEGIN_IDLE:
808     case END_IDLE:
809     case BEGIN_PACK:
810     case END_PACK:
811     case BEGIN_UNPACK:
812     case END_UNPACK:
813       p|itime; p|pe; 
814       break;
815     case BEGIN_PROCESSING:
816       if (p.isPacking()) {
817         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
818         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
819       }
820       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
821       p|msglen; p|irecvtime; 
822       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
823       p|icputime;
824 #if CMK_HAS_COUNTER_PAPI
825       //p|numPapiEvents;
826       for (i=0; i<NUMPAPIEVENTS; i++) {
827         // not yet!!!
828         //      p|papiIDs[i]; 
829         p|papiValues[i];
830         
831       }
832 #else
833       //p|numPapiEvents;     // non papi version has value 0
834 #endif
835       if (p.isUnpacking()) {
836         recvTime = irecvtime/1.0e6;
837         cputime = icputime/1.0e6;
838       }
839       break;
840     case END_PROCESSING:
841       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
842       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
843 #if CMK_HAS_COUNTER_PAPI
844       //p|numPapiEvents;
845       for (i=0; i<NUMPAPIEVENTS; i++) {
846         // not yet!!!
847         //      p|papiIDs[i];
848         p|papiValues[i];
849       }
850 #else
851       //p|numPapiEvents;  // non papi version has value 0
852 #endif
853       if (p.isUnpacking()) cputime = icputime/1.0e6;
854       break;
855     case USER_SUPPLIED:
856           p|userSuppliedData;
857           p|itime;
858         break;
859     case USER_SUPPLIED_NOTE:
860           p|itime;
861           int length;
862           if (p.isPacking()) length = strlen(userSuppliedNote);
863           p | length;
864           char space;
865           space = ' ';
866           p | space;
867           if (p.isUnpacking()) {
868             userSuppliedNote = new char[length+1];
869             userSuppliedNote[length] = '\0';
870           }
871           PUParray(p,userSuppliedNote, length);
872           break;
873     case USER_SUPPLIED_BRACKETED_NOTE:
874       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
875           p|itime;
876           p|iEndTime;
877           p|event;
878           int length2;
879           if (p.isPacking()) length2 = strlen(userSuppliedNote);
880           p | length2;
881           char space2;
882           space2 = ' ';
883           p | space2;
884           if (p.isUnpacking()) {
885             userSuppliedNote = new char[length+1];
886             userSuppliedNote[length] = '\0';
887           }
888           PUParray(p,userSuppliedNote, length2);
889           break;
890     case MEMORY_USAGE_CURRENT:
891       p | memUsage;
892       p | itime;
893         break;
894     case CREATION:
895       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
896       p|mIdx; p|eIdx; p|itime;
897       p|event; p|pe; p|msglen; p|irecvtime;
898       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
899       break;
900     case CREATION_BCAST:
901       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
902       p|mIdx; p|eIdx; p|itime;
903       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
904       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
905       break;
906     case CREATION_MULTICAST:
907       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
908       p|mIdx; p|eIdx; p|itime;
909       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
910       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
911       for (i=0; i<numpes; i++) p|pes[i];
912       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
913       break;
914     case MESSAGE_RECV:
915       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
916       break;
917
918     case ENQUEUE:
919     case DEQUEUE:
920       p|mIdx; p|itime; p|event; p|pe;
921       break;
922
923     case BEGIN_INTERRUPT:
924     case END_INTERRUPT:
925       p|itime; p|event; p|pe;
926       break;
927
928       // **CW** absolute timestamps are used here to support a quick
929       // way of determining the total time of a run in projections
930       // visualization.
931     case BEGIN_COMPUTATION:
932     case END_COMPUTATION:
933     case BEGIN_TRACE:
934     case END_TRACE:
935       p|itime;
936       break;
937     case BEGIN_FUNC:
938         p | itime;
939         p | mIdx;
940         p | event;
941         if(!p.isUnpacking()){
942                 p(fName,flen-1);
943         }
944         break;
945     case END_FUNC:
946         p | itime;
947         p | mIdx;
948         break;
949     case END_PHASE:
950       p|eIdx; // FIXME: actually the phase ID
951       p|itime;
952       break;
953     default:
954       CmiError("***Internal Error*** Wierd Event %d.\n", type);
955       break;
956   }
957   if (p.isUnpacking()) time = itime/1.0e6;
958   p|ret;
959 }
960
961 TraceProjections::TraceProjections(char **argv): 
962   curevent(0), inEntry(0), computationStarted(0), 
963         converseExit(0), endTime(0.0), traceNestedEvents(0),
964         currentPhaseID(0), lastPhaseEvent(NULL)
965 {
966   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
967
968   if (CkpvAccess(traceOnPe) == 0) return;
969
970   CtvInitialize(int,curThreadEvent);
971   CkpvInitialize(CmiInt8, CtrLogBufSize);
972   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
973   CtvAccess(curThreadEvent)=0;
974   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
975                        "Log entries to buffer per I/O")) {
976     if (CkMyPe() == 0) {
977       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
978     }
979   }
980   checknested = 
981     CmiGetArgFlagDesc(argv,"+checknested",
982                       "check projections nest begin end execute events");
983   traceNestedEvents = 
984     CmiGetArgFlagDesc(argv,"+tracenested",
985               "trace projections nest begin/end execute events");
986   int binary = 
987     CmiGetArgFlagDesc(argv,"+binary-trace",
988                       "Write log files in binary format");
989
990   CmiInt8 nSubdirs = 0;
991   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
992
993
994 #if CMK_PROJECTIONS_USE_ZLIB
995   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
996 #else
997   // consume the flag so there's no confusing
998   CmiGetArgFlagDesc(argv,"+gz-trace",
999                     "Write log files pre-compressed with gzip");
1000   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1001 #endif
1002
1003   // **CW** default to non delta log encoding. The user may choose to do
1004   // create both logs (for debugging) or just the old log timestamping
1005   // (for compatibility).
1006   // Generating just the non delta log takes precedence over generating
1007   // both logs (if both arguments appear on the command line).
1008
1009   // switch to OLD log format until everything works // Gengbin
1010   nonDeltaLog = 1;
1011   deltaLog = 0;
1012   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1013                                   "Generate Delta encoded and simple timestamped log files");
1014
1015   _logPool = new LogPool(CkpvAccess(traceRoot));
1016   _logPool->setNumSubdirs(nSubdirs);
1017   _logPool->setBinary(binary);
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   _logPool->setCompressed(compressed);
1020 #endif
1021   if (CkMyPe() == 0) {
1022     _logPool->createSts();
1023     _logPool->createRC();
1024   }
1025   funcCount=1;
1026
1027 #if CMK_HAS_COUNTER_PAPI
1028   // We initialize and create the event sets for use with PAPI here.
1029   int papiRetValue;
1030   if(CkMyRank()==0){
1031     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032     if (papiRetValue != PAPI_VER_CURRENT) {
1033       CmiAbort("PAPI Library initialization failure!\n");
1034     }
1035 #if CMK_SMP
1036     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1037       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1038     }
1039 #endif
1040   }
1041
1042 #if CMK_SMP
1043   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1044   CmiNodeAllBarrier();
1045 #endif
1046   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1047   papiEventSet = PAPI_NULL; 
1048   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1049     CmiAbort("PAPI failed to create event set!\n");
1050   }
1051   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1052   if (papiRetValue != PAPI_OK) {
1053     if (papiRetValue == PAPI_ECNFLCT) {
1054       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1055     } else {
1056       CmiAbort("PAPI failed to add designated events!\n");
1057     }
1058   }
1059   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1060 #endif
1061 }
1062
1063 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1064 {
1065   OPTIMIZED_VERSION
1066   CkAssert(e==-1 || e>=0);
1067   CkAssert(evt != NULL);
1068   int event;
1069   int biggest = -1;
1070   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1071     int cur = (*CkpvAccess(usrEvents))[i]->e;
1072     if (cur == e) {
1073       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1074       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1075         return e;
1076       else
1077         CmiAbort("UserEvent double registered!");
1078     }
1079     if (cur > biggest) biggest = cur;
1080   }
1081   // if no user events have so far been registered. biggest will be -1
1082   // and hence newly assigned event numbers will begin from 0.
1083   if (e==-1) event = biggest+1;  // automatically assign new event number
1084   else event = e;
1085   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1086   return event;
1087 }
1088
1089 void TraceProjections::traceClearEps(void)
1090 {
1091   // In trace-summary, this zeros out the EP bins, to eliminate noise
1092   // from startup.  Here, this isn't useful, since we can do that in
1093   // post-processing
1094 }
1095
1096 void TraceProjections::traceWriteSts(void)
1097 {
1098   if(CkMyPe()==0)
1099     _logPool->writeSts(this);
1100 }
1101
1102 /** 
1103  * **IMPT NOTES**:
1104  *
1105  * This is called when Converse closes during ConverseCommonExit().
1106  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1107  *
1108  * Some programs bypass CkExit() (like NAMD, which eventually calls
1109  * ConverseExit()), modules like traces will have to pretend to shutdown
1110  * as if CkExit() was called but at the same time avoid making
1111  * subsequent CkExit() calls (which is usually required for allowing
1112  * other modules to shutdown).
1113  *
1114  * Note that we can only get here if CkExit() was not called, since the
1115  * trace module will un-register itself from TraceArray if it did.
1116  *
1117  */
1118 void TraceProjections::traceClose(void)
1119 {
1120 #ifdef PROJ_ANALYSIS
1121   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1122
1123   // sets the flag that tells the code not to make the CkExit call later
1124   converseExit = 1;
1125   if (CkMyPe() == 0) {
1126     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1127     bocProxy.traceProjectionsParallelShutdown(-1);
1128   }
1129   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1130     CkpvAccess(_trace)->endComputation();
1131     delete _logPool;              // will write
1132     // remove myself from traceArray so that no tracing will be called.
1133     CkpvAccess(_traces)->removeTrace(this);
1134   }
1135 #else
1136   // we've already deleted the logpool, so multiple calls to traceClose
1137   // are tolerated.
1138   if (_logPool == NULL) {
1139     return;
1140   }
1141   if(CkMyPe()==0){
1142     _logPool->writeSts(this);
1143   }
1144   CkpvAccess(_trace)->endComputation();
1145   delete _logPool;              // will write
1146   // remove myself from traceArray so that no tracing will be called.
1147   CkpvAccess(_traces)->removeTrace(this);
1148 #endif
1149 }
1150
1151 /**
1152  *  **IMPT NOTES**:
1153  *
1154  *  This is meant to be called internally by the tracing framework.
1155  *
1156  */
1157 void TraceProjections::closeTrace() {
1158   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1159   if (CkMyPe() == 0) {
1160     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1161     _logPool->writeSts(this);
1162     _logPool->writeRC();
1163     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1164   }
1165   delete _logPool;       // will write logs to file
1166 }
1167
1168 #if CMK_SMP_TRACE_COMMTHREAD
1169 void TraceProjections::traceBeginOnCommThread()
1170 {
1171   if (!computationStarted) return;
1172   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174
1175 void TraceProjections::traceEndOnCommThread()
1176 {
1177   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1178 }
1179 #endif
1180
1181 void TraceProjections::traceBegin(void)
1182 {
1183   if (!computationStarted) return;
1184   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::traceEnd(void)
1188 {
1189   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userEvent(int e)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1196 }
1197
1198 void TraceProjections::userBracketEvent(int e, double bt, double et)
1199 {
1200   if (!computationStarted) return;
1201   // two events record Begin/End of event e.
1202   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1204 }
1205
1206 void TraceProjections::userSuppliedData(int d)
1207 {
1208   if (!computationStarted) return;
1209   _logPool->addUserSupplied(d);
1210 }
1211
1212 void TraceProjections::userSuppliedNote(char *note)
1213 {
1214   if (!computationStarted) return;
1215   _logPool->addUserSuppliedNote(note);
1216 }
1217
1218
1219 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1220 {
1221   if (!computationStarted) return;
1222   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1223 }
1224
1225 void TraceProjections::memoryUsage(double m)
1226 {
1227   if (!computationStarted) return;
1228   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1229   
1230 }
1231
1232
1233 void TraceProjections::creation(envelope *e, int ep, int num)
1234 {
1235   double curTime = TraceTimer();
1236   if (e == 0) {
1237     CtvAccess(curThreadEvent) = curevent;
1238     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1239                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1240   } else {
1241     int type=e->getMsgtype();
1242     e->setEvent(curevent);
1243     if (num > 1) {
1244       _logPool->add(CREATION_BCAST, type, ep, curTime,
1245                     curevent++, CkMyPe(), e->getTotalsize(), 
1246                     NULL, 0, 0.0, num);
1247     } else {
1248       _logPool->add(CREATION, type, ep, curTime,
1249                     curevent++, CkMyPe(), e->getTotalsize(), 
1250                     NULL, 0, 0.0);
1251     }
1252   }
1253 }
1254
1255 //This function is only called from a comm thread in SMP mode. 
1256 void TraceProjections::creation(char *msg)
1257 {
1258 #if CMK_SMP_TRACE_COMMTHREAD
1259         // msg must be a charm message
1260         envelope *e = (envelope *)msg;
1261         int ep = e->getEpIdx();
1262     if(ep==0) return;
1263     int num = _entryTable.size();
1264     CmiAssert(ep < num);
1265         if(_entryTable[ep]->traceEnabled) {
1266                 creation(e, ep, 1);
1267         e->setSrcPe(CkMyPe());              // pretend I am the sender
1268     }
1269 #endif
1270 }
1271
1272 void TraceProjections::traceCommSetMsgID(char *msg)
1273 {
1274 #if CMK_SMP_TRACE_COMMTHREAD
1275         // msg must be a charm message
1276         envelope *e = (envelope *)msg;
1277         int ep = e->getEpIdx();
1278     if(ep==0) return;
1279     int num = _entryTable.size();
1280     CmiAssert(ep < num);
1281         if(_entryTable[ep]->traceEnabled) {
1282         e->setSrcPe(CkMyPe());              // pretend I am the sender
1283         e->setEvent(curevent);
1284     }
1285 #endif
1286 }
1287
1288 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1289 {
1290     // msg must be a charm message
1291     *pe = *event = -1;
1292     envelope *e = (envelope *)msg;
1293     int ep = e->getEpIdx();
1294     if(ep==0) return;
1295     int num = _entryTable.size();
1296     CmiAssert(ep < num);
1297     if(_entryTable[ep]->traceEnabled) {
1298         *pe = e->getSrcPe();
1299         *event = e->getEvent();
1300     }
1301 }
1302
1303 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1304 {
1305        // msg must be a charm message
1306     envelope *e = (envelope *)msg;
1307     int ep = e->getEpIdx();
1308     if(ep==0) return;
1309     int num = _entryTable.size();
1310     CmiAssert(ep < num);
1311     if(_entryTable[ep]->traceEnabled) {
1312         e->setSrcPe(pe);
1313         e->setEvent(event);
1314     }
1315 }
1316
1317 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1318    Communication Library-specific Multicasts via new event 
1319    CREATION_MULTICAST.
1320 */
1321
1322 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1323                                          int *pelist)
1324 {
1325   double curTime = TraceTimer();
1326   if (e==0) {
1327     CtvAccess(curThreadEvent)=curevent;
1328     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1329                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1330   } else {
1331     int type=e->getMsgtype();
1332     e->setEvent(curevent);
1333     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1334                                    e->getTotalsize(), 0, 0.0, num, pelist);
1335   }
1336 }
1337
1338 void TraceProjections::creationDone(int num)
1339 {
1340   // modified the creation done time of the last num log entries
1341   // FIXME: totally a hack
1342   double curTime = TraceTimer();
1343   int idx = _logPool->numEntries-1;
1344   while (idx >=0 && num >0 ) {
1345     LogEntry &log = _logPool->pool[idx];
1346     if ((log.type == CREATION) ||
1347         (log.type == CREATION_BCAST) ||
1348         (log.type == CREATION_MULTICAST)) {
1349       log.recvTime = curTime - log.time;
1350       num --;
1351     }
1352     idx--;
1353   }
1354 }
1355
1356 void TraceProjections::beginExecute(CmiObjId *tid)
1357 {
1358 #if CMK_HAS_COUNTER_PAPI
1359   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1360     CmiAbort("PAPI failed to read at begin execute!\n");
1361   }
1362 #endif
1363   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1364   execEvent = CtvAccess(curThreadEvent);
1365   execEp = (-1);
1366   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1367                 execEvent,CkMyPe(), 0, tid);
1368 #if CMK_HAS_COUNTER_PAPI
1369   _logPool->addPapi(papiValues);
1370 #endif
1371   inEntry = 1;
1372 }
1373
1374 void TraceProjections::beginExecute(envelope *e)
1375 {
1376   if(e==0) {
1377 #if CMK_HAS_COUNTER_PAPI
1378     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1379       CmiAbort("PAPI failed to read at begin execute!\n");
1380     }
1381 #endif
1382     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1383     execEvent = CtvAccess(curThreadEvent);
1384     execEp = (-1);
1385     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1386                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1387 #if CMK_HAS_COUNTER_PAPI
1388     _logPool->addPapi(papiValues);
1389 #endif
1390     inEntry = 1;
1391   } else {
1392     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1393                  e->getSrcPe(),e->getTotalsize());
1394   }
1395 }
1396
1397 void TraceProjections::beginExecute(char *msg){
1398 #if CMK_SMP_TRACE_COMMTHREAD
1399         //This function is called from comm thread in SMP mode
1400     envelope *e = (envelope *)msg;
1401     int ep = e->getEpIdx();
1402     if (ep == 0) return;
1403     int num = _entryTable.size();
1404     CmiAssert(ep < num);
1405     if(_entryTable[ep]->traceEnabled)
1406                 beginExecute(e);
1407 #endif
1408 }
1409
1410 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1411                                     int mlen, CmiObjId *idx)
1412 {
1413   if (traceNestedEvents) {
1414     if (! nestedEvents.isEmpty()) {
1415       endExecuteLocal();
1416     }
1417     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1418   }
1419   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1420 }
1421
1422 void TraceProjections::changeLastEntryTimestamp(double ts)
1423 {
1424   _logPool->modLastEntryTimestamp(ts);
1425 }
1426
1427 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1428                                     int mlen, CmiObjId *idx)
1429 {
1430 #if CMK_HAS_COUNTER_PAPI
1431   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1432     CmiAbort("PAPI failed to read at begin execute!\n");
1433   }
1434 #endif
1435   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1436   execEvent=event;
1437   execEp=ep;
1438   execPe=srcPe;
1439   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1440                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1441 #if CMK_HAS_COUNTER_PAPI
1442   _logPool->addPapi(papiValues);
1443 #endif
1444   inEntry = 1;
1445 }
1446
1447 void TraceProjections::endExecute(void)
1448 {
1449   if (traceNestedEvents) nestedEvents.deq();
1450   endExecuteLocal();
1451   if (traceNestedEvents) {
1452     if (! nestedEvents.isEmpty()) {
1453       NestedEvent &ne = nestedEvents.peek();
1454       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1455     }
1456   }
1457 }
1458
1459 void TraceProjections::endExecute(char *msg)
1460 {
1461 #if CMK_SMP_TRACE_COMMTHREAD
1462         //This function is called from comm thread in SMP mode
1463     envelope *e = (envelope *)msg;
1464     int ep = e->getEpIdx();
1465     if (ep == 0) return;
1466     int num = _entryTable.size();
1467     CmiAssert(ep < num);
1468     if(_entryTable[ep]->traceEnabled)
1469                 endExecute();
1470 #endif  
1471 }
1472
1473 void TraceProjections::endExecuteLocal(void)
1474 {
1475 #if CMK_HAS_COUNTER_PAPI
1476   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1477     CmiAbort("PAPI failed to read at end execute!\n");
1478   }
1479 #endif
1480   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1481   double cputime = TraceCpuTimer();
1482   if(execEp == (-1)) {
1483     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1484                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1485   } else {
1486     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1487                   execEvent, execPe, 0, NULL, 0.0, cputime);
1488   }
1489 #if CMK_HAS_COUNTER_PAPI
1490   _logPool->addPapi(papiValues);
1491 #endif
1492   inEntry = 0;
1493 }
1494
1495 void TraceProjections::messageRecv(char *env, int pe)
1496 {
1497 #if 0
1498   envelope *e = (envelope *)env;
1499   int msgType = e->getMsgtype();
1500   int ep = e->getEpIdx();
1501 #if 0
1502   if (msgType==NewChareMsg || msgType==NewVChareMsg
1503           || msgType==ForChareMsg || msgType==ForVidMsg
1504           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1505           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1506     ep = e->getEpIdx();
1507   else
1508     ep = _threadEP;
1509 #endif
1510   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1511                 curevent++, e->getSrcPe(), e->getTotalsize());
1512 #endif
1513 }
1514
1515 void TraceProjections::beginIdle(double curWallTime)
1516 {
1517   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1518 }
1519
1520 void TraceProjections::endIdle(double curWallTime)
1521 {
1522   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1523 }
1524
1525 void TraceProjections::beginPack(void)
1526 {
1527   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1528 }
1529
1530 void TraceProjections::endPack(void)
1531 {
1532   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1533 }
1534
1535 void TraceProjections::beginUnpack(void)
1536 {
1537   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1538 }
1539
1540 void TraceProjections::endUnpack(void)
1541 {
1542   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1543 }
1544
1545 void TraceProjections::enqueue(envelope *) {}
1546
1547 void TraceProjections::dequeue(envelope *) {}
1548
1549 void TraceProjections::beginComputation(void)
1550 {
1551   computationStarted = 1;
1552
1553   // Executes the callback function provided by the machine
1554   // layer. This is the proper method to register user events in a
1555   // machine layer because projections is a charm++ module.
1556   if (CkpvAccess(traceOnPe) != 0) {
1557     void (*ptr)() = registerMachineUserEvents();
1558     if (ptr != NULL) {
1559       ptr();
1560     }
1561   }
1562 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1563 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1564   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1565 #if CMK_HAS_COUNTER_PAPI
1566   // we start the counters here
1567   if (PAPI_start(papiEventSet) != PAPI_OK) {
1568     CmiAbort("PAPI failed to start designated counters!\n");
1569   }
1570 #endif
1571 }
1572
1573 void TraceProjections::endComputation(void)
1574 {
1575 #if CMK_HAS_COUNTER_PAPI
1576   // we stop the counters here. A silent failure is alright since we
1577   // are already at the end of the program.
1578   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1579     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1580   }
1581   // NOTE: We should not do a complete close of PAPI until after the
1582   // sts writer is done.
1583 #endif
1584   endTime = TraceTimer();
1585   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1586   /*
1587   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1588            endTime*1e06);
1589   */
1590 }
1591
1592 int TraceProjections::idxRegistered(int idx)
1593 {
1594     int idxVecLen = idxVec.size();
1595     for(int i=0; i<idxVecLen; i++)
1596     {
1597         if(idx == idxVec[i])
1598             return 1;
1599     }
1600     return 0;
1601 }
1602
1603 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1604     StrKey k((char*)name,strlen(name));
1605     int num = funcHashtable.get(k);
1606     
1607     if(num!=0) {
1608         return;
1609         //as for mpi programs, the same function may be registered for several times
1610         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1611     }
1612     
1613     int isIdxExisting=0;
1614     if(idxSpecifiedByUser)
1615         isIdxExisting=idxRegistered(idx);
1616     if(isIdxExisting){
1617         return;
1618         //same reason with num!=0
1619         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1620     }
1621
1622     if(idxSpecifiedByUser) {
1623         char *st = new char[strlen(name)+1];
1624         memcpy(st,name,strlen(name)+1);
1625         StrKey *newKey = new StrKey(st,strlen(st));
1626         int &ref = funcHashtable.put(*newKey);
1627         ref=idx;
1628         funcCount++;
1629         idxVec.push_back(idx);  
1630     } else {
1631         char *st = new char[strlen(name)+1];
1632         memcpy(st,name,strlen(name)+1);
1633         StrKey *newKey = new StrKey(st,strlen(st));
1634         int &ref = funcHashtable.put(*newKey);
1635         ref=funcCount;
1636         num = funcCount;
1637         funcCount++;
1638         idx = num;
1639         idxVec.push_back(idx);
1640     }
1641 }
1642
1643 void TraceProjections::beginFunc(char *name,char *file,int line){
1644         StrKey k(name,strlen(name));    
1645         unsigned short  num = (unsigned short)funcHashtable.get(k);
1646         beginFunc(num,file,line);
1647 }
1648
1649 void TraceProjections::beginFunc(int idx,char *file,int line){
1650         if(idx <= 0){
1651                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1652         }       
1653         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1654 }
1655
1656 void TraceProjections::endFunc(char *name){
1657         StrKey k(name,strlen(name));    
1658         int num = funcHashtable.get(k);
1659         endFunc(num);   
1660 }
1661
1662 void TraceProjections::endFunc(int num){
1663         if(num <= 0){
1664                 printf("endFunc without start :O\n");
1665         }
1666         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1667 }
1668
1669 // specialized PUP:ers for handling trace projections logs
1670 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1671 {
1672   for (int i=0;i<n;i++) 
1673     switch(t) {
1674     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1675     case Tuchar:
1676     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1677     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1678     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1679     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1680     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1681     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1682     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1683     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1684     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1685 #ifdef CMK_PUP_LONG_LONG
1686     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1687     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1688 #endif
1689     default: CmiAbort("Unrecognized pup type code!");
1690     };
1691 }
1692
1693 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1694 {
1695   for (int i=0;i<n;i++) 
1696     switch(t) {
1697     case Tchar: { 
1698       char c = fgetc(f);
1699       if (c==EOF)
1700         parseError("Could not match character");
1701       else
1702         ((char *)p)[i] = c;
1703       break;
1704     }
1705     case Tuchar:
1706     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1707     case Tshort:((short *)p)[i]=(short)readInt(); break;
1708     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1709     case Tint:  ((int *)p)[i]=readInt(); break;
1710     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1711     case Tlong: ((long *)p)[i]=readInt(); break;
1712     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1713     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1714     case Tdouble:((double *)p)[i]=readDouble(); break;
1715 #ifdef CMK_PUP_LONG_LONG
1716     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1717     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1718 #endif
1719     default: CmiAbort("Unrecognized pup type code!");
1720     };
1721 }
1722
1723 #if CMK_PROJECTIONS_USE_ZLIB
1724 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1725 {
1726   for (int i=0;i<n;i++) 
1727     switch(t) {
1728     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1729     case Tuchar:
1730     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1731     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1732     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1733     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1734     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1735     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1736     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1737     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1738     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1739 #ifdef CMK_PUP_LONG_LONG
1740     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1741     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1742 #endif
1743     default: CmiAbort("Unrecognized pup type code!");
1744     };
1745 }
1746 #endif
1747
1748 void TraceProjections::endPhase() {
1749   double currentPhaseTime = TraceTimer();
1750   if (lastPhaseEvent != NULL) {
1751   } else {
1752     if (_logPool->pool != NULL) {
1753       // assumed to be BEGIN_COMPUTATION
1754     } else {
1755       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1756       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1757     }
1758   }
1759
1760   /* Insert endPhase event here. */
1761   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1762   /*        We currently "borrow" from the standard add() method. */
1763   /*        It should really be its own add() method.             */
1764   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1765   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1766   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1767   currentPhaseID++;
1768 }
1769
1770 #ifdef PROJ_ANALYSIS
1771 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1772
1773
1774 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1775
1776 void registerOutlierReduction() {
1777   outlierReductionType =
1778     CkReduction::addReducer(outlierReduction);
1779   minMaxReductionType =
1780     CkReduction::addReducer(minMaxReduction);
1781 }
1782
1783 /**
1784  * **IMPT NOTES**:
1785  *
1786  * This is the C++ code that is registered to be activated at module
1787  * shutdown. This is called exactly once on processor 0. Module shutdown
1788  * is initiated as a result of a CkExit() call by the application code
1789  * 
1790  * The exit function must ultimately call CkExit() again to
1791  * so that other module exit functions may proceed after this module is
1792  * done.
1793  *
1794  */
1795 // FIXME: WHY extern "C"???
1796 extern "C" void TraceProjectionsExitHandler()
1797 {
1798 #if CMK_TRACE_ENABLED
1799   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1800   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1801   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1802 #else
1803   CkExit();
1804 #endif
1805 }
1806
1807 // This is called once on each processor but the idiom of use appears
1808 // to be to only have processor 0 register the function.
1809 //
1810 // See initnode in trace-projections.ci
1811 void initTraceProjectionsBOC()
1812 {
1813   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1814 #ifdef __BIGSIM__
1815   if (BgNodeRank() == 0) {
1816 #else
1817     if (CkMyRank() == 0) {
1818 #endif
1819       registerExitFn(TraceProjectionsExitHandler);
1820     }
1821 #if 0
1822   } // this is so indentation does not get messed up
1823 #endif
1824 }
1825
1826 // mainchare for trace-projections BOC-operations. 
1827 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1828 // rest of its life.
1829 //
1830 // Responsible for:
1831 //   1. Handling commandline arguments
1832 //   2. Creating any objects required for proper BOC operations.
1833 //
1834 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1835   /** Options for Outlier Analysis */
1836   // defaults. Things will change with support for interactive analysis.
1837   bool findOutliers = false;
1838   bool outlierAutomatic = true;
1839   int numKSeeds = 10; 
1840
1841   int peNumKeep = CkNumPes();  // used as a default
1842   double entryThreshold = 0.0;
1843   bool outlierUsePhases = false;
1844   if (outlierAutomatic) {
1845     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1846                      "Number of cluster seeds to apply at outlier analysis.");
1847     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1848                      &peNumKeep, "Number of Processors to retain data");
1849     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1850                         "Minimum significance of entry points to be considered for clustering (%).");
1851     findOutliers =
1852       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1853     outlierUsePhases = 
1854       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1855                         "Apply automatic outlier analysis to any available phases.");
1856     if (outlierUsePhases) {
1857       // if the user wants to use an outlier feature, it is assumed outlier
1858       //    analysis is desired.
1859       findOutliers = true;
1860     }
1861   }
1862   bool findStartTime = (CmiTimerAbsolute()==1);
1863   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1864   if (findOutliers) {
1865     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1866                                         numKSeeds,
1867                                         peNumKeep,
1868                                         entryThreshold,
1869                                         outlierUsePhases);
1870   }
1871 }
1872
1873 // Called on every processor.
1874 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1875   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1876   endPe = pe;                // the pe that starts CkExit()
1877   if (CkMyPe() == 0) {
1878     analysisStartTime = CmiWallTimer();
1879   }
1880   CkpvAccess(_trace)->endComputation();
1881   // no more tracing for projections on this processor after this point. 
1882   // Note that clear must be called after remove, or bad things will happen.
1883   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1884   CkpvAccess(_traces)->clearTrace();
1885
1886   // From this point, we start multiple chains of reductions and broadcasts to
1887   // perform final online analysis activities.
1888
1889   // Start all parallel operations at once. 
1890   //   These MUST NOT modify base performance data in LogPool. If they must,
1891   //   then the parallel operations must be phased (and this code to be
1892   //   restructured as necessary)
1893   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1894   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1895   if (findOutliers) {
1896     parModulesRemaining++;
1897     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1898   }
1899   parModulesRemaining++;
1900   if (findStartTime) 
1901   bocProxy[CkMyPe()].startTimeAnalysis();
1902   else
1903   bocProxy[CkMyPe()].startEndTimeAnalysis();
1904 }
1905
1906 // Called on each processor
1907 void KMeansBOC::startKMeansAnalysis() {
1908   // Initialize all necessary structures
1909   LogPool *pool = CkpvAccess(_trace)->_logPool;
1910
1911  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1912   int flushInt = 0;
1913   if (pool->hasFlushed) {
1914     flushInt = 1;
1915   }
1916   
1917   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1918                 0, thisProxy);
1919   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1920 }
1921
1922 // Called on processor 0
1923 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1924   int someFlush = *((int *)msg->getData());
1925
1926   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1927   
1928   if (someFlush == 0) {
1929     // Data intact proceed with KMeans analysis
1930     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1931     kMeansProxy.flushCheckDone();
1932   } else {
1933     // Some processor had flushed it data at some point, abandon KMeans
1934     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1935     // terminate KMeans
1936     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1937     bocProxy[0].kMeansDone();
1938   }
1939 }
1940
1941 // Called on each processor
1942 void KMeansBOC::flushCheckDone() {
1943   // **FIXME** - more flexible metric collection scheme may be necessary
1944   //   in the future for production use.
1945   LogPool *pool = CkpvAccess(_trace)->_logPool;
1946
1947   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1948
1949   numEntryMethods = _entryTable.size();
1950   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1951
1952   // maintained across phases
1953   markedBegin = false;
1954   markedIdle = false;
1955   beginBlockTime = 0.0;
1956   beginIdleBlockTime = 0.0;
1957   lastBeginEPIdx = -1; // none
1958
1959   lastPhaseIdx = 0;
1960   currentExecTimes = NULL;
1961   currentPhase = 0;
1962   selected = false;
1963
1964   pool->initializePhases();
1965
1966   // incoming K Seeds and the per-phase filter
1967   incKSeeds = new double[numK*numMetrics];
1968   keepMetric = new bool[numMetrics];
1969
1970   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1971   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1972   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1973   thisProxy[CkMyPe()].getNextPhaseMetrics();
1974 }
1975
1976 // Called on each processor.
1977 void KMeansBOC::getNextPhaseMetrics() {
1978   // Assumes the presence of the complete logs on this processor.
1979   // Assumes first event is always BEGIN_COMPUTATION
1980   // Assumes each processor sees the same number of phases.
1981   //
1982   // In this code, we collect performance data for this processor.
1983   // All times are in seconds.
1984
1985   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1986
1987   if (usePhases) {
1988     DEBUGF("[%d] Using Phases\n", CkMyPe());
1989   } else {
1990     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1991   }
1992   
1993   if (currentExecTimes != NULL) {
1994     delete [] currentExecTimes;
1995   }
1996   currentExecTimes = new double[numMetrics];
1997   for (int i=0; i<numMetrics; i++) {
1998     currentExecTimes[i] = 0.0;
1999   }
2000
2001   int numEventMethods = _entryTable.size();
2002   LogPool *pool = CkpvAccess(_trace)->_logPool;
2003   
2004   CkAssert(pool->numEntries > lastPhaseIdx);
2005   double totalPhaseTime = 0.0;
2006   double totalActiveTime = 0.0; // entry method + idle
2007
2008   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2009     if (pool->pool[i].type == BEGIN_PROCESSING) {
2010       // check pairing
2011       if (!markedBegin) {
2012         markedBegin = true;
2013       }
2014       beginBlockTime = pool->pool[i].time;
2015       lastBeginEPIdx = pool->pool[i].eIdx;
2016     } else if (pool->pool[i].type == END_PROCESSING) {
2017       // check pairing
2018       // if End without a begin, just ignore
2019       //   this event. If a phase-boundary is crossed, the Begin
2020       //   event would be maintained in beginBlockTime, so it is 
2021       //   not a problem.
2022       if (markedBegin) {
2023         markedBegin = false;
2024         if (pool->pool[i].event < 0)
2025         {
2026           // ignore dummy events. **FIXME** as they have no eIdx?
2027           continue;
2028         }
2029         currentExecTimes[pool->pool[i].eIdx] += 
2030           pool->pool[i].time - beginBlockTime;
2031         totalActiveTime += pool->pool[i].time - beginBlockTime;
2032         lastBeginEPIdx = -1;
2033       }
2034     } else if (pool->pool[i].type == BEGIN_IDLE) {
2035       // check pairing
2036       if (!markedIdle) {
2037         markedIdle = true;
2038       }
2039       beginIdleBlockTime = pool->pool[i].time;
2040     } else if (pool->pool[i].type == END_IDLE) {
2041       // check pairing
2042       if (markedIdle) {
2043         markedIdle = false;
2044         currentExecTimes[numEventMethods] += 
2045           pool->pool[i].time - beginIdleBlockTime;
2046         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2047       }
2048     } else if (pool->pool[i].type == END_PHASE) {
2049       // ignored when not using phases
2050       if (usePhases) {
2051         // when we've not visited this node before
2052         if (i != lastPhaseIdx) { 
2053           totalPhaseTime = 
2054             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2055           // it is important that proper accounting of time take place here.
2056           // Note that END_PHASE events inevitably occur in the context of
2057           //   some entry method by the way the tracing API is designed.
2058           if (markedBegin) {
2059             CkAssert(lastBeginEPIdx >= 0);
2060             currentExecTimes[lastBeginEPIdx] += 
2061               pool->pool[i].time - beginBlockTime;
2062             totalActiveTime += pool->pool[i].time - beginBlockTime;
2063             // this is so the remainder contributes to the next phase
2064             beginBlockTime = pool->pool[i].time;
2065           }
2066           // The following is unlikely, but stranger things have happened.
2067           if (markedIdle) {
2068             currentExecTimes[numEventMethods] +=
2069               pool->pool[i].time - beginIdleBlockTime;
2070             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2071             // this is so the remainder contributes to the next phase
2072             beginIdleBlockTime = pool->pool[i].time;
2073           }
2074           if (totalActiveTime <= totalPhaseTime) {
2075             currentExecTimes[numEventMethods+1] = 
2076               totalPhaseTime - totalActiveTime;
2077           } else {
2078             currentExecTimes[numEventMethods+1] = 0.0;
2079             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2080                      CkMyPe(), currentPhase);
2081           }
2082           collectKMeansData();
2083           // end the loop (and method) and defer the work till the next call
2084           lastPhaseIdx = i;
2085           break; 
2086         }
2087       }
2088     } else if (pool->pool[i].type == END_COMPUTATION) {
2089       if (markedBegin) {
2090         CkAssert(lastBeginEPIdx >= 0);
2091         currentExecTimes[lastBeginEPIdx] += 
2092           pool->pool[i].time - beginBlockTime;
2093         totalActiveTime += pool->pool[i].time - beginBlockTime;
2094       }
2095       if (markedIdle) {
2096         currentExecTimes[numEventMethods] +=
2097           pool->pool[i].time - beginIdleBlockTime;
2098         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2099       }
2100       totalPhaseTime = 
2101         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2102       if (totalActiveTime <= totalPhaseTime) {
2103         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2104       } else {
2105         currentExecTimes[numEventMethods+1] = 0.0;
2106         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2107                  CkMyPe());
2108       }
2109       collectKMeansData();
2110     }
2111   }
2112 }
2113
2114 /**
2115  *  Through a reduction, collectKMeansData aggregates each processors' data
2116  *  in order for global properties to be determined:
2117  *  
2118  *  1. min & max to determine normalization factors.
2119  *  2. sum to determine global EP averages for possible metric reduction
2120  *       through thresholding.
2121  *  3. sum of squares to compute stddev which may be useful in the future.
2122  *
2123  *  collectKMeansData will also keep the processor's data for the current
2124  *    phase so that it may be normalized and worked on subsequently.
2125  *
2126  **/
2127 void KMeansBOC::collectKMeansData() {
2128   int minOffset = numMetrics;
2129   int maxOffset = 2*numMetrics;
2130   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2131
2132   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2133
2134   double *reductionMsg = new double[numMetrics*4];
2135
2136   for (int i=0; i<numMetrics; i++) {
2137     reductionMsg[i] = currentExecTimes[i];
2138     // duplicate the event times for max and min sections of the reduction
2139     reductionMsg[minOffset + i] = currentExecTimes[i];
2140     reductionMsg[maxOffset + i] = currentExecTimes[i];
2141     // compute squares
2142     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2143   }
2144
2145   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2146                 0, thisProxy);
2147   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2148              outlierReductionType, cb);  
2149 }
2150
2151 // The purpose is mainly to initialize the k seeds and generate
2152 //   normalization parameters for each of the metrics. The k seeds
2153 //   and normalization parameters are broadcast to all processors.
2154 //
2155 // Called on processor 0
2156 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2157   CkAssert(CkMyPe() == 0);
2158   
2159   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2160
2161   int sumOffset = 0;
2162   int minOffset = numMetrics;
2163   int maxOffset = 2*numMetrics;
2164   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2165
2166   // calculate statistics & boundaries for the k seeds for clustering
2167   KMeansStatsMessage *outmsg = 
2168     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2169   outmsg->numMetrics = numMetrics;
2170   outmsg->numKPos = numK*numMetrics;
2171   outmsg->numStats = numMetrics*4;
2172
2173   // Sum | Min | Max | Sum of Squares
2174   double *totalExecTimes = (double *)msg->getData();
2175   double totalTime = 0.0;
2176
2177   for (int i=0; i<numMetrics; i++) {
2178     DEBUGN("%lf\n", totalExecTimes[i]);
2179     totalTime += totalExecTimes[i];
2180
2181     // calculate event mean over all processors
2182     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2183
2184     // get the ranges and offsets of each metric. With this, we get
2185     //   normalization factors that can be sent back to each processor to
2186     //   be used as necessary. We reuse max for range. Min remains the offset.
2187     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2188     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2189       totalExecTimes[minOffset + i];
2190     
2191     // calculate stddev (using biased variance)
2192     outmsg->stats[sosOffset + i] = 
2193       sqrt((totalExecTimes[sosOffset + i] - 
2194             2*(outmsg->stats[i])*totalExecTimes[i] +
2195             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2196            CkNumPes());
2197   }
2198
2199   for (int i=0; i<numMetrics; i++) {
2200     // 1) if the proportion of the max value of the entry method relative to
2201     //   the average time taken over all entry methods across all processors
2202     //   is greater than the stipulated percentage threshold ...; AND
2203     // 2) if the range of values are non-zero.
2204     //
2205     // The current assumption is totalTime > 0 (what program has zero total
2206     //   time from all work?)
2207     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2208                      entryThreshold) &&
2209       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2210     if (keepMetric[i]) {
2211       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2212              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2213     } else {
2214       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2215     }
2216     outmsg->filter[i] = keepMetric[i];
2217   }
2218
2219   delete msg;
2220   
2221   // initialize k seeds for this phase
2222   kSeeds = new double[numK*numMetrics];
2223
2224   numKReported = 0;
2225   kNumMembers = new int[numK];
2226
2227   // Randomly select k processors' metric vectors for the k seeds
2228   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2229   srand(11337); // for debugging purposes
2230   for (int k=0; k<numK; k++) {
2231     DEBUGF("Seed %d | ", k);
2232     for (int m=0; m<numMetrics; m++) {
2233       double factor = totalExecTimes[maxOffset + m] - 
2234         totalExecTimes[minOffset + m];
2235       // "uniform" distribution, scaled according to the normalization
2236       //   factors
2237       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2238       // Random distribution.
2239       kSeeds[numMetrics*k + m] =
2240         ((rand()*1.0)/RAND_MAX)*factor;
2241       if (keepMetric[m]) {
2242         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2243       }
2244       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2245     }
2246     DEBUGF("\n");
2247     kNumMembers[k] = 0;
2248   }
2249
2250   // broadcast statistical values to all processors for cluster discovery
2251   thisProxy.findInitialClusters(outmsg);
2252 }
2253
2254
2255
2256 // Called on each processor.
2257 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2258
2259  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2260
2261   phaseIter = 0;
2262
2263   // Get info from stats message
2264   CkAssert(numMetrics == msg->numMetrics);
2265   for (int i=0; i<numMetrics; i++) {
2266     keepMetric[i] = msg->filter[i];
2267   }
2268
2269   // Normalize data on local processor.
2270   // **CWL** See my thesis for detailed discussion of normalization of
2271   //    performance data.
2272   // **NOTE** This might change if we want to send data based on the filter
2273   //   instead of all the data.
2274   CkAssert(numMetrics*4 == msg->numStats);
2275   for (int i=0; i<numMetrics; i++) {
2276     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2277     // **CWL** We do not normalize the range. Entry methods that exhibit
2278     //   large absolute timing variations should be allowed to contribute
2279     //   more to the Euclidean distance measure!
2280     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2281   }
2282
2283   // **NOTE** This might change if we want to send data based on the filter
2284   //   instead of all the data.
2285   CkAssert(numK*numMetrics == msg->numKPos);
2286   for (int i=0; i<msg->numKPos; i++) {
2287     incKSeeds[i] = msg->kSeedsPos[i];
2288   }
2289
2290   // Decide which KSeed this processor belongs to.
2291   minDistance = calculateDistance(0);
2292   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2293            currentPhase, phaseIter, minDistance);
2294   minK = 0;
2295   for (int i=1; i<numK; i++) {
2296     double distance = calculateDistance(i);
2297     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2298              currentPhase, phaseIter, i, distance);
2299     if (distance < minDistance) {
2300       minDistance = distance;
2301       minK = i;
2302     }
2303   }
2304
2305   // Set up a reduction with the modification vector to the root (0).
2306   //
2307   // The modification vector sends a negative value for each metric
2308   //   for the K this processor no longer belongs to and a positive
2309   //   value to the K the processor now belongs. In addition, a -1.0
2310   //   is sent to the K it is leaving and a +1.0 to the K it is 
2311   //   joining.
2312   //
2313   // The processor must still contribute a "zero returns" even if
2314   //   nothing changes. This will be the basis for determine
2315   //   convergence at the root.
2316   //
2317   // The addtional +1 is meant for the count-change that must be
2318   //   maintained for the special cases at the root when some K
2319   //   may be deprived of all processor points or go from 0 to a
2320   //   positive number of processors (see later comments).
2321   double *modVector = new double[numK*(numMetrics+1)];
2322   for (int i=0; i<numK; i++) {
2323     for (int j=0; j<numMetrics+1; j++) {
2324       modVector[i*(numMetrics+1) + j] = 0.0;
2325     }
2326   }
2327   for (int i=0; i<numMetrics; i++) {
2328     // for this initialization, only positive values need be sent.
2329     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2330   }
2331   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2332
2333   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2334                 0, thisProxy);
2335   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2336              CkReduction::sum_double, cb);  
2337 }
2338
2339 double KMeansBOC::calculateDistance(int k) {
2340   double ret = 0.0;
2341   for (int i=0; i<numMetrics; i++) {
2342     if (keepMetric[i]) {
2343       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2344              CkMyPe(), currentPhase, phaseIter, i,
2345                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2346       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2347     }
2348   }
2349   return sqrt(ret);
2350 }
2351
2352 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2353   CkAssert(CkMyPe() == 0);
2354
2355   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2356
2357   double *modVector = (double *)msg->getData();
2358   // sanity check
2359   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2360
2361   // A quick convergence test.
2362   bool hasChanges = false;
2363   for (int i=0; i<numK; i++) {
2364     hasChanges = hasChanges || 
2365       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2366   }
2367   if (!hasChanges) {
2368     delete msg;
2369     findRepresentatives();
2370   } else {
2371     int overallChange = 0;
2372     for (int i=0; i<numK; i++) {
2373       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2374       if (change != 0) {
2375         overallChange += change;
2376         // modify the k seeds based on the modification vectors coming in
2377         //
2378         // If a seed initially has no members, its contents do not matter and
2379         //   is simply set to the average of the incoming vector.
2380         // If the change causes a seed to lose all its members, do nothing.
2381         //   Its last-known location is kept to allow it to re-capture
2382         //   membership at the next iteration rather than apply the last
2383         //   changes (which snaps the point unnaturally to 0,0).
2384         // Otherwise, apply the appropriate vector changes.
2385         CkAssert((kNumMembers[i] + change >= 0) &&
2386                  (kNumMembers[i] + change <= CkNumPes()));
2387         if (kNumMembers[i] == 0) {
2388           CkAssert(change > 0);
2389           for (int j=0; j<numMetrics; j++) {
2390             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2391           }
2392         } else if (kNumMembers[i] + change == 0) {
2393           // do nothing.
2394         } else {
2395           for (int j=0; j<numMetrics; j++) {
2396             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2397             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2398             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2399           }
2400         }
2401         kNumMembers[i] += change;
2402       }
2403       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2404              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2405     }
2406     delete msg;
2407
2408     // broadcast the new seed locations.
2409     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2410     outmsg->numKPos = numK*numMetrics;
2411     for (int i=0; i<numK*numMetrics; i++) {
2412       outmsg->kSeedsPos[i] = kSeeds[i];
2413     }
2414
2415     thisProxy.updateSeedMembership(outmsg);
2416   }
2417 }
2418
2419 // Called on all processors
2420 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2421
2422   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2423
2424   phaseIter++;
2425
2426   // **NOTE** This might change if we want to send data based on the filter
2427   //   instead of all the data.
2428   CkAssert(numK*numMetrics == msg->numKPos);
2429   for (int i=0; i<msg->numKPos; i++) {
2430     incKSeeds[i] = msg->kSeedsPos[i];
2431   }
2432
2433   // Decide which KSeed this processor belongs to.
2434   lastMinK = minK;
2435   minDistance = calculateDistance(0);
2436   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2437          currentPhase, phaseIter, minDistance);
2438
2439   minK = 0;
2440   for (int i=1; i<numK; i++) {
2441     double distance = calculateDistance(i);
2442     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2443            currentPhase, phaseIter, i, distance);
2444     if (distance < minDistance) {
2445       minDistance = distance;
2446       minK = i;
2447     }
2448   }
2449
2450   double *modVector = new double[numK*(numMetrics+1)];
2451   for (int i=0; i<numK; i++) {
2452     for (int j=0; j<numMetrics+1; j++) {
2453       modVector[i*(numMetrics+1) + j] = 0.0;
2454     }
2455   }
2456
2457   if (minK != lastMinK) {
2458     for (int i=0; i<numMetrics; i++) {
2459       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2460       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2461     }
2462     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2463     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2464   }
2465
2466   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2467                 0, thisProxy);
2468   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2469              CkReduction::sum_double, cb);  
2470 }
2471
2472 void KMeansBOC::findRepresentatives() {
2473
2474   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2475
2476   int numNonEmptyClusters = 0;
2477   for (int i=0; i<numK; i++) {
2478     if (kNumMembers[i] > 0) {
2479       numNonEmptyClusters++;
2480     }
2481   }
2482
2483   int numRepresentatives = peNumKeep;
2484   // **FIXME**
2485   // This is fairly arbitrary. Next time, choose the centers of the top
2486   //   largest clusters.
2487   if (numRepresentatives < numNonEmptyClusters) {
2488     numRepresentatives = numNonEmptyClusters;
2489   }
2490
2491   int slotsRemaining = numRepresentatives;
2492
2493   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2494          numNonEmptyClusters);
2495
2496   // determine how many exemplars to select per cluster. Currently
2497   //   hardcoded to 1. Future challenge is to decide on other numbers
2498   //   or proportionality.
2499   //
2500   int exemplarsPerCluster = 1;
2501   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2502
2503   int numCandidateOutliers = CkNumPes() - 
2504     exemplarsPerCluster*numNonEmptyClusters;
2505
2506   double *remainders = new double[numK];
2507   int *assigned = new int[numK];
2508   exemplarChoicesLeft = new int[numK];
2509   outlierChoicesLeft = new int[numK];
2510
2511   for (int i=0; i<numK; i++) {
2512     assigned[i] = 0;
2513     remainders[i] = 
2514       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2515       slotsRemaining / numCandidateOutliers;
2516     if (remainders[i] >= 0.0) {
2517       assigned[i] = (int)floor(remainders[i]);
2518       remainders[i] -= assigned[i];
2519     } else {
2520       remainders[i] = 0.0;
2521     }
2522   }
2523
2524   for (int i=0; i<numK; i++) {
2525     slotsRemaining -= assigned[i];
2526   }
2527   CkAssert(slotsRemaining >= 0);
2528
2529   // find clusters to assign the loose slots to, in order of
2530   // remainder proportion
2531   while (slotsRemaining > 0) {
2532     double max = 0.0;
2533     int winner = 0;
2534     for (int i=0; i<numK; i++) {
2535       if (remainders[i] > max) {
2536         max = remainders[i];
2537         winner = i;
2538       }
2539     }
2540     assigned[winner]++;
2541     remainders[winner] = 0.0;
2542     slotsRemaining--;
2543   }
2544
2545   // set up how many reduction cycles of min/max we need to conduct to
2546   // select the representatives.
2547   numSelectionIter = exemplarsPerCluster;
2548   for (int i=0; i<numK; i++) {
2549     if (assigned[i] > numSelectionIter) {
2550       numSelectionIter = assigned[i];
2551     }
2552   }
2553   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2554
2555   for (int i=0; i<numK; i++) {
2556     if (kNumMembers[i] > 0) {
2557       exemplarChoicesLeft[i] = exemplarsPerCluster;
2558       outlierChoicesLeft[i] = assigned[i];
2559     } else {
2560       exemplarChoicesLeft[i] = 0;
2561       outlierChoicesLeft[i] = 0;
2562     }
2563     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2564            outlierChoicesLeft[i]);
2565   }
2566
2567   delete [] assigned;
2568   delete [] remainders;
2569
2570   // send out first broadcast
2571   KSelectionMessage *outmsg = NULL;
2572   if (numSelectionIter > 0) {
2573     outmsg = new (numK, numK, numK) KSelectionMessage;
2574     outmsg->numKMinIDs = numK;
2575     outmsg->numKMaxIDs = numK;
2576     for (int i=0; i<numK; i++) {
2577       outmsg->minIDs[i] = -1;
2578       outmsg->maxIDs[i] = -1;
2579     }
2580     thisProxy.collectDistances(outmsg);
2581   } else {
2582     CkPrintf("Warning: No selection iteration from the start!\n");
2583     // invoke phase completion on all processors
2584     thisProxy.phaseDone();
2585   }
2586 }
2587
2588 /*
2589  *  lastMin = array of minimum champions of the last tournament
2590  *  lastMax = array of maximum champions of the last tournament
2591  *  lastMaxVal = array of last encountered maximum values, allows previous
2592  *                 minimum winners to eliminate themselves from the next
2593  *                 minimum race.
2594  *
2595  *  Called on all processors.
2596  */
2597 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2598
2599   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2600
2601   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2602          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2603   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2604       (CkMyPe() == msg->maxIDs[lastMinK])) {
2605     CkAssert(!selected);
2606     selected = true;
2607   }
2608
2609   // build outgoing reduction structure
2610   //   format = minVal | ID | maxVal | ID
2611   double *minMaxAndIDs = NULL;
2612
2613   minMaxAndIDs = new double[numK*4];
2614   // initialize to the appropriate out-of-band values (for error checks)
2615   for (int i=0; i<numK; i++) {
2616     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2617     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2618     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2619     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2620   }
2621   // If I have not won before, I put myself back into the competition
2622   if (!selected) {
2623     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2624     minMaxAndIDs[lastMinK*4] = minDistance;
2625     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2626     minMaxAndIDs[lastMinK*4+2] = minDistance;
2627     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2628   }
2629   delete msg;
2630
2631   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2632                 0, thisProxy);
2633   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2634              minMaxReductionType, cb);  
2635 }
2636
2637 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2638   // incoming format:
2639   //   minVal | minID | maxVal | maxID
2640
2641   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2642
2643   if (numSelectionIter > 0) {
2644     double *incInfo = (double *)msg->getData();
2645     
2646     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2647     outmsg->numKMinIDs = numK;
2648     outmsg->numKMaxIDs = numK;
2649     
2650     for (int i=0; i<numK; i++) {
2651       DEBUGF("%d | %lf %d %lf %d \n", i, 
2652              incInfo[i*4], (int)incInfo[i*4+1], 
2653              incInfo[i*4+2], (int)incInfo[i*4+3]);
2654     }
2655
2656     for (int i=0; i<numK; i++) {
2657       if (exemplarChoicesLeft[i] > 0) {
2658         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2659         exemplarChoicesLeft[i]--;
2660       } else {
2661         outmsg->minIDs[i] = -1;
2662       }
2663       if (outlierChoicesLeft[i] > 0) {
2664         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2665         outlierChoicesLeft[i]--;
2666       } else {
2667         outmsg->maxIDs[i] = -1;
2668       }
2669     }
2670     thisProxy.collectDistances(outmsg);
2671     numSelectionIter--;
2672   } else {
2673     // invoke phase completion on all processors
2674     thisProxy.phaseDone();
2675   }
2676 }
2677
2678 /**
2679  *  Completion of the K-Means clustering and data selection of one phase
2680  *    of the computation.
2681  *
2682  *  Called on every processor.
2683  */
2684 void KMeansBOC::phaseDone() {
2685
2686   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2687
2688   LogPool *pool = CkpvAccess(_trace)->_logPool;
2689   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2690
2691   // now decide on what to do with the decision.
2692   if (!selected) {
2693     if (usePhases) {
2694       pool->keepPhase[currentPhase] = false;
2695     } else {
2696       // if not using phases, we're working on the whole log
2697       pool->setAllPhases(false);
2698     }
2699   }
2700
2701   // **FIXME** (?) - All processors have to agree on this, or the reduction
2702   //   will not be correct! The question is "is this enforcible?"
2703   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2704     // We're done
2705     int dummy = 0;
2706     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2707                   0, bocProxy);
2708     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2709   } else {
2710     // reset all phase-based k-means data and decisions
2711
2712     // **FIXME**!!!!!    
2713     
2714     // invoke the next K-Means computation phase.
2715     currentPhase++;
2716     thisProxy[CkMyPe()].getNextPhaseMetrics();
2717   }
2718 }
2719
2720 void TraceProjectionsBOC::startTimeAnalysis()
2721 {
2722   double startTime = 0.0;
2723   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2724      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2725   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2726   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2727 }
2728
2729 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2730 {
2731   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2732
2733   if (CkpvAccess(_trace) != NULL) {
2734     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2735     CkpvAccess(_trace)->_logPool->setNewStartTime();
2736     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2737   }
2738   delete msg;
2739   thisProxy[CkMyPe()].startEndTimeAnalysis();
2740 }
2741
2742 void TraceProjectionsBOC::startEndTimeAnalysis()
2743 {
2744  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2745
2746   endTime = CkpvAccess(_trace)->endTime;
2747   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2748
2749   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2750                 0, thisProxy);
2751   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2752 }
2753
2754 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2755 {
2756  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2757
2758   CkAssert(CkMyPe() == 0);
2759   parModulesRemaining--;
2760   if (CkpvAccess(_trace) != NULL) {
2761     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2762     // CkPrintf("End time determined to be %lf us\n",
2763     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2764   }
2765   delete msg;
2766   if (parModulesRemaining == 0) {
2767     thisProxy[CkMyPe()].finalize();
2768   }
2769 }
2770
2771 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2772
2773  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2774
2775   CkAssert(CkMyPe() == 0);
2776   parModulesRemaining--;
2777   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2778            CmiWallTimer()-analysisStartTime);
2779   delete msg;
2780   if (parModulesRemaining == 0) {
2781     thisProxy[CkMyPe()].finalize();
2782   }
2783 }
2784
2785 /**
2786  *
2787  *  This version is called (on processor 0) only if flushCheck fails.
2788  *
2789  */
2790 void TraceProjectionsBOC::kMeansDone() {
2791   CkAssert(CkMyPe() == 0);
2792   parModulesRemaining--;
2793   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2794            CmiWallTimer()-analysisStartTime);
2795   if (parModulesRemaining == 0) {
2796     thisProxy[CkMyPe()].finalize();
2797   }
2798 }
2799
2800 void TraceProjectionsBOC::finalize()
2801 {
2802   CkAssert(CkMyPe() == 0);
2803   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2804   //       CmiWallTimer()-analysisStartTime);
2805   thisProxy.closingTraces();
2806 }
2807
2808 // Called on every processor
2809 void TraceProjectionsBOC::closingTraces() {
2810   CkpvAccess(_trace)->closeTrace();
2811
2812     // subtle:  reduction needs to go to the PE which started CkExit()
2813   int pe = 0;
2814   if (endPe != -1) pe = endPe;
2815   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2816                 pe, thisProxy); 
2817   contribute(0, NULL, CkReduction::sum_int, cb);  
2818 }
2819
2820 // The sole purpose of this reduction is to decide whether or not
2821 //   Projections as a module needs to call CkExit() to get other
2822 //   modules to shutdown.
2823 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2824   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2825   delete msg;
2826   // decide if CkExit() needs to be called
2827   if (!CkpvAccess(_trace)->converseExit) {
2828     CkExit();
2829   }
2830 }
2831 /*
2832  *  Registration and definition of the Outlier Reduction callback.
2833  *  Format: Sum | Min | Max | Sum of Squares
2834  */
2835 CkReductionMsg *outlierReduction(int nMsgs,
2836                                  CkReductionMsg **msgs) {
2837   int numBytes = 0;
2838   int numMetrics = 0;
2839   double *ret = NULL;
2840
2841   if (nMsgs == 1) {
2842     // nothing to do, just pass it on
2843     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2844   }
2845
2846   if (nMsgs > 1) {
2847     numBytes = msgs[0]->getSize();
2848     // sanity checks
2849     if (numBytes%sizeof(double) != 0) {
2850       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2851     }
2852     if ((numBytes/sizeof(double))%4 != 0) {
2853       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2854     }
2855     numMetrics = (numBytes/sizeof(double))/4;
2856     ret = new double[numMetrics*4];
2857
2858     // copy the first message data into the return structure first
2859     for (int i=0; i<numMetrics*4; i++) {
2860       ret[i] = ((double *)msgs[0]->getData())[i];
2861     }
2862
2863     // Sum | Min | Max | Sum of Squares
2864     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2865       for (int i=0; i<numMetrics; i++) {
2866         // Sum
2867         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2868         // Min
2869         ret[numMetrics + i] =
2870           (ret[numMetrics + i] < 
2871            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2872           ? ret[numMetrics + i] : 
2873           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2874         // Max
2875         ret[2*numMetrics + i] = 
2876           (ret[2*numMetrics + i] >
2877            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2878           ? ret[2*numMetrics + i] :
2879           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2880         // Sum of Squares (squaring already done at leaf)
2881         ret[3*numMetrics + i] +=
2882           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2883       }
2884     }
2885   }
2886   
2887   /* apparently, we do not delete the incoming messages */
2888   return CkReductionMsg::buildNew(numBytes,ret);
2889 }
2890
2891 /*
2892  * The only reason we have a user-defined reduction is to support
2893  *   identification of the "winning" processors as well as to handle
2894  *   both the min and the max of each "tournament". A simple min/max
2895  *   discovery cannot handle ties.
2896  */
2897 CkReductionMsg *minMaxReduction(int nMsgs,
2898                                 CkReductionMsg **msgs) {
2899   CkAssert(nMsgs > 0);
2900
2901   int numBytes = msgs[0]->getSize();
2902   CkAssert(numBytes%sizeof(double) == 0);
2903   int numK = (numBytes/sizeof(double))/4;
2904
2905   double *ret = new double[numK*4];
2906   // fill with out-of-band values
2907   for (int i=0; i<numK; i++) {
2908     ret[i*4] = -1.0;
2909     ret[i*4+1] = -1.0;
2910     ret[i*4+2] = -1.0;
2911     ret[i*4+3] = -1.0;
2912   }
2913
2914   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2915   for (int i=0; i<nMsgs; i++) {
2916     double *temp = (double *)msgs[i]->getData();
2917     for (int j=0; j<numK; j++) {
2918       // no previous valid min
2919       if (ret[j*4+1] < 0) {
2920         // fill it in only if the incoming min is valid
2921         if (temp[j*4+1] >= 0) {
2922           ret[j*4] = temp[j*4];      // fill min value
2923           ret[j*4+1] = temp[j*4+1];  // fill ID
2924         }
2925       } else {
2926         // find Min, only if incoming min is valid
2927         if (temp[j*4+1] >= 0) {
2928           if (temp[j*4] < ret[j*4]) {
2929             ret[j*4] = temp[j*4];      // replace min value
2930             ret[j*4+1] = temp[j*4+1];  // replace ID
2931           }
2932         }
2933       }
2934       // no previous valid max
2935       if (ret[j*4+3] < 0) {
2936         // fill only if the incoming max is valid
2937         if (temp[j*4+3] >= 0) {
2938           ret[j*4+2] = temp[j*4+2];  // fill max value
2939           ret[j*4+3] = temp[j*4+3];  // fill ID
2940         }
2941       } else {
2942         // find Max, only if incoming max is valid
2943         if (temp[j*4+3] >= 0) {
2944           if (temp[j*4+2] > ret[j*4+2]) {
2945             ret[j*4+2] = temp[j*4+2];  // replace max value
2946             ret[j*4+3] = temp[j*4+3];  // replace ID
2947           }
2948         }
2949       }
2950     }
2951   }
2952   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2953   delete [] ret;
2954   return redmsg;
2955 }
2956
2957 #include "TraceProjections.def.h"
2958 #endif //PROJ_ANALYSIS
2959
2960 /*@}*/