Merged arrays into Charj mainline.
[charm.git] / src / ck-perf / trace-projections.C
1 /*****************************************************************************
2  * $Source: /cvsroot/charm/src/ck-perf/trace-projections.C,v $
3  * $Author: gioachin $
4  * $Date: 2009-08-20 01:09:41 $
5  * $Revision: 2.134 $
6  *****************************************************************************/
7
8
9 /**
10  * \addtogroup CkPerf
11 */
12 /*@{*/
13
14 #include <string.h>
15
16 #include "charm++.h"
17 #include "trace-projections.h"
18 #include "trace-projectionsBOC.h"
19
20 #if DEBUG_PROJ
21 #define DEBUGF(format, ...) CkPrintf(format, ## __VA_ARGS__)
22 #else
23 #define DEBUGF(format, ...)           // CmiPrintf x
24 #endif
25 #define DEBUGN(format, ...)  // easy way to selectively disable DEBUGs
26
27 #define DefaultLogBufSize      1000000
28
29 // **CW** Simple delta encoding implementation
30 // delta encoding is on by default. It may be turned off later in
31 // the runtime.
32 int deltaLog;
33 int nonDeltaLog;
34
35 int checknested=0;              // check illegal nested begin/end execute 
36
37 #ifdef PROJ_ANALYSIS
38 // BOC operations readonlys
39 CkGroupID traceProjectionsGID;
40 CkGroupID kMeansGID;
41
42 // New reduction type for Outlier Analysis purposes. This is allowed to be
43 // a global variable according to the Charm++ manual.
44 CkReductionMsg *outlierReduction(int nMsgs,
45                                  CkReductionMsg **msgs);
46 CkReductionMsg *minMaxReduction(int nMsgs,
47                                 CkReductionMsg **msgs);
48 CkReduction::reducerType outlierReductionType;
49 CkReduction::reducerType minMaxReductionType;
50 #endif // PROJ_ANALYSIS
51
52 CkpvStaticDeclare(TraceProjections*, _trace);
53 CtvStaticDeclare(int,curThreadEvent);
54
55 CkpvDeclare(CmiInt8, CtrLogBufSize);
56
57 typedef CkVec<char *>  usrEventVec;
58 CkpvStaticDeclare(usrEventVec, usrEventlist);
59 class UsrEvent {
60 public:
61   int e;
62   char *str;
63   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
64 };
65 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
66
67 /// Disable the outputting of the trace logs
68 void disableTraceLogOutput()
69
70   CkpvAccess(_trace)->setWriteData(false);
71 }
72
73 /// Enable the outputting of the trace logs
74 void enableTraceLogOutput()
75 {
76   CkpvAccess(_trace)->setWriteData(true);
77 }
78
79 /// Force the log files to be flushed
80 void flushTraceLog()
81 {
82   CkpvAccess(_trace)->traceFlushLog();
83 }
84
85 #if ! CMK_TRACE_ENABLED
86 static int warned=0;
87 #define OPTIMIZED_VERSION       \
88         if (!warned) { warned=1;        \
89         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
90 #else
91 #define OPTIMIZED_VERSION /*empty*/
92 #endif // CMK_TRACE_ENABLED
93
94 /*
95 On T3E, we need to have file number control by open/close files only when needed.
96 */
97 #if CMK_TRACE_LOGFILE_NUM_CONTROL
98   #define OPEN_LOG openLog("a");
99   #define CLOSE_LOG closeLog();
100 #else
101   #define OPEN_LOG
102   #define CLOSE_LOG
103 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
104
105 #if CMK_HAS_COUNTER_PAPI
106 int numPAPIEvents = 2;
107 int papiEvents[] = { PAPI_L3_DCM, PAPI_FP_OPS };
108 char *papiEventNames[] = {"PAPI_L3_DCM", "PAPI_FP_OPS"};
109 #endif // CMK_HAS_COUNTER_PAPI
110
111 /**
112   For each TraceFoo module, _createTraceFoo() must be defined.
113   This function is called in _createTraces() generated in moduleInit.C
114 */
115 void _createTraceprojections(char **argv)
116 {
117   DEBUGF("%d createTraceProjections\n", CkMyPe());
118   CkpvInitialize(CkVec<char *>, usrEventlist);
119   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
120   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
121 #if CMK_BLUEGENE_CHARM
122   // CthRegister does not call the constructor
123 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
124 #endif //CMK_BLUEGENE_CHARM
125   CkpvInitialize(TraceProjections*, _trace);
126   CkpvAccess(_trace) = new  TraceProjections(argv);
127   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
128   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
129 }
130  
131 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
132
133 struct TraceThreadListener {
134   struct CthThreadListener base;
135   int event;
136   int msgType;
137   int ep;
138   int srcPe;
139   int ml;
140   CmiObjId idx;
141 };
142
143 extern "C"
144 void traceThreadListener_suspend(struct CthThreadListener *l)
145 {
146   TraceThreadListener *a=(TraceThreadListener *)l;
147   /* here, we activate the appropriate trace codes for the appropriate
148      registered modules */
149   traceSuspend();
150 }
151
152 extern "C"
153 void traceThreadListener_resume(struct CthThreadListener *l) 
154 {
155   TraceThreadListener *a=(TraceThreadListener *)l;
156   /* here, we activate the appropriate trace codes for the appropriate
157      registered modules */
158   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
159                                 CthGetThreadID(a->base.thread));
160   a->event=-1;
161   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
162   a->ml=0;
163 }
164
165 extern "C"
166 void traceThreadListener_free(struct CthThreadListener *l) 
167 {
168   TraceThreadListener *a=(TraceThreadListener *)l;
169   delete a;
170 }
171
172 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
173 {
174 #if CMK_TRACE_ENABLED
175   /* strip essential information from the envelope */
176   TraceThreadListener *a= new TraceThreadListener;
177   
178   a->base.suspend=traceThreadListener_suspend;
179   a->base.resume=traceThreadListener_resume;
180   a->base.free=traceThreadListener_free;
181   a->event=e->getEvent();
182   a->msgType=e->getMsgtype();
183   a->ep=e->getEpIdx();
184   a->srcPe=e->getSrcPe();
185   a->ml=e->getTotalsize();
186
187   CthAddListener(tid, (CthThreadListener *)a);
188 #endif
189 }
190
191 void LogPool::openLog(const char *mode)
192 {
193 #if CMK_PROJECTIONS_USE_ZLIB
194   if(compressed) {
195     if (nonDeltaLog) {
196       do {
197         zfp = gzopen(fname, mode);
198       } while (!zfp && (errno == EINTR || errno == EMFILE));
199       if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
200     }
201     if (deltaLog) {
202       do {
203         deltazfp = gzopen(dfname, mode);
204       } while (!deltazfp && (errno == EINTR || errno == EMFILE));
205       if (!deltazfp) 
206         CmiAbort("Cannot open Projections Compressed Delta Trace File for writing...\n");
207     }
208   } else {
209     if (nonDeltaLog) {
210       do {
211         fp = fopen(fname, mode);
212       } while (!fp && (errno == EINTR || errno == EMFILE));
213       if (!fp) {
214         CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
215         CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
216       }
217     }
218     if (deltaLog) {
219       do {
220         deltafp = fopen(dfname, mode);
221       } while (!deltafp && (errno == EINTR || errno == EMFILE));
222       if (!deltafp) 
223         CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
224     }
225   }
226 #else
227   if (nonDeltaLog) {
228     do {
229       fp = fopen(fname, mode);
230     } while (!fp && (errno == EINTR || errno == EMFILE));
231     if (!fp) {
232       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
233       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
234     }
235   }
236   if (deltaLog) {
237     do {
238       deltafp = fopen(dfname, mode);
239     } while (!deltafp && (errno == EINTR || errno == EMFILE));
240     if(!deltafp) 
241       CmiAbort("Cannot open Projections Delta Trace File for writing...\n");
242   }
243 #endif
244 }
245
246 void LogPool::closeLog(void)
247 {
248 #if CMK_PROJECTIONS_USE_ZLIB
249   if(compressed) {
250     if (nonDeltaLog) gzclose(zfp);
251     if (deltaLog) gzclose(deltazfp);
252     return;
253   }
254 #endif
255   if (nonDeltaLog) { 
256 #if !defined(_WIN32) || defined(__CYGWIN__)
257     fsync(fileno(fp)); 
258 #endif
259     fclose(fp); 
260   }
261   if (deltaLog)  { 
262 #if !defined(_WIN32) || defined(__CYGWIN__)
263     fsync(fileno(deltafp)); 
264 #endif
265     fclose(deltafp);  
266   }
267 }
268
269 LogPool::LogPool(char *pgm) {
270   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
271   // defaults to writing data (no outlier changes)
272   writeData = true;
273   numEntries = 0;
274   // **CW** for simple delta encoding
275   prevTime = 0.0;
276   timeErr = 0.0;
277   globalEndTime = 0.0;
278   headerWritten = 0;
279   numPhases = 0;
280   hasFlushed = false;
281
282   keepPhase = NULL;
283
284   fileCreated = false;
285   poolSize = CkpvAccess(CtrLogBufSize);
286   pgmname = new char[strlen(pgm)+1];
287   strcpy(pgmname, pgm);
288 }
289
290 void LogPool::createFile(const char *fix)
291 {
292   if (fileCreated) {
293     return;
294   }
295
296   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
297   char *pathPlusFilePrefix = new char[1024];
298
299   if(nSubdirs > 0){
300     int sd = CkMyPe() % nSubdirs;
301     char *subdir = new char[1024];
302     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
303     CmiMkdir(subdir);
304     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
305     delete[] subdir;
306   } else {
307     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
308   }
309
310   char pestr[10];
311   sprintf(pestr, "%d", CkMyPe());
312 #if CMK_PROJECTIONS_USE_ZLIB
313   int len;
314   if(compressed)
315     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
316   else
317     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
318 #else
319   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
320 #endif
321
322   if (nonDeltaLog) {
323     fname = new char[len];
324   }
325   if (deltaLog) {
326     dfname = new char[len];
327   }
328 #if CMK_PROJECTIONS_USE_ZLIB
329   if(compressed) {
330     if (deltaLog && nonDeltaLog) {
331       sprintf(fname, "%s.%s.logold.gz",  pathPlusFilePrefix, pestr);
332       sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
333     } else {
334       if (nonDeltaLog) {
335         sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
336       } else {
337         sprintf(dfname, "%s.%s.log.gz", pathPlusFilePrefix, pestr);
338       }
339     }
340   } else {
341     if (deltaLog && nonDeltaLog) {
342       sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
343       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
344     } else {
345       if (nonDeltaLog) {
346         sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
347       } else {
348         sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
349       }
350     }
351   }
352 #else
353   if (deltaLog && nonDeltaLog) {
354     sprintf(fname, "%s.%s.logold", pathPlusFilePrefix, pestr);
355     sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
356   } else {
357     if (nonDeltaLog) {
358       sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
359     } else {
360       sprintf(dfname, "%s.%s.log", pathPlusFilePrefix, pestr);
361     }
362   }
363 #endif
364   fileCreated = true;
365   delete[] pathPlusFilePrefix;
366   openLog("w+");
367   CLOSE_LOG 
368 }
369
370 void LogPool::createSts(const char *fix)
371 {
372   CkAssert(CkMyPe() == 0);
373   // create the sts file
374   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
375   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
376   do
377     {
378       stsfp = fopen(fname, "w");
379     } while (!stsfp && (errno == EINTR || errno == EMFILE));
380   if(stsfp==0)
381     CmiAbort("Cannot open projections sts file for writing.\n");
382   delete[] fname;
383 }  
384
385 void LogPool::createRC()
386 {
387   // create the projections rc file.
388   fname = 
389     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
390   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
391   do {
392     rcfp = fopen(fname, "w");
393   } while (!rcfp && (errno == EINTR || errno == EMFILE));
394   if (rcfp==0) {
395     CmiAbort("Cannot open projections configuration file for writing.\n");
396   }
397   delete[] fname;
398 }
399
400 LogPool::~LogPool() 
401 {
402   if (writeData) {
403     writeLog();
404 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
405     closeLog();
406 #endif
407   }
408
409 #if CMK_BLUEGENE_CHARM
410   extern int correctTimeLog;
411   if (correctTimeLog) {
412     createFile("-bg");
413     if (CkMyPe() == 0) {
414       createSts("-bg");
415     }
416     writeHeader();
417     if (CkMyPe() == 0) writeSts(NULL);
418     postProcessLog();
419   }
420 #endif
421
422   delete[] pool;
423   delete [] fname;
424 }
425
426 void LogPool::writeHeader()
427 {
428   if (headerWritten) return;
429   headerWritten = 1;
430   if(!binary) {
431 #if CMK_PROJECTIONS_USE_ZLIB
432     if(compressed) {
433       if (nonDeltaLog) {
434         gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
435       }
436       if (deltaLog) {
437         gzprintf(deltazfp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
438       }
439     } 
440     else /* else clause is below... */
441 #endif
442     /*... may hang over from else above */ {
443       if (nonDeltaLog) {
444         fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
445       }
446       if (deltaLog) {
447         fprintf(deltafp, "PROJECTIONS-RECORD %d DELTA\n", numEntries);
448       }
449     }
450   }
451   else { // binary
452       if (nonDeltaLog) {
453         fwrite(&numEntries,sizeof(numEntries),1,fp);
454       }
455       if (deltaLog) {
456         fwrite(&numEntries,sizeof(numEntries),1,deltafp);
457       }
458   }
459 }
460
461 void LogPool::writeLog(void)
462 {
463   createFile();
464   OPEN_LOG
465   writeHeader();
466   if (nonDeltaLog) write(0);
467   if (deltaLog) write(1);
468   CLOSE_LOG
469 }
470
471 void LogPool::write(int writedelta) 
472 {
473   // **CW** Simple delta encoding implementation
474   // prevTime has to be maintained as an object variable because
475   // LogPool::write may be called several times depending on the
476   // +logsize value.
477   PUP::er *p = NULL;
478   if (binary) {
479     p = new PUP::toDisk(writedelta?deltafp:fp);
480   }
481 #if CMK_PROJECTIONS_USE_ZLIB
482   else if (compressed) {
483     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
484   }
485 #endif
486   else {
487     p = new toProjectionsFile(writedelta?deltafp:fp);
488   }
489   CmiAssert(p);
490   int curPhase = 0;
491   // **FIXME** - Should probably consider a more sophisticated bounds-based
492   //   approach for selective writing instead of making multiple if-checks
493   //   for every single event.
494   for(UInt i=0; i<numEntries; i++) {
495     if (!writedelta) {
496       if (keepPhase == NULL) {
497         // default case, when no phase selection is required.
498         pool[i].pup(*p);
499       } else {
500         // **FIXME** Might be a good idea to create a "filler" event block for
501         //   all the events taken out by phase filtering.
502         if (pool[i].type == END_PHASE) {
503           // always write phase markers
504           pool[i].pup(*p);
505           curPhase++;
506         } else if (pool[i].type == BEGIN_COMPUTATION ||
507                    pool[i].type == END_COMPUTATION) {
508           // always write BEGIN and END COMPUTATION markers
509           pool[i].pup(*p);
510         } else if (keepPhase[curPhase]) {
511           pool[i].pup(*p);
512         }
513       }
514     }
515     else {      // delta
516       // **FIXME** Implement phase-selective writing for delta logs
517       //   eventually
518       double time = pool[i].time;
519       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
520       {
521         double timeDiff = (time-prevTime)*1.0e6;
522         UInt intTimeDiff = (UInt)timeDiff;
523         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
524         if (timeErr > 1.0) {
525           timeErr -= 1.0;
526           intTimeDiff++;
527         }
528         pool[i].time = intTimeDiff/1.0e6;
529       }
530       pool[i].pup(*p);
531       pool[i].time = time;      // restore time value
532       prevTime = time;
533     }
534   }
535   delete p;
536   delete [] keepPhase;
537 }
538
539 void LogPool::writeSts(void)
540 {
541   // for whining compilers
542   int i;
543   char name[30];
544   // generate an automatic unique ID for each log
545   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
546   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
547   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
548 #if CMK_HAS_COUNTER_PAPI
549   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", numPAPIEvents);
550   // for now, use i, next time use papiEvents[i].
551   // **CW** papi event names is a hack.
552   for (i=0;i<numPAPIEvents;i++) {
553     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, papiEventNames[i]);
554   }
555 #endif
556   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
557   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
558     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
559   }     
560 }
561
562 void LogPool::writeSts(TraceProjections *traceProj){
563   writeSts();
564   if (traceProj != NULL) {
565     CkHashtableIterator  *funcIter = traceProj->getfuncIterator();
566     funcIter->seekStart();
567     int numFuncs = traceProj->getFuncNumber();
568     fprintf(stsfp,"TOTAL_FUNCTIONS %d \n",numFuncs);
569     while(funcIter->hasNext()) {
570       StrKey *key;
571       int *obj = (int *)funcIter->next((void **)&key);
572       fprintf(stsfp,"FUNCTION %d %s \n",*obj,key->getStr());
573     }
574   }
575   fprintf(stsfp, "END\n");
576   fclose(stsfp);
577 }
578
579 void LogPool::writeRC(void)
580 {
581     //CkPrintf("write RC is being executed\n");
582 #ifdef PROJ_ANALYSIS  
583     CkAssert(CkMyPe() == 0);
584     fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
585           (CMK_TYPEDEF_UINT8)(1.0e6*globalEndTime));
586     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
587     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
588       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
589     } else {
590       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
591     }
592     */
593 #endif //PROJ_ANALYSIS
594   fclose(rcfp);
595 }
596
597
598 #if CMK_BLUEGENE_CHARM
599 static void updateProjLog(void *data, double t, double recvT, void *ptr)
600 {
601   LogEntry *log = (LogEntry *)data;
602   FILE *fp = *(FILE **)ptr;
603   log->time = t;
604   log->recvTime = recvT<0.0?0:recvT;
605 //  log->write(fp);
606   toProjectionsFile p(fp);
607   log->pup(p);
608 }
609 #endif
610
611 // flush log entries to disk
612 void LogPool::flushLogBuffer()
613 {
614   if (numEntries) {
615     double writeTime = TraceTimer();
616     writeLog();
617     hasFlushed = true;
618     numEntries = 0;
619     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
620     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
621   }
622 }
623
624 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
625                   double time, int event, int pe, int ml, CmiObjId *id, 
626                   double recvT, double cpuT, int numPe)
627 {
628   new (&pool[numEntries++])
629     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe);
630   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
631     numPhases++;
632   }
633   if(poolSize==numEntries) {
634     flushLogBuffer();
635 #if CMK_BLUEGENE_CHARM
636     extern int correctTimeLog;
637     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
638 #endif
639   }
640 #if CMK_BLUEGENE_CHARM
641   switch (type) {
642     case BEGIN_PROCESSING:
643       pool[numEntries-1].recvTime = BgGetRecvTime();
644     case END_PROCESSING:
645     case BEGIN_COMPUTATION:
646     case END_COMPUTATION:
647     case CREATION:
648     case BEGIN_PACK:
649     case END_PACK:
650     case BEGIN_UNPACK:
651     case END_UNPACK:
652     case USER_EVENT_PAIR:
653       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
654   }
655 #endif
656 }
657
658 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
659 #ifndef CMK_BLUEGENE_CHARM
660   new (&pool[numEntries++])
661         LogEntry(time,type,funcID,lineNum,fileName);
662   if(poolSize == numEntries){
663     flushLogBuffer();
664   }
665 #endif  
666 }
667
668
669   
670 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
671 #ifndef CMK_BLUEGENE_CHARM
672   new (&pool[numEntries++])
673         LogEntry(type,time,memUsage);
674   if(poolSize == numEntries){
675     flushLogBuffer();
676   }
677 #endif  
678         
679 }  
680
681
682
683 void LogPool::addUserSupplied(int data){
684         // add an event
685         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
686
687         // set the user supplied value for the previously created event 
688         pool[numEntries-1].setUserSuppliedData(data);
689   }
690
691
692 void LogPool::addUserSuppliedNote(char *note){
693         // add an event
694         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
695
696         // set the user supplied note for the previously created event 
697         pool[numEntries-1].setUserSuppliedNote(note);
698   }
699
700 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
701   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
702 #ifndef CMK_BLUEGENE_CHARM
703 #if CMK_SMP_TRACE_COMMTHREAD && MPI_SMP_TRACE_COMMTHREAD_HACK
704   //This part of code is used  to combine the contiguous
705   //MPI_Test and MPI_Iprobe events to reduce the number of
706   //entries
707 #define MPI_TEST_EVENT_ID 60
708 #define MPI_IPROBE_EVENT_ID 70 
709   int lastEvent = pool[numEntries-1].event;
710   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
711     //just replace the endtime of last event
712     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
713     pool[numEntries].endTime = et;
714   }else{
715     new (&pool[numEntries++])
716       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
717   }
718 #else
719   new (&pool[numEntries++])
720     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
721 #endif
722   if(poolSize == numEntries){
723     flushLogBuffer();
724   }
725 #endif  
726 }
727
728
729 /* **CW** Not sure if this is the right thing to do. Feels more like
730    a hack than a solution to Sameer's request to add the destination
731    processor information to multicasts and broadcasts.
732
733    In the unlikely event this method is used for Broadcasts as well,
734    pelist == NULL will be used to indicate a global broadcast with 
735    num PEs.
736 */
737 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
738                                    int event, int pe, int ml, CmiObjId *id,
739                                    double recvT, int numPe, int *pelist)
740 {
741   new (&pool[numEntries++])
742     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
743   if(poolSize==numEntries) {
744     flushLogBuffer();
745   }
746 }
747
748 void LogPool::postProcessLog()
749 {
750 #if CMK_BLUEGENE_CHARM
751   bgUpdateProj(1);   // event type
752 #endif
753 }
754
755 // /** Constructor for a multicast log entry */
756 // 
757 //  THIS WAS MOVED TO trace-projections.h with the other constructors
758 // 
759 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
760 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
761 // {
762 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
763 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
764 //     recvTime = rt; 
765 //     numpes = numPe;
766 //     userSuppliedNote = NULL;
767 //     if (pelist != NULL) {
768 //      pes = new int[numPe];
769 //      for (int i=0; i<numPe; i++) {
770 //        pes[i] = pelist[i];
771 //      }
772 //     } else {
773 //      pes= NULL;
774 //     }
775 // }
776
777 void LogEntry::addPapi(int numPapiEvts, int *papi_ids, LONG_LONG_PAPI *papiVals)
778 {
779 #if CMK_HAS_COUNTER_PAPI
780   numPapiEvents = numPapiEvts;
781   if (papiVals != NULL) {
782     papiIDs = new int[numPapiEvents];
783     papiValues = new LONG_LONG_PAPI[numPapiEvents];
784     for (int i=0; i<numPapiEvents; i++) {
785       papiIDs[i] = papi_ids[i];
786       papiValues[i] = papiVals[i];
787     }
788   }
789 #endif
790 }
791
792
793
794 void LogEntry::pup(PUP::er &p)
795 {
796   int i;
797   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
798   char ret = '\n';
799
800   p|type;
801   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
802   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
803
804   switch (type) {
805     case USER_EVENT:
806     case USER_EVENT_PAIR:
807       p|mIdx; p|itime; p|event; p|pe;
808       break;
809     case BEGIN_IDLE:
810     case END_IDLE:
811     case BEGIN_PACK:
812     case END_PACK:
813     case BEGIN_UNPACK:
814     case END_UNPACK:
815       p|itime; p|pe; 
816       break;
817     case BEGIN_PROCESSING:
818       if (p.isPacking()) {
819         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
820         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
821       }
822       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
823       p|msglen; p|irecvtime; 
824       p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
825       p|icputime;
826 #if CMK_HAS_COUNTER_PAPI
827       p|numPapiEvents;
828       for (i=0; i<numPapiEvents; i++) {
829         // not yet!!!
830         //      p|papiIDs[i]; 
831         p|papiValues[i];
832         
833       }
834 #else
835       p|numPapiEvents;     // non papi version has value 0
836 #endif
837       if (p.isUnpacking()) {
838         recvTime = irecvtime/1.0e6;
839         cputime = icputime/1.0e6;
840       }
841       break;
842     case END_PROCESSING:
843       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
844       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
845 #if CMK_HAS_COUNTER_PAPI
846       p|numPapiEvents;
847       for (i=0; i<numPapiEvents; i++) {
848         // not yet!!!
849         //      p|papiIDs[i];
850         p|papiValues[i];
851       }
852 #else
853       p|numPapiEvents;  // non papi version has value 0
854 #endif
855       if (p.isUnpacking()) cputime = icputime/1.0e6;
856       break;
857     case USER_SUPPLIED:
858           p|userSuppliedData;
859           p|itime;
860         break;
861     case USER_SUPPLIED_NOTE:
862           p|itime;
863           int length;
864           if (p.isPacking()) length = strlen(userSuppliedNote);
865           p | length;
866           char space;
867           space = ' ';
868           p | space;
869           if (p.isUnpacking()) {
870             userSuppliedNote = new char[length+1];
871             userSuppliedNote[length] = '\0';
872           }
873           PUParray(p,userSuppliedNote, length);
874           break;
875     case USER_SUPPLIED_BRACKETED_NOTE:
876       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
877           p|itime;
878           p|iEndTime;
879           p|event;
880           int length2;
881           if (p.isPacking()) length2 = strlen(userSuppliedNote);
882           p | length2;
883           char space2;
884           space2 = ' ';
885           p | space2;
886           if (p.isUnpacking()) {
887             userSuppliedNote = new char[length+1];
888             userSuppliedNote[length] = '\0';
889           }
890           PUParray(p,userSuppliedNote, length2);
891           break;
892     case MEMORY_USAGE_CURRENT:
893       p | memUsage;
894       p | itime;
895         break;
896     case CREATION:
897       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
898       p|mIdx; p|eIdx; p|itime;
899       p|event; p|pe; p|msglen; p|irecvtime;
900       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
901       break;
902     case CREATION_BCAST:
903       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
904       p|mIdx; p|eIdx; p|itime;
905       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
906       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
907       break;
908     case CREATION_MULTICAST:
909       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
910       p|mIdx; p|eIdx; p|itime;
911       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
912       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
913       for (i=0; i<numpes; i++) p|pes[i];
914       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
915       break;
916     case MESSAGE_RECV:
917       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
918       break;
919
920     case ENQUEUE:
921     case DEQUEUE:
922       p|mIdx; p|itime; p|event; p|pe;
923       break;
924
925     case BEGIN_INTERRUPT:
926     case END_INTERRUPT:
927       p|itime; p|event; p|pe;
928       break;
929
930       // **CW** absolute timestamps are used here to support a quick
931       // way of determining the total time of a run in projections
932       // visualization.
933     case BEGIN_COMPUTATION:
934     case END_COMPUTATION:
935     case BEGIN_TRACE:
936     case END_TRACE:
937       p|itime;
938       break;
939     case BEGIN_FUNC:
940         p | itime;
941         p | mIdx;
942         p | event;
943         if(!p.isUnpacking()){
944                 p(fName,flen-1);
945         }
946         break;
947     case END_FUNC:
948         p | itime;
949         p | mIdx;
950         break;
951     case END_PHASE:
952       p|eIdx; // FIXME: actually the phase ID
953       p|itime;
954       break;
955     default:
956       CmiError("***Internal Error*** Wierd Event %d.\n", type);
957       break;
958   }
959   if (p.isUnpacking()) time = itime/1.0e6;
960   p|ret;
961 }
962
963 TraceProjections::TraceProjections(char **argv): 
964   curevent(0), inEntry(0), computationStarted(0), 
965         converseExit(0), endTime(0.0), traceNestedEvents(0),
966         currentPhaseID(0), lastPhaseEvent(NULL)
967 {
968   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
969
970   if (CkpvAccess(traceOnPe) == 0) return;
971
972   CtvInitialize(int,curThreadEvent);
973   CkpvInitialize(CmiInt8, CtrLogBufSize);
974   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
975   CtvAccess(curThreadEvent)=0;
976   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
977                        "Log entries to buffer per I/O")) {
978     if (CkMyPe() == 0) {
979       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
980     }
981   }
982   checknested = 
983     CmiGetArgFlagDesc(argv,"+checknested",
984                       "check projections nest begin end execute events");
985   traceNestedEvents = 
986     CmiGetArgFlagDesc(argv,"+tracenested",
987               "trace projections nest begin/end execute events");
988   int binary = 
989     CmiGetArgFlagDesc(argv,"+binary-trace",
990                       "Write log files in binary format");
991
992   CmiInt8 nSubdirs = 0;
993   CmiGetArgLongDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
994
995
996 #if CMK_PROJECTIONS_USE_ZLIB
997   int compressed = CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
998 #else
999   // consume the flag so there's no confusing
1000   CmiGetArgFlagDesc(argv,"+gz-trace",
1001                     "Write log files pre-compressed with gzip");
1002   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1003 #endif
1004
1005   // **CW** default to non delta log encoding. The user may choose to do
1006   // create both logs (for debugging) or just the old log timestamping
1007   // (for compatibility).
1008   // Generating just the non delta log takes precedence over generating
1009   // both logs (if both arguments appear on the command line).
1010
1011   // switch to OLD log format until everything works // Gengbin
1012   nonDeltaLog = 1;
1013   deltaLog = 0;
1014   deltaLog = CmiGetArgFlagDesc(argv, "+logDelta",
1015                                   "Generate Delta encoded and simple timestamped log files");
1016
1017   _logPool = new LogPool(CkpvAccess(traceRoot));
1018   _logPool->setNumSubdirs(nSubdirs);
1019   _logPool->setBinary(binary);
1020 #if CMK_PROJECTIONS_USE_ZLIB
1021   _logPool->setCompressed(compressed);
1022 #endif
1023   if (CkMyPe() == 0) {
1024     _logPool->createSts();
1025     _logPool->createRC();
1026   }
1027   funcCount=1;
1028
1029 #if CMK_HAS_COUNTER_PAPI
1030   // We initialize and create the event sets for use with PAPI here.
1031   int papiRetValue = PAPI_library_init(PAPI_VER_CURRENT);
1032   if (papiRetValue != PAPI_VER_CURRENT) {
1033     CmiAbort("PAPI Library initialization failure!\n");
1034   }
1035   // PAPI 3 mandates the initialization of the set to PAPI_NULL
1036   papiEventSet = PAPI_NULL; 
1037   if (PAPI_create_eventset(&papiEventSet) != PAPI_OK) {
1038     CmiAbort("PAPI failed to create event set!\n");
1039   }
1040   papiRetValue = PAPI_add_events(papiEventSet, papiEvents, numPAPIEvents);
1041   if (papiRetValue != PAPI_OK) {
1042     if (papiRetValue == PAPI_ECNFLCT) {
1043       CmiAbort("PAPI events conflict! Please re-assign event types!\n");
1044     } else {
1045       CmiAbort("PAPI failed to add designated events!\n");
1046     }
1047   }
1048   papiValues = new long_long[numPAPIEvents];
1049   memset(papiValues, 0, numPAPIEvents*sizeof(long_long));
1050 #endif
1051 }
1052
1053 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1054 {
1055   OPTIMIZED_VERSION
1056   CkAssert(e==-1 || e>=0);
1057   CkAssert(evt != NULL);
1058   int event;
1059   int biggest = -1;
1060   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1061     int cur = (*CkpvAccess(usrEvents))[i]->e;
1062     if (cur == e) {
1063       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1064       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1065         return e;
1066       else
1067         CmiAbort("UserEvent double registered!");
1068     }
1069     if (cur > biggest) biggest = cur;
1070   }
1071   // if no user events have so far been registered. biggest will be -1
1072   // and hence newly assigned event numbers will begin from 0.
1073   if (e==-1) event = biggest+1;  // automatically assign new event number
1074   else event = e;
1075   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1076   return event;
1077 }
1078
1079 void TraceProjections::traceClearEps(void)
1080 {
1081   // In trace-summary, this zeros out the EP bins, to eliminate noise
1082   // from startup.  Here, this isn't useful, since we can do that in
1083   // post-processing
1084 }
1085
1086 void TraceProjections::traceWriteSts(void)
1087 {
1088   if(CkMyPe()==0)
1089     _logPool->writeSts(this);
1090 }
1091
1092 /** 
1093  * **IMPT NOTES**:
1094  *
1095  * This is called when Converse closes during ConverseCommonExit().
1096  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1097  *
1098  * Some programs bypass CkExit() (like NAMD, which eventually calls
1099  * ConverseExit()), modules like traces will have to pretend to shutdown
1100  * as if CkExit() was called but at the same time avoid making
1101  * subsequent CkExit() calls (which is usually required for allowing
1102  * other modules to shutdown).
1103  *
1104  * Note that we can only get here if CkExit() was not called, since the
1105  * trace module will un-register itself from TraceArray if it did.
1106  *
1107  */
1108 void TraceProjections::traceClose(void)
1109 {
1110 #ifdef PROJ_ANALYSIS
1111   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1112
1113   // sets the flag that tells the code not to make the CkExit call later
1114   converseExit = 1;
1115   if (CkMyPe() == 0) {
1116     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1117     bocProxy.traceProjectionsParallelShutdown(-1);
1118   }
1119 #else
1120   // we've already deleted the logpool, so multiple calls to traceClose
1121   // are tolerated.
1122   if (_logPool == NULL) {
1123     return;
1124   }
1125   if(CkMyPe()==0){
1126     _logPool->writeSts(this);
1127   }
1128   CkpvAccess(_trace)->endComputation();
1129   delete _logPool;              // will write
1130   // remove myself from traceArray so that no tracing will be called.
1131   CkpvAccess(_traces)->removeTrace(this);
1132 #endif
1133 }
1134
1135 /**
1136  *  **IMPT NOTES**:
1137  *
1138  *  This is meant to be called internally by the tracing framework.
1139  *
1140  */
1141 void TraceProjections::closeTrace() {
1142   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1143   if (CkMyPe() == 0) {
1144     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1145     _logPool->writeSts(this);
1146     _logPool->writeRC();
1147     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1148   }
1149   delete _logPool;       // will write logs to file
1150 }
1151
1152 #if CMK_SMP_TRACE_COMMTHREAD
1153 void TraceProjections::traceBeginOnCommThread()
1154 {
1155   if (!computationStarted) return;
1156   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1157 }
1158
1159 void TraceProjections::traceEndOnCommThread()
1160 {
1161   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1162 }
1163 #endif
1164
1165 void TraceProjections::traceBegin(void)
1166 {
1167   if (!computationStarted) return;
1168   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1169 }
1170
1171 void TraceProjections::traceEnd(void)
1172 {
1173   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1174 }
1175
1176 void TraceProjections::userEvent(int e)
1177 {
1178   if (!computationStarted) return;
1179   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1180 }
1181
1182 void TraceProjections::userBracketEvent(int e, double bt, double et)
1183 {
1184   if (!computationStarted) return;
1185   // two events record Begin/End of event e.
1186   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(bt), curevent, CkMyPe());
1187   _logPool->add(USER_EVENT_PAIR, e, 0, TraceTimer(et), curevent++, CkMyPe());
1188 }
1189
1190 void TraceProjections::userSuppliedData(int d)
1191 {
1192   if (!computationStarted) return;
1193   _logPool->addUserSupplied(d);
1194 }
1195
1196 void TraceProjections::userSuppliedNote(char *note)
1197 {
1198   if (!computationStarted) return;
1199   _logPool->addUserSuppliedNote(note);
1200 }
1201
1202
1203 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1204 {
1205   if (!computationStarted) return;
1206   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1207 }
1208
1209 void TraceProjections::memoryUsage(double m)
1210 {
1211   if (!computationStarted) return;
1212   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1213   
1214 }
1215
1216
1217 void TraceProjections::creation(envelope *e, int ep, int num)
1218 {
1219   double curTime = TraceTimer();
1220   if (e == 0) {
1221     CtvAccess(curThreadEvent) = curevent;
1222     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1223                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1224   } else {
1225     int type=e->getMsgtype();
1226     e->setEvent(curevent);
1227     if (num > 1) {
1228       _logPool->add(CREATION_BCAST, type, ep, curTime,
1229                     curevent++, CkMyPe(), e->getTotalsize(), 
1230                     NULL, 0, 0.0, num);
1231     } else {
1232       _logPool->add(CREATION, type, ep, curTime,
1233                     curevent++, CkMyPe(), e->getTotalsize(), 
1234                     NULL, 0, 0.0);
1235     }
1236   }
1237 }
1238
1239 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1240    Communication Library-specific Multicasts via new event 
1241    CREATION_MULTICAST.
1242 */
1243
1244 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1245                                          int *pelist)
1246 {
1247   double curTime = TraceTimer();
1248   if (e==0) {
1249     CtvAccess(curThreadEvent)=curevent;
1250     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1251                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1252   } else {
1253     int type=e->getMsgtype();
1254     e->setEvent(curevent);
1255     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1256                                    e->getTotalsize(), 0, 0.0, num, pelist);
1257   }
1258 }
1259
1260 void TraceProjections::creationDone(int num)
1261 {
1262   // modified the creation done time of the last num log entries
1263   // FIXME: totally a hack
1264   double curTime = TraceTimer();
1265   int idx = _logPool->numEntries-1;
1266   while (idx >=0 && num >0 ) {
1267     LogEntry &log = _logPool->pool[idx];
1268     if ((log.type == CREATION) ||
1269         (log.type == CREATION_BCAST) ||
1270         (log.type == CREATION_MULTICAST)) {
1271       log.recvTime = curTime - log.time;
1272       num --;
1273     }
1274     idx--;
1275   }
1276 }
1277
1278 void TraceProjections::beginExecute(CmiObjId *tid)
1279 {
1280 #if CMK_HAS_COUNTER_PAPI
1281   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1282     CmiAbort("PAPI failed to read at begin execute!\n");
1283   }
1284 #endif
1285   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1286   execEvent = CtvAccess(curThreadEvent);
1287   execEp = (-1);
1288   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1289                 execEvent,CkMyPe(), 0, tid);
1290 #if CMK_HAS_COUNTER_PAPI
1291   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1292 #endif
1293   inEntry = 1;
1294 }
1295
1296 void TraceProjections::beginExecute(envelope *e)
1297 {
1298   if(e==0) {
1299 #if CMK_HAS_COUNTER_PAPI
1300     if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1301       CmiAbort("PAPI failed to read at begin execute!\n");
1302     }
1303 #endif
1304     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1305     execEvent = CtvAccess(curThreadEvent);
1306     execEp = (-1);
1307     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP,TraceTimer(),
1308                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1309 #if CMK_HAS_COUNTER_PAPI
1310     _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1311 #endif
1312     inEntry = 1;
1313   } else {
1314     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1315                  e->getSrcPe(),e->getTotalsize());
1316   }
1317 }
1318
1319 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1320                                     int mlen, CmiObjId *idx)
1321 {
1322   if (traceNestedEvents) {
1323     if (! nestedEvents.isEmpty()) {
1324       endExecuteLocal();
1325     }
1326     nestedEvents.enq(NestedEvent(event, msgType, ep, srcPe, mlen, idx));
1327   }
1328   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1329 }
1330
1331 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1332                                     int mlen, CmiObjId *idx)
1333 {
1334 #if CMK_HAS_COUNTER_PAPI
1335   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1336     CmiAbort("PAPI failed to read at begin execute!\n");
1337   }
1338 #endif
1339   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1340   execEvent=event;
1341   execEp=ep;
1342   execPe=srcPe;
1343   _logPool->add(BEGIN_PROCESSING,msgType,ep,TraceTimer(),event,
1344                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1345 #if CMK_HAS_COUNTER_PAPI
1346   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1347 #endif
1348   inEntry = 1;
1349 }
1350
1351 void TraceProjections::endExecute(void)
1352 {
1353   if (traceNestedEvents) nestedEvents.deq();
1354   endExecuteLocal();
1355   if (traceNestedEvents) {
1356     if (! nestedEvents.isEmpty()) {
1357       NestedEvent &ne = nestedEvents.peek();
1358       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1359     }
1360   }
1361 }
1362
1363 void TraceProjections::endExecuteLocal(void)
1364 {
1365 #if CMK_HAS_COUNTER_PAPI
1366   if (PAPI_read(papiEventSet, papiValues) != PAPI_OK) {
1367     CmiAbort("PAPI failed to read at end execute!\n");
1368   }
1369 #endif
1370   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1371   double cputime = TraceCpuTimer();
1372   if(execEp == (-1)) {
1373     _logPool->add(END_PROCESSING, 0, _threadEP, TraceTimer(),
1374                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1375   } else {
1376     _logPool->add(END_PROCESSING, 0, execEp, TraceTimer(),
1377                   execEvent, execPe, 0, NULL, 0.0, cputime);
1378   }
1379 #if CMK_HAS_COUNTER_PAPI
1380   _logPool->addPapi(numPAPIEvents, papiEvents, papiValues);
1381 #endif
1382   inEntry = 0;
1383 }
1384
1385 void TraceProjections::messageRecv(char *env, int pe)
1386 {
1387 #if 0
1388   envelope *e = (envelope *)env;
1389   int msgType = e->getMsgtype();
1390   int ep = e->getEpIdx();
1391 #if 0
1392   if (msgType==NewChareMsg || msgType==NewVChareMsg
1393           || msgType==ForChareMsg || msgType==ForVidMsg
1394           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1395           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1396     ep = e->getEpIdx();
1397   else
1398     ep = _threadEP;
1399 #endif
1400   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1401                 curevent++, e->getSrcPe(), e->getTotalsize());
1402 #endif
1403 }
1404
1405 void TraceProjections::beginIdle(double curWallTime)
1406 {
1407   _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1408 }
1409
1410 void TraceProjections::endIdle(double curWallTime)
1411 {
1412   _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1413 }
1414
1415 void TraceProjections::beginPack(void)
1416 {
1417   _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1418 }
1419
1420 void TraceProjections::endPack(void)
1421 {
1422   _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1423 }
1424
1425 void TraceProjections::beginUnpack(void)
1426 {
1427   _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1428 }
1429
1430 void TraceProjections::endUnpack(void)
1431 {
1432   _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1433 }
1434
1435 void TraceProjections::enqueue(envelope *) {}
1436
1437 void TraceProjections::dequeue(envelope *) {}
1438
1439 void TraceProjections::beginComputation(void)
1440 {
1441   computationStarted = 1;
1442
1443   // Executes the callback function provided by the machine
1444   // layer. This is the proper method to register user events in a
1445   // machine layer because projections is a charm++ module.
1446   if (CkpvAccess(traceOnPe) != 0) {
1447     void (*ptr)() = registerMachineUserEvents();
1448     if (ptr != NULL) {
1449       ptr();
1450     }
1451   }
1452 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1453 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1454   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1455 #if CMK_HAS_COUNTER_PAPI
1456   // we start the counters here
1457   if (PAPI_start(papiEventSet) != PAPI_OK) {
1458     CmiAbort("PAPI failed to start designated counters!\n");
1459   }
1460 #endif
1461 }
1462
1463 void TraceProjections::endComputation(void)
1464 {
1465 #if CMK_HAS_COUNTER_PAPI
1466   // we stop the counters here. A silent failure is alright since we
1467   // are already at the end of the program.
1468   if (PAPI_stop(papiEventSet, papiValues) != PAPI_OK) {
1469     CkPrintf("Warning: PAPI failed to stop correctly!\n");
1470   }
1471   // NOTE: We should not do a complete close of PAPI until after the
1472   // sts writer is done.
1473 #endif
1474   endTime = TraceTimer();
1475   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1476   /*
1477   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1478            endTime*1e06);
1479   */
1480 }
1481
1482 int TraceProjections::idxRegistered(int idx)
1483 {
1484     int idxVecLen = idxVec.size();
1485     for(int i=0; i<idxVecLen; i++)
1486     {
1487         if(idx == idxVec[i])
1488             return 1;
1489     }
1490     return 0;
1491 }
1492
1493 void TraceProjections::regFunc(const char *name, int &idx, int idxSpecifiedByUser){
1494     StrKey k((char*)name,strlen(name));
1495     int num = funcHashtable.get(k);
1496     
1497     if(num!=0) {
1498         return;
1499         //as for mpi programs, the same function may be registered for several times
1500         //CmiError("\"%s has been already registered! Please change the name!\"\n", name);
1501     }
1502     
1503     int isIdxExisting=0;
1504     if(idxSpecifiedByUser)
1505         isIdxExisting=idxRegistered(idx);
1506     if(isIdxExisting){
1507         return;
1508         //same reason with num!=0
1509         //CmiError("The identifier %d for the trace function has been already registered!", idx);
1510     }
1511
1512     if(idxSpecifiedByUser) {
1513         char *st = new char[strlen(name)+1];
1514         memcpy(st,name,strlen(name)+1);
1515         StrKey *newKey = new StrKey(st,strlen(st));
1516         int &ref = funcHashtable.put(*newKey);
1517         ref=idx;
1518         funcCount++;
1519         idxVec.push_back(idx);  
1520     } else {
1521         char *st = new char[strlen(name)+1];
1522         memcpy(st,name,strlen(name)+1);
1523         StrKey *newKey = new StrKey(st,strlen(st));
1524         int &ref = funcHashtable.put(*newKey);
1525         ref=funcCount;
1526         num = funcCount;
1527         funcCount++;
1528         idx = num;
1529         idxVec.push_back(idx);
1530     }
1531 }
1532
1533 void TraceProjections::beginFunc(char *name,char *file,int line){
1534         StrKey k(name,strlen(name));    
1535         unsigned short  num = (unsigned short)funcHashtable.get(k);
1536         beginFunc(num,file,line);
1537 }
1538
1539 void TraceProjections::beginFunc(int idx,char *file,int line){
1540         if(idx <= 0){
1541                 CmiError("Unregistered function id %d being used in %s:%d \n",idx,file,line);
1542         }       
1543         _logPool->add(BEGIN_FUNC,TraceTimer(),idx,line,file);
1544 }
1545
1546 void TraceProjections::endFunc(char *name){
1547         StrKey k(name,strlen(name));    
1548         int num = funcHashtable.get(k);
1549         endFunc(num);   
1550 }
1551
1552 void TraceProjections::endFunc(int num){
1553         if(num <= 0){
1554                 printf("endFunc without start :O\n");
1555         }
1556         _logPool->add(END_FUNC,TraceTimer(),num,0,NULL);
1557 }
1558
1559 // specialized PUP:ers for handling trace projections logs
1560 void toProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1561 {
1562   for (int i=0;i<n;i++) 
1563     switch(t) {
1564     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1565     case Tuchar:
1566     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1567     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1568     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1569     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1570     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1571     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1572     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1573     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1574     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1575 #ifdef CMK_PUP_LONG_LONG
1576     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1577     case Tulonglong: CheckAndFPrintF(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1578 #endif
1579     default: CmiAbort("Unrecognized pup type code!");
1580     };
1581 }
1582
1583 void fromProjectionsFile::bytes(void *p,int n,size_t itemSize,dataType t)
1584 {
1585   for (int i=0;i<n;i++) 
1586     switch(t) {
1587     case Tchar: { 
1588       char c = fgetc(f);
1589       if (c==EOF)
1590         parseError("Could not match character");
1591       else
1592         ((char *)p)[i] = c;
1593       break;
1594     }
1595     case Tuchar:
1596     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1597     case Tshort:((short *)p)[i]=(short)readInt(); break;
1598     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1599     case Tint:  ((int *)p)[i]=readInt(); break;
1600     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1601     case Tlong: ((long *)p)[i]=readInt(); break;
1602     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1603     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1604     case Tdouble:((double *)p)[i]=readDouble(); break;
1605 #ifdef CMK_PUP_LONG_LONG
1606     case Tlonglong: ((CMK_TYPEDEF_INT8 *)p)[i]=readLongInt(); break;
1607     case Tulonglong: ((CMK_TYPEDEF_UINT8 *)p)[i]=readLongInt(); break;
1608 #endif
1609     default: CmiAbort("Unrecognized pup type code!");
1610     };
1611 }
1612
1613 #if CMK_PROJECTIONS_USE_ZLIB
1614 void toProjectionsGZFile::bytes(void *p,int n,size_t itemSize,dataType t)
1615 {
1616   for (int i=0;i<n;i++) 
1617     switch(t) {
1618     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1619     case Tuchar:
1620     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1621     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1622     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1623     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1624     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1625     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1626     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1627     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1628     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1629 #ifdef CMK_PUP_LONG_LONG
1630     case Tlonglong: gzprintf(f," %lld",((CMK_TYPEDEF_INT8 *)p)[i]); break;
1631     case Tulonglong: gzprintf(f," %llu",((CMK_TYPEDEF_UINT8 *)p)[i]); break;
1632 #endif
1633     default: CmiAbort("Unrecognized pup type code!");
1634     };
1635 }
1636 #endif
1637
1638 void TraceProjections::endPhase() {
1639   double currentPhaseTime = TraceTimer();
1640   double lastPhaseTime = 0.0;
1641
1642   if (lastPhaseEvent != NULL) {
1643     lastPhaseTime = lastPhaseEvent->time;
1644   } else {
1645     if (_logPool->pool != NULL) {
1646       // assumed to be BEGIN_COMPUTATION
1647       lastPhaseTime = _logPool->pool[0].time;
1648     } else {
1649       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1650       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1651       lastPhaseTime = currentPhaseTime;
1652     }
1653   }
1654
1655   /* Insert endPhase event here. */
1656   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1657   /*        We currently "borrow" from the standard add() method. */
1658   /*        It should really be its own add() method.             */
1659   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1660   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1661   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1662   currentPhaseID++;
1663 }
1664
1665 #ifdef PROJ_ANALYSIS
1666 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1667
1668
1669 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1670
1671 void registerOutlierReduction() {
1672   outlierReductionType =
1673     CkReduction::addReducer(outlierReduction);
1674   minMaxReductionType =
1675     CkReduction::addReducer(minMaxReduction);
1676 }
1677
1678 /**
1679  * **IMPT NOTES**:
1680  *
1681  * This is the C++ code that is registered to be activated at module
1682  * shutdown. This is called exactly once on processor 0. Module shutdown
1683  * is initiated as a result of a CkExit() call by the application code
1684  * 
1685  * The exit function must ultimately call CkExit() again to
1686  * so that other module exit functions may proceed after this module is
1687  * done.
1688  *
1689  */
1690 // FIXME: WHY extern "C"???
1691 extern "C" void TraceProjectionsExitHandler()
1692 {
1693 #if CMK_TRACE_ENABLED
1694   // CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1695   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1696   bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1697 #else
1698   CkExit();
1699 #endif
1700 }
1701
1702 // This is called once on each processor but the idiom of use appears
1703 // to be to only have processor 0 register the function.
1704 //
1705 // See initnode in trace-projections.ci
1706 void initTraceProjectionsBOC()
1707 {
1708   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1709 #ifdef __BLUEGENE__
1710   if (BgNodeRank() == 0) {
1711 #else
1712     if (CkMyRank() == 0) {
1713 #endif
1714       registerExitFn(TraceProjectionsExitHandler);
1715     }
1716 #if 0
1717   } // this is so indentation does not get messed up
1718 #endif
1719 }
1720
1721 // mainchare for trace-projections BOC-operations. 
1722 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1723 // rest of its life.
1724 //
1725 // Responsible for:
1726 //   1. Handling commandline arguments
1727 //   2. Creating any objects required for proper BOC operations.
1728 //
1729 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1730   /** Options for Outlier Analysis */
1731   // defaults. Things will change with support for interactive analysis.
1732   bool findOutliers = false;
1733   bool outlierAutomatic = true;
1734   int numKSeeds = 10; 
1735
1736   int peNumKeep = CkNumPes();  // used as a default
1737   double entryThreshold = 0.0;
1738   bool outlierUsePhases = false;
1739   if (outlierAutomatic) {
1740     CmiGetArgIntDesc(msg->argv, "+outlierNumSeeds", &numKSeeds,
1741                      "Number of cluster seeds to apply at outlier analysis.");
1742     CmiGetArgIntDesc(msg->argv, "+outlierPeNumKeep", 
1743                      &peNumKeep, "Number of Processors to retain data");
1744     CmiGetArgDoubleDesc(msg->argv, "+outlierEpThresh", &entryThreshold,
1745                         "Minimum significance of entry points to be considered for clustering (%).");
1746     findOutliers =
1747       CmiGetArgFlagDesc(msg->argv,"+outlier", "Find Outliers.");
1748     outlierUsePhases = 
1749       CmiGetArgFlagDesc(msg->argv,"+outlierUsePhases",
1750                         "Apply automatic outlier analysis to any available phases.");
1751     if (outlierUsePhases) {
1752       // if the user wants to use an outlier feature, it is assumed outlier
1753       //    analysis is desired.
1754       findOutliers = true;
1755     }
1756   }
1757   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers);
1758   if (findOutliers) {
1759     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1760                                         numKSeeds,
1761                                         peNumKeep,
1762                                         entryThreshold,
1763                                         outlierUsePhases);
1764   }
1765 }
1766
1767 // Called on every processor.
1768 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1769   //CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1770   endPe = pe;                // the pe that starts CkExit()
1771   if (CkMyPe() == 0) {
1772     analysisStartTime = CmiWallTimer();
1773   }
1774   CkpvAccess(_trace)->endComputation();
1775   // no more tracing for projections on this processor after this point. 
1776   // Note that clear must be called after remove, or bad things will happen.
1777   CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1778   CkpvAccess(_traces)->clearTrace();
1779
1780   // From this point, we start multiple chains of reductions and broadcasts to
1781   // perform final online analysis activities.
1782
1783   // Start all parallel operations at once. 
1784   //   These MUST NOT modify base performance data in LogPool. If they must,
1785   //   then the parallel operations must be phased (and this code to be
1786   //   restructured as necessary)
1787   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1788   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1789   if (findOutliers) {
1790     parModulesRemaining++;
1791     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1792   }
1793   parModulesRemaining++;
1794   bocProxy[CkMyPe()].startEndTimeAnalysis();
1795 }
1796
1797 // Called on each processor
1798 void KMeansBOC::startKMeansAnalysis() {
1799   // Initialize all necessary structures
1800   LogPool *pool = CkpvAccess(_trace)->_logPool;
1801
1802  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1803   int flushInt = 0;
1804   if (pool->hasFlushed) {
1805     flushInt = 1;
1806   }
1807   
1808   CkCallback cb(CkIndex_KMeansBOC::flushCheck(NULL), 
1809                 0, thisProxy);
1810   contribute(sizeof(int), &flushInt, CkReduction::logical_or, cb);  
1811 }
1812
1813 // Called on processor 0
1814 void KMeansBOC::flushCheck(CkReductionMsg *msg) {
1815   int someFlush = *((int *)msg->getData());
1816
1817   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1818   
1819   if (someFlush == 0) {
1820     // Data intact proceed with KMeans analysis
1821     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1822     kMeansProxy.flushCheckDone();
1823   } else {
1824     // Some processor had flushed it data at some point, abandon KMeans
1825     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1826     // terminate KMeans
1827     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1828     bocProxy[0].kMeansDone();
1829   }
1830 }
1831
1832 // Called on each processor
1833 void KMeansBOC::flushCheckDone() {
1834   // **FIXME** - more flexible metric collection scheme may be necessary
1835   //   in the future for production use.
1836   LogPool *pool = CkpvAccess(_trace)->_logPool;
1837
1838   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1839
1840   numEntryMethods = _entryTable.size();
1841   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1842
1843   // maintained across phases
1844   markedBegin = false;
1845   markedIdle = false;
1846   beginBlockTime = 0.0;
1847   beginIdleBlockTime = 0.0;
1848   lastBeginEPIdx = -1; // none
1849
1850   lastPhaseIdx = 0;
1851   currentExecTimes = NULL;
1852   currentPhase = 0;
1853   selected = false;
1854
1855   pool->initializePhases();
1856
1857   // incoming K Seeds and the per-phase filter
1858   incKSeeds = new double[numK*numMetrics];
1859   keepMetric = new bool[numMetrics];
1860
1861   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
1862   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
1863   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
1864   thisProxy[CkMyPe()].getNextPhaseMetrics();
1865 }
1866
1867 // Called on each processor.
1868 void KMeansBOC::getNextPhaseMetrics() {
1869   // Assumes the presence of the complete logs on this processor.
1870   // Assumes first event is always BEGIN_COMPUTATION
1871   // Assumes each processor sees the same number of phases.
1872   //
1873   // In this code, we collect performance data for this processor.
1874   // All times are in seconds.
1875
1876   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
1877
1878   if (usePhases) {
1879     DEBUGF("[%d] Using Phases\n", CkMyPe());
1880   } else {
1881     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
1882   }
1883   
1884   if (currentExecTimes != NULL) {
1885     delete [] currentExecTimes;
1886   }
1887   currentExecTimes = new double[numMetrics];
1888   for (int i=0; i<numMetrics; i++) {
1889     currentExecTimes[i] = 0.0;
1890   }
1891
1892   int numEventMethods = _entryTable.size();
1893   LogPool *pool = CkpvAccess(_trace)->_logPool;
1894   
1895   CkAssert(pool->numEntries > lastPhaseIdx);
1896   double startPhaseTime = pool->pool[lastPhaseIdx].time;
1897   double totalPhaseTime = 0.0;
1898   double totalActiveTime = 0.0; // entry method + idle
1899
1900   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
1901     if (pool->pool[i].type == BEGIN_PROCESSING) {
1902       // check pairing
1903       if (!markedBegin) {
1904         markedBegin = true;
1905       }
1906       beginBlockTime = pool->pool[i].time;
1907       lastBeginEPIdx = pool->pool[i].eIdx;
1908     } else if (pool->pool[i].type == END_PROCESSING) {
1909       // check pairing
1910       // if End without a begin, just ignore
1911       //   this event. If a phase-boundary is crossed, the Begin
1912       //   event would be maintained in beginBlockTime, so it is 
1913       //   not a problem.
1914       if (markedBegin) {
1915         markedBegin = false;
1916         if (pool->pool[i].event < 0)
1917         {
1918           // ignore dummy events. **FIXME** as they have no eIdx?
1919           continue;
1920         }
1921         currentExecTimes[pool->pool[i].eIdx] += 
1922           pool->pool[i].time - beginBlockTime;
1923         totalActiveTime += pool->pool[i].time - beginBlockTime;
1924         lastBeginEPIdx = -1;
1925       }
1926     } else if (pool->pool[i].type == BEGIN_IDLE) {
1927       // check pairing
1928       if (!markedIdle) {
1929         markedIdle = true;
1930       }
1931       beginIdleBlockTime = pool->pool[i].time;
1932     } else if (pool->pool[i].type == END_IDLE) {
1933       // check pairing
1934       if (markedIdle) {
1935         markedIdle = false;
1936         currentExecTimes[numEventMethods] += 
1937           pool->pool[i].time - beginIdleBlockTime;
1938         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1939       }
1940     } else if (pool->pool[i].type == END_PHASE) {
1941       // ignored when not using phases
1942       if (usePhases) {
1943         // when we've not visited this node before
1944         if (i != lastPhaseIdx) { 
1945           totalPhaseTime = 
1946             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1947           // it is important that proper accounting of time take place here.
1948           // Note that END_PHASE events inevitably occur in the context of
1949           //   some entry method by the way the tracing API is designed.
1950           if (markedBegin) {
1951             CkAssert(lastBeginEPIdx >= 0);
1952             currentExecTimes[lastBeginEPIdx] += 
1953               pool->pool[i].time - beginBlockTime;
1954             totalActiveTime += pool->pool[i].time - beginBlockTime;
1955             // this is so the remainder contributes to the next phase
1956             beginBlockTime = pool->pool[i].time;
1957           }
1958           // The following is unlikely, but stranger things have happened.
1959           if (markedIdle) {
1960             currentExecTimes[numEventMethods] +=
1961               pool->pool[i].time - beginIdleBlockTime;
1962             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1963             // this is so the remainder contributes to the next phase
1964             beginIdleBlockTime = pool->pool[i].time;
1965           }
1966           if (totalActiveTime <= totalPhaseTime) {
1967             currentExecTimes[numEventMethods+1] = 
1968               totalPhaseTime - totalActiveTime;
1969           } else {
1970             currentExecTimes[numEventMethods+1] = 0.0;
1971             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
1972                      CkMyPe(), currentPhase);
1973           }
1974           collectKMeansData();
1975           // end the loop (and method) and defer the work till the next call
1976           lastPhaseIdx = i;
1977           break; 
1978         }
1979       }
1980     } else if (pool->pool[i].type == END_COMPUTATION) {
1981       if (markedBegin) {
1982         CkAssert(lastBeginEPIdx >= 0);
1983         currentExecTimes[lastBeginEPIdx] += 
1984           pool->pool[i].time - beginBlockTime;
1985         totalActiveTime += pool->pool[i].time - beginBlockTime;
1986       }
1987       if (markedIdle) {
1988         currentExecTimes[numEventMethods] +=
1989           pool->pool[i].time - beginIdleBlockTime;
1990         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
1991       }
1992       totalPhaseTime = 
1993         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
1994       if (totalActiveTime <= totalPhaseTime) {
1995         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
1996       } else {
1997         currentExecTimes[numEventMethods+1] = 0.0;
1998         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
1999                  CkMyPe());
2000       }
2001       collectKMeansData();
2002     }
2003   }
2004 }
2005
2006 /**
2007  *  Through a reduction, collectKMeansData aggregates each processors' data
2008  *  in order for global properties to be determined:
2009  *  
2010  *  1. min & max to determine normalization factors.
2011  *  2. sum to determine global EP averages for possible metric reduction
2012  *       through thresholding.
2013  *  3. sum of squares to compute stddev which may be useful in the future.
2014  *
2015  *  collectKMeansData will also keep the processor's data for the current
2016  *    phase so that it may be normalized and worked on subsequently.
2017  *
2018  **/
2019 void KMeansBOC::collectKMeansData() {
2020   int minOffset = numMetrics;
2021   int maxOffset = 2*numMetrics;
2022   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2023
2024   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2025
2026   double *reductionMsg = new double[numMetrics*4];
2027
2028   for (int i=0; i<numMetrics; i++) {
2029     reductionMsg[i] = currentExecTimes[i];
2030     // duplicate the event times for max and min sections of the reduction
2031     reductionMsg[minOffset + i] = currentExecTimes[i];
2032     reductionMsg[maxOffset + i] = currentExecTimes[i];
2033     // compute squares
2034     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2035   }
2036
2037   CkCallback cb(CkIndex_KMeansBOC::globalMetricRefinement(NULL), 
2038                 0, thisProxy);
2039   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2040              outlierReductionType, cb);  
2041 }
2042
2043 // The purpose is mainly to initialize the k seeds and generate
2044 //   normalization parameters for each of the metrics. The k seeds
2045 //   and normalization parameters are broadcast to all processors.
2046 //
2047 // Called on processor 0
2048 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2049   CkAssert(CkMyPe() == 0);
2050   
2051   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2052
2053   int sumOffset = 0;
2054   int minOffset = numMetrics;
2055   int maxOffset = 2*numMetrics;
2056   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2057
2058   // calculate statistics & boundaries for the k seeds for clustering
2059   KMeansStatsMessage *outmsg = 
2060     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2061   outmsg->numMetrics = numMetrics;
2062   outmsg->numKPos = numK*numMetrics;
2063   outmsg->numStats = numMetrics*4;
2064
2065   // Sum | Min | Max | Sum of Squares
2066   double *totalExecTimes = (double *)msg->getData();
2067   double totalTime = 0.0;
2068
2069   for (int i=0; i<numMetrics; i++) {
2070     DEBUGN("%lf\n", totalExecTimes[i]);
2071     totalTime += totalExecTimes[i];
2072
2073     // calculate event mean over all processors
2074     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2075
2076     // get the ranges and offsets of each metric. With this, we get
2077     //   normalization factors that can be sent back to each processor to
2078     //   be used as necessary. We reuse max for range. Min remains the offset.
2079     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2080     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2081       totalExecTimes[minOffset + i];
2082     
2083     // calculate stddev (using biased variance)
2084     outmsg->stats[sosOffset + i] = 
2085       sqrt((totalExecTimes[sosOffset + i] - 
2086             2*(outmsg->stats[i])*totalExecTimes[i] +
2087             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2088            CkNumPes());
2089   }
2090
2091   for (int i=0; i<numMetrics; i++) {
2092     // 1) if the proportion of the max value of the entry method relative to
2093     //   the average time taken over all entry methods across all processors
2094     //   is greater than the stipulated percentage threshold ...; AND
2095     // 2) if the range of values are non-zero.
2096     //
2097     // The current assumption is totalTime > 0 (what program has zero total
2098     //   time from all work?)
2099     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2100                      entryThreshold) &&
2101       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2102     if (keepMetric[i]) {
2103       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2104              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2105     } else {
2106       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2107     }
2108     outmsg->filter[i] = keepMetric[i];
2109   }
2110
2111   delete msg;
2112   
2113   // initialize k seeds for this phase
2114   kSeeds = new double[numK*numMetrics];
2115
2116   numKReported = 0;
2117   kNumMembers = new int[numK];
2118
2119   // Randomly select k processors' metric vectors for the k seeds
2120   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2121   srand(11337); // for debugging purposes
2122   for (int k=0; k<numK; k++) {
2123     DEBUGF("Seed %d | ", k);
2124     for (int m=0; m<numMetrics; m++) {
2125       double factor = totalExecTimes[maxOffset + m] - 
2126         totalExecTimes[minOffset + m];
2127       // "uniform" distribution, scaled according to the normalization
2128       //   factors
2129       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2130       // Random distribution.
2131       kSeeds[numMetrics*k + m] =
2132         ((rand()*1.0)/RAND_MAX)*factor;
2133       if (keepMetric[m]) {
2134         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2135       }
2136       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2137     }
2138     DEBUGF("\n");
2139     kNumMembers[k] = 0;
2140   }
2141
2142   // broadcast statistical values to all processors for cluster discovery
2143   thisProxy.findInitialClusters(outmsg);
2144 }
2145
2146
2147
2148 // Called on each processor.
2149 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2150
2151  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2152
2153   phaseIter = 0;
2154
2155   // Get info from stats message
2156   CkAssert(numMetrics == msg->numMetrics);
2157   for (int i=0; i<numMetrics; i++) {
2158     keepMetric[i] = msg->filter[i];
2159   }
2160
2161   // Normalize data on local processor.
2162   // **CWL** See my thesis for detailed discussion of normalization of
2163   //    performance data.
2164   // **NOTE** This might change if we want to send data based on the filter
2165   //   instead of all the data.
2166   CkAssert(numMetrics*4 == msg->numStats);
2167   for (int i=0; i<numMetrics; i++) {
2168     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2169     // **CWL** We do not normalize the range. Entry methods that exhibit
2170     //   large absolute timing variations should be allowed to contribute
2171     //   more to the Euclidean distance measure!
2172     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2173   }
2174
2175   // **NOTE** This might change if we want to send data based on the filter
2176   //   instead of all the data.
2177   CkAssert(numK*numMetrics == msg->numKPos);
2178   for (int i=0; i<msg->numKPos; i++) {
2179     incKSeeds[i] = msg->kSeedsPos[i];
2180   }
2181
2182   // Decide which KSeed this processor belongs to.
2183   minDistance = calculateDistance(0);
2184   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2185            currentPhase, phaseIter, minDistance);
2186   minK = 0;
2187   for (int i=1; i<numK; i++) {
2188     double distance = calculateDistance(i);
2189     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2190              currentPhase, phaseIter, i, distance);
2191     if (distance < minDistance) {
2192       minDistance = distance;
2193       minK = i;
2194     }
2195   }
2196
2197   // Set up a reduction with the modification vector to the root (0).
2198   //
2199   // The modification vector sends a negative value for each metric
2200   //   for the K this processor no longer belongs to and a positive
2201   //   value to the K the processor now belongs. In addition, a -1.0
2202   //   is sent to the K it is leaving and a +1.0 to the K it is 
2203   //   joining.
2204   //
2205   // The processor must still contribute a "zero returns" even if
2206   //   nothing changes. This will be the basis for determine
2207   //   convergence at the root.
2208   //
2209   // The addtional +1 is meant for the count-change that must be
2210   //   maintained for the special cases at the root when some K
2211   //   may be deprived of all processor points or go from 0 to a
2212   //   positive number of processors (see later comments).
2213   double *modVector = new double[numK*(numMetrics+1)];
2214   for (int i=0; i<numK; i++) {
2215     for (int j=0; j<numMetrics+1; j++) {
2216       modVector[i*(numMetrics+1) + j] = 0.0;
2217     }
2218   }
2219   for (int i=0; i<numMetrics; i++) {
2220     // for this initialization, only positive values need be sent.
2221     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2222   }
2223   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2224
2225   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2226                 0, thisProxy);
2227   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2228              CkReduction::sum_double, cb);  
2229 }
2230
2231 double KMeansBOC::calculateDistance(int k) {
2232   double ret = 0.0;
2233   for (int i=0; i<numMetrics; i++) {
2234     if (keepMetric[i]) {
2235       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2236              CkMyPe(), currentPhase, phaseIter, i,
2237                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2238       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2239     }
2240   }
2241   return sqrt(ret);
2242 }
2243
2244 void KMeansBOC::updateKSeeds(CkReductionMsg *msg) {
2245   CkAssert(CkMyPe() == 0);
2246
2247   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2248
2249   double *modVector = (double *)msg->getData();
2250   // sanity check
2251   CkAssert(numK*(numMetrics+1)*sizeof(double) == msg->getSize());
2252
2253   // A quick convergence test.
2254   bool hasChanges = false;
2255   for (int i=0; i<numK; i++) {
2256     hasChanges = hasChanges || 
2257       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2258   }
2259   if (!hasChanges) {
2260     delete msg;
2261     findRepresentatives();
2262   } else {
2263     int overallChange = 0;
2264     for (int i=0; i<numK; i++) {
2265       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2266       if (change != 0) {
2267         overallChange += change;
2268         // modify the k seeds based on the modification vectors coming in
2269         //
2270         // If a seed initially has no members, its contents do not matter and
2271         //   is simply set to the average of the incoming vector.
2272         // If the change causes a seed to lose all its members, do nothing.
2273         //   Its last-known location is kept to allow it to re-capture
2274         //   membership at the next iteration rather than apply the last
2275         //   changes (which snaps the point unnaturally to 0,0).
2276         // Otherwise, apply the appropriate vector changes.
2277         CkAssert((kNumMembers[i] + change >= 0) &&
2278                  (kNumMembers[i] + change <= CkNumPes()));
2279         if (kNumMembers[i] == 0) {
2280           CkAssert(change > 0);
2281           for (int j=0; j<numMetrics; j++) {
2282             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2283           }
2284         } else if (kNumMembers[i] + change == 0) {
2285           // do nothing.
2286         } else {
2287           for (int j=0; j<numMetrics; j++) {
2288             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2289             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2290             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2291           }
2292         }
2293         kNumMembers[i] += change;
2294       }
2295       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2296              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2297     }
2298     delete msg;
2299
2300     // broadcast the new seed locations.
2301     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2302     outmsg->numKPos = numK*numMetrics;
2303     for (int i=0; i<numK*numMetrics; i++) {
2304       outmsg->kSeedsPos[i] = kSeeds[i];
2305     }
2306
2307     thisProxy.updateSeedMembership(outmsg);
2308   }
2309 }
2310
2311 // Called on all processors
2312 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2313
2314   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2315
2316   phaseIter++;
2317
2318   // **NOTE** This might change if we want to send data based on the filter
2319   //   instead of all the data.
2320   CkAssert(numK*numMetrics == msg->numKPos);
2321   for (int i=0; i<msg->numKPos; i++) {
2322     incKSeeds[i] = msg->kSeedsPos[i];
2323   }
2324
2325   // Decide which KSeed this processor belongs to.
2326   lastMinK = minK;
2327   minDistance = calculateDistance(0);
2328   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2329          currentPhase, phaseIter, minDistance);
2330
2331   minK = 0;
2332   for (int i=1; i<numK; i++) {
2333     double distance = calculateDistance(i);
2334     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2335            currentPhase, phaseIter, i, distance);
2336     if (distance < minDistance) {
2337       minDistance = distance;
2338       minK = i;
2339     }
2340   }
2341
2342   double *modVector = new double[numK*(numMetrics+1)];
2343   for (int i=0; i<numK; i++) {
2344     for (int j=0; j<numMetrics+1; j++) {
2345       modVector[i*(numMetrics+1) + j] = 0.0;
2346     }
2347   }
2348
2349   if (minK != lastMinK) {
2350     for (int i=0; i<numMetrics; i++) {
2351       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2352       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2353     }
2354     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2355     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2356   }
2357
2358   CkCallback cb(CkIndex_KMeansBOC::updateKSeeds(NULL), 
2359                 0, thisProxy);
2360   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2361              CkReduction::sum_double, cb);  
2362 }
2363
2364 void KMeansBOC::findRepresentatives() {
2365
2366   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2367
2368   int numNonEmptyClusters = 0;
2369   for (int i=0; i<numK; i++) {
2370     if (kNumMembers[i] > 0) {
2371       numNonEmptyClusters++;
2372     }
2373   }
2374
2375   int numRepresentatives = peNumKeep;
2376   // **FIXME**
2377   // This is fairly arbitrary. Next time, choose the centers of the top
2378   //   largest clusters.
2379   if (numRepresentatives < numNonEmptyClusters) {
2380     numRepresentatives = numNonEmptyClusters;
2381   }
2382
2383   int slotsRemaining = numRepresentatives;
2384
2385   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2386          numNonEmptyClusters);
2387
2388   // determine how many exemplars to select per cluster. Currently
2389   //   hardcoded to 1. Future challenge is to decide on other numbers
2390   //   or proportionality.
2391   //
2392   int exemplarsPerCluster = 1;
2393   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2394
2395   int numCandidateOutliers = CkNumPes() - 
2396     exemplarsPerCluster*numNonEmptyClusters;
2397
2398   double *remainders = new double[numK];
2399   int *assigned = new int[numK];
2400   exemplarChoicesLeft = new int[numK];
2401   outlierChoicesLeft = new int[numK];
2402
2403   for (int i=0; i<numK; i++) {
2404     assigned[i] = 0;
2405     remainders[i] = 
2406       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2407       slotsRemaining / numCandidateOutliers;
2408     if (remainders[i] >= 0.0) {
2409       assigned[i] = (int)floor(remainders[i]);
2410       remainders[i] -= assigned[i];
2411     } else {
2412       remainders[i] = 0.0;
2413     }
2414   }
2415
2416   for (int i=0; i<numK; i++) {
2417     slotsRemaining -= assigned[i];
2418   }
2419   CkAssert(slotsRemaining >= 0);
2420
2421   // find clusters to assign the loose slots to, in order of
2422   // remainder proportion
2423   while (slotsRemaining > 0) {
2424     double max = 0.0;
2425     int winner = 0;
2426     for (int i=0; i<numK; i++) {
2427       if (remainders[i] > max) {
2428         max = remainders[i];
2429         winner = i;
2430       }
2431     }
2432     assigned[winner]++;
2433     remainders[winner] = 0.0;
2434     slotsRemaining--;
2435   }
2436
2437   // set up how many reduction cycles of min/max we need to conduct to
2438   // select the representatives.
2439   numSelectionIter = exemplarsPerCluster;
2440   for (int i=0; i<numK; i++) {
2441     if (assigned[i] > numSelectionIter) {
2442       numSelectionIter = assigned[i];
2443     }
2444   }
2445   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2446
2447   for (int i=0; i<numK; i++) {
2448     if (kNumMembers[i] > 0) {
2449       exemplarChoicesLeft[i] = exemplarsPerCluster;
2450       outlierChoicesLeft[i] = assigned[i];
2451     } else {
2452       exemplarChoicesLeft[i] = 0;
2453       outlierChoicesLeft[i] = 0;
2454     }
2455     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2456            outlierChoicesLeft[i]);
2457   }
2458
2459   delete [] assigned;
2460   delete [] remainders;
2461
2462   // send out first broadcast
2463   KSelectionMessage *outmsg = NULL;
2464   if (numSelectionIter > 0) {
2465     outmsg = new (numK, numK, numK) KSelectionMessage;
2466     outmsg->numKMinIDs = numK;
2467     outmsg->numKMaxIDs = numK;
2468     for (int i=0; i<numK; i++) {
2469       outmsg->minIDs[i] = -1;
2470       outmsg->maxIDs[i] = -1;
2471     }
2472     thisProxy.collectDistances(outmsg);
2473   } else {
2474     CkPrintf("Warning: No selection iteration from the start!\n");
2475     // invoke phase completion on all processors
2476     thisProxy.phaseDone();
2477   }
2478 }
2479
2480 /*
2481  *  lastMin = array of minimum champions of the last tournament
2482  *  lastMax = array of maximum champions of the last tournament
2483  *  lastMaxVal = array of last encountered maximum values, allows previous
2484  *                 minimum winners to eliminate themselves from the next
2485  *                 minimum race.
2486  *
2487  *  Called on all processors.
2488  */
2489 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2490
2491   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2492
2493   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2494          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2495   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2496       (CkMyPe() == msg->maxIDs[lastMinK])) {
2497     CkAssert(!selected);
2498     selected = true;
2499   }
2500
2501   // build outgoing reduction structure
2502   //   format = minVal | ID | maxVal | ID
2503   double *minMaxAndIDs = NULL;
2504
2505   minMaxAndIDs = new double[numK*4];
2506   // initialize to the appropriate out-of-band values (for error checks)
2507   for (int i=0; i<numK; i++) {
2508     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2509     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2510     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2511     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2512   }
2513   // If I have not won before, I put myself back into the competition
2514   if (!selected) {
2515     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2516     minMaxAndIDs[lastMinK*4] = minDistance;
2517     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2518     minMaxAndIDs[lastMinK*4+2] = minDistance;
2519     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2520   }
2521   delete msg;
2522
2523   CkCallback cb(CkIndex_KMeansBOC::findNextMinMax(NULL), 
2524                 0, thisProxy);
2525   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2526              minMaxReductionType, cb);  
2527 }
2528
2529 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2530   // incoming format:
2531   //   minVal | minID | maxVal | maxID
2532
2533   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2534
2535   if (numSelectionIter > 0) {
2536     double *incInfo = (double *)msg->getData();
2537     
2538     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2539     outmsg->numKMinIDs = numK;
2540     outmsg->numKMaxIDs = numK;
2541     
2542     for (int i=0; i<numK; i++) {
2543       DEBUGF("%d | %lf %d %lf %d \n", i, 
2544              incInfo[i*4], (int)incInfo[i*4+1], 
2545              incInfo[i*4+2], (int)incInfo[i*4+3]);
2546     }
2547
2548     for (int i=0; i<numK; i++) {
2549       if (exemplarChoicesLeft[i] > 0) {
2550         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2551         exemplarChoicesLeft[i]--;
2552       } else {
2553         outmsg->minIDs[i] = -1;
2554       }
2555       if (outlierChoicesLeft[i] > 0) {
2556         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2557         outlierChoicesLeft[i]--;
2558       } else {
2559         outmsg->maxIDs[i] = -1;
2560       }
2561     }
2562     thisProxy.collectDistances(outmsg);
2563     numSelectionIter--;
2564   } else {
2565     // invoke phase completion on all processors
2566     thisProxy.phaseDone();
2567   }
2568 }
2569
2570 /**
2571  *  Completion of the K-Means clustering and data selection of one phase
2572  *    of the computation.
2573  *
2574  *  Called on every processor.
2575  */
2576 void KMeansBOC::phaseDone() {
2577
2578   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2579
2580   LogPool *pool = CkpvAccess(_trace)->_logPool;
2581   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2582
2583   // now decide on what to do with the decision.
2584   if (!selected) {
2585     if (usePhases) {
2586       pool->keepPhase[currentPhase] = false;
2587     } else {
2588       // if not using phases, we're working on the whole log
2589       pool->setAllPhases(false);
2590     }
2591   }
2592
2593   // **FIXME** (?) - All processors have to agree on this, or the reduction
2594   //   will not be correct! The question is "is this enforcible?"
2595   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2596     // We're done
2597     int dummy = 0;
2598     CkCallback cb(CkIndex_TraceProjectionsBOC::kMeansDone(NULL), 
2599                   0, bocProxy);
2600     contribute(sizeof(int), &dummy, CkReduction::sum_int, cb);
2601   } else {
2602     // reset all phase-based k-means data and decisions
2603
2604     // **FIXME**!!!!!    
2605     
2606     // invoke the next K-Means computation phase.
2607     currentPhase++;
2608     thisProxy[CkMyPe()].getNextPhaseMetrics();
2609   }
2610 }
2611
2612 void TraceProjectionsBOC::startEndTimeAnalysis()
2613 {
2614  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2615
2616   endTime = CkpvAccess(_trace)->endTime;
2617   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2618
2619   CkCallback cb(CkIndex_TraceProjectionsBOC::endTimeDone(NULL), 
2620                 0, thisProxy);
2621   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2622 }
2623
2624 void TraceProjectionsBOC::endTimeDone(CkReductionMsg *msg)
2625 {
2626  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2627
2628   CkAssert(CkMyPe() == 0);
2629   parModulesRemaining--;
2630   if (CkpvAccess(_trace) != NULL) {
2631     CkpvAccess(_trace)->_logPool->globalEndTime = *(double *)msg->getData();
2632     // CkPrintf("End time determined to be %lf us\n",
2633     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2634   }
2635   delete msg;
2636   if (parModulesRemaining == 0) {
2637     thisProxy[CkMyPe()].finalize();
2638   }
2639 }
2640
2641 void TraceProjectionsBOC::kMeansDone(CkReductionMsg *msg) {
2642
2643  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2644
2645   CkAssert(CkMyPe() == 0);
2646   parModulesRemaining--;
2647   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2648            CmiWallTimer()-analysisStartTime);
2649   delete msg;
2650   if (parModulesRemaining == 0) {
2651     thisProxy[CkMyPe()].finalize();
2652   }
2653 }
2654
2655 /**
2656  *
2657  *  This version is called (on processor 0) only if flushCheck fails.
2658  *
2659  */
2660 void TraceProjectionsBOC::kMeansDone() {
2661   CkAssert(CkMyPe() == 0);
2662   parModulesRemaining--;
2663   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2664            CmiWallTimer()-analysisStartTime);
2665   if (parModulesRemaining == 0) {
2666     thisProxy[CkMyPe()].finalize();
2667   }
2668 }
2669
2670 void TraceProjectionsBOC::finalize()
2671 {
2672   CkAssert(CkMyPe() == 0);
2673   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2674   //       CmiWallTimer()-analysisStartTime);
2675   thisProxy.closingTraces();
2676 }
2677
2678 // Called on every processor
2679 void TraceProjectionsBOC::closingTraces() {
2680   CkpvAccess(_trace)->closeTrace();
2681
2682     // subtle:  reduction needs to go to the PE which started CkExit()
2683   int pe = 0;
2684   if (endPe != -1) pe = endPe;
2685   CkCallback cb(CkIndex_TraceProjectionsBOC::closeParallelShutdown(NULL), 
2686                 pe, thisProxy); 
2687   contribute(0, NULL, CkReduction::sum_int, cb);  
2688 }
2689
2690 // The sole purpose of this reduction is to decide whether or not
2691 //   Projections as a module needs to call CkExit() to get other
2692 //   modules to shutdown.
2693 void TraceProjectionsBOC::closeParallelShutdown(CkReductionMsg *msg) {
2694   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2695   delete msg;
2696   // decide if CkExit() needs to be called
2697   if (!CkpvAccess(_trace)->converseExit) {
2698     CkExit();
2699   }
2700 }
2701 /*
2702  *  Registration and definition of the Outlier Reduction callback.
2703  *  Format: Sum | Min | Max | Sum of Squares
2704  */
2705 CkReductionMsg *outlierReduction(int nMsgs,
2706                                  CkReductionMsg **msgs) {
2707   int numBytes = 0;
2708   int numMetrics = 0;
2709   double *ret = NULL;
2710
2711   if (nMsgs == 1) {
2712     // nothing to do, just pass it on
2713     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2714   }
2715
2716   if (nMsgs > 1) {
2717     numBytes = msgs[0]->getSize();
2718     // sanity checks
2719     if (numBytes%sizeof(double) != 0) {
2720       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2721     }
2722     if ((numBytes/sizeof(double))%4 != 0) {
2723       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2724     }
2725     numMetrics = (numBytes/sizeof(double))/4;
2726     ret = new double[numMetrics*4];
2727
2728     // copy the first message data into the return structure first
2729     for (int i=0; i<numMetrics*4; i++) {
2730       ret[i] = ((double *)msgs[0]->getData())[i];
2731     }
2732
2733     // Sum | Min | Max | Sum of Squares
2734     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2735       for (int i=0; i<numMetrics; i++) {
2736         // Sum
2737         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2738         // Min
2739         ret[numMetrics + i] =
2740           (ret[numMetrics + i] < 
2741            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2742           ? ret[numMetrics + i] : 
2743           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2744         // Max
2745         ret[2*numMetrics + i] = 
2746           (ret[2*numMetrics + i] >
2747            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2748           ? ret[2*numMetrics + i] :
2749           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2750         // Sum of Squares (squaring already done at leaf)
2751         ret[3*numMetrics + i] +=
2752           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2753       }
2754     }
2755   }
2756   
2757   /* apparently, we do not delete the incoming messages */
2758   return CkReductionMsg::buildNew(numBytes,ret);
2759 }
2760
2761 /*
2762  * The only reason we have a user-defined reduction is to support
2763  *   identification of the "winning" processors as well as to handle
2764  *   both the min and the max of each "tournament". A simple min/max
2765  *   discovery cannot handle ties.
2766  */
2767 CkReductionMsg *minMaxReduction(int nMsgs,
2768                                 CkReductionMsg **msgs) {
2769   CkAssert(nMsgs > 0);
2770
2771   int numBytes = msgs[0]->getSize();
2772   CkAssert(numBytes%sizeof(double) == 0);
2773   int numK = (numBytes/sizeof(double))/4;
2774
2775   double *ret = new double[numK*4];
2776   // fill with out-of-band values
2777   for (int i=0; i<numK; i++) {
2778     ret[i*4] = -1.0;
2779     ret[i*4+1] = -1.0;
2780     ret[i*4+2] = -1.0;
2781     ret[i*4+3] = -1.0;
2782   }
2783
2784   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2785   for (int i=0; i<nMsgs; i++) {
2786     double *temp = (double *)msgs[i]->getData();
2787     for (int j=0; j<numK; j++) {
2788       // no previous valid min
2789       if (ret[j*4+1] < 0) {
2790         // fill it in only if the incoming min is valid
2791         if (temp[j*4+1] >= 0) {
2792           ret[j*4] = temp[j*4];      // fill min value
2793           ret[j*4+1] = temp[j*4+1];  // fill ID
2794         }
2795       } else {
2796         // find Min, only if incoming min is valid
2797         if (temp[j*4+1] >= 0) {
2798           if (temp[j*4] < ret[j*4]) {
2799             ret[j*4] = temp[j*4];      // replace min value
2800             ret[j*4+1] = temp[j*4+1];  // replace ID
2801           }
2802         }
2803       }
2804       // no previous valid max
2805       if (ret[j*4+3] < 0) {
2806         // fill only if the incoming max is valid
2807         if (temp[j*4+3] >= 0) {
2808           ret[j*4+2] = temp[j*4+2];  // fill max value
2809           ret[j*4+3] = temp[j*4+3];  // fill ID
2810         }
2811       } else {
2812         // find Max, only if incoming max is valid
2813         if (temp[j*4+3] >= 0) {
2814           if (temp[j*4+2] > ret[j*4+2]) {
2815             ret[j*4+2] = temp[j*4+2];  // replace max value
2816             ret[j*4+3] = temp[j*4+3];  // replace ID
2817           }
2818         }
2819       }
2820     }
2821   }
2822   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2823   delete [] ret;
2824   return redmsg;
2825 }
2826
2827 #include "TraceProjections.def.h"
2828 #endif //PROJ_ANALYSIS
2829
2830 /*@}*/