Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
14 #else
15 #define DEBUGF(format, ...)           // CmiPrintf x
16 #endif
17 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BLUEGENE_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BLUEGENE_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0)
376     CmiAbort("Cannot open projections sts file for writing.\n");
377   delete[] fname;
378 }  
379
380 void LogPool::createRC()
381 {
382   // create the projections rc file.
383   fname = 
384     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
385   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
386   do {
387     rcfp = fopen(fname, "w");
388   } while (!rcfp && (errno == EINTR || errno == EMFILE));
389   if (rcfp==0) {
390     CmiAbort("Cannot open projections configuration file for writing.\n");
391   }
392   delete[] fname;
393 }
394
395 LogPool::~LogPool() 
396 {
397   if (writeData) {
398     writeLog();
399 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
400     closeLog();
401 #endif
402   }
403
404 #if CMK_BLUEGENE_CHARM
405   extern int correctTimeLog;
406   if (correctTimeLog) {
407     createFile("-bg");
408     if (CkMyPe() == 0) {
409       createSts("-bg");
410     }
411     writeHeader();
412     if (CkMyPe() == 0) writeSts(NULL);
413     postProcessLog();
414   }
415 #endif
416
417   delete[] pool;
418   delete [] fname;
419 }
420
421 void LogPool::writeHeader()
422 {
423   if (headerWritten) return;
424   headerWritten = 1;
425   if(!binary) {
426 #if CMK_PROJECTIONS_USE_ZLIB
427     if(compressed) {
428       if (nonDeltaLog) {
429         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
430       }
431       if (deltaLog) {
432         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
433       }
434     } 
435     else /* else clause is below... */
436 #endif
437     /*... may hang over from else above */ {
438       if (nonDeltaLog) {
439         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
440       }
441       if (deltaLog) {
442         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
443       }
444     }
445   }
446   else { // binary
447       if (nonDeltaLog) {
448         fwrite(&numEntries,sizeof(numEntries),1,fp);
449       }
450       if (deltaLog) {
451         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
452       }
453   }
454 }
455
456 void LogPool::writeLog(void)
457 {
458   createFile();
459   OPEN_LOG
460   writeHeader();
461   if (nonDeltaLog) write(0);
462   if (deltaLog) write(1);
463   CLOSE_LOG
464 }
465
466 void LogPool::write(int writedelta) 
467 {
468   // **CW** Simple delta encoding implementation
469   // prevTime has to be maintained as an object variable because
470   // LogPool::write may be called several times depending on the
471   // +logsize value.
472   PUP::er *p = NULL;
473   if (binary) {
474     p = new PUP::toDisk(writedelta?deltafp:fp);
475   }
476 #if CMK_PROJECTIONS_USE_ZLIB
477   else if (compressed) {
478     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
479   }
480 #endif
481   else {
482     p = new toProjectionsFile(writedelta?deltafp:fp);
483   }
484   CmiAssert(p);
485   int curPhase = 0;
486   // **FIXME** - Should probably consider a more sophisticated bounds-based
487   //   approach for selective writing instead of making multiple if-checks
488   //   for every single event.
489   for(UInt i=0; i<numEntries; i++) {
490     if (!writedelta) {
491       if (keepPhase == NULL) {
492         // default case, when no phase selection is required.
493         pool[i].pup(*p);
494       } else {
495         // **FIXME** Might be a good idea to create a "filler" event block for
496         //   all the events taken out by phase filtering.
497         if (pool[i].type == END_PHASE) {
498           // always write phase markers
499           pool[i].pup(*p);
500           curPhase++;
501         } else if (pool[i].type == BEGIN_COMPUTATION ||
502                    pool[i].type == END_COMPUTATION) {
503           // always write BEGIN and END COMPUTATION markers
504           pool[i].pup(*p);
505         } else if (keepPhase[curPhase]) {
506           pool[i].pup(*p);
507         }
508       }
509     }
510     else {      // delta
511       // **FIXME** Implement phase-selective writing for delta logs
512       //   eventually
513       double time = pool[i].time;
514       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
515       {
516         double timeDiff = (time-prevTime)*1.0e6;
517         UInt intTimeDiff = (UInt)timeDiff;
518         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
519         if (timeErr > 1.0) {
520           timeErr -= 1.0;
521           intTimeDiff++;
522         }
523         pool[i].time = intTimeDiff/1.0e6;
524       }
525       pool[i].pup(*p);
526       pool[i].time = time;      // restore time value
527       prevTime = time;
528     }
529   }
530   delete p;
531   delete [] keepPhase;
532 }
533
534 void LogPool::writeSts(void)
535 {
536   // for whining compilers
537   int i;
538   char name[30];
539   // generate an automatic unique ID for each log
540   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
541   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
542   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
543 #if CMK_HAS_COUNTER_PAPI
544   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
545   // for now, use i, next time use papiEvents[i].
546   // **CW** papi event names is a hack.
547   char eventName[PAPI_MAX_STR_LEN];
548   for (i=0;i<NUMPAPIEVENTS;i++) {
549     PAPI_event_code_to_name(papiEvents[i], eventName);
550     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
551   }
552 #endif
553   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
554   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
555     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
556   }     
557 }
558
559 void LogPool::writeSts(TraceProjections *traceProj){
560   writeSts();
561   if (traceProj != NULL) {
562     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
563     funcIter->seekStart();
564     int numFuncs = traceProj->getFuncNumber();
565     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
566     while(funcIter->hasNext()) {
567       StrKey *key;
568       int *obj = (int *)funcIter->next((void **)&key);
569       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
570     }
571   }
572   fprintf(stsfp, "END\n");
573   fclose(stsfp);
574 }
575
576 void LogPool::writeRC(void)
577 {
578     //CkPrintf("write RC is being executed\n");
579 #ifdef PROJ_ANALYSIS  
580     CkAssert(CkMyPe() == 0);
581     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
582           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
583     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
584           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
585     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
586     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
587       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
588     } else {
589       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
590     }
591     */
592 #endif //PROJ_ANALYSIS
593   fclose(rcfp);
594 }
595
596
597 #if CMK_BLUEGENE_CHARM
598 static void updateProjLog(void *data, double t, double recvT, void *ptr)
599 {
600   LogEntry *log = (LogEntry *)data;
601   FILE *fp = *(FILE **)ptr;
602   log->time = t;
603   log->recvTime = recvT<0.0?0:recvT;
604 //  log->write(fp);
605   toProjectionsFile p(fp);
606   log->pup(p);
607 }
608 #endif
609
610 // flush log entries to disk
611 void LogPool::flushLogBuffer()
612 {
613   if (numEntries) {
614     double writeTime = TraceTimer();
615     writeLog();
616     hasFlushed = true;
617     numEntries = 0;
618     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
619     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
620   }
621 }
622
623 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
624                   double time, int event, int pe, int ml, CmiObjId *id, 
625                   double recvT, double cpuT, int numPe)
626 {
627   new (&pool[numEntries++])
628     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
629   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
630     numPhases++;
631   }
632   if(poolSize==numEntries) {
633     flushLogBuffer();
634 #if CMK_BLUEGENE_CHARM
635     extern int correctTimeLog;
636     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
637 #endif
638   }
639 #if CMK_BLUEGENE_CHARM
640   switch (type) {
641     case BEGIN_PROCESSING:
642       pool[numEntries-1].recvTime = BgGetRecvTime();
643     case END_PROCESSING:
644     case BEGIN_COMPUTATION:
645     case END_COMPUTATION:
646     case CREATION:
647     case BEGIN_PACK:
648     case END_PACK:
649     case BEGIN_UNPACK:
650     case END_UNPACK:
651     case USER_EVENT_PAIR:
652       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
653   }
654 #endif
655 }
656
657 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
658 #ifndef CMK_BLUEGENE_CHARM
659   new (&pool[numEntries++])
660         LogEntry(time,type,funcID,lineNum,fileName);
661   if(poolSize == numEntries){
662     flushLogBuffer();
663   }
664 #endif  
665 }
666
667
668   
669 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
670 #ifndef CMK_BLUEGENE_CHARM
671   new (&pool[numEntries++])
672         LogEntry(type,time,memUsage);
673   if(poolSize == numEntries){
674     flushLogBuffer();
675   }
676 #endif  
677         
678 }  
679
680
681
682 void LogPool::addUserSupplied(int data){
683         // add an event
684         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
685
686         // set the user supplied value for the previously created event 
687         pool[numEntries-1].setUserSuppliedData(data);
688   }
689
690
691 void LogPool::addUserSuppliedNote(char *note){
692         // add an event
693         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
694
695         // set the user supplied note for the previously created event 
696         pool[numEntries-1].setUserSuppliedNote(note);
697   }
698
699 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
700   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
701 #ifndef CMK_BLUEGENE_CHARM
702 #if MPI_TRACE_MACHINE_HACK
703   //This part of code is used  to combine the contiguous
704   //MPI_Test and MPI_Iprobe events to reduce the number of
705   //entries
706 #define MPI_TEST_EVENT_ID 60
707 #define MPI_IPROBE_EVENT_ID 70 
708   int lastEvent = pool[numEntries-1].event;
709   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
710     //just replace the endtime of last event
711     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
712     pool[numEntries].endTime = et;
713   }else{
714     new (&pool[numEntries++])
715       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
716   }
717 #else
718   new (&pool[numEntries++])
719     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
720 #endif
721   if(poolSize == numEntries){
722     flushLogBuffer();
723   }
724 #endif  
725 }
726
727
728 /* **CW** Not sure if this is the right thing to do. Feels more like
729    a hack than a solution to Sameer's request to add the destination
730    processor information to multicasts and broadcasts.
731
732    In the unlikely event this method is used for Broadcasts as well,
733    pelist == NULL will be used to indicate a global broadcast with 
734    num PEs.
735 */
736 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
737                                    int event, int pe, int ml, CmiObjId *id,
738                                    double recvT, int numPe, int *pelist)
739 {
740   new (&pool[numEntries++])
741     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
742   if(poolSize==numEntries) {
743     flushLogBuffer();
744   }
745 }
746
747 void LogPool::postProcessLog()
748 {
749 #if CMK_BLUEGENE_CHARM
750   bgUpdateProj(1);   // event type
751 #endif
752 }
753
754 void LogPool::modLastEntryTimestamp(double ts)
755 {
756   pool[numEntries-1].time = ts;
757   //pool[numEntries-1].cputime = ts;
758 }
759
760 // /** Constructor for a multicast log entry */
761 // 
762 //  THIS WAS MOVED TO trace-projections.h with the other constructors
763 // 
764 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
765 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
766 // {
767 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
768 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
769 //     recvTime = rt; 
770 //     numpes = numPe;
771 //     userSuppliedNote = NULL;
772 //     if (pelist != NULL) {
773 //      pes = new int[numPe];
774 //      for (int i=0; i<numPe; i++) {
775 //        pes[i] = pelist[i];
776 //      }
777 //     } else {
778 //      pes= NULL;
779 //     }
780 // }
781
782 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
783 //{
784 //#if CMK_HAS_COUNTER_PAPI
785 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
786 //#endif
787 //}
788
789
790
791 void LogEntry::pup(PUP::er &p)
792 {
793   int i;
794   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
795   char ret = '\n';
796
797   p|type;
798   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
799   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
800
801   switch (type) {
802     case USER_EVENT:
803     case USER_EVENT_PAIR:
804       p|mIdx; p|itime; p|event; p|pe;
805       break;
806     case BEGIN_IDLE:
807     case END_IDLE:
808     case BEGIN_PACK:
809     case END_PACK:
810     case BEGIN_UNPACK:
811     case END_UNPACK:
812       p|itime; p|pe; 
813       break;
814     case BEGIN_PROCESSING:
815       if (p.isPacking()) {
816         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
817         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
818       }
819       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
820       p|msglen; p|irecvtime; 
821       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
822       p|icputime;
823 #if CMK_HAS_COUNTER_PAPI
824       //p|numPapiEvents;
825       for (i=0; i<NUMPAPIEVENTS; i++) {
826         // not yet!!!
827         //      p|papiIDs[i]; 
828         p|papiValues[i];
829         
830       }
831 #else
832       //p|numPapiEvents;     // non papi version has value 0
833 #endif
834       if (p.isUnpacking()) {
835         recvTime = irecvtime/1.0e6;
836         cputime = icputime/1.0e6;
837       }
838       break;
839     case END_PROCESSING:
840       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
841       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
842 #if CMK_HAS_COUNTER_PAPI
843       //p|numPapiEvents;
844       for (i=0; i<NUMPAPIEVENTS; i++) {
845         // not yet!!!
846         //      p|papiIDs[i];
847         p|papiValues[i];
848       }
849 #else
850       //p|numPapiEvents;  // non papi version has value 0
851 #endif
852       if (p.isUnpacking()) cputime = icputime/1.0e6;
853       break;
854     case USER_SUPPLIED:
855           p|userSuppliedData;
856           p|itime;
857         break;
858     case USER_SUPPLIED_NOTE:
859           p|itime;
860           int length;
861           if (p.isPacking()) length = strlen(userSuppliedNote);
862           p | length;
863           char space;
864           space = ' ';
865           p | space;
866           if (p.isUnpacking()) {
867             userSuppliedNote = new char[length+1];
868             userSuppliedNote[length] = '\0';
869           }
870           PUParray(p,userSuppliedNote, length);
871           break;
872     case USER_SUPPLIED_BRACKETED_NOTE:
873       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
874           p|itime;
875           p|iEndTime;
876           p|event;
877           int length2;
878           if (p.isPacking()) length2 = strlen(userSuppliedNote);
879           p | length2;
880           char space2;
881           space2 = ' ';
882           p | space2;
883           if (p.isUnpacking()) {
884             userSuppliedNote = new char[length+1];
885             userSuppliedNote[length] = '\0';
886           }
887           PUParray(p,userSuppliedNote, length2);
888           break;
889     case MEMORY_USAGE_CURRENT:
890       p | memUsage;
891       p | itime;
892         break;
893     case CREATION:
894       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
895       p|mIdx; p|eIdx; p|itime;
896       p|event; p|pe; p|msglen; p|irecvtime;
897       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
898       break;
899     case CREATION_BCAST:
900       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
901       p|mIdx; p|eIdx; p|itime;
902       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
903       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
904       break;
905     case CREATION_MULTICAST:
906       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
907       p|mIdx; p|eIdx; p|itime;
908       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
909       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
910       for (i=0; i<numpes; i++) p|pes[i];
911       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
912       break;
913     case MESSAGE_RECV:
914       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
915       break;
916
917     case ENQUEUE:
918     case DEQUEUE:
919       p|mIdx; p|itime; p|event; p|pe;
920       break;
921
922     case BEGIN_INTERRUPT:
923     case END_INTERRUPT:
924       p|itime; p|event; p|pe;
925       break;
926
927       // **CW** absolute timestamps are used here to support a quick
928       // way of determining the total time of a run in projections
929       // visualization.
930     case BEGIN_COMPUTATION:
931     case END_COMPUTATION:
932     case BEGIN_TRACE:
933     case END_TRACE:
934       p|itime;
935       break;
936     case BEGIN_FUNC:
937         p | itime;
938         p | mIdx;
939         p | event;
940         if(!p.isUnpacking()){
941                 p(fName,flen-1);
942         }
943         break;
944     case END_FUNC:
945         p | itime;
946         p | mIdx;
947         break;
948     case END_PHASE:
949       p|eIdx; // FIXME: actually the phase ID
950       p|itime;
951       break;
952     default:
953       CmiError("***Internal Error*** Wierd Event %d.\n", type);
954       break;
955   }
956   if (p.isUnpacking()) time = itime/1.0e6;
957   p|ret;
958 }
959
960 TraceProjections::TraceProjections(char **argv): 
961   curevent(0), inEntry(0), computationStarted(0), 
962         converseExit(0), endTime(0.0), traceNestedEvents(0),
963         currentPhaseID(0), lastPhaseEvent(NULL)
964 {
965   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
966
967   if (CkpvAccess(traceOnPe) == 0) return;
968
969   CtvInitialize(int,curThreadEvent);
970   CkpvInitialize(CmiInt8, CtrLogBufSize);
971   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
972   CtvAccess(curThreadEvent)=0;
973   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
974                        "Log entries to buffer per I/O")) {
975     if (CkMyPe() == 0) {
976       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
977     }
978   }
979   checknested = 
980     CmiGetArgFlagDesc(argv,"+checknested",
981                       "check projections nest begin end execute events");
982   traceNestedEvents = 
983     CmiGetArgFlagDesc(argv,"+tracenested",
984               "trace projections nest begin/end execute events");
985   int binary = 
986     CmiGetArgFlagDesc(argv,"+binary-trace",
987                       "Write log files in binary format");
988
989   CmiInt8 nSubdirs = 0;
990   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
991
992
993 #if CMK_PROJECTIONS_USE_ZLIB
994   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
995 #else
996   // consume the flag so there's no confusing
997   CmiGetArgFlagDesc(argv,"+gz-trace",
998                     "Write log files pre-compressed with gzip");
999   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1000 #endif
1001
1002   // **CW** default to non delta log encoding. The user may choose to do
1003   // create both logs (for debugging) or just the old log timestamping
1004   // (for compatibility).
1005   // Generating just the non delta log takes precedence over generating
1006   // both logs (if both arguments appear on the command line).
1007
1008   // switch to OLD log format until everything works // Gengbin
1009   nonDeltaLog = 1;
1010   deltaLog = 0;
1011   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1012                                   "Generate Delta encoded and simple timestamped log files");
1013
1014   _logPool = new LogPool(CkpvAccess(traceRoot));
1015   _logPool->setNumSubdirs(nSubdirs);
1016   _logPool->setBinary(binary);
1017 #if CMK_PROJECTIONS_USE_ZLIB
1018   _logPool->setCompressed(compressed);
1019 #endif
1020   if (CkMyPe() == 0) {
1021     _logPool->createSts();
1022     _logPool->createRC();
1023   }
1024   funcCount=1;
1025
1026 #if CMK_HAS_COUNTER_PAPI
1027   // We initialize and create the event sets for use with PAPI here.
1028   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1029   if (papiRetValue != PAPI_VER_CURRENT) {
1030     CmiAbort("PAPI Library initialization failure!\n");
1031   }
1032   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1033   papiEventSet = PAPI_NULL; 
1034   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1035     CmiAbort("PAPI failed to create event set!\n");
1036   }
1037   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1038   if (papiRetValue != PAPI_OK) {
1039     if (papiRetValue == PAPI_ECNFLCT) {
1040       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1041     } else {
1042       CmiAbort("PAPI failed to add designated events!\n");
1043     }
1044   }
1045   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1046 #endif
1047 }
1048
1049 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1050 {
1051   OPTIMIZED_VERSION
1052   CkAssert(e==-1 || e>=0);
1053   CkAssert(evt != NULL);
1054   int event;
1055   int biggest = -1;
1056   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1057     int cur = (*CkpvAccess(usrEvents))[i]->e;
1058     if (cur == e) {
1059       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1060       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1061         return e;
1062       else
1063         CmiAbort("UserEvent double registered!");
1064     }
1065     if (cur > biggest) biggest = cur;
1066   }
1067   // if no user events have so far been registered. biggest will be -1
1068   // and hence newly assigned event numbers will begin from 0.
1069   if (e==-1) event = biggest+1;  // automatically assign new event number
1070   else event = e;
1071   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1072   return event;
1073 }
1074
1075 void TraceProjections::traceClearEps(void)
1076 {
1077   // In trace-summary, this zeros out the EP bins, to eliminate noise
1078   // from startup.  Here, this isn't useful, since we can do that in
1079   // post-processing
1080 }
1081
1082 void TraceProjections::traceWriteSts(void)
1083 {
1084   if(CkMyPe()==0)
1085     _logPool->writeSts(this);
1086 }
1087
1088 /** 
1089  * **IMPT NOTES**:
1090  *
1091  * This is called when Converse closes during ConverseCommonExit().
1092  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1093  *
1094  * Some programs bypass CkExit() (like NAMD, which eventually calls
1095  * ConverseExit()), modules like traces will have to pretend to shutdown
1096  * as if CkExit() was called but at the same time avoid making
1097  * subsequent CkExit() calls (which is usually required for allowing
1098  * other modules to shutdown).
1099  *
1100  * Note that we can only get here if CkExit() was not called, since the
1101  * trace module will un-register itself from TraceArray if it did.
1102  *
1103  */
1104 void TraceProjections::traceClose(void)
1105 {
1106 #ifdef PROJ_ANALYSIS
1107   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1108
1109   // sets the flag that tells the code not to make the CkExit call later
1110   converseExit = 1;
1111   if (CkMyPe() == 0) {
1112     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1113     bocProxy.traceProjectionsParallelShutdown(-1);
1114   }
1115   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1116     CkpvAccess(_trace)->endComputation();
1117     delete _logPool;              // will write
1118     // remove myself from traceArray so that no tracing will be called.
1119     CkpvAccess(_traces)->removeTrace(this);
1120   }
1121 #else
1122   // we've already deleted the logpool, so multiple calls to traceClose
1123   // are tolerated.
1124   if (_logPool == NULL) {
1125     return;
1126   }
1127   if(CkMyPe()==0){
1128     _logPool->writeSts(this);
1129   }
1130   CkpvAccess(_trace)->endComputation();
1131   delete _logPool;              // will write
1132   // remove myself from traceArray so that no tracing will be called.
1133   CkpvAccess(_traces)->removeTrace(this);
1134 #endif
1135 }
1136
1137 /**
1138  *  **IMPT NOTES**:
1139  *
1140  *  This is meant to be called internally by the tracing framework.
1141  *
1142  */
1143 void TraceProjections::closeTrace() {
1144   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1145   if (CkMyPe() == 0) {
1146     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1147     _logPool->writeSts(this);
1148     _logPool->writeRC();
1149     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1150   }
1151   delete _logPool;       // will write logs to file
1152 }
1153
1154 #if CMK_SMP_TRACE_COMMTHREAD
1155 void TraceProjections::traceBeginOnCommThread()
1156 {
1157   if (!computationStarted) return;
1158   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1159 }
1160
1161 void TraceProjections::traceEndOnCommThread()
1162 {
1163   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1164 }
1165 #endif
1166
1167 void TraceProjections::traceBegin(void)
1168 {
1169   if (!computationStarted) return;
1170   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1171 }
1172
1173 void TraceProjections::traceEnd(void)
1174 {
1175   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1176 }
1177
1178 void TraceProjections::userEvent(int e)
1179 {
1180   if (!computationStarted) return;
1181   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1182 }
1183
1184 void TraceProjections::userBracketEvent(int e, double bt, double et)
1185 {
1186   if (!computationStarted) return;
1187   // two events record Begin/End of event e.
1188   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1189   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userSuppliedData(int d)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->addUserSupplied(d);
1196 }
1197
1198 void TraceProjections::userSuppliedNote(char *note)
1199 {
1200   if (!computationStarted) return;
1201   _logPool->addUserSuppliedNote(note);
1202 }
1203
1204
1205 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1206 {
1207   if (!computationStarted) return;
1208   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1209 }
1210
1211 void TraceProjections::memoryUsage(double m)
1212 {
1213   if (!computationStarted) return;
1214   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1215   
1216 }
1217
1218
1219 void TraceProjections::creation(envelope *e, int ep, int num)
1220 {
1221   double curTime = TraceTimer();
1222   if (e == 0) {
1223     CtvAccess(curThreadEvent) = curevent;
1224     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1225                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1226   } else {
1227     int type=e->getMsgtype();
1228     e->setEvent(curevent);
1229     if (num > 1) {
1230       _logPool->add(CREATION_BCAST, type, ep, curTime,
1231                     curevent++, CkMyPe(), e->getTotalsize(), 
1232                     NULL, 0, 0.0, num);
1233     } else {
1234       _logPool->add(CREATION, type, ep, curTime,
1235                     curevent++, CkMyPe(), e->getTotalsize(), 
1236                     NULL, 0, 0.0);
1237     }
1238   }
1239 }
1240
1241 void TraceProjections::creation(char *msg)
1242 {
1243 #if CMK_SMP_TRACE_COMMTHREAD
1244         //This function is only called from a comm thread
1245         //in SMP mode. So, it is possible the msg is not
1246         //a charm msg that contains an envelope, ep idx.
1247         envelope *e = (envelope *)msg;
1248         int ep = e->getEpIdx();
1249         int num = _entryTable.size();
1250         if(ep<num && ep>=0 && _entryTable[ep]->traceEnabled)
1251                 creation(e, ep, 1);
1252 #endif
1253 }
1254
1255
1256 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1257    Communication Library-specific Multicasts via new event 
1258    CREATION_MULTICAST.
1259 */
1260
1261 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1262                                          int *pelist)
1263 {
1264   double curTime = TraceTimer();
1265   if (e==0) {
1266     CtvAccess(curThreadEvent)=curevent;
1267     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1268                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1269   } else {
1270     int type=e->getMsgtype();
1271     e->setEvent(curevent);
1272     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1273                                    e->getTotalsize(), 0, 0.0, num, pelist);
1274   }
1275 }
1276
1277 void TraceProjections::creationDone(int num)
1278 {
1279   // modified the creation done time of the last num log entries
1280   // FIXME: totally a hack
1281   double curTime = TraceTimer();
1282   int idx = _logPool->numEntries-1;
1283   while (idx >=0 && num >0 ) {
1284     LogEntry &log = _logPool->pool[idx];
1285     if ((log.type == CREATION) ||
1286         (log.type == CREATION_BCAST) ||
1287         (log.type == CREATION_MULTICAST)) {
1288       log.recvTime = curTime - log.time;
1289       num --;
1290     }
1291     idx--;
1292   }
1293 }
1294
1295 void TraceProjections::beginExecute(CmiObjId *tid)
1296 {
1297 #if CMK_HAS_COUNTER_PAPI
1298   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1299     CmiAbort("PAPI failed to read at begin execute!\n");
1300   }
1301 #endif
1302   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1303   execEvent = CtvAccess(curThreadEvent);
1304   execEp = (-1);
1305   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1306                 execEvent,CkMyPe(), 0, tid);
1307 #if CMK_HAS_COUNTER_PAPI
1308   _logPool->addPapi(papiValues);
1309 #endif
1310   inEntry = 1;
1311 }
1312
1313 void TraceProjections::beginExecute(envelope *e)
1314 {
1315   if(e==0) {
1316 #if CMK_HAS_COUNTER_PAPI
1317     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1318       CmiAbort("PAPI failed to read at begin execute!\n");
1319     }
1320 #endif
1321     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1322     execEvent = CtvAccess(curThreadEvent);
1323     execEp = (-1);
1324     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1325                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1326 #if CMK_HAS_COUNTER_PAPI
1327     _logPool->addPapi(papiValues);
1328 #endif
1329     inEntry = 1;
1330   } else {
1331     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1332                  e->getSrcPe(),e->getTotalsize());
1333   }
1334 }
1335
1336 void TraceProjections::beginExecute(char *msg){
1337 #if CMK_SMP_TRACE_COMMTHREAD
1338         //This function is called from comm thread in SMP mode
1339     envelope *e = (envelope *)msg;
1340     int num = _entryTable.size();
1341     int ep = e->getEpIdx();
1342     if(ep<0 || ep>=num) return;
1343     if(_entryTable[ep]->traceEnabled)
1344                 beginExecute(e);
1345 #endif
1346 }
1347
1348 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1349                                     int mlen, CmiObjId *idx)
1350 {
1351   if (traceNestedEvents) {
1352     if (! nestedEvents.isEmpty()) {
1353       endExecuteLocal();
1354     }
1355     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1356   }
1357   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1358 }
1359
1360 void TraceProjections::changeLastEntryTimestamp(double ts)
1361 {
1362   _logPool->modLastEntryTimestamp(ts);
1363 }
1364
1365 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1366                                     int mlen, CmiObjId *idx)
1367 {
1368 #if CMK_HAS_COUNTER_PAPI
1369   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1370     CmiAbort("PAPI failed to read at begin execute!\n");
1371   }
1372 #endif
1373   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1374   execEvent=event;
1375   execEp=ep;
1376   execPe=srcPe;
1377   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1378                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1379 #if CMK_HAS_COUNTER_PAPI
1380   _logPool->addPapi(papiValues);
1381 #endif
1382   inEntry = 1;
1383 }
1384
1385 void TraceProjections::endExecute(void)
1386 {
1387   if (traceNestedEvents) nestedEvents.deq();
1388   endExecuteLocal();
1389   if (traceNestedEvents) {
1390     if (! nestedEvents.isEmpty()) {
1391       NestedEvent &ne = nestedEvents.peek();
1392       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1393     }
1394   }
1395 }
1396
1397 void TraceProjections::endExecute(char *msg)
1398 {
1399 #if CMK_SMP_TRACE_COMMTHREAD
1400         //This function is called from comm thread in SMP mode
1401     envelope *e = (envelope *)msg;
1402     int num = _entryTable.size();
1403     int ep = e->getEpIdx();
1404     if(ep<0 || ep>=num) return;
1405     if(_entryTable[ep]->traceEnabled)
1406                 endExecute();
1407 #endif  
1408 }
1409
1410 void TraceProjections::endExecuteLocal(void)
1411 {
1412 #if CMK_HAS_COUNTER_PAPI
1413   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1414     CmiAbort("PAPI failed to read at end execute!\n");
1415   }
1416 #endif
1417   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1418   double cputime = TraceCpuTimer();
1419   if(execEp == (-1)) {
1420     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1421                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1422   } else {
1423     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1424                   execEvent, execPe, 0, NULL, 0.0, cputime);
1425   }
1426 #if CMK_HAS_COUNTER_PAPI
1427   _logPool->addPapi(papiValues);
1428 #endif
1429   inEntry = 0;
1430 }
1431
1432 void TraceProjections::messageRecv(char *env, int pe)
1433 {
1434 #if 0
1435   envelope *e = (envelope *)env;
1436   int msgType = e->getMsgtype();
1437   int ep = e->getEpIdx();
1438 #if 0
1439   if (msgType==NewChareMsg || msgType==NewVChareMsg
1440           || msgType==ForChareMsg || msgType==ForVidMsg
1441           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1442           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1443     ep = e->getEpIdx();
1444   else
1445     ep = _threadEP;
1446 #endif
1447   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1448                 curevent++, e->getSrcPe(), e->getTotalsize());
1449 #endif
1450 }
1451
1452 void TraceProjections::beginIdle(double curWallTime)
1453 {
1454   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1455 }
1456
1457 void TraceProjections::endIdle(double curWallTime)
1458 {
1459   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1460 }
1461
1462 void TraceProjections::beginPack(void)
1463 {
1464   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1465 }
1466
1467 void TraceProjections::endPack(void)
1468 {
1469   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1470 }
1471
1472 void TraceProjections::beginUnpack(void)
1473 {
1474   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1475 }
1476
1477 void TraceProjections::endUnpack(void)
1478 {
1479   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1480 }
1481
1482 void TraceProjections::enqueue(envelope *) {}
1483
1484 void TraceProjections::dequeue(envelope *) {}
1485
1486 void TraceProjections::beginComputation(void)
1487 {
1488   computationStarted = 1;
1489
1490   // Executes the callback function provided by the machine
1491   // layer. This is the proper method to register user events in a
1492   // machine layer because projections is a charm++ module.
1493   if (CkpvAccess(traceOnPe) != 0) {
1494     void (*ptr)() = registerMachineUserEvents();
1495     if (ptr != NULL) {
1496       ptr();
1497     }
1498   }
1499 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1500 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1501   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1502 #if CMK_HAS_COUNTER_PAPI
1503   // we start the counters here
1504   if (PAPI_start(papiEventSet) != PAPI_OK) {
1505     CmiAbort("PAPI failed to start designated counters!\n");
1506   }
1507 #endif
1508 }
1509
1510 void TraceProjections::endComputation(void)
1511 {
1512 #if CMK_HAS_COUNTER_PAPI
1513   // we stop the counters here. A silent failure is alright since we
1514   // are already at the end of the program.
1515   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1516     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1517   }
1518   // NOTE: We should not do a complete close of PAPI until after the
1519   // sts writer is done.
1520 #endif
1521   endTime = TraceTimer();
1522   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1523   /*
1524   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1525            endTime*1e06);
1526   */
1527 }
1528
1529 int TraceProjections::idxRegistered(int idx)
1530 {
1531     int idxVecLen = idxVec.size();
1532     for(int i=0; i<idxVecLen; i++)
1533     {
1534         if(idx == idxVec[i])
1535             return 1;
1536     }
1537     return 0;
1538 }
1539
1540 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1541     StrKey k((char*)name,strlen(name));
1542     int num = funcHashtable.get(k);
1543     
1544     if(num!=0) {
1545         return;
1546         //as for mpi programs, the same function may be registered for several times
1547         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1548     }
1549     
1550     int isIdxExisting=0;
1551     if(idxSpecifiedByUser)
1552         isIdxExisting=idxRegistered(idx);
1553     if(isIdxExisting){
1554         return;
1555         //same reason with num!=0
1556         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1557     }
1558
1559     if(idxSpecifiedByUser) {
1560         char *st = new char[strlen(name)+1];
1561         memcpy(st,name,strlen(name)+1);
1562         StrKey *newKey = new StrKey(st,strlen(st));
1563         int &ref = funcHashtable.put(*newKey);
1564         ref=idx;
1565         funcCount++;
1566         idxVec.push_back(idx);  
1567     } else {
1568         char *st = new char[strlen(name)+1];
1569         memcpy(st,name,strlen(name)+1);
1570         StrKey *newKey = new StrKey(st,strlen(st));
1571         int &ref = funcHashtable.put(*newKey);
1572         ref=funcCount;
1573         num = funcCount;
1574         funcCount++;
1575         idx = num;
1576         idxVec.push_back(idx);
1577     }
1578 }
1579
1580 void TraceProjections::beginFunc(char *name,char *file,int line){
1581         StrKey k(name,strlen(name));    
1582         unsigned short  num = (unsigned short)funcHashtable.get(k);
1583         beginFunc(num,file,line);
1584 }
1585
1586 void TraceProjections::beginFunc(int idx,char *file,int line){
1587         if(idx <= 0){
1588                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1589         }       
1590         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1591 }
1592
1593 void TraceProjections::endFunc(char *name){
1594         StrKey k(name,strlen(name));    
1595         int num = funcHashtable.get(k);
1596         endFunc(num);   
1597 }
1598
1599 void TraceProjections::endFunc(int num){
1600         if(num <= 0){
1601                 printf("endFunc without start :O\n");
1602         }
1603         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1604 }
1605
1606 // specialized PUP:ers for handling trace projections logs
1607 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1608 {
1609   for (int i=0;i<n;i++) 
1610     switch(t) {
1611     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1612     case Tuchar:
1613     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1614     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1615     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1616     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1617     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1618     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1619     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1620     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1621     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1622 #ifdef CMK_PUP_LONG_LONG
1623     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1624     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1625 #endif
1626     default: CmiAbort("Unrecognized pup type code!");
1627     };
1628 }
1629
1630 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1631 {
1632   for (int i=0;i<n;i++) 
1633     switch(t) {
1634     case Tchar: { 
1635       char c = fgetc(f);
1636       if (c==EOF)
1637         parseError("Could not match character");
1638       else
1639         ((char *)p)[i] = c;
1640       break;
1641     }
1642     case Tuchar:
1643     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1644     case Tshort:((short *)p)[i]=(short)readInt(); break;
1645     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1646     case Tint:  ((int *)p)[i]=readInt(); break;
1647     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1648     case Tlong: ((long *)p)[i]=readInt(); break;
1649     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1650     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1651     case Tdouble:((double *)p)[i]=readDouble(); break;
1652 #ifdef CMK_PUP_LONG_LONG
1653     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1654     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1655 #endif
1656     default: CmiAbort("Unrecognized pup type code!");
1657     };
1658 }
1659
1660 #if CMK_PROJECTIONS_USE_ZLIB
1661 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1662 {
1663   for (int i=0;i<n;i++) 
1664     switch(t) {
1665     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1666     case Tuchar:
1667     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1668     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1669     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1670     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1671     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1672     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1673     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1674     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1675     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1676 #ifdef CMK_PUP_LONG_LONG
1677     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1678     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1679 #endif
1680     default: CmiAbort("Unrecognized pup type code!");
1681     };
1682 }
1683 #endif
1684
1685 void TraceProjections::endPhase() {
1686   double currentPhaseTime = TraceTimer();
1687   double lastPhaseTime = 0.0;
1688
1689   if (lastPhaseEvent != NULL) {
1690     lastPhaseTime = lastPhaseEvent->time;
1691   } else {
1692     if (_logPool->pool != NULL) {
1693       // assumed to be BEGIN_COMPUTATION
1694       lastPhaseTime = _logPool->pool[0].time;
1695     } else {
1696       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1697       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1698       lastPhaseTime = currentPhaseTime;
1699     }
1700   }
1701
1702   /* Insert endPhase event here. */
1703   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1704   /*        We currently "borrow" from the standard add() method. */
1705   /*        It should really be its own add() method.             */
1706   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1707   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1708   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1709   currentPhaseID++;
1710 }
1711
1712 #ifdef PROJ_ANALYSIS
1713 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1714
1715
1716 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1717
1718 void registerOutlierReduction() {
1719   outlierReductionType =
1720     CkReduction::addReducer(outlierReduction);
1721   minMaxReductionType =
1722     CkReduction::addReducer(minMaxReduction);
1723 }
1724
1725 /**
1726  * **IMPT NOTES**:
1727  *
1728  * This is the C++ code that is registered to be activated at module
1729  * shutdown. This is called exactly once on processor 0. Module shutdown
1730  * is initiated as a result of a CkExit() call by the application code
1731  * 
1732  * The exit function must ultimately call CkExit() again to
1733  * so that other module exit functions may proceed after this module is
1734  * done.
1735  *
1736  */
1737 // FIXME: WHY extern "C"???
1738 extern "C" void TraceProjectionsExitHandler()
1739 {
1740 #if CMK_TRACE_ENABLED
1741   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1742   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1743   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1744 #else
1745   CkExit();
1746 #endif
1747 }
1748
1749 // This is called once on each processor but the idiom of use appears
1750 // to be to only have processor 0 register the function.
1751 //
1752 // See initnode in trace-projections.ci
1753 void initTraceProjectionsBOC()
1754 {
1755   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1756 #ifdef __BLUEGENE__
1757   if (BgNodeRank() == 0) {
1758 #else
1759     if (CkMyRank() == 0) {
1760 #endif
1761       registerExitFn(TraceProjectionsExitHandler);
1762     }
1763 #if 0
1764   } // this is so indentation does not get messed up
1765 #endif
1766 }
1767
1768 // mainchare for trace-projections BOC-operations. 
1769 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1770 // rest of its life.
1771 //
1772 // Responsible for:
1773 //   1. Handling commandline arguments
1774 //   2. Creating any objects required for proper BOC operations.
1775 //
1776 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1777   /** Options for Outlier Analysis */
1778   // defaults. Things will change with support for interactive analysis.
1779   bool findOutliers = false;
1780   bool outlierAutomatic = true;
1781   int numKSeeds = 10; 
1782
1783   int peNumKeep = CkNumPes();  // used as a default
1784   double entryThreshold = 0.0;
1785   bool outlierUsePhases = false;
1786   if (outlierAutomatic) {
1787     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1788                      "Number of cluster seeds to apply at outlier analysis.");
1789     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1790                      &peNumKeep, "Number of Processors to retain data");
1791     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1792                         "Minimum significance of entry points to be considered for clustering (%).");
1793     findOutliers =
1794       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1795     outlierUsePhases = 
1796       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1797                         "Apply automatic outlier analysis to any available phases.");
1798     if (outlierUsePhases) {
1799       // if the user wants to use an outlier feature, it is assumed outlier
1800       //    analysis is desired.
1801       findOutliers = true;
1802     }
1803   }
1804   bool findStartTime = (CmiTimerAbsolute()==1);
1805   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1806   if (findOutliers) {
1807     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1808                                         numKSeeds,
1809                                         peNumKeep,
1810                                         entryThreshold,
1811                                         outlierUsePhases);
1812   }
1813 }
1814
1815 // Called on every processor.
1816 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1817   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1818   endPe = pe;                // the pe that starts CkExit()
1819   if (CkMyPe() == 0) {
1820     analysisStartTime = CmiWallTimer();
1821   }
1822   CkpvAccess(_trace)->endComputation();
1823   // no more tracing for projections on this processor after this point. 
1824   // Note that clear must be called after remove, or bad things will happen.
1825   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1826   CkpvAccess(_traces)->clearTrace();
1827
1828   // From this point, we start multiple chains of reductions and broadcasts to
1829   // perform final online analysis activities.
1830
1831   // Start all parallel operations at once. 
1832   //   These MUST NOT modify base performance data in LogPool. If they must,
1833   //   then the parallel operations must be phased (and this code to be
1834   //   restructured as necessary)
1835   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1836   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1837   if (findOutliers) {
1838     parModulesRemaining++;
1839     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1840   }
1841   parModulesRemaining++;
1842   if (findStartTime) 
1843   bocProxy[CkMyPe()].startTimeAnalysis();
1844   else
1845   bocProxy[CkMyPe()].startEndTimeAnalysis();
1846 }
1847
1848 // Called on each processor
1849 void KMeansBOC::startKMeansAnalysis() {
1850   // Initialize all necessary structures
1851   LogPool *pool = CkpvAccess(_trace)->_logPool;
1852
1853  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1854   int flushInt = 0;
1855   if (pool->hasFlushed) {
1856     flushInt = 1;
1857   }
1858   
1859   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1860                 0, thisProxy);
1861   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1862 }
1863
1864 // Called on processor 0
1865 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1866   int someFlush = *((int *)msg->getData());
1867
1868   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1869   
1870   if (someFlush == 0) {
1871     // Data intact proceed with KMeans analysis
1872     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1873     kMeansProxy.flushCheckDone();
1874   } else {
1875     // Some processor had flushed it data at some point, abandon KMeans
1876     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1877     // terminate KMeans
1878     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1879     bocProxy[0].kMeansDone();
1880   }
1881 }
1882
1883 // Called on each processor
1884 void KMeansBOC::flushCheckDone() {
1885   // **FIXME** - more flexible metric collection scheme may be necessary
1886   //   in the future for production use.
1887   LogPool *pool = CkpvAccess(_trace)->_logPool;
1888
1889   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1890
1891   numEntryMethods = _entryTable.size();
1892   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1893
1894   // maintained across phases
1895   markedBegin = false;
1896   markedIdle = false;
1897   beginBlockTime = 0.0;
1898   beginIdleBlockTime = 0.0;
1899   lastBeginEPIdx = -1; // none
1900
1901   lastPhaseIdx = 0;
1902   currentExecTimes = NULL;
1903   currentPhase = 0;
1904   selected = false;
1905
1906   pool->initializePhases();
1907
1908   // incoming K Seeds and the per-phase filter
1909   incKSeeds = new double[numK*numMetrics];
1910   keepMetric = new bool[numMetrics];
1911
1912   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1913   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1914   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1915   thisProxy[CkMyPe()].getNextPhaseMetrics();
1916 }
1917
1918 // Called on each processor.
1919 void KMeansBOC::getNextPhaseMetrics() {
1920   // Assumes the presence of the complete logs on this processor.
1921   // Assumes first event is always BEGIN_COMPUTATION
1922   // Assumes each processor sees the same number of phases.
1923   //
1924   // In this code, we collect performance data for this processor.
1925   // All times are in seconds.
1926
1927   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1928
1929   if (usePhases) {
1930     DEBUGF("[%d] Using Phases\n", CkMyPe());
1931   } else {
1932     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1933   }
1934   
1935   if (currentExecTimes != NULL) {
1936     delete [] currentExecTimes;
1937   }
1938   currentExecTimes = new double[numMetrics];
1939   for (int i=0; i<numMetrics; i++) {
1940     currentExecTimes[i] = 0.0;
1941   }
1942
1943   int numEventMethods = _entryTable.size();
1944   LogPool *pool = CkpvAccess(_trace)->_logPool;
1945   
1946   CkAssert(pool->numEntries > lastPhaseIdx);
1947   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1948   double totalPhaseTime = 0.0;
1949   double totalActiveTime = 0.0; // entry method + idle
1950
1951   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1952     if (pool->pool[i].type == BEGIN_PROCESSING) {
1953       // check pairing
1954       if (!markedBegin) {
1955         markedBegin = true;
1956       }
1957       beginBlockTime = pool->pool[i].time;
1958       lastBeginEPIdx = pool->pool[i].eIdx;
1959     } else if (pool->pool[i].type == END_PROCESSING) {
1960       // check pairing
1961       // if End without a begin, just ignore
1962       //   this event. If a phase-boundary is crossed, the Begin
1963       //   event would be maintained in beginBlockTime, so it is 
1964       //   not a problem.
1965       if (markedBegin) {
1966         markedBegin = false;
1967         if (pool->pool[i].event < 0)
1968         {
1969           // ignore dummy events. **FIXME** as they have no eIdx?
1970           continue;
1971         }
1972         currentExecTimes[pool->pool[i].eIdx] += 
1973           pool->pool[i].time - beginBlockTime;
1974         totalActiveTime += pool->pool[i].time - beginBlockTime;
1975         lastBeginEPIdx = -1;
1976       }
1977     } else if (pool->pool[i].type == BEGIN_IDLE) {
1978       // check pairing
1979       if (!markedIdle) {
1980         markedIdle = true;
1981       }
1982       beginIdleBlockTime = pool->pool[i].time;
1983     } else if (pool->pool[i].type == END_IDLE) {
1984       // check pairing
1985       if (markedIdle) {
1986         markedIdle = false;
1987         currentExecTimes[numEventMethods] += 
1988           pool->pool[i].time - beginIdleBlockTime;
1989         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1990       }
1991     } else if (pool->pool[i].type == END_PHASE) {
1992       // ignored when not using phases
1993       if (usePhases) {
1994         // when we've not visited this node before
1995         if (i != lastPhaseIdx) { 
1996           totalPhaseTime = 
1997             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1998           // it is important that proper accounting of time take place here.
1999           // Note that END_PHASE events inevitably occur in the context of
2000           //   some entry method by the way the tracing API is designed.
2001           if (markedBegin) {
2002             CkAssert(lastBeginEPIdx >= 0);
2003             currentExecTimes[lastBeginEPIdx] += 
2004               pool->pool[i].time - beginBlockTime;
2005             totalActiveTime += pool->pool[i].time - beginBlockTime;
2006             // this is so the remainder contributes to the next phase
2007             beginBlockTime = pool->pool[i].time;
2008           }
2009           // The following is unlikely, but stranger things have happened.
2010           if (markedIdle) {
2011             currentExecTimes[numEventMethods] +=
2012               pool->pool[i].time - beginIdleBlockTime;
2013             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2014             // this is so the remainder contributes to the next phase
2015             beginIdleBlockTime = pool->pool[i].time;
2016           }
2017           if (totalActiveTime <= totalPhaseTime) {
2018             currentExecTimes[numEventMethods+1] = 
2019               totalPhaseTime - totalActiveTime;
2020           } else {
2021             currentExecTimes[numEventMethods+1] = 0.0;
2022             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2023                      CkMyPe(), currentPhase);
2024           }
2025           collectKMeansData();
2026           // end the loop (and method) and defer the work till the next call
2027           lastPhaseIdx = i;
2028           break; 
2029         }
2030       }
2031     } else if (pool->pool[i].type == END_COMPUTATION) {
2032       if (markedBegin) {
2033         CkAssert(lastBeginEPIdx >= 0);
2034         currentExecTimes[lastBeginEPIdx] += 
2035           pool->pool[i].time - beginBlockTime;
2036         totalActiveTime += pool->pool[i].time - beginBlockTime;
2037       }
2038       if (markedIdle) {
2039         currentExecTimes[numEventMethods] +=
2040           pool->pool[i].time - beginIdleBlockTime;
2041         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2042       }
2043       totalPhaseTime = 
2044         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2045       if (totalActiveTime <= totalPhaseTime) {
2046         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2047       } else {
2048         currentExecTimes[numEventMethods+1] = 0.0;
2049         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2050                  CkMyPe());
2051       }
2052       collectKMeansData();
2053     }
2054   }
2055 }
2056
2057 /**
2058  *  Through a reduction, collectKMeansData aggregates each processors' data
2059  *  in order for global properties to be determined:
2060  *  
2061  *  1. min & max to determine normalization factors.
2062  *  2. sum to determine global EP averages for possible metric reduction
2063  *       through thresholding.
2064  *  3. sum of squares to compute stddev which may be useful in the future.
2065  *
2066  *  collectKMeansData will also keep the processor's data for the current
2067  *    phase so that it may be normalized and worked on subsequently.
2068  *
2069  **/
2070 void KMeansBOC::collectKMeansData() {
2071   int minOffset = numMetrics;
2072   int maxOffset = 2*numMetrics;
2073   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2074
2075   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2076
2077   double *reductionMsg = new double[numMetrics*4];
2078
2079   for (int i=0; i<numMetrics; i++) {
2080     reductionMsg[i] = currentExecTimes[i];
2081     // duplicate the event times for max and min sections of the reduction
2082     reductionMsg[minOffset + i] = currentExecTimes[i];
2083     reductionMsg[maxOffset + i] = currentExecTimes[i];
2084     // compute squares
2085     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2086   }
2087
2088   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2089                 0, thisProxy);
2090   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2091              outlierReductionType, cb);  
2092 }
2093
2094 // The purpose is mainly to initialize the k seeds and generate
2095 //   normalization parameters for each of the metrics. The k seeds
2096 //   and normalization parameters are broadcast to all processors.
2097 //
2098 // Called on processor 0
2099 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2100   CkAssert(CkMyPe() == 0);
2101   
2102   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2103
2104   int sumOffset = 0;
2105   int minOffset = numMetrics;
2106   int maxOffset = 2*numMetrics;
2107   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2108
2109   // calculate statistics & boundaries for the k seeds for clustering
2110   KMeansStatsMessage *outmsg = 
2111     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2112   outmsg->numMetrics = numMetrics;
2113   outmsg->numKPos = numK*numMetrics;
2114   outmsg->numStats = numMetrics*4;
2115
2116   // Sum | Min | Max | Sum of Squares
2117   double *totalExecTimes = (double *)msg->getData();
2118   double totalTime = 0.0;
2119
2120   for (int i=0; i<numMetrics; i++) {
2121     DEBUGN("%lf\n", totalExecTimes[i]);
2122     totalTime += totalExecTimes[i];
2123
2124     // calculate event mean over all processors
2125     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2126
2127     // get the ranges and offsets of each metric. With this, we get
2128     //   normalization factors that can be sent back to each processor to
2129     //   be used as necessary. We reuse max for range. Min remains the offset.
2130     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2131     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2132       totalExecTimes[minOffset + i];
2133     
2134     // calculate stddev (using biased variance)
2135     outmsg->stats[sosOffset + i] = 
2136       sqrt((totalExecTimes[sosOffset + i] - 
2137             2*(outmsg->stats[i])*totalExecTimes[i] +
2138             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2139            CkNumPes());
2140   }
2141
2142   for (int i=0; i<numMetrics; i++) {
2143     // 1) if the proportion of the max value of the entry method relative to
2144     //   the average time taken over all entry methods across all processors
2145     //   is greater than the stipulated percentage threshold ...; AND
2146     // 2) if the range of values are non-zero.
2147     //
2148     // The current assumption is totalTime > 0 (what program has zero total
2149     //   time from all work?)
2150     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2151                      entryThreshold) &&
2152       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2153     if (keepMetric[i]) {
2154       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2155              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2156     } else {
2157       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2158     }
2159     outmsg->filter[i] = keepMetric[i];
2160   }
2161
2162   delete msg;
2163   
2164   // initialize k seeds for this phase
2165   kSeeds = new double[numK*numMetrics];
2166
2167   numKReported = 0;
2168   kNumMembers = new int[numK];
2169
2170   // Randomly select k processors' metric vectors for the k seeds
2171   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2172   srand(11337); // for debugging purposes
2173   for (int k=0; k<numK; k++) {
2174     DEBUGF("Seed %d | ", k);
2175     for (int m=0; m<numMetrics; m++) {
2176       double factor = totalExecTimes[maxOffset + m] - 
2177         totalExecTimes[minOffset + m];
2178       // "uniform" distribution, scaled according to the normalization
2179       //   factors
2180       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2181       // Random distribution.
2182       kSeeds[numMetrics*k + m] =
2183         ((rand()*1.0)/RAND_MAX)*factor;
2184       if (keepMetric[m]) {
2185         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2186       }
2187       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2188     }
2189     DEBUGF("\n");
2190     kNumMembers[k] = 0;
2191   }
2192
2193   // broadcast statistical values to all processors for cluster discovery
2194   thisProxy.findInitialClusters(outmsg);
2195 }
2196
2197
2198
2199 // Called on each processor.
2200 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2201
2202  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2203
2204   phaseIter = 0;
2205
2206   // Get info from stats message
2207   CkAssert(numMetrics == msg->numMetrics);
2208   for (int i=0; i<numMetrics; i++) {
2209     keepMetric[i] = msg->filter[i];
2210   }
2211
2212   // Normalize data on local processor.
2213   // **CWL** See my thesis for detailed discussion of normalization of
2214   //    performance data.
2215   // **NOTE** This might change if we want to send data based on the filter
2216   //   instead of all the data.
2217   CkAssert(numMetrics*4 == msg->numStats);
2218   for (int i=0; i<numMetrics; i++) {
2219     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2220     // **CWL** We do not normalize the range. Entry methods that exhibit
2221     //   large absolute timing variations should be allowed to contribute
2222     //   more to the Euclidean distance measure!
2223     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2224   }
2225
2226   // **NOTE** This might change if we want to send data based on the filter
2227   //   instead of all the data.
2228   CkAssert(numK*numMetrics == msg->numKPos);
2229   for (int i=0; i<msg->numKPos; i++) {
2230     incKSeeds[i] = msg->kSeedsPos[i];
2231   }
2232
2233   // Decide which KSeed this processor belongs to.
2234   minDistance = calculateDistance(0);
2235   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2236            currentPhase, phaseIter, minDistance);
2237   minK = 0;
2238   for (int i=1; i<numK; i++) {
2239     double distance = calculateDistance(i);
2240     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2241              currentPhase, phaseIter, i, distance);
2242     if (distance < minDistance) {
2243       minDistance = distance;
2244       minK = i;
2245     }
2246   }
2247
2248   // Set up a reduction with the modification vector to the root (0).
2249   //
2250   // The modification vector sends a negative value for each metric
2251   //   for the K this processor no longer belongs to and a positive
2252   //   value to the K the processor now belongs. In addition, a -1.0
2253   //   is sent to the K it is leaving and a +1.0 to the K it is 
2254   //   joining.
2255   //
2256   // The processor must still contribute a "zero returns" even if
2257   //   nothing changes. This will be the basis for determine
2258   //   convergence at the root.
2259   //
2260   // The addtional +1 is meant for the count-change that must be
2261   //   maintained for the special cases at the root when some K
2262   //   may be deprived of all processor points or go from 0 to a
2263   //   positive number of processors (see later comments).
2264   double *modVector = new double[numK*(numMetrics+1)];
2265   for (int i=0; i<numK; i++) {
2266     for (int j=0; j<numMetrics+1; j++) {
2267       modVector[i*(numMetrics+1) + j] = 0.0;
2268     }
2269   }
2270   for (int i=0; i<numMetrics; i++) {
2271     // for this initialization, only positive values need be sent.
2272     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2273   }
2274   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2275
2276   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2277                 0, thisProxy);
2278   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2279              CkReduction::sum_double, cb);  
2280 }
2281
2282 double KMeansBOC::calculateDistance(int k) {
2283   double ret = 0.0;
2284   for (int i=0; i<numMetrics; i++) {
2285     if (keepMetric[i]) {
2286       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2287              CkMyPe(), currentPhase, phaseIter, i,
2288                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2289       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2290     }
2291   }
2292   return sqrt(ret);
2293 }
2294
2295 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2296   CkAssert(CkMyPe() == 0);
2297
2298   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2299
2300   double *modVector = (double *)msg->getData();
2301   // sanity check
2302   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2303
2304   // A quick convergence test.
2305   bool hasChanges = false;
2306   for (int i=0; i<numK; i++) {
2307     hasChanges = hasChanges || 
2308       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2309   }
2310   if (!hasChanges) {
2311     delete msg;
2312     findRepresentatives();
2313   } else {
2314     int overallChange = 0;
2315     for (int i=0; i<numK; i++) {
2316       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2317       if (change != 0) {
2318         overallChange += change;
2319         // modify the k seeds based on the modification vectors coming in
2320         //
2321         // If a seed initially has no members, its contents do not matter and
2322         //   is simply set to the average of the incoming vector.
2323         // If the change causes a seed to lose all its members, do nothing.
2324         //   Its last-known location is kept to allow it to re-capture
2325         //   membership at the next iteration rather than apply the last
2326         //   changes (which snaps the point unnaturally to 0,0).
2327         // Otherwise, apply the appropriate vector changes.
2328         CkAssert((kNumMembers[i] + change >= 0) &&
2329                  (kNumMembers[i] + change <= CkNumPes()));
2330         if (kNumMembers[i] == 0) {
2331           CkAssert(change > 0);
2332           for (int j=0; j<numMetrics; j++) {
2333             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2334           }
2335         } else if (kNumMembers[i] + change == 0) {
2336           // do nothing.
2337         } else {
2338           for (int j=0; j<numMetrics; j++) {
2339             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2340             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2341             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2342           }
2343         }
2344         kNumMembers[i] += change;
2345       }
2346       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2347              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2348     }
2349     delete msg;
2350
2351     // broadcast the new seed locations.
2352     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2353     outmsg->numKPos = numK*numMetrics;
2354     for (int i=0; i<numK*numMetrics; i++) {
2355       outmsg->kSeedsPos[i] = kSeeds[i];
2356     }
2357
2358     thisProxy.updateSeedMembership(outmsg);
2359   }
2360 }
2361
2362 // Called on all processors
2363 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2364
2365   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2366
2367   phaseIter++;
2368
2369   // **NOTE** This might change if we want to send data based on the filter
2370   //   instead of all the data.
2371   CkAssert(numK*numMetrics == msg->numKPos);
2372   for (int i=0; i<msg->numKPos; i++) {
2373     incKSeeds[i] = msg->kSeedsPos[i];
2374   }
2375
2376   // Decide which KSeed this processor belongs to.
2377   lastMinK = minK;
2378   minDistance = calculateDistance(0);
2379   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2380          currentPhase, phaseIter, minDistance);
2381
2382   minK = 0;
2383   for (int i=1; i<numK; i++) {
2384     double distance = calculateDistance(i);
2385     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2386            currentPhase, phaseIter, i, distance);
2387     if (distance < minDistance) {
2388       minDistance = distance;
2389       minK = i;
2390     }
2391   }
2392
2393   double *modVector = new double[numK*(numMetrics+1)];
2394   for (int i=0; i<numK; i++) {
2395     for (int j=0; j<numMetrics+1; j++) {
2396       modVector[i*(numMetrics+1) + j] = 0.0;
2397     }
2398   }
2399
2400   if (minK != lastMinK) {
2401     for (int i=0; i<numMetrics; i++) {
2402       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2403       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2404     }
2405     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2406     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2407   }
2408
2409   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2410                 0, thisProxy);
2411   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2412              CkReduction::sum_double, cb);  
2413 }
2414
2415 void KMeansBOC::findRepresentatives() {
2416
2417   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2418
2419   int numNonEmptyClusters = 0;
2420   for (int i=0; i<numK; i++) {
2421     if (kNumMembers[i] > 0) {
2422       numNonEmptyClusters++;
2423     }
2424   }
2425
2426   int numRepresentatives = peNumKeep;
2427   // **FIXME**
2428   // This is fairly arbitrary. Next time, choose the centers of the top
2429   //   largest clusters.
2430   if (numRepresentatives < numNonEmptyClusters) {
2431     numRepresentatives = numNonEmptyClusters;
2432   }
2433
2434   int slotsRemaining = numRepresentatives;
2435
2436   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2437          numNonEmptyClusters);
2438
2439   // determine how many exemplars to select per cluster. Currently
2440   //   hardcoded to 1. Future challenge is to decide on other numbers
2441   //   or proportionality.
2442   //
2443   int exemplarsPerCluster = 1;
2444   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2445
2446   int numCandidateOutliers = CkNumPes() - 
2447     exemplarsPerCluster*numNonEmptyClusters;
2448
2449   double *remainders = new double[numK];
2450   int *assigned = new int[numK];
2451   exemplarChoicesLeft = new int[numK];
2452   outlierChoicesLeft = new int[numK];
2453
2454   for (int i=0; i<numK; i++) {
2455     assigned[i] = 0;
2456     remainders[i] = 
2457       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2458       slotsRemaining / numCandidateOutliers;
2459     if (remainders[i] >= 0.0) {
2460       assigned[i] = (int)floor(remainders[i]);
2461       remainders[i] -= assigned[i];
2462     } else {
2463       remainders[i] = 0.0;
2464     }
2465   }
2466
2467   for (int i=0; i<numK; i++) {
2468     slotsRemaining -= assigned[i];
2469   }
2470   CkAssert(slotsRemaining >= 0);
2471
2472   // find clusters to assign the loose slots to, in order of
2473   // remainder proportion
2474   while (slotsRemaining > 0) {
2475     double max = 0.0;
2476     int winner = 0;
2477     for (int i=0; i<numK; i++) {
2478       if (remainders[i] > max) {
2479         max = remainders[i];
2480         winner = i;
2481       }
2482     }
2483     assigned[winner]++;
2484     remainders[winner] = 0.0;
2485     slotsRemaining--;
2486   }
2487
2488   // set up how many reduction cycles of min/max we need to conduct to
2489   // select the representatives.
2490   numSelectionIter = exemplarsPerCluster;
2491   for (int i=0; i<numK; i++) {
2492     if (assigned[i] > numSelectionIter) {
2493       numSelectionIter = assigned[i];
2494     }
2495   }
2496   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2497
2498   for (int i=0; i<numK; i++) {
2499     if (kNumMembers[i] > 0) {
2500       exemplarChoicesLeft[i] = exemplarsPerCluster;
2501       outlierChoicesLeft[i] = assigned[i];
2502     } else {
2503       exemplarChoicesLeft[i] = 0;
2504       outlierChoicesLeft[i] = 0;
2505     }
2506     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2507            outlierChoicesLeft[i]);
2508   }
2509
2510   delete [] assigned;
2511   delete [] remainders;
2512
2513   // send out first broadcast
2514   KSelectionMessage *outmsg = NULL;
2515   if (numSelectionIter > 0) {
2516     outmsg = new (numK, numK, numK) KSelectionMessage;
2517     outmsg->numKMinIDs = numK;
2518     outmsg->numKMaxIDs = numK;
2519     for (int i=0; i<numK; i++) {
2520       outmsg->minIDs[i] = -1;
2521       outmsg->maxIDs[i] = -1;
2522     }
2523     thisProxy.collectDistances(outmsg);
2524   } else {
2525     CkPrintf("Warning: No selection iteration from the start!\n");
2526     // invoke phase completion on all processors
2527     thisProxy.phaseDone();
2528   }
2529 }
2530
2531 /*
2532  *  lastMin = array of minimum champions of the last tournament
2533  *  lastMax = array of maximum champions of the last tournament
2534  *  lastMaxVal = array of last encountered maximum values, allows previous
2535  *                 minimum winners to eliminate themselves from the next
2536  *                 minimum race.
2537  *
2538  *  Called on all processors.
2539  */
2540 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2541
2542   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2543
2544   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2545          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2546   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2547       (CkMyPe() == msg->maxIDs[lastMinK])) {
2548     CkAssert(!selected);
2549     selected = true;
2550   }
2551
2552   // build outgoing reduction structure
2553   //   format = minVal | ID | maxVal | ID
2554   double *minMaxAndIDs = NULL;
2555
2556   minMaxAndIDs = new double[numK*4];
2557   // initialize to the appropriate out-of-band values (for error checks)
2558   for (int i=0; i<numK; i++) {
2559     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2560     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2561     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2562     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2563   }
2564   // If I have not won before, I put myself back into the competition
2565   if (!selected) {
2566     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2567     minMaxAndIDs[lastMinK*4] = minDistance;
2568     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2569     minMaxAndIDs[lastMinK*4+2] = minDistance;
2570     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2571   }
2572   delete msg;
2573
2574   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2575                 0, thisProxy);
2576   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2577              minMaxReductionType, cb);  
2578 }
2579
2580 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2581   // incoming format:
2582   //   minVal | minID | maxVal | maxID
2583
2584   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2585
2586   if (numSelectionIter > 0) {
2587     double *incInfo = (double *)msg->getData();
2588     
2589     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2590     outmsg->numKMinIDs = numK;
2591     outmsg->numKMaxIDs = numK;
2592     
2593     for (int i=0; i<numK; i++) {
2594       DEBUGF("%d | %lf %d %lf %d \n", i, 
2595              incInfo[i*4], (int)incInfo[i*4+1], 
2596              incInfo[i*4+2], (int)incInfo[i*4+3]);
2597     }
2598
2599     for (int i=0; i<numK; i++) {
2600       if (exemplarChoicesLeft[i] > 0) {
2601         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2602         exemplarChoicesLeft[i]--;
2603       } else {
2604         outmsg->minIDs[i] = -1;
2605       }
2606       if (outlierChoicesLeft[i] > 0) {
2607         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2608         outlierChoicesLeft[i]--;
2609       } else {
2610         outmsg->maxIDs[i] = -1;
2611       }
2612     }
2613     thisProxy.collectDistances(outmsg);
2614     numSelectionIter--;
2615   } else {
2616     // invoke phase completion on all processors
2617     thisProxy.phaseDone();
2618   }
2619 }
2620
2621 /**
2622  *  Completion of the K-Means clustering and data selection of one phase
2623  *    of the computation.
2624  *
2625  *  Called on every processor.
2626  */
2627 void KMeansBOC::phaseDone() {
2628
2629   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2630
2631   LogPool *pool = CkpvAccess(_trace)->_logPool;
2632   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2633
2634   // now decide on what to do with the decision.
2635   if (!selected) {
2636     if (usePhases) {
2637       pool->keepPhase[currentPhase] = false;
2638     } else {
2639       // if not using phases, we're working on the whole log
2640       pool->setAllPhases(false);
2641     }
2642   }
2643
2644   // **FIXME** (?) - All processors have to agree on this, or the reduction
2645   //   will not be correct! The question is "is this enforcible?"
2646   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2647     // We're done
2648     int dummy = 0;
2649     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2650                   0, bocProxy);
2651     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2652   } else {
2653     // reset all phase-based k-means data and decisions
2654
2655     // **FIXME**!!!!!    
2656     
2657     // invoke the next K-Means computation phase.
2658     currentPhase++;
2659     thisProxy[CkMyPe()].getNextPhaseMetrics();
2660   }
2661 }
2662
2663 void TraceProjectionsBOC::startTimeAnalysis()
2664 {
2665   double startTime = 0.0;
2666   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2667      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2668   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2669   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2670 }
2671
2672 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2673 {
2674   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2675
2676   if (CkpvAccess(_trace) != NULL) {
2677     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2678     CkpvAccess(_trace)->_logPool->setNewStartTime();
2679     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2680   }
2681   delete msg;
2682   thisProxy[CkMyPe()].startEndTimeAnalysis();
2683 }
2684
2685 void TraceProjectionsBOC::startEndTimeAnalysis()
2686 {
2687  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2688
2689   endTime = CkpvAccess(_trace)->endTime;
2690   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2691
2692   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2693                 0, thisProxy);
2694   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2695 }
2696
2697 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2698 {
2699  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2700
2701   CkAssert(CkMyPe() == 0);
2702   parModulesRemaining--;
2703   if (CkpvAccess(_trace) != NULL) {
2704     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2705     // CkPrintf("End time determined to be %lf us\n",
2706     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2707   }
2708   delete msg;
2709   if (parModulesRemaining == 0) {
2710     thisProxy[CkMyPe()].finalize();
2711   }
2712 }
2713
2714 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2715
2716  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2717
2718   CkAssert(CkMyPe() == 0);
2719   parModulesRemaining--;
2720   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2721            CmiWallTimer()-analysisStartTime);
2722   delete msg;
2723   if (parModulesRemaining == 0) {
2724     thisProxy[CkMyPe()].finalize();
2725   }
2726 }
2727
2728 /**
2729  *
2730  *  This version is called (on processor 0) only if flushCheck fails.
2731  *
2732  */
2733 void TraceProjectionsBOC::kMeansDone() {
2734   CkAssert(CkMyPe() == 0);
2735   parModulesRemaining--;
2736   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2737            CmiWallTimer()-analysisStartTime);
2738   if (parModulesRemaining == 0) {
2739     thisProxy[CkMyPe()].finalize();
2740   }
2741 }
2742
2743 void TraceProjectionsBOC::finalize()
2744 {
2745   CkAssert(CkMyPe() == 0);
2746   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2747   //       CmiWallTimer()-analysisStartTime);
2748   thisProxy.closingTraces();
2749 }
2750
2751 // Called on every processor
2752 void TraceProjectionsBOC::closingTraces() {
2753   CkpvAccess(_trace)->closeTrace();
2754
2755     // subtle:  reduction needs to go to the PE which started CkExit()
2756   int pe = 0;
2757   if (endPe != -1) pe = endPe;
2758   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2759                 pe, thisProxy); 
2760   contribute(0, NULL, CkReduction::sum_int, cb);  
2761 }
2762
2763 // The sole purpose of this reduction is to decide whether or not
2764 //   Projections as a module needs to call CkExit() to get other
2765 //   modules to shutdown.
2766 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2767   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2768   delete msg;
2769   // decide if CkExit() needs to be called
2770   if (!CkpvAccess(_trace)->converseExit) {
2771     CkExit();
2772   }
2773 }
2774 /*
2775  *  Registration and definition of the Outlier Reduction callback.
2776  *  Format: Sum | Min | Max | Sum of Squares
2777  */
2778 CkReductionMsg *outlierReduction(int nMsgs,
2779                                  CkReductionMsg **msgs) {
2780   int numBytes = 0;
2781   int numMetrics = 0;
2782   double *ret = NULL;
2783
2784   if (nMsgs == 1) {
2785     // nothing to do, just pass it on
2786     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2787   }
2788
2789   if (nMsgs > 1) {
2790     numBytes = msgs[0]->getSize();
2791     // sanity checks
2792     if (numBytes%sizeof(double) != 0) {
2793       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2794     }
2795     if ((numBytes/sizeof(double))%4 != 0) {
2796       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2797     }
2798     numMetrics = (numBytes/sizeof(double))/4;
2799     ret = new double[numMetrics*4];
2800
2801     // copy the first message data into the return structure first
2802     for (int i=0; i<numMetrics*4; i++) {
2803       ret[i] = ((double *)msgs[0]->getData())[i];
2804     }
2805
2806     // Sum | Min | Max | Sum of Squares
2807     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2808       for (int i=0; i<numMetrics; i++) {
2809         // Sum
2810         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2811         // Min
2812         ret[numMetrics + i] =
2813           (ret[numMetrics + i] < 
2814            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2815           ? ret[numMetrics + i] : 
2816           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2817         // Max
2818         ret[2*numMetrics + i] = 
2819           (ret[2*numMetrics + i] >
2820            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2821           ? ret[2*numMetrics + i] :
2822           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2823         // Sum of Squares (squaring already done at leaf)
2824         ret[3*numMetrics + i] +=
2825           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2826       }
2827     }
2828   }
2829   
2830   /* apparently, we do not delete the incoming messages */
2831   return CkReductionMsg::buildNew(numBytes,ret);
2832 }
2833
2834 /*
2835  * The only reason we have a user-defined reduction is to support
2836  *   identification of the "winning" processors as well as to handle
2837  *   both the min and the max of each "tournament". A simple min/max
2838  *   discovery cannot handle ties.
2839  */
2840 CkReductionMsg *minMaxReduction(int nMsgs,
2841                                 CkReductionMsg **msgs) {
2842   CkAssert(nMsgs > 0);
2843
2844   int numBytes = msgs[0]->getSize();
2845   CkAssert(numBytes%sizeof(double) == 0);
2846   int numK = (numBytes/sizeof(double))/4;
2847
2848   double *ret = new double[numK*4];
2849   // fill with out-of-band values
2850   for (int i=0; i<numK; i++) {
2851     ret[i*4] = -1.0;
2852     ret[i*4+1] = -1.0;
2853     ret[i*4+2] = -1.0;
2854     ret[i*4+3] = -1.0;
2855   }
2856
2857   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2858   for (int i=0; i<nMsgs; i++) {
2859     double *temp = (double *)msgs[i]->getData();
2860     for (int j=0; j<numK; j++) {
2861       // no previous valid min
2862       if (ret[j*4+1] < 0) {
2863         // fill it in only if the incoming min is valid
2864         if (temp[j*4+1] >= 0) {
2865           ret[j*4] = temp[j*4];      // fill min value
2866           ret[j*4+1] = temp[j*4+1];  // fill ID
2867         }
2868       } else {
2869         // find Min, only if incoming min is valid
2870         if (temp[j*4+1] >= 0) {
2871           if (temp[j*4] < ret[j*4]) {
2872             ret[j*4] = temp[j*4];      // replace min value
2873             ret[j*4+1] = temp[j*4+1];  // replace ID
2874           }
2875         }
2876       }
2877       // no previous valid max
2878       if (ret[j*4+3] < 0) {
2879         // fill only if the incoming max is valid
2880         if (temp[j*4+3] >= 0) {
2881           ret[j*4+2] = temp[j*4+2];  // fill max value
2882           ret[j*4+3] = temp[j*4+3];  // fill ID
2883         }
2884       } else {
2885         // find Max, only if incoming max is valid
2886         if (temp[j*4+3] >= 0) {
2887           if (temp[j*4+2] > ret[j*4+2]) {
2888             ret[j*4+2] = temp[j*4+2];  // replace max value
2889             ret[j*4+3] = temp[j*4+3];  // replace ID
2890           }
2891         }
2892       }
2893     }
2894   }
2895   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2896   delete [] ret;
2897   return redmsg;
2898 }
2899
2900 #include "TraceProjections.def.h"
2901 #endif //PROJ_ANALYSIS
2902
2903 /*@}*/