handle nested events
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
14 #else
15 #define DEBUGF(...)
16 #endif
17 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BIGSIM_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BIGSIM_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BIGSIM_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   char eventName[PAPI_MAX_STR_LEN];
549   for (i=0;i<NUMPAPIEVENTS;i++) {
550     PAPI_event_code_to_name(papiEvents[i], eventName);
551     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
552   }
553 #endif
554   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
555   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
556     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
557   }     
558 }
559
560 void LogPool::writeSts(TraceProjections *traceProj){
561   writeSts();
562   if (traceProj != NULL) {
563     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
564     funcIter->seekStart();
565     int numFuncs = traceProj->getFuncNumber();
566     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
567     while(funcIter->hasNext()) {
568       StrKey *key;
569       int *obj = (int *)funcIter->next((void **)&key);
570       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
571     }
572   }
573   fprintf(stsfp, "END\n");
574   fclose(stsfp);
575 }
576
577 void LogPool::writeRC(void)
578 {
579     //CkPrintf("write RC is being executed\n");
580 #ifdef PROJ_ANALYSIS  
581     CkAssert(CkMyPe() == 0);
582     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
583           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
584     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
585           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BIGSIM_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BIGSIM_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BIGSIM_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BIGSIM_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BIGSIM_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BIGSIM_CHARM
703 #if MPI_TRACE_MACHINE_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BIGSIM_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 void LogPool::modLastEntryTimestamp(double ts)
756 {
757   pool[numEntries-1].time = ts;
758   //pool[numEntries-1].cputime = ts;
759 }
760
761 // /** Constructor for a multicast log entry */
762 // 
763 //  THIS WAS MOVED TO trace-projections.h with the other constructors
764 // 
765 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
766 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
767 // {
768 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
769 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
770 //     recvTime = rt; 
771 //     numpes = numPe;
772 //     userSuppliedNote = NULL;
773 //     if (pelist != NULL) {
774 //      pes = new int[numPe];
775 //      for (int i=0; i<numPe; i++) {
776 //        pes[i] = pelist[i];
777 //      }
778 //     } else {
779 //      pes= NULL;
780 //     }
781 // }
782
783 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
784 //{
785 //#if CMK_HAS_COUNTER_PAPI
786 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
787 //#endif
788 //}
789
790
791
792 void LogEntry::pup(PUP::er &p)
793 {
794   int i;
795   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
796   char ret = '\n';
797
798   p|type;
799   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
800   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
801
802   switch (type) {
803     case USER_EVENT:
804     case USER_EVENT_PAIR:
805       p|mIdx; p|itime; p|event; p|pe;
806       break;
807     case BEGIN_IDLE:
808     case END_IDLE:
809     case BEGIN_PACK:
810     case END_PACK:
811     case BEGIN_UNPACK:
812     case END_UNPACK:
813       p|itime; p|pe; 
814       break;
815     case BEGIN_PROCESSING:
816       if (p.isPacking()) {
817         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
818         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
819       }
820       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
821       p|msglen; p|irecvtime; 
822       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
823       p|icputime;
824 #if CMK_HAS_COUNTER_PAPI
825       //p|numPapiEvents;
826       for (i=0; i<NUMPAPIEVENTS; i++) {
827         // not yet!!!
828         //      p|papiIDs[i]; 
829         p|papiValues[i];
830         
831       }
832 #else
833       //p|numPapiEvents;     // non papi version has value 0
834 #endif
835       if (p.isUnpacking()) {
836         recvTime = irecvtime/1.0e6;
837         cputime = icputime/1.0e6;
838       }
839       break;
840     case END_PROCESSING:
841       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
842       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
843 #if CMK_HAS_COUNTER_PAPI
844       //p|numPapiEvents;
845       for (i=0; i<NUMPAPIEVENTS; i++) {
846         // not yet!!!
847         //      p|papiIDs[i];
848         p|papiValues[i];
849       }
850 #else
851       //p|numPapiEvents;  // non papi version has value 0
852 #endif
853       if (p.isUnpacking()) cputime = icputime/1.0e6;
854       break;
855     case USER_SUPPLIED:
856           p|userSuppliedData;
857           p|itime;
858         break;
859     case USER_SUPPLIED_NOTE:
860           p|itime;
861           int length;
862           if (p.isPacking()) length = strlen(userSuppliedNote);
863           p | length;
864           char space;
865           space = ' ';
866           p | space;
867           if (p.isUnpacking()) {
868             userSuppliedNote = new char[length+1];
869             userSuppliedNote[length] = '\0';
870           }
871           PUParray(p,userSuppliedNote, length);
872           break;
873     case USER_SUPPLIED_BRACKETED_NOTE:
874       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
875           p|itime;
876           p|iEndTime;
877           p|event;
878           int length2;
879           if (p.isPacking()) length2 = strlen(userSuppliedNote);
880           p | length2;
881           char space2;
882           space2 = ' ';
883           p | space2;
884           if (p.isUnpacking()) {
885             userSuppliedNote = new char[length+1];
886             userSuppliedNote[length] = '\0';
887           }
888           PUParray(p,userSuppliedNote, length2);
889           break;
890     case MEMORY_USAGE_CURRENT:
891       p | memUsage;
892       p | itime;
893         break;
894     case CREATION:
895       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
896       p|mIdx; p|eIdx; p|itime;
897       p|event; p|pe; p|msglen; p|irecvtime;
898       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
899       break;
900     case CREATION_BCAST:
901       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
902       p|mIdx; p|eIdx; p|itime;
903       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
904       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
905       break;
906     case CREATION_MULTICAST:
907       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
908       p|mIdx; p|eIdx; p|itime;
909       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
910       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
911       for (i=0; i<numpes; i++) p|pes[i];
912       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
913       break;
914     case MESSAGE_RECV:
915       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
916       break;
917
918     case ENQUEUE:
919     case DEQUEUE:
920       p|mIdx; p|itime; p|event; p|pe;
921       break;
922
923     case BEGIN_INTERRUPT:
924     case END_INTERRUPT:
925       p|itime; p|event; p|pe;
926       break;
927
928       // **CW** absolute timestamps are used here to support a quick
929       // way of determining the total time of a run in projections
930       // visualization.
931     case BEGIN_COMPUTATION:
932     case END_COMPUTATION:
933     case BEGIN_TRACE:
934     case END_TRACE:
935       p|itime;
936       break;
937     case BEGIN_FUNC:
938         p | itime;
939         p | mIdx;
940         p | event;
941         if(!p.isUnpacking()){
942                 p(fName,flen-1);
943         }
944         break;
945     case END_FUNC:
946         p | itime;
947         p | mIdx;
948         break;
949     case END_PHASE:
950       p|eIdx; // FIXME: actually the phase ID
951       p|itime;
952       break;
953     default:
954       CmiError("***Internal Error*** Wierd Event %d.\n", type);
955       break;
956   }
957   if (p.isUnpacking()) time = itime/1.0e6;
958   p|ret;
959 }
960
961 TraceProjections::TraceProjections(char **argv): 
962   curevent(0), inEntry(0), computationStarted(0), 
963         converseExit(0), endTime(0.0), traceNestedEvents(0),
964         currentPhaseID(0), lastPhaseEvent(NULL)
965 {
966   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
967
968   if (CkpvAccess(traceOnPe) == 0) return;
969
970   CtvInitialize(int,curThreadEvent);
971   CkpvInitialize(CmiInt8, CtrLogBufSize);
972   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
973   CtvAccess(curThreadEvent)=0;
974   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
975                        "Log entries to buffer per I/O")) {
976     if (CkMyPe() == 0) {
977       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
978     }
979   }
980   checknested = 
981     CmiGetArgFlagDesc(argv,"+checknested",
982                       "check projections nest begin end execute events");
983   traceNestedEvents = 
984     CmiGetArgFlagDesc(argv,"+tracenested",
985               "trace projections nest begin/end execute events");
986   int binary = 
987     CmiGetArgFlagDesc(argv,"+binary-trace",
988                       "Write log files in binary format");
989
990   CmiInt8 nSubdirs = 0;
991   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
992
993
994 #if CMK_PROJECTIONS_USE_ZLIB
995   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
996 #else
997   // consume the flag so there's no confusing
998   CmiGetArgFlagDesc(argv,"+gz-trace",
999                     "Write log files pre-compressed with gzip");
1000   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1001 #endif
1002
1003   // **CW** default to non delta log encoding. The user may choose to do
1004   // create both logs (for debugging) or just the old log timestamping
1005   // (for compatibility).
1006   // Generating just the non delta log takes precedence over generating
1007   // both logs (if both arguments appear on the command line).
1008
1009   // switch to OLD log format until everything works // Gengbin
1010   nonDeltaLog = 1;
1011   deltaLog = 0;
1012   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1013                                   "Generate Delta encoded and simple timestamped log files");
1014
1015   _logPool = new LogPool(CkpvAccess(traceRoot));
1016   _logPool->setNumSubdirs(nSubdirs);
1017   _logPool->setBinary(binary);
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   _logPool->setCompressed(compressed);
1020 #endif
1021   if (CkMyPe() == 0) {
1022     _logPool->createSts();
1023     _logPool->createRC();
1024   }
1025   funcCount=1;
1026
1027 #if CMK_HAS_COUNTER_PAPI
1028   // We initialize and create the event sets for use with PAPI here.
1029   int papiRetValue;
1030   if(CkMyRank()==0){
1031     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032     if (papiRetValue != PAPI_VER_CURRENT) {
1033       CmiAbort("PAPI Library initialization failure!\n");
1034     }
1035 #if CMK_SMP
1036     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1037       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1038     }
1039 #endif
1040   }
1041
1042 #if CMK_SMP
1043   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1044   CmiNodeAllBarrier();
1045 #endif
1046   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1047   papiEventSet = PAPI_NULL; 
1048   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1049     CmiAbort("PAPI failed to create event set!\n");
1050   }
1051   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1052   if (papiRetValue != PAPI_OK) {
1053     if (papiRetValue == PAPI_ECNFLCT) {
1054       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1055     } else {
1056       CmiAbort("PAPI failed to add designated events!\n");
1057     }
1058   }
1059   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1060 #endif
1061 }
1062
1063 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1064 {
1065   OPTIMIZED_VERSION
1066   CkAssert(e==-1 || e>=0);
1067   CkAssert(evt != NULL);
1068   int event;
1069   int biggest = -1;
1070   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1071     int cur = (*CkpvAccess(usrEvents))[i]->e;
1072     if (cur == e) {
1073       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1074       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1075         return e;
1076       else
1077         CmiAbort("UserEvent double registered!");
1078     }
1079     if (cur > biggest) biggest = cur;
1080   }
1081   // if no user events have so far been registered. biggest will be -1
1082   // and hence newly assigned event numbers will begin from 0.
1083   if (e==-1) event = biggest+1;  // automatically assign new event number
1084   else event = e;
1085   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1086   return event;
1087 }
1088
1089 void TraceProjections::traceClearEps(void)
1090 {
1091   // In trace-summary, this zeros out the EP bins, to eliminate noise
1092   // from startup.  Here, this isn't useful, since we can do that in
1093   // post-processing
1094 }
1095
1096 void TraceProjections::traceWriteSts(void)
1097 {
1098   if(CkMyPe()==0)
1099     _logPool->writeSts(this);
1100 }
1101
1102 /** 
1103  * **IMPT NOTES**:
1104  *
1105  * This is called when Converse closes during ConverseCommonExit().
1106  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1107  *
1108  * Some programs bypass CkExit() (like NAMD, which eventually calls
1109  * ConverseExit()), modules like traces will have to pretend to shutdown
1110  * as if CkExit() was called but at the same time avoid making
1111  * subsequent CkExit() calls (which is usually required for allowing
1112  * other modules to shutdown).
1113  *
1114  * Note that we can only get here if CkExit() was not called, since the
1115  * trace module will un-register itself from TraceArray if it did.
1116  *
1117  */
1118 void TraceProjections::traceClose(void)
1119 {
1120 #ifdef PROJ_ANALYSIS
1121   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1122
1123   // sets the flag that tells the code not to make the CkExit call later
1124   converseExit = 1;
1125   if (CkMyPe() == 0) {
1126     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1127     bocProxy.traceProjectionsParallelShutdown(-1);
1128   }
1129   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1130     CkpvAccess(_trace)->endComputation();
1131     delete _logPool;              // will write
1132     // remove myself from traceArray so that no tracing will be called.
1133     CkpvAccess(_traces)->removeTrace(this);
1134   }
1135 #else
1136   // we've already deleted the logpool, so multiple calls to traceClose
1137   // are tolerated.
1138   if (_logPool == NULL) {
1139     return;
1140   }
1141   if(CkMyPe()==0){
1142     _logPool->writeSts(this);
1143   }
1144   CkpvAccess(_trace)->endComputation();
1145   delete _logPool;              // will write
1146   // remove myself from traceArray so that no tracing will be called.
1147   CkpvAccess(_traces)->removeTrace(this);
1148 #endif
1149 }
1150
1151 /**
1152  *  **IMPT NOTES**:
1153  *
1154  *  This is meant to be called internally by the tracing framework.
1155  *
1156  */
1157 void TraceProjections::closeTrace() {
1158   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1159   if (CkMyPe() == 0) {
1160     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1161     _logPool->writeSts(this);
1162     _logPool->writeRC();
1163     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1164   }
1165   delete _logPool;       // will write logs to file
1166 }
1167
1168 #if CMK_SMP_TRACE_COMMTHREAD
1169 void TraceProjections::traceBeginOnCommThread()
1170 {
1171   if (!computationStarted) return;
1172   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174
1175 void TraceProjections::traceEndOnCommThread()
1176 {
1177   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1178 }
1179 #endif
1180
1181 void TraceProjections::traceBegin(void)
1182 {
1183   if (!computationStarted) return;
1184   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::traceEnd(void)
1188 {
1189   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userEvent(int e)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1196 }
1197
1198 void TraceProjections::userBracketEvent(int e, double bt, double et)
1199 {
1200   if (!computationStarted) return;
1201   // two events record Begin/End of event e.
1202   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1204 }
1205
1206 void TraceProjections::userSuppliedData(int d)
1207 {
1208   if (!computationStarted) return;
1209   _logPool->addUserSupplied(d);
1210 }
1211
1212 void TraceProjections::userSuppliedNote(char *note)
1213 {
1214   if (!computationStarted) return;
1215   _logPool->addUserSuppliedNote(note);
1216 }
1217
1218
1219 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1220 {
1221   if (!computationStarted) return;
1222   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1223 }
1224
1225 void TraceProjections::memoryUsage(double m)
1226 {
1227   if (!computationStarted) return;
1228   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1229   
1230 }
1231
1232
1233 void TraceProjections::creation(envelope *e, int ep, int num)
1234 {
1235   double curTime = TraceTimer();
1236   if (e == 0) {
1237     CtvAccess(curThreadEvent) = curevent;
1238     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1239                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1240   } else {
1241     int type=e->getMsgtype();
1242     e->setEvent(curevent);
1243     if (num > 1) {
1244       _logPool->add(CREATION_BCAST, type, ep, curTime,
1245                     curevent++, CkMyPe(), e->getTotalsize(), 
1246                     NULL, 0, 0.0, num);
1247     } else {
1248       _logPool->add(CREATION, type, ep, curTime,
1249                     curevent++, CkMyPe(), e->getTotalsize(), 
1250                     NULL, 0, 0.0);
1251     }
1252   }
1253 }
1254
1255 void TraceProjections::creation(char *msg)
1256 {
1257 #if CMK_SMP_TRACE_COMMTHREAD
1258         //This function is only called from a comm thread
1259         //in SMP mode. So, it is possible the msg is not
1260         //a charm msg that contains an envelope, ep idx.
1261         envelope *e = (envelope *)msg;
1262         int ep = e->getEpIdx();
1263         int num = _entryTable.size();
1264         if(ep<num && ep>=0 && _entryTable[ep]->traceEnabled)
1265                 creation(e, ep, 1);
1266 #endif
1267 }
1268
1269
1270 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1271    Communication Library-specific Multicasts via new event 
1272    CREATION_MULTICAST.
1273 */
1274
1275 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1276                                          int *pelist)
1277 {
1278   double curTime = TraceTimer();
1279   if (e==0) {
1280     CtvAccess(curThreadEvent)=curevent;
1281     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1282                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1283   } else {
1284     int type=e->getMsgtype();
1285     e->setEvent(curevent);
1286     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1287                                    e->getTotalsize(), 0, 0.0, num, pelist);
1288   }
1289 }
1290
1291 void TraceProjections::creationDone(int num)
1292 {
1293   // modified the creation done time of the last num log entries
1294   // FIXME: totally a hack
1295   double curTime = TraceTimer();
1296   int idx = _logPool->numEntries-1;
1297   while (idx >=0 && num >0 ) {
1298     LogEntry &log = _logPool->pool[idx];
1299     if ((log.type == CREATION) ||
1300         (log.type == CREATION_BCAST) ||
1301         (log.type == CREATION_MULTICAST)) {
1302       log.recvTime = curTime - log.time;
1303       num --;
1304     }
1305     idx--;
1306   }
1307 }
1308
1309 void TraceProjections::beginExecute(CmiObjId *tid)
1310 {
1311 #if CMK_HAS_COUNTER_PAPI
1312   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1313     CmiAbort("PAPI failed to read at begin execute!\n");
1314   }
1315 #endif
1316   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1317   execEvent = CtvAccess(curThreadEvent);
1318   execEp = (-1);
1319   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1320                 execEvent,CkMyPe(), 0, tid);
1321 #if CMK_HAS_COUNTER_PAPI
1322   _logPool->addPapi(papiValues);
1323 #endif
1324   inEntry = 1;
1325 }
1326
1327 void TraceProjections::beginExecute(envelope *e)
1328 {
1329   if(e==0) {
1330 #if CMK_HAS_COUNTER_PAPI
1331     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1332       CmiAbort("PAPI failed to read at begin execute!\n");
1333     }
1334 #endif
1335     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1336     execEvent = CtvAccess(curThreadEvent);
1337     execEp = (-1);
1338     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1339                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1340 #if CMK_HAS_COUNTER_PAPI
1341     _logPool->addPapi(papiValues);
1342 #endif
1343     inEntry = 1;
1344   } else {
1345     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1346                  e->getSrcPe(),e->getTotalsize());
1347   }
1348 }
1349
1350 void TraceProjections::beginExecute(char *msg){
1351 #if CMK_SMP_TRACE_COMMTHREAD
1352         //This function is called from comm thread in SMP mode
1353     envelope *e = (envelope *)msg;
1354     int num = _entryTable.size();
1355     int ep = e->getEpIdx();
1356     if(ep<0 || ep>=num) return;
1357     if(_entryTable[ep]->traceEnabled)
1358                 beginExecute(e);
1359 #endif
1360 }
1361
1362 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1363                                     int mlen, CmiObjId *idx)
1364 {
1365   if (traceNestedEvents) {
1366     if (! nestedEvents.isEmpty()) {
1367       endExecuteLocal();
1368     }
1369     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1370   }
1371   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1372 }
1373
1374 void TraceProjections::changeLastEntryTimestamp(double ts)
1375 {
1376   _logPool->modLastEntryTimestamp(ts);
1377 }
1378
1379 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1380                                     int mlen, CmiObjId *idx)
1381 {
1382 #if CMK_HAS_COUNTER_PAPI
1383   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1384     CmiAbort("PAPI failed to read at begin execute!\n");
1385   }
1386 #endif
1387   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1388   execEvent=event;
1389   execEp=ep;
1390   execPe=srcPe;
1391   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1392                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1393 #if CMK_HAS_COUNTER_PAPI
1394   _logPool->addPapi(papiValues);
1395 #endif
1396   inEntry = 1;
1397 }
1398
1399 void TraceProjections::endExecute(void)
1400 {
1401   if (traceNestedEvents) nestedEvents.deq();
1402   endExecuteLocal();
1403   if (traceNestedEvents) {
1404     if (! nestedEvents.isEmpty()) {
1405       NestedEvent &ne = nestedEvents.peek();
1406       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1407     }
1408   }
1409 }
1410
1411 void TraceProjections::endExecute(char *msg)
1412 {
1413 #if CMK_SMP_TRACE_COMMTHREAD
1414         //This function is called from comm thread in SMP mode
1415     envelope *e = (envelope *)msg;
1416     int num = _entryTable.size();
1417     int ep = e->getEpIdx();
1418     if(ep<0 || ep>=num) return;
1419     if(_entryTable[ep]->traceEnabled)
1420                 endExecute();
1421 #endif  
1422 }
1423
1424 void TraceProjections::endExecuteLocal(void)
1425 {
1426 #if CMK_HAS_COUNTER_PAPI
1427   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1428     CmiAbort("PAPI failed to read at end execute!\n");
1429   }
1430 #endif
1431   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1432   double cputime = TraceCpuTimer();
1433   if(execEp == (-1)) {
1434     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1435                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1436   } else {
1437     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1438                   execEvent, execPe, 0, NULL, 0.0, cputime);
1439   }
1440 #if CMK_HAS_COUNTER_PAPI
1441   _logPool->addPapi(papiValues);
1442 #endif
1443   inEntry = 0;
1444 }
1445
1446 void TraceProjections::messageRecv(char *env, int pe)
1447 {
1448 #if 0
1449   envelope *e = (envelope *)env;
1450   int msgType = e->getMsgtype();
1451   int ep = e->getEpIdx();
1452 #if 0
1453   if (msgType==NewChareMsg || msgType==NewVChareMsg
1454           || msgType==ForChareMsg || msgType==ForVidMsg
1455           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1456           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1457     ep = e->getEpIdx();
1458   else
1459     ep = _threadEP;
1460 #endif
1461   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1462                 curevent++, e->getSrcPe(), e->getTotalsize());
1463 #endif
1464 }
1465
1466 void TraceProjections::beginIdle(double curWallTime)
1467 {
1468   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1469 }
1470
1471 void TraceProjections::endIdle(double curWallTime)
1472 {
1473   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1474 }
1475
1476 void TraceProjections::beginPack(void)
1477 {
1478   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1479 }
1480
1481 void TraceProjections::endPack(void)
1482 {
1483   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1484 }
1485
1486 void TraceProjections::beginUnpack(void)
1487 {
1488   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1489 }
1490
1491 void TraceProjections::endUnpack(void)
1492 {
1493   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1494 }
1495
1496 void TraceProjections::enqueue(envelope *) {}
1497
1498 void TraceProjections::dequeue(envelope *) {}
1499
1500 void TraceProjections::beginComputation(void)
1501 {
1502   computationStarted = 1;
1503
1504   // Executes the callback function provided by the machine
1505   // layer. This is the proper method to register user events in a
1506   // machine layer because projections is a charm++ module.
1507   if (CkpvAccess(traceOnPe) != 0) {
1508     void (*ptr)() = registerMachineUserEvents();
1509     if (ptr != NULL) {
1510       ptr();
1511     }
1512   }
1513 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1514 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1515   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1516 #if CMK_HAS_COUNTER_PAPI
1517   // we start the counters here
1518   if (PAPI_start(papiEventSet) != PAPI_OK) {
1519     CmiAbort("PAPI failed to start designated counters!\n");
1520   }
1521 #endif
1522 }
1523
1524 void TraceProjections::endComputation(void)
1525 {
1526 #if CMK_HAS_COUNTER_PAPI
1527   // we stop the counters here. A silent failure is alright since we
1528   // are already at the end of the program.
1529   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1530     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1531   }
1532   // NOTE: We should not do a complete close of PAPI until after the
1533   // sts writer is done.
1534 #endif
1535   endTime = TraceTimer();
1536   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1537   /*
1538   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1539            endTime*1e06);
1540   */
1541 }
1542
1543 int TraceProjections::idxRegistered(int idx)
1544 {
1545     int idxVecLen = idxVec.size();
1546     for(int i=0; i<idxVecLen; i++)
1547     {
1548         if(idx == idxVec[i])
1549             return 1;
1550     }
1551     return 0;
1552 }
1553
1554 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1555     StrKey k((char*)name,strlen(name));
1556     int num = funcHashtable.get(k);
1557     
1558     if(num!=0) {
1559         return;
1560         //as for mpi programs, the same function may be registered for several times
1561         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1562     }
1563     
1564     int isIdxExisting=0;
1565     if(idxSpecifiedByUser)
1566         isIdxExisting=idxRegistered(idx);
1567     if(isIdxExisting){
1568         return;
1569         //same reason with num!=0
1570         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1571     }
1572
1573     if(idxSpecifiedByUser) {
1574         char *st = new char[strlen(name)+1];
1575         memcpy(st,name,strlen(name)+1);
1576         StrKey *newKey = new StrKey(st,strlen(st));
1577         int &ref = funcHashtable.put(*newKey);
1578         ref=idx;
1579         funcCount++;
1580         idxVec.push_back(idx);  
1581     } else {
1582         char *st = new char[strlen(name)+1];
1583         memcpy(st,name,strlen(name)+1);
1584         StrKey *newKey = new StrKey(st,strlen(st));
1585         int &ref = funcHashtable.put(*newKey);
1586         ref=funcCount;
1587         num = funcCount;
1588         funcCount++;
1589         idx = num;
1590         idxVec.push_back(idx);
1591     }
1592 }
1593
1594 void TraceProjections::beginFunc(char *name,char *file,int line){
1595         StrKey k(name,strlen(name));    
1596         unsigned short  num = (unsigned short)funcHashtable.get(k);
1597         beginFunc(num,file,line);
1598 }
1599
1600 void TraceProjections::beginFunc(int idx,char *file,int line){
1601         if(idx <= 0){
1602                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1603         }       
1604         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1605 }
1606
1607 void TraceProjections::endFunc(char *name){
1608         StrKey k(name,strlen(name));    
1609         int num = funcHashtable.get(k);
1610         endFunc(num);   
1611 }
1612
1613 void TraceProjections::endFunc(int num){
1614         if(num <= 0){
1615                 printf("endFunc without start :O\n");
1616         }
1617         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1618 }
1619
1620 // specialized PUP:ers for handling trace projections logs
1621 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1622 {
1623   for (int i=0;i<n;i++) 
1624     switch(t) {
1625     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1626     case Tuchar:
1627     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1628     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1629     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1630     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1631     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1632     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1633     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1634     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1635     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1636 #ifdef CMK_PUP_LONG_LONG
1637     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1638     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1639 #endif
1640     default: CmiAbort("Unrecognized pup type code!");
1641     };
1642 }
1643
1644 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1645 {
1646   for (int i=0;i<n;i++) 
1647     switch(t) {
1648     case Tchar: { 
1649       char c = fgetc(f);
1650       if (c==EOF)
1651         parseError("Could not match character");
1652       else
1653         ((char *)p)[i] = c;
1654       break;
1655     }
1656     case Tuchar:
1657     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1658     case Tshort:((short *)p)[i]=(short)readInt(); break;
1659     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1660     case Tint:  ((int *)p)[i]=readInt(); break;
1661     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1662     case Tlong: ((long *)p)[i]=readInt(); break;
1663     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1664     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1665     case Tdouble:((double *)p)[i]=readDouble(); break;
1666 #ifdef CMK_PUP_LONG_LONG
1667     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1668     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1669 #endif
1670     default: CmiAbort("Unrecognized pup type code!");
1671     };
1672 }
1673
1674 #if CMK_PROJECTIONS_USE_ZLIB
1675 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1676 {
1677   for (int i=0;i<n;i++) 
1678     switch(t) {
1679     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1680     case Tuchar:
1681     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1682     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1683     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1684     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1685     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1686     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1687     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1688     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1689     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1690 #ifdef CMK_PUP_LONG_LONG
1691     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1692     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1693 #endif
1694     default: CmiAbort("Unrecognized pup type code!");
1695     };
1696 }
1697 #endif
1698
1699 void TraceProjections::endPhase() {
1700   double currentPhaseTime = TraceTimer();
1701   if (lastPhaseEvent != NULL) {
1702   } else {
1703     if (_logPool->pool != NULL) {
1704       // assumed to be BEGIN_COMPUTATION
1705     } else {
1706       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1707       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1708     }
1709   }
1710
1711   /* Insert endPhase event here. */
1712   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1713   /*        We currently "borrow" from the standard add() method. */
1714   /*        It should really be its own add() method.             */
1715   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1716   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1717   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1718   currentPhaseID++;
1719 }
1720
1721 #ifdef PROJ_ANALYSIS
1722 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1723
1724
1725 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1726
1727 void registerOutlierReduction() {
1728   outlierReductionType =
1729     CkReduction::addReducer(outlierReduction);
1730   minMaxReductionType =
1731     CkReduction::addReducer(minMaxReduction);
1732 }
1733
1734 /**
1735  * **IMPT NOTES**:
1736  *
1737  * This is the C++ code that is registered to be activated at module
1738  * shutdown. This is called exactly once on processor 0. Module shutdown
1739  * is initiated as a result of a CkExit() call by the application code
1740  * 
1741  * The exit function must ultimately call CkExit() again to
1742  * so that other module exit functions may proceed after this module is
1743  * done.
1744  *
1745  */
1746 // FIXME: WHY extern "C"???
1747 extern "C" void TraceProjectionsExitHandler()
1748 {
1749 #if CMK_TRACE_ENABLED
1750   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1751   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1752   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1753 #else
1754   CkExit();
1755 #endif
1756 }
1757
1758 // This is called once on each processor but the idiom of use appears
1759 // to be to only have processor 0 register the function.
1760 //
1761 // See initnode in trace-projections.ci
1762 void initTraceProjectionsBOC()
1763 {
1764   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1765 #ifdef __BIGSIM__
1766   if (BgNodeRank() == 0) {
1767 #else
1768     if (CkMyRank() == 0) {
1769 #endif
1770       registerExitFn(TraceProjectionsExitHandler);
1771     }
1772 #if 0
1773   } // this is so indentation does not get messed up
1774 #endif
1775 }
1776
1777 // mainchare for trace-projections BOC-operations. 
1778 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1779 // rest of its life.
1780 //
1781 // Responsible for:
1782 //   1. Handling commandline arguments
1783 //   2. Creating any objects required for proper BOC operations.
1784 //
1785 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1786   /** Options for Outlier Analysis */
1787   // defaults. Things will change with support for interactive analysis.
1788   bool findOutliers = false;
1789   bool outlierAutomatic = true;
1790   int numKSeeds = 10; 
1791
1792   int peNumKeep = CkNumPes();  // used as a default
1793   double entryThreshold = 0.0;
1794   bool outlierUsePhases = false;
1795   if (outlierAutomatic) {
1796     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1797                      "Number of cluster seeds to apply at outlier analysis.");
1798     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1799                      &peNumKeep, "Number of Processors to retain data");
1800     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1801                         "Minimum significance of entry points to be considered for clustering (%).");
1802     findOutliers =
1803       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1804     outlierUsePhases = 
1805       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1806                         "Apply automatic outlier analysis to any available phases.");
1807     if (outlierUsePhases) {
1808       // if the user wants to use an outlier feature, it is assumed outlier
1809       //    analysis is desired.
1810       findOutliers = true;
1811     }
1812   }
1813   bool findStartTime = (CmiTimerAbsolute()==1);
1814   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1815   if (findOutliers) {
1816     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1817                                         numKSeeds,
1818                                         peNumKeep,
1819                                         entryThreshold,
1820                                         outlierUsePhases);
1821   }
1822 }
1823
1824 // Called on every processor.
1825 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1826   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1827   endPe = pe;                // the pe that starts CkExit()
1828   if (CkMyPe() == 0) {
1829     analysisStartTime = CmiWallTimer();
1830   }
1831   CkpvAccess(_trace)->endComputation();
1832   // no more tracing for projections on this processor after this point. 
1833   // Note that clear must be called after remove, or bad things will happen.
1834   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1835   CkpvAccess(_traces)->clearTrace();
1836
1837   // From this point, we start multiple chains of reductions and broadcasts to
1838   // perform final online analysis activities.
1839
1840   // Start all parallel operations at once. 
1841   //   These MUST NOT modify base performance data in LogPool. If they must,
1842   //   then the parallel operations must be phased (and this code to be
1843   //   restructured as necessary)
1844   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1845   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1846   if (findOutliers) {
1847     parModulesRemaining++;
1848     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1849   }
1850   parModulesRemaining++;
1851   if (findStartTime) 
1852   bocProxy[CkMyPe()].startTimeAnalysis();
1853   else
1854   bocProxy[CkMyPe()].startEndTimeAnalysis();
1855 }
1856
1857 // Called on each processor
1858 void KMeansBOC::startKMeansAnalysis() {
1859   // Initialize all necessary structures
1860   LogPool *pool = CkpvAccess(_trace)->_logPool;
1861
1862  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1863   int flushInt = 0;
1864   if (pool->hasFlushed) {
1865     flushInt = 1;
1866   }
1867   
1868   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1869                 0, thisProxy);
1870   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1871 }
1872
1873 // Called on processor 0
1874 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1875   int someFlush = *((int *)msg->getData());
1876
1877   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1878   
1879   if (someFlush == 0) {
1880     // Data intact proceed with KMeans analysis
1881     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1882     kMeansProxy.flushCheckDone();
1883   } else {
1884     // Some processor had flushed it data at some point, abandon KMeans
1885     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1886     // terminate KMeans
1887     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1888     bocProxy[0].kMeansDone();
1889   }
1890 }
1891
1892 // Called on each processor
1893 void KMeansBOC::flushCheckDone() {
1894   // **FIXME** - more flexible metric collection scheme may be necessary
1895   //   in the future for production use.
1896   LogPool *pool = CkpvAccess(_trace)->_logPool;
1897
1898   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1899
1900   numEntryMethods = _entryTable.size();
1901   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1902
1903   // maintained across phases
1904   markedBegin = false;
1905   markedIdle = false;
1906   beginBlockTime = 0.0;
1907   beginIdleBlockTime = 0.0;
1908   lastBeginEPIdx = -1; // none
1909
1910   lastPhaseIdx = 0;
1911   currentExecTimes = NULL;
1912   currentPhase = 0;
1913   selected = false;
1914
1915   pool->initializePhases();
1916
1917   // incoming K Seeds and the per-phase filter
1918   incKSeeds = new double[numK*numMetrics];
1919   keepMetric = new bool[numMetrics];
1920
1921   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1922   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1923   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1924   thisProxy[CkMyPe()].getNextPhaseMetrics();
1925 }
1926
1927 // Called on each processor.
1928 void KMeansBOC::getNextPhaseMetrics() {
1929   // Assumes the presence of the complete logs on this processor.
1930   // Assumes first event is always BEGIN_COMPUTATION
1931   // Assumes each processor sees the same number of phases.
1932   //
1933   // In this code, we collect performance data for this processor.
1934   // All times are in seconds.
1935
1936   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1937
1938   if (usePhases) {
1939     DEBUGF("[%d] Using Phases\n", CkMyPe());
1940   } else {
1941     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1942   }
1943   
1944   if (currentExecTimes != NULL) {
1945     delete [] currentExecTimes;
1946   }
1947   currentExecTimes = new double[numMetrics];
1948   for (int i=0; i<numMetrics; i++) {
1949     currentExecTimes[i] = 0.0;
1950   }
1951
1952   int numEventMethods = _entryTable.size();
1953   LogPool *pool = CkpvAccess(_trace)->_logPool;
1954   
1955   CkAssert(pool->numEntries > lastPhaseIdx);
1956   double totalPhaseTime = 0.0;
1957   double totalActiveTime = 0.0; // entry method + idle
1958
1959   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1960     if (pool->pool[i].type == BEGIN_PROCESSING) {
1961       // check pairing
1962       if (!markedBegin) {
1963         markedBegin = true;
1964       }
1965       beginBlockTime = pool->pool[i].time;
1966       lastBeginEPIdx = pool->pool[i].eIdx;
1967     } else if (pool->pool[i].type == END_PROCESSING) {
1968       // check pairing
1969       // if End without a begin, just ignore
1970       //   this event. If a phase-boundary is crossed, the Begin
1971       //   event would be maintained in beginBlockTime, so it is 
1972       //   not a problem.
1973       if (markedBegin) {
1974         markedBegin = false;
1975         if (pool->pool[i].event < 0)
1976         {
1977           // ignore dummy events. **FIXME** as they have no eIdx?
1978           continue;
1979         }
1980         currentExecTimes[pool->pool[i].eIdx] += 
1981           pool->pool[i].time - beginBlockTime;
1982         totalActiveTime += pool->pool[i].time - beginBlockTime;
1983         lastBeginEPIdx = -1;
1984       }
1985     } else if (pool->pool[i].type == BEGIN_IDLE) {
1986       // check pairing
1987       if (!markedIdle) {
1988         markedIdle = true;
1989       }
1990       beginIdleBlockTime = pool->pool[i].time;
1991     } else if (pool->pool[i].type == END_IDLE) {
1992       // check pairing
1993       if (markedIdle) {
1994         markedIdle = false;
1995         currentExecTimes[numEventMethods] += 
1996           pool->pool[i].time - beginIdleBlockTime;
1997         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1998       }
1999     } else if (pool->pool[i].type == END_PHASE) {
2000       // ignored when not using phases
2001       if (usePhases) {
2002         // when we've not visited this node before
2003         if (i != lastPhaseIdx) { 
2004           totalPhaseTime = 
2005             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2006           // it is important that proper accounting of time take place here.
2007           // Note that END_PHASE events inevitably occur in the context of
2008           //   some entry method by the way the tracing API is designed.
2009           if (markedBegin) {
2010             CkAssert(lastBeginEPIdx >= 0);
2011             currentExecTimes[lastBeginEPIdx] += 
2012               pool->pool[i].time - beginBlockTime;
2013             totalActiveTime += pool->pool[i].time - beginBlockTime;
2014             // this is so the remainder contributes to the next phase
2015             beginBlockTime = pool->pool[i].time;
2016           }
2017           // The following is unlikely, but stranger things have happened.
2018           if (markedIdle) {
2019             currentExecTimes[numEventMethods] +=
2020               pool->pool[i].time - beginIdleBlockTime;
2021             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2022             // this is so the remainder contributes to the next phase
2023             beginIdleBlockTime = pool->pool[i].time;
2024           }
2025           if (totalActiveTime <= totalPhaseTime) {
2026             currentExecTimes[numEventMethods+1] = 
2027               totalPhaseTime - totalActiveTime;
2028           } else {
2029             currentExecTimes[numEventMethods+1] = 0.0;
2030             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2031                      CkMyPe(), currentPhase);
2032           }
2033           collectKMeansData();
2034           // end the loop (and method) and defer the work till the next call
2035           lastPhaseIdx = i;
2036           break; 
2037         }
2038       }
2039     } else if (pool->pool[i].type == END_COMPUTATION) {
2040       if (markedBegin) {
2041         CkAssert(lastBeginEPIdx >= 0);
2042         currentExecTimes[lastBeginEPIdx] += 
2043           pool->pool[i].time - beginBlockTime;
2044         totalActiveTime += pool->pool[i].time - beginBlockTime;
2045       }
2046       if (markedIdle) {
2047         currentExecTimes[numEventMethods] +=
2048           pool->pool[i].time - beginIdleBlockTime;
2049         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2050       }
2051       totalPhaseTime = 
2052         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2053       if (totalActiveTime <= totalPhaseTime) {
2054         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2055       } else {
2056         currentExecTimes[numEventMethods+1] = 0.0;
2057         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2058                  CkMyPe());
2059       }
2060       collectKMeansData();
2061     }
2062   }
2063 }
2064
2065 /**
2066  *  Through a reduction, collectKMeansData aggregates each processors' data
2067  *  in order for global properties to be determined:
2068  *  
2069  *  1. min & max to determine normalization factors.
2070  *  2. sum to determine global EP averages for possible metric reduction
2071  *       through thresholding.
2072  *  3. sum of squares to compute stddev which may be useful in the future.
2073  *
2074  *  collectKMeansData will also keep the processor's data for the current
2075  *    phase so that it may be normalized and worked on subsequently.
2076  *
2077  **/
2078 void KMeansBOC::collectKMeansData() {
2079   int minOffset = numMetrics;
2080   int maxOffset = 2*numMetrics;
2081   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2082
2083   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2084
2085   double *reductionMsg = new double[numMetrics*4];
2086
2087   for (int i=0; i<numMetrics; i++) {
2088     reductionMsg[i] = currentExecTimes[i];
2089     // duplicate the event times for max and min sections of the reduction
2090     reductionMsg[minOffset + i] = currentExecTimes[i];
2091     reductionMsg[maxOffset + i] = currentExecTimes[i];
2092     // compute squares
2093     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2094   }
2095
2096   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2097                 0, thisProxy);
2098   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2099              outlierReductionType, cb);  
2100 }
2101
2102 // The purpose is mainly to initialize the k seeds and generate
2103 //   normalization parameters for each of the metrics. The k seeds
2104 //   and normalization parameters are broadcast to all processors.
2105 //
2106 // Called on processor 0
2107 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2108   CkAssert(CkMyPe() == 0);
2109   
2110   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2111
2112   int sumOffset = 0;
2113   int minOffset = numMetrics;
2114   int maxOffset = 2*numMetrics;
2115   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2116
2117   // calculate statistics & boundaries for the k seeds for clustering
2118   KMeansStatsMessage *outmsg = 
2119     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2120   outmsg->numMetrics = numMetrics;
2121   outmsg->numKPos = numK*numMetrics;
2122   outmsg->numStats = numMetrics*4;
2123
2124   // Sum | Min | Max | Sum of Squares
2125   double *totalExecTimes = (double *)msg->getData();
2126   double totalTime = 0.0;
2127
2128   for (int i=0; i<numMetrics; i++) {
2129     DEBUGN("%lf\n", totalExecTimes[i]);
2130     totalTime += totalExecTimes[i];
2131
2132     // calculate event mean over all processors
2133     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2134
2135     // get the ranges and offsets of each metric. With this, we get
2136     //   normalization factors that can be sent back to each processor to
2137     //   be used as necessary. We reuse max for range. Min remains the offset.
2138     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2139     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2140       totalExecTimes[minOffset + i];
2141     
2142     // calculate stddev (using biased variance)
2143     outmsg->stats[sosOffset + i] = 
2144       sqrt((totalExecTimes[sosOffset + i] - 
2145             2*(outmsg->stats[i])*totalExecTimes[i] +
2146             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2147            CkNumPes());
2148   }
2149
2150   for (int i=0; i<numMetrics; i++) {
2151     // 1) if the proportion of the max value of the entry method relative to
2152     //   the average time taken over all entry methods across all processors
2153     //   is greater than the stipulated percentage threshold ...; AND
2154     // 2) if the range of values are non-zero.
2155     //
2156     // The current assumption is totalTime > 0 (what program has zero total
2157     //   time from all work?)
2158     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2159                      entryThreshold) &&
2160       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2161     if (keepMetric[i]) {
2162       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2163              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2164     } else {
2165       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2166     }
2167     outmsg->filter[i] = keepMetric[i];
2168   }
2169
2170   delete msg;
2171   
2172   // initialize k seeds for this phase
2173   kSeeds = new double[numK*numMetrics];
2174
2175   numKReported = 0;
2176   kNumMembers = new int[numK];
2177
2178   // Randomly select k processors' metric vectors for the k seeds
2179   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2180   srand(11337); // for debugging purposes
2181   for (int k=0; k<numK; k++) {
2182     DEBUGF("Seed %d | ", k);
2183     for (int m=0; m<numMetrics; m++) {
2184       double factor = totalExecTimes[maxOffset + m] - 
2185         totalExecTimes[minOffset + m];
2186       // "uniform" distribution, scaled according to the normalization
2187       //   factors
2188       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2189       // Random distribution.
2190       kSeeds[numMetrics*k + m] =
2191         ((rand()*1.0)/RAND_MAX)*factor;
2192       if (keepMetric[m]) {
2193         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2194       }
2195       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2196     }
2197     DEBUGF("\n");
2198     kNumMembers[k] = 0;
2199   }
2200
2201   // broadcast statistical values to all processors for cluster discovery
2202   thisProxy.findInitialClusters(outmsg);
2203 }
2204
2205
2206
2207 // Called on each processor.
2208 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2209
2210  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2211
2212   phaseIter = 0;
2213
2214   // Get info from stats message
2215   CkAssert(numMetrics == msg->numMetrics);
2216   for (int i=0; i<numMetrics; i++) {
2217     keepMetric[i] = msg->filter[i];
2218   }
2219
2220   // Normalize data on local processor.
2221   // **CWL** See my thesis for detailed discussion of normalization of
2222   //    performance data.
2223   // **NOTE** This might change if we want to send data based on the filter
2224   //   instead of all the data.
2225   CkAssert(numMetrics*4 == msg->numStats);
2226   for (int i=0; i<numMetrics; i++) {
2227     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2228     // **CWL** We do not normalize the range. Entry methods that exhibit
2229     //   large absolute timing variations should be allowed to contribute
2230     //   more to the Euclidean distance measure!
2231     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2232   }
2233
2234   // **NOTE** This might change if we want to send data based on the filter
2235   //   instead of all the data.
2236   CkAssert(numK*numMetrics == msg->numKPos);
2237   for (int i=0; i<msg->numKPos; i++) {
2238     incKSeeds[i] = msg->kSeedsPos[i];
2239   }
2240
2241   // Decide which KSeed this processor belongs to.
2242   minDistance = calculateDistance(0);
2243   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2244            currentPhase, phaseIter, minDistance);
2245   minK = 0;
2246   for (int i=1; i<numK; i++) {
2247     double distance = calculateDistance(i);
2248     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2249              currentPhase, phaseIter, i, distance);
2250     if (distance < minDistance) {
2251       minDistance = distance;
2252       minK = i;
2253     }
2254   }
2255
2256   // Set up a reduction with the modification vector to the root (0).
2257   //
2258   // The modification vector sends a negative value for each metric
2259   //   for the K this processor no longer belongs to and a positive
2260   //   value to the K the processor now belongs. In addition, a -1.0
2261   //   is sent to the K it is leaving and a +1.0 to the K it is 
2262   //   joining.
2263   //
2264   // The processor must still contribute a "zero returns" even if
2265   //   nothing changes. This will be the basis for determine
2266   //   convergence at the root.
2267   //
2268   // The addtional +1 is meant for the count-change that must be
2269   //   maintained for the special cases at the root when some K
2270   //   may be deprived of all processor points or go from 0 to a
2271   //   positive number of processors (see later comments).
2272   double *modVector = new double[numK*(numMetrics+1)];
2273   for (int i=0; i<numK; i++) {
2274     for (int j=0; j<numMetrics+1; j++) {
2275       modVector[i*(numMetrics+1) + j] = 0.0;
2276     }
2277   }
2278   for (int i=0; i<numMetrics; i++) {
2279     // for this initialization, only positive values need be sent.
2280     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2281   }
2282   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2283
2284   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2285                 0, thisProxy);
2286   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2287              CkReduction::sum_double, cb);  
2288 }
2289
2290 double KMeansBOC::calculateDistance(int k) {
2291   double ret = 0.0;
2292   for (int i=0; i<numMetrics; i++) {
2293     if (keepMetric[i]) {
2294       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2295              CkMyPe(), currentPhase, phaseIter, i,
2296                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2297       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2298     }
2299   }
2300   return sqrt(ret);
2301 }
2302
2303 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2304   CkAssert(CkMyPe() == 0);
2305
2306   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2307
2308   double *modVector = (double *)msg->getData();
2309   // sanity check
2310   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2311
2312   // A quick convergence test.
2313   bool hasChanges = false;
2314   for (int i=0; i<numK; i++) {
2315     hasChanges = hasChanges || 
2316       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2317   }
2318   if (!hasChanges) {
2319     delete msg;
2320     findRepresentatives();
2321   } else {
2322     int overallChange = 0;
2323     for (int i=0; i<numK; i++) {
2324       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2325       if (change != 0) {
2326         overallChange += change;
2327         // modify the k seeds based on the modification vectors coming in
2328         //
2329         // If a seed initially has no members, its contents do not matter and
2330         //   is simply set to the average of the incoming vector.
2331         // If the change causes a seed to lose all its members, do nothing.
2332         //   Its last-known location is kept to allow it to re-capture
2333         //   membership at the next iteration rather than apply the last
2334         //   changes (which snaps the point unnaturally to 0,0).
2335         // Otherwise, apply the appropriate vector changes.
2336         CkAssert((kNumMembers[i] + change >= 0) &&
2337                  (kNumMembers[i] + change <= CkNumPes()));
2338         if (kNumMembers[i] == 0) {
2339           CkAssert(change > 0);
2340           for (int j=0; j<numMetrics; j++) {
2341             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2342           }
2343         } else if (kNumMembers[i] + change == 0) {
2344           // do nothing.
2345         } else {
2346           for (int j=0; j<numMetrics; j++) {
2347             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2348             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2349             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2350           }
2351         }
2352         kNumMembers[i] += change;
2353       }
2354       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2355              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2356     }
2357     delete msg;
2358
2359     // broadcast the new seed locations.
2360     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2361     outmsg->numKPos = numK*numMetrics;
2362     for (int i=0; i<numK*numMetrics; i++) {
2363       outmsg->kSeedsPos[i] = kSeeds[i];
2364     }
2365
2366     thisProxy.updateSeedMembership(outmsg);
2367   }
2368 }
2369
2370 // Called on all processors
2371 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2372
2373   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2374
2375   phaseIter++;
2376
2377   // **NOTE** This might change if we want to send data based on the filter
2378   //   instead of all the data.
2379   CkAssert(numK*numMetrics == msg->numKPos);
2380   for (int i=0; i<msg->numKPos; i++) {
2381     incKSeeds[i] = msg->kSeedsPos[i];
2382   }
2383
2384   // Decide which KSeed this processor belongs to.
2385   lastMinK = minK;
2386   minDistance = calculateDistance(0);
2387   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2388          currentPhase, phaseIter, minDistance);
2389
2390   minK = 0;
2391   for (int i=1; i<numK; i++) {
2392     double distance = calculateDistance(i);
2393     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2394            currentPhase, phaseIter, i, distance);
2395     if (distance < minDistance) {
2396       minDistance = distance;
2397       minK = i;
2398     }
2399   }
2400
2401   double *modVector = new double[numK*(numMetrics+1)];
2402   for (int i=0; i<numK; i++) {
2403     for (int j=0; j<numMetrics+1; j++) {
2404       modVector[i*(numMetrics+1) + j] = 0.0;
2405     }
2406   }
2407
2408   if (minK != lastMinK) {
2409     for (int i=0; i<numMetrics; i++) {
2410       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2411       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2412     }
2413     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2414     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2415   }
2416
2417   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2418                 0, thisProxy);
2419   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2420              CkReduction::sum_double, cb);  
2421 }
2422
2423 void KMeansBOC::findRepresentatives() {
2424
2425   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2426
2427   int numNonEmptyClusters = 0;
2428   for (int i=0; i<numK; i++) {
2429     if (kNumMembers[i] > 0) {
2430       numNonEmptyClusters++;
2431     }
2432   }
2433
2434   int numRepresentatives = peNumKeep;
2435   // **FIXME**
2436   // This is fairly arbitrary. Next time, choose the centers of the top
2437   //   largest clusters.
2438   if (numRepresentatives < numNonEmptyClusters) {
2439     numRepresentatives = numNonEmptyClusters;
2440   }
2441
2442   int slotsRemaining = numRepresentatives;
2443
2444   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2445          numNonEmptyClusters);
2446
2447   // determine how many exemplars to select per cluster. Currently
2448   //   hardcoded to 1. Future challenge is to decide on other numbers
2449   //   or proportionality.
2450   //
2451   int exemplarsPerCluster = 1;
2452   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2453
2454   int numCandidateOutliers = CkNumPes() - 
2455     exemplarsPerCluster*numNonEmptyClusters;
2456
2457   double *remainders = new double[numK];
2458   int *assigned = new int[numK];
2459   exemplarChoicesLeft = new int[numK];
2460   outlierChoicesLeft = new int[numK];
2461
2462   for (int i=0; i<numK; i++) {
2463     assigned[i] = 0;
2464     remainders[i] = 
2465       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2466       slotsRemaining / numCandidateOutliers;
2467     if (remainders[i] >= 0.0) {
2468       assigned[i] = (int)floor(remainders[i]);
2469       remainders[i] -= assigned[i];
2470     } else {
2471       remainders[i] = 0.0;
2472     }
2473   }
2474
2475   for (int i=0; i<numK; i++) {
2476     slotsRemaining -= assigned[i];
2477   }
2478   CkAssert(slotsRemaining >= 0);
2479
2480   // find clusters to assign the loose slots to, in order of
2481   // remainder proportion
2482   while (slotsRemaining > 0) {
2483     double max = 0.0;
2484     int winner = 0;
2485     for (int i=0; i<numK; i++) {
2486       if (remainders[i] > max) {
2487         max = remainders[i];
2488         winner = i;
2489       }
2490     }
2491     assigned[winner]++;
2492     remainders[winner] = 0.0;
2493     slotsRemaining--;
2494   }
2495
2496   // set up how many reduction cycles of min/max we need to conduct to
2497   // select the representatives.
2498   numSelectionIter = exemplarsPerCluster;
2499   for (int i=0; i<numK; i++) {
2500     if (assigned[i] > numSelectionIter) {
2501       numSelectionIter = assigned[i];
2502     }
2503   }
2504   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2505
2506   for (int i=0; i<numK; i++) {
2507     if (kNumMembers[i] > 0) {
2508       exemplarChoicesLeft[i] = exemplarsPerCluster;
2509       outlierChoicesLeft[i] = assigned[i];
2510     } else {
2511       exemplarChoicesLeft[i] = 0;
2512       outlierChoicesLeft[i] = 0;
2513     }
2514     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2515            outlierChoicesLeft[i]);
2516   }
2517
2518   delete [] assigned;
2519   delete [] remainders;
2520
2521   // send out first broadcast
2522   KSelectionMessage *outmsg = NULL;
2523   if (numSelectionIter > 0) {
2524     outmsg = new (numK, numK, numK) KSelectionMessage;
2525     outmsg->numKMinIDs = numK;
2526     outmsg->numKMaxIDs = numK;
2527     for (int i=0; i<numK; i++) {
2528       outmsg->minIDs[i] = -1;
2529       outmsg->maxIDs[i] = -1;
2530     }
2531     thisProxy.collectDistances(outmsg);
2532   } else {
2533     CkPrintf("Warning: No selection iteration from the start!\n");
2534     // invoke phase completion on all processors
2535     thisProxy.phaseDone();
2536   }
2537 }
2538
2539 /*
2540  *  lastMin = array of minimum champions of the last tournament
2541  *  lastMax = array of maximum champions of the last tournament
2542  *  lastMaxVal = array of last encountered maximum values, allows previous
2543  *                 minimum winners to eliminate themselves from the next
2544  *                 minimum race.
2545  *
2546  *  Called on all processors.
2547  */
2548 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2549
2550   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2551
2552   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2553          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2554   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2555       (CkMyPe() == msg->maxIDs[lastMinK])) {
2556     CkAssert(!selected);
2557     selected = true;
2558   }
2559
2560   // build outgoing reduction structure
2561   //   format = minVal | ID | maxVal | ID
2562   double *minMaxAndIDs = NULL;
2563
2564   minMaxAndIDs = new double[numK*4];
2565   // initialize to the appropriate out-of-band values (for error checks)
2566   for (int i=0; i<numK; i++) {
2567     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2568     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2569     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2570     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2571   }
2572   // If I have not won before, I put myself back into the competition
2573   if (!selected) {
2574     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2575     minMaxAndIDs[lastMinK*4] = minDistance;
2576     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2577     minMaxAndIDs[lastMinK*4+2] = minDistance;
2578     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2579   }
2580   delete msg;
2581
2582   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2583                 0, thisProxy);
2584   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2585              minMaxReductionType, cb);  
2586 }
2587
2588 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2589   // incoming format:
2590   //   minVal | minID | maxVal | maxID
2591
2592   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2593
2594   if (numSelectionIter > 0) {
2595     double *incInfo = (double *)msg->getData();
2596     
2597     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2598     outmsg->numKMinIDs = numK;
2599     outmsg->numKMaxIDs = numK;
2600     
2601     for (int i=0; i<numK; i++) {
2602       DEBUGF("%d | %lf %d %lf %d \n", i, 
2603              incInfo[i*4], (int)incInfo[i*4+1], 
2604              incInfo[i*4+2], (int)incInfo[i*4+3]);
2605     }
2606
2607     for (int i=0; i<numK; i++) {
2608       if (exemplarChoicesLeft[i] > 0) {
2609         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2610         exemplarChoicesLeft[i]--;
2611       } else {
2612         outmsg->minIDs[i] = -1;
2613       }
2614       if (outlierChoicesLeft[i] > 0) {
2615         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2616         outlierChoicesLeft[i]--;
2617       } else {
2618         outmsg->maxIDs[i] = -1;
2619       }
2620     }
2621     thisProxy.collectDistances(outmsg);
2622     numSelectionIter--;
2623   } else {
2624     // invoke phase completion on all processors
2625     thisProxy.phaseDone();
2626   }
2627 }
2628
2629 /**
2630  *  Completion of the K-Means clustering and data selection of one phase
2631  *    of the computation.
2632  *
2633  *  Called on every processor.
2634  */
2635 void KMeansBOC::phaseDone() {
2636
2637   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2638
2639   LogPool *pool = CkpvAccess(_trace)->_logPool;
2640   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2641
2642   // now decide on what to do with the decision.
2643   if (!selected) {
2644     if (usePhases) {
2645       pool->keepPhase[currentPhase] = false;
2646     } else {
2647       // if not using phases, we're working on the whole log
2648       pool->setAllPhases(false);
2649     }
2650   }
2651
2652   // **FIXME** (?) - All processors have to agree on this, or the reduction
2653   //   will not be correct! The question is "is this enforcible?"
2654   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2655     // We're done
2656     int dummy = 0;
2657     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2658                   0, bocProxy);
2659     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2660   } else {
2661     // reset all phase-based k-means data and decisions
2662
2663     // **FIXME**!!!!!    
2664     
2665     // invoke the next K-Means computation phase.
2666     currentPhase++;
2667     thisProxy[CkMyPe()].getNextPhaseMetrics();
2668   }
2669 }
2670
2671 void TraceProjectionsBOC::startTimeAnalysis()
2672 {
2673   double startTime = 0.0;
2674   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2675      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2676   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2677   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2678 }
2679
2680 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2681 {
2682   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2683
2684   if (CkpvAccess(_trace) != NULL) {
2685     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2686     CkpvAccess(_trace)->_logPool->setNewStartTime();
2687     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2688   }
2689   delete msg;
2690   thisProxy[CkMyPe()].startEndTimeAnalysis();
2691 }
2692
2693 void TraceProjectionsBOC::startEndTimeAnalysis()
2694 {
2695  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2696
2697   endTime = CkpvAccess(_trace)->endTime;
2698   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2699
2700   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2701                 0, thisProxy);
2702   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2703 }
2704
2705 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2706 {
2707  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2708
2709   CkAssert(CkMyPe() == 0);
2710   parModulesRemaining--;
2711   if (CkpvAccess(_trace) != NULL) {
2712     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2713     // CkPrintf("End time determined to be %lf us\n",
2714     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2715   }
2716   delete msg;
2717   if (parModulesRemaining == 0) {
2718     thisProxy[CkMyPe()].finalize();
2719   }
2720 }
2721
2722 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2723
2724  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2725
2726   CkAssert(CkMyPe() == 0);
2727   parModulesRemaining--;
2728   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2729            CmiWallTimer()-analysisStartTime);
2730   delete msg;
2731   if (parModulesRemaining == 0) {
2732     thisProxy[CkMyPe()].finalize();
2733   }
2734 }
2735
2736 /**
2737  *
2738  *  This version is called (on processor 0) only if flushCheck fails.
2739  *
2740  */
2741 void TraceProjectionsBOC::kMeansDone() {
2742   CkAssert(CkMyPe() == 0);
2743   parModulesRemaining--;
2744   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2745            CmiWallTimer()-analysisStartTime);
2746   if (parModulesRemaining == 0) {
2747     thisProxy[CkMyPe()].finalize();
2748   }
2749 }
2750
2751 void TraceProjectionsBOC::finalize()
2752 {
2753   CkAssert(CkMyPe() == 0);
2754   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2755   //       CmiWallTimer()-analysisStartTime);
2756   thisProxy.closingTraces();
2757 }
2758
2759 // Called on every processor
2760 void TraceProjectionsBOC::closingTraces() {
2761   CkpvAccess(_trace)->closeTrace();
2762
2763     // subtle:  reduction needs to go to the PE which started CkExit()
2764   int pe = 0;
2765   if (endPe != -1) pe = endPe;
2766   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2767                 pe, thisProxy); 
2768   contribute(0, NULL, CkReduction::sum_int, cb);  
2769 }
2770
2771 // The sole purpose of this reduction is to decide whether or not
2772 //   Projections as a module needs to call CkExit() to get other
2773 //   modules to shutdown.
2774 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2775   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2776   delete msg;
2777   // decide if CkExit() needs to be called
2778   if (!CkpvAccess(_trace)->converseExit) {
2779     CkExit();
2780   }
2781 }
2782 /*
2783  *  Registration and definition of the Outlier Reduction callback.
2784  *  Format: Sum | Min | Max | Sum of Squares
2785  */
2786 CkReductionMsg *outlierReduction(int nMsgs,
2787                                  CkReductionMsg **msgs) {
2788   int numBytes = 0;
2789   int numMetrics = 0;
2790   double *ret = NULL;
2791
2792   if (nMsgs == 1) {
2793     // nothing to do, just pass it on
2794     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2795   }
2796
2797   if (nMsgs > 1) {
2798     numBytes = msgs[0]->getSize();
2799     // sanity checks
2800     if (numBytes%sizeof(double) != 0) {
2801       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2802     }
2803     if ((numBytes/sizeof(double))%4 != 0) {
2804       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2805     }
2806     numMetrics = (numBytes/sizeof(double))/4;
2807     ret = new double[numMetrics*4];
2808
2809     // copy the first message data into the return structure first
2810     for (int i=0; i<numMetrics*4; i++) {
2811       ret[i] = ((double *)msgs[0]->getData())[i];
2812     }
2813
2814     // Sum | Min | Max | Sum of Squares
2815     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2816       for (int i=0; i<numMetrics; i++) {
2817         // Sum
2818         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2819         // Min
2820         ret[numMetrics + i] =
2821           (ret[numMetrics + i] < 
2822            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2823           ? ret[numMetrics + i] : 
2824           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2825         // Max
2826         ret[2*numMetrics + i] = 
2827           (ret[2*numMetrics + i] >
2828            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2829           ? ret[2*numMetrics + i] :
2830           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2831         // Sum of Squares (squaring already done at leaf)
2832         ret[3*numMetrics + i] +=
2833           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2834       }
2835     }
2836   }
2837   
2838   /* apparently, we do not delete the incoming messages */
2839   return CkReductionMsg::buildNew(numBytes,ret);
2840 }
2841
2842 /*
2843  * The only reason we have a user-defined reduction is to support
2844  *   identification of the "winning" processors as well as to handle
2845  *   both the min and the max of each "tournament". A simple min/max
2846  *   discovery cannot handle ties.
2847  */
2848 CkReductionMsg *minMaxReduction(int nMsgs,
2849                                 CkReductionMsg **msgs) {
2850   CkAssert(nMsgs > 0);
2851
2852   int numBytes = msgs[0]->getSize();
2853   CkAssert(numBytes%sizeof(double) == 0);
2854   int numK = (numBytes/sizeof(double))/4;
2855
2856   double *ret = new double[numK*4];
2857   // fill with out-of-band values
2858   for (int i=0; i<numK; i++) {
2859     ret[i*4] = -1.0;
2860     ret[i*4+1] = -1.0;
2861     ret[i*4+2] = -1.0;
2862     ret[i*4+3] = -1.0;
2863   }
2864
2865   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2866   for (int i=0; i<nMsgs; i++) {
2867     double *temp = (double *)msgs[i]->getData();
2868     for (int j=0; j<numK; j++) {
2869       // no previous valid min
2870       if (ret[j*4+1] < 0) {
2871         // fill it in only if the incoming min is valid
2872         if (temp[j*4+1] >= 0) {
2873           ret[j*4] = temp[j*4];      // fill min value
2874           ret[j*4+1] = temp[j*4+1];  // fill ID
2875         }
2876       } else {
2877         // find Min, only if incoming min is valid
2878         if (temp[j*4+1] >= 0) {
2879           if (temp[j*4] < ret[j*4]) {
2880             ret[j*4] = temp[j*4];      // replace min value
2881             ret[j*4+1] = temp[j*4+1];  // replace ID
2882           }
2883         }
2884       }
2885       // no previous valid max
2886       if (ret[j*4+3] < 0) {
2887         // fill only if the incoming max is valid
2888         if (temp[j*4+3] >= 0) {
2889           ret[j*4+2] = temp[j*4+2];  // fill max value
2890           ret[j*4+3] = temp[j*4+3];  // fill ID
2891         }
2892       } else {
2893         // find Max, only if incoming max is valid
2894         if (temp[j*4+3] >= 0) {
2895           if (temp[j*4+2] > ret[j*4+2]) {
2896             ret[j*4+2] = temp[j*4+2];  // replace max value
2897             ret[j*4+3] = temp[j*4+3];  // replace ID
2898           }
2899         }
2900       }
2901     }
2902   }
2903   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2904   delete [] ret;
2905   return redmsg;
2906 }
2907
2908 #include "TraceProjections.def.h"
2909 #endif //PROJ_ANALYSIS
2910
2911 /*@}*/