Fixed message tracing on comm thread (to trace those entry methods that need
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
14 #else
15 #define DEBUGF(format, ...)           // CmiPrintf x
16 #endif
17 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int numPAPIEvents = 2;
103 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
104 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
105 #endif // CMK_HAS_COUNTER_PAPI
106
107 /**
108   For each TraceFoo module, _createTraceFoo() must be defined.
109   This function is called in _createTraces() generated in moduleInit.C
110 */
111 void _createTraceprojections(char **argv)
112 {
113   DEBUGF("%d createTraceProjections\n", CkMyPe());
114   CkpvInitialize(CkVec<char *>, usrEventlist);
115   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
116   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
117 #if CMK_BLUEGENE_CHARM
118   // CthRegister does not call the constructor
119 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
120 #endif //CMK_BLUEGENE_CHARM
121   CkpvInitialize(TraceProjections*, _trace);
122   CkpvAccess(_trace) = new  TraceProjections(argv);
123   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
124   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
125 }
126  
127 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
128
129 struct TraceThreadListener {
130   struct CthThreadListener base;
131   int event;
132   int msgType;
133   int ep;
134   int srcPe;
135   int ml;
136   CmiObjId idx;
137 };
138
139 extern "C"
140 void traceThreadListener_suspend(struct CthThreadListener *l)
141 {
142   TraceThreadListener *a=(TraceThreadListener *)l;
143   /* here, we activate the appropriate trace codes for the appropriate
144      registered modules */
145   traceSuspend();
146 }
147
148 extern "C"
149 void traceThreadListener_resume(struct CthThreadListener *l) 
150 {
151   TraceThreadListener *a=(TraceThreadListener *)l;
152   /* here, we activate the appropriate trace codes for the appropriate
153      registered modules */
154   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
155                                 CthGetThreadID(a->base.thread));
156   a->event=-1;
157   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
158   a->ml=0;
159 }
160
161 extern "C"
162 void traceThreadListener_free(struct CthThreadListener *l) 
163 {
164   TraceThreadListener *a=(TraceThreadListener *)l;
165   delete a;
166 }
167
168 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
169 {
170 #if CMK_TRACE_ENABLED
171   /* strip essential information from the envelope */
172   TraceThreadListener *a= new TraceThreadListener;
173   
174   a->base.suspend=traceThreadListener_suspend;
175   a->base.resume=traceThreadListener_resume;
176   a->base.free=traceThreadListener_free;
177   a->event=e->getEvent();
178   a->msgType=e->getMsgtype();
179   a->ep=e->getEpIdx();
180   a->srcPe=e->getSrcPe();
181   a->ml=e->getTotalsize();
182
183   CthAddListener(tid, (CthThreadListener *)a);
184 #endif
185 }
186
187 void LogPool::openLog(const char *mode)
188 {
189 #if CMK_PROJECTIONS_USE_ZLIB
190   if(compressed) {
191     if (nonDeltaLog) {
192       do {
193         zfp = gzopen(fname, mode);
194       } while (!zfp && (errno == EINTR || errno == EMFILE));
195       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
196     }
197     if (deltaLog) {
198       do {
199         deltazfp = gzopen(dfname, mode);
200       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
201       if (!deltazfp) 
202         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
203     }
204   } else {
205     if (nonDeltaLog) {
206       do {
207         fp = fopen(fname, mode);
208       } while (!fp && (errno == EINTR || errno == EMFILE));
209       if (!fp) {
210         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
211         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
212       }
213     }
214     if (deltaLog) {
215       do {
216         deltafp = fopen(dfname, mode);
217       } while (!deltafp && (errno == EINTR || errno == EMFILE));
218       if (!deltafp) 
219         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
220     }
221   }
222 #else
223   if (nonDeltaLog) {
224     do {
225       fp = fopen(fname, mode);
226     } while (!fp && (errno == EINTR || errno == EMFILE));
227     if (!fp) {
228       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
229       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
230     }
231   }
232   if (deltaLog) {
233     do {
234       deltafp = fopen(dfname, mode);
235     } while (!deltafp && (errno == EINTR || errno == EMFILE));
236     if(!deltafp) 
237       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
238   }
239 #endif
240 }
241
242 void LogPool::closeLog(void)
243 {
244 #if CMK_PROJECTIONS_USE_ZLIB
245   if(compressed) {
246     if (nonDeltaLog) gzclose(zfp);
247     if (deltaLog) gzclose(deltazfp);
248     return;
249   }
250 #endif
251   if (nonDeltaLog) { 
252 #if !defined(_WIN32) || defined(__CYGWIN__)
253     fsync(fileno(fp)); 
254 #endif
255     fclose(fp); 
256   }
257   if (deltaLog)  { 
258 #if !defined(_WIN32) || defined(__CYGWIN__)
259     fsync(fileno(deltafp)); 
260 #endif
261     fclose(deltafp);  
262   }
263 }
264
265 LogPool::LogPool(char *pgm) {
266   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
267   // defaults to writing data (no outlier changes)
268   writeData = true;
269   numEntries = 0;
270   // **CW** for simple delta encoding
271   prevTime = 0.0;
272   timeErr = 0.0;
273   globalEndTime = 0.0;
274   headerWritten = 0;
275   numPhases = 0;
276   hasFlushed = false;
277
278   keepPhase = NULL;
279
280   fileCreated = false;
281   poolSize = CkpvAccess(CtrLogBufSize);
282   pgmname = new char[strlen(pgm)+1];
283   strcpy(pgmname, pgm);
284 }
285
286 void LogPool::createFile(const char *fix)
287 {
288   if (fileCreated) {
289     return;
290   }
291
292   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
293   char *pathPlusFilePrefix = new char[1024];
294
295   if(nSubdirs > 0){
296     int sd = CkMyPe() % nSubdirs;
297     char *subdir = new char[1024];
298     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
299     CmiMkdir(subdir);
300     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
301     delete[] subdir;
302   } else {
303     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
304   }
305
306   char pestr[10];
307   sprintf(pestr, "%d", CkMyPe());
308 #if CMK_PROJECTIONS_USE_ZLIB
309   int len;
310   if(compressed)
311     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
312   else
313     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
314 #else
315   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
316 #endif
317
318   if (nonDeltaLog) {
319     fname = new char[len];
320   }
321   if (deltaLog) {
322     dfname = new char[len];
323   }
324 #if CMK_PROJECTIONS_USE_ZLIB
325   if(compressed) {
326     if (deltaLog && nonDeltaLog) {
327       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
328       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
329     } else {
330       if (nonDeltaLog) {
331         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
332       } else {
333         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
334       }
335     }
336   } else {
337     if (deltaLog && nonDeltaLog) {
338       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
339       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
340     } else {
341       if (nonDeltaLog) {
342         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
343       } else {
344         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
345       }
346     }
347   }
348 #else
349   if (deltaLog && nonDeltaLog) {
350     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
351     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
352   } else {
353     if (nonDeltaLog) {
354       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
355     } else {
356       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
357     }
358   }
359 #endif
360   fileCreated = true;
361   delete[] pathPlusFilePrefix;
362   openLog("w+");
363   CLOSE_LOG 
364 }
365
366 void LogPool::createSts(const char *fix)
367 {
368   CkAssert(CkMyPe() == 0);
369   // create the sts file
370   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
371   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
372   do
373     {
374       stsfp = fopen(fname, "w");
375     } while (!stsfp && (errno == EINTR || errno == EMFILE));
376   if(stsfp==0)
377     CmiAbort("Cannot open projections sts file for writing.\n");
378   delete[] fname;
379 }  
380
381 void LogPool::createRC()
382 {
383   // create the projections rc file.
384   fname = 
385     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
386   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
387   do {
388     rcfp = fopen(fname, "w");
389   } while (!rcfp && (errno == EINTR || errno == EMFILE));
390   if (rcfp==0) {
391     CmiAbort("Cannot open projections configuration file for writing.\n");
392   }
393   delete[] fname;
394 }
395
396 LogPool::~LogPool() 
397 {
398   if (writeData) {
399     writeLog();
400 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
401     closeLog();
402 #endif
403   }
404
405 #if CMK_BLUEGENE_CHARM
406   extern int correctTimeLog;
407   if (correctTimeLog) {
408     createFile("-bg");
409     if (CkMyPe() == 0) {
410       createSts("-bg");
411     }
412     writeHeader();
413     if (CkMyPe() == 0) writeSts(NULL);
414     postProcessLog();
415   }
416 #endif
417
418   delete[] pool;
419   delete [] fname;
420 }
421
422 void LogPool::writeHeader()
423 {
424   if (headerWritten) return;
425   headerWritten = 1;
426   if(!binary) {
427 #if CMK_PROJECTIONS_USE_ZLIB
428     if(compressed) {
429       if (nonDeltaLog) {
430         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
431       }
432       if (deltaLog) {
433         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
434       }
435     } 
436     else /* else clause is below... */
437 #endif
438     /*... may hang over from else above */ {
439       if (nonDeltaLog) {
440         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
441       }
442       if (deltaLog) {
443         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
444       }
445     }
446   }
447   else { // binary
448       if (nonDeltaLog) {
449         fwrite(&numEntries,sizeof(numEntries),1,fp);
450       }
451       if (deltaLog) {
452         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
453       }
454   }
455 }
456
457 void LogPool::writeLog(void)
458 {
459   createFile();
460   OPEN_LOG
461   writeHeader();
462   if (nonDeltaLog) write(0);
463   if (deltaLog) write(1);
464   CLOSE_LOG
465 }
466
467 void LogPool::write(int writedelta) 
468 {
469   // **CW** Simple delta encoding implementation
470   // prevTime has to be maintained as an object variable because
471   // LogPool::write may be called several times depending on the
472   // +logsize value.
473   PUP::er *p = NULL;
474   if (binary) {
475     p = new PUP::toDisk(writedelta?deltafp:fp);
476   }
477 #if CMK_PROJECTIONS_USE_ZLIB
478   else if (compressed) {
479     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
480   }
481 #endif
482   else {
483     p = new toProjectionsFile(writedelta?deltafp:fp);
484   }
485   CmiAssert(p);
486   int curPhase = 0;
487   // **FIXME** - Should probably consider a more sophisticated bounds-based
488   //   approach for selective writing instead of making multiple if-checks
489   //   for every single event.
490   for(UInt i=0; i<numEntries; i++) {
491     if (!writedelta) {
492       if (keepPhase == NULL) {
493         // default case, when no phase selection is required.
494         pool[i].pup(*p);
495       } else {
496         // **FIXME** Might be a good idea to create a "filler" event block for
497         //   all the events taken out by phase filtering.
498         if (pool[i].type == END_PHASE) {
499           // always write phase markers
500           pool[i].pup(*p);
501           curPhase++;
502         } else if (pool[i].type == BEGIN_COMPUTATION ||
503                    pool[i].type == END_COMPUTATION) {
504           // always write BEGIN and END COMPUTATION markers
505           pool[i].pup(*p);
506         } else if (keepPhase[curPhase]) {
507           pool[i].pup(*p);
508         }
509       }
510     }
511     else {      // delta
512       // **FIXME** Implement phase-selective writing for delta logs
513       //   eventually
514       double time = pool[i].time;
515       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
516       {
517         double timeDiff = (time-prevTime)*1.0e6;
518         UInt intTimeDiff = (UInt)timeDiff;
519         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
520         if (timeErr > 1.0) {
521           timeErr -= 1.0;
522           intTimeDiff++;
523         }
524         pool[i].time = intTimeDiff/1.0e6;
525       }
526       pool[i].pup(*p);
527       pool[i].time = time;      // restore time value
528       prevTime = time;
529     }
530   }
531   delete p;
532   delete [] keepPhase;
533 }
534
535 void LogPool::writeSts(void)
536 {
537   // for whining compilers
538   int i;
539   char name[30];
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   for (i=0;i<numPAPIEvents;i++) {
549     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, papiEventNames[i]);
550   }
551 #endif
552   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
553   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
554     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
555   }     
556 }
557
558 void LogPool::writeSts(TraceProjections *traceProj){
559   writeSts();
560   if (traceProj != NULL) {
561     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
562     funcIter->seekStart();
563     int numFuncs = traceProj->getFuncNumber();
564     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
565     while(funcIter->hasNext()) {
566       StrKey *key;
567       int *obj = (int *)funcIter->next((void **)&key);
568       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
569     }
570   }
571   fprintf(stsfp, "END\n");
572   fclose(stsfp);
573 }
574
575 void LogPool::writeRC(void)
576 {
577     //CkPrintf("write RC is being executed\n");
578 #ifdef PROJ_ANALYSIS  
579     CkAssert(CkMyPe() == 0);
580     fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
581           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
582     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
583     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
584       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
585     } else {
586       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
587     }
588     */
589 #endif //PROJ_ANALYSIS
590   fclose(rcfp);
591 }
592
593
594 #if CMK_BLUEGENE_CHARM
595 static void updateProjLog(void *data, double t, double recvT, void *ptr)
596 {
597   LogEntry *log = (LogEntry *)data;
598   FILE *fp = *(FILE **)ptr;
599   log->time = t;
600   log->recvTime = recvT<0.0?0:recvT;
601 //  log->write(fp);
602   toProjectionsFile p(fp);
603   log->pup(p);
604 }
605 #endif
606
607 // flush log entries to disk
608 void LogPool::flushLogBuffer()
609 {
610   if (numEntries) {
611     double writeTime = TraceTimer();
612     writeLog();
613     hasFlushed = true;
614     numEntries = 0;
615     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
616     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
617   }
618 }
619
620 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
621                   double time, int event, int pe, int ml, CmiObjId *id, 
622                   double recvT, double cpuT, int numPe)
623 {
624   new (&pool[numEntries++])
625     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
626   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
627     numPhases++;
628   }
629   if(poolSize==numEntries) {
630     flushLogBuffer();
631 #if CMK_BLUEGENE_CHARM
632     extern int correctTimeLog;
633     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
634 #endif
635   }
636 #if CMK_BLUEGENE_CHARM
637   switch (type) {
638     case BEGIN_PROCESSING:
639       pool[numEntries-1].recvTime = BgGetRecvTime();
640     case END_PROCESSING:
641     case BEGIN_COMPUTATION:
642     case END_COMPUTATION:
643     case CREATION:
644     case BEGIN_PACK:
645     case END_PACK:
646     case BEGIN_UNPACK:
647     case END_UNPACK:
648     case USER_EVENT_PAIR:
649       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
650   }
651 #endif
652 }
653
654 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
655 #ifndef CMK_BLUEGENE_CHARM
656   new (&pool[numEntries++])
657         LogEntry(time,type,funcID,lineNum,fileName);
658   if(poolSize == numEntries){
659     flushLogBuffer();
660   }
661 #endif  
662 }
663
664
665   
666 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
667 #ifndef CMK_BLUEGENE_CHARM
668   new (&pool[numEntries++])
669         LogEntry(type,time,memUsage);
670   if(poolSize == numEntries){
671     flushLogBuffer();
672   }
673 #endif  
674         
675 }  
676
677
678
679 void LogPool::addUserSupplied(int data){
680         // add an event
681         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
682
683         // set the user supplied value for the previously created event 
684         pool[numEntries-1].setUserSuppliedData(data);
685   }
686
687
688 void LogPool::addUserSuppliedNote(char *note){
689         // add an event
690         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
691
692         // set the user supplied note for the previously created event 
693         pool[numEntries-1].setUserSuppliedNote(note);
694   }
695
696 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
697   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
698 #ifndef CMK_BLUEGENE_CHARM
699 #if CMK_SMP_TRACE_COMMTHREAD && MPI_SMP_TRACE_COMMTHREAD_HACK
700   //This part of code is used  to combine the contiguous
701   //MPI_Test and MPI_Iprobe events to reduce the number of
702   //entries
703 #define MPI_TEST_EVENT_ID 60
704 #define MPI_IPROBE_EVENT_ID 70 
705   int lastEvent = pool[numEntries-1].event;
706   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
707     //just replace the endtime of last event
708     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
709     pool[numEntries].endTime = et;
710   }else{
711     new (&pool[numEntries++])
712       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
713   }
714 #else
715   new (&pool[numEntries++])
716     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717 #endif
718   if(poolSize == numEntries){
719     flushLogBuffer();
720   }
721 #endif  
722 }
723
724
725 /* **CW** Not sure if this is the right thing to do. Feels more like
726    a hack than a solution to Sameer's request to add the destination
727    processor information to multicasts and broadcasts.
728
729    In the unlikely event this method is used for Broadcasts as well,
730    pelist == NULL will be used to indicate a global broadcast with 
731    num PEs.
732 */
733 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
734                                    int event, int pe, int ml, CmiObjId *id,
735                                    double recvT, int numPe, int *pelist)
736 {
737   new (&pool[numEntries++])
738     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
739   if(poolSize==numEntries) {
740     flushLogBuffer();
741   }
742 }
743
744 void LogPool::postProcessLog()
745 {
746 #if CMK_BLUEGENE_CHARM
747   bgUpdateProj(1);   // event type
748 #endif
749 }
750
751 void LogPool::modLastEntryTimestamp(double ts)
752 {
753   pool[numEntries-1].time = ts;
754   //pool[numEntries-1].cputime = ts;
755 }
756
757 // /** Constructor for a multicast log entry */
758 // 
759 //  THIS WAS MOVED TO trace-projections.h with the other constructors
760 // 
761 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
762 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
763 // {
764 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
765 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
766 //     recvTime = rt; 
767 //     numpes = numPe;
768 //     userSuppliedNote = NULL;
769 //     if (pelist != NULL) {
770 //      pes = new int[numPe];
771 //      for (int i=0; i<numPe; i++) {
772 //        pes[i] = pelist[i];
773 //      }
774 //     } else {
775 //      pes= NULL;
776 //     }
777 // }
778
779 void LogEntry::addPapi(int numPapiEvts, int *papi_ids, LONG_LONG_PAPI *papiVals)
780 {
781 #if CMK_HAS_COUNTER_PAPI
782   numPapiEvents = numPapiEvts;
783   if (papiVals != NULL) {
784     papiIDs = new int[numPapiEvents];
785     papiValues = new LONG_LONG_PAPI[numPapiEvents];
786     for (int i=0; i<numPapiEvents; i++) {
787       papiIDs[i] = papi_ids[i];
788       papiValues[i] = papiVals[i];
789     }
790   }
791 #endif
792 }
793
794
795
796 void LogEntry::pup(PUP::er &p)
797 {
798   int i;
799   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
800   char ret = '\n';
801
802   p|type;
803   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
804   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
805
806   switch (type) {
807     case USER_EVENT:
808     case USER_EVENT_PAIR:
809       p|mIdx; p|itime; p|event; p|pe;
810       break;
811     case BEGIN_IDLE:
812     case END_IDLE:
813     case BEGIN_PACK:
814     case END_PACK:
815     case BEGIN_UNPACK:
816     case END_UNPACK:
817       p|itime; p|pe; 
818       break;
819     case BEGIN_PROCESSING:
820       if (p.isPacking()) {
821         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
822         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
823       }
824       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
825       p|msglen; p|irecvtime; 
826       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
827       p|icputime;
828 #if CMK_HAS_COUNTER_PAPI
829       p|numPapiEvents;
830       for (i=0; i<numPapiEvents; i++) {
831         // not yet!!!
832         //      p|papiIDs[i]; 
833         p|papiValues[i];
834         
835       }
836 #else
837       p|numPapiEvents;     // non papi version has value 0
838 #endif
839       if (p.isUnpacking()) {
840         recvTime = irecvtime/1.0e6;
841         cputime = icputime/1.0e6;
842       }
843       break;
844     case END_PROCESSING:
845       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
846       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
847 #if CMK_HAS_COUNTER_PAPI
848       p|numPapiEvents;
849       for (i=0; i<numPapiEvents; i++) {
850         // not yet!!!
851         //      p|papiIDs[i];
852         p|papiValues[i];
853       }
854 #else
855       p|numPapiEvents;  // non papi version has value 0
856 #endif
857       if (p.isUnpacking()) cputime = icputime/1.0e6;
858       break;
859     case USER_SUPPLIED:
860           p|userSuppliedData;
861           p|itime;
862         break;
863     case USER_SUPPLIED_NOTE:
864           p|itime;
865           int length;
866           if (p.isPacking()) length = strlen(userSuppliedNote);
867           p | length;
868           char space;
869           space = ' ';
870           p | space;
871           if (p.isUnpacking()) {
872             userSuppliedNote = new char[length+1];
873             userSuppliedNote[length] = '\0';
874           }
875           PUParray(p,userSuppliedNote, length);
876           break;
877     case USER_SUPPLIED_BRACKETED_NOTE:
878       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
879           p|itime;
880           p|iEndTime;
881           p|event;
882           int length2;
883           if (p.isPacking()) length2 = strlen(userSuppliedNote);
884           p | length2;
885           char space2;
886           space2 = ' ';
887           p | space2;
888           if (p.isUnpacking()) {
889             userSuppliedNote = new char[length+1];
890             userSuppliedNote[length] = '\0';
891           }
892           PUParray(p,userSuppliedNote, length2);
893           break;
894     case MEMORY_USAGE_CURRENT:
895       p | memUsage;
896       p | itime;
897         break;
898     case CREATION:
899       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
900       p|mIdx; p|eIdx; p|itime;
901       p|event; p|pe; p|msglen; p|irecvtime;
902       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
903       break;
904     case CREATION_BCAST:
905       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
906       p|mIdx; p|eIdx; p|itime;
907       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
908       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
909       break;
910     case CREATION_MULTICAST:
911       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
912       p|mIdx; p|eIdx; p|itime;
913       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
914       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
915       for (i=0; i<numpes; i++) p|pes[i];
916       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
917       break;
918     case MESSAGE_RECV:
919       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
920       break;
921
922     case ENQUEUE:
923     case DEQUEUE:
924       p|mIdx; p|itime; p|event; p|pe;
925       break;
926
927     case BEGIN_INTERRUPT:
928     case END_INTERRUPT:
929       p|itime; p|event; p|pe;
930       break;
931
932       // **CW** absolute timestamps are used here to support a quick
933       // way of determining the total time of a run in projections
934       // visualization.
935     case BEGIN_COMPUTATION:
936     case END_COMPUTATION:
937     case BEGIN_TRACE:
938     case END_TRACE:
939       p|itime;
940       break;
941     case BEGIN_FUNC:
942         p | itime;
943         p | mIdx;
944         p | event;
945         if(!p.isUnpacking()){
946                 p(fName,flen-1);
947         }
948         break;
949     case END_FUNC:
950         p | itime;
951         p | mIdx;
952         break;
953     case END_PHASE:
954       p|eIdx; // FIXME: actually the phase ID
955       p|itime;
956       break;
957     default:
958       CmiError("***Internal Error*** Wierd Event %d.\n", type);
959       break;
960   }
961   if (p.isUnpacking()) time = itime/1.0e6;
962   p|ret;
963 }
964
965 TraceProjections::TraceProjections(char **argv): 
966   curevent(0), inEntry(0), computationStarted(0), 
967         converseExit(0), endTime(0.0), traceNestedEvents(0),
968         currentPhaseID(0), lastPhaseEvent(NULL)
969 {
970   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
971
972   if (CkpvAccess(traceOnPe) == 0) return;
973
974   CtvInitialize(int,curThreadEvent);
975   CkpvInitialize(CmiInt8, CtrLogBufSize);
976   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
977   CtvAccess(curThreadEvent)=0;
978   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
979                        "Log entries to buffer per I/O")) {
980     if (CkMyPe() == 0) {
981       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
982     }
983   }
984   checknested = 
985     CmiGetArgFlagDesc(argv,"+checknested",
986                       "check projections nest begin end execute events");
987   traceNestedEvents = 
988     CmiGetArgFlagDesc(argv,"+tracenested",
989               "trace projections nest begin/end execute events");
990   int binary = 
991     CmiGetArgFlagDesc(argv,"+binary-trace",
992                       "Write log files in binary format");
993
994   CmiInt8 nSubdirs = 0;
995   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
996
997
998 #if CMK_PROJECTIONS_USE_ZLIB
999   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1000 #else
1001   // consume the flag so there's no confusing
1002   CmiGetArgFlagDesc(argv,"+gz-trace",
1003                     "Write log files pre-compressed with gzip");
1004   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1005 #endif
1006
1007   // **CW** default to non delta log encoding. The user may choose to do
1008   // create both logs (for debugging) or just the old log timestamping
1009   // (for compatibility).
1010   // Generating just the non delta log takes precedence over generating
1011   // both logs (if both arguments appear on the command line).
1012
1013   // switch to OLD log format until everything works // Gengbin
1014   nonDeltaLog = 1;
1015   deltaLog = 0;
1016   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1017                                   "Generate Delta encoded and simple timestamped log files");
1018
1019   _logPool = new LogPool(CkpvAccess(traceRoot));
1020   _logPool->setNumSubdirs(nSubdirs);
1021   _logPool->setBinary(binary);
1022 #if CMK_PROJECTIONS_USE_ZLIB
1023   _logPool->setCompressed(compressed);
1024 #endif
1025   if (CkMyPe() == 0) {
1026     _logPool->createSts();
1027     _logPool->createRC();
1028   }
1029   funcCount=1;
1030
1031 #if CMK_HAS_COUNTER_PAPI
1032   // We initialize and create the event sets for use with PAPI here.
1033   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1034   if (papiRetValue != PAPI_VER_CURRENT) {
1035     CmiAbort("PAPI Library initialization failure!\n");
1036   }
1037   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1038   papiEventSet = PAPI_NULL; 
1039   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1040     CmiAbort("PAPI failed to create event set!\n");
1041   }
1042   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, numPAPIEvents);
1043   if (papiRetValue != PAPI_OK) {
1044     if (papiRetValue == PAPI_ECNFLCT) {
1045       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1046     } else {
1047       CmiAbort("PAPI failed to add designated events!\n");
1048     }
1049   }
1050   papiValues = new long_long[numPAPIEvents];
1051   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
1052 #endif
1053 }
1054
1055 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1056 {
1057   OPTIMIZED_VERSION
1058   CkAssert(e==-1 || e>=0);
1059   CkAssert(evt != NULL);
1060   int event;
1061   int biggest = -1;
1062   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1063     int cur = (*CkpvAccess(usrEvents))[i]->e;
1064     if (cur == e) {
1065       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1066       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1067         return e;
1068       else
1069         CmiAbort("UserEvent double registered!");
1070     }
1071     if (cur > biggest) biggest = cur;
1072   }
1073   // if no user events have so far been registered. biggest will be -1
1074   // and hence newly assigned event numbers will begin from 0.
1075   if (e==-1) event = biggest+1;  // automatically assign new event number
1076   else event = e;
1077   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1078   return event;
1079 }
1080
1081 void TraceProjections::traceClearEps(void)
1082 {
1083   // In trace-summary, this zeros out the EP bins, to eliminate noise
1084   // from startup.  Here, this isn't useful, since we can do that in
1085   // post-processing
1086 }
1087
1088 void TraceProjections::traceWriteSts(void)
1089 {
1090   if(CkMyPe()==0)
1091     _logPool->writeSts(this);
1092 }
1093
1094 /** 
1095  * **IMPT NOTES**:
1096  *
1097  * This is called when Converse closes during ConverseCommonExit().
1098  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1099  *
1100  * Some programs bypass CkExit() (like NAMD, which eventually calls
1101  * ConverseExit()), modules like traces will have to pretend to shutdown
1102  * as if CkExit() was called but at the same time avoid making
1103  * subsequent CkExit() calls (which is usually required for allowing
1104  * other modules to shutdown).
1105  *
1106  * Note that we can only get here if CkExit() was not called, since the
1107  * trace module will un-register itself from TraceArray if it did.
1108  *
1109  */
1110 void TraceProjections::traceClose(void)
1111 {
1112 #ifdef PROJ_ANALYSIS
1113   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1114
1115   // sets the flag that tells the code not to make the CkExit call later
1116   converseExit = 1;
1117   if (CkMyPe() == 0) {
1118     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1119     bocProxy.traceProjectionsParallelShutdown(-1);
1120   }
1121   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1122     CkpvAccess(_trace)->endComputation();
1123     delete _logPool;              // will write
1124     // remove myself from traceArray so that no tracing will be called.
1125     CkpvAccess(_traces)->removeTrace(this);
1126   }
1127 #else
1128   // we've already deleted the logpool, so multiple calls to traceClose
1129   // are tolerated.
1130   if (_logPool == NULL) {
1131     return;
1132   }
1133   if(CkMyPe()==0){
1134     _logPool->writeSts(this);
1135   }
1136   CkpvAccess(_trace)->endComputation();
1137   delete _logPool;              // will write
1138   // remove myself from traceArray so that no tracing will be called.
1139   CkpvAccess(_traces)->removeTrace(this);
1140 #endif
1141 }
1142
1143 /**
1144  *  **IMPT NOTES**:
1145  *
1146  *  This is meant to be called internally by the tracing framework.
1147  *
1148  */
1149 void TraceProjections::closeTrace() {
1150   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1151   if (CkMyPe() == 0) {
1152     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1153     _logPool->writeSts(this);
1154     _logPool->writeRC();
1155     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1156   }
1157   delete _logPool;       // will write logs to file
1158 }
1159
1160 #if CMK_SMP_TRACE_COMMTHREAD
1161 void TraceProjections::traceBeginOnCommThread()
1162 {
1163   if (!computationStarted) return;
1164   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1165 }
1166
1167 void TraceProjections::traceEndOnCommThread()
1168 {
1169   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1170 }
1171 #endif
1172
1173 void TraceProjections::traceBegin(void)
1174 {
1175   if (!computationStarted) return;
1176   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1177 }
1178
1179 void TraceProjections::traceEnd(void)
1180 {
1181   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1182 }
1183
1184 void TraceProjections::userEvent(int e)
1185 {
1186   if (!computationStarted) return;
1187   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1188 }
1189
1190 void TraceProjections::userBracketEvent(int e, double bt, double et)
1191 {
1192   if (!computationStarted) return;
1193   // two events record Begin/End of event e.
1194   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1195   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1196 }
1197
1198 void TraceProjections::userSuppliedData(int d)
1199 {
1200   if (!computationStarted) return;
1201   _logPool->addUserSupplied(d);
1202 }
1203
1204 void TraceProjections::userSuppliedNote(char *note)
1205 {
1206   if (!computationStarted) return;
1207   _logPool->addUserSuppliedNote(note);
1208 }
1209
1210
1211 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1212 {
1213   if (!computationStarted) return;
1214   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1215 }
1216
1217 void TraceProjections::memoryUsage(double m)
1218 {
1219   if (!computationStarted) return;
1220   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1221   
1222 }
1223
1224
1225 void TraceProjections::creation(envelope *e, int ep, int num)
1226 {
1227   double curTime = TraceTimer();
1228   if (e == 0) {
1229     CtvAccess(curThreadEvent) = curevent;
1230     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1231                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1232   } else {
1233     int type=e->getMsgtype();
1234     e->setEvent(curevent);
1235     if (num > 1) {
1236       _logPool->add(CREATION_BCAST, type, ep, curTime,
1237                     curevent++, CkMyPe(), e->getTotalsize(), 
1238                     NULL, 0, 0.0, num);
1239     } else {
1240       _logPool->add(CREATION, type, ep, curTime,
1241                     curevent++, CkMyPe(), e->getTotalsize(), 
1242                     NULL, 0, 0.0);
1243     }
1244   }
1245 }
1246
1247 void TraceProjections::creation(char *msg)
1248 {
1249 #if CMK_SMP_TRACE_COMMTHREAD
1250         //This function is only called from a comm thread
1251         //in SMP mode. So, it is possible the msg is not
1252         //a charm msg that contains an envelope, ep idx.
1253         envelope *e = (envelope *)msg;
1254         int ep = e->getEpIdx();
1255         int num = _entryTable.size();
1256         if(ep<num && ep>=0 && _entryTable[ep]->traceEnabled)
1257                 creation(e, ep, 1);
1258 #endif
1259 }
1260
1261
1262 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1263    Communication Library-specific Multicasts via new event 
1264    CREATION_MULTICAST.
1265 */
1266
1267 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1268                                          int *pelist)
1269 {
1270   double curTime = TraceTimer();
1271   if (e==0) {
1272     CtvAccess(curThreadEvent)=curevent;
1273     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1274                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1275   } else {
1276     int type=e->getMsgtype();
1277     e->setEvent(curevent);
1278     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1279                                    e->getTotalsize(), 0, 0.0, num, pelist);
1280   }
1281 }
1282
1283 void TraceProjections::creationDone(int num)
1284 {
1285   // modified the creation done time of the last num log entries
1286   // FIXME: totally a hack
1287   double curTime = TraceTimer();
1288   int idx = _logPool->numEntries-1;
1289   while (idx >=0 && num >0 ) {
1290     LogEntry &log = _logPool->pool[idx];
1291     if ((log.type == CREATION) ||
1292         (log.type == CREATION_BCAST) ||
1293         (log.type == CREATION_MULTICAST)) {
1294       log.recvTime = curTime - log.time;
1295       num --;
1296     }
1297     idx--;
1298   }
1299 }
1300
1301 void TraceProjections::beginExecute(CmiObjId *tid)
1302 {
1303 #if CMK_HAS_COUNTER_PAPI
1304   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1305     CmiAbort("PAPI failed to read at begin execute!\n");
1306   }
1307 #endif
1308   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1309   execEvent = CtvAccess(curThreadEvent);
1310   execEp = (-1);
1311   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1312                 execEvent,CkMyPe(), 0, tid);
1313 #if CMK_HAS_COUNTER_PAPI
1314   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1315 #endif
1316   inEntry = 1;
1317 }
1318
1319 void TraceProjections::beginExecute(envelope *e)
1320 {
1321   if(e==0) {
1322 #if CMK_HAS_COUNTER_PAPI
1323     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1324       CmiAbort("PAPI failed to read at begin execute!\n");
1325     }
1326 #endif
1327     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1328     execEvent = CtvAccess(curThreadEvent);
1329     execEp = (-1);
1330     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1331                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1332 #if CMK_HAS_COUNTER_PAPI
1333     _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1334 #endif
1335     inEntry = 1;
1336   } else {
1337     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1338                  e->getSrcPe(),e->getTotalsize());
1339   }
1340 }
1341
1342 void TraceProjections::beginExecute(char *msg){
1343 #if CMK_SMP_TRACE_COMMTHREAD
1344         //This function is called from comm thread in SMP mode
1345     envelope *e = (envelope *)msg;
1346     int num = _entryTable.size();
1347     int ep = e->getEpIdx();
1348     if(ep<0 || ep>=num) return;
1349     if(_entryTable[ep]->traceEnabled)
1350                 beginExecute(e);
1351 #endif
1352 }
1353
1354 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1355                                     int mlen, CmiObjId *idx)
1356 {
1357   if (traceNestedEvents) {
1358     if (! nestedEvents.isEmpty()) {
1359       endExecuteLocal();
1360     }
1361     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1362   }
1363   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1364 }
1365
1366 void TraceProjections::changeLastEntryTimestamp(double ts)
1367 {
1368   _logPool->modLastEntryTimestamp(ts);
1369 }
1370
1371 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1372                                     int mlen, CmiObjId *idx)
1373 {
1374 #if CMK_HAS_COUNTER_PAPI
1375   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1376     CmiAbort("PAPI failed to read at begin execute!\n");
1377   }
1378 #endif
1379   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1380   execEvent=event;
1381   execEp=ep;
1382   execPe=srcPe;
1383   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1384                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1385 #if CMK_HAS_COUNTER_PAPI
1386   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1387 #endif
1388   inEntry = 1;
1389 }
1390
1391 void TraceProjections::endExecute(void)
1392 {
1393   if (traceNestedEvents) nestedEvents.deq();
1394   endExecuteLocal();
1395   if (traceNestedEvents) {
1396     if (! nestedEvents.isEmpty()) {
1397       NestedEvent &ne = nestedEvents.peek();
1398       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1399     }
1400   }
1401 }
1402
1403 void TraceProjections::endExecute(char *msg)
1404 {
1405 #if CMK_SMP_TRACE_COMMTHREAD
1406         //This function is called from comm thread in SMP mode
1407     envelope *e = (envelope *)msg;
1408     int num = _entryTable.size();
1409     int ep = e->getEpIdx();
1410     if(ep<0 || ep>=num) return;
1411     if(_entryTable[ep]->traceEnabled)
1412                 endExecute();
1413 #endif  
1414 }
1415
1416 void TraceProjections::endExecuteLocal(void)
1417 {
1418 #if CMK_HAS_COUNTER_PAPI
1419   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1420     CmiAbort("PAPI failed to read at end execute!\n");
1421   }
1422 #endif
1423   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1424   double cputime = TraceCpuTimer();
1425   if(execEp == (-1)) {
1426     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1427                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1428   } else {
1429     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1430                   execEvent, execPe, 0, NULL, 0.0, cputime);
1431   }
1432 #if CMK_HAS_COUNTER_PAPI
1433   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1434 #endif
1435   inEntry = 0;
1436 }
1437
1438 void TraceProjections::messageRecv(char *env, int pe)
1439 {
1440 #if 0
1441   envelope *e = (envelope *)env;
1442   int msgType = e->getMsgtype();
1443   int ep = e->getEpIdx();
1444 #if 0
1445   if (msgType==NewChareMsg || msgType==NewVChareMsg
1446           || msgType==ForChareMsg || msgType==ForVidMsg
1447           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1448           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1449     ep = e->getEpIdx();
1450   else
1451     ep = _threadEP;
1452 #endif
1453   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1454                 curevent++, e->getSrcPe(), e->getTotalsize());
1455 #endif
1456 }
1457
1458 void TraceProjections::beginIdle(double curWallTime)
1459 {
1460   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1461 }
1462
1463 void TraceProjections::endIdle(double curWallTime)
1464 {
1465   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1466 }
1467
1468 void TraceProjections::beginPack(void)
1469 {
1470   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1471 }
1472
1473 void TraceProjections::endPack(void)
1474 {
1475   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1476 }
1477
1478 void TraceProjections::beginUnpack(void)
1479 {
1480   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1481 }
1482
1483 void TraceProjections::endUnpack(void)
1484 {
1485   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1486 }
1487
1488 void TraceProjections::enqueue(envelope *) {}
1489
1490 void TraceProjections::dequeue(envelope *) {}
1491
1492 void TraceProjections::beginComputation(void)
1493 {
1494   computationStarted = 1;
1495
1496   // Executes the callback function provided by the machine
1497   // layer. This is the proper method to register user events in a
1498   // machine layer because projections is a charm++ module.
1499   if (CkpvAccess(traceOnPe) != 0) {
1500     void (*ptr)() = registerMachineUserEvents();
1501     if (ptr != NULL) {
1502       ptr();
1503     }
1504   }
1505 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1506 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1507   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1508 #if CMK_HAS_COUNTER_PAPI
1509   // we start the counters here
1510   if (PAPI_start(papiEventSet) != PAPI_OK) {
1511     CmiAbort("PAPI failed to start designated counters!\n");
1512   }
1513 #endif
1514 }
1515
1516 void TraceProjections::endComputation(void)
1517 {
1518 #if CMK_HAS_COUNTER_PAPI
1519   // we stop the counters here. A silent failure is alright since we
1520   // are already at the end of the program.
1521   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1522     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1523   }
1524   // NOTE: We should not do a complete close of PAPI until after the
1525   // sts writer is done.
1526 #endif
1527   endTime = TraceTimer();
1528   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1529   /*
1530   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1531            endTime*1e06);
1532   */
1533 }
1534
1535 int TraceProjections::idxRegistered(int idx)
1536 {
1537     int idxVecLen = idxVec.size();
1538     for(int i=0; i<idxVecLen; i++)
1539     {
1540         if(idx == idxVec[i])
1541             return 1;
1542     }
1543     return 0;
1544 }
1545
1546 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1547     StrKey k((char*)name,strlen(name));
1548     int num = funcHashtable.get(k);
1549     
1550     if(num!=0) {
1551         return;
1552         //as for mpi programs, the same function may be registered for several times
1553         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1554     }
1555     
1556     int isIdxExisting=0;
1557     if(idxSpecifiedByUser)
1558         isIdxExisting=idxRegistered(idx);
1559     if(isIdxExisting){
1560         return;
1561         //same reason with num!=0
1562         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1563     }
1564
1565     if(idxSpecifiedByUser) {
1566         char *st = new char[strlen(name)+1];
1567         memcpy(st,name,strlen(name)+1);
1568         StrKey *newKey = new StrKey(st,strlen(st));
1569         int &ref = funcHashtable.put(*newKey);
1570         ref=idx;
1571         funcCount++;
1572         idxVec.push_back(idx);  
1573     } else {
1574         char *st = new char[strlen(name)+1];
1575         memcpy(st,name,strlen(name)+1);
1576         StrKey *newKey = new StrKey(st,strlen(st));
1577         int &ref = funcHashtable.put(*newKey);
1578         ref=funcCount;
1579         num = funcCount;
1580         funcCount++;
1581         idx = num;
1582         idxVec.push_back(idx);
1583     }
1584 }
1585
1586 void TraceProjections::beginFunc(char *name,char *file,int line){
1587         StrKey k(name,strlen(name));    
1588         unsigned short  num = (unsigned short)funcHashtable.get(k);
1589         beginFunc(num,file,line);
1590 }
1591
1592 void TraceProjections::beginFunc(int idx,char *file,int line){
1593         if(idx <= 0){
1594                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1595         }       
1596         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1597 }
1598
1599 void TraceProjections::endFunc(char *name){
1600         StrKey k(name,strlen(name));    
1601         int num = funcHashtable.get(k);
1602         endFunc(num);   
1603 }
1604
1605 void TraceProjections::endFunc(int num){
1606         if(num <= 0){
1607                 printf("endFunc without start :O\n");
1608         }
1609         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1610 }
1611
1612 // specialized PUP:ers for handling trace projections logs
1613 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1614 {
1615   for (int i=0;i<n;i++) 
1616     switch(t) {
1617     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1618     case Tuchar:
1619     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1620     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1621     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1622     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1623     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1624     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1625     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1626     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1627     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1628 #ifdef CMK_PUP_LONG_LONG
1629     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1630     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1631 #endif
1632     default: CmiAbort("Unrecognized pup type code!");
1633     };
1634 }
1635
1636 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1637 {
1638   for (int i=0;i<n;i++) 
1639     switch(t) {
1640     case Tchar: { 
1641       char c = fgetc(f);
1642       if (c==EOF)
1643         parseError("Could not match character");
1644       else
1645         ((char *)p)[i] = c;
1646       break;
1647     }
1648     case Tuchar:
1649     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1650     case Tshort:((short *)p)[i]=(short)readInt(); break;
1651     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1652     case Tint:  ((int *)p)[i]=readInt(); break;
1653     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1654     case Tlong: ((long *)p)[i]=readInt(); break;
1655     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1656     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1657     case Tdouble:((double *)p)[i]=readDouble(); break;
1658 #ifdef CMK_PUP_LONG_LONG
1659     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1660     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1661 #endif
1662     default: CmiAbort("Unrecognized pup type code!");
1663     };
1664 }
1665
1666 #if CMK_PROJECTIONS_USE_ZLIB
1667 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1668 {
1669   for (int i=0;i<n;i++) 
1670     switch(t) {
1671     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1672     case Tuchar:
1673     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1674     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1675     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1676     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1677     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1678     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1679     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1680     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1681     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1682 #ifdef CMK_PUP_LONG_LONG
1683     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1684     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1685 #endif
1686     default: CmiAbort("Unrecognized pup type code!");
1687     };
1688 }
1689 #endif
1690
1691 void TraceProjections::endPhase() {
1692   double currentPhaseTime = TraceTimer();
1693   double lastPhaseTime = 0.0;
1694
1695   if (lastPhaseEvent != NULL) {
1696     lastPhaseTime = lastPhaseEvent->time;
1697   } else {
1698     if (_logPool->pool != NULL) {
1699       // assumed to be BEGIN_COMPUTATION
1700       lastPhaseTime = _logPool->pool[0].time;
1701     } else {
1702       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1703       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1704       lastPhaseTime = currentPhaseTime;
1705     }
1706   }
1707
1708   /* Insert endPhase event here. */
1709   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1710   /*        We currently "borrow" from the standard add() method. */
1711   /*        It should really be its own add() method.             */
1712   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1713   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1714   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1715   currentPhaseID++;
1716 }
1717
1718 #ifdef PROJ_ANALYSIS
1719 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1720
1721
1722 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1723
1724 void registerOutlierReduction() {
1725   outlierReductionType =
1726     CkReduction::addReducer(outlierReduction);
1727   minMaxReductionType =
1728     CkReduction::addReducer(minMaxReduction);
1729 }
1730
1731 /**
1732  * **IMPT NOTES**:
1733  *
1734  * This is the C++ code that is registered to be activated at module
1735  * shutdown. This is called exactly once on processor 0. Module shutdown
1736  * is initiated as a result of a CkExit() call by the application code
1737  * 
1738  * The exit function must ultimately call CkExit() again to
1739  * so that other module exit functions may proceed after this module is
1740  * done.
1741  *
1742  */
1743 // FIXME: WHY extern "C"???
1744 extern "C" void TraceProjectionsExitHandler()
1745 {
1746 #if CMK_TRACE_ENABLED
1747   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1748   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1749   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1750 #else
1751   CkExit();
1752 #endif
1753 }
1754
1755 // This is called once on each processor but the idiom of use appears
1756 // to be to only have processor 0 register the function.
1757 //
1758 // See initnode in trace-projections.ci
1759 void initTraceProjectionsBOC()
1760 {
1761   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1762 #ifdef __BLUEGENE__
1763   if (BgNodeRank() == 0) {
1764 #else
1765     if (CkMyRank() == 0) {
1766 #endif
1767       registerExitFn(TraceProjectionsExitHandler);
1768     }
1769 #if 0
1770   } // this is so indentation does not get messed up
1771 #endif
1772 }
1773
1774 // mainchare for trace-projections BOC-operations. 
1775 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1776 // rest of its life.
1777 //
1778 // Responsible for:
1779 //   1. Handling commandline arguments
1780 //   2. Creating any objects required for proper BOC operations.
1781 //
1782 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1783   /** Options for Outlier Analysis */
1784   // defaults. Things will change with support for interactive analysis.
1785   bool findOutliers = false;
1786   bool outlierAutomatic = true;
1787   int numKSeeds = 10; 
1788
1789   int peNumKeep = CkNumPes();  // used as a default
1790   double entryThreshold = 0.0;
1791   bool outlierUsePhases = false;
1792   if (outlierAutomatic) {
1793     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1794                      "Number of cluster seeds to apply at outlier analysis.");
1795     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1796                      &peNumKeep, "Number of Processors to retain data");
1797     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1798                         "Minimum significance of entry points to be considered for clustering (%).");
1799     findOutliers =
1800       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1801     outlierUsePhases = 
1802       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1803                         "Apply automatic outlier analysis to any available phases.");
1804     if (outlierUsePhases) {
1805       // if the user wants to use an outlier feature, it is assumed outlier
1806       //    analysis is desired.
1807       findOutliers = true;
1808     }
1809   }
1810   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers);
1811   if (findOutliers) {
1812     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1813                                         numKSeeds,
1814                                         peNumKeep,
1815                                         entryThreshold,
1816                                         outlierUsePhases);
1817   }
1818 }
1819
1820 // Called on every processor.
1821 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1822   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1823   endPe = pe;                // the pe that starts CkExit()
1824   if (CkMyPe() == 0) {
1825     analysisStartTime = CmiWallTimer();
1826   }
1827   CkpvAccess(_trace)->endComputation();
1828   // no more tracing for projections on this processor after this point. 
1829   // Note that clear must be called after remove, or bad things will happen.
1830   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1831   CkpvAccess(_traces)->clearTrace();
1832
1833   // From this point, we start multiple chains of reductions and broadcasts to
1834   // perform final online analysis activities.
1835
1836   // Start all parallel operations at once. 
1837   //   These MUST NOT modify base performance data in LogPool. If they must,
1838   //   then the parallel operations must be phased (and this code to be
1839   //   restructured as necessary)
1840   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1841   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1842   if (findOutliers) {
1843     parModulesRemaining++;
1844     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1845   }
1846   parModulesRemaining++;
1847   bocProxy[CkMyPe()].startEndTimeAnalysis();
1848 }
1849
1850 // Called on each processor
1851 void KMeansBOC::startKMeansAnalysis() {
1852   // Initialize all necessary structures
1853   LogPool *pool = CkpvAccess(_trace)->_logPool;
1854
1855  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1856   int flushInt = 0;
1857   if (pool->hasFlushed) {
1858     flushInt = 1;
1859   }
1860   
1861   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1862                 0, thisProxy);
1863   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1864 }
1865
1866 // Called on processor 0
1867 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1868   int someFlush = *((int *)msg->getData());
1869
1870   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1871   
1872   if (someFlush == 0) {
1873     // Data intact proceed with KMeans analysis
1874     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1875     kMeansProxy.flushCheckDone();
1876   } else {
1877     // Some processor had flushed it data at some point, abandon KMeans
1878     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1879     // terminate KMeans
1880     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1881     bocProxy[0].kMeansDone();
1882   }
1883 }
1884
1885 // Called on each processor
1886 void KMeansBOC::flushCheckDone() {
1887   // **FIXME** - more flexible metric collection scheme may be necessary
1888   //   in the future for production use.
1889   LogPool *pool = CkpvAccess(_trace)->_logPool;
1890
1891   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1892
1893   numEntryMethods = _entryTable.size();
1894   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1895
1896   // maintained across phases
1897   markedBegin = false;
1898   markedIdle = false;
1899   beginBlockTime = 0.0;
1900   beginIdleBlockTime = 0.0;
1901   lastBeginEPIdx = -1; // none
1902
1903   lastPhaseIdx = 0;
1904   currentExecTimes = NULL;
1905   currentPhase = 0;
1906   selected = false;
1907
1908   pool->initializePhases();
1909
1910   // incoming K Seeds and the per-phase filter
1911   incKSeeds = new double[numK*numMetrics];
1912   keepMetric = new bool[numMetrics];
1913
1914   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1915   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1916   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1917   thisProxy[CkMyPe()].getNextPhaseMetrics();
1918 }
1919
1920 // Called on each processor.
1921 void KMeansBOC::getNextPhaseMetrics() {
1922   // Assumes the presence of the complete logs on this processor.
1923   // Assumes first event is always BEGIN_COMPUTATION
1924   // Assumes each processor sees the same number of phases.
1925   //
1926   // In this code, we collect performance data for this processor.
1927   // All times are in seconds.
1928
1929   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1930
1931   if (usePhases) {
1932     DEBUGF("[%d] Using Phases\n", CkMyPe());
1933   } else {
1934     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1935   }
1936   
1937   if (currentExecTimes != NULL) {
1938     delete [] currentExecTimes;
1939   }
1940   currentExecTimes = new double[numMetrics];
1941   for (int i=0; i<numMetrics; i++) {
1942     currentExecTimes[i] = 0.0;
1943   }
1944
1945   int numEventMethods = _entryTable.size();
1946   LogPool *pool = CkpvAccess(_trace)->_logPool;
1947   
1948   CkAssert(pool->numEntries > lastPhaseIdx);
1949   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1950   double totalPhaseTime = 0.0;
1951   double totalActiveTime = 0.0; // entry method + idle
1952
1953   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1954     if (pool->pool[i].type == BEGIN_PROCESSING) {
1955       // check pairing
1956       if (!markedBegin) {
1957         markedBegin = true;
1958       }
1959       beginBlockTime = pool->pool[i].time;
1960       lastBeginEPIdx = pool->pool[i].eIdx;
1961     } else if (pool->pool[i].type == END_PROCESSING) {
1962       // check pairing
1963       // if End without a begin, just ignore
1964       //   this event. If a phase-boundary is crossed, the Begin
1965       //   event would be maintained in beginBlockTime, so it is 
1966       //   not a problem.
1967       if (markedBegin) {
1968         markedBegin = false;
1969         if (pool->pool[i].event < 0)
1970         {
1971           // ignore dummy events. **FIXME** as they have no eIdx?
1972           continue;
1973         }
1974         currentExecTimes[pool->pool[i].eIdx] += 
1975           pool->pool[i].time - beginBlockTime;
1976         totalActiveTime += pool->pool[i].time - beginBlockTime;
1977         lastBeginEPIdx = -1;
1978       }
1979     } else if (pool->pool[i].type == BEGIN_IDLE) {
1980       // check pairing
1981       if (!markedIdle) {
1982         markedIdle = true;
1983       }
1984       beginIdleBlockTime = pool->pool[i].time;
1985     } else if (pool->pool[i].type == END_IDLE) {
1986       // check pairing
1987       if (markedIdle) {
1988         markedIdle = false;
1989         currentExecTimes[numEventMethods] += 
1990           pool->pool[i].time - beginIdleBlockTime;
1991         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1992       }
1993     } else if (pool->pool[i].type == END_PHASE) {
1994       // ignored when not using phases
1995       if (usePhases) {
1996         // when we've not visited this node before
1997         if (i != lastPhaseIdx) { 
1998           totalPhaseTime = 
1999             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2000           // it is important that proper accounting of time take place here.
2001           // Note that END_PHASE events inevitably occur in the context of
2002           //   some entry method by the way the tracing API is designed.
2003           if (markedBegin) {
2004             CkAssert(lastBeginEPIdx >= 0);
2005             currentExecTimes[lastBeginEPIdx] += 
2006               pool->pool[i].time - beginBlockTime;
2007             totalActiveTime += pool->pool[i].time - beginBlockTime;
2008             // this is so the remainder contributes to the next phase
2009             beginBlockTime = pool->pool[i].time;
2010           }
2011           // The following is unlikely, but stranger things have happened.
2012           if (markedIdle) {
2013             currentExecTimes[numEventMethods] +=
2014               pool->pool[i].time - beginIdleBlockTime;
2015             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2016             // this is so the remainder contributes to the next phase
2017             beginIdleBlockTime = pool->pool[i].time;
2018           }
2019           if (totalActiveTime <= totalPhaseTime) {
2020             currentExecTimes[numEventMethods+1] = 
2021               totalPhaseTime - totalActiveTime;
2022           } else {
2023             currentExecTimes[numEventMethods+1] = 0.0;
2024             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2025                      CkMyPe(), currentPhase);
2026           }
2027           collectKMeansData();
2028           // end the loop (and method) and defer the work till the next call
2029           lastPhaseIdx = i;
2030           break; 
2031         }
2032       }
2033     } else if (pool->pool[i].type == END_COMPUTATION) {
2034       if (markedBegin) {
2035         CkAssert(lastBeginEPIdx >= 0);
2036         currentExecTimes[lastBeginEPIdx] += 
2037           pool->pool[i].time - beginBlockTime;
2038         totalActiveTime += pool->pool[i].time - beginBlockTime;
2039       }
2040       if (markedIdle) {
2041         currentExecTimes[numEventMethods] +=
2042           pool->pool[i].time - beginIdleBlockTime;
2043         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2044       }
2045       totalPhaseTime = 
2046         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2047       if (totalActiveTime <= totalPhaseTime) {
2048         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2049       } else {
2050         currentExecTimes[numEventMethods+1] = 0.0;
2051         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2052                  CkMyPe());
2053       }
2054       collectKMeansData();
2055     }
2056   }
2057 }
2058
2059 /**
2060  *  Through a reduction, collectKMeansData aggregates each processors' data
2061  *  in order for global properties to be determined:
2062  *  
2063  *  1. min & max to determine normalization factors.
2064  *  2. sum to determine global EP averages for possible metric reduction
2065  *       through thresholding.
2066  *  3. sum of squares to compute stddev which may be useful in the future.
2067  *
2068  *  collectKMeansData will also keep the processor's data for the current
2069  *    phase so that it may be normalized and worked on subsequently.
2070  *
2071  **/
2072 void KMeansBOC::collectKMeansData() {
2073   int minOffset = numMetrics;
2074   int maxOffset = 2*numMetrics;
2075   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2076
2077   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2078
2079   double *reductionMsg = new double[numMetrics*4];
2080
2081   for (int i=0; i<numMetrics; i++) {
2082     reductionMsg[i] = currentExecTimes[i];
2083     // duplicate the event times for max and min sections of the reduction
2084     reductionMsg[minOffset + i] = currentExecTimes[i];
2085     reductionMsg[maxOffset + i] = currentExecTimes[i];
2086     // compute squares
2087     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2088   }
2089
2090   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2091                 0, thisProxy);
2092   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2093              outlierReductionType, cb);  
2094 }
2095
2096 // The purpose is mainly to initialize the k seeds and generate
2097 //   normalization parameters for each of the metrics. The k seeds
2098 //   and normalization parameters are broadcast to all processors.
2099 //
2100 // Called on processor 0
2101 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2102   CkAssert(CkMyPe() == 0);
2103   
2104   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2105
2106   int sumOffset = 0;
2107   int minOffset = numMetrics;
2108   int maxOffset = 2*numMetrics;
2109   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2110
2111   // calculate statistics & boundaries for the k seeds for clustering
2112   KMeansStatsMessage *outmsg = 
2113     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2114   outmsg->numMetrics = numMetrics;
2115   outmsg->numKPos = numK*numMetrics;
2116   outmsg->numStats = numMetrics*4;
2117
2118   // Sum | Min | Max | Sum of Squares
2119   double *totalExecTimes = (double *)msg->getData();
2120   double totalTime = 0.0;
2121
2122   for (int i=0; i<numMetrics; i++) {
2123     DEBUGN("%lf\n", totalExecTimes[i]);
2124     totalTime += totalExecTimes[i];
2125
2126     // calculate event mean over all processors
2127     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2128
2129     // get the ranges and offsets of each metric. With this, we get
2130     //   normalization factors that can be sent back to each processor to
2131     //   be used as necessary. We reuse max for range. Min remains the offset.
2132     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2133     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2134       totalExecTimes[minOffset + i];
2135     
2136     // calculate stddev (using biased variance)
2137     outmsg->stats[sosOffset + i] = 
2138       sqrt((totalExecTimes[sosOffset + i] - 
2139             2*(outmsg->stats[i])*totalExecTimes[i] +
2140             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2141            CkNumPes());
2142   }
2143
2144   for (int i=0; i<numMetrics; i++) {
2145     // 1) if the proportion of the max value of the entry method relative to
2146     //   the average time taken over all entry methods across all processors
2147     //   is greater than the stipulated percentage threshold ...; AND
2148     // 2) if the range of values are non-zero.
2149     //
2150     // The current assumption is totalTime > 0 (what program has zero total
2151     //   time from all work?)
2152     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2153                      entryThreshold) &&
2154       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2155     if (keepMetric[i]) {
2156       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2157              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2158     } else {
2159       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2160     }
2161     outmsg->filter[i] = keepMetric[i];
2162   }
2163
2164   delete msg;
2165   
2166   // initialize k seeds for this phase
2167   kSeeds = new double[numK*numMetrics];
2168
2169   numKReported = 0;
2170   kNumMembers = new int[numK];
2171
2172   // Randomly select k processors' metric vectors for the k seeds
2173   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2174   srand(11337); // for debugging purposes
2175   for (int k=0; k<numK; k++) {
2176     DEBUGF("Seed %d | ", k);
2177     for (int m=0; m<numMetrics; m++) {
2178       double factor = totalExecTimes[maxOffset + m] - 
2179         totalExecTimes[minOffset + m];
2180       // "uniform" distribution, scaled according to the normalization
2181       //   factors
2182       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2183       // Random distribution.
2184       kSeeds[numMetrics*k + m] =
2185         ((rand()*1.0)/RAND_MAX)*factor;
2186       if (keepMetric[m]) {
2187         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2188       }
2189       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2190     }
2191     DEBUGF("\n");
2192     kNumMembers[k] = 0;
2193   }
2194
2195   // broadcast statistical values to all processors for cluster discovery
2196   thisProxy.findInitialClusters(outmsg);
2197 }
2198
2199
2200
2201 // Called on each processor.
2202 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2203
2204  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2205
2206   phaseIter = 0;
2207
2208   // Get info from stats message
2209   CkAssert(numMetrics == msg->numMetrics);
2210   for (int i=0; i<numMetrics; i++) {
2211     keepMetric[i] = msg->filter[i];
2212   }
2213
2214   // Normalize data on local processor.
2215   // **CWL** See my thesis for detailed discussion of normalization of
2216   //    performance data.
2217   // **NOTE** This might change if we want to send data based on the filter
2218   //   instead of all the data.
2219   CkAssert(numMetrics*4 == msg->numStats);
2220   for (int i=0; i<numMetrics; i++) {
2221     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2222     // **CWL** We do not normalize the range. Entry methods that exhibit
2223     //   large absolute timing variations should be allowed to contribute
2224     //   more to the Euclidean distance measure!
2225     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2226   }
2227
2228   // **NOTE** This might change if we want to send data based on the filter
2229   //   instead of all the data.
2230   CkAssert(numK*numMetrics == msg->numKPos);
2231   for (int i=0; i<msg->numKPos; i++) {
2232     incKSeeds[i] = msg->kSeedsPos[i];
2233   }
2234
2235   // Decide which KSeed this processor belongs to.
2236   minDistance = calculateDistance(0);
2237   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2238            currentPhase, phaseIter, minDistance);
2239   minK = 0;
2240   for (int i=1; i<numK; i++) {
2241     double distance = calculateDistance(i);
2242     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2243              currentPhase, phaseIter, i, distance);
2244     if (distance < minDistance) {
2245       minDistance = distance;
2246       minK = i;
2247     }
2248   }
2249
2250   // Set up a reduction with the modification vector to the root (0).
2251   //
2252   // The modification vector sends a negative value for each metric
2253   //   for the K this processor no longer belongs to and a positive
2254   //   value to the K the processor now belongs. In addition, a -1.0
2255   //   is sent to the K it is leaving and a +1.0 to the K it is 
2256   //   joining.
2257   //
2258   // The processor must still contribute a "zero returns" even if
2259   //   nothing changes. This will be the basis for determine
2260   //   convergence at the root.
2261   //
2262   // The addtional +1 is meant for the count-change that must be
2263   //   maintained for the special cases at the root when some K
2264   //   may be deprived of all processor points or go from 0 to a
2265   //   positive number of processors (see later comments).
2266   double *modVector = new double[numK*(numMetrics+1)];
2267   for (int i=0; i<numK; i++) {
2268     for (int j=0; j<numMetrics+1; j++) {
2269       modVector[i*(numMetrics+1) + j] = 0.0;
2270     }
2271   }
2272   for (int i=0; i<numMetrics; i++) {
2273     // for this initialization, only positive values need be sent.
2274     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2275   }
2276   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2277
2278   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2279                 0, thisProxy);
2280   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2281              CkReduction::sum_double, cb);  
2282 }
2283
2284 double KMeansBOC::calculateDistance(int k) {
2285   double ret = 0.0;
2286   for (int i=0; i<numMetrics; i++) {
2287     if (keepMetric[i]) {
2288       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2289              CkMyPe(), currentPhase, phaseIter, i,
2290                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2291       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2292     }
2293   }
2294   return sqrt(ret);
2295 }
2296
2297 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2298   CkAssert(CkMyPe() == 0);
2299
2300   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2301
2302   double *modVector = (double *)msg->getData();
2303   // sanity check
2304   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2305
2306   // A quick convergence test.
2307   bool hasChanges = false;
2308   for (int i=0; i<numK; i++) {
2309     hasChanges = hasChanges || 
2310       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2311   }
2312   if (!hasChanges) {
2313     delete msg;
2314     findRepresentatives();
2315   } else {
2316     int overallChange = 0;
2317     for (int i=0; i<numK; i++) {
2318       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2319       if (change != 0) {
2320         overallChange += change;
2321         // modify the k seeds based on the modification vectors coming in
2322         //
2323         // If a seed initially has no members, its contents do not matter and
2324         //   is simply set to the average of the incoming vector.
2325         // If the change causes a seed to lose all its members, do nothing.
2326         //   Its last-known location is kept to allow it to re-capture
2327         //   membership at the next iteration rather than apply the last
2328         //   changes (which snaps the point unnaturally to 0,0).
2329         // Otherwise, apply the appropriate vector changes.
2330         CkAssert((kNumMembers[i] + change >= 0) &&
2331                  (kNumMembers[i] + change <= CkNumPes()));
2332         if (kNumMembers[i] == 0) {
2333           CkAssert(change > 0);
2334           for (int j=0; j<numMetrics; j++) {
2335             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2336           }
2337         } else if (kNumMembers[i] + change == 0) {
2338           // do nothing.
2339         } else {
2340           for (int j=0; j<numMetrics; j++) {
2341             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2342             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2343             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2344           }
2345         }
2346         kNumMembers[i] += change;
2347       }
2348       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2349              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2350     }
2351     delete msg;
2352
2353     // broadcast the new seed locations.
2354     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2355     outmsg->numKPos = numK*numMetrics;
2356     for (int i=0; i<numK*numMetrics; i++) {
2357       outmsg->kSeedsPos[i] = kSeeds[i];
2358     }
2359
2360     thisProxy.updateSeedMembership(outmsg);
2361   }
2362 }
2363
2364 // Called on all processors
2365 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2366
2367   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2368
2369   phaseIter++;
2370
2371   // **NOTE** This might change if we want to send data based on the filter
2372   //   instead of all the data.
2373   CkAssert(numK*numMetrics == msg->numKPos);
2374   for (int i=0; i<msg->numKPos; i++) {
2375     incKSeeds[i] = msg->kSeedsPos[i];
2376   }
2377
2378   // Decide which KSeed this processor belongs to.
2379   lastMinK = minK;
2380   minDistance = calculateDistance(0);
2381   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2382          currentPhase, phaseIter, minDistance);
2383
2384   minK = 0;
2385   for (int i=1; i<numK; i++) {
2386     double distance = calculateDistance(i);
2387     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2388            currentPhase, phaseIter, i, distance);
2389     if (distance < minDistance) {
2390       minDistance = distance;
2391       minK = i;
2392     }
2393   }
2394
2395   double *modVector = new double[numK*(numMetrics+1)];
2396   for (int i=0; i<numK; i++) {
2397     for (int j=0; j<numMetrics+1; j++) {
2398       modVector[i*(numMetrics+1) + j] = 0.0;
2399     }
2400   }
2401
2402   if (minK != lastMinK) {
2403     for (int i=0; i<numMetrics; i++) {
2404       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2405       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2406     }
2407     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2408     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2409   }
2410
2411   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2412                 0, thisProxy);
2413   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2414              CkReduction::sum_double, cb);  
2415 }
2416
2417 void KMeansBOC::findRepresentatives() {
2418
2419   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2420
2421   int numNonEmptyClusters = 0;
2422   for (int i=0; i<numK; i++) {
2423     if (kNumMembers[i] > 0) {
2424       numNonEmptyClusters++;
2425     }
2426   }
2427
2428   int numRepresentatives = peNumKeep;
2429   // **FIXME**
2430   // This is fairly arbitrary. Next time, choose the centers of the top
2431   //   largest clusters.
2432   if (numRepresentatives < numNonEmptyClusters) {
2433     numRepresentatives = numNonEmptyClusters;
2434   }
2435
2436   int slotsRemaining = numRepresentatives;
2437
2438   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2439          numNonEmptyClusters);
2440
2441   // determine how many exemplars to select per cluster. Currently
2442   //   hardcoded to 1. Future challenge is to decide on other numbers
2443   //   or proportionality.
2444   //
2445   int exemplarsPerCluster = 1;
2446   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2447
2448   int numCandidateOutliers = CkNumPes() - 
2449     exemplarsPerCluster*numNonEmptyClusters;
2450
2451   double *remainders = new double[numK];
2452   int *assigned = new int[numK];
2453   exemplarChoicesLeft = new int[numK];
2454   outlierChoicesLeft = new int[numK];
2455
2456   for (int i=0; i<numK; i++) {
2457     assigned[i] = 0;
2458     remainders[i] = 
2459       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2460       slotsRemaining / numCandidateOutliers;
2461     if (remainders[i] >= 0.0) {
2462       assigned[i] = (int)floor(remainders[i]);
2463       remainders[i] -= assigned[i];
2464     } else {
2465       remainders[i] = 0.0;
2466     }
2467   }
2468
2469   for (int i=0; i<numK; i++) {
2470     slotsRemaining -= assigned[i];
2471   }
2472   CkAssert(slotsRemaining >= 0);
2473
2474   // find clusters to assign the loose slots to, in order of
2475   // remainder proportion
2476   while (slotsRemaining > 0) {
2477     double max = 0.0;
2478     int winner = 0;
2479     for (int i=0; i<numK; i++) {
2480       if (remainders[i] > max) {
2481         max = remainders[i];
2482         winner = i;
2483       }
2484     }
2485     assigned[winner]++;
2486     remainders[winner] = 0.0;
2487     slotsRemaining--;
2488   }
2489
2490   // set up how many reduction cycles of min/max we need to conduct to
2491   // select the representatives.
2492   numSelectionIter = exemplarsPerCluster;
2493   for (int i=0; i<numK; i++) {
2494     if (assigned[i] > numSelectionIter) {
2495       numSelectionIter = assigned[i];
2496     }
2497   }
2498   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2499
2500   for (int i=0; i<numK; i++) {
2501     if (kNumMembers[i] > 0) {
2502       exemplarChoicesLeft[i] = exemplarsPerCluster;
2503       outlierChoicesLeft[i] = assigned[i];
2504     } else {
2505       exemplarChoicesLeft[i] = 0;
2506       outlierChoicesLeft[i] = 0;
2507     }
2508     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2509            outlierChoicesLeft[i]);
2510   }
2511
2512   delete [] assigned;
2513   delete [] remainders;
2514
2515   // send out first broadcast
2516   KSelectionMessage *outmsg = NULL;
2517   if (numSelectionIter > 0) {
2518     outmsg = new (numK, numK, numK) KSelectionMessage;
2519     outmsg->numKMinIDs = numK;
2520     outmsg->numKMaxIDs = numK;
2521     for (int i=0; i<numK; i++) {
2522       outmsg->minIDs[i] = -1;
2523       outmsg->maxIDs[i] = -1;
2524     }
2525     thisProxy.collectDistances(outmsg);
2526   } else {
2527     CkPrintf("Warning: No selection iteration from the start!\n");
2528     // invoke phase completion on all processors
2529     thisProxy.phaseDone();
2530   }
2531 }
2532
2533 /*
2534  *  lastMin = array of minimum champions of the last tournament
2535  *  lastMax = array of maximum champions of the last tournament
2536  *  lastMaxVal = array of last encountered maximum values, allows previous
2537  *                 minimum winners to eliminate themselves from the next
2538  *                 minimum race.
2539  *
2540  *  Called on all processors.
2541  */
2542 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2543
2544   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2545
2546   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2547          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2548   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2549       (CkMyPe() == msg->maxIDs[lastMinK])) {
2550     CkAssert(!selected);
2551     selected = true;
2552   }
2553
2554   // build outgoing reduction structure
2555   //   format = minVal | ID | maxVal | ID
2556   double *minMaxAndIDs = NULL;
2557
2558   minMaxAndIDs = new double[numK*4];
2559   // initialize to the appropriate out-of-band values (for error checks)
2560   for (int i=0; i<numK; i++) {
2561     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2562     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2563     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2564     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2565   }
2566   // If I have not won before, I put myself back into the competition
2567   if (!selected) {
2568     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2569     minMaxAndIDs[lastMinK*4] = minDistance;
2570     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2571     minMaxAndIDs[lastMinK*4+2] = minDistance;
2572     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2573   }
2574   delete msg;
2575
2576   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2577                 0, thisProxy);
2578   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2579              minMaxReductionType, cb);  
2580 }
2581
2582 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2583   // incoming format:
2584   //   minVal | minID | maxVal | maxID
2585
2586   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2587
2588   if (numSelectionIter > 0) {
2589     double *incInfo = (double *)msg->getData();
2590     
2591     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2592     outmsg->numKMinIDs = numK;
2593     outmsg->numKMaxIDs = numK;
2594     
2595     for (int i=0; i<numK; i++) {
2596       DEBUGF("%d | %lf %d %lf %d \n", i, 
2597              incInfo[i*4], (int)incInfo[i*4+1], 
2598              incInfo[i*4+2], (int)incInfo[i*4+3]);
2599     }
2600
2601     for (int i=0; i<numK; i++) {
2602       if (exemplarChoicesLeft[i] > 0) {
2603         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2604         exemplarChoicesLeft[i]--;
2605       } else {
2606         outmsg->minIDs[i] = -1;
2607       }
2608       if (outlierChoicesLeft[i] > 0) {
2609         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2610         outlierChoicesLeft[i]--;
2611       } else {
2612         outmsg->maxIDs[i] = -1;
2613       }
2614     }
2615     thisProxy.collectDistances(outmsg);
2616     numSelectionIter--;
2617   } else {
2618     // invoke phase completion on all processors
2619     thisProxy.phaseDone();
2620   }
2621 }
2622
2623 /**
2624  *  Completion of the K-Means clustering and data selection of one phase
2625  *    of the computation.
2626  *
2627  *  Called on every processor.
2628  */
2629 void KMeansBOC::phaseDone() {
2630
2631   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2632
2633   LogPool *pool = CkpvAccess(_trace)->_logPool;
2634   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2635
2636   // now decide on what to do with the decision.
2637   if (!selected) {
2638     if (usePhases) {
2639       pool->keepPhase[currentPhase] = false;
2640     } else {
2641       // if not using phases, we're working on the whole log
2642       pool->setAllPhases(false);
2643     }
2644   }
2645
2646   // **FIXME** (?) - All processors have to agree on this, or the reduction
2647   //   will not be correct! The question is "is this enforcible?"
2648   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2649     // We're done
2650     int dummy = 0;
2651     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2652                   0, bocProxy);
2653     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2654   } else {
2655     // reset all phase-based k-means data and decisions
2656
2657     // **FIXME**!!!!!    
2658     
2659     // invoke the next K-Means computation phase.
2660     currentPhase++;
2661     thisProxy[CkMyPe()].getNextPhaseMetrics();
2662   }
2663 }
2664
2665 void TraceProjectionsBOC::startEndTimeAnalysis()
2666 {
2667  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2668
2669   endTime = CkpvAccess(_trace)->endTime;
2670   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2671
2672   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2673                 0, thisProxy);
2674   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2675 }
2676
2677 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2678 {
2679  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2680
2681   CkAssert(CkMyPe() == 0);
2682   parModulesRemaining--;
2683   if (CkpvAccess(_trace) != NULL) {
2684     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData();
2685     // CkPrintf("End time determined to be %lf us\n",
2686     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2687   }
2688   delete msg;
2689   if (parModulesRemaining == 0) {
2690     thisProxy[CkMyPe()].finalize();
2691   }
2692 }
2693
2694 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2695
2696  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2697
2698   CkAssert(CkMyPe() == 0);
2699   parModulesRemaining--;
2700   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2701            CmiWallTimer()-analysisStartTime);
2702   delete msg;
2703   if (parModulesRemaining == 0) {
2704     thisProxy[CkMyPe()].finalize();
2705   }
2706 }
2707
2708 /**
2709  *
2710  *  This version is called (on processor 0) only if flushCheck fails.
2711  *
2712  */
2713 void TraceProjectionsBOC::kMeansDone() {
2714   CkAssert(CkMyPe() == 0);
2715   parModulesRemaining--;
2716   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2717            CmiWallTimer()-analysisStartTime);
2718   if (parModulesRemaining == 0) {
2719     thisProxy[CkMyPe()].finalize();
2720   }
2721 }
2722
2723 void TraceProjectionsBOC::finalize()
2724 {
2725   CkAssert(CkMyPe() == 0);
2726   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2727   //       CmiWallTimer()-analysisStartTime);
2728   thisProxy.closingTraces();
2729 }
2730
2731 // Called on every processor
2732 void TraceProjectionsBOC::closingTraces() {
2733   CkpvAccess(_trace)->closeTrace();
2734
2735     // subtle:  reduction needs to go to the PE which started CkExit()
2736   int pe = 0;
2737   if (endPe != -1) pe = endPe;
2738   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2739                 pe, thisProxy); 
2740   contribute(0, NULL, CkReduction::sum_int, cb);  
2741 }
2742
2743 // The sole purpose of this reduction is to decide whether or not
2744 //   Projections as a module needs to call CkExit() to get other
2745 //   modules to shutdown.
2746 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2747   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2748   delete msg;
2749   // decide if CkExit() needs to be called
2750   if (!CkpvAccess(_trace)->converseExit) {
2751     CkExit();
2752   }
2753 }
2754 /*
2755  *  Registration and definition of the Outlier Reduction callback.
2756  *  Format: Sum | Min | Max | Sum of Squares
2757  */
2758 CkReductionMsg *outlierReduction(int nMsgs,
2759                                  CkReductionMsg **msgs) {
2760   int numBytes = 0;
2761   int numMetrics = 0;
2762   double *ret = NULL;
2763
2764   if (nMsgs == 1) {
2765     // nothing to do, just pass it on
2766     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2767   }
2768
2769   if (nMsgs > 1) {
2770     numBytes = msgs[0]->getSize();
2771     // sanity checks
2772     if (numBytes%sizeof(double) != 0) {
2773       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2774     }
2775     if ((numBytes/sizeof(double))%4 != 0) {
2776       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2777     }
2778     numMetrics = (numBytes/sizeof(double))/4;
2779     ret = new double[numMetrics*4];
2780
2781     // copy the first message data into the return structure first
2782     for (int i=0; i<numMetrics*4; i++) {
2783       ret[i] = ((double *)msgs[0]->getData())[i];
2784     }
2785
2786     // Sum | Min | Max | Sum of Squares
2787     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2788       for (int i=0; i<numMetrics; i++) {
2789         // Sum
2790         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2791         // Min
2792         ret[numMetrics + i] =
2793           (ret[numMetrics + i] < 
2794            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2795           ? ret[numMetrics + i] : 
2796           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2797         // Max
2798         ret[2*numMetrics + i] = 
2799           (ret[2*numMetrics + i] >
2800            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2801           ? ret[2*numMetrics + i] :
2802           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2803         // Sum of Squares (squaring already done at leaf)
2804         ret[3*numMetrics + i] +=
2805           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2806       }
2807     }
2808   }
2809   
2810   /* apparently, we do not delete the incoming messages */
2811   return CkReductionMsg::buildNew(numBytes,ret);
2812 }
2813
2814 /*
2815  * The only reason we have a user-defined reduction is to support
2816  *   identification of the "winning" processors as well as to handle
2817  *   both the min and the max of each "tournament". A simple min/max
2818  *   discovery cannot handle ties.
2819  */
2820 CkReductionMsg *minMaxReduction(int nMsgs,
2821                                 CkReductionMsg **msgs) {
2822   CkAssert(nMsgs > 0);
2823
2824   int numBytes = msgs[0]->getSize();
2825   CkAssert(numBytes%sizeof(double) == 0);
2826   int numK = (numBytes/sizeof(double))/4;
2827
2828   double *ret = new double[numK*4];
2829   // fill with out-of-band values
2830   for (int i=0; i<numK; i++) {
2831     ret[i*4] = -1.0;
2832     ret[i*4+1] = -1.0;
2833     ret[i*4+2] = -1.0;
2834     ret[i*4+3] = -1.0;
2835   }
2836
2837   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2838   for (int i=0; i<nMsgs; i++) {
2839     double *temp = (double *)msgs[i]->getData();
2840     for (int j=0; j<numK; j++) {
2841       // no previous valid min
2842       if (ret[j*4+1] < 0) {
2843         // fill it in only if the incoming min is valid
2844         if (temp[j*4+1] >= 0) {
2845           ret[j*4] = temp[j*4];      // fill min value
2846           ret[j*4+1] = temp[j*4+1];  // fill ID
2847         }
2848       } else {
2849         // find Min, only if incoming min is valid
2850         if (temp[j*4+1] >= 0) {
2851           if (temp[j*4] < ret[j*4]) {
2852             ret[j*4] = temp[j*4];      // replace min value
2853             ret[j*4+1] = temp[j*4+1];  // replace ID
2854           }
2855         }
2856       }
2857       // no previous valid max
2858       if (ret[j*4+3] < 0) {
2859         // fill only if the incoming max is valid
2860         if (temp[j*4+3] >= 0) {
2861           ret[j*4+2] = temp[j*4+2];  // fill max value
2862           ret[j*4+3] = temp[j*4+3];  // fill ID
2863         }
2864       } else {
2865         // find Max, only if incoming max is valid
2866         if (temp[j*4+3] >= 0) {
2867           if (temp[j*4+2] > ret[j*4+2]) {
2868             ret[j*4+2] = temp[j*4+2];  // replace max value
2869             ret[j*4+3] = temp[j*4+3];  // replace ID
2870           }
2871         }
2872       }
2873     }
2874   }
2875   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2876   delete [] ret;
2877   return redmsg;
2878 }
2879
2880 #include "TraceProjections.def.h"
2881 #endif //PROJ_ANALYSIS
2882
2883 /*@}*/