Merge branch 'charm' into typed_reductions
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
14 #else
15 #define DEBUGF(format, ...)           // CmiPrintf x
16 #endif
17 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int numPAPIEvents = 2;
103 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
104 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
105 #endif // CMK_HAS_COUNTER_PAPI
106
107 /**
108   For each TraceFoo module, _createTraceFoo() must be defined.
109   This function is called in _createTraces() generated in moduleInit.C
110 */
111 void _createTraceprojections(char **argv)
112 {
113   DEBUGF("%d createTraceProjections\n", CkMyPe());
114   CkpvInitialize(CkVec<char *>, usrEventlist);
115   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
116   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
117 #if CMK_BLUEGENE_CHARM
118   // CthRegister does not call the constructor
119 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
120 #endif //CMK_BLUEGENE_CHARM
121   CkpvInitialize(TraceProjections*, _trace);
122   CkpvAccess(_trace) = new  TraceProjections(argv);
123   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
124   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
125 }
126  
127 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
128
129 struct TraceThreadListener {
130   struct CthThreadListener base;
131   int event;
132   int msgType;
133   int ep;
134   int srcPe;
135   int ml;
136   CmiObjId idx;
137 };
138
139 extern "C"
140 void traceThreadListener_suspend(struct CthThreadListener *l)
141 {
142   TraceThreadListener *a=(TraceThreadListener *)l;
143   /* here, we activate the appropriate trace codes for the appropriate
144      registered modules */
145   traceSuspend();
146 }
147
148 extern "C"
149 void traceThreadListener_resume(struct CthThreadListener *l) 
150 {
151   TraceThreadListener *a=(TraceThreadListener *)l;
152   /* here, we activate the appropriate trace codes for the appropriate
153      registered modules */
154   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
155                                 CthGetThreadID(a->base.thread));
156   a->event=-1;
157   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
158   a->ml=0;
159 }
160
161 extern "C"
162 void traceThreadListener_free(struct CthThreadListener *l) 
163 {
164   TraceThreadListener *a=(TraceThreadListener *)l;
165   delete a;
166 }
167
168 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
169 {
170 #if CMK_TRACE_ENABLED
171   /* strip essential information from the envelope */
172   TraceThreadListener *a= new TraceThreadListener;
173   
174   a->base.suspend=traceThreadListener_suspend;
175   a->base.resume=traceThreadListener_resume;
176   a->base.free=traceThreadListener_free;
177   a->event=e->getEvent();
178   a->msgType=e->getMsgtype();
179   a->ep=e->getEpIdx();
180   a->srcPe=e->getSrcPe();
181   a->ml=e->getTotalsize();
182
183   CthAddListener(tid, (CthThreadListener *)a);
184 #endif
185 }
186
187 void LogPool::openLog(const char *mode)
188 {
189 #if CMK_PROJECTIONS_USE_ZLIB
190   if(compressed) {
191     if (nonDeltaLog) {
192       do {
193         zfp = gzopen(fname, mode);
194       } while (!zfp && (errno == EINTR || errno == EMFILE));
195       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
196     }
197     if (deltaLog) {
198       do {
199         deltazfp = gzopen(dfname, mode);
200       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
201       if (!deltazfp) 
202         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
203     }
204   } else {
205     if (nonDeltaLog) {
206       do {
207         fp = fopen(fname, mode);
208       } while (!fp && (errno == EINTR || errno == EMFILE));
209       if (!fp) {
210         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
211         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
212       }
213     }
214     if (deltaLog) {
215       do {
216         deltafp = fopen(dfname, mode);
217       } while (!deltafp && (errno == EINTR || errno == EMFILE));
218       if (!deltafp) 
219         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
220     }
221   }
222 #else
223   if (nonDeltaLog) {
224     do {
225       fp = fopen(fname, mode);
226     } while (!fp && (errno == EINTR || errno == EMFILE));
227     if (!fp) {
228       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
229       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
230     }
231   }
232   if (deltaLog) {
233     do {
234       deltafp = fopen(dfname, mode);
235     } while (!deltafp && (errno == EINTR || errno == EMFILE));
236     if(!deltafp) 
237       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
238   }
239 #endif
240 }
241
242 void LogPool::closeLog(void)
243 {
244 #if CMK_PROJECTIONS_USE_ZLIB
245   if(compressed) {
246     if (nonDeltaLog) gzclose(zfp);
247     if (deltaLog) gzclose(deltazfp);
248     return;
249   }
250 #endif
251   if (nonDeltaLog) { 
252 #if !defined(_WIN32) || defined(__CYGWIN__)
253     fsync(fileno(fp)); 
254 #endif
255     fclose(fp); 
256   }
257   if (deltaLog)  { 
258 #if !defined(_WIN32) || defined(__CYGWIN__)
259     fsync(fileno(deltafp)); 
260 #endif
261     fclose(deltafp);  
262   }
263 }
264
265 LogPool::LogPool(char *pgm) {
266   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
267   // defaults to writing data (no outlier changes)
268   writeData = true;
269   numEntries = 0;
270   // **CW** for simple delta encoding
271   prevTime = 0.0;
272   timeErr = 0.0;
273   globalEndTime = 0.0;
274   headerWritten = 0;
275   numPhases = 0;
276   hasFlushed = false;
277
278   keepPhase = NULL;
279
280   fileCreated = false;
281   poolSize = CkpvAccess(CtrLogBufSize);
282   pgmname = new char[strlen(pgm)+1];
283   strcpy(pgmname, pgm);
284 }
285
286 void LogPool::createFile(const char *fix)
287 {
288   if (fileCreated) {
289     return;
290   }
291
292   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
293   char *pathPlusFilePrefix = new char[1024];
294
295   if(nSubdirs > 0){
296     int sd = CkMyPe() % nSubdirs;
297     char *subdir = new char[1024];
298     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
299     CmiMkdir(subdir);
300     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
301     delete[] subdir;
302   } else {
303     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
304   }
305
306   char pestr[10];
307   sprintf(pestr, "%d", CkMyPe());
308 #if CMK_PROJECTIONS_USE_ZLIB
309   int len;
310   if(compressed)
311     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
312   else
313     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
314 #else
315   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
316 #endif
317
318   if (nonDeltaLog) {
319     fname = new char[len];
320   }
321   if (deltaLog) {
322     dfname = new char[len];
323   }
324 #if CMK_PROJECTIONS_USE_ZLIB
325   if(compressed) {
326     if (deltaLog && nonDeltaLog) {
327       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
328       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
329     } else {
330       if (nonDeltaLog) {
331         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
332       } else {
333         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
334       }
335     }
336   } else {
337     if (deltaLog && nonDeltaLog) {
338       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
339       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
340     } else {
341       if (nonDeltaLog) {
342         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
343       } else {
344         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
345       }
346     }
347   }
348 #else
349   if (deltaLog && nonDeltaLog) {
350     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
351     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
352   } else {
353     if (nonDeltaLog) {
354       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
355     } else {
356       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
357     }
358   }
359 #endif
360   fileCreated = true;
361   delete[] pathPlusFilePrefix;
362   openLog("w+");
363   CLOSE_LOG 
364 }
365
366 void LogPool::createSts(const char *fix)
367 {
368   CkAssert(CkMyPe() == 0);
369   // create the sts file
370   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
371   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
372   do
373     {
374       stsfp = fopen(fname, "w");
375     } while (!stsfp && (errno == EINTR || errno == EMFILE));
376   if(stsfp==0)
377     CmiAbort("Cannot open projections sts file for writing.\n");
378   delete[] fname;
379 }  
380
381 void LogPool::createRC()
382 {
383   // create the projections rc file.
384   fname = 
385     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
386   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
387   do {
388     rcfp = fopen(fname, "w");
389   } while (!rcfp && (errno == EINTR || errno == EMFILE));
390   if (rcfp==0) {
391     CmiAbort("Cannot open projections configuration file for writing.\n");
392   }
393   delete[] fname;
394 }
395
396 LogPool::~LogPool() 
397 {
398   if (writeData) {
399     writeLog();
400 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
401     closeLog();
402 #endif
403   }
404
405 #if CMK_BLUEGENE_CHARM
406   extern int correctTimeLog;
407   if (correctTimeLog) {
408     createFile("-bg");
409     if (CkMyPe() == 0) {
410       createSts("-bg");
411     }
412     writeHeader();
413     if (CkMyPe() == 0) writeSts(NULL);
414     postProcessLog();
415   }
416 #endif
417
418   delete[] pool;
419   delete [] fname;
420 }
421
422 void LogPool::writeHeader()
423 {
424   if (headerWritten) return;
425   headerWritten = 1;
426   if(!binary) {
427 #if CMK_PROJECTIONS_USE_ZLIB
428     if(compressed) {
429       if (nonDeltaLog) {
430         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
431       }
432       if (deltaLog) {
433         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
434       }
435     } 
436     else /* else clause is below... */
437 #endif
438     /*... may hang over from else above */ {
439       if (nonDeltaLog) {
440         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
441       }
442       if (deltaLog) {
443         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
444       }
445     }
446   }
447   else { // binary
448       if (nonDeltaLog) {
449         fwrite(&numEntries,sizeof(numEntries),1,fp);
450       }
451       if (deltaLog) {
452         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
453       }
454   }
455 }
456
457 void LogPool::writeLog(void)
458 {
459   createFile();
460   OPEN_LOG
461   writeHeader();
462   if (nonDeltaLog) write(0);
463   if (deltaLog) write(1);
464   CLOSE_LOG
465 }
466
467 void LogPool::write(int writedelta) 
468 {
469   // **CW** Simple delta encoding implementation
470   // prevTime has to be maintained as an object variable because
471   // LogPool::write may be called several times depending on the
472   // +logsize value.
473   PUP::er *p = NULL;
474   if (binary) {
475     p = new PUP::toDisk(writedelta?deltafp:fp);
476   }
477 #if CMK_PROJECTIONS_USE_ZLIB
478   else if (compressed) {
479     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
480   }
481 #endif
482   else {
483     p = new toProjectionsFile(writedelta?deltafp:fp);
484   }
485   CmiAssert(p);
486   int curPhase = 0;
487   // **FIXME** - Should probably consider a more sophisticated bounds-based
488   //   approach for selective writing instead of making multiple if-checks
489   //   for every single event.
490   for(UInt i=0; i<numEntries; i++) {
491     if (!writedelta) {
492       if (keepPhase == NULL) {
493         // default case, when no phase selection is required.
494         pool[i].pup(*p);
495       } else {
496         // **FIXME** Might be a good idea to create a "filler" event block for
497         //   all the events taken out by phase filtering.
498         if (pool[i].type == END_PHASE) {
499           // always write phase markers
500           pool[i].pup(*p);
501           curPhase++;
502         } else if (pool[i].type == BEGIN_COMPUTATION ||
503                    pool[i].type == END_COMPUTATION) {
504           // always write BEGIN and END COMPUTATION markers
505           pool[i].pup(*p);
506         } else if (keepPhase[curPhase]) {
507           pool[i].pup(*p);
508         }
509       }
510     }
511     else {      // delta
512       // **FIXME** Implement phase-selective writing for delta logs
513       //   eventually
514       double time = pool[i].time;
515       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
516       {
517         double timeDiff = (time-prevTime)*1.0e6;
518         UInt intTimeDiff = (UInt)timeDiff;
519         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
520         if (timeErr > 1.0) {
521           timeErr -= 1.0;
522           intTimeDiff++;
523         }
524         pool[i].time = intTimeDiff/1.0e6;
525       }
526       pool[i].pup(*p);
527       pool[i].time = time;      // restore time value
528       prevTime = time;
529     }
530   }
531   delete p;
532   delete [] keepPhase;
533 }
534
535 void LogPool::writeSts(void)
536 {
537   // for whining compilers
538   int i;
539   char name[30];
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   for (i=0;i<numPAPIEvents;i++) {
549     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, papiEventNames[i]);
550   }
551 #endif
552   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
553   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
554     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
555   }     
556 }
557
558 void LogPool::writeSts(TraceProjections *traceProj){
559   writeSts();
560   if (traceProj != NULL) {
561     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
562     funcIter->seekStart();
563     int numFuncs = traceProj->getFuncNumber();
564     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
565     while(funcIter->hasNext()) {
566       StrKey *key;
567       int *obj = (int *)funcIter->next((void **)&key);
568       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
569     }
570   }
571   fprintf(stsfp, "END\n");
572   fclose(stsfp);
573 }
574
575 void LogPool::writeRC(void)
576 {
577     //CkPrintf("write RC is being executed\n");
578 #ifdef PROJ_ANALYSIS  
579     CkAssert(CkMyPe() == 0);
580     fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
581           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
582     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
583     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
584       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
585     } else {
586       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
587     }
588     */
589 #endif //PROJ_ANALYSIS
590   fclose(rcfp);
591 }
592
593
594 #if CMK_BLUEGENE_CHARM
595 static void updateProjLog(void *data, double t, double recvT, void *ptr)
596 {
597   LogEntry *log = (LogEntry *)data;
598   FILE *fp = *(FILE **)ptr;
599   log->time = t;
600   log->recvTime = recvT<0.0?0:recvT;
601 //  log->write(fp);
602   toProjectionsFile p(fp);
603   log->pup(p);
604 }
605 #endif
606
607 // flush log entries to disk
608 void LogPool::flushLogBuffer()
609 {
610   if (numEntries) {
611     double writeTime = TraceTimer();
612     writeLog();
613     hasFlushed = true;
614     numEntries = 0;
615     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
616     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
617   }
618 }
619
620 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
621                   double time, int event, int pe, int ml, CmiObjId *id, 
622                   double recvT, double cpuT, int numPe)
623 {
624   new (&pool[numEntries++])
625     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
626   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
627     numPhases++;
628   }
629   if(poolSize==numEntries) {
630     flushLogBuffer();
631 #if CMK_BLUEGENE_CHARM
632     extern int correctTimeLog;
633     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
634 #endif
635   }
636 #if CMK_BLUEGENE_CHARM
637   switch (type) {
638     case BEGIN_PROCESSING:
639       pool[numEntries-1].recvTime = BgGetRecvTime();
640     case END_PROCESSING:
641     case BEGIN_COMPUTATION:
642     case END_COMPUTATION:
643     case CREATION:
644     case BEGIN_PACK:
645     case END_PACK:
646     case BEGIN_UNPACK:
647     case END_UNPACK:
648     case USER_EVENT_PAIR:
649       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
650   }
651 #endif
652 }
653
654 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
655 #ifndef CMK_BLUEGENE_CHARM
656   new (&pool[numEntries++])
657         LogEntry(time,type,funcID,lineNum,fileName);
658   if(poolSize == numEntries){
659     flushLogBuffer();
660   }
661 #endif  
662 }
663
664
665   
666 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
667 #ifndef CMK_BLUEGENE_CHARM
668   new (&pool[numEntries++])
669         LogEntry(type,time,memUsage);
670   if(poolSize == numEntries){
671     flushLogBuffer();
672   }
673 #endif  
674         
675 }  
676
677
678
679 void LogPool::addUserSupplied(int data){
680         // add an event
681         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
682
683         // set the user supplied value for the previously created event 
684         pool[numEntries-1].setUserSuppliedData(data);
685   }
686
687
688 void LogPool::addUserSuppliedNote(char *note){
689         // add an event
690         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
691
692         // set the user supplied note for the previously created event 
693         pool[numEntries-1].setUserSuppliedNote(note);
694   }
695
696 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
697   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
698 #ifndef CMK_BLUEGENE_CHARM
699 #if CMK_SMP_TRACE_COMMTHREAD && MPI_SMP_TRACE_COMMTHREAD_HACK
700   //This part of code is used  to combine the contiguous
701   //MPI_Test and MPI_Iprobe events to reduce the number of
702   //entries
703 #define MPI_TEST_EVENT_ID 60
704 #define MPI_IPROBE_EVENT_ID 70 
705   int lastEvent = pool[numEntries-1].event;
706   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
707     //just replace the endtime of last event
708     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
709     pool[numEntries].endTime = et;
710   }else{
711     new (&pool[numEntries++])
712       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
713   }
714 #else
715   new (&pool[numEntries++])
716     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717 #endif
718   if(poolSize == numEntries){
719     flushLogBuffer();
720   }
721 #endif  
722 }
723
724
725 /* **CW** Not sure if this is the right thing to do. Feels more like
726    a hack than a solution to Sameer's request to add the destination
727    processor information to multicasts and broadcasts.
728
729    In the unlikely event this method is used for Broadcasts as well,
730    pelist == NULL will be used to indicate a global broadcast with 
731    num PEs.
732 */
733 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
734                                    int event, int pe, int ml, CmiObjId *id,
735                                    double recvT, int numPe, int *pelist)
736 {
737   new (&pool[numEntries++])
738     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
739   if(poolSize==numEntries) {
740     flushLogBuffer();
741   }
742 }
743
744 void LogPool::postProcessLog()
745 {
746 #if CMK_BLUEGENE_CHARM
747   bgUpdateProj(1);   // event type
748 #endif
749 }
750
751 void LogPool::modLastEntryTimestamp(double ts)
752 {
753   pool[numEntries-1].time = ts;
754   //pool[numEntries-1].cputime = ts;
755 }
756
757 // /** Constructor for a multicast log entry */
758 // 
759 //  THIS WAS MOVED TO trace-projections.h with the other constructors
760 // 
761 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
762 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
763 // {
764 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
765 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
766 //     recvTime = rt; 
767 //     numpes = numPe;
768 //     userSuppliedNote = NULL;
769 //     if (pelist != NULL) {
770 //      pes = new int[numPe];
771 //      for (int i=0; i<numPe; i++) {
772 //        pes[i] = pelist[i];
773 //      }
774 //     } else {
775 //      pes= NULL;
776 //     }
777 // }
778
779 void LogEntry::addPapi(int numPapiEvts, int *papi_ids, LONG_LONG_PAPI *papiVals)
780 {
781 #if CMK_HAS_COUNTER_PAPI
782   numPapiEvents = numPapiEvts;
783   if (papiVals != NULL) {
784     papiIDs = new int[numPapiEvents];
785     papiValues = new LONG_LONG_PAPI[numPapiEvents];
786     for (int i=0; i<numPapiEvents; i++) {
787       papiIDs[i] = papi_ids[i];
788       papiValues[i] = papiVals[i];
789     }
790   }
791 #endif
792 }
793
794
795
796 void LogEntry::pup(PUP::er &p)
797 {
798   int i;
799   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
800   char ret = '\n';
801
802   p|type;
803   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
804   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
805
806   switch (type) {
807     case USER_EVENT:
808     case USER_EVENT_PAIR:
809       p|mIdx; p|itime; p|event; p|pe;
810       break;
811     case BEGIN_IDLE:
812     case END_IDLE:
813     case BEGIN_PACK:
814     case END_PACK:
815     case BEGIN_UNPACK:
816     case END_UNPACK:
817       p|itime; p|pe; 
818       break;
819     case BEGIN_PROCESSING:
820       if (p.isPacking()) {
821         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
822         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
823       }
824       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
825       p|msglen; p|irecvtime; 
826       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
827       p|icputime;
828 #if CMK_HAS_COUNTER_PAPI
829       p|numPapiEvents;
830       for (i=0; i<numPapiEvents; i++) {
831         // not yet!!!
832         //      p|papiIDs[i]; 
833         p|papiValues[i];
834         
835       }
836 #else
837       p|numPapiEvents;     // non papi version has value 0
838 #endif
839       if (p.isUnpacking()) {
840         recvTime = irecvtime/1.0e6;
841         cputime = icputime/1.0e6;
842       }
843       break;
844     case END_PROCESSING:
845       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
846       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
847 #if CMK_HAS_COUNTER_PAPI
848       p|numPapiEvents;
849       for (i=0; i<numPapiEvents; i++) {
850         // not yet!!!
851         //      p|papiIDs[i];
852         p|papiValues[i];
853       }
854 #else
855       p|numPapiEvents;  // non papi version has value 0
856 #endif
857       if (p.isUnpacking()) cputime = icputime/1.0e6;
858       break;
859     case USER_SUPPLIED:
860           p|userSuppliedData;
861           p|itime;
862         break;
863     case USER_SUPPLIED_NOTE:
864           p|itime;
865           int length;
866           if (p.isPacking()) length = strlen(userSuppliedNote);
867           p | length;
868           char space;
869           space = ' ';
870           p | space;
871           if (p.isUnpacking()) {
872             userSuppliedNote = new char[length+1];
873             userSuppliedNote[length] = '\0';
874           }
875           PUParray(p,userSuppliedNote, length);
876           break;
877     case USER_SUPPLIED_BRACKETED_NOTE:
878       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
879           p|itime;
880           p|iEndTime;
881           p|event;
882           int length2;
883           if (p.isPacking()) length2 = strlen(userSuppliedNote);
884           p | length2;
885           char space2;
886           space2 = ' ';
887           p | space2;
888           if (p.isUnpacking()) {
889             userSuppliedNote = new char[length+1];
890             userSuppliedNote[length] = '\0';
891           }
892           PUParray(p,userSuppliedNote, length2);
893           break;
894     case MEMORY_USAGE_CURRENT:
895       p | memUsage;
896       p | itime;
897         break;
898     case CREATION:
899       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
900       p|mIdx; p|eIdx; p|itime;
901       p|event; p|pe; p|msglen; p|irecvtime;
902       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
903       break;
904     case CREATION_BCAST:
905       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
906       p|mIdx; p|eIdx; p|itime;
907       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
908       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
909       break;
910     case CREATION_MULTICAST:
911       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
912       p|mIdx; p|eIdx; p|itime;
913       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
914       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
915       for (i=0; i<numpes; i++) p|pes[i];
916       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
917       break;
918     case MESSAGE_RECV:
919       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
920       break;
921
922     case ENQUEUE:
923     case DEQUEUE:
924       p|mIdx; p|itime; p|event; p|pe;
925       break;
926
927     case BEGIN_INTERRUPT:
928     case END_INTERRUPT:
929       p|itime; p|event; p|pe;
930       break;
931
932       // **CW** absolute timestamps are used here to support a quick
933       // way of determining the total time of a run in projections
934       // visualization.
935     case BEGIN_COMPUTATION:
936     case END_COMPUTATION:
937     case BEGIN_TRACE:
938     case END_TRACE:
939       p|itime;
940       break;
941     case BEGIN_FUNC:
942         p | itime;
943         p | mIdx;
944         p | event;
945         if(!p.isUnpacking()){
946                 p(fName,flen-1);
947         }
948         break;
949     case END_FUNC:
950         p | itime;
951         p | mIdx;
952         break;
953     case END_PHASE:
954       p|eIdx; // FIXME: actually the phase ID
955       p|itime;
956       break;
957     default:
958       CmiError("***Internal Error*** Wierd Event %d.\n", type);
959       break;
960   }
961   if (p.isUnpacking()) time = itime/1.0e6;
962   p|ret;
963 }
964
965 TraceProjections::TraceProjections(char **argv): 
966   curevent(0), inEntry(0), computationStarted(0), 
967         converseExit(0), endTime(0.0), traceNestedEvents(0),
968         currentPhaseID(0), lastPhaseEvent(NULL)
969 {
970   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
971
972   if (CkpvAccess(traceOnPe) == 0) return;
973
974   CtvInitialize(int,curThreadEvent);
975   CkpvInitialize(CmiInt8, CtrLogBufSize);
976   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
977   CtvAccess(curThreadEvent)=0;
978   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
979                        "Log entries to buffer per I/O")) {
980     if (CkMyPe() == 0) {
981       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
982     }
983   }
984   checknested = 
985     CmiGetArgFlagDesc(argv,"+checknested",
986                       "check projections nest begin end execute events");
987   traceNestedEvents = 
988     CmiGetArgFlagDesc(argv,"+tracenested",
989               "trace projections nest begin/end execute events");
990   int binary = 
991     CmiGetArgFlagDesc(argv,"+binary-trace",
992                       "Write log files in binary format");
993
994   CmiInt8 nSubdirs = 0;
995   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
996
997
998 #if CMK_PROJECTIONS_USE_ZLIB
999   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1000 #else
1001   // consume the flag so there's no confusing
1002   CmiGetArgFlagDesc(argv,"+gz-trace",
1003                     "Write log files pre-compressed with gzip");
1004   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1005 #endif
1006
1007   // **CW** default to non delta log encoding. The user may choose to do
1008   // create both logs (for debugging) or just the old log timestamping
1009   // (for compatibility).
1010   // Generating just the non delta log takes precedence over generating
1011   // both logs (if both arguments appear on the command line).
1012
1013   // switch to OLD log format until everything works // Gengbin
1014   nonDeltaLog = 1;
1015   deltaLog = 0;
1016   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1017                                   "Generate Delta encoded and simple timestamped log files");
1018
1019   _logPool = new LogPool(CkpvAccess(traceRoot));
1020   _logPool->setNumSubdirs(nSubdirs);
1021   _logPool->setBinary(binary);
1022 #if CMK_PROJECTIONS_USE_ZLIB
1023   _logPool->setCompressed(compressed);
1024 #endif
1025   if (CkMyPe() == 0) {
1026     _logPool->createSts();
1027     _logPool->createRC();
1028   }
1029   funcCount=1;
1030
1031 #if CMK_HAS_COUNTER_PAPI
1032   // We initialize and create the event sets for use with PAPI here.
1033   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1034   if (papiRetValue != PAPI_VER_CURRENT) {
1035     CmiAbort("PAPI Library initialization failure!\n");
1036   }
1037   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1038   papiEventSet = PAPI_NULL; 
1039   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1040     CmiAbort("PAPI failed to create event set!\n");
1041   }
1042   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, numPAPIEvents);
1043   if (papiRetValue != PAPI_OK) {
1044     if (papiRetValue == PAPI_ECNFLCT) {
1045       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1046     } else {
1047       CmiAbort("PAPI failed to add designated events!\n");
1048     }
1049   }
1050   papiValues = new long_long[numPAPIEvents];
1051   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
1052 #endif
1053 }
1054
1055 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1056 {
1057   OPTIMIZED_VERSION
1058   CkAssert(e==-1 || e>=0);
1059   CkAssert(evt != NULL);
1060   int event;
1061   int biggest = -1;
1062   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1063     int cur = (*CkpvAccess(usrEvents))[i]->e;
1064     if (cur == e) {
1065       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1066       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1067         return e;
1068       else
1069         CmiAbort("UserEvent double registered!");
1070     }
1071     if (cur > biggest) biggest = cur;
1072   }
1073   // if no user events have so far been registered. biggest will be -1
1074   // and hence newly assigned event numbers will begin from 0.
1075   if (e==-1) event = biggest+1;  // automatically assign new event number
1076   else event = e;
1077   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1078   return event;
1079 }
1080
1081 void TraceProjections::traceClearEps(void)
1082 {
1083   // In trace-summary, this zeros out the EP bins, to eliminate noise
1084   // from startup.  Here, this isn't useful, since we can do that in
1085   // post-processing
1086 }
1087
1088 void TraceProjections::traceWriteSts(void)
1089 {
1090   if(CkMyPe()==0)
1091     _logPool->writeSts(this);
1092 }
1093
1094 /** 
1095  * **IMPT NOTES**:
1096  *
1097  * This is called when Converse closes during ConverseCommonExit().
1098  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1099  *
1100  * Some programs bypass CkExit() (like NAMD, which eventually calls
1101  * ConverseExit()), modules like traces will have to pretend to shutdown
1102  * as if CkExit() was called but at the same time avoid making
1103  * subsequent CkExit() calls (which is usually required for allowing
1104  * other modules to shutdown).
1105  *
1106  * Note that we can only get here if CkExit() was not called, since the
1107  * trace module will un-register itself from TraceArray if it did.
1108  *
1109  */
1110 void TraceProjections::traceClose(void)
1111 {
1112 #ifdef PROJ_ANALYSIS
1113   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1114
1115   // sets the flag that tells the code not to make the CkExit call later
1116   converseExit = 1;
1117   if (CkMyPe() == 0) {
1118     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1119     bocProxy.traceProjectionsParallelShutdown(-1);
1120   }
1121   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1122     CkpvAccess(_trace)->endComputation();
1123     delete _logPool;              // will write
1124     // remove myself from traceArray so that no tracing will be called.
1125     CkpvAccess(_traces)->removeTrace(this);
1126   }
1127 #else
1128   // we've already deleted the logpool, so multiple calls to traceClose
1129   // are tolerated.
1130   if (_logPool == NULL) {
1131     return;
1132   }
1133   if(CkMyPe()==0){
1134     _logPool->writeSts(this);
1135   }
1136   CkpvAccess(_trace)->endComputation();
1137   delete _logPool;              // will write
1138   // remove myself from traceArray so that no tracing will be called.
1139   CkpvAccess(_traces)->removeTrace(this);
1140 #endif
1141 }
1142
1143 /**
1144  *  **IMPT NOTES**:
1145  *
1146  *  This is meant to be called internally by the tracing framework.
1147  *
1148  */
1149 void TraceProjections::closeTrace() {
1150   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1151   if (CkMyPe() == 0) {
1152     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1153     _logPool->writeSts(this);
1154     _logPool->writeRC();
1155     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1156   }
1157   delete _logPool;       // will write logs to file
1158 }
1159
1160 #if CMK_SMP_TRACE_COMMTHREAD
1161 void TraceProjections::traceBeginOnCommThread()
1162 {
1163   if (!computationStarted) return;
1164   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1165 }
1166
1167 void TraceProjections::traceEndOnCommThread()
1168 {
1169   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1170 }
1171 #endif
1172
1173 void TraceProjections::traceBegin(void)
1174 {
1175   if (!computationStarted) return;
1176   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1177 }
1178
1179 void TraceProjections::traceEnd(void)
1180 {
1181   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1182 }
1183
1184 void TraceProjections::userEvent(int e)
1185 {
1186   if (!computationStarted) return;
1187   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1188 }
1189
1190 void TraceProjections::userBracketEvent(int e, double bt, double et)
1191 {
1192   if (!computationStarted) return;
1193   // two events record Begin/End of event e.
1194   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1195   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1196 }
1197
1198 void TraceProjections::userSuppliedData(int d)
1199 {
1200   if (!computationStarted) return;
1201   _logPool->addUserSupplied(d);
1202 }
1203
1204 void TraceProjections::userSuppliedNote(char *note)
1205 {
1206   if (!computationStarted) return;
1207   _logPool->addUserSuppliedNote(note);
1208 }
1209
1210
1211 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1212 {
1213   if (!computationStarted) return;
1214   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1215 }
1216
1217 void TraceProjections::memoryUsage(double m)
1218 {
1219   if (!computationStarted) return;
1220   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1221   
1222 }
1223
1224
1225 void TraceProjections::creation(envelope *e, int ep, int num)
1226 {
1227   double curTime = TraceTimer();
1228   if (e == 0) {
1229     CtvAccess(curThreadEvent) = curevent;
1230     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1231                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1232   } else {
1233     int type=e->getMsgtype();
1234     e->setEvent(curevent);
1235     if (num > 1) {
1236       _logPool->add(CREATION_BCAST, type, ep, curTime,
1237                     curevent++, CkMyPe(), e->getTotalsize(), 
1238                     NULL, 0, 0.0, num);
1239     } else {
1240       _logPool->add(CREATION, type, ep, curTime,
1241                     curevent++, CkMyPe(), e->getTotalsize(), 
1242                     NULL, 0, 0.0);
1243     }
1244   }
1245 }
1246
1247 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1248    Communication Library-specific Multicasts via new event 
1249    CREATION_MULTICAST.
1250 */
1251
1252 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1253                                          int *pelist)
1254 {
1255   double curTime = TraceTimer();
1256   if (e==0) {
1257     CtvAccess(curThreadEvent)=curevent;
1258     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1259                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1260   } else {
1261     int type=e->getMsgtype();
1262     e->setEvent(curevent);
1263     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1264                                    e->getTotalsize(), 0, 0.0, num, pelist);
1265   }
1266 }
1267
1268 void TraceProjections::creationDone(int num)
1269 {
1270   // modified the creation done time of the last num log entries
1271   // FIXME: totally a hack
1272   double curTime = TraceTimer();
1273   int idx = _logPool->numEntries-1;
1274   while (idx >=0 && num >0 ) {
1275     LogEntry &log = _logPool->pool[idx];
1276     if ((log.type == CREATION) ||
1277         (log.type == CREATION_BCAST) ||
1278         (log.type == CREATION_MULTICAST)) {
1279       log.recvTime = curTime - log.time;
1280       num --;
1281     }
1282     idx--;
1283   }
1284 }
1285
1286 void TraceProjections::beginExecute(CmiObjId *tid)
1287 {
1288 #if CMK_HAS_COUNTER_PAPI
1289   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1290     CmiAbort("PAPI failed to read at begin execute!\n");
1291   }
1292 #endif
1293   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1294   execEvent = CtvAccess(curThreadEvent);
1295   execEp = (-1);
1296   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1297                 execEvent,CkMyPe(), 0, tid);
1298 #if CMK_HAS_COUNTER_PAPI
1299   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1300 #endif
1301   inEntry = 1;
1302 }
1303
1304 void TraceProjections::beginExecute(envelope *e)
1305 {
1306   if(e==0) {
1307 #if CMK_HAS_COUNTER_PAPI
1308     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1309       CmiAbort("PAPI failed to read at begin execute!\n");
1310     }
1311 #endif
1312     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1313     execEvent = CtvAccess(curThreadEvent);
1314     execEp = (-1);
1315     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1316                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1317 #if CMK_HAS_COUNTER_PAPI
1318     _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1319 #endif
1320     inEntry = 1;
1321   } else {
1322     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1323                  e->getSrcPe(),e->getTotalsize());
1324   }
1325 }
1326
1327 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1328                                     int mlen, CmiObjId *idx)
1329 {
1330   if (traceNestedEvents) {
1331     if (! nestedEvents.isEmpty()) {
1332       endExecuteLocal();
1333     }
1334     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1335   }
1336   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1337 }
1338
1339 void TraceProjections::changeLastEntryTimestamp(double ts)
1340 {
1341   _logPool->modLastEntryTimestamp(ts);
1342 }
1343
1344 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1345                                     int mlen, CmiObjId *idx)
1346 {
1347 #if CMK_HAS_COUNTER_PAPI
1348   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1349     CmiAbort("PAPI failed to read at begin execute!\n");
1350   }
1351 #endif
1352   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1353   execEvent=event;
1354   execEp=ep;
1355   execPe=srcPe;
1356   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1357                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1358 #if CMK_HAS_COUNTER_PAPI
1359   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1360 #endif
1361   inEntry = 1;
1362 }
1363
1364 void TraceProjections::endExecute(void)
1365 {
1366   if (traceNestedEvents) nestedEvents.deq();
1367   endExecuteLocal();
1368   if (traceNestedEvents) {
1369     if (! nestedEvents.isEmpty()) {
1370       NestedEvent &ne = nestedEvents.peek();
1371       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1372     }
1373   }
1374 }
1375
1376 void TraceProjections::endExecuteLocal(void)
1377 {
1378 #if CMK_HAS_COUNTER_PAPI
1379   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1380     CmiAbort("PAPI failed to read at end execute!\n");
1381   }
1382 #endif
1383   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1384   double cputime = TraceCpuTimer();
1385   if(execEp == (-1)) {
1386     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1387                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1388   } else {
1389     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1390                   execEvent, execPe, 0, NULL, 0.0, cputime);
1391   }
1392 #if CMK_HAS_COUNTER_PAPI
1393   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1394 #endif
1395   inEntry = 0;
1396 }
1397
1398 void TraceProjections::messageRecv(char *env, int pe)
1399 {
1400 #if 0
1401   envelope *e = (envelope *)env;
1402   int msgType = e->getMsgtype();
1403   int ep = e->getEpIdx();
1404 #if 0
1405   if (msgType==NewChareMsg || msgType==NewVChareMsg
1406           || msgType==ForChareMsg || msgType==ForVidMsg
1407           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1408           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1409     ep = e->getEpIdx();
1410   else
1411     ep = _threadEP;
1412 #endif
1413   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1414                 curevent++, e->getSrcPe(), e->getTotalsize());
1415 #endif
1416 }
1417
1418 void TraceProjections::beginIdle(double curWallTime)
1419 {
1420   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1421 }
1422
1423 void TraceProjections::endIdle(double curWallTime)
1424 {
1425   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1426 }
1427
1428 void TraceProjections::beginPack(void)
1429 {
1430   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1431 }
1432
1433 void TraceProjections::endPack(void)
1434 {
1435   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1436 }
1437
1438 void TraceProjections::beginUnpack(void)
1439 {
1440   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1441 }
1442
1443 void TraceProjections::endUnpack(void)
1444 {
1445   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1446 }
1447
1448 void TraceProjections::enqueue(envelope *) {}
1449
1450 void TraceProjections::dequeue(envelope *) {}
1451
1452 void TraceProjections::beginComputation(void)
1453 {
1454   computationStarted = 1;
1455
1456   // Executes the callback function provided by the machine
1457   // layer. This is the proper method to register user events in a
1458   // machine layer because projections is a charm++ module.
1459   if (CkpvAccess(traceOnPe) != 0) {
1460     void (*ptr)() = registerMachineUserEvents();
1461     if (ptr != NULL) {
1462       ptr();
1463     }
1464   }
1465 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1466 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1467   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1468 #if CMK_HAS_COUNTER_PAPI
1469   // we start the counters here
1470   if (PAPI_start(papiEventSet) != PAPI_OK) {
1471     CmiAbort("PAPI failed to start designated counters!\n");
1472   }
1473 #endif
1474 }
1475
1476 void TraceProjections::endComputation(void)
1477 {
1478 #if CMK_HAS_COUNTER_PAPI
1479   // we stop the counters here. A silent failure is alright since we
1480   // are already at the end of the program.
1481   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1482     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1483   }
1484   // NOTE: We should not do a complete close of PAPI until after the
1485   // sts writer is done.
1486 #endif
1487   endTime = TraceTimer();
1488   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1489   /*
1490   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1491            endTime*1e06);
1492   */
1493 }
1494
1495 int TraceProjections::idxRegistered(int idx)
1496 {
1497     int idxVecLen = idxVec.size();
1498     for(int i=0; i<idxVecLen; i++)
1499     {
1500         if(idx == idxVec[i])
1501             return 1;
1502     }
1503     return 0;
1504 }
1505
1506 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1507     StrKey k((char*)name,strlen(name));
1508     int num = funcHashtable.get(k);
1509     
1510     if(num!=0) {
1511         return;
1512         //as for mpi programs, the same function may be registered for several times
1513         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1514     }
1515     
1516     int isIdxExisting=0;
1517     if(idxSpecifiedByUser)
1518         isIdxExisting=idxRegistered(idx);
1519     if(isIdxExisting){
1520         return;
1521         //same reason with num!=0
1522         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1523     }
1524
1525     if(idxSpecifiedByUser) {
1526         char *st = new char[strlen(name)+1];
1527         memcpy(st,name,strlen(name)+1);
1528         StrKey *newKey = new StrKey(st,strlen(st));
1529         int &ref = funcHashtable.put(*newKey);
1530         ref=idx;
1531         funcCount++;
1532         idxVec.push_back(idx);  
1533     } else {
1534         char *st = new char[strlen(name)+1];
1535         memcpy(st,name,strlen(name)+1);
1536         StrKey *newKey = new StrKey(st,strlen(st));
1537         int &ref = funcHashtable.put(*newKey);
1538         ref=funcCount;
1539         num = funcCount;
1540         funcCount++;
1541         idx = num;
1542         idxVec.push_back(idx);
1543     }
1544 }
1545
1546 void TraceProjections::beginFunc(char *name,char *file,int line){
1547         StrKey k(name,strlen(name));    
1548         unsigned short  num = (unsigned short)funcHashtable.get(k);
1549         beginFunc(num,file,line);
1550 }
1551
1552 void TraceProjections::beginFunc(int idx,char *file,int line){
1553         if(idx <= 0){
1554                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1555         }       
1556         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1557 }
1558
1559 void TraceProjections::endFunc(char *name){
1560         StrKey k(name,strlen(name));    
1561         int num = funcHashtable.get(k);
1562         endFunc(num);   
1563 }
1564
1565 void TraceProjections::endFunc(int num){
1566         if(num <= 0){
1567                 printf("endFunc without start :O\n");
1568         }
1569         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1570 }
1571
1572 // specialized PUP:ers for handling trace projections logs
1573 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1574 {
1575   for (int i=0;i<n;i++) 
1576     switch(t) {
1577     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1578     case Tuchar:
1579     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1580     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1581     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1582     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1583     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1584     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1585     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1586     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1587     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1588 #ifdef CMK_PUP_LONG_LONG
1589     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1590     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1591 #endif
1592     default: CmiAbort("Unrecognized pup type code!");
1593     };
1594 }
1595
1596 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1597 {
1598   for (int i=0;i<n;i++) 
1599     switch(t) {
1600     case Tchar: { 
1601       char c = fgetc(f);
1602       if (c==EOF)
1603         parseError("Could not match character");
1604       else
1605         ((char *)p)[i] = c;
1606       break;
1607     }
1608     case Tuchar:
1609     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1610     case Tshort:((short *)p)[i]=(short)readInt(); break;
1611     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1612     case Tint:  ((int *)p)[i]=readInt(); break;
1613     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1614     case Tlong: ((long *)p)[i]=readInt(); break;
1615     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1616     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1617     case Tdouble:((double *)p)[i]=readDouble(); break;
1618 #ifdef CMK_PUP_LONG_LONG
1619     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1620     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1621 #endif
1622     default: CmiAbort("Unrecognized pup type code!");
1623     };
1624 }
1625
1626 #if CMK_PROJECTIONS_USE_ZLIB
1627 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1628 {
1629   for (int i=0;i<n;i++) 
1630     switch(t) {
1631     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1632     case Tuchar:
1633     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1634     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1635     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1636     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1637     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1638     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1639     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1640     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1641     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1642 #ifdef CMK_PUP_LONG_LONG
1643     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1644     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1645 #endif
1646     default: CmiAbort("Unrecognized pup type code!");
1647     };
1648 }
1649 #endif
1650
1651 void TraceProjections::endPhase() {
1652   double currentPhaseTime = TraceTimer();
1653   double lastPhaseTime = 0.0;
1654
1655   if (lastPhaseEvent != NULL) {
1656     lastPhaseTime = lastPhaseEvent->time;
1657   } else {
1658     if (_logPool->pool != NULL) {
1659       // assumed to be BEGIN_COMPUTATION
1660       lastPhaseTime = _logPool->pool[0].time;
1661     } else {
1662       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1663       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1664       lastPhaseTime = currentPhaseTime;
1665     }
1666   }
1667
1668   /* Insert endPhase event here. */
1669   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1670   /*        We currently "borrow" from the standard add() method. */
1671   /*        It should really be its own add() method.             */
1672   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1673   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1674   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1675   currentPhaseID++;
1676 }
1677
1678 #ifdef PROJ_ANALYSIS
1679 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1680
1681
1682 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1683
1684 void registerOutlierReduction() {
1685   outlierReductionType =
1686     CkReduction::addReducer(outlierReduction);
1687   minMaxReductionType =
1688     CkReduction::addReducer(minMaxReduction);
1689 }
1690
1691 /**
1692  * **IMPT NOTES**:
1693  *
1694  * This is the C++ code that is registered to be activated at module
1695  * shutdown. This is called exactly once on processor 0. Module shutdown
1696  * is initiated as a result of a CkExit() call by the application code
1697  * 
1698  * The exit function must ultimately call CkExit() again to
1699  * so that other module exit functions may proceed after this module is
1700  * done.
1701  *
1702  */
1703 // FIXME: WHY extern "C"???
1704 extern "C" void TraceProjectionsExitHandler()
1705 {
1706 #if CMK_TRACE_ENABLED
1707   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1708   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1709   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1710 #else
1711   CkExit();
1712 #endif
1713 }
1714
1715 // This is called once on each processor but the idiom of use appears
1716 // to be to only have processor 0 register the function.
1717 //
1718 // See initnode in trace-projections.ci
1719 void initTraceProjectionsBOC()
1720 {
1721   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1722 #ifdef __BLUEGENE__
1723   if (BgNodeRank() == 0) {
1724 #else
1725     if (CkMyRank() == 0) {
1726 #endif
1727       registerExitFn(TraceProjectionsExitHandler);
1728     }
1729 #if 0
1730   } // this is so indentation does not get messed up
1731 #endif
1732 }
1733
1734 // mainchare for trace-projections BOC-operations. 
1735 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1736 // rest of its life.
1737 //
1738 // Responsible for:
1739 //   1. Handling commandline arguments
1740 //   2. Creating any objects required for proper BOC operations.
1741 //
1742 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1743   /** Options for Outlier Analysis */
1744   // defaults. Things will change with support for interactive analysis.
1745   bool findOutliers = false;
1746   bool outlierAutomatic = true;
1747   int numKSeeds = 10; 
1748
1749   int peNumKeep = CkNumPes();  // used as a default
1750   double entryThreshold = 0.0;
1751   bool outlierUsePhases = false;
1752   if (outlierAutomatic) {
1753     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1754                      "Number of cluster seeds to apply at outlier analysis.");
1755     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1756                      &peNumKeep, "Number of Processors to retain data");
1757     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1758                         "Minimum significance of entry points to be considered for clustering (%).");
1759     findOutliers =
1760       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1761     outlierUsePhases = 
1762       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1763                         "Apply automatic outlier analysis to any available phases.");
1764     if (outlierUsePhases) {
1765       // if the user wants to use an outlier feature, it is assumed outlier
1766       //    analysis is desired.
1767       findOutliers = true;
1768     }
1769   }
1770   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers);
1771   if (findOutliers) {
1772     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1773                                         numKSeeds,
1774                                         peNumKeep,
1775                                         entryThreshold,
1776                                         outlierUsePhases);
1777   }
1778 }
1779
1780 // Called on every processor.
1781 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1782   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1783   endPe = pe;                // the pe that starts CkExit()
1784   if (CkMyPe() == 0) {
1785     analysisStartTime = CmiWallTimer();
1786   }
1787   CkpvAccess(_trace)->endComputation();
1788   // no more tracing for projections on this processor after this point. 
1789   // Note that clear must be called after remove, or bad things will happen.
1790   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1791   CkpvAccess(_traces)->clearTrace();
1792
1793   // From this point, we start multiple chains of reductions and broadcasts to
1794   // perform final online analysis activities.
1795
1796   // Start all parallel operations at once. 
1797   //   These MUST NOT modify base performance data in LogPool. If they must,
1798   //   then the parallel operations must be phased (and this code to be
1799   //   restructured as necessary)
1800   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1801   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1802   if (findOutliers) {
1803     parModulesRemaining++;
1804     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1805   }
1806   parModulesRemaining++;
1807   bocProxy[CkMyPe()].startEndTimeAnalysis();
1808 }
1809
1810 // Called on each processor
1811 void KMeansBOC::startKMeansAnalysis() {
1812   // Initialize all necessary structures
1813   LogPool *pool = CkpvAccess(_trace)->_logPool;
1814
1815  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1816   int flushInt = 0;
1817   if (pool->hasFlushed) {
1818     flushInt = 1;
1819   }
1820   
1821   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1822                 0, thisProxy);
1823   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1824 }
1825
1826 // Called on processor 0
1827 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1828   int someFlush = *((int *)msg->getData());
1829
1830   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1831   
1832   if (someFlush == 0) {
1833     // Data intact proceed with KMeans analysis
1834     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1835     kMeansProxy.flushCheckDone();
1836   } else {
1837     // Some processor had flushed it data at some point, abandon KMeans
1838     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1839     // terminate KMeans
1840     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1841     bocProxy[0].kMeansDone();
1842   }
1843 }
1844
1845 // Called on each processor
1846 void KMeansBOC::flushCheckDone() {
1847   // **FIXME** - more flexible metric collection scheme may be necessary
1848   //   in the future for production use.
1849   LogPool *pool = CkpvAccess(_trace)->_logPool;
1850
1851   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1852
1853   numEntryMethods = _entryTable.size();
1854   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1855
1856   // maintained across phases
1857   markedBegin = false;
1858   markedIdle = false;
1859   beginBlockTime = 0.0;
1860   beginIdleBlockTime = 0.0;
1861   lastBeginEPIdx = -1; // none
1862
1863   lastPhaseIdx = 0;
1864   currentExecTimes = NULL;
1865   currentPhase = 0;
1866   selected = false;
1867
1868   pool->initializePhases();
1869
1870   // incoming K Seeds and the per-phase filter
1871   incKSeeds = new double[numK*numMetrics];
1872   keepMetric = new bool[numMetrics];
1873
1874   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1875   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1876   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1877   thisProxy[CkMyPe()].getNextPhaseMetrics();
1878 }
1879
1880 // Called on each processor.
1881 void KMeansBOC::getNextPhaseMetrics() {
1882   // Assumes the presence of the complete logs on this processor.
1883   // Assumes first event is always BEGIN_COMPUTATION
1884   // Assumes each processor sees the same number of phases.
1885   //
1886   // In this code, we collect performance data for this processor.
1887   // All times are in seconds.
1888
1889   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1890
1891   if (usePhases) {
1892     DEBUGF("[%d] Using Phases\n", CkMyPe());
1893   } else {
1894     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1895   }
1896   
1897   if (currentExecTimes != NULL) {
1898     delete [] currentExecTimes;
1899   }
1900   currentExecTimes = new double[numMetrics];
1901   for (int i=0; i<numMetrics; i++) {
1902     currentExecTimes[i] = 0.0;
1903   }
1904
1905   int numEventMethods = _entryTable.size();
1906   LogPool *pool = CkpvAccess(_trace)->_logPool;
1907   
1908   CkAssert(pool->numEntries > lastPhaseIdx);
1909   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1910   double totalPhaseTime = 0.0;
1911   double totalActiveTime = 0.0; // entry method + idle
1912
1913   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1914     if (pool->pool[i].type == BEGIN_PROCESSING) {
1915       // check pairing
1916       if (!markedBegin) {
1917         markedBegin = true;
1918       }
1919       beginBlockTime = pool->pool[i].time;
1920       lastBeginEPIdx = pool->pool[i].eIdx;
1921     } else if (pool->pool[i].type == END_PROCESSING) {
1922       // check pairing
1923       // if End without a begin, just ignore
1924       //   this event. If a phase-boundary is crossed, the Begin
1925       //   event would be maintained in beginBlockTime, so it is 
1926       //   not a problem.
1927       if (markedBegin) {
1928         markedBegin = false;
1929         if (pool->pool[i].event < 0)
1930         {
1931           // ignore dummy events. **FIXME** as they have no eIdx?
1932           continue;
1933         }
1934         currentExecTimes[pool->pool[i].eIdx] += 
1935           pool->pool[i].time - beginBlockTime;
1936         totalActiveTime += pool->pool[i].time - beginBlockTime;
1937         lastBeginEPIdx = -1;
1938       }
1939     } else if (pool->pool[i].type == BEGIN_IDLE) {
1940       // check pairing
1941       if (!markedIdle) {
1942         markedIdle = true;
1943       }
1944       beginIdleBlockTime = pool->pool[i].time;
1945     } else if (pool->pool[i].type == END_IDLE) {
1946       // check pairing
1947       if (markedIdle) {
1948         markedIdle = false;
1949         currentExecTimes[numEventMethods] += 
1950           pool->pool[i].time - beginIdleBlockTime;
1951         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1952       }
1953     } else if (pool->pool[i].type == END_PHASE) {
1954       // ignored when not using phases
1955       if (usePhases) {
1956         // when we've not visited this node before
1957         if (i != lastPhaseIdx) { 
1958           totalPhaseTime = 
1959             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1960           // it is important that proper accounting of time take place here.
1961           // Note that END_PHASE events inevitably occur in the context of
1962           //   some entry method by the way the tracing API is designed.
1963           if (markedBegin) {
1964             CkAssert(lastBeginEPIdx >= 0);
1965             currentExecTimes[lastBeginEPIdx] += 
1966               pool->pool[i].time - beginBlockTime;
1967             totalActiveTime += pool->pool[i].time - beginBlockTime;
1968             // this is so the remainder contributes to the next phase
1969             beginBlockTime = pool->pool[i].time;
1970           }
1971           // The following is unlikely, but stranger things have happened.
1972           if (markedIdle) {
1973             currentExecTimes[numEventMethods] +=
1974               pool->pool[i].time - beginIdleBlockTime;
1975             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1976             // this is so the remainder contributes to the next phase
1977             beginIdleBlockTime = pool->pool[i].time;
1978           }
1979           if (totalActiveTime <= totalPhaseTime) {
1980             currentExecTimes[numEventMethods+1] = 
1981               totalPhaseTime - totalActiveTime;
1982           } else {
1983             currentExecTimes[numEventMethods+1] = 0.0;
1984             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
1985                      CkMyPe(), currentPhase);
1986           }
1987           collectKMeansData();
1988           // end the loop (and method) and defer the work till the next call
1989           lastPhaseIdx = i;
1990           break; 
1991         }
1992       }
1993     } else if (pool->pool[i].type == END_COMPUTATION) {
1994       if (markedBegin) {
1995         CkAssert(lastBeginEPIdx >= 0);
1996         currentExecTimes[lastBeginEPIdx] += 
1997           pool->pool[i].time - beginBlockTime;
1998         totalActiveTime += pool->pool[i].time - beginBlockTime;
1999       }
2000       if (markedIdle) {
2001         currentExecTimes[numEventMethods] +=
2002           pool->pool[i].time - beginIdleBlockTime;
2003         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2004       }
2005       totalPhaseTime = 
2006         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2007       if (totalActiveTime <= totalPhaseTime) {
2008         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2009       } else {
2010         currentExecTimes[numEventMethods+1] = 0.0;
2011         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2012                  CkMyPe());
2013       }
2014       collectKMeansData();
2015     }
2016   }
2017 }
2018
2019 /**
2020  *  Through a reduction, collectKMeansData aggregates each processors' data
2021  *  in order for global properties to be determined:
2022  *  
2023  *  1. min & max to determine normalization factors.
2024  *  2. sum to determine global EP averages for possible metric reduction
2025  *       through thresholding.
2026  *  3. sum of squares to compute stddev which may be useful in the future.
2027  *
2028  *  collectKMeansData will also keep the processor's data for the current
2029  *    phase so that it may be normalized and worked on subsequently.
2030  *
2031  **/
2032 void KMeansBOC::collectKMeansData() {
2033   int minOffset = numMetrics;
2034   int maxOffset = 2*numMetrics;
2035   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2036
2037   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2038
2039   double *reductionMsg = new double[numMetrics*4];
2040
2041   for (int i=0; i<numMetrics; i++) {
2042     reductionMsg[i] = currentExecTimes[i];
2043     // duplicate the event times for max and min sections of the reduction
2044     reductionMsg[minOffset + i] = currentExecTimes[i];
2045     reductionMsg[maxOffset + i] = currentExecTimes[i];
2046     // compute squares
2047     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2048   }
2049
2050   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2051                 0, thisProxy);
2052   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2053              outlierReductionType, cb);  
2054 }
2055
2056 // The purpose is mainly to initialize the k seeds and generate
2057 //   normalization parameters for each of the metrics. The k seeds
2058 //   and normalization parameters are broadcast to all processors.
2059 //
2060 // Called on processor 0
2061 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2062   CkAssert(CkMyPe() == 0);
2063   
2064   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2065
2066   int sumOffset = 0;
2067   int minOffset = numMetrics;
2068   int maxOffset = 2*numMetrics;
2069   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2070
2071   // calculate statistics & boundaries for the k seeds for clustering
2072   KMeansStatsMessage *outmsg = 
2073     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2074   outmsg->numMetrics = numMetrics;
2075   outmsg->numKPos = numK*numMetrics;
2076   outmsg->numStats = numMetrics*4;
2077
2078   // Sum | Min | Max | Sum of Squares
2079   double *totalExecTimes = (double *)msg->getData();
2080   double totalTime = 0.0;
2081
2082   for (int i=0; i<numMetrics; i++) {
2083     DEBUGN("%lf\n", totalExecTimes[i]);
2084     totalTime += totalExecTimes[i];
2085
2086     // calculate event mean over all processors
2087     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2088
2089     // get the ranges and offsets of each metric. With this, we get
2090     //   normalization factors that can be sent back to each processor to
2091     //   be used as necessary. We reuse max for range. Min remains the offset.
2092     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2093     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2094       totalExecTimes[minOffset + i];
2095     
2096     // calculate stddev (using biased variance)
2097     outmsg->stats[sosOffset + i] = 
2098       sqrt((totalExecTimes[sosOffset + i] - 
2099             2*(outmsg->stats[i])*totalExecTimes[i] +
2100             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2101            CkNumPes());
2102   }
2103
2104   for (int i=0; i<numMetrics; i++) {
2105     // 1) if the proportion of the max value of the entry method relative to
2106     //   the average time taken over all entry methods across all processors
2107     //   is greater than the stipulated percentage threshold ...; AND
2108     // 2) if the range of values are non-zero.
2109     //
2110     // The current assumption is totalTime > 0 (what program has zero total
2111     //   time from all work?)
2112     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2113                      entryThreshold) &&
2114       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2115     if (keepMetric[i]) {
2116       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2117              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2118     } else {
2119       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2120     }
2121     outmsg->filter[i] = keepMetric[i];
2122   }
2123
2124   delete msg;
2125   
2126   // initialize k seeds for this phase
2127   kSeeds = new double[numK*numMetrics];
2128
2129   numKReported = 0;
2130   kNumMembers = new int[numK];
2131
2132   // Randomly select k processors' metric vectors for the k seeds
2133   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2134   srand(11337); // for debugging purposes
2135   for (int k=0; k<numK; k++) {
2136     DEBUGF("Seed %d | ", k);
2137     for (int m=0; m<numMetrics; m++) {
2138       double factor = totalExecTimes[maxOffset + m] - 
2139         totalExecTimes[minOffset + m];
2140       // "uniform" distribution, scaled according to the normalization
2141       //   factors
2142       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2143       // Random distribution.
2144       kSeeds[numMetrics*k + m] =
2145         ((rand()*1.0)/RAND_MAX)*factor;
2146       if (keepMetric[m]) {
2147         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2148       }
2149       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2150     }
2151     DEBUGF("\n");
2152     kNumMembers[k] = 0;
2153   }
2154
2155   // broadcast statistical values to all processors for cluster discovery
2156   thisProxy.findInitialClusters(outmsg);
2157 }
2158
2159
2160
2161 // Called on each processor.
2162 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2163
2164  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2165
2166   phaseIter = 0;
2167
2168   // Get info from stats message
2169   CkAssert(numMetrics == msg->numMetrics);
2170   for (int i=0; i<numMetrics; i++) {
2171     keepMetric[i] = msg->filter[i];
2172   }
2173
2174   // Normalize data on local processor.
2175   // **CWL** See my thesis for detailed discussion of normalization of
2176   //    performance data.
2177   // **NOTE** This might change if we want to send data based on the filter
2178   //   instead of all the data.
2179   CkAssert(numMetrics*4 == msg->numStats);
2180   for (int i=0; i<numMetrics; i++) {
2181     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2182     // **CWL** We do not normalize the range. Entry methods that exhibit
2183     //   large absolute timing variations should be allowed to contribute
2184     //   more to the Euclidean distance measure!
2185     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2186   }
2187
2188   // **NOTE** This might change if we want to send data based on the filter
2189   //   instead of all the data.
2190   CkAssert(numK*numMetrics == msg->numKPos);
2191   for (int i=0; i<msg->numKPos; i++) {
2192     incKSeeds[i] = msg->kSeedsPos[i];
2193   }
2194
2195   // Decide which KSeed this processor belongs to.
2196   minDistance = calculateDistance(0);
2197   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2198            currentPhase, phaseIter, minDistance);
2199   minK = 0;
2200   for (int i=1; i<numK; i++) {
2201     double distance = calculateDistance(i);
2202     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2203              currentPhase, phaseIter, i, distance);
2204     if (distance < minDistance) {
2205       minDistance = distance;
2206       minK = i;
2207     }
2208   }
2209
2210   // Set up a reduction with the modification vector to the root (0).
2211   //
2212   // The modification vector sends a negative value for each metric
2213   //   for the K this processor no longer belongs to and a positive
2214   //   value to the K the processor now belongs. In addition, a -1.0
2215   //   is sent to the K it is leaving and a +1.0 to the K it is 
2216   //   joining.
2217   //
2218   // The processor must still contribute a "zero returns" even if
2219   //   nothing changes. This will be the basis for determine
2220   //   convergence at the root.
2221   //
2222   // The addtional +1 is meant for the count-change that must be
2223   //   maintained for the special cases at the root when some K
2224   //   may be deprived of all processor points or go from 0 to a
2225   //   positive number of processors (see later comments).
2226   double *modVector = new double[numK*(numMetrics+1)];
2227   for (int i=0; i<numK; i++) {
2228     for (int j=0; j<numMetrics+1; j++) {
2229       modVector[i*(numMetrics+1) + j] = 0.0;
2230     }
2231   }
2232   for (int i=0; i<numMetrics; i++) {
2233     // for this initialization, only positive values need be sent.
2234     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2235   }
2236   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2237
2238   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2239                 0, thisProxy);
2240   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2241              CkReduction::sum_double, cb);  
2242 }
2243
2244 double KMeansBOC::calculateDistance(int k) {
2245   double ret = 0.0;
2246   for (int i=0; i<numMetrics; i++) {
2247     if (keepMetric[i]) {
2248       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2249              CkMyPe(), currentPhase, phaseIter, i,
2250                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2251       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2252     }
2253   }
2254   return sqrt(ret);
2255 }
2256
2257 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2258   CkAssert(CkMyPe() == 0);
2259
2260   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2261
2262   double *modVector = (double *)msg->getData();
2263   // sanity check
2264   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2265
2266   // A quick convergence test.
2267   bool hasChanges = false;
2268   for (int i=0; i<numK; i++) {
2269     hasChanges = hasChanges || 
2270       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2271   }
2272   if (!hasChanges) {
2273     delete msg;
2274     findRepresentatives();
2275   } else {
2276     int overallChange = 0;
2277     for (int i=0; i<numK; i++) {
2278       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2279       if (change != 0) {
2280         overallChange += change;
2281         // modify the k seeds based on the modification vectors coming in
2282         //
2283         // If a seed initially has no members, its contents do not matter and
2284         //   is simply set to the average of the incoming vector.
2285         // If the change causes a seed to lose all its members, do nothing.
2286         //   Its last-known location is kept to allow it to re-capture
2287         //   membership at the next iteration rather than apply the last
2288         //   changes (which snaps the point unnaturally to 0,0).
2289         // Otherwise, apply the appropriate vector changes.
2290         CkAssert((kNumMembers[i] + change >= 0) &&
2291                  (kNumMembers[i] + change <= CkNumPes()));
2292         if (kNumMembers[i] == 0) {
2293           CkAssert(change > 0);
2294           for (int j=0; j<numMetrics; j++) {
2295             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2296           }
2297         } else if (kNumMembers[i] + change == 0) {
2298           // do nothing.
2299         } else {
2300           for (int j=0; j<numMetrics; j++) {
2301             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2302             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2303             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2304           }
2305         }
2306         kNumMembers[i] += change;
2307       }
2308       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2309              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2310     }
2311     delete msg;
2312
2313     // broadcast the new seed locations.
2314     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2315     outmsg->numKPos = numK*numMetrics;
2316     for (int i=0; i<numK*numMetrics; i++) {
2317       outmsg->kSeedsPos[i] = kSeeds[i];
2318     }
2319
2320     thisProxy.updateSeedMembership(outmsg);
2321   }
2322 }
2323
2324 // Called on all processors
2325 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2326
2327   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2328
2329   phaseIter++;
2330
2331   // **NOTE** This might change if we want to send data based on the filter
2332   //   instead of all the data.
2333   CkAssert(numK*numMetrics == msg->numKPos);
2334   for (int i=0; i<msg->numKPos; i++) {
2335     incKSeeds[i] = msg->kSeedsPos[i];
2336   }
2337
2338   // Decide which KSeed this processor belongs to.
2339   lastMinK = minK;
2340   minDistance = calculateDistance(0);
2341   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2342          currentPhase, phaseIter, minDistance);
2343
2344   minK = 0;
2345   for (int i=1; i<numK; i++) {
2346     double distance = calculateDistance(i);
2347     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2348            currentPhase, phaseIter, i, distance);
2349     if (distance < minDistance) {
2350       minDistance = distance;
2351       minK = i;
2352     }
2353   }
2354
2355   double *modVector = new double[numK*(numMetrics+1)];
2356   for (int i=0; i<numK; i++) {
2357     for (int j=0; j<numMetrics+1; j++) {
2358       modVector[i*(numMetrics+1) + j] = 0.0;
2359     }
2360   }
2361
2362   if (minK != lastMinK) {
2363     for (int i=0; i<numMetrics; i++) {
2364       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2365       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2366     }
2367     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2368     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2369   }
2370
2371   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2372                 0, thisProxy);
2373   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2374              CkReduction::sum_double, cb);  
2375 }
2376
2377 void KMeansBOC::findRepresentatives() {
2378
2379   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2380
2381   int numNonEmptyClusters = 0;
2382   for (int i=0; i<numK; i++) {
2383     if (kNumMembers[i] > 0) {
2384       numNonEmptyClusters++;
2385     }
2386   }
2387
2388   int numRepresentatives = peNumKeep;
2389   // **FIXME**
2390   // This is fairly arbitrary. Next time, choose the centers of the top
2391   //   largest clusters.
2392   if (numRepresentatives < numNonEmptyClusters) {
2393     numRepresentatives = numNonEmptyClusters;
2394   }
2395
2396   int slotsRemaining = numRepresentatives;
2397
2398   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2399          numNonEmptyClusters);
2400
2401   // determine how many exemplars to select per cluster. Currently
2402   //   hardcoded to 1. Future challenge is to decide on other numbers
2403   //   or proportionality.
2404   //
2405   int exemplarsPerCluster = 1;
2406   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2407
2408   int numCandidateOutliers = CkNumPes() - 
2409     exemplarsPerCluster*numNonEmptyClusters;
2410
2411   double *remainders = new double[numK];
2412   int *assigned = new int[numK];
2413   exemplarChoicesLeft = new int[numK];
2414   outlierChoicesLeft = new int[numK];
2415
2416   for (int i=0; i<numK; i++) {
2417     assigned[i] = 0;
2418     remainders[i] = 
2419       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2420       slotsRemaining / numCandidateOutliers;
2421     if (remainders[i] >= 0.0) {
2422       assigned[i] = (int)floor(remainders[i]);
2423       remainders[i] -= assigned[i];
2424     } else {
2425       remainders[i] = 0.0;
2426     }
2427   }
2428
2429   for (int i=0; i<numK; i++) {
2430     slotsRemaining -= assigned[i];
2431   }
2432   CkAssert(slotsRemaining >= 0);
2433
2434   // find clusters to assign the loose slots to, in order of
2435   // remainder proportion
2436   while (slotsRemaining > 0) {
2437     double max = 0.0;
2438     int winner = 0;
2439     for (int i=0; i<numK; i++) {
2440       if (remainders[i] > max) {
2441         max = remainders[i];
2442         winner = i;
2443       }
2444     }
2445     assigned[winner]++;
2446     remainders[winner] = 0.0;
2447     slotsRemaining--;
2448   }
2449
2450   // set up how many reduction cycles of min/max we need to conduct to
2451   // select the representatives.
2452   numSelectionIter = exemplarsPerCluster;
2453   for (int i=0; i<numK; i++) {
2454     if (assigned[i] > numSelectionIter) {
2455       numSelectionIter = assigned[i];
2456     }
2457   }
2458   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2459
2460   for (int i=0; i<numK; i++) {
2461     if (kNumMembers[i] > 0) {
2462       exemplarChoicesLeft[i] = exemplarsPerCluster;
2463       outlierChoicesLeft[i] = assigned[i];
2464     } else {
2465       exemplarChoicesLeft[i] = 0;
2466       outlierChoicesLeft[i] = 0;
2467     }
2468     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2469            outlierChoicesLeft[i]);
2470   }
2471
2472   delete [] assigned;
2473   delete [] remainders;
2474
2475   // send out first broadcast
2476   KSelectionMessage *outmsg = NULL;
2477   if (numSelectionIter > 0) {
2478     outmsg = new (numK, numK, numK) KSelectionMessage;
2479     outmsg->numKMinIDs = numK;
2480     outmsg->numKMaxIDs = numK;
2481     for (int i=0; i<numK; i++) {
2482       outmsg->minIDs[i] = -1;
2483       outmsg->maxIDs[i] = -1;
2484     }
2485     thisProxy.collectDistances(outmsg);
2486   } else {
2487     CkPrintf("Warning: No selection iteration from the start!\n");
2488     // invoke phase completion on all processors
2489     thisProxy.phaseDone();
2490   }
2491 }
2492
2493 /*
2494  *  lastMin = array of minimum champions of the last tournament
2495  *  lastMax = array of maximum champions of the last tournament
2496  *  lastMaxVal = array of last encountered maximum values, allows previous
2497  *                 minimum winners to eliminate themselves from the next
2498  *                 minimum race.
2499  *
2500  *  Called on all processors.
2501  */
2502 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2503
2504   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2505
2506   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2507          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2508   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2509       (CkMyPe() == msg->maxIDs[lastMinK])) {
2510     CkAssert(!selected);
2511     selected = true;
2512   }
2513
2514   // build outgoing reduction structure
2515   //   format = minVal | ID | maxVal | ID
2516   double *minMaxAndIDs = NULL;
2517
2518   minMaxAndIDs = new double[numK*4];
2519   // initialize to the appropriate out-of-band values (for error checks)
2520   for (int i=0; i<numK; i++) {
2521     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2522     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2523     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2524     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2525   }
2526   // If I have not won before, I put myself back into the competition
2527   if (!selected) {
2528     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2529     minMaxAndIDs[lastMinK*4] = minDistance;
2530     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2531     minMaxAndIDs[lastMinK*4+2] = minDistance;
2532     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2533   }
2534   delete msg;
2535
2536   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2537                 0, thisProxy);
2538   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2539              minMaxReductionType, cb);  
2540 }
2541
2542 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2543   // incoming format:
2544   //   minVal | minID | maxVal | maxID
2545
2546   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2547
2548   if (numSelectionIter > 0) {
2549     double *incInfo = (double *)msg->getData();
2550     
2551     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2552     outmsg->numKMinIDs = numK;
2553     outmsg->numKMaxIDs = numK;
2554     
2555     for (int i=0; i<numK; i++) {
2556       DEBUGF("%d | %lf %d %lf %d \n", i, 
2557              incInfo[i*4], (int)incInfo[i*4+1], 
2558              incInfo[i*4+2], (int)incInfo[i*4+3]);
2559     }
2560
2561     for (int i=0; i<numK; i++) {
2562       if (exemplarChoicesLeft[i] > 0) {
2563         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2564         exemplarChoicesLeft[i]--;
2565       } else {
2566         outmsg->minIDs[i] = -1;
2567       }
2568       if (outlierChoicesLeft[i] > 0) {
2569         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2570         outlierChoicesLeft[i]--;
2571       } else {
2572         outmsg->maxIDs[i] = -1;
2573       }
2574     }
2575     thisProxy.collectDistances(outmsg);
2576     numSelectionIter--;
2577   } else {
2578     // invoke phase completion on all processors
2579     thisProxy.phaseDone();
2580   }
2581 }
2582
2583 /**
2584  *  Completion of the K-Means clustering and data selection of one phase
2585  *    of the computation.
2586  *
2587  *  Called on every processor.
2588  */
2589 void KMeansBOC::phaseDone() {
2590
2591   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2592
2593   LogPool *pool = CkpvAccess(_trace)->_logPool;
2594   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2595
2596   // now decide on what to do with the decision.
2597   if (!selected) {
2598     if (usePhases) {
2599       pool->keepPhase[currentPhase] = false;
2600     } else {
2601       // if not using phases, we're working on the whole log
2602       pool->setAllPhases(false);
2603     }
2604   }
2605
2606   // **FIXME** (?) - All processors have to agree on this, or the reduction
2607   //   will not be correct! The question is "is this enforcible?"
2608   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2609     // We're done
2610     int dummy = 0;
2611     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2612                   0, bocProxy);
2613     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2614   } else {
2615     // reset all phase-based k-means data and decisions
2616
2617     // **FIXME**!!!!!    
2618     
2619     // invoke the next K-Means computation phase.
2620     currentPhase++;
2621     thisProxy[CkMyPe()].getNextPhaseMetrics();
2622   }
2623 }
2624
2625 void TraceProjectionsBOC::startEndTimeAnalysis()
2626 {
2627  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2628
2629   endTime = CkpvAccess(_trace)->endTime;
2630   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2631
2632   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2633                 0, thisProxy);
2634   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2635 }
2636
2637 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2638 {
2639  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2640
2641   CkAssert(CkMyPe() == 0);
2642   parModulesRemaining--;
2643   if (CkpvAccess(_trace) != NULL) {
2644     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData();
2645     // CkPrintf("End time determined to be %lf us\n",
2646     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2647   }
2648   delete msg;
2649   if (parModulesRemaining == 0) {
2650     thisProxy[CkMyPe()].finalize();
2651   }
2652 }
2653
2654 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2655
2656  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2657
2658   CkAssert(CkMyPe() == 0);
2659   parModulesRemaining--;
2660   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2661            CmiWallTimer()-analysisStartTime);
2662   delete msg;
2663   if (parModulesRemaining == 0) {
2664     thisProxy[CkMyPe()].finalize();
2665   }
2666 }
2667
2668 /**
2669  *
2670  *  This version is called (on processor 0) only if flushCheck fails.
2671  *
2672  */
2673 void TraceProjectionsBOC::kMeansDone() {
2674   CkAssert(CkMyPe() == 0);
2675   parModulesRemaining--;
2676   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2677            CmiWallTimer()-analysisStartTime);
2678   if (parModulesRemaining == 0) {
2679     thisProxy[CkMyPe()].finalize();
2680   }
2681 }
2682
2683 void TraceProjectionsBOC::finalize()
2684 {
2685   CkAssert(CkMyPe() == 0);
2686   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2687   //       CmiWallTimer()-analysisStartTime);
2688   thisProxy.closingTraces();
2689 }
2690
2691 // Called on every processor
2692 void TraceProjectionsBOC::closingTraces() {
2693   CkpvAccess(_trace)->closeTrace();
2694
2695     // subtle:  reduction needs to go to the PE which started CkExit()
2696   int pe = 0;
2697   if (endPe != -1) pe = endPe;
2698   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2699                 pe, thisProxy); 
2700   contribute(0, NULL, CkReduction::sum_int, cb);  
2701 }
2702
2703 // The sole purpose of this reduction is to decide whether or not
2704 //   Projections as a module needs to call CkExit() to get other
2705 //   modules to shutdown.
2706 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2707   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2708   delete msg;
2709   // decide if CkExit() needs to be called
2710   if (!CkpvAccess(_trace)->converseExit) {
2711     CkExit();
2712   }
2713 }
2714 /*
2715  *  Registration and definition of the Outlier Reduction callback.
2716  *  Format: Sum | Min | Max | Sum of Squares
2717  */
2718 CkReductionMsg *outlierReduction(int nMsgs,
2719                                  CkReductionMsg **msgs) {
2720   int numBytes = 0;
2721   int numMetrics = 0;
2722   double *ret = NULL;
2723
2724   if (nMsgs == 1) {
2725     // nothing to do, just pass it on
2726     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2727   }
2728
2729   if (nMsgs > 1) {
2730     numBytes = msgs[0]->getSize();
2731     // sanity checks
2732     if (numBytes%sizeof(double) != 0) {
2733       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2734     }
2735     if ((numBytes/sizeof(double))%4 != 0) {
2736       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2737     }
2738     numMetrics = (numBytes/sizeof(double))/4;
2739     ret = new double[numMetrics*4];
2740
2741     // copy the first message data into the return structure first
2742     for (int i=0; i<numMetrics*4; i++) {
2743       ret[i] = ((double *)msgs[0]->getData())[i];
2744     }
2745
2746     // Sum | Min | Max | Sum of Squares
2747     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2748       for (int i=0; i<numMetrics; i++) {
2749         // Sum
2750         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2751         // Min
2752         ret[numMetrics + i] =
2753           (ret[numMetrics + i] < 
2754            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2755           ? ret[numMetrics + i] : 
2756           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2757         // Max
2758         ret[2*numMetrics + i] = 
2759           (ret[2*numMetrics + i] >
2760            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2761           ? ret[2*numMetrics + i] :
2762           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2763         // Sum of Squares (squaring already done at leaf)
2764         ret[3*numMetrics + i] +=
2765           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2766       }
2767     }
2768   }
2769   
2770   /* apparently, we do not delete the incoming messages */
2771   return CkReductionMsg::buildNew(numBytes,ret);
2772 }
2773
2774 /*
2775  * The only reason we have a user-defined reduction is to support
2776  *   identification of the "winning" processors as well as to handle
2777  *   both the min and the max of each "tournament". A simple min/max
2778  *   discovery cannot handle ties.
2779  */
2780 CkReductionMsg *minMaxReduction(int nMsgs,
2781                                 CkReductionMsg **msgs) {
2782   CkAssert(nMsgs > 0);
2783
2784   int numBytes = msgs[0]->getSize();
2785   CkAssert(numBytes%sizeof(double) == 0);
2786   int numK = (numBytes/sizeof(double))/4;
2787
2788   double *ret = new double[numK*4];
2789   // fill with out-of-band values
2790   for (int i=0; i<numK; i++) {
2791     ret[i*4] = -1.0;
2792     ret[i*4+1] = -1.0;
2793     ret[i*4+2] = -1.0;
2794     ret[i*4+3] = -1.0;
2795   }
2796
2797   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2798   for (int i=0; i<nMsgs; i++) {
2799     double *temp = (double *)msgs[i]->getData();
2800     for (int j=0; j<numK; j++) {
2801       // no previous valid min
2802       if (ret[j*4+1] < 0) {
2803         // fill it in only if the incoming min is valid
2804         if (temp[j*4+1] >= 0) {
2805           ret[j*4] = temp[j*4];      // fill min value
2806           ret[j*4+1] = temp[j*4+1];  // fill ID
2807         }
2808       } else {
2809         // find Min, only if incoming min is valid
2810         if (temp[j*4+1] >= 0) {
2811           if (temp[j*4] < ret[j*4]) {
2812             ret[j*4] = temp[j*4];      // replace min value
2813             ret[j*4+1] = temp[j*4+1];  // replace ID
2814           }
2815         }
2816       }
2817       // no previous valid max
2818       if (ret[j*4+3] < 0) {
2819         // fill only if the incoming max is valid
2820         if (temp[j*4+3] >= 0) {
2821           ret[j*4+2] = temp[j*4+2];  // fill max value
2822           ret[j*4+3] = temp[j*4+3];  // fill ID
2823         }
2824       } else {
2825         // find Max, only if incoming max is valid
2826         if (temp[j*4+3] >= 0) {
2827           if (temp[j*4+2] > ret[j*4+2]) {
2828             ret[j*4+2] = temp[j*4+2];  // replace max value
2829             ret[j*4+3] = temp[j*4+3];  // replace ID
2830           }
2831         }
2832       }
2833     }
2834   }
2835   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2836   delete [] ret;
2837   return redmsg;
2838 }
2839
2840 #include "TraceProjections.def.h"
2841 #endif //PROJ_ANALYSIS
2842
2843 /*@}*/