Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
14 #else
15 #define DEBUGF(format, ...)           // CmiPrintf x
16 #endif
17 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BLUEGENE_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BLUEGENE_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BLUEGENE_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   char name[30];
541   // generate an automatic unique ID for each log
542   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
543   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
544   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
545 #if CMK_HAS_COUNTER_PAPI
546   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
547   // for now, use i, next time use papiEvents[i].
548   // **CW** papi event names is a hack.
549   char eventName[PAPI_MAX_STR_LEN];
550   for (i=0;i<NUMPAPIEVENTS;i++) {
551     PAPI_event_code_to_name(papiEvents[i], eventName);
552     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
553   }
554 #endif
555   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
556   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
557     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
558   }     
559 }
560
561 void LogPool::writeSts(TraceProjections *traceProj){
562   writeSts();
563   if (traceProj != NULL) {
564     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
565     funcIter->seekStart();
566     int numFuncs = traceProj->getFuncNumber();
567     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
568     while(funcIter->hasNext()) {
569       StrKey *key;
570       int *obj = (int *)funcIter->next((void **)&key);
571       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
572     }
573   }
574   fprintf(stsfp, "END\n");
575   fclose(stsfp);
576 }
577
578 void LogPool::writeRC(void)
579 {
580     //CkPrintf("write RC is being executed\n");
581 #ifdef PROJ_ANALYSIS  
582     CkAssert(CkMyPe() == 0);
583     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
584           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
585     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
586           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
587     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
588     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
589       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
590     } else {
591       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
592     }
593     */
594 #endif //PROJ_ANALYSIS
595   fclose(rcfp);
596 }
597
598
599 #if CMK_BLUEGENE_CHARM
600 static void updateProjLog(void *data, double t, double recvT, void *ptr)
601 {
602   LogEntry *log = (LogEntry *)data;
603   FILE *fp = *(FILE **)ptr;
604   log->time = t;
605   log->recvTime = recvT<0.0?0:recvT;
606 //  log->write(fp);
607   toProjectionsFile p(fp);
608   log->pup(p);
609 }
610 #endif
611
612 // flush log entries to disk
613 void LogPool::flushLogBuffer()
614 {
615   if (numEntries) {
616     double writeTime = TraceTimer();
617     writeLog();
618     hasFlushed = true;
619     numEntries = 0;
620     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
621     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
622   }
623 }
624
625 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
626                   double time, int event, int pe, int ml, CmiObjId *id, 
627                   double recvT, double cpuT, int numPe)
628 {
629   new (&pool[numEntries++])
630     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
631   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
632     numPhases++;
633   }
634   if(poolSize==numEntries) {
635     flushLogBuffer();
636 #if CMK_BLUEGENE_CHARM
637     extern int correctTimeLog;
638     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
639 #endif
640   }
641 #if CMK_BLUEGENE_CHARM
642   switch (type) {
643     case BEGIN_PROCESSING:
644       pool[numEntries-1].recvTime = BgGetRecvTime();
645     case END_PROCESSING:
646     case BEGIN_COMPUTATION:
647     case END_COMPUTATION:
648     case CREATION:
649     case BEGIN_PACK:
650     case END_PACK:
651     case BEGIN_UNPACK:
652     case END_UNPACK:
653     case USER_EVENT_PAIR:
654       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
655   }
656 #endif
657 }
658
659 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
660 #ifndef CMK_BLUEGENE_CHARM
661   new (&pool[numEntries++])
662         LogEntry(time,type,funcID,lineNum,fileName);
663   if(poolSize == numEntries){
664     flushLogBuffer();
665   }
666 #endif  
667 }
668
669
670   
671 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
672 #ifndef CMK_BLUEGENE_CHARM
673   new (&pool[numEntries++])
674         LogEntry(type,time,memUsage);
675   if(poolSize == numEntries){
676     flushLogBuffer();
677   }
678 #endif  
679         
680 }  
681
682
683
684 void LogPool::addUserSupplied(int data){
685         // add an event
686         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
687
688         // set the user supplied value for the previously created event 
689         pool[numEntries-1].setUserSuppliedData(data);
690   }
691
692
693 void LogPool::addUserSuppliedNote(char *note){
694         // add an event
695         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
696
697         // set the user supplied note for the previously created event 
698         pool[numEntries-1].setUserSuppliedNote(note);
699   }
700
701 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
702   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
703 #ifndef CMK_BLUEGENE_CHARM
704 #if MPI_TRACE_MACHINE_HACK
705   //This part of code is used  to combine the contiguous
706   //MPI_Test and MPI_Iprobe events to reduce the number of
707   //entries
708 #define MPI_TEST_EVENT_ID 60
709 #define MPI_IPROBE_EVENT_ID 70 
710   int lastEvent = pool[numEntries-1].event;
711   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
712     //just replace the endtime of last event
713     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
714     pool[numEntries].endTime = et;
715   }else{
716     new (&pool[numEntries++])
717       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
718   }
719 #else
720   new (&pool[numEntries++])
721     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
722 #endif
723   if(poolSize == numEntries){
724     flushLogBuffer();
725   }
726 #endif  
727 }
728
729
730 /* **CW** Not sure if this is the right thing to do. Feels more like
731    a hack than a solution to Sameer's request to add the destination
732    processor information to multicasts and broadcasts.
733
734    In the unlikely event this method is used for Broadcasts as well,
735    pelist == NULL will be used to indicate a global broadcast with 
736    num PEs.
737 */
738 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
739                                    int event, int pe, int ml, CmiObjId *id,
740                                    double recvT, int numPe, int *pelist)
741 {
742   new (&pool[numEntries++])
743     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
744   if(poolSize==numEntries) {
745     flushLogBuffer();
746   }
747 }
748
749 void LogPool::postProcessLog()
750 {
751 #if CMK_BLUEGENE_CHARM
752   bgUpdateProj(1);   // event type
753 #endif
754 }
755
756 void LogPool::modLastEntryTimestamp(double ts)
757 {
758   pool[numEntries-1].time = ts;
759   //pool[numEntries-1].cputime = ts;
760 }
761
762 // /** Constructor for a multicast log entry */
763 // 
764 //  THIS WAS MOVED TO trace-projections.h with the other constructors
765 // 
766 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
767 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
768 // {
769 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
770 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
771 //     recvTime = rt; 
772 //     numpes = numPe;
773 //     userSuppliedNote = NULL;
774 //     if (pelist != NULL) {
775 //      pes = new int[numPe];
776 //      for (int i=0; i<numPe; i++) {
777 //        pes[i] = pelist[i];
778 //      }
779 //     } else {
780 //      pes= NULL;
781 //     }
782 // }
783
784 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
785 //{
786 //#if CMK_HAS_COUNTER_PAPI
787 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
788 //#endif
789 //}
790
791
792
793 void LogEntry::pup(PUP::er &p)
794 {
795   int i;
796   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
797   char ret = '\n';
798
799   p|type;
800   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
801   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
802
803   switch (type) {
804     case USER_EVENT:
805     case USER_EVENT_PAIR:
806       p|mIdx; p|itime; p|event; p|pe;
807       break;
808     case BEGIN_IDLE:
809     case END_IDLE:
810     case BEGIN_PACK:
811     case END_PACK:
812     case BEGIN_UNPACK:
813     case END_UNPACK:
814       p|itime; p|pe; 
815       break;
816     case BEGIN_PROCESSING:
817       if (p.isPacking()) {
818         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
819         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
820       }
821       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
822       p|msglen; p|irecvtime; 
823       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
824       p|icputime;
825 #if CMK_HAS_COUNTER_PAPI
826       //p|numPapiEvents;
827       for (i=0; i<NUMPAPIEVENTS; i++) {
828         // not yet!!!
829         //      p|papiIDs[i]; 
830         p|papiValues[i];
831         
832       }
833 #else
834       //p|numPapiEvents;     // non papi version has value 0
835 #endif
836       if (p.isUnpacking()) {
837         recvTime = irecvtime/1.0e6;
838         cputime = icputime/1.0e6;
839       }
840       break;
841     case END_PROCESSING:
842       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
843       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
844 #if CMK_HAS_COUNTER_PAPI
845       //p|numPapiEvents;
846       for (i=0; i<NUMPAPIEVENTS; i++) {
847         // not yet!!!
848         //      p|papiIDs[i];
849         p|papiValues[i];
850       }
851 #else
852       //p|numPapiEvents;  // non papi version has value 0
853 #endif
854       if (p.isUnpacking()) cputime = icputime/1.0e6;
855       break;
856     case USER_SUPPLIED:
857           p|userSuppliedData;
858           p|itime;
859         break;
860     case USER_SUPPLIED_NOTE:
861           p|itime;
862           int length;
863           if (p.isPacking()) length = strlen(userSuppliedNote);
864           p | length;
865           char space;
866           space = ' ';
867           p | space;
868           if (p.isUnpacking()) {
869             userSuppliedNote = new char[length+1];
870             userSuppliedNote[length] = '\0';
871           }
872           PUParray(p,userSuppliedNote, length);
873           break;
874     case USER_SUPPLIED_BRACKETED_NOTE:
875       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
876           p|itime;
877           p|iEndTime;
878           p|event;
879           int length2;
880           if (p.isPacking()) length2 = strlen(userSuppliedNote);
881           p | length2;
882           char space2;
883           space2 = ' ';
884           p | space2;
885           if (p.isUnpacking()) {
886             userSuppliedNote = new char[length+1];
887             userSuppliedNote[length] = '\0';
888           }
889           PUParray(p,userSuppliedNote, length2);
890           break;
891     case MEMORY_USAGE_CURRENT:
892       p | memUsage;
893       p | itime;
894         break;
895     case CREATION:
896       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
897       p|mIdx; p|eIdx; p|itime;
898       p|event; p|pe; p|msglen; p|irecvtime;
899       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
900       break;
901     case CREATION_BCAST:
902       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
903       p|mIdx; p|eIdx; p|itime;
904       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
905       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
906       break;
907     case CREATION_MULTICAST:
908       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
909       p|mIdx; p|eIdx; p|itime;
910       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
911       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
912       for (i=0; i<numpes; i++) p|pes[i];
913       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
914       break;
915     case MESSAGE_RECV:
916       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
917       break;
918
919     case ENQUEUE:
920     case DEQUEUE:
921       p|mIdx; p|itime; p|event; p|pe;
922       break;
923
924     case BEGIN_INTERRUPT:
925     case END_INTERRUPT:
926       p|itime; p|event; p|pe;
927       break;
928
929       // **CW** absolute timestamps are used here to support a quick
930       // way of determining the total time of a run in projections
931       // visualization.
932     case BEGIN_COMPUTATION:
933     case END_COMPUTATION:
934     case BEGIN_TRACE:
935     case END_TRACE:
936       p|itime;
937       break;
938     case BEGIN_FUNC:
939         p | itime;
940         p | mIdx;
941         p | event;
942         if(!p.isUnpacking()){
943                 p(fName,flen-1);
944         }
945         break;
946     case END_FUNC:
947         p | itime;
948         p | mIdx;
949         break;
950     case END_PHASE:
951       p|eIdx; // FIXME: actually the phase ID
952       p|itime;
953       break;
954     default:
955       CmiError("***Internal Error*** Wierd Event %d.\n", type);
956       break;
957   }
958   if (p.isUnpacking()) time = itime/1.0e6;
959   p|ret;
960 }
961
962 TraceProjections::TraceProjections(char **argv): 
963   curevent(0), inEntry(0), computationStarted(0), 
964         converseExit(0), endTime(0.0), traceNestedEvents(0),
965         currentPhaseID(0), lastPhaseEvent(NULL)
966 {
967   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
968
969   if (CkpvAccess(traceOnPe) == 0) return;
970
971   CtvInitialize(int,curThreadEvent);
972   CkpvInitialize(CmiInt8, CtrLogBufSize);
973   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
974   CtvAccess(curThreadEvent)=0;
975   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
976                        "Log entries to buffer per I/O")) {
977     if (CkMyPe() == 0) {
978       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
979     }
980   }
981   checknested = 
982     CmiGetArgFlagDesc(argv,"+checknested",
983                       "check projections nest begin end execute events");
984   traceNestedEvents = 
985     CmiGetArgFlagDesc(argv,"+tracenested",
986               "trace projections nest begin/end execute events");
987   int binary = 
988     CmiGetArgFlagDesc(argv,"+binary-trace",
989                       "Write log files in binary format");
990
991   CmiInt8 nSubdirs = 0;
992   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
993
994
995 #if CMK_PROJECTIONS_USE_ZLIB
996   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
997 #else
998   // consume the flag so there's no confusing
999   CmiGetArgFlagDesc(argv,"+gz-trace",
1000                     "Write log files pre-compressed with gzip");
1001   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1002 #endif
1003
1004   // **CW** default to non delta log encoding. The user may choose to do
1005   // create both logs (for debugging) or just the old log timestamping
1006   // (for compatibility).
1007   // Generating just the non delta log takes precedence over generating
1008   // both logs (if both arguments appear on the command line).
1009
1010   // switch to OLD log format until everything works // Gengbin
1011   nonDeltaLog = 1;
1012   deltaLog = 0;
1013   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1014                                   "Generate Delta encoded and simple timestamped log files");
1015
1016   _logPool = new LogPool(CkpvAccess(traceRoot));
1017   _logPool->setNumSubdirs(nSubdirs);
1018   _logPool->setBinary(binary);
1019 #if CMK_PROJECTIONS_USE_ZLIB
1020   _logPool->setCompressed(compressed);
1021 #endif
1022   if (CkMyPe() == 0) {
1023     _logPool->createSts();
1024     _logPool->createRC();
1025   }
1026   funcCount=1;
1027
1028 #if CMK_HAS_COUNTER_PAPI
1029   // We initialize and create the event sets for use with PAPI here.
1030   int papiRetValue;
1031   if(CkMyRank()==0){
1032     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1033     if (papiRetValue != PAPI_VER_CURRENT) {
1034       CmiAbort("PAPI Library initialization failure!\n");
1035     }
1036 #if CMK_SMP
1037     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1038       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1039     }
1040 #endif
1041   }
1042
1043 #if CMK_SMP
1044   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1045   CmiNodeAllBarrier();
1046 #endif
1047   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1048   papiEventSet = PAPI_NULL; 
1049   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1050     CmiAbort("PAPI failed to create event set!\n");
1051   }
1052   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1053   if (papiRetValue != PAPI_OK) {
1054     if (papiRetValue == PAPI_ECNFLCT) {
1055       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1056     } else {
1057       CmiAbort("PAPI failed to add designated events!\n");
1058     }
1059   }
1060   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1061 #endif
1062 }
1063
1064 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1065 {
1066   OPTIMIZED_VERSION
1067   CkAssert(e==-1 || e>=0);
1068   CkAssert(evt != NULL);
1069   int event;
1070   int biggest = -1;
1071   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1072     int cur = (*CkpvAccess(usrEvents))[i]->e;
1073     if (cur == e) {
1074       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1075       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1076         return e;
1077       else
1078         CmiAbort("UserEvent double registered!");
1079     }
1080     if (cur > biggest) biggest = cur;
1081   }
1082   // if no user events have so far been registered. biggest will be -1
1083   // and hence newly assigned event numbers will begin from 0.
1084   if (e==-1) event = biggest+1;  // automatically assign new event number
1085   else event = e;
1086   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1087   return event;
1088 }
1089
1090 void TraceProjections::traceClearEps(void)
1091 {
1092   // In trace-summary, this zeros out the EP bins, to eliminate noise
1093   // from startup.  Here, this isn't useful, since we can do that in
1094   // post-processing
1095 }
1096
1097 void TraceProjections::traceWriteSts(void)
1098 {
1099   if(CkMyPe()==0)
1100     _logPool->writeSts(this);
1101 }
1102
1103 /** 
1104  * **IMPT NOTES**:
1105  *
1106  * This is called when Converse closes during ConverseCommonExit().
1107  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1108  *
1109  * Some programs bypass CkExit() (like NAMD, which eventually calls
1110  * ConverseExit()), modules like traces will have to pretend to shutdown
1111  * as if CkExit() was called but at the same time avoid making
1112  * subsequent CkExit() calls (which is usually required for allowing
1113  * other modules to shutdown).
1114  *
1115  * Note that we can only get here if CkExit() was not called, since the
1116  * trace module will un-register itself from TraceArray if it did.
1117  *
1118  */
1119 void TraceProjections::traceClose(void)
1120 {
1121 #ifdef PROJ_ANALYSIS
1122   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1123
1124   // sets the flag that tells the code not to make the CkExit call later
1125   converseExit = 1;
1126   if (CkMyPe() == 0) {
1127     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1128     bocProxy.traceProjectionsParallelShutdown(-1);
1129   }
1130   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1131     CkpvAccess(_trace)->endComputation();
1132     delete _logPool;              // will write
1133     // remove myself from traceArray so that no tracing will be called.
1134     CkpvAccess(_traces)->removeTrace(this);
1135   }
1136 #else
1137   // we've already deleted the logpool, so multiple calls to traceClose
1138   // are tolerated.
1139   if (_logPool == NULL) {
1140     return;
1141   }
1142   if(CkMyPe()==0){
1143     _logPool->writeSts(this);
1144   }
1145   CkpvAccess(_trace)->endComputation();
1146   delete _logPool;              // will write
1147   // remove myself from traceArray so that no tracing will be called.
1148   CkpvAccess(_traces)->removeTrace(this);
1149 #endif
1150 }
1151
1152 /**
1153  *  **IMPT NOTES**:
1154  *
1155  *  This is meant to be called internally by the tracing framework.
1156  *
1157  */
1158 void TraceProjections::closeTrace() {
1159   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1160   if (CkMyPe() == 0) {
1161     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1162     _logPool->writeSts(this);
1163     _logPool->writeRC();
1164     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1165   }
1166   delete _logPool;       // will write logs to file
1167 }
1168
1169 #if CMK_SMP_TRACE_COMMTHREAD
1170 void TraceProjections::traceBeginOnCommThread()
1171 {
1172   if (!computationStarted) return;
1173   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1174 }
1175
1176 void TraceProjections::traceEndOnCommThread()
1177 {
1178   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1179 }
1180 #endif
1181
1182 void TraceProjections::traceBegin(void)
1183 {
1184   if (!computationStarted) return;
1185   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1186 }
1187
1188 void TraceProjections::traceEnd(void)
1189 {
1190   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1191 }
1192
1193 void TraceProjections::userEvent(int e)
1194 {
1195   if (!computationStarted) return;
1196   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1197 }
1198
1199 void TraceProjections::userBracketEvent(int e, double bt, double et)
1200 {
1201   if (!computationStarted) return;
1202   // two events record Begin/End of event e.
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1204   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1205 }
1206
1207 void TraceProjections::userSuppliedData(int d)
1208 {
1209   if (!computationStarted) return;
1210   _logPool->addUserSupplied(d);
1211 }
1212
1213 void TraceProjections::userSuppliedNote(char *note)
1214 {
1215   if (!computationStarted) return;
1216   _logPool->addUserSuppliedNote(note);
1217 }
1218
1219
1220 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1221 {
1222   if (!computationStarted) return;
1223   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1224 }
1225
1226 void TraceProjections::memoryUsage(double m)
1227 {
1228   if (!computationStarted) return;
1229   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1230   
1231 }
1232
1233
1234 void TraceProjections::creation(envelope *e, int ep, int num)
1235 {
1236   double curTime = TraceTimer();
1237   if (e == 0) {
1238     CtvAccess(curThreadEvent) = curevent;
1239     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1240                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1241   } else {
1242     int type=e->getMsgtype();
1243     e->setEvent(curevent);
1244     if (num > 1) {
1245       _logPool->add(CREATION_BCAST, type, ep, curTime,
1246                     curevent++, CkMyPe(), e->getTotalsize(), 
1247                     NULL, 0, 0.0, num);
1248     } else {
1249       _logPool->add(CREATION, type, ep, curTime,
1250                     curevent++, CkMyPe(), e->getTotalsize(), 
1251                     NULL, 0, 0.0);
1252     }
1253   }
1254 }
1255
1256 void TraceProjections::creation(char *msg)
1257 {
1258 #if CMK_SMP_TRACE_COMMTHREAD
1259         //This function is only called from a comm thread
1260         //in SMP mode. So, it is possible the msg is not
1261         //a charm msg that contains an envelope, ep idx.
1262         envelope *e = (envelope *)msg;
1263         int ep = e->getEpIdx();
1264         int num = _entryTable.size();
1265         if(ep<num && ep>=0 && _entryTable[ep]->traceEnabled)
1266                 creation(e, ep, 1);
1267 #endif
1268 }
1269
1270
1271 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1272    Communication Library-specific Multicasts via new event 
1273    CREATION_MULTICAST.
1274 */
1275
1276 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1277                                          int *pelist)
1278 {
1279   double curTime = TraceTimer();
1280   if (e==0) {
1281     CtvAccess(curThreadEvent)=curevent;
1282     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1283                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1284   } else {
1285     int type=e->getMsgtype();
1286     e->setEvent(curevent);
1287     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1288                                    e->getTotalsize(), 0, 0.0, num, pelist);
1289   }
1290 }
1291
1292 void TraceProjections::creationDone(int num)
1293 {
1294   // modified the creation done time of the last num log entries
1295   // FIXME: totally a hack
1296   double curTime = TraceTimer();
1297   int idx = _logPool->numEntries-1;
1298   while (idx >=0 && num >0 ) {
1299     LogEntry &log = _logPool->pool[idx];
1300     if ((log.type == CREATION) ||
1301         (log.type == CREATION_BCAST) ||
1302         (log.type == CREATION_MULTICAST)) {
1303       log.recvTime = curTime - log.time;
1304       num --;
1305     }
1306     idx--;
1307   }
1308 }
1309
1310 void TraceProjections::beginExecute(CmiObjId *tid)
1311 {
1312 #if CMK_HAS_COUNTER_PAPI
1313   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1314     CmiAbort("PAPI failed to read at begin execute!\n");
1315   }
1316 #endif
1317   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1318   execEvent = CtvAccess(curThreadEvent);
1319   execEp = (-1);
1320   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1321                 execEvent,CkMyPe(), 0, tid);
1322 #if CMK_HAS_COUNTER_PAPI
1323   _logPool->addPapi(papiValues);
1324 #endif
1325   inEntry = 1;
1326 }
1327
1328 void TraceProjections::beginExecute(envelope *e)
1329 {
1330   if(e==0) {
1331 #if CMK_HAS_COUNTER_PAPI
1332     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1333       CmiAbort("PAPI failed to read at begin execute!\n");
1334     }
1335 #endif
1336     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1337     execEvent = CtvAccess(curThreadEvent);
1338     execEp = (-1);
1339     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1340                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1341 #if CMK_HAS_COUNTER_PAPI
1342     _logPool->addPapi(papiValues);
1343 #endif
1344     inEntry = 1;
1345   } else {
1346     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1347                  e->getSrcPe(),e->getTotalsize());
1348   }
1349 }
1350
1351 void TraceProjections::beginExecute(char *msg){
1352 #if CMK_SMP_TRACE_COMMTHREAD
1353         //This function is called from comm thread in SMP mode
1354     envelope *e = (envelope *)msg;
1355     int num = _entryTable.size();
1356     int ep = e->getEpIdx();
1357     if(ep<0 || ep>=num) return;
1358     if(_entryTable[ep]->traceEnabled)
1359                 beginExecute(e);
1360 #endif
1361 }
1362
1363 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1364                                     int mlen, CmiObjId *idx)
1365 {
1366   if (traceNestedEvents) {
1367     if (! nestedEvents.isEmpty()) {
1368       endExecuteLocal();
1369     }
1370     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1371   }
1372   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1373 }
1374
1375 void TraceProjections::changeLastEntryTimestamp(double ts)
1376 {
1377   _logPool->modLastEntryTimestamp(ts);
1378 }
1379
1380 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1381                                     int mlen, CmiObjId *idx)
1382 {
1383 #if CMK_HAS_COUNTER_PAPI
1384   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1385     CmiAbort("PAPI failed to read at begin execute!\n");
1386   }
1387 #endif
1388   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1389   execEvent=event;
1390   execEp=ep;
1391   execPe=srcPe;
1392   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1393                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1394 #if CMK_HAS_COUNTER_PAPI
1395   _logPool->addPapi(papiValues);
1396 #endif
1397   inEntry = 1;
1398 }
1399
1400 void TraceProjections::endExecute(void)
1401 {
1402   if (traceNestedEvents) nestedEvents.deq();
1403   endExecuteLocal();
1404   if (traceNestedEvents) {
1405     if (! nestedEvents.isEmpty()) {
1406       NestedEvent &ne = nestedEvents.peek();
1407       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1408     }
1409   }
1410 }
1411
1412 void TraceProjections::endExecute(char *msg)
1413 {
1414 #if CMK_SMP_TRACE_COMMTHREAD
1415         //This function is called from comm thread in SMP mode
1416     envelope *e = (envelope *)msg;
1417     int num = _entryTable.size();
1418     int ep = e->getEpIdx();
1419     if(ep<0 || ep>=num) return;
1420     if(_entryTable[ep]->traceEnabled)
1421                 endExecute();
1422 #endif  
1423 }
1424
1425 void TraceProjections::endExecuteLocal(void)
1426 {
1427 #if CMK_HAS_COUNTER_PAPI
1428   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1429     CmiAbort("PAPI failed to read at end execute!\n");
1430   }
1431 #endif
1432   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1433   double cputime = TraceCpuTimer();
1434   if(execEp == (-1)) {
1435     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1436                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1437   } else {
1438     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1439                   execEvent, execPe, 0, NULL, 0.0, cputime);
1440   }
1441 #if CMK_HAS_COUNTER_PAPI
1442   _logPool->addPapi(papiValues);
1443 #endif
1444   inEntry = 0;
1445 }
1446
1447 void TraceProjections::messageRecv(char *env, int pe)
1448 {
1449 #if 0
1450   envelope *e = (envelope *)env;
1451   int msgType = e->getMsgtype();
1452   int ep = e->getEpIdx();
1453 #if 0
1454   if (msgType==NewChareMsg || msgType==NewVChareMsg
1455           || msgType==ForChareMsg || msgType==ForVidMsg
1456           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1457           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1458     ep = e->getEpIdx();
1459   else
1460     ep = _threadEP;
1461 #endif
1462   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1463                 curevent++, e->getSrcPe(), e->getTotalsize());
1464 #endif
1465 }
1466
1467 void TraceProjections::beginIdle(double curWallTime)
1468 {
1469   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1470 }
1471
1472 void TraceProjections::endIdle(double curWallTime)
1473 {
1474   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1475 }
1476
1477 void TraceProjections::beginPack(void)
1478 {
1479   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1480 }
1481
1482 void TraceProjections::endPack(void)
1483 {
1484   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1485 }
1486
1487 void TraceProjections::beginUnpack(void)
1488 {
1489   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1490 }
1491
1492 void TraceProjections::endUnpack(void)
1493 {
1494   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1495 }
1496
1497 void TraceProjections::enqueue(envelope *) {}
1498
1499 void TraceProjections::dequeue(envelope *) {}
1500
1501 void TraceProjections::beginComputation(void)
1502 {
1503   computationStarted = 1;
1504
1505   // Executes the callback function provided by the machine
1506   // layer. This is the proper method to register user events in a
1507   // machine layer because projections is a charm++ module.
1508   if (CkpvAccess(traceOnPe) != 0) {
1509     void (*ptr)() = registerMachineUserEvents();
1510     if (ptr != NULL) {
1511       ptr();
1512     }
1513   }
1514 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1515 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1516   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1517 #if CMK_HAS_COUNTER_PAPI
1518   // we start the counters here
1519   if (PAPI_start(papiEventSet) != PAPI_OK) {
1520     CmiAbort("PAPI failed to start designated counters!\n");
1521   }
1522 #endif
1523 }
1524
1525 void TraceProjections::endComputation(void)
1526 {
1527 #if CMK_HAS_COUNTER_PAPI
1528   // we stop the counters here. A silent failure is alright since we
1529   // are already at the end of the program.
1530   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1531     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1532   }
1533   // NOTE: We should not do a complete close of PAPI until after the
1534   // sts writer is done.
1535 #endif
1536   endTime = TraceTimer();
1537   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1538   /*
1539   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1540            endTime*1e06);
1541   */
1542 }
1543
1544 int TraceProjections::idxRegistered(int idx)
1545 {
1546     int idxVecLen = idxVec.size();
1547     for(int i=0; i<idxVecLen; i++)
1548     {
1549         if(idx == idxVec[i])
1550             return 1;
1551     }
1552     return 0;
1553 }
1554
1555 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1556     StrKey k((char*)name,strlen(name));
1557     int num = funcHashtable.get(k);
1558     
1559     if(num!=0) {
1560         return;
1561         //as for mpi programs, the same function may be registered for several times
1562         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1563     }
1564     
1565     int isIdxExisting=0;
1566     if(idxSpecifiedByUser)
1567         isIdxExisting=idxRegistered(idx);
1568     if(isIdxExisting){
1569         return;
1570         //same reason with num!=0
1571         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1572     }
1573
1574     if(idxSpecifiedByUser) {
1575         char *st = new char[strlen(name)+1];
1576         memcpy(st,name,strlen(name)+1);
1577         StrKey *newKey = new StrKey(st,strlen(st));
1578         int &ref = funcHashtable.put(*newKey);
1579         ref=idx;
1580         funcCount++;
1581         idxVec.push_back(idx);  
1582     } else {
1583         char *st = new char[strlen(name)+1];
1584         memcpy(st,name,strlen(name)+1);
1585         StrKey *newKey = new StrKey(st,strlen(st));
1586         int &ref = funcHashtable.put(*newKey);
1587         ref=funcCount;
1588         num = funcCount;
1589         funcCount++;
1590         idx = num;
1591         idxVec.push_back(idx);
1592     }
1593 }
1594
1595 void TraceProjections::beginFunc(char *name,char *file,int line){
1596         StrKey k(name,strlen(name));    
1597         unsigned short  num = (unsigned short)funcHashtable.get(k);
1598         beginFunc(num,file,line);
1599 }
1600
1601 void TraceProjections::beginFunc(int idx,char *file,int line){
1602         if(idx <= 0){
1603                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1604         }       
1605         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1606 }
1607
1608 void TraceProjections::endFunc(char *name){
1609         StrKey k(name,strlen(name));    
1610         int num = funcHashtable.get(k);
1611         endFunc(num);   
1612 }
1613
1614 void TraceProjections::endFunc(int num){
1615         if(num <= 0){
1616                 printf("endFunc without start :O\n");
1617         }
1618         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1619 }
1620
1621 // specialized PUP:ers for handling trace projections logs
1622 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1623 {
1624   for (int i=0;i<n;i++) 
1625     switch(t) {
1626     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1627     case Tuchar:
1628     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1629     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1630     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1631     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1632     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1633     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1634     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1635     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1636     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1637 #ifdef CMK_PUP_LONG_LONG
1638     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1639     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1640 #endif
1641     default: CmiAbort("Unrecognized pup type code!");
1642     };
1643 }
1644
1645 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1646 {
1647   for (int i=0;i<n;i++) 
1648     switch(t) {
1649     case Tchar: { 
1650       char c = fgetc(f);
1651       if (c==EOF)
1652         parseError("Could not match character");
1653       else
1654         ((char *)p)[i] = c;
1655       break;
1656     }
1657     case Tuchar:
1658     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1659     case Tshort:((short *)p)[i]=(short)readInt(); break;
1660     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1661     case Tint:  ((int *)p)[i]=readInt(); break;
1662     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1663     case Tlong: ((long *)p)[i]=readInt(); break;
1664     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1665     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1666     case Tdouble:((double *)p)[i]=readDouble(); break;
1667 #ifdef CMK_PUP_LONG_LONG
1668     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1669     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1670 #endif
1671     default: CmiAbort("Unrecognized pup type code!");
1672     };
1673 }
1674
1675 #if CMK_PROJECTIONS_USE_ZLIB
1676 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1677 {
1678   for (int i=0;i<n;i++) 
1679     switch(t) {
1680     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1681     case Tuchar:
1682     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1683     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1684     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1685     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1686     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1687     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1688     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1689     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1690     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1691 #ifdef CMK_PUP_LONG_LONG
1692     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1693     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1694 #endif
1695     default: CmiAbort("Unrecognized pup type code!");
1696     };
1697 }
1698 #endif
1699
1700 void TraceProjections::endPhase() {
1701   double currentPhaseTime = TraceTimer();
1702   double lastPhaseTime = 0.0;
1703
1704   if (lastPhaseEvent != NULL) {
1705     lastPhaseTime = lastPhaseEvent->time;
1706   } else {
1707     if (_logPool->pool != NULL) {
1708       // assumed to be BEGIN_COMPUTATION
1709       lastPhaseTime = _logPool->pool[0].time;
1710     } else {
1711       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1712       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1713       lastPhaseTime = currentPhaseTime;
1714     }
1715   }
1716
1717   /* Insert endPhase event here. */
1718   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1719   /*        We currently "borrow" from the standard add() method. */
1720   /*        It should really be its own add() method.             */
1721   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1722   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1723   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1724   currentPhaseID++;
1725 }
1726
1727 #ifdef PROJ_ANALYSIS
1728 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1729
1730
1731 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1732
1733 void registerOutlierReduction() {
1734   outlierReductionType =
1735     CkReduction::addReducer(outlierReduction);
1736   minMaxReductionType =
1737     CkReduction::addReducer(minMaxReduction);
1738 }
1739
1740 /**
1741  * **IMPT NOTES**:
1742  *
1743  * This is the C++ code that is registered to be activated at module
1744  * shutdown. This is called exactly once on processor 0. Module shutdown
1745  * is initiated as a result of a CkExit() call by the application code
1746  * 
1747  * The exit function must ultimately call CkExit() again to
1748  * so that other module exit functions may proceed after this module is
1749  * done.
1750  *
1751  */
1752 // FIXME: WHY extern "C"???
1753 extern "C" void TraceProjectionsExitHandler()
1754 {
1755 #if CMK_TRACE_ENABLED
1756   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1757   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1758   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1759 #else
1760   CkExit();
1761 #endif
1762 }
1763
1764 // This is called once on each processor but the idiom of use appears
1765 // to be to only have processor 0 register the function.
1766 //
1767 // See initnode in trace-projections.ci
1768 void initTraceProjectionsBOC()
1769 {
1770   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1771 #ifdef __BLUEGENE__
1772   if (BgNodeRank() == 0) {
1773 #else
1774     if (CkMyRank() == 0) {
1775 #endif
1776       registerExitFn(TraceProjectionsExitHandler);
1777     }
1778 #if 0
1779   } // this is so indentation does not get messed up
1780 #endif
1781 }
1782
1783 // mainchare for trace-projections BOC-operations. 
1784 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1785 // rest of its life.
1786 //
1787 // Responsible for:
1788 //   1. Handling commandline arguments
1789 //   2. Creating any objects required for proper BOC operations.
1790 //
1791 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1792   /** Options for Outlier Analysis */
1793   // defaults. Things will change with support for interactive analysis.
1794   bool findOutliers = false;
1795   bool outlierAutomatic = true;
1796   int numKSeeds = 10; 
1797
1798   int peNumKeep = CkNumPes();  // used as a default
1799   double entryThreshold = 0.0;
1800   bool outlierUsePhases = false;
1801   if (outlierAutomatic) {
1802     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1803                      "Number of cluster seeds to apply at outlier analysis.");
1804     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1805                      &peNumKeep, "Number of Processors to retain data");
1806     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1807                         "Minimum significance of entry points to be considered for clustering (%).");
1808     findOutliers =
1809       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1810     outlierUsePhases = 
1811       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1812                         "Apply automatic outlier analysis to any available phases.");
1813     if (outlierUsePhases) {
1814       // if the user wants to use an outlier feature, it is assumed outlier
1815       //    analysis is desired.
1816       findOutliers = true;
1817     }
1818   }
1819   bool findStartTime = (CmiTimerAbsolute()==1);
1820   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1821   if (findOutliers) {
1822     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1823                                         numKSeeds,
1824                                         peNumKeep,
1825                                         entryThreshold,
1826                                         outlierUsePhases);
1827   }
1828 }
1829
1830 // Called on every processor.
1831 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1832   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1833   endPe = pe;                // the pe that starts CkExit()
1834   if (CkMyPe() == 0) {
1835     analysisStartTime = CmiWallTimer();
1836   }
1837   CkpvAccess(_trace)->endComputation();
1838   // no more tracing for projections on this processor after this point. 
1839   // Note that clear must be called after remove, or bad things will happen.
1840   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1841   CkpvAccess(_traces)->clearTrace();
1842
1843   // From this point, we start multiple chains of reductions and broadcasts to
1844   // perform final online analysis activities.
1845
1846   // Start all parallel operations at once. 
1847   //   These MUST NOT modify base performance data in LogPool. If they must,
1848   //   then the parallel operations must be phased (and this code to be
1849   //   restructured as necessary)
1850   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1851   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1852   if (findOutliers) {
1853     parModulesRemaining++;
1854     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1855   }
1856   parModulesRemaining++;
1857   if (findStartTime) 
1858   bocProxy[CkMyPe()].startTimeAnalysis();
1859   else
1860   bocProxy[CkMyPe()].startEndTimeAnalysis();
1861 }
1862
1863 // Called on each processor
1864 void KMeansBOC::startKMeansAnalysis() {
1865   // Initialize all necessary structures
1866   LogPool *pool = CkpvAccess(_trace)->_logPool;
1867
1868  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1869   int flushInt = 0;
1870   if (pool->hasFlushed) {
1871     flushInt = 1;
1872   }
1873   
1874   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1875                 0, thisProxy);
1876   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1877 }
1878
1879 // Called on processor 0
1880 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1881   int someFlush = *((int *)msg->getData());
1882
1883   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1884   
1885   if (someFlush == 0) {
1886     // Data intact proceed with KMeans analysis
1887     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1888     kMeansProxy.flushCheckDone();
1889   } else {
1890     // Some processor had flushed it data at some point, abandon KMeans
1891     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1892     // terminate KMeans
1893     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1894     bocProxy[0].kMeansDone();
1895   }
1896 }
1897
1898 // Called on each processor
1899 void KMeansBOC::flushCheckDone() {
1900   // **FIXME** - more flexible metric collection scheme may be necessary
1901   //   in the future for production use.
1902   LogPool *pool = CkpvAccess(_trace)->_logPool;
1903
1904   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1905
1906   numEntryMethods = _entryTable.size();
1907   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1908
1909   // maintained across phases
1910   markedBegin = false;
1911   markedIdle = false;
1912   beginBlockTime = 0.0;
1913   beginIdleBlockTime = 0.0;
1914   lastBeginEPIdx = -1; // none
1915
1916   lastPhaseIdx = 0;
1917   currentExecTimes = NULL;
1918   currentPhase = 0;
1919   selected = false;
1920
1921   pool->initializePhases();
1922
1923   // incoming K Seeds and the per-phase filter
1924   incKSeeds = new double[numK*numMetrics];
1925   keepMetric = new bool[numMetrics];
1926
1927   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1928   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1929   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1930   thisProxy[CkMyPe()].getNextPhaseMetrics();
1931 }
1932
1933 // Called on each processor.
1934 void KMeansBOC::getNextPhaseMetrics() {
1935   // Assumes the presence of the complete logs on this processor.
1936   // Assumes first event is always BEGIN_COMPUTATION
1937   // Assumes each processor sees the same number of phases.
1938   //
1939   // In this code, we collect performance data for this processor.
1940   // All times are in seconds.
1941
1942   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1943
1944   if (usePhases) {
1945     DEBUGF("[%d] Using Phases\n", CkMyPe());
1946   } else {
1947     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1948   }
1949   
1950   if (currentExecTimes != NULL) {
1951     delete [] currentExecTimes;
1952   }
1953   currentExecTimes = new double[numMetrics];
1954   for (int i=0; i<numMetrics; i++) {
1955     currentExecTimes[i] = 0.0;
1956   }
1957
1958   int numEventMethods = _entryTable.size();
1959   LogPool *pool = CkpvAccess(_trace)->_logPool;
1960   
1961   CkAssert(pool->numEntries > lastPhaseIdx);
1962   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1963   double totalPhaseTime = 0.0;
1964   double totalActiveTime = 0.0; // entry method + idle
1965
1966   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1967     if (pool->pool[i].type == BEGIN_PROCESSING) {
1968       // check pairing
1969       if (!markedBegin) {
1970         markedBegin = true;
1971       }
1972       beginBlockTime = pool->pool[i].time;
1973       lastBeginEPIdx = pool->pool[i].eIdx;
1974     } else if (pool->pool[i].type == END_PROCESSING) {
1975       // check pairing
1976       // if End without a begin, just ignore
1977       //   this event. If a phase-boundary is crossed, the Begin
1978       //   event would be maintained in beginBlockTime, so it is 
1979       //   not a problem.
1980       if (markedBegin) {
1981         markedBegin = false;
1982         if (pool->pool[i].event < 0)
1983         {
1984           // ignore dummy events. **FIXME** as they have no eIdx?
1985           continue;
1986         }
1987         currentExecTimes[pool->pool[i].eIdx] += 
1988           pool->pool[i].time - beginBlockTime;
1989         totalActiveTime += pool->pool[i].time - beginBlockTime;
1990         lastBeginEPIdx = -1;
1991       }
1992     } else if (pool->pool[i].type == BEGIN_IDLE) {
1993       // check pairing
1994       if (!markedIdle) {
1995         markedIdle = true;
1996       }
1997       beginIdleBlockTime = pool->pool[i].time;
1998     } else if (pool->pool[i].type == END_IDLE) {
1999       // check pairing
2000       if (markedIdle) {
2001         markedIdle = false;
2002         currentExecTimes[numEventMethods] += 
2003           pool->pool[i].time - beginIdleBlockTime;
2004         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2005       }
2006     } else if (pool->pool[i].type == END_PHASE) {
2007       // ignored when not using phases
2008       if (usePhases) {
2009         // when we've not visited this node before
2010         if (i != lastPhaseIdx) { 
2011           totalPhaseTime = 
2012             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2013           // it is important that proper accounting of time take place here.
2014           // Note that END_PHASE events inevitably occur in the context of
2015           //   some entry method by the way the tracing API is designed.
2016           if (markedBegin) {
2017             CkAssert(lastBeginEPIdx >= 0);
2018             currentExecTimes[lastBeginEPIdx] += 
2019               pool->pool[i].time - beginBlockTime;
2020             totalActiveTime += pool->pool[i].time - beginBlockTime;
2021             // this is so the remainder contributes to the next phase
2022             beginBlockTime = pool->pool[i].time;
2023           }
2024           // The following is unlikely, but stranger things have happened.
2025           if (markedIdle) {
2026             currentExecTimes[numEventMethods] +=
2027               pool->pool[i].time - beginIdleBlockTime;
2028             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2029             // this is so the remainder contributes to the next phase
2030             beginIdleBlockTime = pool->pool[i].time;
2031           }
2032           if (totalActiveTime <= totalPhaseTime) {
2033             currentExecTimes[numEventMethods+1] = 
2034               totalPhaseTime - totalActiveTime;
2035           } else {
2036             currentExecTimes[numEventMethods+1] = 0.0;
2037             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2038                      CkMyPe(), currentPhase);
2039           }
2040           collectKMeansData();
2041           // end the loop (and method) and defer the work till the next call
2042           lastPhaseIdx = i;
2043           break; 
2044         }
2045       }
2046     } else if (pool->pool[i].type == END_COMPUTATION) {
2047       if (markedBegin) {
2048         CkAssert(lastBeginEPIdx >= 0);
2049         currentExecTimes[lastBeginEPIdx] += 
2050           pool->pool[i].time - beginBlockTime;
2051         totalActiveTime += pool->pool[i].time - beginBlockTime;
2052       }
2053       if (markedIdle) {
2054         currentExecTimes[numEventMethods] +=
2055           pool->pool[i].time - beginIdleBlockTime;
2056         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2057       }
2058       totalPhaseTime = 
2059         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2060       if (totalActiveTime <= totalPhaseTime) {
2061         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2062       } else {
2063         currentExecTimes[numEventMethods+1] = 0.0;
2064         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2065                  CkMyPe());
2066       }
2067       collectKMeansData();
2068     }
2069   }
2070 }
2071
2072 /**
2073  *  Through a reduction, collectKMeansData aggregates each processors' data
2074  *  in order for global properties to be determined:
2075  *  
2076  *  1. min & max to determine normalization factors.
2077  *  2. sum to determine global EP averages for possible metric reduction
2078  *       through thresholding.
2079  *  3. sum of squares to compute stddev which may be useful in the future.
2080  *
2081  *  collectKMeansData will also keep the processor's data for the current
2082  *    phase so that it may be normalized and worked on subsequently.
2083  *
2084  **/
2085 void KMeansBOC::collectKMeansData() {
2086   int minOffset = numMetrics;
2087   int maxOffset = 2*numMetrics;
2088   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2089
2090   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2091
2092   double *reductionMsg = new double[numMetrics*4];
2093
2094   for (int i=0; i<numMetrics; i++) {
2095     reductionMsg[i] = currentExecTimes[i];
2096     // duplicate the event times for max and min sections of the reduction
2097     reductionMsg[minOffset + i] = currentExecTimes[i];
2098     reductionMsg[maxOffset + i] = currentExecTimes[i];
2099     // compute squares
2100     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2101   }
2102
2103   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2104                 0, thisProxy);
2105   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2106              outlierReductionType, cb);  
2107 }
2108
2109 // The purpose is mainly to initialize the k seeds and generate
2110 //   normalization parameters for each of the metrics. The k seeds
2111 //   and normalization parameters are broadcast to all processors.
2112 //
2113 // Called on processor 0
2114 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2115   CkAssert(CkMyPe() == 0);
2116   
2117   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2118
2119   int sumOffset = 0;
2120   int minOffset = numMetrics;
2121   int maxOffset = 2*numMetrics;
2122   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2123
2124   // calculate statistics & boundaries for the k seeds for clustering
2125   KMeansStatsMessage *outmsg = 
2126     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2127   outmsg->numMetrics = numMetrics;
2128   outmsg->numKPos = numK*numMetrics;
2129   outmsg->numStats = numMetrics*4;
2130
2131   // Sum | Min | Max | Sum of Squares
2132   double *totalExecTimes = (double *)msg->getData();
2133   double totalTime = 0.0;
2134
2135   for (int i=0; i<numMetrics; i++) {
2136     DEBUGN("%lf\n", totalExecTimes[i]);
2137     totalTime += totalExecTimes[i];
2138
2139     // calculate event mean over all processors
2140     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2141
2142     // get the ranges and offsets of each metric. With this, we get
2143     //   normalization factors that can be sent back to each processor to
2144     //   be used as necessary. We reuse max for range. Min remains the offset.
2145     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2146     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2147       totalExecTimes[minOffset + i];
2148     
2149     // calculate stddev (using biased variance)
2150     outmsg->stats[sosOffset + i] = 
2151       sqrt((totalExecTimes[sosOffset + i] - 
2152             2*(outmsg->stats[i])*totalExecTimes[i] +
2153             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2154            CkNumPes());
2155   }
2156
2157   for (int i=0; i<numMetrics; i++) {
2158     // 1) if the proportion of the max value of the entry method relative to
2159     //   the average time taken over all entry methods across all processors
2160     //   is greater than the stipulated percentage threshold ...; AND
2161     // 2) if the range of values are non-zero.
2162     //
2163     // The current assumption is totalTime > 0 (what program has zero total
2164     //   time from all work?)
2165     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2166                      entryThreshold) &&
2167       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2168     if (keepMetric[i]) {
2169       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2170              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2171     } else {
2172       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2173     }
2174     outmsg->filter[i] = keepMetric[i];
2175   }
2176
2177   delete msg;
2178   
2179   // initialize k seeds for this phase
2180   kSeeds = new double[numK*numMetrics];
2181
2182   numKReported = 0;
2183   kNumMembers = new int[numK];
2184
2185   // Randomly select k processors' metric vectors for the k seeds
2186   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2187   srand(11337); // for debugging purposes
2188   for (int k=0; k<numK; k++) {
2189     DEBUGF("Seed %d | ", k);
2190     for (int m=0; m<numMetrics; m++) {
2191       double factor = totalExecTimes[maxOffset + m] - 
2192         totalExecTimes[minOffset + m];
2193       // "uniform" distribution, scaled according to the normalization
2194       //   factors
2195       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2196       // Random distribution.
2197       kSeeds[numMetrics*k + m] =
2198         ((rand()*1.0)/RAND_MAX)*factor;
2199       if (keepMetric[m]) {
2200         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2201       }
2202       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2203     }
2204     DEBUGF("\n");
2205     kNumMembers[k] = 0;
2206   }
2207
2208   // broadcast statistical values to all processors for cluster discovery
2209   thisProxy.findInitialClusters(outmsg);
2210 }
2211
2212
2213
2214 // Called on each processor.
2215 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2216
2217  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2218
2219   phaseIter = 0;
2220
2221   // Get info from stats message
2222   CkAssert(numMetrics == msg->numMetrics);
2223   for (int i=0; i<numMetrics; i++) {
2224     keepMetric[i] = msg->filter[i];
2225   }
2226
2227   // Normalize data on local processor.
2228   // **CWL** See my thesis for detailed discussion of normalization of
2229   //    performance data.
2230   // **NOTE** This might change if we want to send data based on the filter
2231   //   instead of all the data.
2232   CkAssert(numMetrics*4 == msg->numStats);
2233   for (int i=0; i<numMetrics; i++) {
2234     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2235     // **CWL** We do not normalize the range. Entry methods that exhibit
2236     //   large absolute timing variations should be allowed to contribute
2237     //   more to the Euclidean distance measure!
2238     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2239   }
2240
2241   // **NOTE** This might change if we want to send data based on the filter
2242   //   instead of all the data.
2243   CkAssert(numK*numMetrics == msg->numKPos);
2244   for (int i=0; i<msg->numKPos; i++) {
2245     incKSeeds[i] = msg->kSeedsPos[i];
2246   }
2247
2248   // Decide which KSeed this processor belongs to.
2249   minDistance = calculateDistance(0);
2250   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2251            currentPhase, phaseIter, minDistance);
2252   minK = 0;
2253   for (int i=1; i<numK; i++) {
2254     double distance = calculateDistance(i);
2255     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2256              currentPhase, phaseIter, i, distance);
2257     if (distance < minDistance) {
2258       minDistance = distance;
2259       minK = i;
2260     }
2261   }
2262
2263   // Set up a reduction with the modification vector to the root (0).
2264   //
2265   // The modification vector sends a negative value for each metric
2266   //   for the K this processor no longer belongs to and a positive
2267   //   value to the K the processor now belongs. In addition, a -1.0
2268   //   is sent to the K it is leaving and a +1.0 to the K it is 
2269   //   joining.
2270   //
2271   // The processor must still contribute a "zero returns" even if
2272   //   nothing changes. This will be the basis for determine
2273   //   convergence at the root.
2274   //
2275   // The addtional +1 is meant for the count-change that must be
2276   //   maintained for the special cases at the root when some K
2277   //   may be deprived of all processor points or go from 0 to a
2278   //   positive number of processors (see later comments).
2279   double *modVector = new double[numK*(numMetrics+1)];
2280   for (int i=0; i<numK; i++) {
2281     for (int j=0; j<numMetrics+1; j++) {
2282       modVector[i*(numMetrics+1) + j] = 0.0;
2283     }
2284   }
2285   for (int i=0; i<numMetrics; i++) {
2286     // for this initialization, only positive values need be sent.
2287     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2288   }
2289   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2290
2291   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2292                 0, thisProxy);
2293   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2294              CkReduction::sum_double, cb);  
2295 }
2296
2297 double KMeansBOC::calculateDistance(int k) {
2298   double ret = 0.0;
2299   for (int i=0; i<numMetrics; i++) {
2300     if (keepMetric[i]) {
2301       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2302              CkMyPe(), currentPhase, phaseIter, i,
2303                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2304       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2305     }
2306   }
2307   return sqrt(ret);
2308 }
2309
2310 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2311   CkAssert(CkMyPe() == 0);
2312
2313   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2314
2315   double *modVector = (double *)msg->getData();
2316   // sanity check
2317   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2318
2319   // A quick convergence test.
2320   bool hasChanges = false;
2321   for (int i=0; i<numK; i++) {
2322     hasChanges = hasChanges || 
2323       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2324   }
2325   if (!hasChanges) {
2326     delete msg;
2327     findRepresentatives();
2328   } else {
2329     int overallChange = 0;
2330     for (int i=0; i<numK; i++) {
2331       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2332       if (change != 0) {
2333         overallChange += change;
2334         // modify the k seeds based on the modification vectors coming in
2335         //
2336         // If a seed initially has no members, its contents do not matter and
2337         //   is simply set to the average of the incoming vector.
2338         // If the change causes a seed to lose all its members, do nothing.
2339         //   Its last-known location is kept to allow it to re-capture
2340         //   membership at the next iteration rather than apply the last
2341         //   changes (which snaps the point unnaturally to 0,0).
2342         // Otherwise, apply the appropriate vector changes.
2343         CkAssert((kNumMembers[i] + change >= 0) &&
2344                  (kNumMembers[i] + change <= CkNumPes()));
2345         if (kNumMembers[i] == 0) {
2346           CkAssert(change > 0);
2347           for (int j=0; j<numMetrics; j++) {
2348             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2349           }
2350         } else if (kNumMembers[i] + change == 0) {
2351           // do nothing.
2352         } else {
2353           for (int j=0; j<numMetrics; j++) {
2354             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2355             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2356             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2357           }
2358         }
2359         kNumMembers[i] += change;
2360       }
2361       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2362              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2363     }
2364     delete msg;
2365
2366     // broadcast the new seed locations.
2367     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2368     outmsg->numKPos = numK*numMetrics;
2369     for (int i=0; i<numK*numMetrics; i++) {
2370       outmsg->kSeedsPos[i] = kSeeds[i];
2371     }
2372
2373     thisProxy.updateSeedMembership(outmsg);
2374   }
2375 }
2376
2377 // Called on all processors
2378 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2379
2380   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2381
2382   phaseIter++;
2383
2384   // **NOTE** This might change if we want to send data based on the filter
2385   //   instead of all the data.
2386   CkAssert(numK*numMetrics == msg->numKPos);
2387   for (int i=0; i<msg->numKPos; i++) {
2388     incKSeeds[i] = msg->kSeedsPos[i];
2389   }
2390
2391   // Decide which KSeed this processor belongs to.
2392   lastMinK = minK;
2393   minDistance = calculateDistance(0);
2394   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2395          currentPhase, phaseIter, minDistance);
2396
2397   minK = 0;
2398   for (int i=1; i<numK; i++) {
2399     double distance = calculateDistance(i);
2400     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2401            currentPhase, phaseIter, i, distance);
2402     if (distance < minDistance) {
2403       minDistance = distance;
2404       minK = i;
2405     }
2406   }
2407
2408   double *modVector = new double[numK*(numMetrics+1)];
2409   for (int i=0; i<numK; i++) {
2410     for (int j=0; j<numMetrics+1; j++) {
2411       modVector[i*(numMetrics+1) + j] = 0.0;
2412     }
2413   }
2414
2415   if (minK != lastMinK) {
2416     for (int i=0; i<numMetrics; i++) {
2417       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2418       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2419     }
2420     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2421     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2422   }
2423
2424   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2425                 0, thisProxy);
2426   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2427              CkReduction::sum_double, cb);  
2428 }
2429
2430 void KMeansBOC::findRepresentatives() {
2431
2432   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2433
2434   int numNonEmptyClusters = 0;
2435   for (int i=0; i<numK; i++) {
2436     if (kNumMembers[i] > 0) {
2437       numNonEmptyClusters++;
2438     }
2439   }
2440
2441   int numRepresentatives = peNumKeep;
2442   // **FIXME**
2443   // This is fairly arbitrary. Next time, choose the centers of the top
2444   //   largest clusters.
2445   if (numRepresentatives < numNonEmptyClusters) {
2446     numRepresentatives = numNonEmptyClusters;
2447   }
2448
2449   int slotsRemaining = numRepresentatives;
2450
2451   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2452          numNonEmptyClusters);
2453
2454   // determine how many exemplars to select per cluster. Currently
2455   //   hardcoded to 1. Future challenge is to decide on other numbers
2456   //   or proportionality.
2457   //
2458   int exemplarsPerCluster = 1;
2459   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2460
2461   int numCandidateOutliers = CkNumPes() - 
2462     exemplarsPerCluster*numNonEmptyClusters;
2463
2464   double *remainders = new double[numK];
2465   int *assigned = new int[numK];
2466   exemplarChoicesLeft = new int[numK];
2467   outlierChoicesLeft = new int[numK];
2468
2469   for (int i=0; i<numK; i++) {
2470     assigned[i] = 0;
2471     remainders[i] = 
2472       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2473       slotsRemaining / numCandidateOutliers;
2474     if (remainders[i] >= 0.0) {
2475       assigned[i] = (int)floor(remainders[i]);
2476       remainders[i] -= assigned[i];
2477     } else {
2478       remainders[i] = 0.0;
2479     }
2480   }
2481
2482   for (int i=0; i<numK; i++) {
2483     slotsRemaining -= assigned[i];
2484   }
2485   CkAssert(slotsRemaining >= 0);
2486
2487   // find clusters to assign the loose slots to, in order of
2488   // remainder proportion
2489   while (slotsRemaining > 0) {
2490     double max = 0.0;
2491     int winner = 0;
2492     for (int i=0; i<numK; i++) {
2493       if (remainders[i] > max) {
2494         max = remainders[i];
2495         winner = i;
2496       }
2497     }
2498     assigned[winner]++;
2499     remainders[winner] = 0.0;
2500     slotsRemaining--;
2501   }
2502
2503   // set up how many reduction cycles of min/max we need to conduct to
2504   // select the representatives.
2505   numSelectionIter = exemplarsPerCluster;
2506   for (int i=0; i<numK; i++) {
2507     if (assigned[i] > numSelectionIter) {
2508       numSelectionIter = assigned[i];
2509     }
2510   }
2511   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2512
2513   for (int i=0; i<numK; i++) {
2514     if (kNumMembers[i] > 0) {
2515       exemplarChoicesLeft[i] = exemplarsPerCluster;
2516       outlierChoicesLeft[i] = assigned[i];
2517     } else {
2518       exemplarChoicesLeft[i] = 0;
2519       outlierChoicesLeft[i] = 0;
2520     }
2521     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2522            outlierChoicesLeft[i]);
2523   }
2524
2525   delete [] assigned;
2526   delete [] remainders;
2527
2528   // send out first broadcast
2529   KSelectionMessage *outmsg = NULL;
2530   if (numSelectionIter > 0) {
2531     outmsg = new (numK, numK, numK) KSelectionMessage;
2532     outmsg->numKMinIDs = numK;
2533     outmsg->numKMaxIDs = numK;
2534     for (int i=0; i<numK; i++) {
2535       outmsg->minIDs[i] = -1;
2536       outmsg->maxIDs[i] = -1;
2537     }
2538     thisProxy.collectDistances(outmsg);
2539   } else {
2540     CkPrintf("Warning: No selection iteration from the start!\n");
2541     // invoke phase completion on all processors
2542     thisProxy.phaseDone();
2543   }
2544 }
2545
2546 /*
2547  *  lastMin = array of minimum champions of the last tournament
2548  *  lastMax = array of maximum champions of the last tournament
2549  *  lastMaxVal = array of last encountered maximum values, allows previous
2550  *                 minimum winners to eliminate themselves from the next
2551  *                 minimum race.
2552  *
2553  *  Called on all processors.
2554  */
2555 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2556
2557   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2558
2559   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2560          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2561   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2562       (CkMyPe() == msg->maxIDs[lastMinK])) {
2563     CkAssert(!selected);
2564     selected = true;
2565   }
2566
2567   // build outgoing reduction structure
2568   //   format = minVal | ID | maxVal | ID
2569   double *minMaxAndIDs = NULL;
2570
2571   minMaxAndIDs = new double[numK*4];
2572   // initialize to the appropriate out-of-band values (for error checks)
2573   for (int i=0; i<numK; i++) {
2574     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2575     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2576     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2577     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2578   }
2579   // If I have not won before, I put myself back into the competition
2580   if (!selected) {
2581     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2582     minMaxAndIDs[lastMinK*4] = minDistance;
2583     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2584     minMaxAndIDs[lastMinK*4+2] = minDistance;
2585     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2586   }
2587   delete msg;
2588
2589   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2590                 0, thisProxy);
2591   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2592              minMaxReductionType, cb);  
2593 }
2594
2595 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2596   // incoming format:
2597   //   minVal | minID | maxVal | maxID
2598
2599   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2600
2601   if (numSelectionIter > 0) {
2602     double *incInfo = (double *)msg->getData();
2603     
2604     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2605     outmsg->numKMinIDs = numK;
2606     outmsg->numKMaxIDs = numK;
2607     
2608     for (int i=0; i<numK; i++) {
2609       DEBUGF("%d | %lf %d %lf %d \n", i, 
2610              incInfo[i*4], (int)incInfo[i*4+1], 
2611              incInfo[i*4+2], (int)incInfo[i*4+3]);
2612     }
2613
2614     for (int i=0; i<numK; i++) {
2615       if (exemplarChoicesLeft[i] > 0) {
2616         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2617         exemplarChoicesLeft[i]--;
2618       } else {
2619         outmsg->minIDs[i] = -1;
2620       }
2621       if (outlierChoicesLeft[i] > 0) {
2622         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2623         outlierChoicesLeft[i]--;
2624       } else {
2625         outmsg->maxIDs[i] = -1;
2626       }
2627     }
2628     thisProxy.collectDistances(outmsg);
2629     numSelectionIter--;
2630   } else {
2631     // invoke phase completion on all processors
2632     thisProxy.phaseDone();
2633   }
2634 }
2635
2636 /**
2637  *  Completion of the K-Means clustering and data selection of one phase
2638  *    of the computation.
2639  *
2640  *  Called on every processor.
2641  */
2642 void KMeansBOC::phaseDone() {
2643
2644   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2645
2646   LogPool *pool = CkpvAccess(_trace)->_logPool;
2647   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2648
2649   // now decide on what to do with the decision.
2650   if (!selected) {
2651     if (usePhases) {
2652       pool->keepPhase[currentPhase] = false;
2653     } else {
2654       // if not using phases, we're working on the whole log
2655       pool->setAllPhases(false);
2656     }
2657   }
2658
2659   // **FIXME** (?) - All processors have to agree on this, or the reduction
2660   //   will not be correct! The question is "is this enforcible?"
2661   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2662     // We're done
2663     int dummy = 0;
2664     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2665                   0, bocProxy);
2666     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2667   } else {
2668     // reset all phase-based k-means data and decisions
2669
2670     // **FIXME**!!!!!    
2671     
2672     // invoke the next K-Means computation phase.
2673     currentPhase++;
2674     thisProxy[CkMyPe()].getNextPhaseMetrics();
2675   }
2676 }
2677
2678 void TraceProjectionsBOC::startTimeAnalysis()
2679 {
2680   double startTime = 0.0;
2681   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2682      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2683   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2684   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2685 }
2686
2687 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2688 {
2689   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2690
2691   if (CkpvAccess(_trace) != NULL) {
2692     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2693     CkpvAccess(_trace)->_logPool->setNewStartTime();
2694     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2695   }
2696   delete msg;
2697   thisProxy[CkMyPe()].startEndTimeAnalysis();
2698 }
2699
2700 void TraceProjectionsBOC::startEndTimeAnalysis()
2701 {
2702  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2703
2704   endTime = CkpvAccess(_trace)->endTime;
2705   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2706
2707   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2708                 0, thisProxy);
2709   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2710 }
2711
2712 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2713 {
2714  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2715
2716   CkAssert(CkMyPe() == 0);
2717   parModulesRemaining--;
2718   if (CkpvAccess(_trace) != NULL) {
2719     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2720     // CkPrintf("End time determined to be %lf us\n",
2721     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2722   }
2723   delete msg;
2724   if (parModulesRemaining == 0) {
2725     thisProxy[CkMyPe()].finalize();
2726   }
2727 }
2728
2729 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2730
2731  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2732
2733   CkAssert(CkMyPe() == 0);
2734   parModulesRemaining--;
2735   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2736            CmiWallTimer()-analysisStartTime);
2737   delete msg;
2738   if (parModulesRemaining == 0) {
2739     thisProxy[CkMyPe()].finalize();
2740   }
2741 }
2742
2743 /**
2744  *
2745  *  This version is called (on processor 0) only if flushCheck fails.
2746  *
2747  */
2748 void TraceProjectionsBOC::kMeansDone() {
2749   CkAssert(CkMyPe() == 0);
2750   parModulesRemaining--;
2751   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2752            CmiWallTimer()-analysisStartTime);
2753   if (parModulesRemaining == 0) {
2754     thisProxy[CkMyPe()].finalize();
2755   }
2756 }
2757
2758 void TraceProjectionsBOC::finalize()
2759 {
2760   CkAssert(CkMyPe() == 0);
2761   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2762   //       CmiWallTimer()-analysisStartTime);
2763   thisProxy.closingTraces();
2764 }
2765
2766 // Called on every processor
2767 void TraceProjectionsBOC::closingTraces() {
2768   CkpvAccess(_trace)->closeTrace();
2769
2770     // subtle:  reduction needs to go to the PE which started CkExit()
2771   int pe = 0;
2772   if (endPe != -1) pe = endPe;
2773   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2774                 pe, thisProxy); 
2775   contribute(0, NULL, CkReduction::sum_int, cb);  
2776 }
2777
2778 // The sole purpose of this reduction is to decide whether or not
2779 //   Projections as a module needs to call CkExit() to get other
2780 //   modules to shutdown.
2781 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2782   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2783   delete msg;
2784   // decide if CkExit() needs to be called
2785   if (!CkpvAccess(_trace)->converseExit) {
2786     CkExit();
2787   }
2788 }
2789 /*
2790  *  Registration and definition of the Outlier Reduction callback.
2791  *  Format: Sum | Min | Max | Sum of Squares
2792  */
2793 CkReductionMsg *outlierReduction(int nMsgs,
2794                                  CkReductionMsg **msgs) {
2795   int numBytes = 0;
2796   int numMetrics = 0;
2797   double *ret = NULL;
2798
2799   if (nMsgs == 1) {
2800     // nothing to do, just pass it on
2801     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2802   }
2803
2804   if (nMsgs > 1) {
2805     numBytes = msgs[0]->getSize();
2806     // sanity checks
2807     if (numBytes%sizeof(double) != 0) {
2808       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2809     }
2810     if ((numBytes/sizeof(double))%4 != 0) {
2811       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2812     }
2813     numMetrics = (numBytes/sizeof(double))/4;
2814     ret = new double[numMetrics*4];
2815
2816     // copy the first message data into the return structure first
2817     for (int i=0; i<numMetrics*4; i++) {
2818       ret[i] = ((double *)msgs[0]->getData())[i];
2819     }
2820
2821     // Sum | Min | Max | Sum of Squares
2822     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2823       for (int i=0; i<numMetrics; i++) {
2824         // Sum
2825         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2826         // Min
2827         ret[numMetrics + i] =
2828           (ret[numMetrics + i] < 
2829            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2830           ? ret[numMetrics + i] : 
2831           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2832         // Max
2833         ret[2*numMetrics + i] = 
2834           (ret[2*numMetrics + i] >
2835            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2836           ? ret[2*numMetrics + i] :
2837           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2838         // Sum of Squares (squaring already done at leaf)
2839         ret[3*numMetrics + i] +=
2840           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2841       }
2842     }
2843   }
2844   
2845   /* apparently, we do not delete the incoming messages */
2846   return CkReductionMsg::buildNew(numBytes,ret);
2847 }
2848
2849 /*
2850  * The only reason we have a user-defined reduction is to support
2851  *   identification of the "winning" processors as well as to handle
2852  *   both the min and the max of each "tournament". A simple min/max
2853  *   discovery cannot handle ties.
2854  */
2855 CkReductionMsg *minMaxReduction(int nMsgs,
2856                                 CkReductionMsg **msgs) {
2857   CkAssert(nMsgs > 0);
2858
2859   int numBytes = msgs[0]->getSize();
2860   CkAssert(numBytes%sizeof(double) == 0);
2861   int numK = (numBytes/sizeof(double))/4;
2862
2863   double *ret = new double[numK*4];
2864   // fill with out-of-band values
2865   for (int i=0; i<numK; i++) {
2866     ret[i*4] = -1.0;
2867     ret[i*4+1] = -1.0;
2868     ret[i*4+2] = -1.0;
2869     ret[i*4+3] = -1.0;
2870   }
2871
2872   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2873   for (int i=0; i<nMsgs; i++) {
2874     double *temp = (double *)msgs[i]->getData();
2875     for (int j=0; j<numK; j++) {
2876       // no previous valid min
2877       if (ret[j*4+1] < 0) {
2878         // fill it in only if the incoming min is valid
2879         if (temp[j*4+1] >= 0) {
2880           ret[j*4] = temp[j*4];      // fill min value
2881           ret[j*4+1] = temp[j*4+1];  // fill ID
2882         }
2883       } else {
2884         // find Min, only if incoming min is valid
2885         if (temp[j*4+1] >= 0) {
2886           if (temp[j*4] < ret[j*4]) {
2887             ret[j*4] = temp[j*4];      // replace min value
2888             ret[j*4+1] = temp[j*4+1];  // replace ID
2889           }
2890         }
2891       }
2892       // no previous valid max
2893       if (ret[j*4+3] < 0) {
2894         // fill only if the incoming max is valid
2895         if (temp[j*4+3] >= 0) {
2896           ret[j*4+2] = temp[j*4+2];  // fill max value
2897           ret[j*4+3] = temp[j*4+3];  // fill ID
2898         }
2899       } else {
2900         // find Max, only if incoming max is valid
2901         if (temp[j*4+3] >= 0) {
2902           if (temp[j*4+2] > ret[j*4+2]) {
2903             ret[j*4+2] = temp[j*4+2];  // replace max value
2904             ret[j*4+3] = temp[j*4+3];  // replace ID
2905           }
2906         }
2907       }
2908     }
2909   }
2910   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2911   delete [] ret;
2912   return redmsg;
2913 }
2914
2915 #include "TraceProjections.def.h"
2916 #endif //PROJ_ANALYSIS
2917
2918 /*@}*/