fix a bug in traceSendMsgComm() which should only trace charm messages (no Converse...
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
14 #else
15 #define DEBUGF(...)
16 #endif
17 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BIGSIM_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BIGSIM_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BIGSIM_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   char eventName[PAPI_MAX_STR_LEN];
549   for (i=0;i<NUMPAPIEVENTS;i++) {
550     PAPI_event_code_to_name(papiEvents[i], eventName);
551     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
552   }
553 #endif
554   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
555   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
556     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
557   }     
558 }
559
560 void LogPool::writeSts(TraceProjections *traceProj){
561   writeSts();
562   if (traceProj != NULL) {
563     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
564     funcIter->seekStart();
565     int numFuncs = traceProj->getFuncNumber();
566     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
567     while(funcIter->hasNext()) {
568       StrKey *key;
569       int *obj = (int *)funcIter->next((void **)&key);
570       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
571     }
572   }
573   fprintf(stsfp, "END\n");
574   fclose(stsfp);
575 }
576
577 void LogPool::writeRC(void)
578 {
579     //CkPrintf("write RC is being executed\n");
580 #ifdef PROJ_ANALYSIS  
581     CkAssert(CkMyPe() == 0);
582     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
583           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
584     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
585           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BIGSIM_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BIGSIM_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BIGSIM_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BIGSIM_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BIGSIM_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BIGSIM_CHARM
703 #if MPI_TRACE_MACHINE_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BIGSIM_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 void LogPool::modLastEntryTimestamp(double ts)
756 {
757   pool[numEntries-1].time = ts;
758   //pool[numEntries-1].cputime = ts;
759 }
760
761 // /** Constructor for a multicast log entry */
762 // 
763 //  THIS WAS MOVED TO trace-projections.h with the other constructors
764 // 
765 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
766 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
767 // {
768 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
769 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
770 //     recvTime = rt; 
771 //     numpes = numPe;
772 //     userSuppliedNote = NULL;
773 //     if (pelist != NULL) {
774 //      pes = new int[numPe];
775 //      for (int i=0; i<numPe; i++) {
776 //        pes[i] = pelist[i];
777 //      }
778 //     } else {
779 //      pes= NULL;
780 //     }
781 // }
782
783 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
784 //{
785 //#if CMK_HAS_COUNTER_PAPI
786 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
787 //#endif
788 //}
789
790
791
792 void LogEntry::pup(PUP::er &p)
793 {
794   int i;
795   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
796   char ret = '\n';
797
798   p|type;
799   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
800   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
801
802   switch (type) {
803     case USER_EVENT:
804     case USER_EVENT_PAIR:
805       p|mIdx; p|itime; p|event; p|pe;
806       break;
807     case BEGIN_IDLE:
808     case END_IDLE:
809     case BEGIN_PACK:
810     case END_PACK:
811     case BEGIN_UNPACK:
812     case END_UNPACK:
813       p|itime; p|pe; 
814       break;
815     case BEGIN_PROCESSING:
816       if (p.isPacking()) {
817         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
818         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
819       }
820       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
821       p|msglen; p|irecvtime; 
822       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
823       p|icputime;
824 #if CMK_HAS_COUNTER_PAPI
825       //p|numPapiEvents;
826       for (i=0; i<NUMPAPIEVENTS; i++) {
827         // not yet!!!
828         //      p|papiIDs[i]; 
829         p|papiValues[i];
830         
831       }
832 #else
833       //p|numPapiEvents;     // non papi version has value 0
834 #endif
835       if (p.isUnpacking()) {
836         recvTime = irecvtime/1.0e6;
837         cputime = icputime/1.0e6;
838       }
839       break;
840     case END_PROCESSING:
841       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
842       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
843 #if CMK_HAS_COUNTER_PAPI
844       //p|numPapiEvents;
845       for (i=0; i<NUMPAPIEVENTS; i++) {
846         // not yet!!!
847         //      p|papiIDs[i];
848         p|papiValues[i];
849       }
850 #else
851       //p|numPapiEvents;  // non papi version has value 0
852 #endif
853       if (p.isUnpacking()) cputime = icputime/1.0e6;
854       break;
855     case USER_SUPPLIED:
856           p|userSuppliedData;
857           p|itime;
858         break;
859     case USER_SUPPLIED_NOTE:
860           p|itime;
861           int length;
862           if (p.isPacking()) length = strlen(userSuppliedNote);
863           p | length;
864           char space;
865           space = ' ';
866           p | space;
867           if (p.isUnpacking()) {
868             userSuppliedNote = new char[length+1];
869             userSuppliedNote[length] = '\0';
870           }
871           PUParray(p,userSuppliedNote, length);
872           break;
873     case USER_SUPPLIED_BRACKETED_NOTE:
874       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
875           p|itime;
876           p|iEndTime;
877           p|event;
878           int length2;
879           if (p.isPacking()) length2 = strlen(userSuppliedNote);
880           p | length2;
881           char space2;
882           space2 = ' ';
883           p | space2;
884           if (p.isUnpacking()) {
885             userSuppliedNote = new char[length+1];
886             userSuppliedNote[length] = '\0';
887           }
888           PUParray(p,userSuppliedNote, length2);
889           break;
890     case MEMORY_USAGE_CURRENT:
891       p | memUsage;
892       p | itime;
893         break;
894     case CREATION:
895       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
896       p|mIdx; p|eIdx; p|itime;
897       p|event; p|pe; p|msglen; p|irecvtime;
898       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
899       break;
900     case CREATION_BCAST:
901       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
902       p|mIdx; p|eIdx; p|itime;
903       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
904       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
905       break;
906     case CREATION_MULTICAST:
907       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
908       p|mIdx; p|eIdx; p|itime;
909       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
910       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
911       for (i=0; i<numpes; i++) p|pes[i];
912       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
913       break;
914     case MESSAGE_RECV:
915       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
916       break;
917
918     case ENQUEUE:
919     case DEQUEUE:
920       p|mIdx; p|itime; p|event; p|pe;
921       break;
922
923     case BEGIN_INTERRUPT:
924     case END_INTERRUPT:
925       p|itime; p|event; p|pe;
926       break;
927
928       // **CW** absolute timestamps are used here to support a quick
929       // way of determining the total time of a run in projections
930       // visualization.
931     case BEGIN_COMPUTATION:
932     case END_COMPUTATION:
933     case BEGIN_TRACE:
934     case END_TRACE:
935       p|itime;
936       break;
937     case BEGIN_FUNC:
938         p | itime;
939         p | mIdx;
940         p | event;
941         if(!p.isUnpacking()){
942                 p(fName,flen-1);
943         }
944         break;
945     case END_FUNC:
946         p | itime;
947         p | mIdx;
948         break;
949     case END_PHASE:
950       p|eIdx; // FIXME: actually the phase ID
951       p|itime;
952       break;
953     default:
954       CmiError("***Internal Error*** Wierd Event %d.\n", type);
955       break;
956   }
957   if (p.isUnpacking()) time = itime/1.0e6;
958   p|ret;
959 }
960
961 TraceProjections::TraceProjections(char **argv): 
962   curevent(0), inEntry(0), computationStarted(0), 
963         converseExit(0), endTime(0.0), traceNestedEvents(0),
964         currentPhaseID(0), lastPhaseEvent(NULL)
965 {
966   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
967
968   if (CkpvAccess(traceOnPe) == 0) return;
969
970   CtvInitialize(int,curThreadEvent);
971   CkpvInitialize(CmiInt8, CtrLogBufSize);
972   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
973   CtvAccess(curThreadEvent)=0;
974   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
975                        "Log entries to buffer per I/O")) {
976     if (CkMyPe() == 0) {
977       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
978     }
979   }
980   checknested = 
981     CmiGetArgFlagDesc(argv,"+checknested",
982                       "check projections nest begin end execute events");
983   traceNestedEvents = 
984     CmiGetArgFlagDesc(argv,"+tracenested",
985               "trace projections nest begin/end execute events");
986   int binary = 
987     CmiGetArgFlagDesc(argv,"+binary-trace",
988                       "Write log files in binary format");
989
990   CmiInt8 nSubdirs = 0;
991   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
992
993
994 #if CMK_PROJECTIONS_USE_ZLIB
995   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
996 #else
997   // consume the flag so there's no confusing
998   CmiGetArgFlagDesc(argv,"+gz-trace",
999                     "Write log files pre-compressed with gzip");
1000   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1001 #endif
1002
1003   // **CW** default to non delta log encoding. The user may choose to do
1004   // create both logs (for debugging) or just the old log timestamping
1005   // (for compatibility).
1006   // Generating just the non delta log takes precedence over generating
1007   // both logs (if both arguments appear on the command line).
1008
1009   // switch to OLD log format until everything works // Gengbin
1010   nonDeltaLog = 1;
1011   deltaLog = 0;
1012   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1013                                   "Generate Delta encoded and simple timestamped log files");
1014
1015   _logPool = new LogPool(CkpvAccess(traceRoot));
1016   _logPool->setNumSubdirs(nSubdirs);
1017   _logPool->setBinary(binary);
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   _logPool->setCompressed(compressed);
1020 #endif
1021   if (CkMyPe() == 0) {
1022     _logPool->createSts();
1023     _logPool->createRC();
1024   }
1025   funcCount=1;
1026
1027 #if CMK_HAS_COUNTER_PAPI
1028   // We initialize and create the event sets for use with PAPI here.
1029   int papiRetValue;
1030   if(CkMyRank()==0){
1031     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032     if (papiRetValue != PAPI_VER_CURRENT) {
1033       CmiAbort("PAPI Library initialization failure!\n");
1034     }
1035 #if CMK_SMP
1036     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1037       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1038     }
1039 #endif
1040   }
1041
1042 #if CMK_SMP
1043   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1044   CmiNodeAllBarrier();
1045 #endif
1046   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1047   papiEventSet = PAPI_NULL; 
1048   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1049     CmiAbort("PAPI failed to create event set!\n");
1050   }
1051   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1052   if (papiRetValue != PAPI_OK) {
1053     if (papiRetValue == PAPI_ECNFLCT) {
1054       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1055     } else {
1056       CmiAbort("PAPI failed to add designated events!\n");
1057     }
1058   }
1059   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1060 #endif
1061 }
1062
1063 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1064 {
1065   OPTIMIZED_VERSION
1066   CkAssert(e==-1 || e>=0);
1067   CkAssert(evt != NULL);
1068   int event;
1069   int biggest = -1;
1070   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1071     int cur = (*CkpvAccess(usrEvents))[i]->e;
1072     if (cur == e) {
1073       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1074       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1075         return e;
1076       else
1077         CmiAbort("UserEvent double registered!");
1078     }
1079     if (cur > biggest) biggest = cur;
1080   }
1081   // if no user events have so far been registered. biggest will be -1
1082   // and hence newly assigned event numbers will begin from 0.
1083   if (e==-1) event = biggest+1;  // automatically assign new event number
1084   else event = e;
1085   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1086   return event;
1087 }
1088
1089 void TraceProjections::traceClearEps(void)
1090 {
1091   // In trace-summary, this zeros out the EP bins, to eliminate noise
1092   // from startup.  Here, this isn't useful, since we can do that in
1093   // post-processing
1094 }
1095
1096 void TraceProjections::traceWriteSts(void)
1097 {
1098   if(CkMyPe()==0)
1099     _logPool->writeSts(this);
1100 }
1101
1102 /** 
1103  * **IMPT NOTES**:
1104  *
1105  * This is called when Converse closes during ConverseCommonExit().
1106  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1107  *
1108  * Some programs bypass CkExit() (like NAMD, which eventually calls
1109  * ConverseExit()), modules like traces will have to pretend to shutdown
1110  * as if CkExit() was called but at the same time avoid making
1111  * subsequent CkExit() calls (which is usually required for allowing
1112  * other modules to shutdown).
1113  *
1114  * Note that we can only get here if CkExit() was not called, since the
1115  * trace module will un-register itself from TraceArray if it did.
1116  *
1117  */
1118 void TraceProjections::traceClose(void)
1119 {
1120 #ifdef PROJ_ANALYSIS
1121   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1122
1123   // sets the flag that tells the code not to make the CkExit call later
1124   converseExit = 1;
1125   if (CkMyPe() == 0) {
1126     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1127     bocProxy.traceProjectionsParallelShutdown(-1);
1128   }
1129   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1130     CkpvAccess(_trace)->endComputation();
1131     delete _logPool;              // will write
1132     // remove myself from traceArray so that no tracing will be called.
1133     CkpvAccess(_traces)->removeTrace(this);
1134   }
1135 #else
1136   // we've already deleted the logpool, so multiple calls to traceClose
1137   // are tolerated.
1138   if (_logPool == NULL) {
1139     return;
1140   }
1141   if(CkMyPe()==0){
1142     _logPool->writeSts(this);
1143   }
1144   CkpvAccess(_trace)->endComputation();
1145   delete _logPool;              // will write
1146   // remove myself from traceArray so that no tracing will be called.
1147   CkpvAccess(_traces)->removeTrace(this);
1148 #endif
1149 }
1150
1151 /**
1152  *  **IMPT NOTES**:
1153  *
1154  *  This is meant to be called internally by the tracing framework.
1155  *
1156  */
1157 void TraceProjections::closeTrace() {
1158   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1159   if (CkMyPe() == 0) {
1160     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1161     _logPool->writeSts(this);
1162     _logPool->writeRC();
1163     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1164   }
1165   delete _logPool;       // will write logs to file
1166 }
1167
1168 #if CMK_SMP_TRACE_COMMTHREAD
1169 void TraceProjections::traceBeginOnCommThread()
1170 {
1171   if (!computationStarted) return;
1172   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174
1175 void TraceProjections::traceEndOnCommThread()
1176 {
1177   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1178 }
1179 #endif
1180
1181 void TraceProjections::traceBegin(void)
1182 {
1183   if (!computationStarted) return;
1184   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::traceEnd(void)
1188 {
1189   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userEvent(int e)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1196 }
1197
1198 void TraceProjections::userBracketEvent(int e, double bt, double et)
1199 {
1200   if (!computationStarted) return;
1201   // two events record Begin/End of event e.
1202   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1204 }
1205
1206 void TraceProjections::userSuppliedData(int d)
1207 {
1208   if (!computationStarted) return;
1209   _logPool->addUserSupplied(d);
1210 }
1211
1212 void TraceProjections::userSuppliedNote(char *note)
1213 {
1214   if (!computationStarted) return;
1215   _logPool->addUserSuppliedNote(note);
1216 }
1217
1218
1219 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1220 {
1221   if (!computationStarted) return;
1222   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1223 }
1224
1225 void TraceProjections::memoryUsage(double m)
1226 {
1227   if (!computationStarted) return;
1228   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1229   
1230 }
1231
1232
1233 void TraceProjections::creation(envelope *e, int ep, int num)
1234 {
1235   double curTime = TraceTimer();
1236   if (e == 0) {
1237     CtvAccess(curThreadEvent) = curevent;
1238     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1239                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1240   } else {
1241     int type=e->getMsgtype();
1242     e->setEvent(curevent);
1243     if (num > 1) {
1244       _logPool->add(CREATION_BCAST, type, ep, curTime,
1245                     curevent++, CkMyPe(), e->getTotalsize(), 
1246                     NULL, 0, 0.0, num);
1247     } else {
1248       _logPool->add(CREATION, type, ep, curTime,
1249                     curevent++, CkMyPe(), e->getTotalsize(), 
1250                     NULL, 0, 0.0);
1251     }
1252   }
1253 }
1254
1255 void TraceProjections::creation(char *msg)
1256 {
1257 #if CMK_SMP_TRACE_COMMTHREAD
1258         //This function is only called from a comm thread
1259         //in SMP mode. 
1260         // msg must be a charm message
1261         envelope *e = (envelope *)msg;
1262         int ep = e->getEpIdx();
1263         if(_entryTable[ep]->traceEnabled)
1264                 creation(e, ep, 1);
1265 #endif
1266 }
1267
1268
1269 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1270    Communication Library-specific Multicasts via new event 
1271    CREATION_MULTICAST.
1272 */
1273
1274 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1275                                          int *pelist)
1276 {
1277   double curTime = TraceTimer();
1278   if (e==0) {
1279     CtvAccess(curThreadEvent)=curevent;
1280     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1281                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1282   } else {
1283     int type=e->getMsgtype();
1284     e->setEvent(curevent);
1285     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1286                                    e->getTotalsize(), 0, 0.0, num, pelist);
1287   }
1288 }
1289
1290 void TraceProjections::creationDone(int num)
1291 {
1292   // modified the creation done time of the last num log entries
1293   // FIXME: totally a hack
1294   double curTime = TraceTimer();
1295   int idx = _logPool->numEntries-1;
1296   while (idx >=0 && num >0 ) {
1297     LogEntry &log = _logPool->pool[idx];
1298     if ((log.type == CREATION) ||
1299         (log.type == CREATION_BCAST) ||
1300         (log.type == CREATION_MULTICAST)) {
1301       log.recvTime = curTime - log.time;
1302       num --;
1303     }
1304     idx--;
1305   }
1306 }
1307
1308 void TraceProjections::beginExecute(CmiObjId *tid)
1309 {
1310 #if CMK_HAS_COUNTER_PAPI
1311   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1312     CmiAbort("PAPI failed to read at begin execute!\n");
1313   }
1314 #endif
1315   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1316   execEvent = CtvAccess(curThreadEvent);
1317   execEp = (-1);
1318   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1319                 execEvent,CkMyPe(), 0, tid);
1320 #if CMK_HAS_COUNTER_PAPI
1321   _logPool->addPapi(papiValues);
1322 #endif
1323   inEntry = 1;
1324 }
1325
1326 void TraceProjections::beginExecute(envelope *e)
1327 {
1328   if(e==0) {
1329 #if CMK_HAS_COUNTER_PAPI
1330     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1331       CmiAbort("PAPI failed to read at begin execute!\n");
1332     }
1333 #endif
1334     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1335     execEvent = CtvAccess(curThreadEvent);
1336     execEp = (-1);
1337     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1338                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1339 #if CMK_HAS_COUNTER_PAPI
1340     _logPool->addPapi(papiValues);
1341 #endif
1342     inEntry = 1;
1343   } else {
1344     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1345                  e->getSrcPe(),e->getTotalsize());
1346   }
1347 }
1348
1349 void TraceProjections::beginExecute(char *msg){
1350 #if CMK_SMP_TRACE_COMMTHREAD
1351         //This function is called from comm thread in SMP mode
1352     envelope *e = (envelope *)msg;
1353     int num = _entryTable.size();
1354     int ep = e->getEpIdx();
1355     if(ep<0 || ep>=num) return;
1356     if(_entryTable[ep]->traceEnabled)
1357                 beginExecute(e);
1358 #endif
1359 }
1360
1361 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1362                                     int mlen, CmiObjId *idx)
1363 {
1364   if (traceNestedEvents) {
1365     if (! nestedEvents.isEmpty()) {
1366       endExecuteLocal();
1367     }
1368     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1369   }
1370   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1371 }
1372
1373 void TraceProjections::changeLastEntryTimestamp(double ts)
1374 {
1375   _logPool->modLastEntryTimestamp(ts);
1376 }
1377
1378 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1379                                     int mlen, CmiObjId *idx)
1380 {
1381 #if CMK_HAS_COUNTER_PAPI
1382   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1383     CmiAbort("PAPI failed to read at begin execute!\n");
1384   }
1385 #endif
1386   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1387   execEvent=event;
1388   execEp=ep;
1389   execPe=srcPe;
1390   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1391                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1392 #if CMK_HAS_COUNTER_PAPI
1393   _logPool->addPapi(papiValues);
1394 #endif
1395   inEntry = 1;
1396 }
1397
1398 void TraceProjections::endExecute(void)
1399 {
1400   if (traceNestedEvents) nestedEvents.deq();
1401   endExecuteLocal();
1402   if (traceNestedEvents) {
1403     if (! nestedEvents.isEmpty()) {
1404       NestedEvent &ne = nestedEvents.peek();
1405       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1406     }
1407   }
1408 }
1409
1410 void TraceProjections::endExecute(char *msg)
1411 {
1412 #if CMK_SMP_TRACE_COMMTHREAD
1413         //This function is called from comm thread in SMP mode
1414     envelope *e = (envelope *)msg;
1415     int num = _entryTable.size();
1416     int ep = e->getEpIdx();
1417     if(ep<0 || ep>=num) return;
1418     if(_entryTable[ep]->traceEnabled)
1419                 endExecute();
1420 #endif  
1421 }
1422
1423 void TraceProjections::endExecuteLocal(void)
1424 {
1425 #if CMK_HAS_COUNTER_PAPI
1426   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1427     CmiAbort("PAPI failed to read at end execute!\n");
1428   }
1429 #endif
1430   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1431   double cputime = TraceCpuTimer();
1432   if(execEp == (-1)) {
1433     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1434                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1435   } else {
1436     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1437                   execEvent, execPe, 0, NULL, 0.0, cputime);
1438   }
1439 #if CMK_HAS_COUNTER_PAPI
1440   _logPool->addPapi(papiValues);
1441 #endif
1442   inEntry = 0;
1443 }
1444
1445 void TraceProjections::messageRecv(char *env, int pe)
1446 {
1447 #if 0
1448   envelope *e = (envelope *)env;
1449   int msgType = e->getMsgtype();
1450   int ep = e->getEpIdx();
1451 #if 0
1452   if (msgType==NewChareMsg || msgType==NewVChareMsg
1453           || msgType==ForChareMsg || msgType==ForVidMsg
1454           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1455           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1456     ep = e->getEpIdx();
1457   else
1458     ep = _threadEP;
1459 #endif
1460   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1461                 curevent++, e->getSrcPe(), e->getTotalsize());
1462 #endif
1463 }
1464
1465 void TraceProjections::beginIdle(double curWallTime)
1466 {
1467   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1468 }
1469
1470 void TraceProjections::endIdle(double curWallTime)
1471 {
1472   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1473 }
1474
1475 void TraceProjections::beginPack(void)
1476 {
1477   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1478 }
1479
1480 void TraceProjections::endPack(void)
1481 {
1482   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1483 }
1484
1485 void TraceProjections::beginUnpack(void)
1486 {
1487   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1488 }
1489
1490 void TraceProjections::endUnpack(void)
1491 {
1492   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1493 }
1494
1495 void TraceProjections::enqueue(envelope *) {}
1496
1497 void TraceProjections::dequeue(envelope *) {}
1498
1499 void TraceProjections::beginComputation(void)
1500 {
1501   computationStarted = 1;
1502
1503   // Executes the callback function provided by the machine
1504   // layer. This is the proper method to register user events in a
1505   // machine layer because projections is a charm++ module.
1506   if (CkpvAccess(traceOnPe) != 0) {
1507     void (*ptr)() = registerMachineUserEvents();
1508     if (ptr != NULL) {
1509       ptr();
1510     }
1511   }
1512 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1513 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1514   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1515 #if CMK_HAS_COUNTER_PAPI
1516   // we start the counters here
1517   if (PAPI_start(papiEventSet) != PAPI_OK) {
1518     CmiAbort("PAPI failed to start designated counters!\n");
1519   }
1520 #endif
1521 }
1522
1523 void TraceProjections::endComputation(void)
1524 {
1525 #if CMK_HAS_COUNTER_PAPI
1526   // we stop the counters here. A silent failure is alright since we
1527   // are already at the end of the program.
1528   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1529     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1530   }
1531   // NOTE: We should not do a complete close of PAPI until after the
1532   // sts writer is done.
1533 #endif
1534   endTime = TraceTimer();
1535   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1536   /*
1537   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1538            endTime*1e06);
1539   */
1540 }
1541
1542 int TraceProjections::idxRegistered(int idx)
1543 {
1544     int idxVecLen = idxVec.size();
1545     for(int i=0; i<idxVecLen; i++)
1546     {
1547         if(idx == idxVec[i])
1548             return 1;
1549     }
1550     return 0;
1551 }
1552
1553 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1554     StrKey k((char*)name,strlen(name));
1555     int num = funcHashtable.get(k);
1556     
1557     if(num!=0) {
1558         return;
1559         //as for mpi programs, the same function may be registered for several times
1560         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1561     }
1562     
1563     int isIdxExisting=0;
1564     if(idxSpecifiedByUser)
1565         isIdxExisting=idxRegistered(idx);
1566     if(isIdxExisting){
1567         return;
1568         //same reason with num!=0
1569         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1570     }
1571
1572     if(idxSpecifiedByUser) {
1573         char *st = new char[strlen(name)+1];
1574         memcpy(st,name,strlen(name)+1);
1575         StrKey *newKey = new StrKey(st,strlen(st));
1576         int &ref = funcHashtable.put(*newKey);
1577         ref=idx;
1578         funcCount++;
1579         idxVec.push_back(idx);  
1580     } else {
1581         char *st = new char[strlen(name)+1];
1582         memcpy(st,name,strlen(name)+1);
1583         StrKey *newKey = new StrKey(st,strlen(st));
1584         int &ref = funcHashtable.put(*newKey);
1585         ref=funcCount;
1586         num = funcCount;
1587         funcCount++;
1588         idx = num;
1589         idxVec.push_back(idx);
1590     }
1591 }
1592
1593 void TraceProjections::beginFunc(char *name,char *file,int line){
1594         StrKey k(name,strlen(name));    
1595         unsigned short  num = (unsigned short)funcHashtable.get(k);
1596         beginFunc(num,file,line);
1597 }
1598
1599 void TraceProjections::beginFunc(int idx,char *file,int line){
1600         if(idx <= 0){
1601                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1602         }       
1603         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1604 }
1605
1606 void TraceProjections::endFunc(char *name){
1607         StrKey k(name,strlen(name));    
1608         int num = funcHashtable.get(k);
1609         endFunc(num);   
1610 }
1611
1612 void TraceProjections::endFunc(int num){
1613         if(num <= 0){
1614                 printf("endFunc without start :O\n");
1615         }
1616         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1617 }
1618
1619 // specialized PUP:ers for handling trace projections logs
1620 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1621 {
1622   for (int i=0;i<n;i++) 
1623     switch(t) {
1624     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1625     case Tuchar:
1626     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1627     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1628     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1629     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1630     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1631     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1632     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1633     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1634     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1635 #ifdef CMK_PUP_LONG_LONG
1636     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1637     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1638 #endif
1639     default: CmiAbort("Unrecognized pup type code!");
1640     };
1641 }
1642
1643 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1644 {
1645   for (int i=0;i<n;i++) 
1646     switch(t) {
1647     case Tchar: { 
1648       char c = fgetc(f);
1649       if (c==EOF)
1650         parseError("Could not match character");
1651       else
1652         ((char *)p)[i] = c;
1653       break;
1654     }
1655     case Tuchar:
1656     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1657     case Tshort:((short *)p)[i]=(short)readInt(); break;
1658     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1659     case Tint:  ((int *)p)[i]=readInt(); break;
1660     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1661     case Tlong: ((long *)p)[i]=readInt(); break;
1662     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1663     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1664     case Tdouble:((double *)p)[i]=readDouble(); break;
1665 #ifdef CMK_PUP_LONG_LONG
1666     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1667     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1668 #endif
1669     default: CmiAbort("Unrecognized pup type code!");
1670     };
1671 }
1672
1673 #if CMK_PROJECTIONS_USE_ZLIB
1674 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1675 {
1676   for (int i=0;i<n;i++) 
1677     switch(t) {
1678     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1679     case Tuchar:
1680     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1681     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1682     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1683     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1684     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1685     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1686     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1687     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1688     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1689 #ifdef CMK_PUP_LONG_LONG
1690     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1691     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1692 #endif
1693     default: CmiAbort("Unrecognized pup type code!");
1694     };
1695 }
1696 #endif
1697
1698 void TraceProjections::endPhase() {
1699   double currentPhaseTime = TraceTimer();
1700   if (lastPhaseEvent != NULL) {
1701   } else {
1702     if (_logPool->pool != NULL) {
1703       // assumed to be BEGIN_COMPUTATION
1704     } else {
1705       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1706       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1707     }
1708   }
1709
1710   /* Insert endPhase event here. */
1711   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1712   /*        We currently "borrow" from the standard add() method. */
1713   /*        It should really be its own add() method.             */
1714   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1715   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1716   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1717   currentPhaseID++;
1718 }
1719
1720 #ifdef PROJ_ANALYSIS
1721 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1722
1723
1724 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1725
1726 void registerOutlierReduction() {
1727   outlierReductionType =
1728     CkReduction::addReducer(outlierReduction);
1729   minMaxReductionType =
1730     CkReduction::addReducer(minMaxReduction);
1731 }
1732
1733 /**
1734  * **IMPT NOTES**:
1735  *
1736  * This is the C++ code that is registered to be activated at module
1737  * shutdown. This is called exactly once on processor 0. Module shutdown
1738  * is initiated as a result of a CkExit() call by the application code
1739  * 
1740  * The exit function must ultimately call CkExit() again to
1741  * so that other module exit functions may proceed after this module is
1742  * done.
1743  *
1744  */
1745 // FIXME: WHY extern "C"???
1746 extern "C" void TraceProjectionsExitHandler()
1747 {
1748 #if CMK_TRACE_ENABLED
1749   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1750   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1751   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1752 #else
1753   CkExit();
1754 #endif
1755 }
1756
1757 // This is called once on each processor but the idiom of use appears
1758 // to be to only have processor 0 register the function.
1759 //
1760 // See initnode in trace-projections.ci
1761 void initTraceProjectionsBOC()
1762 {
1763   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1764 #ifdef __BIGSIM__
1765   if (BgNodeRank() == 0) {
1766 #else
1767     if (CkMyRank() == 0) {
1768 #endif
1769       registerExitFn(TraceProjectionsExitHandler);
1770     }
1771 #if 0
1772   } // this is so indentation does not get messed up
1773 #endif
1774 }
1775
1776 // mainchare for trace-projections BOC-operations. 
1777 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1778 // rest of its life.
1779 //
1780 // Responsible for:
1781 //   1. Handling commandline arguments
1782 //   2. Creating any objects required for proper BOC operations.
1783 //
1784 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1785   /** Options for Outlier Analysis */
1786   // defaults. Things will change with support for interactive analysis.
1787   bool findOutliers = false;
1788   bool outlierAutomatic = true;
1789   int numKSeeds = 10; 
1790
1791   int peNumKeep = CkNumPes();  // used as a default
1792   double entryThreshold = 0.0;
1793   bool outlierUsePhases = false;
1794   if (outlierAutomatic) {
1795     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1796                      "Number of cluster seeds to apply at outlier analysis.");
1797     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1798                      &peNumKeep, "Number of Processors to retain data");
1799     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1800                         "Minimum significance of entry points to be considered for clustering (%).");
1801     findOutliers =
1802       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1803     outlierUsePhases = 
1804       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1805                         "Apply automatic outlier analysis to any available phases.");
1806     if (outlierUsePhases) {
1807       // if the user wants to use an outlier feature, it is assumed outlier
1808       //    analysis is desired.
1809       findOutliers = true;
1810     }
1811   }
1812   bool findStartTime = (CmiTimerAbsolute()==1);
1813   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1814   if (findOutliers) {
1815     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1816                                         numKSeeds,
1817                                         peNumKeep,
1818                                         entryThreshold,
1819                                         outlierUsePhases);
1820   }
1821 }
1822
1823 // Called on every processor.
1824 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1825   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1826   endPe = pe;                // the pe that starts CkExit()
1827   if (CkMyPe() == 0) {
1828     analysisStartTime = CmiWallTimer();
1829   }
1830   CkpvAccess(_trace)->endComputation();
1831   // no more tracing for projections on this processor after this point. 
1832   // Note that clear must be called after remove, or bad things will happen.
1833   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1834   CkpvAccess(_traces)->clearTrace();
1835
1836   // From this point, we start multiple chains of reductions and broadcasts to
1837   // perform final online analysis activities.
1838
1839   // Start all parallel operations at once. 
1840   //   These MUST NOT modify base performance data in LogPool. If they must,
1841   //   then the parallel operations must be phased (and this code to be
1842   //   restructured as necessary)
1843   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1844   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1845   if (findOutliers) {
1846     parModulesRemaining++;
1847     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1848   }
1849   parModulesRemaining++;
1850   if (findStartTime) 
1851   bocProxy[CkMyPe()].startTimeAnalysis();
1852   else
1853   bocProxy[CkMyPe()].startEndTimeAnalysis();
1854 }
1855
1856 // Called on each processor
1857 void KMeansBOC::startKMeansAnalysis() {
1858   // Initialize all necessary structures
1859   LogPool *pool = CkpvAccess(_trace)->_logPool;
1860
1861  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1862   int flushInt = 0;
1863   if (pool->hasFlushed) {
1864     flushInt = 1;
1865   }
1866   
1867   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1868                 0, thisProxy);
1869   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1870 }
1871
1872 // Called on processor 0
1873 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1874   int someFlush = *((int *)msg->getData());
1875
1876   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1877   
1878   if (someFlush == 0) {
1879     // Data intact proceed with KMeans analysis
1880     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1881     kMeansProxy.flushCheckDone();
1882   } else {
1883     // Some processor had flushed it data at some point, abandon KMeans
1884     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1885     // terminate KMeans
1886     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1887     bocProxy[0].kMeansDone();
1888   }
1889 }
1890
1891 // Called on each processor
1892 void KMeansBOC::flushCheckDone() {
1893   // **FIXME** - more flexible metric collection scheme may be necessary
1894   //   in the future for production use.
1895   LogPool *pool = CkpvAccess(_trace)->_logPool;
1896
1897   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1898
1899   numEntryMethods = _entryTable.size();
1900   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1901
1902   // maintained across phases
1903   markedBegin = false;
1904   markedIdle = false;
1905   beginBlockTime = 0.0;
1906   beginIdleBlockTime = 0.0;
1907   lastBeginEPIdx = -1; // none
1908
1909   lastPhaseIdx = 0;
1910   currentExecTimes = NULL;
1911   currentPhase = 0;
1912   selected = false;
1913
1914   pool->initializePhases();
1915
1916   // incoming K Seeds and the per-phase filter
1917   incKSeeds = new double[numK*numMetrics];
1918   keepMetric = new bool[numMetrics];
1919
1920   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1921   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1922   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1923   thisProxy[CkMyPe()].getNextPhaseMetrics();
1924 }
1925
1926 // Called on each processor.
1927 void KMeansBOC::getNextPhaseMetrics() {
1928   // Assumes the presence of the complete logs on this processor.
1929   // Assumes first event is always BEGIN_COMPUTATION
1930   // Assumes each processor sees the same number of phases.
1931   //
1932   // In this code, we collect performance data for this processor.
1933   // All times are in seconds.
1934
1935   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1936
1937   if (usePhases) {
1938     DEBUGF("[%d] Using Phases\n", CkMyPe());
1939   } else {
1940     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1941   }
1942   
1943   if (currentExecTimes != NULL) {
1944     delete [] currentExecTimes;
1945   }
1946   currentExecTimes = new double[numMetrics];
1947   for (int i=0; i<numMetrics; i++) {
1948     currentExecTimes[i] = 0.0;
1949   }
1950
1951   int numEventMethods = _entryTable.size();
1952   LogPool *pool = CkpvAccess(_trace)->_logPool;
1953   
1954   CkAssert(pool->numEntries > lastPhaseIdx);
1955   double totalPhaseTime = 0.0;
1956   double totalActiveTime = 0.0; // entry method + idle
1957
1958   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1959     if (pool->pool[i].type == BEGIN_PROCESSING) {
1960       // check pairing
1961       if (!markedBegin) {
1962         markedBegin = true;
1963       }
1964       beginBlockTime = pool->pool[i].time;
1965       lastBeginEPIdx = pool->pool[i].eIdx;
1966     } else if (pool->pool[i].type == END_PROCESSING) {
1967       // check pairing
1968       // if End without a begin, just ignore
1969       //   this event. If a phase-boundary is crossed, the Begin
1970       //   event would be maintained in beginBlockTime, so it is 
1971       //   not a problem.
1972       if (markedBegin) {
1973         markedBegin = false;
1974         if (pool->pool[i].event < 0)
1975         {
1976           // ignore dummy events. **FIXME** as they have no eIdx?
1977           continue;
1978         }
1979         currentExecTimes[pool->pool[i].eIdx] += 
1980           pool->pool[i].time - beginBlockTime;
1981         totalActiveTime += pool->pool[i].time - beginBlockTime;
1982         lastBeginEPIdx = -1;
1983       }
1984     } else if (pool->pool[i].type == BEGIN_IDLE) {
1985       // check pairing
1986       if (!markedIdle) {
1987         markedIdle = true;
1988       }
1989       beginIdleBlockTime = pool->pool[i].time;
1990     } else if (pool->pool[i].type == END_IDLE) {
1991       // check pairing
1992       if (markedIdle) {
1993         markedIdle = false;
1994         currentExecTimes[numEventMethods] += 
1995           pool->pool[i].time - beginIdleBlockTime;
1996         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1997       }
1998     } else if (pool->pool[i].type == END_PHASE) {
1999       // ignored when not using phases
2000       if (usePhases) {
2001         // when we've not visited this node before
2002         if (i != lastPhaseIdx) { 
2003           totalPhaseTime = 
2004             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2005           // it is important that proper accounting of time take place here.
2006           // Note that END_PHASE events inevitably occur in the context of
2007           //   some entry method by the way the tracing API is designed.
2008           if (markedBegin) {
2009             CkAssert(lastBeginEPIdx >= 0);
2010             currentExecTimes[lastBeginEPIdx] += 
2011               pool->pool[i].time - beginBlockTime;
2012             totalActiveTime += pool->pool[i].time - beginBlockTime;
2013             // this is so the remainder contributes to the next phase
2014             beginBlockTime = pool->pool[i].time;
2015           }
2016           // The following is unlikely, but stranger things have happened.
2017           if (markedIdle) {
2018             currentExecTimes[numEventMethods] +=
2019               pool->pool[i].time - beginIdleBlockTime;
2020             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2021             // this is so the remainder contributes to the next phase
2022             beginIdleBlockTime = pool->pool[i].time;
2023           }
2024           if (totalActiveTime <= totalPhaseTime) {
2025             currentExecTimes[numEventMethods+1] = 
2026               totalPhaseTime - totalActiveTime;
2027           } else {
2028             currentExecTimes[numEventMethods+1] = 0.0;
2029             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2030                      CkMyPe(), currentPhase);
2031           }
2032           collectKMeansData();
2033           // end the loop (and method) and defer the work till the next call
2034           lastPhaseIdx = i;
2035           break; 
2036         }
2037       }
2038     } else if (pool->pool[i].type == END_COMPUTATION) {
2039       if (markedBegin) {
2040         CkAssert(lastBeginEPIdx >= 0);
2041         currentExecTimes[lastBeginEPIdx] += 
2042           pool->pool[i].time - beginBlockTime;
2043         totalActiveTime += pool->pool[i].time - beginBlockTime;
2044       }
2045       if (markedIdle) {
2046         currentExecTimes[numEventMethods] +=
2047           pool->pool[i].time - beginIdleBlockTime;
2048         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2049       }
2050       totalPhaseTime = 
2051         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2052       if (totalActiveTime <= totalPhaseTime) {
2053         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2054       } else {
2055         currentExecTimes[numEventMethods+1] = 0.0;
2056         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2057                  CkMyPe());
2058       }
2059       collectKMeansData();
2060     }
2061   }
2062 }
2063
2064 /**
2065  *  Through a reduction, collectKMeansData aggregates each processors' data
2066  *  in order for global properties to be determined:
2067  *  
2068  *  1. min & max to determine normalization factors.
2069  *  2. sum to determine global EP averages for possible metric reduction
2070  *       through thresholding.
2071  *  3. sum of squares to compute stddev which may be useful in the future.
2072  *
2073  *  collectKMeansData will also keep the processor's data for the current
2074  *    phase so that it may be normalized and worked on subsequently.
2075  *
2076  **/
2077 void KMeansBOC::collectKMeansData() {
2078   int minOffset = numMetrics;
2079   int maxOffset = 2*numMetrics;
2080   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2081
2082   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2083
2084   double *reductionMsg = new double[numMetrics*4];
2085
2086   for (int i=0; i<numMetrics; i++) {
2087     reductionMsg[i] = currentExecTimes[i];
2088     // duplicate the event times for max and min sections of the reduction
2089     reductionMsg[minOffset + i] = currentExecTimes[i];
2090     reductionMsg[maxOffset + i] = currentExecTimes[i];
2091     // compute squares
2092     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2093   }
2094
2095   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2096                 0, thisProxy);
2097   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2098              outlierReductionType, cb);  
2099 }
2100
2101 // The purpose is mainly to initialize the k seeds and generate
2102 //   normalization parameters for each of the metrics. The k seeds
2103 //   and normalization parameters are broadcast to all processors.
2104 //
2105 // Called on processor 0
2106 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2107   CkAssert(CkMyPe() == 0);
2108   
2109   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2110
2111   int sumOffset = 0;
2112   int minOffset = numMetrics;
2113   int maxOffset = 2*numMetrics;
2114   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2115
2116   // calculate statistics & boundaries for the k seeds for clustering
2117   KMeansStatsMessage *outmsg = 
2118     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2119   outmsg->numMetrics = numMetrics;
2120   outmsg->numKPos = numK*numMetrics;
2121   outmsg->numStats = numMetrics*4;
2122
2123   // Sum | Min | Max | Sum of Squares
2124   double *totalExecTimes = (double *)msg->getData();
2125   double totalTime = 0.0;
2126
2127   for (int i=0; i<numMetrics; i++) {
2128     DEBUGN("%lf\n", totalExecTimes[i]);
2129     totalTime += totalExecTimes[i];
2130
2131     // calculate event mean over all processors
2132     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2133
2134     // get the ranges and offsets of each metric. With this, we get
2135     //   normalization factors that can be sent back to each processor to
2136     //   be used as necessary. We reuse max for range. Min remains the offset.
2137     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2138     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2139       totalExecTimes[minOffset + i];
2140     
2141     // calculate stddev (using biased variance)
2142     outmsg->stats[sosOffset + i] = 
2143       sqrt((totalExecTimes[sosOffset + i] - 
2144             2*(outmsg->stats[i])*totalExecTimes[i] +
2145             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2146            CkNumPes());
2147   }
2148
2149   for (int i=0; i<numMetrics; i++) {
2150     // 1) if the proportion of the max value of the entry method relative to
2151     //   the average time taken over all entry methods across all processors
2152     //   is greater than the stipulated percentage threshold ...; AND
2153     // 2) if the range of values are non-zero.
2154     //
2155     // The current assumption is totalTime > 0 (what program has zero total
2156     //   time from all work?)
2157     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2158                      entryThreshold) &&
2159       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2160     if (keepMetric[i]) {
2161       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2162              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2163     } else {
2164       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2165     }
2166     outmsg->filter[i] = keepMetric[i];
2167   }
2168
2169   delete msg;
2170   
2171   // initialize k seeds for this phase
2172   kSeeds = new double[numK*numMetrics];
2173
2174   numKReported = 0;
2175   kNumMembers = new int[numK];
2176
2177   // Randomly select k processors' metric vectors for the k seeds
2178   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2179   srand(11337); // for debugging purposes
2180   for (int k=0; k<numK; k++) {
2181     DEBUGF("Seed %d | ", k);
2182     for (int m=0; m<numMetrics; m++) {
2183       double factor = totalExecTimes[maxOffset + m] - 
2184         totalExecTimes[minOffset + m];
2185       // "uniform" distribution, scaled according to the normalization
2186       //   factors
2187       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2188       // Random distribution.
2189       kSeeds[numMetrics*k + m] =
2190         ((rand()*1.0)/RAND_MAX)*factor;
2191       if (keepMetric[m]) {
2192         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2193       }
2194       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2195     }
2196     DEBUGF("\n");
2197     kNumMembers[k] = 0;
2198   }
2199
2200   // broadcast statistical values to all processors for cluster discovery
2201   thisProxy.findInitialClusters(outmsg);
2202 }
2203
2204
2205
2206 // Called on each processor.
2207 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2208
2209  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2210
2211   phaseIter = 0;
2212
2213   // Get info from stats message
2214   CkAssert(numMetrics == msg->numMetrics);
2215   for (int i=0; i<numMetrics; i++) {
2216     keepMetric[i] = msg->filter[i];
2217   }
2218
2219   // Normalize data on local processor.
2220   // **CWL** See my thesis for detailed discussion of normalization of
2221   //    performance data.
2222   // **NOTE** This might change if we want to send data based on the filter
2223   //   instead of all the data.
2224   CkAssert(numMetrics*4 == msg->numStats);
2225   for (int i=0; i<numMetrics; i++) {
2226     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2227     // **CWL** We do not normalize the range. Entry methods that exhibit
2228     //   large absolute timing variations should be allowed to contribute
2229     //   more to the Euclidean distance measure!
2230     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2231   }
2232
2233   // **NOTE** This might change if we want to send data based on the filter
2234   //   instead of all the data.
2235   CkAssert(numK*numMetrics == msg->numKPos);
2236   for (int i=0; i<msg->numKPos; i++) {
2237     incKSeeds[i] = msg->kSeedsPos[i];
2238   }
2239
2240   // Decide which KSeed this processor belongs to.
2241   minDistance = calculateDistance(0);
2242   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2243            currentPhase, phaseIter, minDistance);
2244   minK = 0;
2245   for (int i=1; i<numK; i++) {
2246     double distance = calculateDistance(i);
2247     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2248              currentPhase, phaseIter, i, distance);
2249     if (distance < minDistance) {
2250       minDistance = distance;
2251       minK = i;
2252     }
2253   }
2254
2255   // Set up a reduction with the modification vector to the root (0).
2256   //
2257   // The modification vector sends a negative value for each metric
2258   //   for the K this processor no longer belongs to and a positive
2259   //   value to the K the processor now belongs. In addition, a -1.0
2260   //   is sent to the K it is leaving and a +1.0 to the K it is 
2261   //   joining.
2262   //
2263   // The processor must still contribute a "zero returns" even if
2264   //   nothing changes. This will be the basis for determine
2265   //   convergence at the root.
2266   //
2267   // The addtional +1 is meant for the count-change that must be
2268   //   maintained for the special cases at the root when some K
2269   //   may be deprived of all processor points or go from 0 to a
2270   //   positive number of processors (see later comments).
2271   double *modVector = new double[numK*(numMetrics+1)];
2272   for (int i=0; i<numK; i++) {
2273     for (int j=0; j<numMetrics+1; j++) {
2274       modVector[i*(numMetrics+1) + j] = 0.0;
2275     }
2276   }
2277   for (int i=0; i<numMetrics; i++) {
2278     // for this initialization, only positive values need be sent.
2279     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2280   }
2281   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2282
2283   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2284                 0, thisProxy);
2285   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2286              CkReduction::sum_double, cb);  
2287 }
2288
2289 double KMeansBOC::calculateDistance(int k) {
2290   double ret = 0.0;
2291   for (int i=0; i<numMetrics; i++) {
2292     if (keepMetric[i]) {
2293       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2294              CkMyPe(), currentPhase, phaseIter, i,
2295                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2296       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2297     }
2298   }
2299   return sqrt(ret);
2300 }
2301
2302 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2303   CkAssert(CkMyPe() == 0);
2304
2305   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2306
2307   double *modVector = (double *)msg->getData();
2308   // sanity check
2309   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2310
2311   // A quick convergence test.
2312   bool hasChanges = false;
2313   for (int i=0; i<numK; i++) {
2314     hasChanges = hasChanges || 
2315       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2316   }
2317   if (!hasChanges) {
2318     delete msg;
2319     findRepresentatives();
2320   } else {
2321     int overallChange = 0;
2322     for (int i=0; i<numK; i++) {
2323       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2324       if (change != 0) {
2325         overallChange += change;
2326         // modify the k seeds based on the modification vectors coming in
2327         //
2328         // If a seed initially has no members, its contents do not matter and
2329         //   is simply set to the average of the incoming vector.
2330         // If the change causes a seed to lose all its members, do nothing.
2331         //   Its last-known location is kept to allow it to re-capture
2332         //   membership at the next iteration rather than apply the last
2333         //   changes (which snaps the point unnaturally to 0,0).
2334         // Otherwise, apply the appropriate vector changes.
2335         CkAssert((kNumMembers[i] + change >= 0) &&
2336                  (kNumMembers[i] + change <= CkNumPes()));
2337         if (kNumMembers[i] == 0) {
2338           CkAssert(change > 0);
2339           for (int j=0; j<numMetrics; j++) {
2340             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2341           }
2342         } else if (kNumMembers[i] + change == 0) {
2343           // do nothing.
2344         } else {
2345           for (int j=0; j<numMetrics; j++) {
2346             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2347             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2348             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2349           }
2350         }
2351         kNumMembers[i] += change;
2352       }
2353       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2354              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2355     }
2356     delete msg;
2357
2358     // broadcast the new seed locations.
2359     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2360     outmsg->numKPos = numK*numMetrics;
2361     for (int i=0; i<numK*numMetrics; i++) {
2362       outmsg->kSeedsPos[i] = kSeeds[i];
2363     }
2364
2365     thisProxy.updateSeedMembership(outmsg);
2366   }
2367 }
2368
2369 // Called on all processors
2370 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2371
2372   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2373
2374   phaseIter++;
2375
2376   // **NOTE** This might change if we want to send data based on the filter
2377   //   instead of all the data.
2378   CkAssert(numK*numMetrics == msg->numKPos);
2379   for (int i=0; i<msg->numKPos; i++) {
2380     incKSeeds[i] = msg->kSeedsPos[i];
2381   }
2382
2383   // Decide which KSeed this processor belongs to.
2384   lastMinK = minK;
2385   minDistance = calculateDistance(0);
2386   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2387          currentPhase, phaseIter, minDistance);
2388
2389   minK = 0;
2390   for (int i=1; i<numK; i++) {
2391     double distance = calculateDistance(i);
2392     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2393            currentPhase, phaseIter, i, distance);
2394     if (distance < minDistance) {
2395       minDistance = distance;
2396       minK = i;
2397     }
2398   }
2399
2400   double *modVector = new double[numK*(numMetrics+1)];
2401   for (int i=0; i<numK; i++) {
2402     for (int j=0; j<numMetrics+1; j++) {
2403       modVector[i*(numMetrics+1) + j] = 0.0;
2404     }
2405   }
2406
2407   if (minK != lastMinK) {
2408     for (int i=0; i<numMetrics; i++) {
2409       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2410       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2411     }
2412     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2413     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2414   }
2415
2416   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2417                 0, thisProxy);
2418   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2419              CkReduction::sum_double, cb);  
2420 }
2421
2422 void KMeansBOC::findRepresentatives() {
2423
2424   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2425
2426   int numNonEmptyClusters = 0;
2427   for (int i=0; i<numK; i++) {
2428     if (kNumMembers[i] > 0) {
2429       numNonEmptyClusters++;
2430     }
2431   }
2432
2433   int numRepresentatives = peNumKeep;
2434   // **FIXME**
2435   // This is fairly arbitrary. Next time, choose the centers of the top
2436   //   largest clusters.
2437   if (numRepresentatives < numNonEmptyClusters) {
2438     numRepresentatives = numNonEmptyClusters;
2439   }
2440
2441   int slotsRemaining = numRepresentatives;
2442
2443   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2444          numNonEmptyClusters);
2445
2446   // determine how many exemplars to select per cluster. Currently
2447   //   hardcoded to 1. Future challenge is to decide on other numbers
2448   //   or proportionality.
2449   //
2450   int exemplarsPerCluster = 1;
2451   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2452
2453   int numCandidateOutliers = CkNumPes() - 
2454     exemplarsPerCluster*numNonEmptyClusters;
2455
2456   double *remainders = new double[numK];
2457   int *assigned = new int[numK];
2458   exemplarChoicesLeft = new int[numK];
2459   outlierChoicesLeft = new int[numK];
2460
2461   for (int i=0; i<numK; i++) {
2462     assigned[i] = 0;
2463     remainders[i] = 
2464       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2465       slotsRemaining / numCandidateOutliers;
2466     if (remainders[i] >= 0.0) {
2467       assigned[i] = (int)floor(remainders[i]);
2468       remainders[i] -= assigned[i];
2469     } else {
2470       remainders[i] = 0.0;
2471     }
2472   }
2473
2474   for (int i=0; i<numK; i++) {
2475     slotsRemaining -= assigned[i];
2476   }
2477   CkAssert(slotsRemaining >= 0);
2478
2479   // find clusters to assign the loose slots to, in order of
2480   // remainder proportion
2481   while (slotsRemaining > 0) {
2482     double max = 0.0;
2483     int winner = 0;
2484     for (int i=0; i<numK; i++) {
2485       if (remainders[i] > max) {
2486         max = remainders[i];
2487         winner = i;
2488       }
2489     }
2490     assigned[winner]++;
2491     remainders[winner] = 0.0;
2492     slotsRemaining--;
2493   }
2494
2495   // set up how many reduction cycles of min/max we need to conduct to
2496   // select the representatives.
2497   numSelectionIter = exemplarsPerCluster;
2498   for (int i=0; i<numK; i++) {
2499     if (assigned[i] > numSelectionIter) {
2500       numSelectionIter = assigned[i];
2501     }
2502   }
2503   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2504
2505   for (int i=0; i<numK; i++) {
2506     if (kNumMembers[i] > 0) {
2507       exemplarChoicesLeft[i] = exemplarsPerCluster;
2508       outlierChoicesLeft[i] = assigned[i];
2509     } else {
2510       exemplarChoicesLeft[i] = 0;
2511       outlierChoicesLeft[i] = 0;
2512     }
2513     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2514            outlierChoicesLeft[i]);
2515   }
2516
2517   delete [] assigned;
2518   delete [] remainders;
2519
2520   // send out first broadcast
2521   KSelectionMessage *outmsg = NULL;
2522   if (numSelectionIter > 0) {
2523     outmsg = new (numK, numK, numK) KSelectionMessage;
2524     outmsg->numKMinIDs = numK;
2525     outmsg->numKMaxIDs = numK;
2526     for (int i=0; i<numK; i++) {
2527       outmsg->minIDs[i] = -1;
2528       outmsg->maxIDs[i] = -1;
2529     }
2530     thisProxy.collectDistances(outmsg);
2531   } else {
2532     CkPrintf("Warning: No selection iteration from the start!\n");
2533     // invoke phase completion on all processors
2534     thisProxy.phaseDone();
2535   }
2536 }
2537
2538 /*
2539  *  lastMin = array of minimum champions of the last tournament
2540  *  lastMax = array of maximum champions of the last tournament
2541  *  lastMaxVal = array of last encountered maximum values, allows previous
2542  *                 minimum winners to eliminate themselves from the next
2543  *                 minimum race.
2544  *
2545  *  Called on all processors.
2546  */
2547 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2548
2549   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2550
2551   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2552          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2553   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2554       (CkMyPe() == msg->maxIDs[lastMinK])) {
2555     CkAssert(!selected);
2556     selected = true;
2557   }
2558
2559   // build outgoing reduction structure
2560   //   format = minVal | ID | maxVal | ID
2561   double *minMaxAndIDs = NULL;
2562
2563   minMaxAndIDs = new double[numK*4];
2564   // initialize to the appropriate out-of-band values (for error checks)
2565   for (int i=0; i<numK; i++) {
2566     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2567     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2568     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2569     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2570   }
2571   // If I have not won before, I put myself back into the competition
2572   if (!selected) {
2573     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2574     minMaxAndIDs[lastMinK*4] = minDistance;
2575     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2576     minMaxAndIDs[lastMinK*4+2] = minDistance;
2577     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2578   }
2579   delete msg;
2580
2581   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2582                 0, thisProxy);
2583   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2584              minMaxReductionType, cb);  
2585 }
2586
2587 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2588   // incoming format:
2589   //   minVal | minID | maxVal | maxID
2590
2591   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2592
2593   if (numSelectionIter > 0) {
2594     double *incInfo = (double *)msg->getData();
2595     
2596     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2597     outmsg->numKMinIDs = numK;
2598     outmsg->numKMaxIDs = numK;
2599     
2600     for (int i=0; i<numK; i++) {
2601       DEBUGF("%d | %lf %d %lf %d \n", i, 
2602              incInfo[i*4], (int)incInfo[i*4+1], 
2603              incInfo[i*4+2], (int)incInfo[i*4+3]);
2604     }
2605
2606     for (int i=0; i<numK; i++) {
2607       if (exemplarChoicesLeft[i] > 0) {
2608         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2609         exemplarChoicesLeft[i]--;
2610       } else {
2611         outmsg->minIDs[i] = -1;
2612       }
2613       if (outlierChoicesLeft[i] > 0) {
2614         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2615         outlierChoicesLeft[i]--;
2616       } else {
2617         outmsg->maxIDs[i] = -1;
2618       }
2619     }
2620     thisProxy.collectDistances(outmsg);
2621     numSelectionIter--;
2622   } else {
2623     // invoke phase completion on all processors
2624     thisProxy.phaseDone();
2625   }
2626 }
2627
2628 /**
2629  *  Completion of the K-Means clustering and data selection of one phase
2630  *    of the computation.
2631  *
2632  *  Called on every processor.
2633  */
2634 void KMeansBOC::phaseDone() {
2635
2636   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2637
2638   LogPool *pool = CkpvAccess(_trace)->_logPool;
2639   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2640
2641   // now decide on what to do with the decision.
2642   if (!selected) {
2643     if (usePhases) {
2644       pool->keepPhase[currentPhase] = false;
2645     } else {
2646       // if not using phases, we're working on the whole log
2647       pool->setAllPhases(false);
2648     }
2649   }
2650
2651   // **FIXME** (?) - All processors have to agree on this, or the reduction
2652   //   will not be correct! The question is "is this enforcible?"
2653   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2654     // We're done
2655     int dummy = 0;
2656     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2657                   0, bocProxy);
2658     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2659   } else {
2660     // reset all phase-based k-means data and decisions
2661
2662     // **FIXME**!!!!!    
2663     
2664     // invoke the next K-Means computation phase.
2665     currentPhase++;
2666     thisProxy[CkMyPe()].getNextPhaseMetrics();
2667   }
2668 }
2669
2670 void TraceProjectionsBOC::startTimeAnalysis()
2671 {
2672   double startTime = 0.0;
2673   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2674      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2675   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2676   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2677 }
2678
2679 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2680 {
2681   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2682
2683   if (CkpvAccess(_trace) != NULL) {
2684     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2685     CkpvAccess(_trace)->_logPool->setNewStartTime();
2686     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2687   }
2688   delete msg;
2689   thisProxy[CkMyPe()].startEndTimeAnalysis();
2690 }
2691
2692 void TraceProjectionsBOC::startEndTimeAnalysis()
2693 {
2694  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2695
2696   endTime = CkpvAccess(_trace)->endTime;
2697   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2698
2699   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2700                 0, thisProxy);
2701   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2702 }
2703
2704 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2705 {
2706  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2707
2708   CkAssert(CkMyPe() == 0);
2709   parModulesRemaining--;
2710   if (CkpvAccess(_trace) != NULL) {
2711     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2712     // CkPrintf("End time determined to be %lf us\n",
2713     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2714   }
2715   delete msg;
2716   if (parModulesRemaining == 0) {
2717     thisProxy[CkMyPe()].finalize();
2718   }
2719 }
2720
2721 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2722
2723  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2724
2725   CkAssert(CkMyPe() == 0);
2726   parModulesRemaining--;
2727   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2728            CmiWallTimer()-analysisStartTime);
2729   delete msg;
2730   if (parModulesRemaining == 0) {
2731     thisProxy[CkMyPe()].finalize();
2732   }
2733 }
2734
2735 /**
2736  *
2737  *  This version is called (on processor 0) only if flushCheck fails.
2738  *
2739  */
2740 void TraceProjectionsBOC::kMeansDone() {
2741   CkAssert(CkMyPe() == 0);
2742   parModulesRemaining--;
2743   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2744            CmiWallTimer()-analysisStartTime);
2745   if (parModulesRemaining == 0) {
2746     thisProxy[CkMyPe()].finalize();
2747   }
2748 }
2749
2750 void TraceProjectionsBOC::finalize()
2751 {
2752   CkAssert(CkMyPe() == 0);
2753   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2754   //       CmiWallTimer()-analysisStartTime);
2755   thisProxy.closingTraces();
2756 }
2757
2758 // Called on every processor
2759 void TraceProjectionsBOC::closingTraces() {
2760   CkpvAccess(_trace)->closeTrace();
2761
2762     // subtle:  reduction needs to go to the PE which started CkExit()
2763   int pe = 0;
2764   if (endPe != -1) pe = endPe;
2765   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2766                 pe, thisProxy); 
2767   contribute(0, NULL, CkReduction::sum_int, cb);  
2768 }
2769
2770 // The sole purpose of this reduction is to decide whether or not
2771 //   Projections as a module needs to call CkExit() to get other
2772 //   modules to shutdown.
2773 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2774   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2775   delete msg;
2776   // decide if CkExit() needs to be called
2777   if (!CkpvAccess(_trace)->converseExit) {
2778     CkExit();
2779   }
2780 }
2781 /*
2782  *  Registration and definition of the Outlier Reduction callback.
2783  *  Format: Sum | Min | Max | Sum of Squares
2784  */
2785 CkReductionMsg *outlierReduction(int nMsgs,
2786                                  CkReductionMsg **msgs) {
2787   int numBytes = 0;
2788   int numMetrics = 0;
2789   double *ret = NULL;
2790
2791   if (nMsgs == 1) {
2792     // nothing to do, just pass it on
2793     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2794   }
2795
2796   if (nMsgs > 1) {
2797     numBytes = msgs[0]->getSize();
2798     // sanity checks
2799     if (numBytes%sizeof(double) != 0) {
2800       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2801     }
2802     if ((numBytes/sizeof(double))%4 != 0) {
2803       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2804     }
2805     numMetrics = (numBytes/sizeof(double))/4;
2806     ret = new double[numMetrics*4];
2807
2808     // copy the first message data into the return structure first
2809     for (int i=0; i<numMetrics*4; i++) {
2810       ret[i] = ((double *)msgs[0]->getData())[i];
2811     }
2812
2813     // Sum | Min | Max | Sum of Squares
2814     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2815       for (int i=0; i<numMetrics; i++) {
2816         // Sum
2817         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2818         // Min
2819         ret[numMetrics + i] =
2820           (ret[numMetrics + i] < 
2821            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2822           ? ret[numMetrics + i] : 
2823           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2824         // Max
2825         ret[2*numMetrics + i] = 
2826           (ret[2*numMetrics + i] >
2827            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2828           ? ret[2*numMetrics + i] :
2829           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2830         // Sum of Squares (squaring already done at leaf)
2831         ret[3*numMetrics + i] +=
2832           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2833       }
2834     }
2835   }
2836   
2837   /* apparently, we do not delete the incoming messages */
2838   return CkReductionMsg::buildNew(numBytes,ret);
2839 }
2840
2841 /*
2842  * The only reason we have a user-defined reduction is to support
2843  *   identification of the "winning" processors as well as to handle
2844  *   both the min and the max of each "tournament". A simple min/max
2845  *   discovery cannot handle ties.
2846  */
2847 CkReductionMsg *minMaxReduction(int nMsgs,
2848                                 CkReductionMsg **msgs) {
2849   CkAssert(nMsgs > 0);
2850
2851   int numBytes = msgs[0]->getSize();
2852   CkAssert(numBytes%sizeof(double) == 0);
2853   int numK = (numBytes/sizeof(double))/4;
2854
2855   double *ret = new double[numK*4];
2856   // fill with out-of-band values
2857   for (int i=0; i<numK; i++) {
2858     ret[i*4] = -1.0;
2859     ret[i*4+1] = -1.0;
2860     ret[i*4+2] = -1.0;
2861     ret[i*4+3] = -1.0;
2862   }
2863
2864   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2865   for (int i=0; i<nMsgs; i++) {
2866     double *temp = (double *)msgs[i]->getData();
2867     for (int j=0; j<numK; j++) {
2868       // no previous valid min
2869       if (ret[j*4+1] < 0) {
2870         // fill it in only if the incoming min is valid
2871         if (temp[j*4+1] >= 0) {
2872           ret[j*4] = temp[j*4];      // fill min value
2873           ret[j*4+1] = temp[j*4+1];  // fill ID
2874         }
2875       } else {
2876         // find Min, only if incoming min is valid
2877         if (temp[j*4+1] >= 0) {
2878           if (temp[j*4] < ret[j*4]) {
2879             ret[j*4] = temp[j*4];      // replace min value
2880             ret[j*4+1] = temp[j*4+1];  // replace ID
2881           }
2882         }
2883       }
2884       // no previous valid max
2885       if (ret[j*4+3] < 0) {
2886         // fill only if the incoming max is valid
2887         if (temp[j*4+3] >= 0) {
2888           ret[j*4+2] = temp[j*4+2];  // fill max value
2889           ret[j*4+3] = temp[j*4+3];  // fill ID
2890         }
2891       } else {
2892         // find Max, only if incoming max is valid
2893         if (temp[j*4+3] >= 0) {
2894           if (temp[j*4+2] > ret[j*4+2]) {
2895             ret[j*4+2] = temp[j*4+2];  // replace max value
2896             ret[j*4+3] = temp[j*4+3];  // replace ID
2897           }
2898         }
2899       }
2900     }
2901   }
2902   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2903   delete [] ret;
2904   return redmsg;
2905 }
2906
2907 #include "TraceProjections.def.h"
2908 #endif //PROJ_ANALYSIS
2909
2910 /*@}*/