Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-perf / trace-projections.C
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
5
6 #include <string.h>
7
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11
12 #if DEBUG_PROJ
13 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
14 #else
15 #define DEBUGF(...)
16 #endif
17 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
18
19 #define DefaultLogBufSize      1000000
20
21 // **CW** Simple delta encoding implementation
22 // delta encoding is on by default. It may be turned off later in
23 // the runtime.
24 int deltaLog;
25 int nonDeltaLog;
26
27 int checknested=0;              // check illegal nested begin/end execute 
28
29 #ifdef PROJ_ANALYSIS
30 // BOC operations readonlys
31 CkGroupID traceProjectionsGID;
32 CkGroupID kMeansGID;
33
34 // New reduction type for Outlier Analysis purposes. This is allowed to be
35 // a global variable according to the Charm++ manual.
36 CkReductionMsg *outlierReduction(int nMsgs,
37                                  CkReductionMsg **msgs);
38 CkReductionMsg *minMaxReduction(int nMsgs,
39                                 CkReductionMsg **msgs);
40 CkReduction::reducerType outlierReductionType;
41 CkReduction::reducerType minMaxReductionType;
42 #endif // PROJ_ANALYSIS
43
44 CkpvStaticDeclare(TraceProjections*, _trace);
45 CtvStaticDeclare(int,curThreadEvent);
46
47 CkpvDeclare(CmiInt8, CtrLogBufSize);
48
49 typedef CkVec<char *>  usrEventVec;
50 CkpvStaticDeclare(usrEventVec, usrEventlist);
51 class UsrEvent {
52 public:
53   int e;
54   char *str;
55   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 };
57 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
58
59 // When tracing is disabled, these are defined as empty static inlines
60 // in the header, to minimize overhead
61 #if CMK_TRACE_ENABLED
62 /// Disable the outputting of the trace logs
63 void disableTraceLogOutput()
64
65   CkpvAccess(_trace)->setWriteData(false);
66 }
67
68 /// Enable the outputting of the trace logs
69 void enableTraceLogOutput()
70 {
71   CkpvAccess(_trace)->setWriteData(true);
72 }
73
74 /// Force the log files to be flushed
75 void flushTraceLog()
76 {
77   CkpvAccess(_trace)->traceFlushLog();
78 }
79 #endif
80
81 #if ! CMK_TRACE_ENABLED
82 static int warned=0;
83 #define OPTIMIZED_VERSION       \
84         if (!warned) { warned=1;        \
85         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
86 #else
87 #define OPTIMIZED_VERSION /*empty*/
88 #endif // CMK_TRACE_ENABLED
89
90 /*
91 On T3E, we need to have file number control by open/close files only when needed.
92 */
93 #if CMK_TRACE_LOGFILE_NUM_CONTROL
94   #define OPEN_LOG openLog("a");
95   #define CLOSE_LOG closeLog();
96 #else
97   #define OPEN_LOG
98   #define CLOSE_LOG
99 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
100
101 #if CMK_HAS_COUNTER_PAPI
102 int papiEvents[NUMPAPIEVENTS] = { PAPI_L2_DCM, PAPI_FP_OPS };
103 #endif // CMK_HAS_COUNTER_PAPI
104
105 /**
106   For each TraceFoo module, _createTraceFoo() must be defined.
107   This function is called in _createTraces() generated in moduleInit.C
108 */
109 void _createTraceprojections(char **argv)
110 {
111   DEBUGF("%d createTraceProjections\n", CkMyPe());
112   CkpvInitialize(CkVec<char *>, usrEventlist);
113   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
114   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
115 #if CMK_BIGSIM_CHARM
116   // CthRegister does not call the constructor
117 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
118 #endif //CMK_BIGSIM_CHARM
119   CkpvInitialize(TraceProjections*, _trace);
120   CkpvAccess(_trace) = new  TraceProjections(argv);
121   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
122   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
123 }
124  
125 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
126
127 struct TraceThreadListener {
128   struct CthThreadListener base;
129   int event;
130   int msgType;
131   int ep;
132   int srcPe;
133   int ml;
134   CmiObjId idx;
135 };
136
137 extern "C"
138 void traceThreadListener_suspend(struct CthThreadListener *l)
139 {
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   traceSuspend();
144 }
145
146 extern "C"
147 void traceThreadListener_resume(struct CthThreadListener *l) 
148 {
149   TraceThreadListener *a=(TraceThreadListener *)l;
150   /* here, we activate the appropriate trace codes for the appropriate
151      registered modules */
152   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
153                                 CthGetThreadID(a->base.thread));
154   a->event=-1;
155   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
156   a->ml=0;
157 }
158
159 extern "C"
160 void traceThreadListener_free(struct CthThreadListener *l) 
161 {
162   TraceThreadListener *a=(TraceThreadListener *)l;
163   delete a;
164 }
165
166 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
167 {
168 #if CMK_TRACE_ENABLED
169   /* strip essential information from the envelope */
170   TraceThreadListener *a= new TraceThreadListener;
171   
172   a->base.suspend=traceThreadListener_suspend;
173   a->base.resume=traceThreadListener_resume;
174   a->base.free=traceThreadListener_free;
175   a->event=e->getEvent();
176   a->msgType=e->getMsgtype();
177   a->ep=e->getEpIdx();
178   a->srcPe=e->getSrcPe();
179   a->ml=e->getTotalsize();
180
181   CthAddListener(tid, (CthThreadListener *)a);
182 #endif
183 }
184
185 void LogPool::openLog(const char *mode)
186 {
187 #if CMK_PROJECTIONS_USE_ZLIB
188   if(compressed) {
189     if (nonDeltaLog) {
190       do {
191         zfp = gzopen(fname, mode);
192       } while (!zfp && (errno == EINTR || errno == EMFILE));
193       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
194     }
195     if (deltaLog) {
196       do {
197         deltazfp = gzopen(dfname, mode);
198       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
199       if (!deltazfp) 
200         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
201     }
202   } else {
203     if (nonDeltaLog) {
204       do {
205         fp = fopen(fname, mode);
206       } while (!fp && (errno == EINTR || errno == EMFILE));
207       if (!fp) {
208         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
209         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
210       }
211     }
212     if (deltaLog) {
213       do {
214         deltafp = fopen(dfname, mode);
215       } while (!deltafp && (errno == EINTR || errno == EMFILE));
216       if (!deltafp) 
217         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
218     }
219   }
220 #else
221   if (nonDeltaLog) {
222     do {
223       fp = fopen(fname, mode);
224     } while (!fp && (errno == EINTR || errno == EMFILE));
225     if (!fp) {
226       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
227       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
228     }
229   }
230   if (deltaLog) {
231     do {
232       deltafp = fopen(dfname, mode);
233     } while (!deltafp && (errno == EINTR || errno == EMFILE));
234     if(!deltafp) 
235       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
236   }
237 #endif
238 }
239
240 void LogPool::closeLog(void)
241 {
242 #if CMK_PROJECTIONS_USE_ZLIB
243   if(compressed) {
244     if (nonDeltaLog) gzclose(zfp);
245     if (deltaLog) gzclose(deltazfp);
246     return;
247   }
248 #endif
249   if (nonDeltaLog) { 
250 #if !defined(_WIN32) || defined(__CYGWIN__)
251     fsync(fileno(fp)); 
252 #endif
253     fclose(fp); 
254   }
255   if (deltaLog)  { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(deltafp)); 
258 #endif
259     fclose(deltafp);  
260   }
261 }
262
263 LogPool::LogPool(char *pgm) {
264   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
265   // defaults to writing data (no outlier changes)
266   writeData = true;
267   numEntries = 0;
268   // **CW** for simple delta encoding
269   prevTime = 0.0;
270   timeErr = 0.0;
271   globalStartTime = 0.0;
272   globalEndTime = 0.0;
273   headerWritten = 0;
274   numPhases = 0;
275   hasFlushed = false;
276
277   keepPhase = NULL;
278
279   fileCreated = false;
280   poolSize = CkpvAccess(CtrLogBufSize);
281   pgmname = new char[strlen(pgm)+1];
282   strcpy(pgmname, pgm);
283 }
284
285 void LogPool::createFile(const char *fix)
286 {
287   if (fileCreated) {
288     return;
289   }
290
291   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
292   char *pathPlusFilePrefix = new char[1024];
293
294   if(nSubdirs > 0){
295     int sd = CkMyPe() % nSubdirs;
296     char *subdir = new char[1024];
297     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
298     CmiMkdir(subdir);
299     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
300     delete[] subdir;
301   } else {
302     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
303   }
304
305   char pestr[10];
306   sprintf(pestr, "%d", CkMyPe());
307 #if CMK_PROJECTIONS_USE_ZLIB
308   int len;
309   if(compressed)
310     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
311   else
312     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
313 #else
314   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
315 #endif
316
317   if (nonDeltaLog) {
318     fname = new char[len];
319   }
320   if (deltaLog) {
321     dfname = new char[len];
322   }
323 #if CMK_PROJECTIONS_USE_ZLIB
324   if(compressed) {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
327       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
331       } else {
332         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333       }
334     }
335   } else {
336     if (deltaLog && nonDeltaLog) {
337       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
338       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
339     } else {
340       if (nonDeltaLog) {
341         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
342       } else {
343         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344       }
345     }
346   }
347 #else
348   if (deltaLog && nonDeltaLog) {
349     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
350     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
351   } else {
352     if (nonDeltaLog) {
353       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
354     } else {
355       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356     }
357   }
358 #endif
359   fileCreated = true;
360   delete[] pathPlusFilePrefix;
361   openLog("w+");
362   CLOSE_LOG 
363 }
364
365 void LogPool::createSts(const char *fix)
366 {
367   CkAssert(CkMyPe() == 0);
368   // create the sts file
369   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
370   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
371   do
372     {
373       stsfp = fopen(fname, "w");
374     } while (!stsfp && (errno == EINTR || errno == EMFILE));
375   if(stsfp==0){
376     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
377     CmiAbort("Error!!\n");
378   }
379   delete[] fname;
380 }  
381
382 void LogPool::createRC()
383 {
384   // create the projections rc file.
385   fname = 
386     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
387   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
388   do {
389     rcfp = fopen(fname, "w");
390   } while (!rcfp && (errno == EINTR || errno == EMFILE));
391   if (rcfp==0) {
392     CmiAbort("Cannot open projections configuration file for writing.\n");
393   }
394   delete[] fname;
395 }
396
397 LogPool::~LogPool() 
398 {
399   if (writeData) {
400     writeLog();
401 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
402     closeLog();
403 #endif
404   }
405
406 #if CMK_BIGSIM_CHARM
407   extern int correctTimeLog;
408   if (correctTimeLog) {
409     createFile("-bg");
410     if (CkMyPe() == 0) {
411       createSts("-bg");
412     }
413     writeHeader();
414     if (CkMyPe() == 0) writeSts(NULL);
415     postProcessLog();
416   }
417 #endif
418
419   delete[] pool;
420   delete [] fname;
421 }
422
423 void LogPool::writeHeader()
424 {
425   if (headerWritten) return;
426   headerWritten = 1;
427   if(!binary) {
428 #if CMK_PROJECTIONS_USE_ZLIB
429     if(compressed) {
430       if (nonDeltaLog) {
431         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
432       }
433       if (deltaLog) {
434         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
435       }
436     } 
437     else /* else clause is below... */
438 #endif
439     /*... may hang over from else above */ {
440       if (nonDeltaLog) {
441         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
442       }
443       if (deltaLog) {
444         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
445       }
446     }
447   }
448   else { // binary
449       if (nonDeltaLog) {
450         fwrite(&numEntries,sizeof(numEntries),1,fp);
451       }
452       if (deltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
454       }
455   }
456 }
457
458 void LogPool::writeLog(void)
459 {
460   createFile();
461   OPEN_LOG
462   writeHeader();
463   if (nonDeltaLog) write(0);
464   if (deltaLog) write(1);
465   CLOSE_LOG
466 }
467
468 void LogPool::write(int writedelta) 
469 {
470   // **CW** Simple delta encoding implementation
471   // prevTime has to be maintained as an object variable because
472   // LogPool::write may be called several times depending on the
473   // +logsize value.
474   PUP::er *p = NULL;
475   if (binary) {
476     p = new PUP::toDisk(writedelta?deltafp:fp);
477   }
478 #if CMK_PROJECTIONS_USE_ZLIB
479   else if (compressed) {
480     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
481   }
482 #endif
483   else {
484     p = new toProjectionsFile(writedelta?deltafp:fp);
485   }
486   CmiAssert(p);
487   int curPhase = 0;
488   // **FIXME** - Should probably consider a more sophisticated bounds-based
489   //   approach for selective writing instead of making multiple if-checks
490   //   for every single event.
491   for(UInt i=0; i<numEntries; i++) {
492     if (!writedelta) {
493       if (keepPhase == NULL) {
494         // default case, when no phase selection is required.
495         pool[i].pup(*p);
496       } else {
497         // **FIXME** Might be a good idea to create a "filler" event block for
498         //   all the events taken out by phase filtering.
499         if (pool[i].type == END_PHASE) {
500           // always write phase markers
501           pool[i].pup(*p);
502           curPhase++;
503         } else if (pool[i].type == BEGIN_COMPUTATION ||
504                    pool[i].type == END_COMPUTATION) {
505           // always write BEGIN and END COMPUTATION markers
506           pool[i].pup(*p);
507         } else if (keepPhase[curPhase]) {
508           pool[i].pup(*p);
509         }
510       }
511     }
512     else {      // delta
513       // **FIXME** Implement phase-selective writing for delta logs
514       //   eventually
515       double time = pool[i].time;
516       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
517       {
518         double timeDiff = (time-prevTime)*1.0e6;
519         UInt intTimeDiff = (UInt)timeDiff;
520         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
521         if (timeErr > 1.0) {
522           timeErr -= 1.0;
523           intTimeDiff++;
524         }
525         pool[i].time = intTimeDiff/1.0e6;
526       }
527       pool[i].pup(*p);
528       pool[i].time = time;      // restore time value
529       prevTime = time;
530     }
531   }
532   delete p;
533   delete [] keepPhase;
534 }
535
536 void LogPool::writeSts(void)
537 {
538   // for whining compilers
539   int i;
540   // generate an automatic unique ID for each log
541   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
542   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
543   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
544 #if CMK_HAS_COUNTER_PAPI
545   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
546   // for now, use i, next time use papiEvents[i].
547   // **CW** papi event names is a hack.
548   char eventName[PAPI_MAX_STR_LEN];
549   for (i=0;i<NUMPAPIEVENTS;i++) {
550     PAPI_event_code_to_name(papiEvents[i], eventName);
551     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
552   }
553 #endif
554   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
555   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
556     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
557   }     
558 }
559
560 void LogPool::writeSts(TraceProjections *traceProj){
561   writeSts();
562   if (traceProj != NULL) {
563     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
564     funcIter->seekStart();
565     int numFuncs = traceProj->getFuncNumber();
566     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
567     while(funcIter->hasNext()) {
568       StrKey *key;
569       int *obj = (int *)funcIter->next((void **)&key);
570       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
571     }
572   }
573   fprintf(stsfp, "END\n");
574   fclose(stsfp);
575 }
576
577 void LogPool::writeRC(void)
578 {
579     //CkPrintf("write RC is being executed\n");
580 #ifdef PROJ_ANALYSIS  
581     CkAssert(CkMyPe() == 0);
582     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
583           (CMK_TYPEDEF_UINT8)(1.0e6*globalStartTime));
584     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
585           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BIGSIM_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BIGSIM_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BIGSIM_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BIGSIM_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BIGSIM_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BIGSIM_CHARM
703 #if MPI_TRACE_MACHINE_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BIGSIM_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 void LogPool::modLastEntryTimestamp(double ts)
756 {
757   pool[numEntries-1].time = ts;
758   //pool[numEntries-1].cputime = ts;
759 }
760
761 // /** Constructor for a multicast log entry */
762 // 
763 //  THIS WAS MOVED TO trace-projections.h with the other constructors
764 // 
765 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
766 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
767 // {
768 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
769 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
770 //     recvTime = rt; 
771 //     numpes = numPe;
772 //     userSuppliedNote = NULL;
773 //     if (pelist != NULL) {
774 //      pes = new int[numPe];
775 //      for (int i=0; i<numPe; i++) {
776 //        pes[i] = pelist[i];
777 //      }
778 //     } else {
779 //      pes= NULL;
780 //     }
781 // }
782
783 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
784 //{
785 //#if CMK_HAS_COUNTER_PAPI
786 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
787 //#endif
788 //}
789
790
791
792 void LogEntry::pup(PUP::er &p)
793 {
794   int i;
795   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
796   char ret = '\n';
797
798   p|type;
799   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
800   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
801
802   switch (type) {
803     case USER_EVENT:
804     case USER_EVENT_PAIR:
805       p|mIdx; p|itime; p|event; p|pe;
806       break;
807     case BEGIN_IDLE:
808     case END_IDLE:
809     case BEGIN_PACK:
810     case END_PACK:
811     case BEGIN_UNPACK:
812     case END_UNPACK:
813       p|itime; p|pe; 
814       break;
815     case BEGIN_PROCESSING:
816       if (p.isPacking()) {
817         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
818         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
819       }
820       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
821       p|msglen; p|irecvtime; 
822       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
823       p|icputime;
824 #if CMK_HAS_COUNTER_PAPI
825       //p|numPapiEvents;
826       for (i=0; i<NUMPAPIEVENTS; i++) {
827         // not yet!!!
828         //      p|papiIDs[i]; 
829         p|papiValues[i];
830         
831       }
832 #else
833       //p|numPapiEvents;     // non papi version has value 0
834 #endif
835       if (p.isUnpacking()) {
836         recvTime = irecvtime/1.0e6;
837         cputime = icputime/1.0e6;
838       }
839       break;
840     case END_PROCESSING:
841       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
842       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
843 #if CMK_HAS_COUNTER_PAPI
844       //p|numPapiEvents;
845       for (i=0; i<NUMPAPIEVENTS; i++) {
846         // not yet!!!
847         //      p|papiIDs[i];
848         p|papiValues[i];
849       }
850 #else
851       //p|numPapiEvents;  // non papi version has value 0
852 #endif
853       if (p.isUnpacking()) cputime = icputime/1.0e6;
854       break;
855     case USER_SUPPLIED:
856           p|userSuppliedData;
857           p|itime;
858         break;
859     case USER_SUPPLIED_NOTE:
860           p|itime;
861           int length;
862           if (p.isPacking()) length = strlen(userSuppliedNote);
863           p | length;
864           char space;
865           space = ' ';
866           p | space;
867           if (p.isUnpacking()) {
868             userSuppliedNote = new char[length+1];
869             userSuppliedNote[length] = '\0';
870           }
871           PUParray(p,userSuppliedNote, length);
872           break;
873     case USER_SUPPLIED_BRACKETED_NOTE:
874       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
875           p|itime;
876           p|iEndTime;
877           p|event;
878           int length2;
879           if (p.isPacking()) length2 = strlen(userSuppliedNote);
880           p | length2;
881           char space2;
882           space2 = ' ';
883           p | space2;
884           if (p.isUnpacking()) {
885             userSuppliedNote = new char[length+1];
886             userSuppliedNote[length] = '\0';
887           }
888           PUParray(p,userSuppliedNote, length2);
889           break;
890     case MEMORY_USAGE_CURRENT:
891       p | memUsage;
892       p | itime;
893         break;
894     case CREATION:
895       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
896       p|mIdx; p|eIdx; p|itime;
897       p|event; p|pe; p|msglen; p|irecvtime;
898       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
899       break;
900     case CREATION_BCAST:
901       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
902       p|mIdx; p|eIdx; p|itime;
903       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
904       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
905       break;
906     case CREATION_MULTICAST:
907       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
908       p|mIdx; p|eIdx; p|itime;
909       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
910       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
911       for (i=0; i<numpes; i++) p|pes[i];
912       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
913       break;
914     case MESSAGE_RECV:
915       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
916       break;
917
918     case ENQUEUE:
919     case DEQUEUE:
920       p|mIdx; p|itime; p|event; p|pe;
921       break;
922
923     case BEGIN_INTERRUPT:
924     case END_INTERRUPT:
925       p|itime; p|event; p|pe;
926       break;
927
928       // **CW** absolute timestamps are used here to support a quick
929       // way of determining the total time of a run in projections
930       // visualization.
931     case BEGIN_COMPUTATION:
932     case END_COMPUTATION:
933     case BEGIN_TRACE:
934     case END_TRACE:
935       p|itime;
936       break;
937     case BEGIN_FUNC:
938         p | itime;
939         p | mIdx;
940         p | event;
941         if(!p.isUnpacking()){
942                 p(fName,flen-1);
943         }
944         break;
945     case END_FUNC:
946         p | itime;
947         p | mIdx;
948         break;
949     case END_PHASE:
950       p|eIdx; // FIXME: actually the phase ID
951       p|itime;
952       break;
953     default:
954       CmiError("***Internal Error*** Wierd Event %d.\n", type);
955       break;
956   }
957   if (p.isUnpacking()) time = itime/1.0e6;
958   p|ret;
959 }
960
961 TraceProjections::TraceProjections(char **argv): 
962   curevent(0), inEntry(0), computationStarted(0), 
963         converseExit(0), endTime(0.0), traceNestedEvents(0),
964         currentPhaseID(0), lastPhaseEvent(NULL)
965 {
966   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
967
968   if (CkpvAccess(traceOnPe) == 0) return;
969
970   CtvInitialize(int,curThreadEvent);
971   CkpvInitialize(CmiInt8, CtrLogBufSize);
972   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
973   CtvAccess(curThreadEvent)=0;
974   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
975                        "Log entries to buffer per I/O")) {
976     if (CkMyPe() == 0) {
977       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
978     }
979   }
980   checknested = 
981     CmiGetArgFlagDesc(argv,"+checknested",
982                       "check projections nest begin end execute events");
983   traceNestedEvents = 
984     CmiGetArgFlagDesc(argv,"+tracenested",
985               "trace projections nest begin/end execute events");
986   int binary = 
987     CmiGetArgFlagDesc(argv,"+binary-trace",
988                       "Write log files in binary format");
989
990   CmiInt8 nSubdirs = 0;
991   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
992
993
994 #if CMK_PROJECTIONS_USE_ZLIB
995   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
996 #else
997   // consume the flag so there's no confusing
998   CmiGetArgFlagDesc(argv,"+gz-trace",
999                     "Write log files pre-compressed with gzip");
1000   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1001 #endif
1002
1003   // **CW** default to non delta log encoding. The user may choose to do
1004   // create both logs (for debugging) or just the old log timestamping
1005   // (for compatibility).
1006   // Generating just the non delta log takes precedence over generating
1007   // both logs (if both arguments appear on the command line).
1008
1009   // switch to OLD log format until everything works // Gengbin
1010   nonDeltaLog = 1;
1011   deltaLog = 0;
1012   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1013                                   "Generate Delta encoded and simple timestamped log files");
1014
1015   _logPool = new LogPool(CkpvAccess(traceRoot));
1016   _logPool->setNumSubdirs(nSubdirs);
1017   _logPool->setBinary(binary);
1018 #if CMK_PROJECTIONS_USE_ZLIB
1019   _logPool->setCompressed(compressed);
1020 #endif
1021   if (CkMyPe() == 0) {
1022     _logPool->createSts();
1023     _logPool->createRC();
1024   }
1025   funcCount=1;
1026
1027 #if CMK_HAS_COUNTER_PAPI
1028   // We initialize and create the event sets for use with PAPI here.
1029   int papiRetValue;
1030   if(CkMyRank()==0){
1031     papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032     if (papiRetValue != PAPI_VER_CURRENT) {
1033       CmiAbort("PAPI Library initialization failure!\n");
1034     }
1035 #if CMK_SMP
1036     if(PAPI_thread_init(pthread_self) != PAPI_OK){
1037       CmiAbort("PAPI could not be initialized in SMP mode!\n");
1038     }
1039 #endif
1040   }
1041
1042 #if CMK_SMP
1043   //PAPI_thread_init has to finish before calling PAPI_create_eventset
1044   CmiNodeAllBarrier();
1045 #endif
1046   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1047   papiEventSet = PAPI_NULL; 
1048   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1049     CmiAbort("PAPI failed to create event set!\n");
1050   }
1051   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, NUMPAPIEVENTS);
1052   if (papiRetValue != PAPI_OK) {
1053     if (papiRetValue == PAPI_ECNFLCT) {
1054       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1055     } else {
1056       CmiAbort("PAPI failed to add designated events!\n");
1057     }
1058   }
1059   memset(papiValues, 0, NUMPAPIEVENTS*sizeof(LONG_LONG_PAPI));
1060 #endif
1061 }
1062
1063 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1064 {
1065   OPTIMIZED_VERSION
1066   CkAssert(e==-1 || e>=0);
1067   CkAssert(evt != NULL);
1068   int event;
1069   int biggest = -1;
1070   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1071     int cur = (*CkpvAccess(usrEvents))[i]->e;
1072     if (cur == e) {
1073       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1074       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1075         return e;
1076       else
1077         CmiAbort("UserEvent double registered!");
1078     }
1079     if (cur > biggest) biggest = cur;
1080   }
1081   // if no user events have so far been registered. biggest will be -1
1082   // and hence newly assigned event numbers will begin from 0.
1083   if (e==-1) event = biggest+1;  // automatically assign new event number
1084   else event = e;
1085   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1086   return event;
1087 }
1088
1089 void TraceProjections::traceClearEps(void)
1090 {
1091   // In trace-summary, this zeros out the EP bins, to eliminate noise
1092   // from startup.  Here, this isn't useful, since we can do that in
1093   // post-processing
1094 }
1095
1096 void TraceProjections::traceWriteSts(void)
1097 {
1098   if(CkMyPe()==0)
1099     _logPool->writeSts(this);
1100 }
1101
1102 /** 
1103  * **IMPT NOTES**:
1104  *
1105  * This is called when Converse closes during ConverseCommonExit().
1106  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1107  *
1108  * Some programs bypass CkExit() (like NAMD, which eventually calls
1109  * ConverseExit()), modules like traces will have to pretend to shutdown
1110  * as if CkExit() was called but at the same time avoid making
1111  * subsequent CkExit() calls (which is usually required for allowing
1112  * other modules to shutdown).
1113  *
1114  * Note that we can only get here if CkExit() was not called, since the
1115  * trace module will un-register itself from TraceArray if it did.
1116  *
1117  */
1118 void TraceProjections::traceClose(void)
1119 {
1120 #ifdef PROJ_ANALYSIS
1121   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1122
1123   // sets the flag that tells the code not to make the CkExit call later
1124   converseExit = 1;
1125   if (CkMyPe() == 0) {
1126     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1127     bocProxy.traceProjectionsParallelShutdown(-1);
1128   }
1129   if(CkMyRank() == CkMyNodeSize()){ //communication thread
1130     CkpvAccess(_trace)->endComputation();
1131     delete _logPool;              // will write
1132     // remove myself from traceArray so that no tracing will be called.
1133     CkpvAccess(_traces)->removeTrace(this);
1134   }
1135 #else
1136   // we've already deleted the logpool, so multiple calls to traceClose
1137   // are tolerated.
1138   if (_logPool == NULL) {
1139     return;
1140   }
1141   if(CkMyPe()==0){
1142     _logPool->writeSts(this);
1143   }
1144   CkpvAccess(_trace)->endComputation();
1145   delete _logPool;              // will write
1146   // remove myself from traceArray so that no tracing will be called.
1147   CkpvAccess(_traces)->removeTrace(this);
1148 #endif
1149 }
1150
1151 /**
1152  *  **IMPT NOTES**:
1153  *
1154  *  This is meant to be called internally by the tracing framework.
1155  *
1156  */
1157 void TraceProjections::closeTrace() {
1158   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1159   if (CkMyPe() == 0) {
1160     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1161     _logPool->writeSts(this);
1162     _logPool->writeRC();
1163     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1164   }
1165   delete _logPool;       // will write logs to file
1166 }
1167
1168 #if CMK_SMP_TRACE_COMMTHREAD
1169 void TraceProjections::traceBeginOnCommThread()
1170 {
1171   if (!computationStarted) return;
1172   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1173 }
1174
1175 void TraceProjections::traceEndOnCommThread()
1176 {
1177   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1178 }
1179 #endif
1180
1181 void TraceProjections::traceBegin(void)
1182 {
1183   if (!computationStarted) return;
1184   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1185 }
1186
1187 void TraceProjections::traceEnd(void)
1188 {
1189   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1190 }
1191
1192 void TraceProjections::userEvent(int e)
1193 {
1194   if (!computationStarted) return;
1195   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1196 }
1197
1198 void TraceProjections::userBracketEvent(int e, double bt, double et)
1199 {
1200   if (!computationStarted) return;
1201   // two events record Begin/End of event e.
1202   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1203   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1204 }
1205
1206 void TraceProjections::userSuppliedData(int d)
1207 {
1208   if (!computationStarted) return;
1209   _logPool->addUserSupplied(d);
1210 }
1211
1212 void TraceProjections::userSuppliedNote(char *note)
1213 {
1214   if (!computationStarted) return;
1215   _logPool->addUserSuppliedNote(note);
1216 }
1217
1218
1219 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1220 {
1221   if (!computationStarted) return;
1222   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1223 }
1224
1225 void TraceProjections::memoryUsage(double m)
1226 {
1227   if (!computationStarted) return;
1228   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1229   
1230 }
1231
1232
1233 void TraceProjections::creation(envelope *e, int ep, int num)
1234 {
1235   double curTime = TraceTimer();
1236   if (e == 0) {
1237     CtvAccess(curThreadEvent) = curevent;
1238     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1239                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1240   } else {
1241     int type=e->getMsgtype();
1242     e->setEvent(curevent);
1243     if (num > 1) {
1244       _logPool->add(CREATION_BCAST, type, ep, curTime,
1245                     curevent++, CkMyPe(), e->getTotalsize(), 
1246                     NULL, 0, 0.0, num);
1247     } else {
1248       _logPool->add(CREATION, type, ep, curTime,
1249                     curevent++, CkMyPe(), e->getTotalsize(), 
1250                     NULL, 0, 0.0);
1251     }
1252   }
1253 }
1254
1255 //This function is only called from a comm thread in SMP mode. 
1256 void TraceProjections::creation(char *msg)
1257 {
1258 #if CMK_SMP_TRACE_COMMTHREAD
1259         // msg must be a charm message
1260         envelope *e = (envelope *)msg;
1261         int ep = e->getEpIdx();
1262         if(ep==0) return;
1263         int num = _entryTable.size();
1264         CmiAssert(ep < num);
1265         if(_entryTable[ep]->traceEnabled) {
1266                 creation(e, ep, 1);
1267                 e->setSrcPe(CkMyPe());              // pretend I am the sender
1268         }
1269 #endif
1270 }
1271
1272
1273 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1274    Communication Library-specific Multicasts via new event 
1275    CREATION_MULTICAST.
1276 */
1277
1278 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1279                                          int *pelist)
1280 {
1281   double curTime = TraceTimer();
1282   if (e==0) {
1283     CtvAccess(curThreadEvent)=curevent;
1284     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1285                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1286   } else {
1287     int type=e->getMsgtype();
1288     e->setEvent(curevent);
1289     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1290                                    e->getTotalsize(), 0, 0.0, num, pelist);
1291   }
1292 }
1293
1294 void TraceProjections::creationDone(int num)
1295 {
1296   // modified the creation done time of the last num log entries
1297   // FIXME: totally a hack
1298   double curTime = TraceTimer();
1299   int idx = _logPool->numEntries-1;
1300   while (idx >=0 && num >0 ) {
1301     LogEntry &log = _logPool->pool[idx];
1302     if ((log.type == CREATION) ||
1303         (log.type == CREATION_BCAST) ||
1304         (log.type == CREATION_MULTICAST)) {
1305       log.recvTime = curTime - log.time;
1306       num --;
1307     }
1308     idx--;
1309   }
1310 }
1311
1312 void TraceProjections::beginExecute(CmiObjId *tid)
1313 {
1314 #if CMK_HAS_COUNTER_PAPI
1315   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1316     CmiAbort("PAPI failed to read at begin execute!\n");
1317   }
1318 #endif
1319   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1320   execEvent = CtvAccess(curThreadEvent);
1321   execEp = (-1);
1322   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1323                 execEvent,CkMyPe(), 0, tid);
1324 #if CMK_HAS_COUNTER_PAPI
1325   _logPool->addPapi(papiValues);
1326 #endif
1327   inEntry = 1;
1328 }
1329
1330 void TraceProjections::beginExecute(envelope *e)
1331 {
1332   if(e==0) {
1333 #if CMK_HAS_COUNTER_PAPI
1334     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1335       CmiAbort("PAPI failed to read at begin execute!\n");
1336     }
1337 #endif
1338     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1339     execEvent = CtvAccess(curThreadEvent);
1340     execEp = (-1);
1341     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1342                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1343 #if CMK_HAS_COUNTER_PAPI
1344     _logPool->addPapi(papiValues);
1345 #endif
1346     inEntry = 1;
1347   } else {
1348     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1349                  e->getSrcPe(),e->getTotalsize());
1350   }
1351 }
1352
1353 void TraceProjections::beginExecute(char *msg){
1354 #if CMK_SMP_TRACE_COMMTHREAD
1355         //This function is called from comm thread in SMP mode
1356     envelope *e = (envelope *)msg;
1357     int ep = e->getEpIdx();
1358     if (ep == 0) return;
1359     int num = _entryTable.size();
1360     CmiAssert(ep < num);
1361     if(_entryTable[ep]->traceEnabled)
1362                 beginExecute(e);
1363 #endif
1364 }
1365
1366 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1367                                     int mlen, CmiObjId *idx)
1368 {
1369   if (traceNestedEvents) {
1370     if (! nestedEvents.isEmpty()) {
1371       endExecuteLocal();
1372     }
1373     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1374   }
1375   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1376 }
1377
1378 void TraceProjections::changeLastEntryTimestamp(double ts)
1379 {
1380   _logPool->modLastEntryTimestamp(ts);
1381 }
1382
1383 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1384                                     int mlen, CmiObjId *idx)
1385 {
1386 #if CMK_HAS_COUNTER_PAPI
1387   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1388     CmiAbort("PAPI failed to read at begin execute!\n");
1389   }
1390 #endif
1391   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1392   execEvent=event;
1393   execEp=ep;
1394   execPe=srcPe;
1395   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1396                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1397 #if CMK_HAS_COUNTER_PAPI
1398   _logPool->addPapi(papiValues);
1399 #endif
1400   inEntry = 1;
1401 }
1402
1403 void TraceProjections::endExecute(void)
1404 {
1405   if (traceNestedEvents) nestedEvents.deq();
1406   endExecuteLocal();
1407   if (traceNestedEvents) {
1408     if (! nestedEvents.isEmpty()) {
1409       NestedEvent &ne = nestedEvents.peek();
1410       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1411     }
1412   }
1413 }
1414
1415 void TraceProjections::endExecute(char *msg)
1416 {
1417 #if CMK_SMP_TRACE_COMMTHREAD
1418         //This function is called from comm thread in SMP mode
1419     envelope *e = (envelope *)msg;
1420     int ep = e->getEpIdx();
1421     if (ep == 0) return;
1422     int num = _entryTable.size();
1423     CmiAssert(ep < num);
1424     if(_entryTable[ep]->traceEnabled)
1425                 endExecute();
1426 #endif  
1427 }
1428
1429 void TraceProjections::endExecuteLocal(void)
1430 {
1431 #if CMK_HAS_COUNTER_PAPI
1432   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1433     CmiAbort("PAPI failed to read at end execute!\n");
1434   }
1435 #endif
1436   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1437   double cputime = TraceCpuTimer();
1438   if(execEp == (-1)) {
1439     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1440                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1441   } else {
1442     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1443                   execEvent, execPe, 0, NULL, 0.0, cputime);
1444   }
1445 #if CMK_HAS_COUNTER_PAPI
1446   _logPool->addPapi(papiValues);
1447 #endif
1448   inEntry = 0;
1449 }
1450
1451 void TraceProjections::messageRecv(char *env, int pe)
1452 {
1453 #if 0
1454   envelope *e = (envelope *)env;
1455   int msgType = e->getMsgtype();
1456   int ep = e->getEpIdx();
1457 #if 0
1458   if (msgType==NewChareMsg || msgType==NewVChareMsg
1459           || msgType==ForChareMsg || msgType==ForVidMsg
1460           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1461           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1462     ep = e->getEpIdx();
1463   else
1464     ep = _threadEP;
1465 #endif
1466   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1467                 curevent++, e->getSrcPe(), e->getTotalsize());
1468 #endif
1469 }
1470
1471 void TraceProjections::beginIdle(double curWallTime)
1472 {
1473   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1474 }
1475
1476 void TraceProjections::endIdle(double curWallTime)
1477 {
1478   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1479 }
1480
1481 void TraceProjections::beginPack(void)
1482 {
1483   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1484 }
1485
1486 void TraceProjections::endPack(void)
1487 {
1488   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1489 }
1490
1491 void TraceProjections::beginUnpack(void)
1492 {
1493   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1494 }
1495
1496 void TraceProjections::endUnpack(void)
1497 {
1498   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1499 }
1500
1501 void TraceProjections::enqueue(envelope *) {}
1502
1503 void TraceProjections::dequeue(envelope *) {}
1504
1505 void TraceProjections::beginComputation(void)
1506 {
1507   computationStarted = 1;
1508
1509   // Executes the callback function provided by the machine
1510   // layer. This is the proper method to register user events in a
1511   // machine layer because projections is a charm++ module.
1512   if (CkpvAccess(traceOnPe) != 0) {
1513     void (*ptr)() = registerMachineUserEvents();
1514     if (ptr != NULL) {
1515       ptr();
1516     }
1517   }
1518 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1519 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1520   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1521 #if CMK_HAS_COUNTER_PAPI
1522   // we start the counters here
1523   if (PAPI_start(papiEventSet) != PAPI_OK) {
1524     CmiAbort("PAPI failed to start designated counters!\n");
1525   }
1526 #endif
1527 }
1528
1529 void TraceProjections::endComputation(void)
1530 {
1531 #if CMK_HAS_COUNTER_PAPI
1532   // we stop the counters here. A silent failure is alright since we
1533   // are already at the end of the program.
1534   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1535     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1536   }
1537   // NOTE: We should not do a complete close of PAPI until after the
1538   // sts writer is done.
1539 #endif
1540   endTime = TraceTimer();
1541   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1542   /*
1543   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1544            endTime*1e06);
1545   */
1546 }
1547
1548 int TraceProjections::idxRegistered(int idx)
1549 {
1550     int idxVecLen = idxVec.size();
1551     for(int i=0; i<idxVecLen; i++)
1552     {
1553         if(idx == idxVec[i])
1554             return 1;
1555     }
1556     return 0;
1557 }
1558
1559 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1560     StrKey k((char*)name,strlen(name));
1561     int num = funcHashtable.get(k);
1562     
1563     if(num!=0) {
1564         return;
1565         //as for mpi programs, the same function may be registered for several times
1566         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1567     }
1568     
1569     int isIdxExisting=0;
1570     if(idxSpecifiedByUser)
1571         isIdxExisting=idxRegistered(idx);
1572     if(isIdxExisting){
1573         return;
1574         //same reason with num!=0
1575         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1576     }
1577
1578     if(idxSpecifiedByUser) {
1579         char *st = new char[strlen(name)+1];
1580         memcpy(st,name,strlen(name)+1);
1581         StrKey *newKey = new StrKey(st,strlen(st));
1582         int &ref = funcHashtable.put(*newKey);
1583         ref=idx;
1584         funcCount++;
1585         idxVec.push_back(idx);  
1586     } else {
1587         char *st = new char[strlen(name)+1];
1588         memcpy(st,name,strlen(name)+1);
1589         StrKey *newKey = new StrKey(st,strlen(st));
1590         int &ref = funcHashtable.put(*newKey);
1591         ref=funcCount;
1592         num = funcCount;
1593         funcCount++;
1594         idx = num;
1595         idxVec.push_back(idx);
1596     }
1597 }
1598
1599 void TraceProjections::beginFunc(char *name,char *file,int line){
1600         StrKey k(name,strlen(name));    
1601         unsigned short  num = (unsigned short)funcHashtable.get(k);
1602         beginFunc(num,file,line);
1603 }
1604
1605 void TraceProjections::beginFunc(int idx,char *file,int line){
1606         if(idx <= 0){
1607                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1608         }       
1609         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1610 }
1611
1612 void TraceProjections::endFunc(char *name){
1613         StrKey k(name,strlen(name));    
1614         int num = funcHashtable.get(k);
1615         endFunc(num);   
1616 }
1617
1618 void TraceProjections::endFunc(int num){
1619         if(num <= 0){
1620                 printf("endFunc without start :O\n");
1621         }
1622         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1623 }
1624
1625 // specialized PUP:ers for handling trace projections logs
1626 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1627 {
1628   for (int i=0;i<n;i++) 
1629     switch(t) {
1630     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1631     case Tuchar:
1632     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1633     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1634     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1635     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1636     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1637     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1638     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1639     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1640     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1641 #ifdef CMK_PUP_LONG_LONG
1642     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1643     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1644 #endif
1645     default: CmiAbort("Unrecognized pup type code!");
1646     };
1647 }
1648
1649 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1650 {
1651   for (int i=0;i<n;i++) 
1652     switch(t) {
1653     case Tchar: { 
1654       char c = fgetc(f);
1655       if (c==EOF)
1656         parseError("Could not match character");
1657       else
1658         ((char *)p)[i] = c;
1659       break;
1660     }
1661     case Tuchar:
1662     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1663     case Tshort:((short *)p)[i]=(short)readInt(); break;
1664     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1665     case Tint:  ((int *)p)[i]=readInt(); break;
1666     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1667     case Tlong: ((long *)p)[i]=readInt(); break;
1668     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1669     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1670     case Tdouble:((double *)p)[i]=readDouble(); break;
1671 #ifdef CMK_PUP_LONG_LONG
1672     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1673     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1674 #endif
1675     default: CmiAbort("Unrecognized pup type code!");
1676     };
1677 }
1678
1679 #if CMK_PROJECTIONS_USE_ZLIB
1680 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1681 {
1682   for (int i=0;i<n;i++) 
1683     switch(t) {
1684     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1685     case Tuchar:
1686     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1687     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1688     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1689     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1690     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1691     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1692     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1693     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1694     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1695 #ifdef CMK_PUP_LONG_LONG
1696     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1697     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1698 #endif
1699     default: CmiAbort("Unrecognized pup type code!");
1700     };
1701 }
1702 #endif
1703
1704 void TraceProjections::endPhase() {
1705   double currentPhaseTime = TraceTimer();
1706   if (lastPhaseEvent != NULL) {
1707   } else {
1708     if (_logPool->pool != NULL) {
1709       // assumed to be BEGIN_COMPUTATION
1710     } else {
1711       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1712       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1713     }
1714   }
1715
1716   /* Insert endPhase event here. */
1717   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1718   /*        We currently "borrow" from the standard add() method. */
1719   /*        It should really be its own add() method.             */
1720   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1721   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1722   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1723   currentPhaseID++;
1724 }
1725
1726 #ifdef PROJ_ANALYSIS
1727 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1728
1729
1730 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1731
1732 void registerOutlierReduction() {
1733   outlierReductionType =
1734     CkReduction::addReducer(outlierReduction);
1735   minMaxReductionType =
1736     CkReduction::addReducer(minMaxReduction);
1737 }
1738
1739 /**
1740  * **IMPT NOTES**:
1741  *
1742  * This is the C++ code that is registered to be activated at module
1743  * shutdown. This is called exactly once on processor 0. Module shutdown
1744  * is initiated as a result of a CkExit() call by the application code
1745  * 
1746  * The exit function must ultimately call CkExit() again to
1747  * so that other module exit functions may proceed after this module is
1748  * done.
1749  *
1750  */
1751 // FIXME: WHY extern "C"???
1752 extern "C" void TraceProjectionsExitHandler()
1753 {
1754 #if CMK_TRACE_ENABLED
1755   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1756   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1757   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1758 #else
1759   CkExit();
1760 #endif
1761 }
1762
1763 // This is called once on each processor but the idiom of use appears
1764 // to be to only have processor 0 register the function.
1765 //
1766 // See initnode in trace-projections.ci
1767 void initTraceProjectionsBOC()
1768 {
1769   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1770 #ifdef __BIGSIM__
1771   if (BgNodeRank() == 0) {
1772 #else
1773     if (CkMyRank() == 0) {
1774 #endif
1775       registerExitFn(TraceProjectionsExitHandler);
1776     }
1777 #if 0
1778   } // this is so indentation does not get messed up
1779 #endif
1780 }
1781
1782 // mainchare for trace-projections BOC-operations. 
1783 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1784 // rest of its life.
1785 //
1786 // Responsible for:
1787 //   1. Handling commandline arguments
1788 //   2. Creating any objects required for proper BOC operations.
1789 //
1790 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1791   /** Options for Outlier Analysis */
1792   // defaults. Things will change with support for interactive analysis.
1793   bool findOutliers = false;
1794   bool outlierAutomatic = true;
1795   int numKSeeds = 10; 
1796
1797   int peNumKeep = CkNumPes();  // used as a default
1798   double entryThreshold = 0.0;
1799   bool outlierUsePhases = false;
1800   if (outlierAutomatic) {
1801     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1802                      "Number of cluster seeds to apply at outlier analysis.");
1803     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1804                      &peNumKeep, "Number of Processors to retain data");
1805     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1806                         "Minimum significance of entry points to be considered for clustering (%).");
1807     findOutliers =
1808       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1809     outlierUsePhases = 
1810       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1811                         "Apply automatic outlier analysis to any available phases.");
1812     if (outlierUsePhases) {
1813       // if the user wants to use an outlier feature, it is assumed outlier
1814       //    analysis is desired.
1815       findOutliers = true;
1816     }
1817   }
1818   bool findStartTime = (CmiTimerAbsolute()==1);
1819   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1820   if (findOutliers) {
1821     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1822                                         numKSeeds,
1823                                         peNumKeep,
1824                                         entryThreshold,
1825                                         outlierUsePhases);
1826   }
1827 }
1828
1829 // Called on every processor.
1830 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1831   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1832   endPe = pe;                // the pe that starts CkExit()
1833   if (CkMyPe() == 0) {
1834     analysisStartTime = CmiWallTimer();
1835   }
1836   CkpvAccess(_trace)->endComputation();
1837   // no more tracing for projections on this processor after this point. 
1838   // Note that clear must be called after remove, or bad things will happen.
1839   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1840   CkpvAccess(_traces)->clearTrace();
1841
1842   // From this point, we start multiple chains of reductions and broadcasts to
1843   // perform final online analysis activities.
1844
1845   // Start all parallel operations at once. 
1846   //   These MUST NOT modify base performance data in LogPool. If they must,
1847   //   then the parallel operations must be phased (and this code to be
1848   //   restructured as necessary)
1849   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1850   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1851   if (findOutliers) {
1852     parModulesRemaining++;
1853     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1854   }
1855   parModulesRemaining++;
1856   if (findStartTime) 
1857   bocProxy[CkMyPe()].startTimeAnalysis();
1858   else
1859   bocProxy[CkMyPe()].startEndTimeAnalysis();
1860 }
1861
1862 // Called on each processor
1863 void KMeansBOC::startKMeansAnalysis() {
1864   // Initialize all necessary structures
1865   LogPool *pool = CkpvAccess(_trace)->_logPool;
1866
1867  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1868   int flushInt = 0;
1869   if (pool->hasFlushed) {
1870     flushInt = 1;
1871   }
1872   
1873   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1874                 0, thisProxy);
1875   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1876 }
1877
1878 // Called on processor 0
1879 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1880   int someFlush = *((int *)msg->getData());
1881
1882   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1883   
1884   if (someFlush == 0) {
1885     // Data intact proceed with KMeans analysis
1886     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1887     kMeansProxy.flushCheckDone();
1888   } else {
1889     // Some processor had flushed it data at some point, abandon KMeans
1890     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1891     // terminate KMeans
1892     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1893     bocProxy[0].kMeansDone();
1894   }
1895 }
1896
1897 // Called on each processor
1898 void KMeansBOC::flushCheckDone() {
1899   // **FIXME** - more flexible metric collection scheme may be necessary
1900   //   in the future for production use.
1901   LogPool *pool = CkpvAccess(_trace)->_logPool;
1902
1903   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1904
1905   numEntryMethods = _entryTable.size();
1906   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1907
1908   // maintained across phases
1909   markedBegin = false;
1910   markedIdle = false;
1911   beginBlockTime = 0.0;
1912   beginIdleBlockTime = 0.0;
1913   lastBeginEPIdx = -1; // none
1914
1915   lastPhaseIdx = 0;
1916   currentExecTimes = NULL;
1917   currentPhase = 0;
1918   selected = false;
1919
1920   pool->initializePhases();
1921
1922   // incoming K Seeds and the per-phase filter
1923   incKSeeds = new double[numK*numMetrics];
1924   keepMetric = new bool[numMetrics];
1925
1926   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1927   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1928   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1929   thisProxy[CkMyPe()].getNextPhaseMetrics();
1930 }
1931
1932 // Called on each processor.
1933 void KMeansBOC::getNextPhaseMetrics() {
1934   // Assumes the presence of the complete logs on this processor.
1935   // Assumes first event is always BEGIN_COMPUTATION
1936   // Assumes each processor sees the same number of phases.
1937   //
1938   // In this code, we collect performance data for this processor.
1939   // All times are in seconds.
1940
1941   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1942
1943   if (usePhases) {
1944     DEBUGF("[%d] Using Phases\n", CkMyPe());
1945   } else {
1946     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1947   }
1948   
1949   if (currentExecTimes != NULL) {
1950     delete [] currentExecTimes;
1951   }
1952   currentExecTimes = new double[numMetrics];
1953   for (int i=0; i<numMetrics; i++) {
1954     currentExecTimes[i] = 0.0;
1955   }
1956
1957   int numEventMethods = _entryTable.size();
1958   LogPool *pool = CkpvAccess(_trace)->_logPool;
1959   
1960   CkAssert(pool->numEntries > lastPhaseIdx);
1961   double totalPhaseTime = 0.0;
1962   double totalActiveTime = 0.0; // entry method + idle
1963
1964   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1965     if (pool->pool[i].type == BEGIN_PROCESSING) {
1966       // check pairing
1967       if (!markedBegin) {
1968         markedBegin = true;
1969       }
1970       beginBlockTime = pool->pool[i].time;
1971       lastBeginEPIdx = pool->pool[i].eIdx;
1972     } else if (pool->pool[i].type == END_PROCESSING) {
1973       // check pairing
1974       // if End without a begin, just ignore
1975       //   this event. If a phase-boundary is crossed, the Begin
1976       //   event would be maintained in beginBlockTime, so it is 
1977       //   not a problem.
1978       if (markedBegin) {
1979         markedBegin = false;
1980         if (pool->pool[i].event < 0)
1981         {
1982           // ignore dummy events. **FIXME** as they have no eIdx?
1983           continue;
1984         }
1985         currentExecTimes[pool->pool[i].eIdx] += 
1986           pool->pool[i].time - beginBlockTime;
1987         totalActiveTime += pool->pool[i].time - beginBlockTime;
1988         lastBeginEPIdx = -1;
1989       }
1990     } else if (pool->pool[i].type == BEGIN_IDLE) {
1991       // check pairing
1992       if (!markedIdle) {
1993         markedIdle = true;
1994       }
1995       beginIdleBlockTime = pool->pool[i].time;
1996     } else if (pool->pool[i].type == END_IDLE) {
1997       // check pairing
1998       if (markedIdle) {
1999         markedIdle = false;
2000         currentExecTimes[numEventMethods] += 
2001           pool->pool[i].time - beginIdleBlockTime;
2002         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2003       }
2004     } else if (pool->pool[i].type == END_PHASE) {
2005       // ignored when not using phases
2006       if (usePhases) {
2007         // when we've not visited this node before
2008         if (i != lastPhaseIdx) { 
2009           totalPhaseTime = 
2010             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2011           // it is important that proper accounting of time take place here.
2012           // Note that END_PHASE events inevitably occur in the context of
2013           //   some entry method by the way the tracing API is designed.
2014           if (markedBegin) {
2015             CkAssert(lastBeginEPIdx >= 0);
2016             currentExecTimes[lastBeginEPIdx] += 
2017               pool->pool[i].time - beginBlockTime;
2018             totalActiveTime += pool->pool[i].time - beginBlockTime;
2019             // this is so the remainder contributes to the next phase
2020             beginBlockTime = pool->pool[i].time;
2021           }
2022           // The following is unlikely, but stranger things have happened.
2023           if (markedIdle) {
2024             currentExecTimes[numEventMethods] +=
2025               pool->pool[i].time - beginIdleBlockTime;
2026             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2027             // this is so the remainder contributes to the next phase
2028             beginIdleBlockTime = pool->pool[i].time;
2029           }
2030           if (totalActiveTime <= totalPhaseTime) {
2031             currentExecTimes[numEventMethods+1] = 
2032               totalPhaseTime - totalActiveTime;
2033           } else {
2034             currentExecTimes[numEventMethods+1] = 0.0;
2035             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2036                      CkMyPe(), currentPhase);
2037           }
2038           collectKMeansData();
2039           // end the loop (and method) and defer the work till the next call
2040           lastPhaseIdx = i;
2041           break; 
2042         }
2043       }
2044     } else if (pool->pool[i].type == END_COMPUTATION) {
2045       if (markedBegin) {
2046         CkAssert(lastBeginEPIdx >= 0);
2047         currentExecTimes[lastBeginEPIdx] += 
2048           pool->pool[i].time - beginBlockTime;
2049         totalActiveTime += pool->pool[i].time - beginBlockTime;
2050       }
2051       if (markedIdle) {
2052         currentExecTimes[numEventMethods] +=
2053           pool->pool[i].time - beginIdleBlockTime;
2054         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2055       }
2056       totalPhaseTime = 
2057         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2058       if (totalActiveTime <= totalPhaseTime) {
2059         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2060       } else {
2061         currentExecTimes[numEventMethods+1] = 0.0;
2062         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2063                  CkMyPe());
2064       }
2065       collectKMeansData();
2066     }
2067   }
2068 }
2069
2070 /**
2071  *  Through a reduction, collectKMeansData aggregates each processors' data
2072  *  in order for global properties to be determined:
2073  *  
2074  *  1. min & max to determine normalization factors.
2075  *  2. sum to determine global EP averages for possible metric reduction
2076  *       through thresholding.
2077  *  3. sum of squares to compute stddev which may be useful in the future.
2078  *
2079  *  collectKMeansData will also keep the processor's data for the current
2080  *    phase so that it may be normalized and worked on subsequently.
2081  *
2082  **/
2083 void KMeansBOC::collectKMeansData() {
2084   int minOffset = numMetrics;
2085   int maxOffset = 2*numMetrics;
2086   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2087
2088   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2089
2090   double *reductionMsg = new double[numMetrics*4];
2091
2092   for (int i=0; i<numMetrics; i++) {
2093     reductionMsg[i] = currentExecTimes[i];
2094     // duplicate the event times for max and min sections of the reduction
2095     reductionMsg[minOffset + i] = currentExecTimes[i];
2096     reductionMsg[maxOffset + i] = currentExecTimes[i];
2097     // compute squares
2098     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2099   }
2100
2101   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2102                 0, thisProxy);
2103   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2104              outlierReductionType, cb);  
2105 }
2106
2107 // The purpose is mainly to initialize the k seeds and generate
2108 //   normalization parameters for each of the metrics. The k seeds
2109 //   and normalization parameters are broadcast to all processors.
2110 //
2111 // Called on processor 0
2112 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2113   CkAssert(CkMyPe() == 0);
2114   
2115   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2116
2117   int sumOffset = 0;
2118   int minOffset = numMetrics;
2119   int maxOffset = 2*numMetrics;
2120   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2121
2122   // calculate statistics & boundaries for the k seeds for clustering
2123   KMeansStatsMessage *outmsg = 
2124     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2125   outmsg->numMetrics = numMetrics;
2126   outmsg->numKPos = numK*numMetrics;
2127   outmsg->numStats = numMetrics*4;
2128
2129   // Sum | Min | Max | Sum of Squares
2130   double *totalExecTimes = (double *)msg->getData();
2131   double totalTime = 0.0;
2132
2133   for (int i=0; i<numMetrics; i++) {
2134     DEBUGN("%lf\n", totalExecTimes[i]);
2135     totalTime += totalExecTimes[i];
2136
2137     // calculate event mean over all processors
2138     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2139
2140     // get the ranges and offsets of each metric. With this, we get
2141     //   normalization factors that can be sent back to each processor to
2142     //   be used as necessary. We reuse max for range. Min remains the offset.
2143     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2144     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2145       totalExecTimes[minOffset + i];
2146     
2147     // calculate stddev (using biased variance)
2148     outmsg->stats[sosOffset + i] = 
2149       sqrt((totalExecTimes[sosOffset + i] - 
2150             2*(outmsg->stats[i])*totalExecTimes[i] +
2151             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2152            CkNumPes());
2153   }
2154
2155   for (int i=0; i<numMetrics; i++) {
2156     // 1) if the proportion of the max value of the entry method relative to
2157     //   the average time taken over all entry methods across all processors
2158     //   is greater than the stipulated percentage threshold ...; AND
2159     // 2) if the range of values are non-zero.
2160     //
2161     // The current assumption is totalTime > 0 (what program has zero total
2162     //   time from all work?)
2163     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2164                      entryThreshold) &&
2165       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2166     if (keepMetric[i]) {
2167       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2168              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2169     } else {
2170       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2171     }
2172     outmsg->filter[i] = keepMetric[i];
2173   }
2174
2175   delete msg;
2176   
2177   // initialize k seeds for this phase
2178   kSeeds = new double[numK*numMetrics];
2179
2180   numKReported = 0;
2181   kNumMembers = new int[numK];
2182
2183   // Randomly select k processors' metric vectors for the k seeds
2184   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2185   srand(11337); // for debugging purposes
2186   for (int k=0; k<numK; k++) {
2187     DEBUGF("Seed %d | ", k);
2188     for (int m=0; m<numMetrics; m++) {
2189       double factor = totalExecTimes[maxOffset + m] - 
2190         totalExecTimes[minOffset + m];
2191       // "uniform" distribution, scaled according to the normalization
2192       //   factors
2193       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2194       // Random distribution.
2195       kSeeds[numMetrics*k + m] =
2196         ((rand()*1.0)/RAND_MAX)*factor;
2197       if (keepMetric[m]) {
2198         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2199       }
2200       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2201     }
2202     DEBUGF("\n");
2203     kNumMembers[k] = 0;
2204   }
2205
2206   // broadcast statistical values to all processors for cluster discovery
2207   thisProxy.findInitialClusters(outmsg);
2208 }
2209
2210
2211
2212 // Called on each processor.
2213 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2214
2215  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2216
2217   phaseIter = 0;
2218
2219   // Get info from stats message
2220   CkAssert(numMetrics == msg->numMetrics);
2221   for (int i=0; i<numMetrics; i++) {
2222     keepMetric[i] = msg->filter[i];
2223   }
2224
2225   // Normalize data on local processor.
2226   // **CWL** See my thesis for detailed discussion of normalization of
2227   //    performance data.
2228   // **NOTE** This might change if we want to send data based on the filter
2229   //   instead of all the data.
2230   CkAssert(numMetrics*4 == msg->numStats);
2231   for (int i=0; i<numMetrics; i++) {
2232     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2233     // **CWL** We do not normalize the range. Entry methods that exhibit
2234     //   large absolute timing variations should be allowed to contribute
2235     //   more to the Euclidean distance measure!
2236     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2237   }
2238
2239   // **NOTE** This might change if we want to send data based on the filter
2240   //   instead of all the data.
2241   CkAssert(numK*numMetrics == msg->numKPos);
2242   for (int i=0; i<msg->numKPos; i++) {
2243     incKSeeds[i] = msg->kSeedsPos[i];
2244   }
2245
2246   // Decide which KSeed this processor belongs to.
2247   minDistance = calculateDistance(0);
2248   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2249            currentPhase, phaseIter, minDistance);
2250   minK = 0;
2251   for (int i=1; i<numK; i++) {
2252     double distance = calculateDistance(i);
2253     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2254              currentPhase, phaseIter, i, distance);
2255     if (distance < minDistance) {
2256       minDistance = distance;
2257       minK = i;
2258     }
2259   }
2260
2261   // Set up a reduction with the modification vector to the root (0).
2262   //
2263   // The modification vector sends a negative value for each metric
2264   //   for the K this processor no longer belongs to and a positive
2265   //   value to the K the processor now belongs. In addition, a -1.0
2266   //   is sent to the K it is leaving and a +1.0 to the K it is 
2267   //   joining.
2268   //
2269   // The processor must still contribute a "zero returns" even if
2270   //   nothing changes. This will be the basis for determine
2271   //   convergence at the root.
2272   //
2273   // The addtional +1 is meant for the count-change that must be
2274   //   maintained for the special cases at the root when some K
2275   //   may be deprived of all processor points or go from 0 to a
2276   //   positive number of processors (see later comments).
2277   double *modVector = new double[numK*(numMetrics+1)];
2278   for (int i=0; i<numK; i++) {
2279     for (int j=0; j<numMetrics+1; j++) {
2280       modVector[i*(numMetrics+1) + j] = 0.0;
2281     }
2282   }
2283   for (int i=0; i<numMetrics; i++) {
2284     // for this initialization, only positive values need be sent.
2285     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2286   }
2287   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2288
2289   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2290                 0, thisProxy);
2291   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2292              CkReduction::sum_double, cb);  
2293 }
2294
2295 double KMeansBOC::calculateDistance(int k) {
2296   double ret = 0.0;
2297   for (int i=0; i<numMetrics; i++) {
2298     if (keepMetric[i]) {
2299       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2300              CkMyPe(), currentPhase, phaseIter, i,
2301                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2302       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2303     }
2304   }
2305   return sqrt(ret);
2306 }
2307
2308 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2309   CkAssert(CkMyPe() == 0);
2310
2311   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2312
2313   double *modVector = (double *)msg->getData();
2314   // sanity check
2315   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2316
2317   // A quick convergence test.
2318   bool hasChanges = false;
2319   for (int i=0; i<numK; i++) {
2320     hasChanges = hasChanges || 
2321       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2322   }
2323   if (!hasChanges) {
2324     delete msg;
2325     findRepresentatives();
2326   } else {
2327     int overallChange = 0;
2328     for (int i=0; i<numK; i++) {
2329       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2330       if (change != 0) {
2331         overallChange += change;
2332         // modify the k seeds based on the modification vectors coming in
2333         //
2334         // If a seed initially has no members, its contents do not matter and
2335         //   is simply set to the average of the incoming vector.
2336         // If the change causes a seed to lose all its members, do nothing.
2337         //   Its last-known location is kept to allow it to re-capture
2338         //   membership at the next iteration rather than apply the last
2339         //   changes (which snaps the point unnaturally to 0,0).
2340         // Otherwise, apply the appropriate vector changes.
2341         CkAssert((kNumMembers[i] + change >= 0) &&
2342                  (kNumMembers[i] + change <= CkNumPes()));
2343         if (kNumMembers[i] == 0) {
2344           CkAssert(change > 0);
2345           for (int j=0; j<numMetrics; j++) {
2346             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2347           }
2348         } else if (kNumMembers[i] + change == 0) {
2349           // do nothing.
2350         } else {
2351           for (int j=0; j<numMetrics; j++) {
2352             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2353             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2354             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2355           }
2356         }
2357         kNumMembers[i] += change;
2358       }
2359       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2360              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2361     }
2362     delete msg;
2363
2364     // broadcast the new seed locations.
2365     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2366     outmsg->numKPos = numK*numMetrics;
2367     for (int i=0; i<numK*numMetrics; i++) {
2368       outmsg->kSeedsPos[i] = kSeeds[i];
2369     }
2370
2371     thisProxy.updateSeedMembership(outmsg);
2372   }
2373 }
2374
2375 // Called on all processors
2376 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2377
2378   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2379
2380   phaseIter++;
2381
2382   // **NOTE** This might change if we want to send data based on the filter
2383   //   instead of all the data.
2384   CkAssert(numK*numMetrics == msg->numKPos);
2385   for (int i=0; i<msg->numKPos; i++) {
2386     incKSeeds[i] = msg->kSeedsPos[i];
2387   }
2388
2389   // Decide which KSeed this processor belongs to.
2390   lastMinK = minK;
2391   minDistance = calculateDistance(0);
2392   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2393          currentPhase, phaseIter, minDistance);
2394
2395   minK = 0;
2396   for (int i=1; i<numK; i++) {
2397     double distance = calculateDistance(i);
2398     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2399            currentPhase, phaseIter, i, distance);
2400     if (distance < minDistance) {
2401       minDistance = distance;
2402       minK = i;
2403     }
2404   }
2405
2406   double *modVector = new double[numK*(numMetrics+1)];
2407   for (int i=0; i<numK; i++) {
2408     for (int j=0; j<numMetrics+1; j++) {
2409       modVector[i*(numMetrics+1) + j] = 0.0;
2410     }
2411   }
2412
2413   if (minK != lastMinK) {
2414     for (int i=0; i<numMetrics; i++) {
2415       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2416       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2417     }
2418     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2419     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2420   }
2421
2422   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2423                 0, thisProxy);
2424   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2425              CkReduction::sum_double, cb);  
2426 }
2427
2428 void KMeansBOC::findRepresentatives() {
2429
2430   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2431
2432   int numNonEmptyClusters = 0;
2433   for (int i=0; i<numK; i++) {
2434     if (kNumMembers[i] > 0) {
2435       numNonEmptyClusters++;
2436     }
2437   }
2438
2439   int numRepresentatives = peNumKeep;
2440   // **FIXME**
2441   // This is fairly arbitrary. Next time, choose the centers of the top
2442   //   largest clusters.
2443   if (numRepresentatives < numNonEmptyClusters) {
2444     numRepresentatives = numNonEmptyClusters;
2445   }
2446
2447   int slotsRemaining = numRepresentatives;
2448
2449   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2450          numNonEmptyClusters);
2451
2452   // determine how many exemplars to select per cluster. Currently
2453   //   hardcoded to 1. Future challenge is to decide on other numbers
2454   //   or proportionality.
2455   //
2456   int exemplarsPerCluster = 1;
2457   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2458
2459   int numCandidateOutliers = CkNumPes() - 
2460     exemplarsPerCluster*numNonEmptyClusters;
2461
2462   double *remainders = new double[numK];
2463   int *assigned = new int[numK];
2464   exemplarChoicesLeft = new int[numK];
2465   outlierChoicesLeft = new int[numK];
2466
2467   for (int i=0; i<numK; i++) {
2468     assigned[i] = 0;
2469     remainders[i] = 
2470       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2471       slotsRemaining / numCandidateOutliers;
2472     if (remainders[i] >= 0.0) {
2473       assigned[i] = (int)floor(remainders[i]);
2474       remainders[i] -= assigned[i];
2475     } else {
2476       remainders[i] = 0.0;
2477     }
2478   }
2479
2480   for (int i=0; i<numK; i++) {
2481     slotsRemaining -= assigned[i];
2482   }
2483   CkAssert(slotsRemaining >= 0);
2484
2485   // find clusters to assign the loose slots to, in order of
2486   // remainder proportion
2487   while (slotsRemaining > 0) {
2488     double max = 0.0;
2489     int winner = 0;
2490     for (int i=0; i<numK; i++) {
2491       if (remainders[i] > max) {
2492         max = remainders[i];
2493         winner = i;
2494       }
2495     }
2496     assigned[winner]++;
2497     remainders[winner] = 0.0;
2498     slotsRemaining--;
2499   }
2500
2501   // set up how many reduction cycles of min/max we need to conduct to
2502   // select the representatives.
2503   numSelectionIter = exemplarsPerCluster;
2504   for (int i=0; i<numK; i++) {
2505     if (assigned[i] > numSelectionIter) {
2506       numSelectionIter = assigned[i];
2507     }
2508   }
2509   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2510
2511   for (int i=0; i<numK; i++) {
2512     if (kNumMembers[i] > 0) {
2513       exemplarChoicesLeft[i] = exemplarsPerCluster;
2514       outlierChoicesLeft[i] = assigned[i];
2515     } else {
2516       exemplarChoicesLeft[i] = 0;
2517       outlierChoicesLeft[i] = 0;
2518     }
2519     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2520            outlierChoicesLeft[i]);
2521   }
2522
2523   delete [] assigned;
2524   delete [] remainders;
2525
2526   // send out first broadcast
2527   KSelectionMessage *outmsg = NULL;
2528   if (numSelectionIter > 0) {
2529     outmsg = new (numK, numK, numK) KSelectionMessage;
2530     outmsg->numKMinIDs = numK;
2531     outmsg->numKMaxIDs = numK;
2532     for (int i=0; i<numK; i++) {
2533       outmsg->minIDs[i] = -1;
2534       outmsg->maxIDs[i] = -1;
2535     }
2536     thisProxy.collectDistances(outmsg);
2537   } else {
2538     CkPrintf("Warning: No selection iteration from the start!\n");
2539     // invoke phase completion on all processors
2540     thisProxy.phaseDone();
2541   }
2542 }
2543
2544 /*
2545  *  lastMin = array of minimum champions of the last tournament
2546  *  lastMax = array of maximum champions of the last tournament
2547  *  lastMaxVal = array of last encountered maximum values, allows previous
2548  *                 minimum winners to eliminate themselves from the next
2549  *                 minimum race.
2550  *
2551  *  Called on all processors.
2552  */
2553 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2554
2555   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2556
2557   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2558          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2559   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2560       (CkMyPe() == msg->maxIDs[lastMinK])) {
2561     CkAssert(!selected);
2562     selected = true;
2563   }
2564
2565   // build outgoing reduction structure
2566   //   format = minVal | ID | maxVal | ID
2567   double *minMaxAndIDs = NULL;
2568
2569   minMaxAndIDs = new double[numK*4];
2570   // initialize to the appropriate out-of-band values (for error checks)
2571   for (int i=0; i<numK; i++) {
2572     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2573     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2574     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2575     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2576   }
2577   // If I have not won before, I put myself back into the competition
2578   if (!selected) {
2579     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2580     minMaxAndIDs[lastMinK*4] = minDistance;
2581     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2582     minMaxAndIDs[lastMinK*4+2] = minDistance;
2583     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2584   }
2585   delete msg;
2586
2587   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2588                 0, thisProxy);
2589   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2590              minMaxReductionType, cb);  
2591 }
2592
2593 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2594   // incoming format:
2595   //   minVal | minID | maxVal | maxID
2596
2597   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2598
2599   if (numSelectionIter > 0) {
2600     double *incInfo = (double *)msg->getData();
2601     
2602     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2603     outmsg->numKMinIDs = numK;
2604     outmsg->numKMaxIDs = numK;
2605     
2606     for (int i=0; i<numK; i++) {
2607       DEBUGF("%d | %lf %d %lf %d \n", i, 
2608              incInfo[i*4], (int)incInfo[i*4+1], 
2609              incInfo[i*4+2], (int)incInfo[i*4+3]);
2610     }
2611
2612     for (int i=0; i<numK; i++) {
2613       if (exemplarChoicesLeft[i] > 0) {
2614         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2615         exemplarChoicesLeft[i]--;
2616       } else {
2617         outmsg->minIDs[i] = -1;
2618       }
2619       if (outlierChoicesLeft[i] > 0) {
2620         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2621         outlierChoicesLeft[i]--;
2622       } else {
2623         outmsg->maxIDs[i] = -1;
2624       }
2625     }
2626     thisProxy.collectDistances(outmsg);
2627     numSelectionIter--;
2628   } else {
2629     // invoke phase completion on all processors
2630     thisProxy.phaseDone();
2631   }
2632 }
2633
2634 /**
2635  *  Completion of the K-Means clustering and data selection of one phase
2636  *    of the computation.
2637  *
2638  *  Called on every processor.
2639  */
2640 void KMeansBOC::phaseDone() {
2641
2642   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2643
2644   LogPool *pool = CkpvAccess(_trace)->_logPool;
2645   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2646
2647   // now decide on what to do with the decision.
2648   if (!selected) {
2649     if (usePhases) {
2650       pool->keepPhase[currentPhase] = false;
2651     } else {
2652       // if not using phases, we're working on the whole log
2653       pool->setAllPhases(false);
2654     }
2655   }
2656
2657   // **FIXME** (?) - All processors have to agree on this, or the reduction
2658   //   will not be correct! The question is "is this enforcible?"
2659   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2660     // We're done
2661     int dummy = 0;
2662     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2663                   0, bocProxy);
2664     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2665   } else {
2666     // reset all phase-based k-means data and decisions
2667
2668     // **FIXME**!!!!!    
2669     
2670     // invoke the next K-Means computation phase.
2671     currentPhase++;
2672     thisProxy[CkMyPe()].getNextPhaseMetrics();
2673   }
2674 }
2675
2676 void TraceProjectionsBOC::startTimeAnalysis()
2677 {
2678   double startTime = 0.0;
2679   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2680      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2681   CkCallback cb(CkIndex_TraceProjectionsBOC::startTimeDone(NULL), thisProxy);
2682   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);  
2683 }
2684
2685 void TraceProjectionsBOC::startTimeDone(CkReductionMsg *msg)
2686 {
2687   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2688
2689   if (CkpvAccess(_trace) != NULL) {
2690     CkpvAccess(_trace)->_logPool->globalStartTime = *(double *)msg->getData();
2691     CkpvAccess(_trace)->_logPool->setNewStartTime();
2692     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2693   }
2694   delete msg;
2695   thisProxy[CkMyPe()].startEndTimeAnalysis();
2696 }
2697
2698 void TraceProjectionsBOC::startEndTimeAnalysis()
2699 {
2700  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2701
2702   endTime = CkpvAccess(_trace)->endTime;
2703   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2704
2705   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2706                 0, thisProxy);
2707   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2708 }
2709
2710 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2711 {
2712  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2713
2714   CkAssert(CkMyPe() == 0);
2715   parModulesRemaining--;
2716   if (CkpvAccess(_trace) != NULL) {
2717     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData() - CkpvAccess(_trace)->_logPool->globalStartTime;
2718     // CkPrintf("End time determined to be %lf us\n",
2719     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2720   }
2721   delete msg;
2722   if (parModulesRemaining == 0) {
2723     thisProxy[CkMyPe()].finalize();
2724   }
2725 }
2726
2727 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2728
2729  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2730
2731   CkAssert(CkMyPe() == 0);
2732   parModulesRemaining--;
2733   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2734            CmiWallTimer()-analysisStartTime);
2735   delete msg;
2736   if (parModulesRemaining == 0) {
2737     thisProxy[CkMyPe()].finalize();
2738   }
2739 }
2740
2741 /**
2742  *
2743  *  This version is called (on processor 0) only if flushCheck fails.
2744  *
2745  */
2746 void TraceProjectionsBOC::kMeansDone() {
2747   CkAssert(CkMyPe() == 0);
2748   parModulesRemaining--;
2749   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2750            CmiWallTimer()-analysisStartTime);
2751   if (parModulesRemaining == 0) {
2752     thisProxy[CkMyPe()].finalize();
2753   }
2754 }
2755
2756 void TraceProjectionsBOC::finalize()
2757 {
2758   CkAssert(CkMyPe() == 0);
2759   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2760   //       CmiWallTimer()-analysisStartTime);
2761   thisProxy.closingTraces();
2762 }
2763
2764 // Called on every processor
2765 void TraceProjectionsBOC::closingTraces() {
2766   CkpvAccess(_trace)->closeTrace();
2767
2768     // subtle:  reduction needs to go to the PE which started CkExit()
2769   int pe = 0;
2770   if (endPe != -1) pe = endPe;
2771   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2772                 pe, thisProxy); 
2773   contribute(0, NULL, CkReduction::sum_int, cb);  
2774 }
2775
2776 // The sole purpose of this reduction is to decide whether or not
2777 //   Projections as a module needs to call CkExit() to get other
2778 //   modules to shutdown.
2779 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2780   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2781   delete msg;
2782   // decide if CkExit() needs to be called
2783   if (!CkpvAccess(_trace)->converseExit) {
2784     CkExit();
2785   }
2786 }
2787 /*
2788  *  Registration and definition of the Outlier Reduction callback.
2789  *  Format: Sum | Min | Max | Sum of Squares
2790  */
2791 CkReductionMsg *outlierReduction(int nMsgs,
2792                                  CkReductionMsg **msgs) {
2793   int numBytes = 0;
2794   int numMetrics = 0;
2795   double *ret = NULL;
2796
2797   if (nMsgs == 1) {
2798     // nothing to do, just pass it on
2799     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2800   }
2801
2802   if (nMsgs > 1) {
2803     numBytes = msgs[0]->getSize();
2804     // sanity checks
2805     if (numBytes%sizeof(double) != 0) {
2806       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2807     }
2808     if ((numBytes/sizeof(double))%4 != 0) {
2809       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2810     }
2811     numMetrics = (numBytes/sizeof(double))/4;
2812     ret = new double[numMetrics*4];
2813
2814     // copy the first message data into the return structure first
2815     for (int i=0; i<numMetrics*4; i++) {
2816       ret[i] = ((double *)msgs[0]->getData())[i];
2817     }
2818
2819     // Sum | Min | Max | Sum of Squares
2820     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2821       for (int i=0; i<numMetrics; i++) {
2822         // Sum
2823         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2824         // Min
2825         ret[numMetrics + i] =
2826           (ret[numMetrics + i] < 
2827            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2828           ? ret[numMetrics + i] : 
2829           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2830         // Max
2831         ret[2*numMetrics + i] = 
2832           (ret[2*numMetrics + i] >
2833            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2834           ? ret[2*numMetrics + i] :
2835           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2836         // Sum of Squares (squaring already done at leaf)
2837         ret[3*numMetrics + i] +=
2838           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2839       }
2840     }
2841   }
2842   
2843   /* apparently, we do not delete the incoming messages */
2844   return CkReductionMsg::buildNew(numBytes,ret);
2845 }
2846
2847 /*
2848  * The only reason we have a user-defined reduction is to support
2849  *   identification of the "winning" processors as well as to handle
2850  *   both the min and the max of each "tournament". A simple min/max
2851  *   discovery cannot handle ties.
2852  */
2853 CkReductionMsg *minMaxReduction(int nMsgs,
2854                                 CkReductionMsg **msgs) {
2855   CkAssert(nMsgs > 0);
2856
2857   int numBytes = msgs[0]->getSize();
2858   CkAssert(numBytes%sizeof(double) == 0);
2859   int numK = (numBytes/sizeof(double))/4;
2860
2861   double *ret = new double[numK*4];
2862   // fill with out-of-band values
2863   for (int i=0; i<numK; i++) {
2864     ret[i*4] = -1.0;
2865     ret[i*4+1] = -1.0;
2866     ret[i*4+2] = -1.0;
2867     ret[i*4+3] = -1.0;
2868   }
2869
2870   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2871   for (int i=0; i<nMsgs; i++) {
2872     double *temp = (double *)msgs[i]->getData();
2873     for (int j=0; j<numK; j++) {
2874       // no previous valid min
2875       if (ret[j*4+1] < 0) {
2876         // fill it in only if the incoming min is valid
2877         if (temp[j*4+1] >= 0) {
2878           ret[j*4] = temp[j*4];      // fill min value
2879           ret[j*4+1] = temp[j*4+1];  // fill ID
2880         }
2881       } else {
2882         // find Min, only if incoming min is valid
2883         if (temp[j*4+1] >= 0) {
2884           if (temp[j*4] < ret[j*4]) {
2885             ret[j*4] = temp[j*4];      // replace min value
2886             ret[j*4+1] = temp[j*4+1];  // replace ID
2887           }
2888         }
2889       }
2890       // no previous valid max
2891       if (ret[j*4+3] < 0) {
2892         // fill only if the incoming max is valid
2893         if (temp[j*4+3] >= 0) {
2894           ret[j*4+2] = temp[j*4+2];  // fill max value
2895           ret[j*4+3] = temp[j*4+3];  // fill ID
2896         }
2897       } else {
2898         // find Max, only if incoming max is valid
2899         if (temp[j*4+3] >= 0) {
2900           if (temp[j*4+2] > ret[j*4+2]) {
2901             ret[j*4+2] = temp[j*4+2];  // replace max value
2902             ret[j*4+3] = temp[j*4+3];  // replace ID
2903           }
2904         }
2905       }
2906     }
2907   }
2908   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2909   delete [] ret;
2910   return redmsg;
2911 }
2912
2913 #include "TraceProjections.def.h"
2914 #endif //PROJ_ANALYSIS
2915
2916 /*@}*/