Merge branch 'charm' into virtualDebug
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82
83 /****************************************************************************
84      little utility functions
85 ****************************************************************************/
86
87 char **BgGetArgv() { return arg_argv; }
88 int    BgGetArgc() { return arg_argc; }
89
90 /***************************************************************************
91      Implementation of the same counter interface currently used for the
92      Origin2000 using PAPI in the underlying layer.
93
94      Because of the difference in counter numbering, it is assumed that
95      the two counters desired are CYCLES and FLOPS and hence the numbers
96      e0 and e1 are ignored.
97
98      start_counters is also not implemented because PAPI does things in
99      a different manner.
100 ****************************************************************************/
101
102 #if CMK_HAS_COUNTER_PAPI
103 /*
104 CmiUInt8 total_ins = 0;
105 CmiUInt8 total_fps = 0;
106 CmiUInt8 total_l1_dcm = 0;
107 CmiUInt8 total_mem_rcy = 0;
108 */
109 int init_counters()
110 {
111     int retval = PAPI_library_init(PAPI_VER_CURRENT);
112
113     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
114
115         //a temporary
116         const char *eventDesc[MAX_TOTAL_EVENTS];
117         int lpapiEvents[MAX_TOTAL_EVENTS];
118         
119     numPapiEvents = 0;
120         
121 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
122                 if(PAPI_query_event(e) == PAPI_OK){ \
123                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
124                         eventDesc[numPapiEvents] = edesc;       \
125                         lpapiEvents[numPapiEvents++] = e; \
126                 }
127                 
128     // PAPI high level API does not require explicit library intialization
129         eventDesc[numPapiEvents] ="cycles";
130     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
131         
132         
133     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
134       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
135           eventDesc[numPapiEvents] = "floating point instructions";     
136       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
137     } else {
138       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
139           eventDesc[numPapiEvents] = "total instructions";      
140       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
141     }
142     */
143         
144         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
145         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
146         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
147         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
148         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
149         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
150         
151         if(numPapiEvents == 0){
152                 CmiAbort("No papi events are defined!\n");
153         }
154         
155         if(numPapiEvents >= MAX_TOTAL_EVENTS){
156                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
157         }
158         
159         papiEvents = new int[numPapiEvents];
160         papiValues = new long_long[numPapiEvents];
161         total_papi_counters = new CmiUInt8[numPapiEvents];
162         papi_counters_desc = new char *[numPapiEvents];
163         for(int i=0; i<numPapiEvents; i++){
164                 papiEvents[i] = lpapiEvents[i];
165                 total_papi_counters[i] = 0;
166                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
167                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
168                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
169                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
170         }
171         
172 /*
173     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
174       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
175       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
176     }
177     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
178       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
179       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
180     }
181     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
182       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
183       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
184     }
185 */
186
187     int status = PAPI_start_counters(papiEvents, numPapiEvents);
188     if (status != PAPI_OK) {
189       CmiPrintf("PAPI_start_counters error code: %d\n", status);
190       switch (status) {
191       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
192       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
193       }
194       CmiAbort("Unable to start PAPI counters!\n");
195     }
196 }
197
198 int read_counters(long_long *papiValues, int n) 
199 {
200   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
201   // code for Origin2000
202   int status;
203   status = PAPI_read_counters(papiValues, n);
204   if (status != PAPI_OK) {
205     CmiPrintf("PAPI_read_counters error: %d\n", status);
206     CmiAbort("Failed to read PAPI counters!\n");
207   }
208   
209   /*
210   total_ins += papiValues[0];
211   total_fps += papiValues[1];
212   total_l1_dcm += papiValues[2];
213   total_mem_rcy += papiValues[3];
214   */
215   
216   return 0;
217 }
218
219 inline void CountPapiEvents(){
220   for(int i=0 ;i<numPapiEvents; i++)
221         total_papi_counters[i] += papiValues[i];
222 }
223
224 inline double Count2Time(long_long *papiValues, int n) { 
225   return papiValues[1]*cva(bgMach).fpfactor; 
226 }
227 #endif
228
229 /*****************************************************************************
230      Handler Table, one per thread
231 ****************************************************************************/
232
233 extern "C" void defaultBgHandler(char *null, void *uPtr)
234 {
235   CmiAbort("BG> Invalid Handler called!\n");
236 }
237
238 HandlerTable::HandlerTable()
239 {
240     handlerTableCount = 1;
241     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
242     for (int i=0; i<MAX_HANDLERS; i++) {
243       handlerTable[i].fnPtr = defaultBgHandler;
244       handlerTable[i].userPtr = NULL;
245     }
246 }
247
248 inline int HandlerTable::registerHandler(BgHandler h)
249 {
250     ASSERT(!cva(simState).inEmulatorInit);
251     /* leave 0 as blank, so it can report error luckily */
252     int cur = handlerTableCount++;
253     if (cur >= MAX_HANDLERS)
254       CmiAbort("BG> HandlerID exceed the maximum.\n");
255     handlerTable[cur].fnPtr = (BgHandlerEx)h;
256     handlerTable[cur].userPtr = NULL;
257     return cur;
258 }
259
260 inline void HandlerTable::numberHandler(int idx, BgHandler h)
261 {
262     ASSERT(!cva(simState).inEmulatorInit);
263     if (idx >= handlerTableCount || idx < 1)
264       CmiAbort("BG> HandlerID exceed the maximum!\n");
265     handlerTable[idx].fnPtr = (BgHandlerEx)h;
266     handlerTable[idx].userPtr = NULL;
267 }
268
269 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
270 {
271     ASSERT(!cva(simState).inEmulatorInit);
272     if (idx >= handlerTableCount || idx < 1)
273       CmiAbort("BG> HandlerID exceed the maximum!\n");
274     handlerTable[idx].fnPtr = h;
275     handlerTable[idx].userPtr = uPtr;
276 }
277
278 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
279 {
280 #if 0
281     if (handler >= handlerTableCount) {
282       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
283       CmiAbort("Invalid handler!");
284     }
285 #endif
286     if (handler >= handlerTableCount || handler<0) return NULL;
287     return &handlerTable[handler];
288 }
289
290 /*****************************************************************************
291       low level API
292 *****************************************************************************/
293
294 int BgRegisterHandler(BgHandler h)
295 {
296   ASSERT(!cva(simState).inEmulatorInit);
297   int cur;
298 #if CMK_BLUEGENE_NODE
299   return tMYNODE->handlerTable.registerHandler(h);
300 #else
301   if (tTHREADTYPE == COMM_THREAD) {
302     return tMYNODE->handlerTable.registerHandler(h);
303   }
304   else {
305     return tHANDLETAB.registerHandler(h);
306   }
307 #endif
308 }
309
310 void BgNumberHandler(int idx, BgHandler h)
311 {
312   ASSERT(!cva(simState).inEmulatorInit);
313 #if CMK_BLUEGENE_NODE
314   tMYNODE->handlerTable.numberHandler(idx,h);
315 #else
316   if (tTHREADTYPE == COMM_THREAD) {
317     tMYNODE->handlerTable.numberHandler(idx, h);
318   }
319   else {
320     tHANDLETAB.numberHandler(idx, h);
321   }
322 #endif
323 }
324
325 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
326 {
327   ASSERT(!cva(simState).inEmulatorInit);
328 #if CMK_BLUEGENE_NODE
329   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
330 #else
331   if (tTHREADTYPE == COMM_THREAD) {
332     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
333   }
334   else {
335     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
336   }
337 #endif
338 }
339
340 /*****************************************************************************
341       BG Timing Functions
342 *****************************************************************************/
343
344 void resetVTime()
345 {
346   /* reset start time */
347   int timingMethod = cva(bgMach).timingMethod;
348   if (timingMethod == BG_WALLTIME) {
349     double ct = BG_TIMER();
350     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
351     tSTARTTIME = ct;
352   }
353   else if (timingMethod == BG_ELAPSE)
354     tSTARTTIME = tCURRTIME;
355 #ifdef CMK_ORIGIN2000
356   else if (timingMethod == BG_COUNTER) {
357     if (start_counters(0, 21) <0) {
358       perror("start_counters");;
359     }
360   }
361 #elif CMK_HAS_COUNTER_PAPI
362   else if (timingMethod == BG_COUNTER) {
363     // do a fake read to reset the counters. It would be more efficient
364     // to use the low level API, but that would be a lot more code to
365     // write for now.
366     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
367   }
368 #endif
369 }
370
371 void startVTimer()
372 {
373   CmiAssert(tTIMERON == 0);
374   resetVTime();
375   tTIMERON = 1;
376 }
377
378 // should be used only when BG_WALLTIME
379 static inline void advanceTime(double inc)
380 {
381   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
382   if (inc < 0.0) return;
383   CmiAssert(inc>=0.0);
384   inc *= cva(bgMach).cpufactor;
385   tCURRTIME += inc;
386   CmiAssert(tTIMERON==1);
387 }
388
389 void stopVTimer()
390 {
391   int k;
392 #if 0
393   if (tTIMERON != 1) {
394     CmiAbort("stopVTimer called without startVTimer!\n");
395   }
396   CmiAssert(tTIMERON == 1);
397 #else
398   if (tTIMERON == 0) return;         // already stopped
399 #endif
400   const int timingMethod = cva(bgMach).timingMethod;
401   if (timingMethod == BG_WALLTIME) {
402     const double tp = BG_TIMER();
403     double inc = tp-tSTARTTIME;
404     advanceTime(inc-cva(bgMach).timercost);
405 //    tSTARTTIME = BG_TIMER();  // skip the above time
406   }
407   else if (timingMethod == BG_ELAPSE) {
408     // if no bgelapse called, assume it takes 1us
409     if (tCURRTIME-tSTARTTIME < 1E-9) {
410 //      tCURRTIME += 1e-6;
411     }
412   }
413   else if (timingMethod == BG_COUNTER)  {
414 #if CMK_ORIGIN2000
415     long long c0, c1;
416     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
417     tCURRTIME += Count2Time(c1);
418 #elif CMK_HAS_COUNTER_PAPI
419     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
420     CountPapiEvents();
421     tCURRTIME += Count2Time(papiValues, numPapiEvents);
422 #endif
423   }
424   tTIMERON = 0;
425 }
426
427 double BgGetTime()
428 {
429 #if 1
430   const int timingMethod = cva(bgMach).timingMethod;
431   if (timingMethod == BG_WALLTIME) {
432     /* accumulate time since last starttime, and reset starttime */
433     if (tTIMERON) {
434       const double tp2= BG_TIMER();
435       double &startTime = tSTARTTIME;
436       double inc = tp2 - startTime;
437       advanceTime(inc-cva(bgMach).timercost);
438       startTime = BG_TIMER();
439     }
440     return tCURRTIME;
441   }
442   else if (timingMethod == BG_ELAPSE) {
443     return tCURRTIME;
444   }
445   else if (timingMethod == BG_COUNTER) {
446     if (tTIMERON) {
447 #if CMK_ORIGIN2000
448       long long c0, c1;
449       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
450       tCURRTIME += Count2Time(c1);
451       if (start_counters(0, 21)<0) perror("start_counters");;
452 #elif CMK_HAS_COUNTER_PAPI
453     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
454     tCURRTIME += Count2Time(papiValues, numPapiEvents);
455 #endif
456     }
457     return tCURRTIME;
458   }
459   else 
460     CmiAbort("Unknown Timing Method.");
461   return -1;
462 #else
463   /* sometime I am interested in real wall time */
464   tCURRTIME = CmiWallTimer();
465   return tCURRTIME;
466 #endif
467 }
468
469 // moved to blue_logs.C
470 double BgGetCurTime()
471 {
472   ASSERT(tTHREADTYPE == WORK_THREAD);
473   return tCURRTIME;
474 }
475
476 extern "C" 
477 void BgElapse(double t)
478 {
479 //  ASSERT(tTHREADTYPE == WORK_THREAD);
480   if (cva(bgMach).timingMethod == BG_ELAPSE)
481     tCURRTIME += t;
482 }
483
484 // advance virtual timer no matter what scheme is used
485 extern "C" 
486 void BgAdvance(double t)
487 {
488 //  ASSERT(tTHREADTYPE == WORK_THREAD);
489   tCURRTIME += t;
490 }
491
492 /* BG API Func
493  * called by a communication thread to test if poll data 
494  * in the node's INBUFFER for its own queue 
495  */
496 char * getFullBuffer()
497 {
498   /* I must be a communication thread */
499   if (tTHREADTYPE != COMM_THREAD) 
500     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
501
502   return tMYNODE->getFullBuffer();
503 }
504
505 /**  BG API Func
506  * called by a Converse handler or sendPacket()
507  * add message msgPtr to a bluegene node's inbuffer queue 
508  */
509 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
510 {
511 #ifndef CMK_OPTIMIZE
512   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
513 #endif
514   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
515
516   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
517         
518   nInfo.addBgNodeInbuffer(msgPtr);
519 }
520 extern "C" void addBgNodeInbuffer_c(char *msgPtr, int lnodeID) {
521   addBgNodeInbuffer(msgPtr, lnodeID);
522 }
523
524 /** BG API Func 
525  *  called by a comm thread
526  *  add a message to a thread's affinity queue in same node 
527  */
528 void addBgThreadMessage(char *msgPtr, int threadID)
529 {
530 #ifndef CMK_OPTIMIZE
531   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
532 #endif
533   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
534   tInfo->addAffMessage(msgPtr);
535 }
536
537 /** BG API Func 
538  *  called by a comm thread, add a message to a node's non-affinity queue 
539  */
540 void addBgNodeMessage(char *msgPtr)
541 {
542   tMYNODE->addBgNodeMessage(msgPtr);
543 }
544
545 void BgEnqueue(char *msg)
546 {
547 #if 0
548   ASSERT(tTHREADTYPE == WORK_THREAD);
549   workThreadInfo *tinfo = (workThreadInfo *)cta(threadinfo);
550   tinfo->addAffMessage(msg);
551 #else
552   nodeInfo *myNode = cta(threadinfo)->myNode;
553   addBgNodeInbuffer(msg, myNode->id);
554 #endif
555 }
556
557 /** BG API Func 
558  *  check if inBuffer on this node has msg available
559  */
560 int checkReady()
561 {
562   if (tTHREADTYPE != COMM_THREAD)
563     CmiAbort("checkReady called by a non-communication thread!\n");
564   return !tINBUFFER.isEmpty();
565 }
566
567
568 /* handler to process the msg */
569 void msgHandlerFunc(char *msg)
570 {
571   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
572   int gnodeID = CmiBgMsgNodeID(msg);
573   if (gnodeID >= 0) {
574 #ifndef CMK_OPTIMIZE
575     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
576       CmiAbort("msgHandlerFunc received wrong message!");
577 #endif
578     int lnodeID = nodeInfo::Global2Local(gnodeID);
579     if (cva(bgMach).inReplayMode()) {
580       int x, y, z;
581       BgGetXYZ(cva(bgMach).replay, &x, &y, &z);
582       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
583       else lnodeID = 0;
584     }
585     addBgNodeInbuffer(msg, lnodeID);
586   }
587   else {
588     CmiAbort("Invalid message!");
589   }
590 }
591
592 /* Converse handler for node level broadcast message */
593 void nodeBCastMsgHandlerFunc(char *msg)
594 {
595   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
596   int gnodeID = CmiBgMsgNodeID(msg);
597   CmiInt2 threadID = CmiBgMsgThreadID(msg);
598   int lnodeID;
599
600   if (gnodeID < -1) {
601     gnodeID = - (gnodeID+100);
602     if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
603       lnodeID = nodeInfo::Global2Local(gnodeID);
604     else
605       lnodeID = -1;
606   }
607   else {
608     ASSERT(gnodeID == -1);
609     lnodeID = gnodeID;
610   }
611   // broadcast except lnodeID:threadId
612   int len = CmiBgMsgLength(msg);
613   int count = 0;
614   for (int i=0; i<cva(numNodes); i++)
615   {
616     if (i==lnodeID) continue;
617     char *dupmsg;
618     if (count == 0) dupmsg = msg;
619     else dupmsg = CmiCopyMsg(msg, len);
620     DEBUGF(("addBgNodeInbuffer to %d\n", i));
621     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
622     addBgNodeInbuffer(dupmsg, i);
623     count ++;
624   }
625   if (count == 0) CmiFree(msg);
626 }
627
628 // clone a msg, only has a valid header, plus a pointer to the real msg
629 char *BgCloneMsg(char *msg)
630 {
631   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
632   char *dupmsg = (char *)CmiAlloc(size);
633   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
634   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
635   CmiBgMsgRefCount(msg) ++;
636   CmiBgMsgFlag(dupmsg) = BG_CLONE;
637   return dupmsg;
638 }
639
640 // expand the cloned msg to the full size msg
641 char *BgExpandMsg(char *msg)
642 {
643   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
644   int size = CmiBgMsgLength(origmsg);
645   char *dupmsg = (char *)CmiAlloc(size);
646   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
647   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
648   CmiFree(msg);
649   CmiBgMsgRefCount(origmsg) --;
650   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
651   return dupmsg;
652 }
653
654 /* Converse handler for thread level broadcast message */
655 void threadBCastMsgHandlerFunc(char *msg)
656 {
657   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
658   int gnodeID = CmiBgMsgNodeID(msg);
659   CmiInt2 threadID = CmiBgMsgThreadID(msg);
660   int lnodeID;
661   if (gnodeID < -1) {
662       gnodeID = - (gnodeID+100);
663       if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
664         lnodeID = nodeInfo::Global2Local(gnodeID);
665       else
666         lnodeID = -1;
667       CmiAssert(threadID != ANYTHREAD);
668   }
669   else {
670     ASSERT(gnodeID == -1);
671     lnodeID = gnodeID;
672   }
673   // broadcast except nodeID:threadId
674   int len = CmiBgMsgLength(msg);
675   // optimization needed if the message size is big
676   // making duplications can easily run out of memory
677   int bigOpt = (len > 4096);
678   for (int i=0; i<cva(numNodes); i++)
679   {
680       for (int j=0; j<cva(bgMach).numWth; j++) {
681         if (i==lnodeID && j==threadID) continue;
682         // for big message, clone a message token instead of a real msg
683         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
684         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
685         CmiBgMsgThreadID(dupmsg) = j;
686         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
687         addBgNodeInbuffer(dupmsg, i);
688       }
689   }
690   // for big message, will free after all tokens are done
691   if (!bigOpt) CmiFree(msg);
692 }
693
694 /**
695  *              BG Messaging Functions
696  */
697
698 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
699 {
700   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
701 }
702
703 /**
704  *   a simple message streaming on demand and special purpose
705  *   user call  BgStartStreaming() and BgEndStreaming()
706  *   each worker thread call one send, and all sends are sent
707  *   via multiplesend at the end
708  */
709
710 static int bg_streaming = 0;
711
712 class BgStreaming {
713 public:
714   char **streamingMsgs;
715   int  *streamingMsgSizes;
716   int count;
717   int totalWorker;
718   int pe;
719 public:
720   BgStreaming() {
721     streamingMsgs = NULL;
722     streamingMsgSizes = NULL;
723     count = 0;
724     totalWorker = 0;
725     pe = -1;
726   }
727   ~BgStreaming() {
728     if (streamingMsgs) {
729       delete [] streamingMsgs;
730       delete [] streamingMsgSizes;
731     }
732   }
733   void init(int nNodes) {
734     totalWorker = nNodes * BgGetNumWorkThread();
735     streamingMsgs = new char *[totalWorker];
736     streamingMsgSizes = new int [totalWorker];
737   }
738   void depositMsg(int p, int size, char *m) {
739     streamingMsgs[count] = m;
740     streamingMsgSizes[count] = size;
741     count ++;
742     if (pe == -1) pe = p;
743     else CmiAssert(pe == p);
744     if (count == totalWorker) {
745       // CkPrintf("streaming send\n");
746       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
747       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
748       pe = -1;
749       count = 0;
750     }
751   }
752 };
753
754 BgStreaming bgstreaming;
755
756 void BgStartStreaming()
757 {
758   bg_streaming = 1;
759 }
760
761 void BgEndStreaming()
762 {
763   bg_streaming = 0;
764 }
765
766 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
767 {
768   if (streaming && pe != CmiMyPe())
769     bgstreaming.depositMsg(pe, msgSize, msg);
770   else
771     CmiSyncSendAndFree(pe, msgSize, msg);
772 }
773
774
775 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
776 {
777 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
778 #if !DELAY_SEND
779   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
780   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
781 #else
782   if (!correctTimeLog) {
783     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
784     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
785   }
786   // else messages are kept in the log (MsgEntry), and only will be sent
787   // after timing correction has done on that log.
788   // TODO: streaming has no effect if time correction is on.
789 #endif
790 }
791
792 /* send will copy data to msg buffer */
793 /* user data is not free'd in this routine, user can reuse the data ! */
794 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
795 {
796   //CmiPrintStackTrace(0);
797   double sendT = BgGetTime();
798
799   double latency;
800   CmiSetHandler(sendmsg, cva(simState).msgHandler);
801   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
802   CmiBgMsgThreadID(sendmsg) = threadID;
803   CmiBgMsgHandle(sendmsg) = handlerID;
804   CmiBgMsgType(sendmsg) = type;
805   CmiBgMsgLength(sendmsg) = numbytes;
806   CmiBgMsgFlag(sendmsg) = 0;
807   CmiBgMsgRefCount(sendmsg) = 0;
808   if (local) {
809     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
810     latency = 0.0;
811   }
812   else {
813     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
814     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
815     CmiAssert(latency >= 0);
816   }
817   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
818   
819   // timing
820   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
821
822   //static int addCnt=1; //for debugging only
823   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
824   //addCnt++;
825
826   if (local){
827       /* Here local refers to the fact that msg is sent to the processor itself
828        * therefore, we just add this msg to the thread itself
829        */
830       //addBgThreadMessage(sendmsg,threadID);
831       addBgNodeInbuffer(sendmsg, myNode->id);
832   }    
833   else
834     CmiSendPacket(x, y, z, numbytes, sendmsg);
835
836   // bypassing send time
837   resetVTime();
838 }
839
840 /* broadcast will copy data to msg buffer */
841 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
842 {
843   double sendT = BgGetTime();
844
845   nodeInfo *myNode = cta(threadinfo)->myNode;
846   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
847   if (node >= 0)
848     CmiBgMsgNodeID(sendmsg) = -node-100;
849   else
850     CmiBgMsgNodeID(sendmsg) = node;
851   CmiBgMsgThreadID(sendmsg) = threadID; 
852   CmiBgMsgHandle(sendmsg) = handlerID;  
853   CmiBgMsgType(sendmsg) = type; 
854   CmiBgMsgLength(sendmsg) = numbytes;
855   CmiBgMsgFlag(sendmsg) = 0;
856   CmiBgMsgRefCount(sendmsg) = 0;
857   /* FIXME */
858   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
859
860   // timing
861   // FIXME
862   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
863
864   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
865 #if DELAY_SEND
866   if (!correctTimeLog)
867 #endif
868   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
869
870   resetVTime();
871 }
872
873 /* broadcast will copy data to msg buffer */
874 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
875 {
876   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
877   if (node >= 0)
878     CmiBgMsgNodeID(sendmsg) = -node-100;
879   else
880     CmiBgMsgNodeID(sendmsg) = node;
881   CmiBgMsgThreadID(sendmsg) = threadID; 
882   CmiBgMsgHandle(sendmsg) = handlerID;  
883   CmiBgMsgType(sendmsg) = type; 
884   CmiBgMsgLength(sendmsg) = numbytes;
885   CmiBgMsgFlag(sendmsg) = 0;
886   CmiBgMsgRefCount(sendmsg) = 0;
887   /* FIXME */
888   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
889   double sendT = BgGetTime();
890   CmiBgMsgRecvTime(sendmsg) = sendT;    
891
892   // timing
893 #if 0
894   if (node == BG_BROADCASTALL) {
895     for (int i=0; i<_bgSize; i++) {
896       for (int j=0; j<cva(numWth); j++) {
897         BG_ADDMSG(sendmsg, node);
898       }
899     }
900   }
901   else {
902     CmiAssert(node >= 0);
903     BG_ADDMSG(sendmsg, (node+100));
904   }
905 #else
906   // FIXME
907   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
908 #endif
909
910   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
911 #if DELAY_SEND
912   if (!correctTimeLog)
913 #endif
914   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
915
916   resetVTime();
917 }
918
919
920 /* sendPacket to route */
921 /* this function can be called by any thread */
922 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
923 {
924   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
925
926 #ifndef CMK_OPTIMIZE
927   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
928     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
929     CmiAbort("Abort!\n");
930   }
931 #endif
932
933   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
934 }
935
936 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
937 {
938   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
939 }
940
941 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
942                        int numbytes, char* data)
943 {
944   nodeInfo *myNode = cta(threadinfo)->myNode;
945
946   if (cva(bgMach).inReplayMode()) threadID = 0;    // replay mode
947
948   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
949 }
950
951 /* wrapper of the previous two functions */
952 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
953 {
954   nodeInfo *myNode = cta(threadinfo)->myNode;
955   if (myNode->x == x && myNode->y == y && myNode->z == z)
956     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
957   else
958     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
959 }
960
961 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
962 {
963   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
964 }
965
966 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
967 {
968   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
969 }
970
971 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
972 {
973   if (cva(bgMach).inReplayMode()) return;    // replay mode
974   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
975 }
976
977 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
978 {
979   if (cva(bgMach).inReplayMode()) {      // replay mode, send only to itself
980     BgSendLocalPacket(ANYTHREAD, handlerID, type, numbytes, data);
981     return;
982   }
983   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
984 }
985
986 /**
987  send a msg to a list of processors (processors represented in global seq #
988 */
989 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
990 {
991   nodeInfo *myNode = cta(threadinfo)->myNode;
992
993   CmiSetHandler(msg, cva(simState).msgHandler);
994   CmiBgMsgHandle(msg) = handlerID;
995   CmiBgMsgType(msg) = type;
996   CmiBgMsgLength(msg) = numbytes;
997   CmiBgMsgFlag(msg) = 0;
998   CmiBgMsgRefCount(msg) = 0;
999
1000   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
1001
1002   double now = BgGetTime();
1003
1004   // send one by one
1005   for (int i=0; i<npes; i++)
1006   {
1007     int local = 0;
1008     int x,y,z,t;
1009     int pe = pes[i];
1010 #if CMK_BLUEGENE_NODE
1011     CmiAbort("Not implemented yet!");
1012 #else
1013     t = pe%BgGetNumWorkThread();
1014     pe = pe/BgGetNumWorkThread();
1015     BgGetXYZ(pe, &x, &y, &z);
1016 #endif
1017
1018     char *sendmsg = CmiCopyMsg(msg, numbytes);
1019     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1020     CmiBgMsgThreadID(sendmsg) = t;
1021     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1022     CmiAssert(latency >= 0);
1023     CmiBgMsgRecvTime(sendmsg) = latency + now;
1024
1025     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1026
1027     // timing and make sure all msgID are the same
1028     if (i!=0) CpvAccess(msgCounter) --;
1029     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1030
1031     if (myNode->x == x && myNode->y == y && myNode->z == z)
1032       addBgNodeInbuffer(sendmsg, myNode->id);
1033     else
1034       CmiSendPacket(x, y, z, numbytes, sendmsg);
1035   }
1036
1037   CmiFree(msg);
1038
1039   resetVTime();
1040 }
1041
1042 /*****************************************************************************
1043       BG node level API - utilities
1044 *****************************************************************************/
1045
1046 /* must be called in a communication or worker thread */
1047 void BgGetMyXYZ(int *x, int *y, int *z)
1048 {
1049   ASSERT(!cva(simState).inEmulatorInit);
1050   *x = tMYX; *y = tMYY; *z = tMYZ;
1051 }
1052
1053 void BgGetXYZ(int seq, int *x, int *y, int *z)
1054 {
1055   nodeInfo::Global2XYZ(seq, x, y, z);
1056 }
1057
1058 void BgGetSize(int *sx, int *sy, int *sz)
1059 {
1060   cva(bgMach).getSize(sx, sy, sz);
1061 }
1062
1063 int BgTraceProjectionOn(int pe)
1064 {
1065   return cva(bgMach).traceProejctions(pe);
1066 }
1067
1068 /* return the total number of Blue gene nodes */
1069 int BgNumNodes()
1070 {
1071   return _bgSize;
1072 }
1073
1074 void BgSetNumNodes(int x)
1075 {
1076   _bgSize = x;
1077 }
1078
1079 /* can only called in emulatorinit */
1080 void BgSetSize(int sx, int sy, int sz)
1081 {
1082   ASSERT(cva(simState).inEmulatorInit);
1083   cva(bgMach).setSize(sx, sy, sz);
1084 }
1085
1086 /* return number of bg nodes on this emulator node */
1087 int BgNodeSize()
1088 {
1089   ASSERT(!cva(simState).inEmulatorInit);
1090   return cva(numNodes);
1091 }
1092
1093 /* return the bg node ID (local array index) */
1094 int BgMyRank()
1095 {
1096 #ifndef CMK_OPTIMIZE
1097   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1098 #endif
1099   ASSERT(!cva(simState).inEmulatorInit);
1100   return tMYNODEID;
1101 }
1102
1103 /* return my serialed blue gene node number */
1104 int BgMyNode()
1105 {
1106 #ifndef CMK_OPTIMIZE
1107   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1108 #endif
1109   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1110 }
1111
1112 /* return a real processor number from a bg node */
1113 int BgNodeToRealPE(int node)
1114 {
1115   return nodeInfo::Global2PE(node);
1116 }
1117
1118 // thread ID on a BG node
1119 int BgGetThreadID()
1120 {
1121   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1122 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1123   return tMYID;
1124 }
1125
1126 void BgSetThreadID(int x)
1127 {
1128   tMYID = x;
1129 }
1130
1131 int BgGetGlobalThreadID()
1132 {
1133   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1134   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1135   //return tMYGLOBALID;
1136 }
1137
1138 int BgGetGlobalWorkerThreadID()
1139 {
1140   ASSERT(tTHREADTYPE == WORK_THREAD);
1141 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1142   return tMYGLOBALID;
1143 }
1144
1145 void BgSetGlobalWorkerThreadID(int pe)
1146 {
1147   ASSERT(tTHREADTYPE == WORK_THREAD);
1148 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1149   tMYGLOBALID = pe;
1150 }
1151
1152 char *BgGetNodeData()
1153 {
1154   return tUSERDATA;
1155 }
1156
1157 void BgSetNodeData(char *data)
1158 {
1159   ASSERT(!cva(simState).inEmulatorInit);
1160   tUSERDATA = data;
1161 }
1162
1163 int BgGetNumWorkThread()
1164 {
1165   return cva(bgMach).numWth;
1166 }
1167
1168 void BgSetNumWorkThread(int num)
1169 {
1170   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1171   cva(bgMach).numWth = num;
1172 }
1173
1174 int BgGetNumCommThread()
1175 {
1176   return cva(bgMach).numCth;
1177 }
1178
1179 void BgSetNumCommThread(int num)
1180 {
1181   ASSERT(cva(simState).inEmulatorInit);
1182   cva(bgMach).numCth = num;
1183 }
1184
1185 /*****************************************************************************
1186       Communication and Worker threads
1187 *****************************************************************************/
1188
1189 BgStartHandler  workStartFunc = NULL;
1190
1191 void BgSetWorkerThreadStart(BgStartHandler f)
1192 {
1193   workStartFunc = f;
1194 }
1195
1196 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1197
1198 // kernel function for processing a bluegene message
1199 void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
1200 {
1201   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1202   int handler = CmiBgMsgHandle(msg);
1203   //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
1204   CmiAssert(handler < 1000);
1205
1206   BgHandlerInfo *handInfo;
1207 #if  CMK_BLUEGENE_NODE
1208   HandlerTable hdlTbl = tMYNODE->handlerTable;
1209   handInfo = hdlTbl.getHandle(handler);
1210 #else
1211   HandlerTable hdlTbl = tHANDLETAB;
1212   handInfo = hdlTbl.getHandle(handler);
1213   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1214 #endif
1215
1216   if (handInfo == NULL) {
1217     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1218     CmiAbort("BgProcessMessage Failed!");
1219   }
1220   BgHandlerEx entryFunc = handInfo->fnPtr;
1221
1222   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1223
1224   // optimization for broadcast messages:
1225   // if the msg is a broadcast token msg, expand it to a real msg
1226   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1227     msg = BgExpandMsg(msg);
1228   }
1229
1230   if (tinfo->watcher) tinfo->watcher->record(msg);
1231
1232   // don't count thread overhead and timing overhead
1233   startVTimer();
1234
1235   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1236
1237   entryFunc(msg, handInfo->userPtr);
1238
1239   stopVTimer();
1240
1241   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1242 }
1243
1244 void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
1245
1246 void scheduleWorkerThread(char *msg)
1247 {
1248   CthThread tid = (CthThread)msg;
1249 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1250   CthAwaken(tid);
1251 }
1252
1253 // thread entry
1254 // for both comm and work thread, virtual function
1255 void run_thread(threadInfo *tinfo)
1256 {
1257   /* set the thread-private threadinfo */
1258   cta(threadinfo) = tinfo;
1259   tinfo->run();
1260 }
1261
1262 /* should be done only once per bg node */
1263 void BgNodeInitialize(nodeInfo *ninfo)
1264 {
1265   CthThread t;
1266   int i;
1267
1268   /* this is here because I will put a message to node inbuffer */
1269   tCURRTIME = 0.0;
1270   tSTARTTIME = CmiWallTimer();
1271
1272   /* creat work threads */
1273   for (i=0; i< cva(bgMach).numWth; i++)
1274   {
1275     threadInfo *tinfo = ninfo->threadinfo[i];
1276     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1277     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1278     tinfo->setThread(t);
1279     /* put to thread table */
1280     tTHREADTABLE[tinfo->id] = t;
1281 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1282     //initial scheduling points for workthreads
1283     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1284 #endif
1285     CthAwaken(t);
1286   }
1287
1288   /* creat communication thread */
1289   for (i=0; i< cva(bgMach).numCth; i++)
1290   {
1291     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1292     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1293     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1294     tinfo->setThread(t);
1295     /* put to thread table */
1296     tTHREADTABLE[tinfo->id] = t;
1297     CthAwaken(t);
1298   }
1299
1300 }
1301
1302 static void beginExitHandlerFunc(void *msg);
1303 static void writeToDisk();
1304 static void sendCorrectionStats();
1305
1306 static CmiHandler exitHandlerFunc(char *msg)
1307 {
1308   // TODO: free memory before exit
1309   int i,j;
1310
1311   programExit = 2;
1312 #if BLUEGENE_TIMING
1313   // timing
1314   if (0)        // detail
1315   if (genTimeLog) {
1316     for (j=0; j<cva(numNodes); j++)
1317     for (i=0; i<cva(bgMach).numWth; i++) {
1318       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1319 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1320       int x,y,z;
1321       nodeInfo::Local2XYZ(j, &x, &y, &z);
1322       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1323     }
1324
1325   }
1326 #endif
1327   if (genTimeLog) sendCorrectionStats();
1328
1329   if (genTimeLog) writeToDisk();
1330
1331 //  if (tTHREADTYPE == WORK_THREAD)
1332   {
1333   int origPe = -2;
1334   // close all tracing modules
1335   for (j=0; j<cva(numNodes); j++)
1336     for (i=0; i<cva(bgMach).numWth; i++) {
1337       int oldPe = CmiSwitchToPE(nodeInfo::Local2Global(j)*cva(bgMach).numWth+i);
1338       if (origPe == -2) origPe = oldPe;
1339       traceCharmClose();
1340 //      CmiSwitchToPE(oldPe);
1341       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1342       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1343     }
1344     if (origPe!=-2) CmiSwitchToPE(origPe);
1345   }
1346
1347 #if 0
1348   delete [] cva(nodeinfo);
1349   delete [] cva(inBuffer);
1350   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1351   delete [] cva(msgBuffer);
1352 #endif
1353
1354 #if CMK_HAS_COUNTER_PAPI
1355   if (cva(bgMach).timingMethod == BG_COUNTER) {
1356 /*        
1357   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1358   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1359   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1360   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1361  */
1362         for(int i=0; i<numPapiEvents; i++){
1363           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1364            delete papi_counters_desc[i];
1365         }
1366         delete papiEvents;
1367         delete papiValues;
1368         delete papi_counters_desc;
1369         delete total_papi_counters;
1370   }
1371 #endif
1372
1373   //ConverseExit();
1374   if (genTimeLog)
1375     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1376   else
1377     CsdExitScheduler();
1378
1379   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1380
1381   return 0;
1382 }
1383
1384 static void sanityCheck()
1385 {
1386   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1387     if (CmiMyPe() == 0)
1388       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1389     BgShutdown(); 
1390   } 
1391   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1392 #if 1
1393     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1394     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1395 #else
1396     if (CmiMyPe() == 0)
1397       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1398     BgShutdown(); 
1399 #endif
1400   }
1401   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1402     CmiAbort("\nToo few BlueGene nodes!\n");
1403   }
1404 }
1405
1406 #undef CmiSwitchToPE
1407 extern "C" int CmiSwitchToPEFn(int pe);
1408
1409 // main
1410 CmiStartFn bgMain(int argc, char **argv)
1411 {
1412   int i;
1413   char *configFile = NULL;
1414
1415 #if CMK_CONDS_USE_SPECIAL_CODE
1416   // overwrite possible implementation in machine.c
1417   CmiSwitchToPE = CmiSwitchToPEFn;
1418 #endif
1419
1420   /* initialize all processor level data */
1421   CpvInitialize(BGMach,bgMach);
1422   cva(bgMach).nullify();
1423
1424   CmiArgGroup("Charm++","BlueGene Simulator");
1425   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1426    cva(bgMach). read(configFile);
1427   }
1428   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1429                 "The x size of the grid of nodes");
1430   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1431                 "The y size of the grid of nodes");
1432   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1433                 "The z size of the grid of nodes");
1434   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1435                 "The number of simulated communication threads per node");
1436   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1437                 "The number of simulated worker threads per node");
1438
1439   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1440                 "Blue Gene thread stack size");
1441
1442   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1443      genTimeLog = 1;
1444   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1445     correctTimeLog = 1;
1446   schedule_flag = 0;
1447   if (correctTimeLog) {
1448     genTimeLog = 1;
1449     schedule_flag = 1;
1450   }
1451
1452   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1453     bgverbose = 1;
1454
1455   // for timing method, default using elapse calls.
1456   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1457                        "Use user provided BgElapse for time prediction")) 
1458       cva(bgMach).timingMethod = BG_ELAPSE;
1459   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1460                        "Use walltime method for time prediction")) 
1461       cva(bgMach).timingMethod = BG_WALLTIME;
1462 #ifdef CMK_ORIGIN2000
1463   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1464       cva(bgMach).timingMethod = BG_COUNTER;
1465 #elif CMK_HAS_COUNTER_PAPI
1466   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1467     cva(bgMach).timingMethod = BG_COUNTER;
1468   }
1469   if (cva(bgMach).timingMethod == BG_COUNTER) {
1470     init_counters();
1471   }
1472 #endif
1473   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1474                       "floating point to time factor");
1475   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1476                       "scale factor for wallclock time measured");
1477   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1478                       "timer cost");
1479   #if 0
1480   if(cva(bgMach).timingMethod == BG_WALLTIME)
1481   {
1482       int count = 1e6;
1483       double start, stop, diff, cost, dummy;
1484
1485       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1486
1487       start = BG_TIMER();
1488       for (int i = 0; i < count; ++i)
1489           dummy = BG_TIMER();
1490       stop = BG_TIMER();
1491
1492       diff = stop - start;
1493       cost = diff / count;
1494
1495       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1496                 cost, cva(bgMach).timercost);
1497       cva(bgMach).timercost = cost;
1498   }
1499   #endif
1500   
1501   char *networkModel;
1502   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1503    cva(bgMach).setNetworkModel(networkModel);
1504   }
1505
1506   bgcorroff = 0;
1507   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1508     bgcorroff = 1;
1509
1510   bgstats_flag=0;
1511   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1512     bgstats_flag = 1;
1513
1514   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1515   {
1516     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1517     sprintf(root, "%s/", cva(bgMach).traceroot);
1518     cva(bgMach).traceroot = root;
1519   }
1520
1521   // record/replay
1522   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim")) {
1523     cva(bgMach).record = 1;
1524     if (CmiMyPe() == 0)
1525       CmiPrintf("BG info> full record mode. \n");
1526   }
1527   int replaype;
1528   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1529     cva(bgMach).replay = replaype;
1530   }
1531   else {
1532     if (CmiGetArgFlagDesc(argv,"+bgreplay","Record message processing order for BigSim"))
1533     cva(bgMach).replay = 0;    // default to 0
1534   }
1535   if (cva(bgMach).replay >= 0) {
1536     if (CmiNumPes()>1)
1537       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1538     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1 ||
1539          cva(bgMach).numWth!=1 || cva(bgMach).numCth!=1)
1540       CmiAbort("BG> bgreplay mode must run on one target processor.");
1541     CmiPrintf("BG info> replay mode for target processor %d.\n", cva(bgMach).replay);
1542   }
1543   char *procs = NULL;
1544   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1545     cva(bgMach).recordprocs.set(procs);
1546   }
1547
1548
1549   /* parameters related with out-of-core execution */
1550   int tmpcap=0;
1551   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1552      TBLCAPACITY = tmpcap;
1553     }
1554   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1555       bgUseOutOfCore = 1;
1556       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1557
1558       double curFreeMem = bgGetSysFreeMemSize();
1559       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1560         //using the memory available of the system right now
1561         //assuming no other programs will run after this program runs
1562         bgOOCMaxMemSize = curFreeMem;
1563         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1564       }
1565       if(bgOOCMaxMemSize > curFreeMem){
1566         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1567         bgOOCMaxMemSize = curFreeMem;
1568       }
1569       DEBUGF(("out-of-core turned on!\n"));
1570   }      
1571
1572 #if BLUEGENE_DEBUG_LOG
1573   {
1574     char ln[200];
1575     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1576     bgDebugLog=fopen(ln,"w");
1577   }
1578 #endif
1579
1580   arg_argv = argv;
1581   arg_argc = CmiGetArgc(argv);
1582
1583   /* msg handler */
1584   CpvInitialize(SimState, simState);
1585   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1586   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1587   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1588   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1589
1590   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1591   cva(simState).inEmulatorInit = 1;
1592   /* call user defined BgEmulatorInit */
1593   BgEmulatorInit(arg_argc, arg_argv);
1594   cva(simState).inEmulatorInit = 0;
1595
1596   /* check if all bluegene node size and thread information are set */
1597   sanityCheck();
1598
1599   _bgSize = cva(bgMach).getNodeSize(); 
1600
1601   timerFunc = BgGetTime;
1602
1603   BgInitTiming();               // timing module
1604
1605   if (CmiMyPe() == 0) {
1606     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1607     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1608     cva(bgMach).network->print();
1609     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1610     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1611     if (cva(bgMach).stacksize)
1612       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1613     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1614       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1615     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1616       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1617     else if (cva(bgMach).timingMethod == BG_COUNTER)
1618       CmiPrintf("BG info> Using performance counter for timing method. \n");
1619     if (genTimeLog)
1620       CmiPrintf("BG info> Generating timing log. \n");
1621     if (correctTimeLog)
1622       CmiPrintf("BG info> Perform timestamp correction. \n");
1623     if (cva(bgMach).traceroot)
1624       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1625   }
1626
1627   CtvInitialize(threadInfo *, threadinfo);
1628
1629   /* number of bg nodes on this PE */
1630   CpvInitialize(int, numNodes);
1631   cva(numNodes) = nodeInfo::numLocalNodes();
1632
1633   if (CmiMyRank() == 0)
1634     initLock = CmiCreateLock();     // used for BnvInitialize
1635
1636   bgstreaming.init(cva(numNodes));
1637
1638   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1639 if(bgUseOutOfCore){
1640       initTblThreadInMem();
1641           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1642       //init prefetch status      
1643       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1644       oocPrefetchSpace = new oocPrefetchBufSpace();
1645       schedWorkThds = new oocWorkThreadQueue();
1646           #endif
1647   }
1648
1649 #if BIGSIM_OUT_OF_CORE
1650   //initialize variables related to get precise
1651   //physical memory usage info for a process
1652   bgMemPageSize = getpagesize();
1653   memset(bgMemStsFile, 0, 25); 
1654   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1655 #endif
1656
1657
1658   /* create BG nodes */
1659   CpvInitialize(nodeInfo *, nodeinfo);
1660   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1661   _MEMCHECK(cva(nodeinfo));
1662
1663   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1664   _MEMCHECK(cta(threadinfo));
1665
1666   /* create BG processors for each node */
1667   for (i=0; i<cva(numNodes); i++)
1668   {
1669     nodeInfo *ninfo = cva(nodeinfo) + i;
1670     // create threads
1671     ninfo->initThreads(i);
1672
1673     /* pretend that I am a thread */
1674     cta(threadinfo)->myNode = ninfo;
1675
1676     /* initialize a BG node and fire all threads */
1677     BgNodeInitialize(ninfo);
1678   }
1679   // clear main thread.
1680   cta(threadinfo)->myNode = NULL;
1681   CpvInitialize(CthThread, mainThread);
1682   cva(mainThread) = CthSelf();
1683
1684   CpvInitialize(int, CthResumeBigSimThreadIdx);
1685
1686   cva(simState).simStartTime = CmiWallTimer();    
1687   return 0;
1688 }
1689
1690 // for conv-conds:
1691 // if -2 untouch
1692 // if -1 main thread
1693 #if CMK_BLUEGENE_THREAD
1694 extern "C" int CmiSwitchToPEFn(int pe)
1695 {
1696   if (pe == -2) return -2;
1697   int oldpe;
1698 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1699   if (tMYNODE == NULL) oldpe = -1;
1700   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1701   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1702   else oldpe = -1;
1703 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1704   if (pe == -1) {
1705     CthSwitchThread(cva(mainThread));
1706   }
1707   else if (pe < 0) {
1708   }
1709   else {
1710     if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1711     int t = pe%cva(bgMach).numWth;
1712     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1713     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1714     threadInfo *tinfo = ninfo->threadinfo[t];
1715     CthSwitchThread(tinfo->me);
1716   }
1717   return oldpe;
1718 }
1719 #else
1720 extern "C" int CmiSwitchToPEFn(int pe)
1721 {
1722   if (pe == -2) return -2;
1723   int oldpe;
1724   if (tMYNODE == NULL) oldpe = -1;
1725   else oldpe = BgMyNode();
1726   if (pe == -1) {
1727     cta(threadinfo)->myNode = NULL;
1728   }
1729   else {
1730     int newpe = nodeInfo::Global2Local(pe);
1731     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1732   }
1733   return oldpe;
1734 }
1735 #endif
1736
1737
1738 /*****************************************************************************
1739                         TimeLog correction
1740 *****************************************************************************/
1741
1742 extern void processCorrectionMsg(int nodeidx);
1743
1744 // return the msg pointer, and the index of the message in the affinity queue.
1745 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1746 {
1747   CmiAssert(tID != ANYTHREAD);
1748   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1749   for (int i=0; i<affinityQ.length(); i++)  {
1750       char *msg = affinityQ[i];
1751       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1752       if (msgId == md) {
1753         index = i;
1754         return msg;
1755       }
1756   }
1757   return NULL;
1758 }
1759
1760 // return the msg pointer, thread id and the index of the message in the affinity queue.
1761 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1762 {
1763   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1764     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1765     if (msg) return msg;
1766   }
1767   return NULL;
1768 }
1769
1770 StateCounters stateCounters;
1771
1772 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1773 {
1774   char *msg;
1775   CmiInt2 tID = cm->tID;
1776   int index;
1777   if (tID == ANYTHREAD) {
1778     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1779   }
1780   else {
1781     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1782   }
1783   if (msg == NULL) return 0;
1784
1785   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1786   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1787   affinityQ.update(index);
1788   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1789   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1790   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1791   stateCounters.corrMsgCRCnt++;
1792   return 1;       /* invalidate this msg */
1793 }
1794
1795 extern void processBufferCorrectionMsgs(void *ignored);
1796
1797 // Coverse handler for begin exit
1798 // flush and process all correction messages
1799 static void beginExitHandlerFunc(void *msg)
1800 {
1801   CmiFree(msg);
1802   delayCheckFlag = 0;
1803 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1804   programExit = 1;
1805 #if LIMITED_SEND
1806   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1807 #endif
1808
1809 #if 1
1810   for (int i=0; i<BgNodeSize(); i++) 
1811     processCorrectionMsg(i); 
1812 #if USE_MULTISEND
1813   BgSendBufferedCorrMsgs();
1814 #endif
1815 #else
1816   // the msg queue should be empty now.
1817   // don't do insert adjustment, but start to do all timing correction from here
1818   int nodeidx, tID;
1819   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1820     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1821     for (tID=0; tID<cva(numWth); tID++) {
1822         BgTimeLineRec &tlinerec = tlines[tID];
1823         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1824     }
1825   }
1826
1827 #endif
1828
1829 #if !THROTTLE_WORK
1830 #if DELAY_CHECK
1831   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1832 #endif
1833 #endif
1834 }
1835
1836 #define HISTOGRAM_SIZE  100
1837 // compute total CPU utilization for each timeline and 
1838 // return the number of real msgs
1839 void computeUtilForAll(int* array, int *nReal)
1840 {
1841   double scale = 1.0e3;         // scale to ms
1842
1843   //We measure from 1ms to 5001 ms in steps of 100 ms
1844   int min = 0, step = 1;
1845   int max = min + HISTOGRAM_SIZE*step;
1846   // [min, max: step]  HISTOGRAM_SIZE slots
1847
1848   if (CmiMyPe()==0)
1849     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1850   int size = (max-min)/step;
1851   CmiAssert(size == HISTOGRAM_SIZE);
1852   int allmin = -1, allmax = -1;
1853   for(int i=0;i<size;i++) array[i] = 0;
1854
1855   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1856     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1857     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1858       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1859
1860       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1861       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1862       array[(util-min)/step]++;
1863     }
1864   }
1865   if (allmin!=-1 || allmax!=-1)
1866     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1867 }
1868
1869 class StatsMessage {
1870   char core[CmiBlueGeneMsgHeaderSizeBytes];
1871 public:
1872   int processCount;
1873   int corrMsgCount;
1874   int realMsgCount;
1875   int maxTimelineLen, minTimelineLen;
1876 };
1877
1878 extern int processCount, corrMsgCount;
1879
1880 static void sendCorrectionStats()
1881 {
1882   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1883   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1884   statsMsg->processCount = processCount;
1885   statsMsg->corrMsgCount = corrMsgCount;
1886   int numMsgs=0;
1887   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
1888   int totalMem = 0;
1889   if (bgstats_flag) {
1890   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1891     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1892     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1893         BgTimeLineRec &tlinerec = tlines[tID];
1894         int tlen = tlinerec.length();
1895         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
1896         if (tlen<minTimelineLen) minTimelineLen=tlen;
1897         totalMem = tlen*sizeof(BgTimeLog);
1898 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
1899 #if 0
1900         for (int i=0; i< tlinerec.length(); i++) {
1901           numMsgs += tlinerec[i]->msgs.length();
1902         }
1903 #endif
1904     }
1905   }
1906   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
1907   statsMsg->realMsgCount = numMsgs;
1908   statsMsg->maxTimelineLen = maxTimelineLen;
1909   statsMsg->minTimelineLen = minTimelineLen;
1910   }  // end if
1911
1912   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
1913   CmiSyncSendAndFree(0, msgSize, statsMsg);
1914 }
1915
1916 // Converse handler for collecting stats
1917 void statsCollectionHandlerFunc(void *msg)
1918 {
1919   static int count=0;
1920   static int pc=0, cc=0, realMsgCount=0;
1921   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
1922   static int *histArray = NULL;
1923   int i;
1924
1925   count++;
1926   if (histArray == NULL) {
1927     histArray = new int[HISTOGRAM_SIZE];
1928     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
1929   }
1930   StatsMessage *m = (StatsMessage *)msg;
1931   pc += m->processCount;
1932   cc += m->corrMsgCount;
1933   realMsgCount += m->realMsgCount;
1934   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
1935   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
1936   int *array = (int *)(m+1);
1937   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
1938   if (count == CmiNumPes()) {
1939     if (bgstats_flag) {
1940       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
1941       for (i=0; i<HISTOGRAM_SIZE; i++) {
1942         CmiPrintf("%d ", histArray[i]);
1943         if (i%20 == 19) CmiPrintf("\n");
1944       }
1945       CmiPrintf("\n");
1946     }
1947     CsdExitScheduler();
1948   }
1949   CmiFree(msg);
1950 }
1951
1952 // update arrival time from buffer messages
1953 // before start an entry, check message time against buffered timing
1954 // correction message to update to the correct time.
1955 void correctMsgTime(char *msg)
1956 {
1957    if (!correctTimeLog) return;
1958 //   if (CkMsgDoCorrect(msg) == 0) return;
1959
1960    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1961    CmiInt2 tid = CmiBgMsgThreadID(msg);
1962
1963    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
1964    int len = cmsg.length();
1965    for (int i=0; i<len; i++) {
1966      bgCorrectionMsg* m = cmsg[i];
1967      if (msgId == m->msgId && tid == m->tID) {
1968         if (m->tAdjust < 0.0) return;
1969         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
1970         CmiBgMsgRecvTime(msg) = m->tAdjust;
1971         m->tAdjust = -1.0;       /* invalidate this msg */
1972 //      cmsg.update(i);
1973         stateCounters.corrMsgRCCnt++;
1974         break;
1975      }
1976    }
1977 }
1978
1979
1980 //TODO: write disk bgTraceFiles
1981 static void writeToDisk()
1982 {
1983
1984   char* d = new char[1025];
1985   //Num of simulated procs on this real pe
1986   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
1987
1988   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1989
1990   // write summary file on PE0
1991   if(CmiMyPe()==0){
1992     
1993     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
1994     FILE *f2 = fopen(d,"wb");
1995     //Total emulating processors and total target BG processors
1996     int numEmulatingPes=CmiNumPes();
1997     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
1998
1999     if(f2==NULL) {
2000       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
2001       CmiAbort("BG> Abort");
2002     }
2003     PUP::toDisk p(f2);
2004     p((char *)&machInfo, sizeof(machInfo));
2005     p|totalWorkerProcs;
2006     p|cva(bgMach);
2007     p|numEmulatingPes;
2008     p|bglog_version;
2009     p|CpvAccess(CthResumeBigSimThreadIdx);
2010     
2011     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
2012     
2013     fclose(f2);
2014   }
2015   
2016   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
2017   FILE *f = fopen(d,"wb");
2018  
2019   if(f==NULL)
2020     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
2021   PUP::toDisk p(f);
2022   
2023   p((char *)&machInfo, sizeof(machInfo));       // machine info
2024   p|numLocalProcs;
2025
2026   // CmiPrintf("Timelines are: \n");
2027   int procTablePos = ftell(f);
2028
2029   int *procOffsets = new int[numLocalProcs];
2030   int procTableSize = (numLocalProcs)*sizeof(int);
2031   fseek(f,procTableSize,SEEK_CUR); 
2032
2033   for (int j=0; j<cva(numNodes); j++){
2034     for(int i=0;i<cva(bgMach).numWth;i++){
2035     #if BIGSIM_OUT_OF_CORE
2036         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2037         //through isomalloc. Therefore, the memory containing those events needs
2038         //to be brought back into memory from disk. --Chao Mei          
2039         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2040             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2041      #endif     
2042       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2043       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2044       t.pup(p);
2045     }
2046   }
2047   
2048   fseek(f,procTablePos,SEEK_SET);
2049   p(procOffsets,numLocalProcs);
2050   fclose(f);
2051
2052   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2053 }
2054
2055
2056 /*****************************************************************************
2057              application Converse thread hook
2058 *****************************************************************************/
2059
2060 CpvExtern(int      , CthResumeBigSimThreadIdx);
2061
2062 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2063                                    int pb,unsigned int *prio)
2064 {
2065 /*
2066   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2067   CsdEnqueueGeneral(token, s, pb, prio);
2068 */
2069 #if CMK_BLUEGENE_THREAD
2070   int x, y, z;
2071   BgGetMyXYZ(&x, &y, &z);
2072   int t = BgGetThreadID();
2073 #else
2074   #error "ERROR HERE"
2075 #endif
2076     // local message into queue
2077   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2078
2079   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2080   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2081
2082
2083 CthThread CthSuspendBigSimThread()
2084
2085   return  cta(threadinfo)->me;
2086 }
2087
2088 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2089 {
2090    // stop timer
2091    stopVTimer();
2092 }
2093
2094 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2095 {
2096    // start timer by reset it
2097    resetVTime();
2098 }
2099
2100
2101 void BgSetStrategyBigSimDefault(CthThread t)
2102
2103   CthSetStrategy(t,
2104                  CthEnqueueBigSimThread,
2105                  CthSuspendBigSimThread);
2106
2107   CthThreadListener *a = new CthThreadListener;
2108   a->suspend = bigsimThreadListener_suspend;
2109   a->resume = bigsimThreadListener_resume;
2110   a->free = NULL;
2111   CthAddListener(t, a);
2112 }
2113
2114 int BgIsMainthread()
2115 {
2116     return tMYNODE == NULL;
2117 }
2118
2119 int BgIsRecord()
2120 {
2121     return cva(bgMach).record == 1;
2122 }
2123
2124 int BgIsReplay()
2125 {
2126     return cva(bgMach).replay != -1;
2127 }
2128
2129 extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2130   ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
2131   //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
2132   int numLocal = 0, count = 0;
2133   for (int j=0; j<cva(numNodes); j++){
2134     for(int i=0;i<cva(bgMach).numWth;i++){
2135       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2136       if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
2137       numLocal ++;
2138     }
2139   }
2140   void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
2141   for (int j=0; j<cva(numNodes); j++){
2142     for(int i=0;i<cva(bgMach).numWth;i++){
2143       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2144       if (t == cta(threadinfo)) break;
2145       msgLocal[count++] = t->reduceMsg;
2146       t->reduceMsg = NULL;
2147     }
2148   }
2149   CmiAssert(count==numLocal-1);
2150   msg = mergeFn(&size, msg, msgLocal, numLocal-1);
2151   CmiReduce(msg, size, mergeFn);
2152   CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
2153   for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
2154   free(msgLocal);
2155 }
2156
2157 // for record/replay, to fseek back
2158 void BgRewindRecord()
2159 {
2160   threadInfo *tinfo = cta(threadinfo);
2161   if (tinfo->watcher) tinfo->watcher->rewind();
2162 }
2163