More small changes
[charm.git] / src / ck-perf / trace-projections.C
1 /*****************************************************************************
2  * $Source: /cvsroot/charm/src/ck-perf/trace-projections.C,v $
3  * $Author: gioachin $
4  * $Date: 2009-08-20 01:09:41 $
5  * $Revision: 2.134 $
6  *****************************************************************************/
7
8
9 /**
10  * \addtogroup CkPerf
11 */
12 /*@{*/
13
14 #include <string.h>
15
16 #include "charm++.h"
17 #include "trace-projections.h"
18 #include "trace-projectionsBOC.h"
19
20 #if DEBUG_PROJ
21 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
22 #else
23 #define DEBUGF(format, ...)           // CmiPrintf x
24 #endif
25 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
26
27 #define DefaultLogBufSize      1000000
28
29 // **CW** Simple delta encoding implementation
30 // delta encoding is on by default. It may be turned off later in
31 // the runtime.
32 int deltaLog;
33 int nonDeltaLog;
34
35 int checknested=0;              // check illegal nested begin/end execute 
36
37 #ifdef PROJ_ANALYSIS
38 // BOC operations readonlys
39 CkGroupID traceProjectionsGID;
40 CkGroupID kMeansGID;
41
42 // New reduction type for Outlier Analysis purposes. This is allowed to be
43 // a global variable according to the Charm++ manual.
44 CkReductionMsg *outlierReduction(int nMsgs,
45                                  CkReductionMsg **msgs);
46 CkReductionMsg *minMaxReduction(int nMsgs,
47                                 CkReductionMsg **msgs);
48 CkReduction::reducerType outlierReductionType;
49 CkReduction::reducerType minMaxReductionType;
50 #endif // PROJ_ANALYSIS
51
52 CkpvStaticDeclare(TraceProjections*, _trace);
53 CtvStaticDeclare(int,curThreadEvent);
54
55 CkpvDeclare(CmiInt8, CtrLogBufSize);
56
57 typedef CkVec<char *>  usrEventVec;
58 CkpvStaticDeclare(usrEventVec, usrEventlist);
59 class UsrEvent {
60 public:
61   int e;
62   char *str;
63   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
64 };
65 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
66
67 /// Disable the outputting of the trace logs
68 void disableTraceLogOutput()
69
70   CkpvAccess(_trace)->setWriteData(false);
71 }
72
73 /// Enable the outputting of the trace logs
74 void enableTraceLogOutput()
75 {
76   CkpvAccess(_trace)->setWriteData(true);
77 }
78
79 /// Force the log files to be flushed
80 void flushTraceLog()
81 {
82   CkpvAccess(_trace)->traceFlushLog();
83 }
84
85 #if CMK_TRACE_DISABLED
86 static int warned=0;
87 #define OPTIMIZED_VERSION       \
88         if (!warned) { warned=1;        \
89         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
90 #else
91 #define OPTIMIZED_VERSION /*empty*/
92 #endif // CMK_TRACE_DISABLED
93
94 /*
95 On T3E, we need to have file number control by open/close files only when needed.
96 */
97 #if CMK_TRACE_LOGFILE_NUM_CONTROL
98   #define OPEN_LOG openLog("a");
99   #define CLOSE_LOG closeLog();
100 #else
101   #define OPEN_LOG
102   #define CLOSE_LOG
103 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
104
105 #if CMK_HAS_COUNTER_PAPI
106 int numPAPIEvents = 2;
107 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
108 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
109 #endif // CMK_HAS_COUNTER_PAPI
110
111 /**
112   For each TraceFoo module, _createTraceFoo() must be defined.
113   This function is called in _createTraces() generated in moduleInit.C
114 */
115 void _createTraceprojections(char **argv)
116 {
117   DEBUGF("%d createTraceProjections\n", CkMyPe());
118   CkpvInitialize(CkVec<char *>, usrEventlist);
119   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
120   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
121 #if CMK_BLUEGENE_CHARM
122   // CthRegister does not call the constructor
123 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
124 #endif //CMK_BLUEGENE_CHARM
125   CkpvInitialize(TraceProjections*, _trace);
126   CkpvAccess(_trace) = new  TraceProjections(argv);
127   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
128   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
129 }
130  
131 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
132
133 struct TraceThreadListener {
134   struct CthThreadListener base;
135   int event;
136   int msgType;
137   int ep;
138   int srcPe;
139   int ml;
140   CmiObjId idx;
141 };
142
143 extern "C"
144 void traceThreadListener_suspend(struct CthThreadListener *l)
145 {
146   TraceThreadListener *a=(TraceThreadListener *)l;
147   /* here, we activate the appropriate trace codes for the appropriate
148      registered modules */
149   traceSuspend();
150 }
151
152 extern "C"
153 void traceThreadListener_resume(struct CthThreadListener *l) 
154 {
155   TraceThreadListener *a=(TraceThreadListener *)l;
156   /* here, we activate the appropriate trace codes for the appropriate
157      registered modules */
158   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
159                                 CthGetThreadID(a->base.thread));
160   a->event=-1;
161   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
162   a->ml=0;
163 }
164
165 extern "C"
166 void traceThreadListener_free(struct CthThreadListener *l) 
167 {
168   TraceThreadListener *a=(TraceThreadListener *)l;
169   delete a;
170 }
171
172 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
173 {
174 #if ! CMK_TRACE_DISABLED
175   /* strip essential information from the envelope */
176   TraceThreadListener *a= new TraceThreadListener;
177   
178   a->base.suspend=traceThreadListener_suspend;
179   a->base.resume=traceThreadListener_resume;
180   a->base.free=traceThreadListener_free;
181   a->event=e->getEvent();
182   a->msgType=e->getMsgtype();
183   a->ep=e->getEpIdx();
184   a->srcPe=e->getSrcPe();
185   a->ml=e->getTotalsize();
186
187   CthAddListener(tid, (CthThreadListener *)a);
188 #endif
189 }
190
191 void LogPool::openLog(const char *mode)
192 {
193 #if CMK_PROJECTIONS_USE_ZLIB
194   if(compressed) {
195     if (nonDeltaLog) {
196       do {
197         zfp = gzopen(fname, mode);
198       } while (!zfp && (errno == EINTR || errno == EMFILE));
199       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
200     }
201     if (deltaLog) {
202       do {
203         deltazfp = gzopen(dfname, mode);
204       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
205       if (!deltazfp) 
206         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
207     }
208   } else {
209     if (nonDeltaLog) {
210       do {
211         fp = fopen(fname, mode);
212       } while (!fp && (errno == EINTR || errno == EMFILE));
213       if (!fp) {
214         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
215         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
216       }
217     }
218     if (deltaLog) {
219       do {
220         deltafp = fopen(dfname, mode);
221       } while (!deltafp && (errno == EINTR || errno == EMFILE));
222       if (!deltafp) 
223         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
224     }
225   }
226 #else
227   if (nonDeltaLog) {
228     do {
229       fp = fopen(fname, mode);
230     } while (!fp && (errno == EINTR || errno == EMFILE));
231     if (!fp) {
232       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
233       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
234     }
235   }
236   if (deltaLog) {
237     do {
238       deltafp = fopen(dfname, mode);
239     } while (!deltafp && (errno == EINTR || errno == EMFILE));
240     if(!deltafp) 
241       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
242   }
243 #endif
244 }
245
246 void LogPool::closeLog(void)
247 {
248 #if CMK_PROJECTIONS_USE_ZLIB
249   if(compressed) {
250     if (nonDeltaLog) gzclose(zfp);
251     if (deltaLog) gzclose(deltazfp);
252     return;
253   }
254 #endif
255   if (nonDeltaLog) { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(fp)); 
258 #endif
259     fclose(fp); 
260   }
261   if (deltaLog)  { 
262 #if !defined(_WIN32) || defined(__CYGWIN__)
263     fsync(fileno(deltafp)); 
264 #endif
265     fclose(deltafp);  
266   }
267 }
268
269 LogPool::LogPool(char *pgm) {
270   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
271   // defaults to writing data (no outlier changes)
272   writeData = true;
273   numEntries = 0;
274   // **CW** for simple delta encoding
275   prevTime = 0.0;
276   timeErr = 0.0;
277   globalEndTime = 0.0;
278   headerWritten = 0;
279   numPhases = 0;
280   hasFlushed = false;
281
282   keepPhase = NULL;
283
284   fileCreated = false;
285   poolSize = CkpvAccess(CtrLogBufSize);
286   pgmname = new char[strlen(pgm)+1];
287   strcpy(pgmname, pgm);
288 }
289
290 void LogPool::createFile(const char *fix)
291 {
292   if (fileCreated) {
293     return;
294   }
295   char pestr[10];
296   sprintf(pestr, "%d", CkMyPe());
297 #if CMK_PROJECTIONS_USE_ZLIB
298   int len;
299   if(compressed)
300     len = strlen(pgmname)+strlen(fix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
301   else
302     len = strlen(pgmname)+strlen(fix)+strlen(".logold")+strlen(pestr)+3;
303 #else
304   int len = strlen(pgmname)+strlen(fix)+strlen(".logold")+strlen(pestr)+3;
305 #endif
306   if (nonDeltaLog) {
307     fname = new char[len];
308   }
309   if (deltaLog) {
310     dfname = new char[len];
311   }
312 #if CMK_PROJECTIONS_USE_ZLIB
313   if(compressed) {
314     if (deltaLog && nonDeltaLog) {
315       sprintf(fname, "%s%s.%s.logold.gz", pgmname, fix, pestr);
316       sprintf(dfname, "%s%s.%s.log.gz", pgmname, fix, pestr);
317     } else {
318       if (nonDeltaLog) {
319         sprintf(fname, "%s%s.%s.log.gz", pgmname, fix, pestr);
320       } else {
321         sprintf(dfname, "%s%s.%s.log.gz", pgmname, fix, pestr);
322       }
323     }
324   } else {
325     if (deltaLog && nonDeltaLog) {
326       sprintf(fname, "%s%s.%s.logold", pgmname, fix, pestr);
327       sprintf(dfname, "%s%s.%s.log", pgmname, fix, pestr);
328     } else {
329       if (nonDeltaLog) {
330         sprintf(fname, "%s%s.%s.log", pgmname, fix, pestr);
331       } else {
332         sprintf(dfname, "%s%s.%s.log", pgmname, fix, pestr);
333       }
334     }
335   }
336 #else
337   if (deltaLog && nonDeltaLog) {
338     sprintf(fname, "%s%s.%s.logold", pgmname, fix, pestr);
339     sprintf(dfname, "%s%s.%s.log", pgmname, fix, pestr);
340   } else {
341     if (nonDeltaLog) {
342       sprintf(fname, "%s%s.%s.log", pgmname, fix, pestr);
343     } else {
344       sprintf(dfname, "%s%s.%s.log", pgmname, fix, pestr);
345     }
346   }
347 #endif
348   fileCreated = true;
349   openLog("w+");
350   CLOSE_LOG 
351 }
352
353 void LogPool::createSts(const char *fix)
354 {
355   CkAssert(CkMyPe() == 0);
356   // create the sts file
357   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
358   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
359   do
360     {
361       stsfp = fopen(fname, "w");
362     } while (!stsfp && (errno == EINTR || errno == EMFILE));
363   if(stsfp==0)
364     CmiAbort("Cannot open projections sts file for writing.\n");
365   delete[] fname;
366 }  
367
368 void LogPool::createRC()
369 {
370   // create the projections rc file.
371   fname = 
372     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
373   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
374   do {
375     rcfp = fopen(fname, "w");
376   } while (!rcfp && (errno == EINTR || errno == EMFILE));
377   if (rcfp==0) {
378     CmiAbort("Cannot open projections configuration file for writing.\n");
379   }
380   delete[] fname;
381 }
382
383 LogPool::~LogPool() 
384 {
385   if (writeData) {
386     writeLog();
387 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
388     closeLog();
389 #endif
390   }
391
392 #if CMK_BLUEGENE_CHARM
393   extern int correctTimeLog;
394   if (correctTimeLog) {
395     createFile("-bg");
396     if (CkMyPe() == 0) {
397       createSts("-bg");
398     }
399     writeHeader();
400     if (CkMyPe() == 0) writeSts(NULL);
401     postProcessLog();
402   }
403 #endif
404
405   delete[] pool;
406   delete [] fname;
407 }
408
409 void LogPool::writeHeader()
410 {
411   if (headerWritten) return;
412   headerWritten = 1;
413   if(!binary) {
414 #if CMK_PROJECTIONS_USE_ZLIB
415     if(compressed) {
416       if (nonDeltaLog) {
417         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
418       }
419       if (deltaLog) {
420         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
421       }
422     } 
423     else /* else clause is below... */
424 #endif
425     /*... may hang over from else above */ {
426       if (nonDeltaLog) {
427         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
428       }
429       if (deltaLog) {
430         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
431       }
432     }
433   }
434   else { // binary
435       if (nonDeltaLog) {
436         fwrite(&numEntries,sizeof(numEntries),1,fp);
437       }
438       if (deltaLog) {
439         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
440       }
441   }
442 }
443
444 void LogPool::writeLog(void)
445 {
446   createFile();
447   OPEN_LOG
448   writeHeader();
449   if (nonDeltaLog) write(0);
450   if (deltaLog) write(1);
451   CLOSE_LOG
452 }
453
454 void LogPool::write(int writedelta) 
455 {
456   // **CW** Simple delta encoding implementation
457   // prevTime has to be maintained as an object variable because
458   // LogPool::write may be called several times depending on the
459   // +logsize value.
460   PUP::er *p = NULL;
461   if (binary) {
462     p = new PUP::toDisk(writedelta?deltafp:fp);
463   }
464 #if CMK_PROJECTIONS_USE_ZLIB
465   else if (compressed) {
466     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
467   }
468 #endif
469   else {
470     p = new toProjectionsFile(writedelta?deltafp:fp);
471   }
472   CmiAssert(p);
473   int curPhase = 0;
474   // **FIXME** - Should probably consider a more sophisticated bounds-based
475   //   approach for selective writing instead of making multiple if-checks
476   //   for every single event.
477   for(UInt i=0; i<numEntries; i++) {
478     if (!writedelta) {
479       if (keepPhase == NULL) {
480         // default case, when no phase selection is required.
481         pool[i].pup(*p);
482       } else {
483         // **FIXME** Might be a good idea to create a "filler" event block for
484         //   all the events taken out by phase filtering.
485         if (pool[i].type == END_PHASE) {
486           // always write phase markers
487           pool[i].pup(*p);
488           curPhase++;
489         } else if (pool[i].type == BEGIN_COMPUTATION ||
490                    pool[i].type == END_COMPUTATION) {
491           // always write BEGIN and END COMPUTATION markers
492           pool[i].pup(*p);
493         } else if (keepPhase[curPhase]) {
494           pool[i].pup(*p);
495         }
496       }
497     }
498     else {      // delta
499       // **FIXME** Implement phase-selective writing for delta logs
500       //   eventually
501       double time = pool[i].time;
502       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
503       {
504         double timeDiff = (time-prevTime)*1.0e6;
505         UInt intTimeDiff = (UInt)timeDiff;
506         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
507         if (timeErr > 1.0) {
508           timeErr -= 1.0;
509           intTimeDiff++;
510         }
511         pool[i].time = intTimeDiff/1.0e6;
512       }
513       pool[i].pup(*p);
514       pool[i].time = time;      // restore time value
515       prevTime = time;
516     }
517   }
518   delete p;
519   delete [] keepPhase;
520 }
521
522 void LogPool::writeSts(void)
523 {
524   // for whining compilers
525   int i;
526   char name[30];
527   // generate an automatic unique ID for each log
528   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
529   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
530   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
531 #if CMK_HAS_COUNTER_PAPI
532   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
533   // for now, use i, next time use papiEvents[i].
534   // **CW** papi event names is a hack.
535   for (i=0;i<numPAPIEvents;i++) {
536     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, papiEventNames[i]);
537   }
538 #endif
539   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
540   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
541     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
542   }     
543 }
544
545 void LogPool::writeSts(TraceProjections *traceProj){
546   writeSts();
547   if (traceProj != NULL) {
548     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
549     funcIter->seekStart();
550     int numFuncs = traceProj->getFuncNumber();
551     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
552     while(funcIter->hasNext()) {
553       StrKey *key;
554       int *obj = (int *)funcIter->next((void **)&key);
555       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
556     }
557   }
558   fprintf(stsfp, "END\n");
559   fclose(stsfp);
560 }
561
562 void LogPool::writeRC(void)
563 {
564     //CkPrintf("write RC is being executed\n");
565 #ifdef PROJ_ANALYSIS  
566     CkAssert(CkMyPe() == 0);
567     fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
568           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
569     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
570     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
571       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
572     } else {
573       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
574     }
575     */
576 #endif //PROJ_ANALYSIS
577   fclose(rcfp);
578 }
579
580
581 #if CMK_BLUEGENE_CHARM
582 static void updateProjLog(void *data, double t, double recvT, void *ptr)
583 {
584   LogEntry *log = (LogEntry *)data;
585   FILE *fp = *(FILE **)ptr;
586   log->time = t;
587   log->recvTime = recvT<0.0?0:recvT;
588 //  log->write(fp);
589   toProjectionsFile p(fp);
590   log->pup(p);
591 }
592 #endif
593
594 // flush log entries to disk
595 void LogPool::flushLogBuffer()
596 {
597   if (numEntries) {
598     double writeTime = TraceTimer();
599     writeLog();
600     hasFlushed = true;
601     numEntries = 0;
602     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
603     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
604   }
605 }
606
607 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
608                   double time, int event, int pe, int ml, CmiObjId *id, 
609                   double recvT, double cpuT, int numPe)
610 {
611   new (&pool[numEntries++])
612     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
613   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
614     numPhases++;
615   }
616   if(poolSize==numEntries) {
617     flushLogBuffer();
618 #if CMK_BLUEGENE_CHARM
619     extern int correctTimeLog;
620     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
621 #endif
622   }
623 #if CMK_BLUEGENE_CHARM
624   switch (type) {
625     case BEGIN_PROCESSING:
626       pool[numEntries-1].recvTime = BgGetRecvTime();
627     case END_PROCESSING:
628     case BEGIN_COMPUTATION:
629     case END_COMPUTATION:
630     case CREATION:
631     case BEGIN_PACK:
632     case END_PACK:
633     case BEGIN_UNPACK:
634     case END_UNPACK:
635     case USER_EVENT_PAIR:
636       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
637   }
638 #endif
639 }
640
641 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
642 #ifndef CMK_BLUEGENE_CHARM
643   new (&pool[numEntries++])
644         LogEntry(time,type,funcID,lineNum,fileName);
645   if(poolSize == numEntries){
646     flushLogBuffer();
647   }
648 #endif  
649 }
650
651
652   
653 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
654 #ifndef CMK_BLUEGENE_CHARM
655   new (&pool[numEntries++])
656         LogEntry(type,time,memUsage);
657   if(poolSize == numEntries){
658     flushLogBuffer();
659   }
660 #endif  
661         
662 }  
663
664
665
666 void LogPool::addUserSupplied(int data){
667         // add an event
668         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
669
670         // set the user supplied value for the previously created event 
671         pool[numEntries-1].setUserSuppliedData(data);
672   }
673
674
675 void LogPool::addUserSuppliedNote(char *note){
676         // add an event
677         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
678
679         // set the user supplied note for the previously created event 
680         pool[numEntries-1].setUserSuppliedNote(note);
681   }
682
683 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
684   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
685 #ifndef CMK_BLUEGENE_CHARM
686 #if CMK_SMP_TRACE_COMMTHREAD && MPI_SMP_TRACE_COMMTHREAD_HACK
687   //This part of code is used  to combine the contiguous
688   //MPI_Test and MPI_Iprobe events to reduce the number of
689   //entries
690 #define MPI_TEST_EVENT_ID 60
691 #define MPI_IPROBE_EVENT_ID 70 
692   int lastEvent = pool[numEntries-1].event;
693   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
694     //just replace the endtime of last event
695     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
696     pool[numEntries].endTime = et;
697   }else{
698     new (&pool[numEntries++])
699       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
700   }
701 #else
702   new (&pool[numEntries++])
703     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
704 #endif
705   if(poolSize == numEntries){
706     flushLogBuffer();
707   }
708 #endif  
709 }
710
711
712 /* **CW** Not sure if this is the right thing to do. Feels more like
713    a hack than a solution to Sameer's request to add the destination
714    processor information to multicasts and broadcasts.
715
716    In the unlikely event this method is used for Broadcasts as well,
717    pelist == NULL will be used to indicate a global broadcast with 
718    num PEs.
719 */
720 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
721                                    int event, int pe, int ml, CmiObjId *id,
722                                    double recvT, int numPe, int *pelist)
723 {
724   new (&pool[numEntries++])
725     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
726   if(poolSize==numEntries) {
727     flushLogBuffer();
728   }
729 }
730
731 void LogPool::postProcessLog()
732 {
733 #if CMK_BLUEGENE_CHARM
734   bgUpdateProj(1);   // event type
735 #endif
736 }
737
738 // /** Constructor for a multicast log entry */
739 // 
740 //  THIS WAS MOVED TO trace-projections.h with the other constructors
741 // 
742 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
743 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
744 // {
745 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
746 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
747 //     recvTime = rt; 
748 //     numpes = numPe;
749 //     userSuppliedNote = NULL;
750 //     if (pelist != NULL) {
751 //      pes = new int[numPe];
752 //      for (int i=0; i<numPe; i++) {
753 //        pes[i] = pelist[i];
754 //      }
755 //     } else {
756 //      pes= NULL;
757 //     }
758 // }
759
760 void LogEntry::addPapi(int numPapiEvts, int *papi_ids, LONG_LONG_PAPI *papiVals)
761 {
762 #if CMK_HAS_COUNTER_PAPI
763   numPapiEvents = numPapiEvts;
764   if (papiVals != NULL) {
765     papiIDs = new int[numPapiEvents];
766     papiValues = new LONG_LONG_PAPI[numPapiEvents];
767     for (int i=0; i<numPapiEvents; i++) {
768       papiIDs[i] = papi_ids[i];
769       papiValues[i] = papiVals[i];
770     }
771   }
772 #endif
773 }
774
775
776
777 void LogEntry::pup(PUP::er &p)
778 {
779   int i;
780   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
781   char ret = '\n';
782
783   p|type;
784   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
785   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
786
787   switch (type) {
788     case USER_EVENT:
789     case USER_EVENT_PAIR:
790       p|mIdx; p|itime; p|event; p|pe;
791       break;
792     case BEGIN_IDLE:
793     case END_IDLE:
794     case BEGIN_PACK:
795     case END_PACK:
796     case BEGIN_UNPACK:
797     case END_UNPACK:
798       p|itime; p|pe; 
799       break;
800     case BEGIN_PROCESSING:
801       if (p.isPacking()) {
802         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
803         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
804       }
805       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
806       p|msglen; p|irecvtime; 
807       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
808       p|icputime;
809 #if CMK_HAS_COUNTER_PAPI
810       p|numPapiEvents;
811       for (i=0; i<numPapiEvents; i++) {
812         // not yet!!!
813         //      p|papiIDs[i]; 
814         p|papiValues[i];
815         
816       }
817 #else
818       p|numPapiEvents;     // non papi version has value 0
819 #endif
820       if (p.isUnpacking()) {
821         recvTime = irecvtime/1.0e6;
822         cputime = icputime/1.0e6;
823       }
824       break;
825     case END_PROCESSING:
826       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
827       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
828 #if CMK_HAS_COUNTER_PAPI
829       p|numPapiEvents;
830       for (i=0; i<numPapiEvents; i++) {
831         // not yet!!!
832         //      p|papiIDs[i];
833         p|papiValues[i];
834       }
835 #else
836       p|numPapiEvents;  // non papi version has value 0
837 #endif
838       if (p.isUnpacking()) cputime = icputime/1.0e6;
839       break;
840     case USER_SUPPLIED:
841           p|userSuppliedData;
842           p|itime;
843         break;
844     case USER_SUPPLIED_NOTE:
845           p|itime;
846           int length;
847           if (p.isPacking()) length = strlen(userSuppliedNote);
848           p | length;
849           char space;
850           space = ' ';
851           p | space;
852           if (p.isUnpacking()) {
853             userSuppliedNote = new char[length+1];
854             userSuppliedNote[length] = '\0';
855           }
856           PUParray(p,userSuppliedNote, length);
857           break;
858     case USER_SUPPLIED_BRACKETED_NOTE:
859       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
860           p|itime;
861           p|iEndTime;
862           p|event;
863           int length2;
864           if (p.isPacking()) length2 = strlen(userSuppliedNote);
865           p | length2;
866           char space2;
867           space2 = ' ';
868           p | space2;
869           if (p.isUnpacking()) {
870             userSuppliedNote = new char[length+1];
871             userSuppliedNote[length] = '\0';
872           }
873           PUParray(p,userSuppliedNote, length2);
874           break;
875     case MEMORY_USAGE_CURRENT:
876       p | memUsage;
877       p | itime;
878         break;
879     case CREATION:
880       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
881       p|mIdx; p|eIdx; p|itime;
882       p|event; p|pe; p|msglen; p|irecvtime;
883       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
884       break;
885     case CREATION_BCAST:
886       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
887       p|mIdx; p|eIdx; p|itime;
888       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
889       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
890       break;
891     case CREATION_MULTICAST:
892       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
893       p|mIdx; p|eIdx; p|itime;
894       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
895       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
896       for (i=0; i<numpes; i++) p|pes[i];
897       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
898       break;
899     case MESSAGE_RECV:
900       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
901       break;
902
903     case ENQUEUE:
904     case DEQUEUE:
905       p|mIdx; p|itime; p|event; p|pe;
906       break;
907
908     case BEGIN_INTERRUPT:
909     case END_INTERRUPT:
910       p|itime; p|event; p|pe;
911       break;
912
913       // **CW** absolute timestamps are used here to support a quick
914       // way of determining the total time of a run in projections
915       // visualization.
916     case BEGIN_COMPUTATION:
917     case END_COMPUTATION:
918     case BEGIN_TRACE:
919     case END_TRACE:
920       p|itime;
921       break;
922     case BEGIN_FUNC:
923         p | itime;
924         p | mIdx;
925         p | event;
926         if(!p.isUnpacking()){
927                 p(fName,flen-1);
928         }
929         break;
930     case END_FUNC:
931         p | itime;
932         p | mIdx;
933         break;
934     case END_PHASE:
935       p|eIdx; // FIXME: actually the phase ID
936       p|itime;
937       break;
938     default:
939       CmiError("***Internal Error*** Wierd Event %d.\n", type);
940       break;
941   }
942   if (p.isUnpacking()) time = itime/1.0e6;
943   p|ret;
944 }
945
946 TraceProjections::TraceProjections(char **argv): 
947   curevent(0), inEntry(0), computationStarted(0), 
948         converseExit(0), endTime(0.0), traceNestedEvents(0),
949         currentPhaseID(0), lastPhaseEvent(NULL)
950 {
951   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
952
953   if (CkpvAccess(traceOnPe) == 0) return;
954
955   CtvInitialize(int,curThreadEvent);
956   CkpvInitialize(CmiInt8, CtrLogBufSize);
957   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
958   CtvAccess(curThreadEvent)=0;
959   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
960                        "Log entries to buffer per I/O")) {
961     if (CkMyPe() == 0) {
962       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
963     }
964   }
965   checknested = 
966     CmiGetArgFlagDesc(argv,"+checknested",
967                       "check projections nest begin end execute events");
968   traceNestedEvents = 
969     CmiGetArgFlagDesc(argv,"+tracenested",
970               "trace projections nest begin/end execute events");
971   int binary = 
972     CmiGetArgFlagDesc(argv,"+binary-trace",
973                       "Write log files in binary format");
974 #if CMK_PROJECTIONS_USE_ZLIB
975   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
976 #else
977   // consume the flag so there's no confusing
978   CmiGetArgFlagDesc(argv,"+gz-trace",
979                     "Write log files pre-compressed with gzip");
980   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
981 #endif
982
983   // **CW** default to non delta log encoding. The user may choose to do
984   // create both logs (for debugging) or just the old log timestamping
985   // (for compatibility).
986   // Generating just the non delta log takes precedence over generating
987   // both logs (if both arguments appear on the command line).
988
989   // switch to OLD log format until everything works // Gengbin
990   nonDeltaLog = 1;
991   deltaLog = 0;
992   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
993                                   "Generate Delta encoded and simple timestamped log files");
994
995   _logPool = new LogPool(CkpvAccess(traceRoot));
996   _logPool->setBinary(binary);
997 #if CMK_PROJECTIONS_USE_ZLIB
998   _logPool->setCompressed(compressed);
999 #endif
1000   if (CkMyPe() == 0) {
1001     _logPool->createSts();
1002     _logPool->createRC();
1003   }
1004   funcCount=1;
1005
1006 #if CMK_HAS_COUNTER_PAPI
1007   // We initialize and create the event sets for use with PAPI here.
1008   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1009   if (papiRetValue != PAPI_VER_CURRENT) {
1010     CmiAbort("PAPI Library initialization failure!\n");
1011   }
1012   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1013   papiEventSet = PAPI_NULL; 
1014   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1015     CmiAbort("PAPI failed to create event set!\n");
1016   }
1017   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, numPAPIEvents);
1018   if (papiRetValue != PAPI_OK) {
1019     if (papiRetValue == PAPI_ECNFLCT) {
1020       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1021     } else {
1022       CmiAbort("PAPI failed to add designated events!\n");
1023     }
1024   }
1025   papiValues = new long_long[numPAPIEvents];
1026   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
1027 #endif
1028 }
1029
1030 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1031 {
1032   OPTIMIZED_VERSION
1033   CkAssert(e==-1 || e>=0);
1034   CkAssert(evt != NULL);
1035   int event;
1036   int biggest = -1;
1037   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1038     int cur = (*CkpvAccess(usrEvents))[i]->e;
1039     if (cur == e) {
1040       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1041       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1042         return e;
1043       else
1044         CmiAbort("UserEvent double registered!");
1045     }
1046     if (cur > biggest) biggest = cur;
1047   }
1048   // if no user events have so far been registered. biggest will be -1
1049   // and hence newly assigned event numbers will begin from 0.
1050   if (e==-1) event = biggest+1;  // automatically assign new event number
1051   else event = e;
1052   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1053   return event;
1054 }
1055
1056 void TraceProjections::traceClearEps(void)
1057 {
1058   // In trace-summary, this zeros out the EP bins, to eliminate noise
1059   // from startup.  Here, this isn't useful, since we can do that in
1060   // post-processing
1061 }
1062
1063 void TraceProjections::traceWriteSts(void)
1064 {
1065   if(CkMyPe()==0)
1066     _logPool->writeSts(this);
1067 }
1068
1069 /** 
1070  * **IMPT NOTES**:
1071  *
1072  * This is called when Converse closes during ConverseCommonExit().
1073  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1074  *
1075  * Some programs bypass CkExit() (like NAMD, which eventually calls
1076  * ConverseExit()), modules like traces will have to pretend to shutdown
1077  * as if CkExit() was called but at the same time avoid making
1078  * subsequent CkExit() calls (which is usually required for allowing
1079  * other modules to shutdown).
1080  *
1081  * Note that we can only get here if CkExit() was not called, since the
1082  * trace module will un-register itself from TraceArray if it did.
1083  *
1084  */
1085 void TraceProjections::traceClose(void)
1086 {
1087 #ifdef PROJ_ANALYSIS
1088   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1089
1090   // sets the flag that tells the code not to make the CkExit call later
1091   converseExit = 1;
1092   if (CkMyPe() == 0) {
1093     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1094     bocProxy.traceProjectionsParallelShutdown(-1);
1095   }
1096 #else
1097   // we've already deleted the logpool, so multiple calls to traceClose
1098   // are tolerated.
1099   if (_logPool == NULL) {
1100     return;
1101   }
1102   if(CkMyPe()==0){
1103     _logPool->writeSts(this);
1104   }
1105   CkpvAccess(_trace)->endComputation();
1106   delete _logPool;              // will write
1107   // remove myself from traceArray so that no tracing will be called.
1108   CkpvAccess(_traces)->removeTrace(this);
1109 #endif
1110 }
1111
1112 /**
1113  *  **IMPT NOTES**:
1114  *
1115  *  This is meant to be called internally by the tracing framework.
1116  *
1117  */
1118 void TraceProjections::closeTrace() {
1119   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1120   if (CkMyPe() == 0) {
1121     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1122     _logPool->writeSts(this);
1123     _logPool->writeRC();
1124     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1125   }
1126   delete _logPool;       // will write logs to file
1127 }
1128
1129 #if CMK_SMP_TRACE_COMMTHREAD
1130 void TraceProjections::traceBeginOnCommThread()
1131 {
1132   if (!computationStarted) return;
1133   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1134 }
1135
1136 void TraceProjections::traceEndOnCommThread()
1137 {
1138   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1139 }
1140 #endif
1141
1142 void TraceProjections::traceBegin(void)
1143 {
1144   if (!computationStarted) return;
1145   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1146 }
1147
1148 void TraceProjections::traceEnd(void)
1149 {
1150   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1151 }
1152
1153 void TraceProjections::userEvent(int e)
1154 {
1155   if (!computationStarted) return;
1156   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1157 }
1158
1159 void TraceProjections::userBracketEvent(int e, double bt, double et)
1160 {
1161   if (!computationStarted) return;
1162   // two events record Begin/End of event e.
1163   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1164   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1165 }
1166
1167 void TraceProjections::userSuppliedData(int d)
1168 {
1169   if (!computationStarted) return;
1170   _logPool->addUserSupplied(d);
1171 }
1172
1173 void TraceProjections::userSuppliedNote(char *note)
1174 {
1175   if (!computationStarted) return;
1176   _logPool->addUserSuppliedNote(note);
1177 }
1178
1179
1180 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1181 {
1182   if (!computationStarted) return;
1183   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1184 }
1185
1186 void TraceProjections::memoryUsage(double m)
1187 {
1188   if (!computationStarted) return;
1189   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1190   
1191 }
1192
1193
1194 void TraceProjections::creation(envelope *e, int ep, int num)
1195 {
1196   double curTime = TraceTimer();
1197   if (e == 0) {
1198     CtvAccess(curThreadEvent) = curevent;
1199     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1200                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1201   } else {
1202     int type=e->getMsgtype();
1203     e->setEvent(curevent);
1204     if (num > 1) {
1205       _logPool->add(CREATION_BCAST, type, ep, curTime,
1206                     curevent++, CkMyPe(), e->getTotalsize(), 
1207                     NULL, 0, 0.0, num);
1208     } else {
1209       _logPool->add(CREATION, type, ep, curTime,
1210                     curevent++, CkMyPe(), e->getTotalsize(), 
1211                     NULL, 0, 0.0);
1212     }
1213   }
1214 }
1215
1216 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1217    Communication Library-specific Multicasts via new event 
1218    CREATION_MULTICAST.
1219 */
1220
1221 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1222                                          int *pelist)
1223 {
1224   double curTime = TraceTimer();
1225   if (e==0) {
1226     CtvAccess(curThreadEvent)=curevent;
1227     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1228                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1229   } else {
1230     int type=e->getMsgtype();
1231     e->setEvent(curevent);
1232     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1233                                    e->getTotalsize(), 0, 0.0, num, pelist);
1234   }
1235 }
1236
1237 void TraceProjections::creationDone(int num)
1238 {
1239   // modified the creation done time of the last num log entries
1240   // FIXME: totally a hack
1241   double curTime = TraceTimer();
1242   int idx = _logPool->numEntries-1;
1243   while (idx >=0 && num >0 ) {
1244     LogEntry &log = _logPool->pool[idx];
1245     if ((log.type == CREATION) ||
1246         (log.type == CREATION_BCAST) ||
1247         (log.type == CREATION_MULTICAST)) {
1248       log.recvTime = curTime - log.time;
1249       num --;
1250     }
1251     idx--;
1252   }
1253 }
1254
1255 void TraceProjections::beginExecute(CmiObjId *tid)
1256 {
1257 #if CMK_HAS_COUNTER_PAPI
1258   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1259     CmiAbort("PAPI failed to read at begin execute!\n");
1260   }
1261 #endif
1262   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1263   execEvent = CtvAccess(curThreadEvent);
1264   execEp = (-1);
1265   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1266                 execEvent,CkMyPe(), 0, tid);
1267 #if CMK_HAS_COUNTER_PAPI
1268   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1269 #endif
1270   inEntry = 1;
1271 }
1272
1273 void TraceProjections::beginExecute(envelope *e)
1274 {
1275   if(e==0) {
1276 #if CMK_HAS_COUNTER_PAPI
1277     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1278       CmiAbort("PAPI failed to read at begin execute!\n");
1279     }
1280 #endif
1281     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1282     execEvent = CtvAccess(curThreadEvent);
1283     execEp = (-1);
1284     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1285                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1286 #if CMK_HAS_COUNTER_PAPI
1287     _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1288 #endif
1289     inEntry = 1;
1290   } else {
1291     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1292                  e->getSrcPe(),e->getTotalsize());
1293   }
1294 }
1295
1296 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1297                                     int mlen, CmiObjId *idx)
1298 {
1299   if (traceNestedEvents) {
1300     if (! nestedEvents.isEmpty()) {
1301       endExecuteLocal();
1302     }
1303     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1304   }
1305   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1306 }
1307
1308 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1309                                     int mlen, CmiObjId *idx)
1310 {
1311 #if CMK_HAS_COUNTER_PAPI
1312   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1313     CmiAbort("PAPI failed to read at begin execute!\n");
1314   }
1315 #endif
1316   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1317   execEvent=event;
1318   execEp=ep;
1319   execPe=srcPe;
1320   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1321                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1322 #if CMK_HAS_COUNTER_PAPI
1323   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1324 #endif
1325   inEntry = 1;
1326 }
1327
1328 void TraceProjections::endExecute(void)
1329 {
1330   if (traceNestedEvents) nestedEvents.deq();
1331   endExecuteLocal();
1332   if (traceNestedEvents) {
1333     if (! nestedEvents.isEmpty()) {
1334       NestedEvent &ne = nestedEvents.peek();
1335       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1336     }
1337   }
1338 }
1339
1340 void TraceProjections::endExecuteLocal(void)
1341 {
1342 #if CMK_HAS_COUNTER_PAPI
1343   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1344     CmiAbort("PAPI failed to read at end execute!\n");
1345   }
1346 #endif
1347   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1348   double cputime = TraceCpuTimer();
1349   if(execEp == (-1)) {
1350     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1351                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1352   } else {
1353     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1354                   execEvent, execPe, 0, NULL, 0.0, cputime);
1355   }
1356 #if CMK_HAS_COUNTER_PAPI
1357   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1358 #endif
1359   inEntry = 0;
1360 }
1361
1362 void TraceProjections::messageRecv(char *env, int pe)
1363 {
1364 #if 0
1365   envelope *e = (envelope *)env;
1366   int msgType = e->getMsgtype();
1367   int ep = e->getEpIdx();
1368 #if 0
1369   if (msgType==NewChareMsg || msgType==NewVChareMsg
1370           || msgType==ForChareMsg || msgType==ForVidMsg
1371           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1372           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1373     ep = e->getEpIdx();
1374   else
1375     ep = _threadEP;
1376 #endif
1377   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1378                 curevent++, e->getSrcPe(), e->getTotalsize());
1379 #endif
1380 }
1381
1382 void TraceProjections::beginIdle(double curWallTime)
1383 {
1384   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1385 }
1386
1387 void TraceProjections::endIdle(double curWallTime)
1388 {
1389   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1390 }
1391
1392 void TraceProjections::beginPack(void)
1393 {
1394   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1395 }
1396
1397 void TraceProjections::endPack(void)
1398 {
1399   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1400 }
1401
1402 void TraceProjections::beginUnpack(void)
1403 {
1404   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1405 }
1406
1407 void TraceProjections::endUnpack(void)
1408 {
1409   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1410 }
1411
1412 void TraceProjections::enqueue(envelope *) {}
1413
1414 void TraceProjections::dequeue(envelope *) {}
1415
1416 void TraceProjections::beginComputation(void)
1417 {
1418   computationStarted = 1;
1419
1420   // Executes the callback function provided by the machine
1421   // layer. This is the proper method to register user events in a
1422   // machine layer because projections is a charm++ module.
1423   if (CkpvAccess(traceOnPe) != 0) {
1424     void (*ptr)() = registerMachineUserEvents();
1425     if (ptr != NULL) {
1426       ptr();
1427     }
1428   }
1429 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1430 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1431   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1432 #if CMK_HAS_COUNTER_PAPI
1433   // we start the counters here
1434   if (PAPI_start(papiEventSet) != PAPI_OK) {
1435     CmiAbort("PAPI failed to start designated counters!\n");
1436   }
1437 #endif
1438 }
1439
1440 void TraceProjections::endComputation(void)
1441 {
1442 #if CMK_HAS_COUNTER_PAPI
1443   // we stop the counters here. A silent failure is alright since we
1444   // are already at the end of the program.
1445   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1446     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1447   }
1448   // NOTE: We should not do a complete close of PAPI until after the
1449   // sts writer is done.
1450 #endif
1451   endTime = TraceTimer();
1452   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1453   /*
1454   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1455            endTime*1e06);
1456   */
1457 }
1458
1459 int TraceProjections::idxRegistered(int idx)
1460 {
1461     int idxVecLen = idxVec.size();
1462     for(int i=0; i<idxVecLen; i++)
1463     {
1464         if(idx == idxVec[i])
1465             return 1;
1466     }
1467     return 0;
1468 }
1469
1470 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1471     StrKey k((char*)name,strlen(name));
1472     int num = funcHashtable.get(k);
1473     
1474     if(num!=0) {
1475         return;
1476         //as for mpi programs, the same function may be registered for several times
1477         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1478     }
1479     
1480     int isIdxExisting=0;
1481     if(idxSpecifiedByUser)
1482         isIdxExisting=idxRegistered(idx);
1483     if(isIdxExisting){
1484         return;
1485         //same reason with num!=0
1486         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1487     }
1488
1489     if(idxSpecifiedByUser) {
1490         char *st = new char[strlen(name)+1];
1491         memcpy(st,name,strlen(name)+1);
1492         StrKey *newKey = new StrKey(st,strlen(st));
1493         int &ref = funcHashtable.put(*newKey);
1494         ref=idx;
1495         funcCount++;
1496         idxVec.push_back(idx);  
1497     } else {
1498         char *st = new char[strlen(name)+1];
1499         memcpy(st,name,strlen(name)+1);
1500         StrKey *newKey = new StrKey(st,strlen(st));
1501         int &ref = funcHashtable.put(*newKey);
1502         ref=funcCount;
1503         num = funcCount;
1504         funcCount++;
1505         idx = num;
1506         idxVec.push_back(idx);
1507     }
1508 }
1509
1510 void TraceProjections::beginFunc(char *name,char *file,int line){
1511         StrKey k(name,strlen(name));    
1512         unsigned short  num = (unsigned short)funcHashtable.get(k);
1513         beginFunc(num,file,line);
1514 }
1515
1516 void TraceProjections::beginFunc(int idx,char *file,int line){
1517         if(idx <= 0){
1518                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1519         }       
1520         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1521 }
1522
1523 void TraceProjections::endFunc(char *name){
1524         StrKey k(name,strlen(name));    
1525         int num = funcHashtable.get(k);
1526         endFunc(num);   
1527 }
1528
1529 void TraceProjections::endFunc(int num){
1530         if(num <= 0){
1531                 printf("endFunc without start :O\n");
1532         }
1533         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1534 }
1535
1536 // specialized PUP:ers for handling trace projections logs
1537 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1538 {
1539   for (int i=0;i<n;i++) 
1540     switch(t) {
1541     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1542     case Tuchar:
1543     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1544     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1545     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1546     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1547     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1548     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1549     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1550     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1551     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1552 #ifdef CMK_PUP_LONG_LONG
1553     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1554     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1555 #endif
1556     default: CmiAbort("Unrecognized pup type code!");
1557     };
1558 }
1559
1560 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1561 {
1562   for (int i=0;i<n;i++) 
1563     switch(t) {
1564     case Tchar: { 
1565       char c = fgetc(f);
1566       if (c==EOF)
1567         parseError("Could not match character");
1568       else
1569         ((char *)p)[i] = c;
1570       break;
1571     }
1572     case Tuchar:
1573     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1574     case Tshort:((short *)p)[i]=(short)readInt(); break;
1575     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1576     case Tint:  ((int *)p)[i]=readInt(); break;
1577     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1578     case Tlong: ((long *)p)[i]=readInt(); break;
1579     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1580     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1581     case Tdouble:((double *)p)[i]=readDouble(); break;
1582 #ifdef CMK_PUP_LONG_LONG
1583     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1584     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1585 #endif
1586     default: CmiAbort("Unrecognized pup type code!");
1587     };
1588 }
1589
1590 #if CMK_PROJECTIONS_USE_ZLIB
1591 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1592 {
1593   for (int i=0;i<n;i++) 
1594     switch(t) {
1595     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1596     case Tuchar:
1597     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1598     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1599     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1600     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1601     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1602     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1603     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1604     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1605     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1606 #ifdef CMK_PUP_LONG_LONG
1607     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1608     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1609 #endif
1610     default: CmiAbort("Unrecognized pup type code!");
1611     };
1612 }
1613 #endif
1614
1615 void TraceProjections::endPhase() {
1616   double currentPhaseTime = TraceTimer();
1617   double lastPhaseTime = 0.0;
1618
1619   if (lastPhaseEvent != NULL) {
1620     lastPhaseTime = lastPhaseEvent->time;
1621   } else {
1622     if (_logPool->pool != NULL) {
1623       // assumed to be BEGIN_COMPUTATION
1624       lastPhaseTime = _logPool->pool[0].time;
1625     } else {
1626       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1627       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1628       lastPhaseTime = currentPhaseTime;
1629     }
1630   }
1631
1632   /* Insert endPhase event here. */
1633   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1634   /*        We currently "borrow" from the standard add() method. */
1635   /*        It should really be its own add() method.             */
1636   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1637   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1638   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1639   currentPhaseID++;
1640 }
1641
1642 #ifdef PROJ_ANALYSIS
1643 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1644
1645
1646 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1647
1648 void registerOutlierReduction() {
1649   outlierReductionType =
1650     CkReduction::addReducer(outlierReduction);
1651   minMaxReductionType =
1652     CkReduction::addReducer(minMaxReduction);
1653 }
1654
1655 /**
1656  * **IMPT NOTES**:
1657  *
1658  * This is the C++ code that is registered to be activated at module
1659  * shutdown. This is called exactly once on processor 0. Module shutdown
1660  * is initiated as a result of a CkExit() call by the application code
1661  * 
1662  * The exit function must ultimately call CkExit() again to
1663  * so that other module exit functions may proceed after this module is
1664  * done.
1665  *
1666  */
1667 // FIXME: WHY extern "C"???
1668 extern "C" void TraceProjectionsExitHandler()
1669 {
1670 #if ! CMK_TRACE_DISABLED
1671   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1672   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1673   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1674 #else
1675   CkExit();
1676 #endif
1677 }
1678
1679 // This is called once on each processor but the idiom of use appears
1680 // to be to only have processor 0 register the function.
1681 //
1682 // See initnode in trace-projections.ci
1683 void initTraceProjectionsBOC()
1684 {
1685   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1686 #ifdef __BLUEGENE__
1687   if (BgNodeRank() == 0) {
1688 #else
1689     if (CkMyRank() == 0) {
1690 #endif
1691       registerExitFn(TraceProjectionsExitHandler);
1692     }
1693 #if 0
1694   } // this is so indentation does not get messed up
1695 #endif
1696 }
1697
1698 // mainchare for trace-projections BOC-operations. 
1699 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1700 // rest of its life.
1701 //
1702 // Responsible for:
1703 //   1. Handling commandline arguments
1704 //   2. Creating any objects required for proper BOC operations.
1705 //
1706 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1707   /** Options for Outlier Analysis */
1708   // defaults. Things will change with support for interactive analysis.
1709   bool findOutliers = false;
1710   bool outlierAutomatic = true;
1711   int numKSeeds = 10; 
1712
1713   int peNumKeep = CkNumPes();  // used as a default
1714   double entryThreshold = 0.0;
1715   bool outlierUsePhases = false;
1716   if (outlierAutomatic) {
1717     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1718                      "Number of cluster seeds to apply at outlier analysis.");
1719     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1720                      &peNumKeep, "Number of Processors to retain data");
1721     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1722                         "Minimum significance of entry points to be considered for clustering (%).");
1723     findOutliers =
1724       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1725     outlierUsePhases = 
1726       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1727                         "Apply automatic outlier analysis to any available phases.");
1728     if (outlierUsePhases) {
1729       // if the user wants to use an outlier feature, it is assumed outlier
1730       //    analysis is desired.
1731       findOutliers = true;
1732     }
1733   }
1734   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers);
1735   if (findOutliers) {
1736     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1737                                         numKSeeds,
1738                                         peNumKeep,
1739                                         entryThreshold,
1740                                         outlierUsePhases);
1741   }
1742 }
1743
1744 // Called on every processor.
1745 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1746   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1747   endPe = pe;                // the pe that starts CkExit()
1748   if (CkMyPe() == 0) {
1749     analysisStartTime = CmiWallTimer();
1750   }
1751   CkpvAccess(_trace)->endComputation();
1752   // no more tracing for projections on this processor after this point. 
1753   // Note that clear must be called after remove, or bad things will happen.
1754   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1755   CkpvAccess(_traces)->clearTrace();
1756
1757   // From this point, we start multiple chains of reductions and broadcasts to
1758   // perform final online analysis activities.
1759
1760   // Start all parallel operations at once. 
1761   //   These MUST NOT modify base performance data in LogPool. If they must,
1762   //   then the parallel operations must be phased (and this code to be
1763   //   restructured as necessary)
1764   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1765   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1766   if (findOutliers) {
1767     parModulesRemaining++;
1768     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1769   }
1770   parModulesRemaining++;
1771   bocProxy[CkMyPe()].startEndTimeAnalysis();
1772 }
1773
1774 // Called on each processor
1775 void KMeansBOC::startKMeansAnalysis() {
1776   // Initialize all necessary structures
1777   LogPool *pool = CkpvAccess(_trace)->_logPool;
1778
1779  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1780   int flushInt = 0;
1781   if (pool->hasFlushed) {
1782     flushInt = 1;
1783   }
1784   
1785   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1786                 0, thisProxy);
1787   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1788 }
1789
1790 // Called on processor 0
1791 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1792   int someFlush = *((int *)msg->getData());
1793
1794   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1795   
1796   if (someFlush == 0) {
1797     // Data intact proceed with KMeans analysis
1798     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1799     kMeansProxy.flushCheckDone();
1800   } else {
1801     // Some processor had flushed it data at some point, abandon KMeans
1802     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1803     // terminate KMeans
1804     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1805     bocProxy[0].kMeansDone();
1806   }
1807 }
1808
1809 // Called on each processor
1810 void KMeansBOC::flushCheckDone() {
1811   // **FIXME** - more flexible metric collection scheme may be necessary
1812   //   in the future for production use.
1813   LogPool *pool = CkpvAccess(_trace)->_logPool;
1814
1815   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1816
1817   numEntryMethods = _entryTable.size();
1818   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1819
1820   // maintained across phases
1821   markedBegin = false;
1822   markedIdle = false;
1823   beginBlockTime = 0.0;
1824   beginIdleBlockTime = 0.0;
1825   lastBeginEPIdx = -1; // none
1826
1827   lastPhaseIdx = 0;
1828   currentExecTimes = NULL;
1829   currentPhase = 0;
1830   selected = false;
1831
1832   pool->initializePhases();
1833
1834   // incoming K Seeds and the per-phase filter
1835   incKSeeds = new double[numK*numMetrics];
1836   keepMetric = new bool[numMetrics];
1837
1838   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1839   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1840   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1841   thisProxy[CkMyPe()].getNextPhaseMetrics();
1842 }
1843
1844 // Called on each processor.
1845 void KMeansBOC::getNextPhaseMetrics() {
1846   // Assumes the presence of the complete logs on this processor.
1847   // Assumes first event is always BEGIN_COMPUTATION
1848   // Assumes each processor sees the same number of phases.
1849   //
1850   // In this code, we collect performance data for this processor.
1851   // All times are in seconds.
1852
1853   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1854
1855   if (usePhases) {
1856     DEBUGF("[%d] Using Phases\n", CkMyPe());
1857   } else {
1858     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1859   }
1860   
1861   if (currentExecTimes != NULL) {
1862     delete [] currentExecTimes;
1863   }
1864   currentExecTimes = new double[numMetrics];
1865   for (int i=0; i<numMetrics; i++) {
1866     currentExecTimes[i] = 0.0;
1867   }
1868
1869   int numEventMethods = _entryTable.size();
1870   LogPool *pool = CkpvAccess(_trace)->_logPool;
1871   
1872   CkAssert(pool->numEntries > lastPhaseIdx);
1873   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1874   double totalPhaseTime = 0.0;
1875   double totalActiveTime = 0.0; // entry method + idle
1876
1877   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1878     if (pool->pool[i].type == BEGIN_PROCESSING) {
1879       // check pairing
1880       if (!markedBegin) {
1881         markedBegin = true;
1882       }
1883       beginBlockTime = pool->pool[i].time;
1884       lastBeginEPIdx = pool->pool[i].eIdx;
1885     } else if (pool->pool[i].type == END_PROCESSING) {
1886       // check pairing
1887       // if End without a begin, just ignore
1888       //   this event. If a phase-boundary is crossed, the Begin
1889       //   event would be maintained in beginBlockTime, so it is 
1890       //   not a problem.
1891       if (markedBegin) {
1892         markedBegin = false;
1893         if (pool->pool[i].event < 0)
1894         {
1895           // ignore dummy events. **FIXME** as they have no eIdx?
1896           continue;
1897         }
1898         currentExecTimes[pool->pool[i].eIdx] += 
1899           pool->pool[i].time - beginBlockTime;
1900         totalActiveTime += pool->pool[i].time - beginBlockTime;
1901         lastBeginEPIdx = -1;
1902       }
1903     } else if (pool->pool[i].type == BEGIN_IDLE) {
1904       // check pairing
1905       if (!markedIdle) {
1906         markedIdle = true;
1907       }
1908       beginIdleBlockTime = pool->pool[i].time;
1909     } else if (pool->pool[i].type == END_IDLE) {
1910       // check pairing
1911       if (markedIdle) {
1912         markedIdle = false;
1913         currentExecTimes[numEventMethods] += 
1914           pool->pool[i].time - beginIdleBlockTime;
1915         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1916       }
1917     } else if (pool->pool[i].type == END_PHASE) {
1918       // ignored when not using phases
1919       if (usePhases) {
1920         // when we've not visited this node before
1921         if (i != lastPhaseIdx) { 
1922           totalPhaseTime = 
1923             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1924           // it is important that proper accounting of time take place here.
1925           // Note that END_PHASE events inevitably occur in the context of
1926           //   some entry method by the way the tracing API is designed.
1927           if (markedBegin) {
1928             CkAssert(lastBeginEPIdx >= 0);
1929             currentExecTimes[lastBeginEPIdx] += 
1930               pool->pool[i].time - beginBlockTime;
1931             totalActiveTime += pool->pool[i].time - beginBlockTime;
1932             // this is so the remainder contributes to the next phase
1933             beginBlockTime = pool->pool[i].time;
1934           }
1935           // The following is unlikely, but stranger things have happened.
1936           if (markedIdle) {
1937             currentExecTimes[numEventMethods] +=
1938               pool->pool[i].time - beginIdleBlockTime;
1939             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1940             // this is so the remainder contributes to the next phase
1941             beginIdleBlockTime = pool->pool[i].time;
1942           }
1943           if (totalActiveTime <= totalPhaseTime) {
1944             currentExecTimes[numEventMethods+1] = 
1945               totalPhaseTime - totalActiveTime;
1946           } else {
1947             currentExecTimes[numEventMethods+1] = 0.0;
1948             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
1949                      CkMyPe(), currentPhase);
1950           }
1951           collectKMeansData();
1952           // end the loop (and method) and defer the work till the next call
1953           lastPhaseIdx = i;
1954           break; 
1955         }
1956       }
1957     } else if (pool->pool[i].type == END_COMPUTATION) {
1958       if (markedBegin) {
1959         CkAssert(lastBeginEPIdx >= 0);
1960         currentExecTimes[lastBeginEPIdx] += 
1961           pool->pool[i].time - beginBlockTime;
1962         totalActiveTime += pool->pool[i].time - beginBlockTime;
1963       }
1964       if (markedIdle) {
1965         currentExecTimes[numEventMethods] +=
1966           pool->pool[i].time - beginIdleBlockTime;
1967         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1968       }
1969       totalPhaseTime = 
1970         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1971       if (totalActiveTime <= totalPhaseTime) {
1972         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
1973       } else {
1974         currentExecTimes[numEventMethods+1] = 0.0;
1975         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
1976                  CkMyPe());
1977       }
1978       collectKMeansData();
1979     }
1980   }
1981 }
1982
1983 /**
1984  *  Through a reduction, collectKMeansData aggregates each processors' data
1985  *  in order for global properties to be determined:
1986  *  
1987  *  1. min & max to determine normalization factors.
1988  *  2. sum to determine global EP averages for possible metric reduction
1989  *       through thresholding.
1990  *  3. sum of squares to compute stddev which may be useful in the future.
1991  *
1992  *  collectKMeansData will also keep the processor's data for the current
1993  *    phase so that it may be normalized and worked on subsequently.
1994  *
1995  **/
1996 void KMeansBOC::collectKMeansData() {
1997   int minOffset = numMetrics;
1998   int maxOffset = 2*numMetrics;
1999   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2000
2001   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2002
2003   double *reductionMsg = new double[numMetrics*4];
2004
2005   for (int i=0; i<numMetrics; i++) {
2006     reductionMsg[i] = currentExecTimes[i];
2007     // duplicate the event times for max and min sections of the reduction
2008     reductionMsg[minOffset + i] = currentExecTimes[i];
2009     reductionMsg[maxOffset + i] = currentExecTimes[i];
2010     // compute squares
2011     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2012   }
2013
2014   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2015                 0, thisProxy);
2016   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2017              outlierReductionType, cb);  
2018 }
2019
2020 // The purpose is mainly to initialize the k seeds and generate
2021 //   normalization parameters for each of the metrics. The k seeds
2022 //   and normalization parameters are broadcast to all processors.
2023 //
2024 // Called on processor 0
2025 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2026   CkAssert(CkMyPe() == 0);
2027   
2028   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2029
2030   int sumOffset = 0;
2031   int minOffset = numMetrics;
2032   int maxOffset = 2*numMetrics;
2033   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2034
2035   // calculate statistics & boundaries for the k seeds for clustering
2036   KMeansStatsMessage *outmsg = 
2037     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2038   outmsg->numMetrics = numMetrics;
2039   outmsg->numKPos = numK*numMetrics;
2040   outmsg->numStats = numMetrics*4;
2041
2042   // Sum | Min | Max | Sum of Squares
2043   double *totalExecTimes = (double *)msg->getData();
2044   double totalTime = 0.0;
2045
2046   for (int i=0; i<numMetrics; i++) {
2047     DEBUGN("%lf\n", totalExecTimes[i]);
2048     totalTime += totalExecTimes[i];
2049
2050     // calculate event mean over all processors
2051     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2052
2053     // get the ranges and offsets of each metric. With this, we get
2054     //   normalization factors that can be sent back to each processor to
2055     //   be used as necessary. We reuse max for range. Min remains the offset.
2056     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2057     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2058       totalExecTimes[minOffset + i];
2059     
2060     // calculate stddev (using biased variance)
2061     outmsg->stats[sosOffset + i] = 
2062       sqrt((totalExecTimes[sosOffset + i] - 
2063             2*(outmsg->stats[i])*totalExecTimes[i] +
2064             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2065            CkNumPes());
2066   }
2067
2068   for (int i=0; i<numMetrics; i++) {
2069     // 1) if the proportion of the max value of the entry method relative to
2070     //   the average time taken over all entry methods across all processors
2071     //   is greater than the stipulated percentage threshold ...; AND
2072     // 2) if the range of values are non-zero.
2073     //
2074     // The current assumption is totalTime > 0 (what program has zero total
2075     //   time from all work?)
2076     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2077                      entryThreshold) &&
2078       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2079     if (keepMetric[i]) {
2080       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2081              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2082     } else {
2083       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2084     }
2085     outmsg->filter[i] = keepMetric[i];
2086   }
2087
2088   delete msg;
2089   
2090   // initialize k seeds for this phase
2091   kSeeds = new double[numK*numMetrics];
2092
2093   numKReported = 0;
2094   kNumMembers = new int[numK];
2095
2096   // Randomly select k processors' metric vectors for the k seeds
2097   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2098   srand(11337); // for debugging purposes
2099   for (int k=0; k<numK; k++) {
2100     DEBUGF("Seed %d | ", k);
2101     for (int m=0; m<numMetrics; m++) {
2102       double factor = totalExecTimes[maxOffset + m] - 
2103         totalExecTimes[minOffset + m];
2104       // "uniform" distribution, scaled according to the normalization
2105       //   factors
2106       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2107       // Random distribution.
2108       kSeeds[numMetrics*k + m] =
2109         ((rand()*1.0)/RAND_MAX)*factor;
2110       if (keepMetric[m]) {
2111         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2112       }
2113       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2114     }
2115     DEBUGF("\n");
2116     kNumMembers[k] = 0;
2117   }
2118
2119   // broadcast statistical values to all processors for cluster discovery
2120   thisProxy.findInitialClusters(outmsg);
2121 }
2122
2123
2124
2125 // Called on each processor.
2126 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2127
2128  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2129
2130   phaseIter = 0;
2131
2132   // Get info from stats message
2133   CkAssert(numMetrics == msg->numMetrics);
2134   for (int i=0; i<numMetrics; i++) {
2135     keepMetric[i] = msg->filter[i];
2136   }
2137
2138   // Normalize data on local processor.
2139   // **CWL** See my thesis for detailed discussion of normalization of
2140   //    performance data.
2141   // **NOTE** This might change if we want to send data based on the filter
2142   //   instead of all the data.
2143   CkAssert(numMetrics*4 == msg->numStats);
2144   for (int i=0; i<numMetrics; i++) {
2145     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2146     // **CWL** We do not normalize the range. Entry methods that exhibit
2147     //   large absolute timing variations should be allowed to contribute
2148     //   more to the Euclidean distance measure!
2149     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2150   }
2151
2152   // **NOTE** This might change if we want to send data based on the filter
2153   //   instead of all the data.
2154   CkAssert(numK*numMetrics == msg->numKPos);
2155   for (int i=0; i<msg->numKPos; i++) {
2156     incKSeeds[i] = msg->kSeedsPos[i];
2157   }
2158
2159   // Decide which KSeed this processor belongs to.
2160   minDistance = calculateDistance(0);
2161   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2162            currentPhase, phaseIter, minDistance);
2163   minK = 0;
2164   for (int i=1; i<numK; i++) {
2165     double distance = calculateDistance(i);
2166     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2167              currentPhase, phaseIter, i, distance);
2168     if (distance < minDistance) {
2169       minDistance = distance;
2170       minK = i;
2171     }
2172   }
2173
2174   // Set up a reduction with the modification vector to the root (0).
2175   //
2176   // The modification vector sends a negative value for each metric
2177   //   for the K this processor no longer belongs to and a positive
2178   //   value to the K the processor now belongs. In addition, a -1.0
2179   //   is sent to the K it is leaving and a +1.0 to the K it is 
2180   //   joining.
2181   //
2182   // The processor must still contribute a "zero returns" even if
2183   //   nothing changes. This will be the basis for determine
2184   //   convergence at the root.
2185   //
2186   // The addtional +1 is meant for the count-change that must be
2187   //   maintained for the special cases at the root when some K
2188   //   may be deprived of all processor points or go from 0 to a
2189   //   positive number of processors (see later comments).
2190   double *modVector = new double[numK*(numMetrics+1)];
2191   for (int i=0; i<numK; i++) {
2192     for (int j=0; j<numMetrics+1; j++) {
2193       modVector[i*(numMetrics+1) + j] = 0.0;
2194     }
2195   }
2196   for (int i=0; i<numMetrics; i++) {
2197     // for this initialization, only positive values need be sent.
2198     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2199   }
2200   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2201
2202   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2203                 0, thisProxy);
2204   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2205              CkReduction::sum_double, cb);  
2206 }
2207
2208 double KMeansBOC::calculateDistance(int k) {
2209   double ret = 0.0;
2210   for (int i=0; i<numMetrics; i++) {
2211     if (keepMetric[i]) {
2212       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2213              CkMyPe(), currentPhase, phaseIter, i,
2214                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2215       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2216     }
2217   }
2218   return sqrt(ret);
2219 }
2220
2221 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2222   CkAssert(CkMyPe() == 0);
2223
2224   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2225
2226   double *modVector = (double *)msg->getData();
2227   // sanity check
2228   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2229
2230   // A quick convergence test.
2231   bool hasChanges = false;
2232   for (int i=0; i<numK; i++) {
2233     hasChanges = hasChanges || 
2234       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2235   }
2236   if (!hasChanges) {
2237     delete msg;
2238     findRepresentatives();
2239   } else {
2240     int overallChange = 0;
2241     for (int i=0; i<numK; i++) {
2242       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2243       if (change != 0) {
2244         overallChange += change;
2245         // modify the k seeds based on the modification vectors coming in
2246         //
2247         // If a seed initially has no members, its contents do not matter and
2248         //   is simply set to the average of the incoming vector.
2249         // If the change causes a seed to lose all its members, do nothing.
2250         //   Its last-known location is kept to allow it to re-capture
2251         //   membership at the next iteration rather than apply the last
2252         //   changes (which snaps the point unnaturally to 0,0).
2253         // Otherwise, apply the appropriate vector changes.
2254         CkAssert((kNumMembers[i] + change >= 0) &&
2255                  (kNumMembers[i] + change <= CkNumPes()));
2256         if (kNumMembers[i] == 0) {
2257           CkAssert(change > 0);
2258           for (int j=0; j<numMetrics; j++) {
2259             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2260           }
2261         } else if (kNumMembers[i] + change == 0) {
2262           // do nothing.
2263         } else {
2264           for (int j=0; j<numMetrics; j++) {
2265             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2266             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2267             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2268           }
2269         }
2270         kNumMembers[i] += change;
2271       }
2272       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2273              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2274     }
2275     delete msg;
2276
2277     // broadcast the new seed locations.
2278     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2279     outmsg->numKPos = numK*numMetrics;
2280     for (int i=0; i<numK*numMetrics; i++) {
2281       outmsg->kSeedsPos[i] = kSeeds[i];
2282     }
2283
2284     thisProxy.updateSeedMembership(outmsg);
2285   }
2286 }
2287
2288 // Called on all processors
2289 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2290
2291   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2292
2293   phaseIter++;
2294
2295   // **NOTE** This might change if we want to send data based on the filter
2296   //   instead of all the data.
2297   CkAssert(numK*numMetrics == msg->numKPos);
2298   for (int i=0; i<msg->numKPos; i++) {
2299     incKSeeds[i] = msg->kSeedsPos[i];
2300   }
2301
2302   // Decide which KSeed this processor belongs to.
2303   lastMinK = minK;
2304   minDistance = calculateDistance(0);
2305   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2306          currentPhase, phaseIter, minDistance);
2307
2308   minK = 0;
2309   for (int i=1; i<numK; i++) {
2310     double distance = calculateDistance(i);
2311     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2312            currentPhase, phaseIter, i, distance);
2313     if (distance < minDistance) {
2314       minDistance = distance;
2315       minK = i;
2316     }
2317   }
2318
2319   double *modVector = new double[numK*(numMetrics+1)];
2320   for (int i=0; i<numK; i++) {
2321     for (int j=0; j<numMetrics+1; j++) {
2322       modVector[i*(numMetrics+1) + j] = 0.0;
2323     }
2324   }
2325
2326   if (minK != lastMinK) {
2327     for (int i=0; i<numMetrics; i++) {
2328       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2329       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2330     }
2331     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2332     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2333   }
2334
2335   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2336                 0, thisProxy);
2337   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2338              CkReduction::sum_double, cb);  
2339 }
2340
2341 void KMeansBOC::findRepresentatives() {
2342
2343   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2344
2345   int numNonEmptyClusters = 0;
2346   for (int i=0; i<numK; i++) {
2347     if (kNumMembers[i] > 0) {
2348       numNonEmptyClusters++;
2349     }
2350   }
2351
2352   int numRepresentatives = peNumKeep;
2353   // **FIXME**
2354   // This is fairly arbitrary. Next time, choose the centers of the top
2355   //   largest clusters.
2356   if (numRepresentatives < numNonEmptyClusters) {
2357     numRepresentatives = numNonEmptyClusters;
2358   }
2359
2360   int slotsRemaining = numRepresentatives;
2361
2362   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2363          numNonEmptyClusters);
2364
2365   // determine how many exemplars to select per cluster. Currently
2366   //   hardcoded to 1. Future challenge is to decide on other numbers
2367   //   or proportionality.
2368   //
2369   int exemplarsPerCluster = 1;
2370   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2371
2372   int numCandidateOutliers = CkNumPes() - 
2373     exemplarsPerCluster*numNonEmptyClusters;
2374
2375   double *remainders = new double[numK];
2376   int *assigned = new int[numK];
2377   exemplarChoicesLeft = new int[numK];
2378   outlierChoicesLeft = new int[numK];
2379
2380   for (int i=0; i<numK; i++) {
2381     assigned[i] = 0;
2382     remainders[i] = 
2383       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2384       slotsRemaining / numCandidateOutliers;
2385     if (remainders[i] >= 0.0) {
2386       assigned[i] = (int)floor(remainders[i]);
2387       remainders[i] -= assigned[i];
2388     } else {
2389       remainders[i] = 0.0;
2390     }
2391   }
2392
2393   for (int i=0; i<numK; i++) {
2394     slotsRemaining -= assigned[i];
2395   }
2396   CkAssert(slotsRemaining >= 0);
2397
2398   // find clusters to assign the loose slots to, in order of
2399   // remainder proportion
2400   while (slotsRemaining > 0) {
2401     double max = 0.0;
2402     int winner = 0;
2403     for (int i=0; i<numK; i++) {
2404       if (remainders[i] > max) {
2405         max = remainders[i];
2406         winner = i;
2407       }
2408     }
2409     assigned[winner]++;
2410     remainders[winner] = 0.0;
2411     slotsRemaining--;
2412   }
2413
2414   // set up how many reduction cycles of min/max we need to conduct to
2415   // select the representatives.
2416   numSelectionIter = exemplarsPerCluster;
2417   for (int i=0; i<numK; i++) {
2418     if (assigned[i] > numSelectionIter) {
2419       numSelectionIter = assigned[i];
2420     }
2421   }
2422   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2423
2424   for (int i=0; i<numK; i++) {
2425     if (kNumMembers[i] > 0) {
2426       exemplarChoicesLeft[i] = exemplarsPerCluster;
2427       outlierChoicesLeft[i] = assigned[i];
2428     } else {
2429       exemplarChoicesLeft[i] = 0;
2430       outlierChoicesLeft[i] = 0;
2431     }
2432     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2433            outlierChoicesLeft[i]);
2434   }
2435
2436   delete [] assigned;
2437   delete [] remainders;
2438
2439   // send out first broadcast
2440   KSelectionMessage *outmsg = NULL;
2441   if (numSelectionIter > 0) {
2442     outmsg = new (numK, numK, numK) KSelectionMessage;
2443     outmsg->numKMinIDs = numK;
2444     outmsg->numKMaxIDs = numK;
2445     for (int i=0; i<numK; i++) {
2446       outmsg->minIDs[i] = -1;
2447       outmsg->maxIDs[i] = -1;
2448     }
2449     thisProxy.collectDistances(outmsg);
2450   } else {
2451     CkPrintf("Warning: No selection iteration from the start!\n");
2452     // invoke phase completion on all processors
2453     thisProxy.phaseDone();
2454   }
2455 }
2456
2457 /*
2458  *  lastMin = array of minimum champions of the last tournament
2459  *  lastMax = array of maximum champions of the last tournament
2460  *  lastMaxVal = array of last encountered maximum values, allows previous
2461  *                 minimum winners to eliminate themselves from the next
2462  *                 minimum race.
2463  *
2464  *  Called on all processors.
2465  */
2466 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2467
2468   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2469
2470   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2471          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2472   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2473       (CkMyPe() == msg->maxIDs[lastMinK])) {
2474     CkAssert(!selected);
2475     selected = true;
2476   }
2477
2478   // build outgoing reduction structure
2479   //   format = minVal | ID | maxVal | ID
2480   double *minMaxAndIDs = NULL;
2481
2482   minMaxAndIDs = new double[numK*4];
2483   // initialize to the appropriate out-of-band values (for error checks)
2484   for (int i=0; i<numK; i++) {
2485     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2486     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2487     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2488     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2489   }
2490   // If I have not won before, I put myself back into the competition
2491   if (!selected) {
2492     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2493     minMaxAndIDs[lastMinK*4] = minDistance;
2494     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2495     minMaxAndIDs[lastMinK*4+2] = minDistance;
2496     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2497   }
2498   delete msg;
2499
2500   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2501                 0, thisProxy);
2502   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2503              minMaxReductionType, cb);  
2504 }
2505
2506 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2507   // incoming format:
2508   //   minVal | minID | maxVal | maxID
2509
2510   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2511
2512   if (numSelectionIter > 0) {
2513     double *incInfo = (double *)msg->getData();
2514     
2515     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2516     outmsg->numKMinIDs = numK;
2517     outmsg->numKMaxIDs = numK;
2518     
2519     for (int i=0; i<numK; i++) {
2520       DEBUGF("%d | %lf %d %lf %d \n", i, 
2521              incInfo[i*4], (int)incInfo[i*4+1], 
2522              incInfo[i*4+2], (int)incInfo[i*4+3]);
2523     }
2524
2525     for (int i=0; i<numK; i++) {
2526       if (exemplarChoicesLeft[i] > 0) {
2527         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2528         exemplarChoicesLeft[i]--;
2529       } else {
2530         outmsg->minIDs[i] = -1;
2531       }
2532       if (outlierChoicesLeft[i] > 0) {
2533         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2534         outlierChoicesLeft[i]--;
2535       } else {
2536         outmsg->maxIDs[i] = -1;
2537       }
2538     }
2539     thisProxy.collectDistances(outmsg);
2540     numSelectionIter--;
2541   } else {
2542     // invoke phase completion on all processors
2543     thisProxy.phaseDone();
2544   }
2545 }
2546
2547 /**
2548  *  Completion of the K-Means clustering and data selection of one phase
2549  *    of the computation.
2550  *
2551  *  Called on every processor.
2552  */
2553 void KMeansBOC::phaseDone() {
2554
2555   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2556
2557   LogPool *pool = CkpvAccess(_trace)->_logPool;
2558   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2559
2560   // now decide on what to do with the decision.
2561   if (!selected) {
2562     if (usePhases) {
2563       pool->keepPhase[currentPhase] = false;
2564     } else {
2565       // if not using phases, we're working on the whole log
2566       pool->setAllPhases(false);
2567     }
2568   }
2569
2570   // **FIXME** (?) - All processors have to agree on this, or the reduction
2571   //   will not be correct! The question is "is this enforcible?"
2572   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2573     // We're done
2574     int dummy = 0;
2575     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2576                   0, bocProxy);
2577     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2578   } else {
2579     // reset all phase-based k-means data and decisions
2580
2581     // **FIXME**!!!!!    
2582     
2583     // invoke the next K-Means computation phase.
2584     currentPhase++;
2585     thisProxy[CkMyPe()].getNextPhaseMetrics();
2586   }
2587 }
2588
2589 void TraceProjectionsBOC::startEndTimeAnalysis()
2590 {
2591  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2592
2593   endTime = CkpvAccess(_trace)->endTime;
2594   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2595
2596   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2597                 0, thisProxy);
2598   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2599 }
2600
2601 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2602 {
2603  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2604
2605   CkAssert(CkMyPe() == 0);
2606   parModulesRemaining--;
2607   if (CkpvAccess(_trace) != NULL) {
2608     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData();
2609     // CkPrintf("End time determined to be %lf us\n",
2610     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2611   }
2612   delete msg;
2613   if (parModulesRemaining == 0) {
2614     thisProxy[CkMyPe()].finalize();
2615   }
2616 }
2617
2618 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2619
2620  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2621
2622   CkAssert(CkMyPe() == 0);
2623   parModulesRemaining--;
2624   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2625            CmiWallTimer()-analysisStartTime);
2626   delete msg;
2627   if (parModulesRemaining == 0) {
2628     thisProxy[CkMyPe()].finalize();
2629   }
2630 }
2631
2632 /**
2633  *
2634  *  This version is called (on processor 0) only if flushCheck fails.
2635  *
2636  */
2637 void TraceProjectionsBOC::kMeansDone() {
2638   CkAssert(CkMyPe() == 0);
2639   parModulesRemaining--;
2640   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2641            CmiWallTimer()-analysisStartTime);
2642   if (parModulesRemaining == 0) {
2643     thisProxy[CkMyPe()].finalize();
2644   }
2645 }
2646
2647 void TraceProjectionsBOC::finalize()
2648 {
2649   CkAssert(CkMyPe() == 0);
2650   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2651   //       CmiWallTimer()-analysisStartTime);
2652   thisProxy.closingTraces();
2653 }
2654
2655 // Called on every processor
2656 void TraceProjectionsBOC::closingTraces() {
2657   CkpvAccess(_trace)->closeTrace();
2658
2659     // subtle:  reduction needs to go to the PE which started CkExit()
2660   int pe = 0;
2661   if (endPe != -1) pe = endPe;
2662   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2663                 pe, thisProxy); 
2664   contribute(0, NULL, CkReduction::sum_int, cb);  
2665 }
2666
2667 // The sole purpose of this reduction is to decide whether or not
2668 //   Projections as a module needs to call CkExit() to get other
2669 //   modules to shutdown.
2670 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2671   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2672   delete msg;
2673   // decide if CkExit() needs to be called
2674   if (!CkpvAccess(_trace)->converseExit) {
2675     CkExit();
2676   }
2677 }
2678 /*
2679  *  Registration and definition of the Outlier Reduction callback.
2680  *  Format: Sum | Min | Max | Sum of Squares
2681  */
2682 CkReductionMsg *outlierReduction(int nMsgs,
2683                                  CkReductionMsg **msgs) {
2684   int numBytes = 0;
2685   int numMetrics = 0;
2686   double *ret = NULL;
2687
2688   if (nMsgs == 1) {
2689     // nothing to do, just pass it on
2690     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2691   }
2692
2693   if (nMsgs > 1) {
2694     numBytes = msgs[0]->getSize();
2695     // sanity checks
2696     if (numBytes%sizeof(double) != 0) {
2697       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2698     }
2699     if ((numBytes/sizeof(double))%4 != 0) {
2700       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2701     }
2702     numMetrics = (numBytes/sizeof(double))/4;
2703     ret = new double[numMetrics*4];
2704
2705     // copy the first message data into the return structure first
2706     for (int i=0; i<numMetrics*4; i++) {
2707       ret[i] = ((double *)msgs[0]->getData())[i];
2708     }
2709
2710     // Sum | Min | Max | Sum of Squares
2711     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2712       for (int i=0; i<numMetrics; i++) {
2713         // Sum
2714         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2715         // Min
2716         ret[numMetrics + i] =
2717           (ret[numMetrics + i] < 
2718            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2719           ? ret[numMetrics + i] : 
2720           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2721         // Max
2722         ret[2*numMetrics + i] = 
2723           (ret[2*numMetrics + i] >
2724            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2725           ? ret[2*numMetrics + i] :
2726           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2727         // Sum of Squares (squaring already done at leaf)
2728         ret[3*numMetrics + i] +=
2729           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2730       }
2731     }
2732   }
2733   
2734   /* apparently, we do not delete the incoming messages */
2735   return CkReductionMsg::buildNew(numBytes,ret);
2736 }
2737
2738 /*
2739  * The only reason we have a user-defined reduction is to support
2740  *   identification of the "winning" processors as well as to handle
2741  *   both the min and the max of each "tournament". A simple min/max
2742  *   discovery cannot handle ties.
2743  */
2744 CkReductionMsg *minMaxReduction(int nMsgs,
2745                                 CkReductionMsg **msgs) {
2746   CkAssert(nMsgs > 0);
2747
2748   int numBytes = msgs[0]->getSize();
2749   CkAssert(numBytes%sizeof(double) == 0);
2750   int numK = (numBytes/sizeof(double))/4;
2751
2752   double *ret = new double[numK*4];
2753   // fill with out-of-band values
2754   for (int i=0; i<numK; i++) {
2755     ret[i*4] = -1.0;
2756     ret[i*4+1] = -1.0;
2757     ret[i*4+2] = -1.0;
2758     ret[i*4+3] = -1.0;
2759   }
2760
2761   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2762   for (int i=0; i<nMsgs; i++) {
2763     double *temp = (double *)msgs[i]->getData();
2764     for (int j=0; j<numK; j++) {
2765       // no previous valid min
2766       if (ret[j*4+1] < 0) {
2767         // fill it in only if the incoming min is valid
2768         if (temp[j*4+1] >= 0) {
2769           ret[j*4] = temp[j*4];      // fill min value
2770           ret[j*4+1] = temp[j*4+1];  // fill ID
2771         }
2772       } else {
2773         // find Min, only if incoming min is valid
2774         if (temp[j*4+1] >= 0) {
2775           if (temp[j*4] < ret[j*4]) {
2776             ret[j*4] = temp[j*4];      // replace min value
2777             ret[j*4+1] = temp[j*4+1];  // replace ID
2778           }
2779         }
2780       }
2781       // no previous valid max
2782       if (ret[j*4+3] < 0) {
2783         // fill only if the incoming max is valid
2784         if (temp[j*4+3] >= 0) {
2785           ret[j*4+2] = temp[j*4+2];  // fill max value
2786           ret[j*4+3] = temp[j*4+3];  // fill ID
2787         }
2788       } else {
2789         // find Max, only if incoming max is valid
2790         if (temp[j*4+3] >= 0) {
2791           if (temp[j*4+2] > ret[j*4+2]) {
2792             ret[j*4+2] = temp[j*4+2];  // replace max value
2793             ret[j*4+3] = temp[j*4+3];  // replace ID
2794           }
2795         }
2796       }
2797     }
2798   }
2799   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2800   delete [] ret;
2801   return redmsg;
2802 }
2803
2804 #include "TraceProjections.def.h"
2805 #endif //PROJ_ANALYSIS
2806
2807 /*@}*/