4175bf5dd0baed260b3192d08a64866c8adad56d
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82 BgTracingFn userTracingFn = NULL;
83
84 /****************************************************************************
85      little utility functions
86 ****************************************************************************/
87
88 char **BgGetArgv() { return arg_argv; }
89 int    BgGetArgc() { return arg_argc; }
90
91 /***************************************************************************
92      Implementation of the same counter interface currently used for the
93      Origin2000 using PAPI in the underlying layer.
94
95      Because of the difference in counter numbering, it is assumed that
96      the two counters desired are CYCLES and FLOPS and hence the numbers
97      e0 and e1 are ignored.
98
99      start_counters is also not implemented because PAPI does things in
100      a different manner.
101 ****************************************************************************/
102
103 #if CMK_HAS_COUNTER_PAPI
104 /*
105 CmiUInt8 total_ins = 0;
106 CmiUInt8 total_fps = 0;
107 CmiUInt8 total_l1_dcm = 0;
108 CmiUInt8 total_mem_rcy = 0;
109 */
110 int init_counters()
111 {
112     int retval = PAPI_library_init(PAPI_VER_CURRENT);
113
114     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
115
116         //a temporary
117         const char *eventDesc[MAX_TOTAL_EVENTS];
118         int lpapiEvents[MAX_TOTAL_EVENTS];
119         
120     numPapiEvents = 0;
121         
122 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
123                 if(PAPI_query_event(e) == PAPI_OK){ \
124                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
125                         eventDesc[numPapiEvents] = edesc;       \
126                         lpapiEvents[numPapiEvents++] = e; \
127                 }
128                 
129     // PAPI high level API does not require explicit library intialization
130         eventDesc[numPapiEvents] ="cycles";
131     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
132         
133         
134     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
135       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
136           eventDesc[numPapiEvents] = "floating point instructions";     
137       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
138     } else {
139       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
140           eventDesc[numPapiEvents] = "total instructions";      
141       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
142     }
143     */
144         
145         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
146         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
147         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
148         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
149         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
150         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
151         
152         if(numPapiEvents == 0){
153                 CmiAbort("No papi events are defined!\n");
154         }
155         
156         if(numPapiEvents >= MAX_TOTAL_EVENTS){
157                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
158         }
159         
160         papiEvents = new int[numPapiEvents];
161         papiValues = new long_long[numPapiEvents];
162         total_papi_counters = new CmiUInt8[numPapiEvents];
163         papi_counters_desc = new char *[numPapiEvents];
164         for(int i=0; i<numPapiEvents; i++){
165                 papiEvents[i] = lpapiEvents[i];
166                 total_papi_counters[i] = 0;
167                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
168                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
169                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
170                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
171         }
172         
173 /*
174     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
175       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
176       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
177     }
178     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
179       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
180       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
181     }
182     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
183       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
184       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
185     }
186 */
187
188     int status = PAPI_start_counters(papiEvents, numPapiEvents);
189     if (status != PAPI_OK) {
190       CmiPrintf("PAPI_start_counters error code: %d\n", status);
191       switch (status) {
192       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
193       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
194       }
195       CmiAbort("Unable to start PAPI counters!\n");
196     }
197 }
198
199 int read_counters(long_long *papiValues, int n) 
200 {
201   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
202   // code for Origin2000
203   int status;
204   status = PAPI_read_counters(papiValues, n);
205   if (status != PAPI_OK) {
206     CmiPrintf("PAPI_read_counters error: %d\n", status);
207     CmiAbort("Failed to read PAPI counters!\n");
208   }
209   
210   /*
211   total_ins += papiValues[0];
212   total_fps += papiValues[1];
213   total_l1_dcm += papiValues[2];
214   total_mem_rcy += papiValues[3];
215   */
216   
217   return 0;
218 }
219
220 inline void CountPapiEvents(){
221   for(int i=0 ;i<numPapiEvents; i++)
222         total_papi_counters[i] += papiValues[i];
223 }
224
225 inline double Count2Time(long_long *papiValues, int n) { 
226   return papiValues[1]*cva(bgMach).fpfactor; 
227 }
228 #endif
229
230 /*****************************************************************************
231      Handler Table, one per thread
232 ****************************************************************************/
233
234 extern "C" void defaultBgHandler(char *null, void *uPtr)
235 {
236   CmiAbort("BG> Invalid Handler called!\n");
237 }
238
239 HandlerTable::HandlerTable()
240 {
241     handlerTableCount = 1;
242     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
243     for (int i=0; i<MAX_HANDLERS; i++) {
244       handlerTable[i].fnPtr = defaultBgHandler;
245       handlerTable[i].userPtr = NULL;
246     }
247 }
248
249 inline int HandlerTable::registerHandler(BgHandler h)
250 {
251     ASSERT(!cva(simState).inEmulatorInit);
252     /* leave 0 as blank, so it can report error luckily */
253     int cur = handlerTableCount++;
254     if (cur >= MAX_HANDLERS)
255       CmiAbort("BG> HandlerID exceed the maximum.\n");
256     handlerTable[cur].fnPtr = (BgHandlerEx)h;
257     handlerTable[cur].userPtr = NULL;
258     return cur;
259 }
260
261 inline void HandlerTable::numberHandler(int idx, BgHandler h)
262 {
263     ASSERT(!cva(simState).inEmulatorInit);
264     if (idx >= handlerTableCount || idx < 1)
265       CmiAbort("BG> HandlerID exceed the maximum!\n");
266     handlerTable[idx].fnPtr = (BgHandlerEx)h;
267     handlerTable[idx].userPtr = NULL;
268 }
269
270 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
271 {
272     ASSERT(!cva(simState).inEmulatorInit);
273     if (idx >= handlerTableCount || idx < 1)
274       CmiAbort("BG> HandlerID exceed the maximum!\n");
275     handlerTable[idx].fnPtr = h;
276     handlerTable[idx].userPtr = uPtr;
277 }
278
279 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
280 {
281 #if 0
282     if (handler >= handlerTableCount) {
283       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
284       CmiAbort("Invalid handler!");
285     }
286 #endif
287     if (handler >= handlerTableCount || handler<0) return NULL;
288     return &handlerTable[handler];
289 }
290
291 /*****************************************************************************
292       low level API
293 *****************************************************************************/
294
295 int BgRegisterHandler(BgHandler h)
296 {
297   ASSERT(!cva(simState).inEmulatorInit);
298   int cur;
299 #if CMK_BLUEGENE_NODE
300   return tMYNODE->handlerTable.registerHandler(h);
301 #else
302   if (tTHREADTYPE == COMM_THREAD) {
303     return tMYNODE->handlerTable.registerHandler(h);
304   }
305   else {
306     return tHANDLETAB.registerHandler(h);
307   }
308 #endif
309 }
310
311 void BgNumberHandler(int idx, BgHandler h)
312 {
313   ASSERT(!cva(simState).inEmulatorInit);
314 #if CMK_BLUEGENE_NODE
315   tMYNODE->handlerTable.numberHandler(idx,h);
316 #else
317   if (tTHREADTYPE == COMM_THREAD) {
318     tMYNODE->handlerTable.numberHandler(idx, h);
319   }
320   else {
321     tHANDLETAB.numberHandler(idx, h);
322   }
323 #endif
324 }
325
326 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
327 {
328   ASSERT(!cva(simState).inEmulatorInit);
329 #if CMK_BLUEGENE_NODE
330   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
331 #else
332   if (tTHREADTYPE == COMM_THREAD) {
333     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
334   }
335   else {
336     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
337   }
338 #endif
339 }
340
341 /*****************************************************************************
342       BG Timing Functions
343 *****************************************************************************/
344
345 void resetVTime()
346 {
347   /* reset start time */
348   int timingMethod = cva(bgMach).timingMethod;
349   if (timingMethod == BG_WALLTIME) {
350     double ct = BG_TIMER();
351     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
352     tSTARTTIME = ct;
353   }
354   else if (timingMethod == BG_ELAPSE)
355     tSTARTTIME = tCURRTIME;
356 #ifdef CMK_ORIGIN2000
357   else if (timingMethod == BG_COUNTER) {
358     if (start_counters(0, 21) <0) {
359       perror("start_counters");;
360     }
361   }
362 #elif CMK_HAS_COUNTER_PAPI
363   else if (timingMethod == BG_COUNTER) {
364     // do a fake read to reset the counters. It would be more efficient
365     // to use the low level API, but that would be a lot more code to
366     // write for now.
367     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
368   }
369 #endif
370 }
371
372 void startVTimer()
373 {
374   CmiAssert(tTIMERON == 0);
375   resetVTime();
376   tTIMERON = 1;
377 }
378
379 // should be used only when BG_WALLTIME
380 static inline void advanceTime(double inc)
381 {
382   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
383   if (inc < 0.0) return;
384   CmiAssert(inc>=0.0);
385   inc *= cva(bgMach).cpufactor;
386   tCURRTIME += inc;
387   CmiAssert(tTIMERON==1);
388 }
389
390 void stopVTimer()
391 {
392   int k;
393 #if 0
394   if (tTIMERON != 1) {
395     CmiAbort("stopVTimer called without startVTimer!\n");
396   }
397   CmiAssert(tTIMERON == 1);
398 #else
399   if (tTIMERON == 0) return;         // already stopped
400 #endif
401   const int timingMethod = cva(bgMach).timingMethod;
402   if (timingMethod == BG_WALLTIME) {
403     const double tp = BG_TIMER();
404     double inc = tp-tSTARTTIME;
405     advanceTime(inc-cva(bgMach).timercost);
406 //    tSTARTTIME = BG_TIMER();  // skip the above time
407   }
408   else if (timingMethod == BG_ELAPSE) {
409     // if no bgelapse called, assume it takes 1us
410     if (tCURRTIME-tSTARTTIME < 1E-9) {
411 //      tCURRTIME += 1e-6;
412     }
413   }
414   else if (timingMethod == BG_COUNTER)  {
415 #if CMK_ORIGIN2000
416     long long c0, c1;
417     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
418     tCURRTIME += Count2Time(c1);
419 #elif CMK_HAS_COUNTER_PAPI
420     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
421     CountPapiEvents();
422     tCURRTIME += Count2Time(papiValues, numPapiEvents);
423 #endif
424   }
425   tTIMERON = 0;
426 }
427
428 double BgGetTime()
429 {
430 #if 1
431   const int timingMethod = cva(bgMach).timingMethod;
432   if (timingMethod == BG_WALLTIME) {
433     /* accumulate time since last starttime, and reset starttime */
434     if (tTIMERON) {
435       const double tp2= BG_TIMER();
436       double &startTime = tSTARTTIME;
437       double inc = tp2 - startTime;
438       advanceTime(inc-cva(bgMach).timercost);
439       startTime = BG_TIMER();
440     }
441     return tCURRTIME;
442   }
443   else if (timingMethod == BG_ELAPSE) {
444     return tCURRTIME;
445   }
446   else if (timingMethod == BG_COUNTER) {
447     if (tTIMERON) {
448 #if CMK_ORIGIN2000
449       long long c0, c1;
450       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
451       tCURRTIME += Count2Time(c1);
452       if (start_counters(0, 21)<0) perror("start_counters");;
453 #elif CMK_HAS_COUNTER_PAPI
454     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
455     tCURRTIME += Count2Time(papiValues, numPapiEvents);
456 #endif
457     }
458     return tCURRTIME;
459   }
460   else 
461     CmiAbort("Unknown Timing Method.");
462   return -1;
463 #else
464   /* sometime I am interested in real wall time */
465   tCURRTIME = CmiWallTimer();
466   return tCURRTIME;
467 #endif
468 }
469
470 // moved to blue_logs.C
471 double BgGetCurTime()
472 {
473   ASSERT(tTHREADTYPE == WORK_THREAD);
474   return tCURRTIME;
475 }
476
477 extern "C" 
478 void BgElapse(double t)
479 {
480 //  ASSERT(tTHREADTYPE == WORK_THREAD);
481   if (cva(bgMach).timingMethod == BG_ELAPSE)
482     tCURRTIME += t;
483 }
484
485 // advance virtual timer no matter what scheme is used
486 extern "C" 
487 void BgAdvance(double t)
488 {
489 //  ASSERT(tTHREADTYPE == WORK_THREAD);
490   tCURRTIME += t;
491 }
492
493 /* BG API Func
494  * called by a communication thread to test if poll data 
495  * in the node's INBUFFER for its own queue 
496  */
497 char * getFullBuffer()
498 {
499   /* I must be a communication thread */
500   if (tTHREADTYPE != COMM_THREAD) 
501     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
502
503   return tMYNODE->getFullBuffer();
504 }
505
506 /**  BG API Func
507  * called by a Converse handler or sendPacket()
508  * add message msgPtr to a bluegene node's inbuffer queue 
509  */
510 extern "C"
511 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
512 {
513 #if CMK_ERROR_CHECKING
514   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
515 #endif
516   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
517
518   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
519         
520   nInfo.addBgNodeInbuffer(msgPtr);
521 }
522
523 /** BG API Func 
524  *  called by a comm thread
525  *  add a message to a thread's affinity queue in same node 
526  */
527 void addBgThreadMessage(char *msgPtr, int threadID)
528 {
529 #if CMK_ERROR_CHECKING
530   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
531 #endif
532   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
533   tInfo->addAffMessage(msgPtr);
534 }
535
536 /** BG API Func 
537  *  called by a comm thread, add a message to a node's non-affinity queue 
538  */
539 void addBgNodeMessage(char *msgPtr)
540 {
541   tMYNODE->addBgNodeMessage(msgPtr);
542 }
543
544 void BgEnqueue(char *msg)
545 {
546 #if 0
547   ASSERT(tTHREADTYPE == WORK_THREAD);
548   workThreadInfo *tinfo = (workThreadInfo *)cta(threadinfo);
549   tinfo->addAffMessage(msg);
550 #else
551   nodeInfo *myNode = cta(threadinfo)->myNode;
552   addBgNodeInbuffer(msg, myNode->id);
553 #endif
554 }
555
556 /** BG API Func 
557  *  check if inBuffer on this node has msg available
558  */
559 int checkReady()
560 {
561   if (tTHREADTYPE != COMM_THREAD)
562     CmiAbort("checkReady called by a non-communication thread!\n");
563   return !tINBUFFER.isEmpty();
564 }
565
566 /* handler to process the msg */
567 void msgHandlerFunc(char *msg)
568 {
569   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
570   int gnodeID = CmiBgMsgNodeID(msg);
571   if (gnodeID >= 0) {
572 #if CMK_ERROR_CHECKING
573     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
574       CmiAbort("msgHandlerFunc received wrong message!");
575 #endif
576     int lnodeID = nodeInfo::Global2Local(gnodeID);
577     if (cva(bgMach).inReplayMode()) {
578       int x, y, z;
579       int node;
580       if (cva(bgMach).replay != -1) {
581         node = cva(bgMach).replay;
582         node = node / cva(bgMach).numWth;
583       }
584       if (cva(bgMach).replaynode != -1) {
585         node = cva(bgMach).replaynode;
586       }
587       BgGetXYZ(node, &x, &y, &z);
588       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
589       else lnodeID = 0;
590     }
591     addBgNodeInbuffer(msg, lnodeID);
592   }
593   else {
594     CmiAbort("Invalid message!");
595   }
596 }
597
598 /* Converse handler for node level broadcast message */
599 void nodeBCastMsgHandlerFunc(char *msg)
600 {
601   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
602   int gnodeID = CmiBgMsgNodeID(msg);
603   CmiInt2 threadID = CmiBgMsgThreadID(msg);
604   int lnodeID;
605
606   if (gnodeID < -1) {
607     gnodeID = - (gnodeID+100);
608     if (cva(bgMach).replaynode != -1) {
609       if (gnodeID == cva(bgMach).replaynode)
610           lnodeID = 0;
611       else
612           lnodeID = -1;
613     }
614     else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
615       lnodeID = nodeInfo::Global2Local(gnodeID);
616     else
617       lnodeID = -1;
618   }
619   else {
620     ASSERT(gnodeID == -1);
621     lnodeID = gnodeID;
622   }
623   // broadcast except lnodeID:threadId
624   int len = CmiBgMsgLength(msg);
625   int count = 0;
626   for (int i=0; i<cva(numNodes); i++)
627   {
628     if (i==lnodeID) continue;
629     char *dupmsg;
630     if (count == 0) dupmsg = msg;
631     else dupmsg = CmiCopyMsg(msg, len);
632     DEBUGF(("addBgNodeInbuffer to %d\n", i));
633     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
634     addBgNodeInbuffer(dupmsg, i);
635     count ++;
636   }
637   if (count == 0) CmiFree(msg);
638 }
639
640 // clone a msg, only has a valid header, plus a pointer to the real msg
641 char *BgCloneMsg(char *msg)
642 {
643   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
644   char *dupmsg = (char *)CmiAlloc(size);
645   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
646   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
647   CmiBgMsgRefCount(msg) ++;
648   CmiBgMsgFlag(dupmsg) = BG_CLONE;
649   return dupmsg;
650 }
651
652 // expand the cloned msg to the full size msg
653 char *BgExpandMsg(char *msg)
654 {
655   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
656   int size = CmiBgMsgLength(origmsg);
657   char *dupmsg = (char *)CmiAlloc(size);
658   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
659   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
660   CmiFree(msg);
661   CmiBgMsgRefCount(origmsg) --;
662   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
663   return dupmsg;
664 }
665
666 /* Converse handler for thread level broadcast message */
667 void threadBCastMsgHandlerFunc(char *msg)
668 {
669   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
670   int gnodeID = CmiBgMsgNodeID(msg);
671   CmiInt2 threadID = CmiBgMsgThreadID(msg);
672   if (cva(bgMach).replay != -1) {
673     if (gnodeID < -1) {
674       gnodeID = - (gnodeID+100);
675       if (gnodeID == cva(bgMach).replay/cva(bgMach).numWth && threadID == cva(bgMach).replay%cva(bgMach).numWth)
676         return;
677     }
678     CmiBgMsgThreadID(msg) = 0;
679     DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
680     addBgNodeInbuffer(msg, 0);
681     return;
682   }
683   int lnodeID;
684   if (gnodeID < -1) {
685       gnodeID = - (gnodeID+100);
686       if (cva(bgMach).replaynode != -1) {
687         if (gnodeID == cva(bgMach).replaynode)
688           lnodeID = 0;
689         else
690           lnodeID = -1;
691       }
692       else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
693         lnodeID = nodeInfo::Global2Local(gnodeID);
694       else
695         lnodeID = -1;
696       CmiAssert(threadID != ANYTHREAD);
697   }
698   else {
699     ASSERT(gnodeID == -1);
700     lnodeID = gnodeID;
701   }
702   // broadcast except nodeID:threadId
703   int len = CmiBgMsgLength(msg);
704   // optimization needed if the message size is big
705   // making duplications can easily run out of memory
706   int bigOpt = (len > 4096);
707   for (int i=0; i<cva(numNodes); i++)
708   {
709       for (int j=0; j<cva(bgMach).numWth; j++) {
710         if (i==lnodeID && j==threadID) continue;
711         // for big message, clone a message token instead of a real msg
712         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
713         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
714         CmiBgMsgThreadID(dupmsg) = j;
715         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
716         addBgNodeInbuffer(dupmsg, i);
717       }
718   }
719   // for big message, will free after all tokens are done
720   if (!bigOpt) CmiFree(msg);
721 }
722
723 /**
724  *              BG Messaging Functions
725  */
726
727 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
728 {
729   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
730 }
731
732 /**
733  *   a simple message streaming on demand and special purpose
734  *   user call  BgStartStreaming() and BgEndStreaming()
735  *   each worker thread call one send, and all sends are sent
736  *   via multiplesend at the end
737  */
738
739 static int bg_streaming = 0;
740
741 class BgStreaming {
742 public:
743   char **streamingMsgs;
744   int  *streamingMsgSizes;
745   int count;
746   int totalWorker;
747   int pe;
748 public:
749   BgStreaming() {
750     streamingMsgs = NULL;
751     streamingMsgSizes = NULL;
752     count = 0;
753     totalWorker = 0;
754     pe = -1;
755   }
756   ~BgStreaming() {
757     if (streamingMsgs) {
758       delete [] streamingMsgs;
759       delete [] streamingMsgSizes;
760     }
761   }
762   void init(int nNodes) {
763     totalWorker = nNodes * BgGetNumWorkThread();
764     streamingMsgs = new char *[totalWorker];
765     streamingMsgSizes = new int [totalWorker];
766   }
767   void depositMsg(int p, int size, char *m) {
768     streamingMsgs[count] = m;
769     streamingMsgSizes[count] = size;
770     count ++;
771     if (pe == -1) pe = p;
772     else CmiAssert(pe == p);
773     if (count == totalWorker) {
774       // CkPrintf("streaming send\n");
775       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
776       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
777       pe = -1;
778       count = 0;
779     }
780   }
781 };
782
783 BgStreaming bgstreaming;
784
785 void BgStartStreaming()
786 {
787   bg_streaming = 1;
788 }
789
790 void BgEndStreaming()
791 {
792   bg_streaming = 0;
793 }
794
795 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
796 {
797   if (streaming && pe != CmiMyPe())
798     bgstreaming.depositMsg(pe, msgSize, msg);
799   else
800     CmiSyncSendAndFree(pe, msgSize, msg);
801 }
802
803
804 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
805 {
806 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
807 #if !DELAY_SEND
808   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
809   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
810 #else
811   if (!correctTimeLog) {
812     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
813     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
814   }
815   // else messages are kept in the log (MsgEntry), and only will be sent
816   // after timing correction has done on that log.
817   // TODO: streaming has no effect if time correction is on.
818 #endif
819 }
820
821 /* send will copy data to msg buffer */
822 /* user data is not free'd in this routine, user can reuse the data ! */
823 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
824 {
825   //CmiPrintStackTrace(0);
826   double sendT = BgGetTime();
827
828   double latency;
829   CmiSetHandler(sendmsg, cva(simState).msgHandler);
830   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
831   CmiBgMsgThreadID(sendmsg) = threadID;
832   CmiBgMsgHandle(sendmsg) = handlerID;
833   CmiBgMsgType(sendmsg) = type;
834   CmiBgMsgLength(sendmsg) = numbytes;
835   CmiBgMsgFlag(sendmsg) = 0;
836   CmiBgMsgRefCount(sendmsg) = 0;
837   if (local) {
838     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
839     latency = 0.0;
840   }
841   else {
842     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
843     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
844     CmiAssert(latency >= 0);
845   }
846   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
847   
848   // timing
849   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
850
851   //static int addCnt=1; //for debugging only
852   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
853   //addCnt++;
854
855   if (local){
856       /* Here local refers to the fact that msg is sent to the processor itself
857        * therefore, we just add this msg to the thread itself
858        */
859       //addBgThreadMessage(sendmsg,threadID);
860       addBgNodeInbuffer(sendmsg, myNode->id);
861   }    
862   else
863     CmiSendPacket(x, y, z, numbytes, sendmsg);
864
865   // bypassing send time
866   resetVTime();
867 }
868
869 /* broadcast will copy data to msg buffer */
870 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
871 {
872   double sendT = BgGetTime();
873
874   nodeInfo *myNode = cta(threadinfo)->myNode;
875   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
876   if (node >= 0)
877     CmiBgMsgNodeID(sendmsg) = -node-100;
878   else
879     CmiBgMsgNodeID(sendmsg) = node;
880   CmiBgMsgThreadID(sendmsg) = threadID; 
881   CmiBgMsgHandle(sendmsg) = handlerID;  
882   CmiBgMsgType(sendmsg) = type; 
883   CmiBgMsgLength(sendmsg) = numbytes;
884   CmiBgMsgFlag(sendmsg) = 0;
885   CmiBgMsgRefCount(sendmsg) = 0;
886   /* FIXME */
887   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
888
889   // timing
890   // FIXME
891   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
892
893   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
894 #if DELAY_SEND
895   if (!correctTimeLog)
896 #endif
897   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
898
899   resetVTime();
900 }
901
902 /* broadcast will copy data to msg buffer */
903 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
904 {
905   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
906   if (node >= 0)
907     CmiBgMsgNodeID(sendmsg) = -node-100;
908   else
909     CmiBgMsgNodeID(sendmsg) = node;
910   CmiBgMsgThreadID(sendmsg) = threadID; 
911   CmiBgMsgHandle(sendmsg) = handlerID;  
912   CmiBgMsgType(sendmsg) = type; 
913   CmiBgMsgLength(sendmsg) = numbytes;
914   CmiBgMsgFlag(sendmsg) = 0;
915   CmiBgMsgRefCount(sendmsg) = 0;
916   /* FIXME */
917   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
918   double sendT = BgGetTime();
919   CmiBgMsgRecvTime(sendmsg) = sendT;    
920
921   // timing
922 #if 0
923   if (node == BG_BROADCASTALL) {
924     for (int i=0; i<_bgSize; i++) {
925       for (int j=0; j<cva(numWth); j++) {
926         BG_ADDMSG(sendmsg, node);
927       }
928     }
929   }
930   else {
931     CmiAssert(node >= 0);
932     BG_ADDMSG(sendmsg, (node+100));
933   }
934 #else
935   // FIXME
936   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
937 #endif
938
939   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
940 #if DELAY_SEND
941   if (!correctTimeLog)
942 #endif
943   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
944
945   resetVTime();
946 }
947
948
949 /* sendPacket to route */
950 /* this function can be called by any thread */
951 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
952 {
953   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
954
955 #if CMK_ERROR_CHECKING
956   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
957     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
958     CmiAbort("Abort!\n");
959   }
960 #endif
961
962   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
963 }
964
965 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
966 {
967   if (cva(bgMach).replay!=-1) { // replay mode
968     int t = cva(bgMach).replay%BgGetNumWorkThread();
969     if (t == threadID) threadID = 0;
970     else return;
971   }
972
973   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
974 }
975
976 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
977                        int numbytes, char* data)
978 {
979   nodeInfo *myNode = cta(threadinfo)->myNode;
980
981   if (cva(bgMach).replay!=-1) {     // replay mode
982     threadID = 0;
983     CmiAssert(threadID != -1);
984   }
985
986   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
987 }
988
989 /* wrapper of the previous two functions */
990 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
991 {
992   nodeInfo *myNode = cta(threadinfo)->myNode;
993   if (myNode->x == x && myNode->y == y && myNode->z == z)
994     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
995   else
996     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
997 }
998
999 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1000 {
1001   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1002 }
1003
1004 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1005 {
1006   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1007 }
1008
1009 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1010 {
1011   if (cva(bgMach).replay!=-1) return;    // replay mode
1012   else if (cva(bgMach).replaynode!=-1) {    // replay mode
1013     //if (node!=-1 && node == cva(bgMach).replaynode/cva(bgMach).numWth)
1014     //  return;
1015   }
1016   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1017 }
1018
1019 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1020 {
1021   if (cva(bgMach).replay!=-1) {      // replay mode, send only to itself
1022     int t = cva(bgMach).replay%BgGetNumWorkThread();
1023     BgSendLocalPacket(t, handlerID, type, numbytes, data);
1024     return;
1025   }
1026   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1027 }
1028
1029 /**
1030  send a msg to a list of processors (processors represented in global seq #
1031 */
1032 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
1033 {
1034   nodeInfo *myNode = cta(threadinfo)->myNode;
1035
1036   CmiSetHandler(msg, cva(simState).msgHandler);
1037   CmiBgMsgHandle(msg) = handlerID;
1038   CmiBgMsgType(msg) = type;
1039   CmiBgMsgLength(msg) = numbytes;
1040   CmiBgMsgFlag(msg) = 0;
1041   CmiBgMsgRefCount(msg) = 0;
1042
1043   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
1044
1045   double now = BgGetTime();
1046
1047   // send one by one
1048   for (int i=0; i<npes; i++)
1049   {
1050     int local = 0;
1051     int x,y,z,t;
1052     int pe = pes[i];
1053     int node;
1054 #if CMK_BLUEGENE_NODE
1055     CmiAbort("Not implemented yet!");
1056 #else
1057     t = pe%BgGetNumWorkThread();
1058     node = pe/BgGetNumWorkThread();
1059     BgGetXYZ(node, &x, &y, &z);
1060 #endif
1061
1062     char *sendmsg = CmiCopyMsg(msg, numbytes);
1063     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1064     CmiBgMsgThreadID(sendmsg) = t;
1065     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1066     CmiAssert(latency >= 0);
1067     CmiBgMsgRecvTime(sendmsg) = latency + now;
1068
1069     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1070
1071     // timing and make sure all msgID are the same
1072     if (i!=0) CpvAccess(msgCounter) --;
1073     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1074
1075 #if 0
1076     BgSendPacket(x, y, z, t, handlerID, type, numbytes, sendmsg);
1077 #else
1078     if (myNode->x == x && myNode->y == y && myNode->z == z)
1079       addBgNodeInbuffer(sendmsg, myNode->id);
1080     else {
1081       if (cva(bgMach).inReplayMode()) continue;  // replay mode, no outgoing msg
1082       CmiSendPacket(x, y, z, numbytes, sendmsg);
1083     }
1084 #endif
1085   }
1086
1087   CmiFree(msg);
1088
1089   resetVTime();
1090 }
1091
1092 /*****************************************************************************
1093       BG node level API - utilities
1094 *****************************************************************************/
1095
1096 /* must be called in a communication or worker thread */
1097 void BgGetMyXYZ(int *x, int *y, int *z)
1098 {
1099   ASSERT(!cva(simState).inEmulatorInit);
1100   *x = tMYX; *y = tMYY; *z = tMYZ;
1101 }
1102
1103 void BgGetXYZ(int seq, int *x, int *y, int *z)
1104 {
1105   nodeInfo::Global2XYZ(seq, x, y, z);
1106 }
1107
1108 void BgGetSize(int *sx, int *sy, int *sz)
1109 {
1110   cva(bgMach).getSize(sx, sy, sz);
1111 }
1112
1113 int BgTraceProjectionOn(int pe)
1114 {
1115   return cva(bgMach).traceProjections(pe);
1116 }
1117
1118 /* return the total number of Blue gene nodes */
1119 int BgNumNodes()
1120 {
1121   return _bgSize;
1122 }
1123
1124 void BgSetNumNodes(int x)
1125 {
1126   _bgSize = x;
1127 }
1128
1129 /* can only called in emulatorinit */
1130 void BgSetSize(int sx, int sy, int sz)
1131 {
1132   ASSERT(cva(simState).inEmulatorInit);
1133   cva(bgMach).setSize(sx, sy, sz);
1134 }
1135
1136 /* return number of bg nodes on this emulator node */
1137 int BgNodeSize()
1138 {
1139   ASSERT(!cva(simState).inEmulatorInit);
1140   return cva(numNodes);
1141 }
1142
1143 /* return the bg node ID (local array index) */
1144 int BgMyRank()
1145 {
1146 #if CMK_ERROR_CHECKING
1147   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1148 #endif
1149   ASSERT(!cva(simState).inEmulatorInit);
1150   return tMYNODEID;
1151 }
1152
1153 /* return my serialed blue gene node number */
1154 int BgMyNode()
1155 {
1156 #if CMK_ERROR_CHECKING
1157   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1158 #endif
1159   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1160 }
1161
1162 /* return a real processor number from a bg node */
1163 int BgNodeToRealPE(int node)
1164 {
1165   return nodeInfo::Global2PE(node);
1166 }
1167
1168 // thread ID on a BG node
1169 int BgGetThreadID()
1170 {
1171   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1172 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1173   return tMYID;
1174 }
1175
1176 void BgSetThreadID(int x)
1177 {
1178   tMYID = x;
1179 }
1180
1181 int BgGetGlobalThreadID()
1182 {
1183   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1184   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1185   //return tMYGLOBALID;
1186 }
1187
1188 int BgGetGlobalWorkerThreadID()
1189 {
1190   ASSERT(tTHREADTYPE == WORK_THREAD);
1191 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1192   return tMYGLOBALID;
1193 }
1194
1195 void BgSetGlobalWorkerThreadID(int pe)
1196 {
1197   ASSERT(tTHREADTYPE == WORK_THREAD);
1198 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1199   tMYGLOBALID = pe;
1200 }
1201
1202 char *BgGetNodeData()
1203 {
1204   return tUSERDATA;
1205 }
1206
1207 void BgSetNodeData(char *data)
1208 {
1209   ASSERT(!cva(simState).inEmulatorInit);
1210   tUSERDATA = data;
1211 }
1212
1213 int BgGetNumWorkThread()
1214 {
1215   return cva(bgMach).numWth;
1216 }
1217
1218 void BgSetNumWorkThread(int num)
1219 {
1220   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1221   cva(bgMach).numWth = num;
1222 }
1223
1224 int BgGetNumCommThread()
1225 {
1226   return cva(bgMach).numCth;
1227 }
1228
1229 void BgSetNumCommThread(int num)
1230 {
1231   ASSERT(cva(simState).inEmulatorInit);
1232   cva(bgMach).numCth = num;
1233 }
1234
1235 /*****************************************************************************
1236       Communication and Worker threads
1237 *****************************************************************************/
1238
1239 BgStartHandler  workStartFunc = NULL;
1240
1241 void BgSetWorkerThreadStart(BgStartHandler f)
1242 {
1243   workStartFunc = f;
1244 }
1245
1246 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1247
1248 // kernel function for processing a bluegene message
1249 void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
1250 {
1251   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1252   int handler = CmiBgMsgHandle(msg);
1253   //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
1254   CmiAssert(handler < 1000);
1255
1256   BgHandlerInfo *handInfo;
1257 #if  CMK_BLUEGENE_NODE
1258   HandlerTable hdlTbl = tMYNODE->handlerTable;
1259   handInfo = hdlTbl.getHandle(handler);
1260 #else
1261   HandlerTable hdlTbl = tHANDLETAB;
1262   handInfo = hdlTbl.getHandle(handler);
1263   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1264 #endif
1265
1266   if (handInfo == NULL) {
1267     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1268     CmiAbort("BgProcessMessage Failed!");
1269   }
1270   BgHandlerEx entryFunc = handInfo->fnPtr;
1271
1272   if (programExit == 2) return;    // program exit already
1273
1274   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1275
1276   // optimization for broadcast messages:
1277   // if the msg is a broadcast token msg, expand it to a real msg
1278   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1279     msg = BgExpandMsg(msg);
1280   }
1281
1282   if (tinfo->watcher) tinfo->watcher->record(msg);
1283
1284   // don't count thread overhead and timing overhead
1285   startVTimer();
1286
1287   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1288
1289   entryFunc(msg, handInfo->userPtr);
1290
1291   stopVTimer();
1292
1293   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1294 }
1295
1296 void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
1297
1298 void scheduleWorkerThread(char *msg)
1299 {
1300   CthThread tid = (CthThread)msg;
1301 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1302   CthAwaken(tid);
1303 }
1304
1305 // thread entry
1306 // for both comm and work thread, virtual function
1307 void run_thread(threadInfo *tinfo)
1308 {
1309   /* set the thread-private threadinfo */
1310   cta(threadinfo) = tinfo;
1311   tinfo->run();
1312 }
1313
1314 /* should be done only once per bg node */
1315 void BgNodeInitialize(nodeInfo *ninfo)
1316 {
1317   CthThread t;
1318   int i;
1319
1320   /* this is here because I will put a message to node inbuffer */
1321   tCURRTIME = 0.0;
1322   tSTARTTIME = CmiWallTimer();
1323
1324   /* creat work threads */
1325   for (i=0; i< cva(bgMach).numWth; i++)
1326   {
1327     threadInfo *tinfo = ninfo->threadinfo[i];
1328     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1329     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1330     tinfo->setThread(t);
1331     /* put to thread table */
1332     tTHREADTABLE[tinfo->id] = t;
1333 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1334     //initial scheduling points for workthreads
1335     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1336 #endif
1337     CthAwaken(t);
1338   }
1339
1340   /* creat communication thread */
1341   for (i=0; i< cva(bgMach).numCth; i++)
1342   {
1343     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1344     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1345     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1346     tinfo->setThread(t);
1347     /* put to thread table */
1348     tTHREADTABLE[tinfo->id] = t;
1349     CthAwaken(t);
1350   }
1351
1352 }
1353
1354 static void beginExitHandlerFunc(void *msg);
1355 static void writeToDisk();
1356 static void sendCorrectionStats();
1357
1358 void callAllUserTracingFunction()
1359 {
1360   if (userTracingFn == NULL) return;
1361   int origPe = -2;
1362   // close all tracing modules
1363   for (int j=0; j<cva(numNodes); j++)
1364     for (int i=0; i<cva(bgMach).numWth; i++) {
1365       int pe = nodeInfo::Local2Global(j)*cva(bgMach).numWth+i;
1366       int oldPe = CmiSwitchToPE(pe);
1367       if (cva(bgMach).replay != -1)
1368         if ( pe != cva(bgMach).replay ) continue;
1369       if (origPe == -2) origPe = oldPe;
1370       traceCharmClose();
1371       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1372       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1373       if (userTracingFn) userTracingFn();
1374     }
1375     if (origPe!=-2) CmiSwitchToPE(origPe);
1376 }
1377
1378 static CmiHandler exitHandlerFunc(char *msg)
1379 {
1380   // TODO: free memory before exit
1381   int i,j;
1382
1383   programExit = 2;
1384 #if BLUEGENE_TIMING
1385   // timing
1386   if (0)        // detail
1387   if (genTimeLog) {
1388     for (j=0; j<cva(numNodes); j++)
1389     for (i=0; i<cva(bgMach).numWth; i++) {
1390       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1391 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1392       int x,y,z;
1393       nodeInfo::Local2XYZ(j, &x, &y, &z);
1394       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1395     }
1396
1397   }
1398 #endif
1399   if (genTimeLog) sendCorrectionStats();
1400
1401   if (genTimeLog) writeToDisk();
1402
1403 //  if (tTHREADTYPE == WORK_THREAD)
1404   {
1405   int origPe = -2;
1406   // close all tracing modules
1407   for (j=0; j<cva(numNodes); j++)
1408     for (i=0; i<cva(bgMach).numWth; i++) {
1409       int pe = nodeInfo::Local2Global(j)*cva(bgMach).numWth+i;
1410       int oldPe = CmiSwitchToPE(pe);
1411       if (cva(bgMach).replay != -1)
1412         if ( pe != cva(bgMach).replay ) continue;
1413       if (origPe == -2) origPe = oldPe;
1414       traceCharmClose();
1415 //      CmiSwitchToPE(oldPe);
1416       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1417       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1418       if (userTracingFn) userTracingFn();
1419     }
1420     if (origPe!=-2) CmiSwitchToPE(origPe);
1421   }
1422
1423 #if 0
1424   delete [] cva(nodeinfo);
1425   delete [] cva(inBuffer);
1426   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1427   delete [] cva(msgBuffer);
1428 #endif
1429
1430 #if CMK_HAS_COUNTER_PAPI
1431   if (cva(bgMach).timingMethod == BG_COUNTER) {
1432 /*        
1433   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1434   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1435   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1436   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1437  */
1438         for(int i=0; i<numPapiEvents; i++){
1439           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1440            delete papi_counters_desc[i];
1441         }
1442         delete papiEvents;
1443         delete papiValues;
1444         delete papi_counters_desc;
1445         delete total_papi_counters;
1446   }
1447 #endif
1448
1449   //ConverseExit();
1450   if (genTimeLog)
1451     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1452   else
1453     CsdExitScheduler();
1454
1455   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1456
1457   return 0;
1458 }
1459
1460 static void sanityCheck()
1461 {
1462   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1463     if (CmiMyPe() == 0)
1464       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1465     BgShutdown(); 
1466   } 
1467   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1468 #if 1
1469     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1470     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1471 #else
1472     if (CmiMyPe() == 0)
1473       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1474     BgShutdown(); 
1475 #endif
1476   }
1477   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1478     CmiAbort("\nToo few BigSim nodes!\n");
1479   }
1480 }
1481
1482 #undef CmiSwitchToPE
1483 extern "C" int CmiSwitchToPEFn(int pe);
1484
1485 // main
1486 CmiStartFn bgMain(int argc, char **argv)
1487 {
1488   int i;
1489   char *configFile = NULL;
1490
1491   BgProcessMessage = BgProcessMessageDefault;
1492 #if CMK_CONDS_USE_SPECIAL_CODE
1493   // overwrite possible implementation in machine.c
1494   CmiSwitchToPE = CmiSwitchToPEFn;
1495 #endif
1496
1497   /* initialize all processor level data */
1498   CpvInitialize(BGMach,bgMach);
1499   cva(bgMach).nullify();
1500
1501   CmiArgGroup("Charm++","BlueGene Simulator");
1502   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1503    cva(bgMach). read(configFile);
1504   }
1505   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1506                 "The x size of the grid of nodes");
1507   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1508                 "The y size of the grid of nodes");
1509   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1510                 "The z size of the grid of nodes");
1511   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1512                 "The number of simulated communication threads per node");
1513   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1514                 "The number of simulated worker threads per node");
1515
1516   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1517                 "Blue Gene thread stack size");
1518
1519   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1520      genTimeLog = 1;
1521   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1522     correctTimeLog = 1;
1523   schedule_flag = 0;
1524   if (correctTimeLog) {
1525     genTimeLog = 1;
1526     schedule_flag = 1;
1527   }
1528
1529   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1530     bgverbose = 1;
1531
1532   // for timing method, default using elapse calls.
1533   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1534                        "Use user provided BgElapse for time prediction")) 
1535       cva(bgMach).timingMethod = BG_ELAPSE;
1536   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1537                        "Use walltime method for time prediction")) 
1538       cva(bgMach).timingMethod = BG_WALLTIME;
1539 #ifdef CMK_ORIGIN2000
1540   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1541       cva(bgMach).timingMethod = BG_COUNTER;
1542 #elif CMK_HAS_COUNTER_PAPI
1543   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1544     cva(bgMach).timingMethod = BG_COUNTER;
1545   }
1546   if (cva(bgMach).timingMethod == BG_COUNTER) {
1547     init_counters();
1548   }
1549 #endif
1550   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1551                       "floating point to time factor");
1552   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1553                       "scale factor for wallclock time measured");
1554   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1555                       "timer cost");
1556   #if 0
1557   if(cva(bgMach).timingMethod == BG_WALLTIME)
1558   {
1559       int count = 1e6;
1560       double start, stop, diff, cost, dummy;
1561
1562       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1563
1564       start = BG_TIMER();
1565       for (int i = 0; i < count; ++i)
1566           dummy = BG_TIMER();
1567       stop = BG_TIMER();
1568
1569       diff = stop - start;
1570       cost = diff / count;
1571
1572       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1573                 cost, cva(bgMach).timercost);
1574       cva(bgMach).timercost = cost;
1575   }
1576   #endif
1577   
1578   char *networkModel;
1579   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1580    cva(bgMach).setNetworkModel(networkModel);
1581   }
1582
1583   bgcorroff = 0;
1584   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1585     bgcorroff = 1;
1586
1587   bgstats_flag=0;
1588   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1589     bgstats_flag = 1;
1590
1591   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1592   {
1593     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1594     sprintf(root, "%s/", cva(bgMach).traceroot);
1595     cva(bgMach).traceroot = root;
1596   }
1597
1598   // record/replay
1599   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim")) {
1600     cva(bgMach).record = 1;
1601     if (CmiMyPe() == 0)
1602       CmiPrintf("BG info> full record mode. \n");
1603   }
1604   if (CmiGetArgFlagDesc(argv,"+bgrecordnode","Record message processing order for BigSim")) {
1605     cva(bgMach).recordnode = 1;
1606     if (CmiMyPe() == 0)
1607       CmiPrintf("BG info> full record mode on node. \n");
1608   }
1609   int replaype;
1610   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1611     cva(bgMach).replay = replaype;
1612   }
1613   else {
1614     if (CmiGetArgFlagDesc(argv,"+bgreplay","Record message processing order for BigSim"))
1615     cva(bgMach).replay = 0;    // default to 0
1616   }
1617   if (cva(bgMach).replay >= 0) {
1618     if (CmiNumPes()>1)
1619       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1620     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1 ||
1621          cva(bgMach).numWth!=1 || cva(bgMach).numCth!=1)
1622       CmiAbort("BG> bgreplay mode must run on one target processor.");
1623     CmiPrintf("BG info> replay mode for target processor %d.\n", cva(bgMach).replay);
1624   }
1625   char *procs = NULL;
1626   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1627     cva(bgMach).recordprocs.set(procs);
1628   }
1629
1630     // record/replay at node level
1631   char *nodes = NULL;
1632   if (CmiGetArgStringDesc(argv, "+bgrecordnodes", &nodes, "A list of nodes to record, e.g. 0,10,20-30")) {
1633     cva(bgMach).recordnodes.set(nodes);
1634   }
1635   int replaynode;
1636   if (CmiGetArgIntDesc(argv,"+bgreplaynode", &replaynode, "Re-play message processing order for BigSim")) {      
1637     cva(bgMach).replaynode = replaynode; 
1638   }
1639   else {
1640     if (CmiGetArgFlagDesc(argv,"+bgreplaynode","Record message processing order for BigSim"))
1641     cva(bgMach).replaynode = 0;    // default to 0
1642   }
1643   if (cva(bgMach).replaynode >= 0) {
1644     int startpe, endpe;
1645     BgRead_nodeinfo(replaynode, startpe, endpe);
1646     if (cva(bgMach).numWth != endpe-startpe+1) {
1647       cva(bgMach).numWth = endpe-startpe+1;     // update wth
1648       CmiPrintf("BG info> numWth is changed to %d.\n", cva(bgMach).numWth);
1649     }
1650     if (CmiNumPes()>1)
1651       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1652     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1)
1653       CmiAbort("BG> bgreplay mode must run on one target processor.");
1654     CmiPrintf("BG info> replay mode for target node %d.\n", cva(bgMach).replaynode);   
1655   }     
1656   CmiAssert(!(cva(bgMach).replaynode != -1 && cva(bgMach).replay != -1));
1657
1658
1659   /* parameters related with out-of-core execution */
1660   int tmpcap=0;
1661   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1662      TBLCAPACITY = tmpcap;
1663     }
1664   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1665       bgUseOutOfCore = 1;
1666       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1667
1668       double curFreeMem = bgGetSysFreeMemSize();
1669       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1670         //using the memory available of the system right now
1671         //assuming no other programs will run after this program runs
1672         bgOOCMaxMemSize = curFreeMem;
1673         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1674       }
1675       if(bgOOCMaxMemSize > curFreeMem){
1676         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1677         bgOOCMaxMemSize = curFreeMem;
1678       }
1679       DEBUGF(("out-of-core turned on!\n"));
1680   }      
1681
1682 #if BLUEGENE_DEBUG_LOG
1683   {
1684     char ln[200];
1685     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1686     bgDebugLog=fopen(ln,"w");
1687   }
1688 #endif
1689
1690   arg_argv = argv;
1691   arg_argc = CmiGetArgc(argv);
1692
1693   /* msg handler */
1694   CpvInitialize(SimState, simState);
1695   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1696   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1697   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1698   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1699
1700   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1701   cva(simState).inEmulatorInit = 1;
1702   /* call user defined BgEmulatorInit */
1703   BgEmulatorInit(arg_argc, arg_argv);
1704   cva(simState).inEmulatorInit = 0;
1705
1706   /* check if all bluegene node size and thread information are set */
1707   sanityCheck();
1708
1709   _bgSize = cva(bgMach).getNodeSize(); 
1710
1711   timerFunc = BgGetTime;
1712
1713   BgInitTiming();               // timing module
1714
1715   if (CmiMyPe() == 0) {
1716     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1717     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1718     cva(bgMach).network->print();
1719     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1720     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1721     if (cva(bgMach).stacksize)
1722       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1723     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1724       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1725     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1726       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1727     else if (cva(bgMach).timingMethod == BG_COUNTER)
1728       CmiPrintf("BG info> Using performance counter for timing method. \n");
1729     if (genTimeLog)
1730       CmiPrintf("BG info> Generating timing log. \n");
1731     if (correctTimeLog)
1732       CmiPrintf("BG info> Perform timestamp correction. \n");
1733     if (cva(bgMach).traceroot)
1734       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1735   }
1736
1737   CtvInitialize(threadInfo *, threadinfo);
1738
1739   /* number of bg nodes on this PE */
1740   CpvInitialize(int, numNodes);
1741   cva(numNodes) = nodeInfo::numLocalNodes();
1742
1743   if (CmiMyRank() == 0)
1744     initLock = CmiCreateLock();     // used for BnvInitialize
1745
1746   bgstreaming.init(cva(numNodes));
1747
1748   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1749 if(bgUseOutOfCore){
1750       initTblThreadInMem();
1751           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1752       //init prefetch status      
1753       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1754       oocPrefetchSpace = new oocPrefetchBufSpace();
1755       schedWorkThds = new oocWorkThreadQueue();
1756           #endif
1757   }
1758
1759 #if BIGSIM_OUT_OF_CORE
1760   //initialize variables related to get precise
1761   //physical memory usage info for a process
1762   bgMemPageSize = getpagesize();
1763   memset(bgMemStsFile, 0, 25); 
1764   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1765 #endif
1766
1767
1768   /* create BG nodes */
1769   CpvInitialize(nodeInfo *, nodeinfo);
1770   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1771   _MEMCHECK(cva(nodeinfo));
1772
1773   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1774   _MEMCHECK(cta(threadinfo));
1775
1776   /* create BG processors for each node */
1777   for (i=0; i<cva(numNodes); i++)
1778   {
1779     nodeInfo *ninfo = cva(nodeinfo) + i;
1780     // create threads
1781     ninfo->initThreads(i);
1782
1783     /* pretend that I am a thread */
1784     cta(threadinfo)->myNode = ninfo;
1785
1786     /* initialize a BG node and fire all threads */
1787     BgNodeInitialize(ninfo);
1788   }
1789
1790   // clear main thread.
1791   cta(threadinfo)->myNode = NULL;
1792
1793   CpvInitialize(CthThread, mainThread);
1794   cva(mainThread) = CthSelf();
1795
1796   CpvInitialize(int, CthResumeBigSimThreadIdx);
1797
1798   cva(simState).simStartTime = CmiWallTimer();    
1799   return 0;
1800 }
1801
1802 // for conv-conds:
1803 // if -2 untouch
1804 // if -1 main thread
1805 #if CMK_BLUEGENE_THREAD
1806 extern "C" int CmiSwitchToPEFn(int pe)
1807 {
1808   if (pe == -2) return -2;
1809   int oldpe;
1810 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1811   if (tMYNODE == NULL) oldpe = -1;
1812   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1813   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1814   else oldpe = -1;
1815 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1816   if (pe == -1) {
1817     CthSwitchThread(cva(mainThread));
1818   }
1819   else if (pe < 0) {
1820   }
1821   else {
1822 //    if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1823     int t = pe%cva(bgMach).numWth;
1824     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1825     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1826     threadInfo *tinfo = ninfo->threadinfo[t];
1827     CthSwitchThread(tinfo->me);
1828   }
1829   return oldpe;
1830 }
1831 #else
1832 extern "C" int CmiSwitchToPEFn(int pe)
1833 {
1834   if (pe == -2) return -2;
1835   int oldpe;
1836   if (tMYNODE == NULL) oldpe = -1;
1837   else oldpe = BgMyNode();
1838   if (pe == -1) {
1839     cta(threadinfo)->myNode = NULL;
1840   }
1841   else {
1842     int newpe = nodeInfo::Global2Local(pe);
1843     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1844   }
1845   return oldpe;
1846 }
1847 #endif
1848
1849
1850 /*****************************************************************************
1851                         TimeLog correction
1852 *****************************************************************************/
1853
1854 extern void processCorrectionMsg(int nodeidx);
1855
1856 // return the msg pointer, and the index of the message in the affinity queue.
1857 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1858 {
1859   CmiAssert(tID != ANYTHREAD);
1860   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1861   for (int i=0; i<affinityQ.length(); i++)  {
1862       char *msg = affinityQ[i];
1863       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1864       if (msgId == md) {
1865         index = i;
1866         return msg;
1867       }
1868   }
1869   return NULL;
1870 }
1871
1872 // return the msg pointer, thread id and the index of the message in the affinity queue.
1873 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1874 {
1875   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1876     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1877     if (msg) return msg;
1878   }
1879   return NULL;
1880 }
1881
1882 StateCounters stateCounters;
1883
1884 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1885 {
1886   char *msg;
1887   CmiInt2 tID = cm->tID;
1888   int index;
1889   if (tID == ANYTHREAD) {
1890     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1891   }
1892   else {
1893     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1894   }
1895   if (msg == NULL) return 0;
1896
1897   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1898   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1899   affinityQ.update(index);
1900   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1901   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1902   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1903   stateCounters.corrMsgCRCnt++;
1904   return 1;       /* invalidate this msg */
1905 }
1906
1907 extern void processBufferCorrectionMsgs(void *ignored);
1908
1909 // Coverse handler for begin exit
1910 // flush and process all correction messages
1911 static void beginExitHandlerFunc(void *msg)
1912 {
1913   CmiFree(msg);
1914   delayCheckFlag = 0;
1915 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1916   programExit = 1;
1917 #if LIMITED_SEND
1918   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1919 #endif
1920
1921 #if 1
1922   for (int i=0; i<BgNodeSize(); i++) 
1923     processCorrectionMsg(i); 
1924 #if USE_MULTISEND
1925   BgSendBufferedCorrMsgs();
1926 #endif
1927 #else
1928   // the msg queue should be empty now.
1929   // don't do insert adjustment, but start to do all timing correction from here
1930   int nodeidx, tID;
1931   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1932     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1933     for (tID=0; tID<cva(numWth); tID++) {
1934         BgTimeLineRec &tlinerec = tlines[tID];
1935         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1936     }
1937   }
1938
1939 #endif
1940
1941 #if !THROTTLE_WORK
1942 #if DELAY_CHECK
1943   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1944 #endif
1945 #endif
1946 }
1947
1948 #define HISTOGRAM_SIZE  100
1949 // compute total CPU utilization for each timeline and 
1950 // return the number of real msgs
1951 void computeUtilForAll(int* array, int *nReal)
1952 {
1953   double scale = 1.0e3;         // scale to ms
1954
1955   //We measure from 1ms to 5001 ms in steps of 100 ms
1956   int min = 0, step = 1;
1957   int max = min + HISTOGRAM_SIZE*step;
1958   // [min, max: step]  HISTOGRAM_SIZE slots
1959
1960   if (CmiMyPe()==0)
1961     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1962   int size = (max-min)/step;
1963   CmiAssert(size == HISTOGRAM_SIZE);
1964   int allmin = -1, allmax = -1;
1965   for(int i=0;i<size;i++) array[i] = 0;
1966
1967   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1968     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1969     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1970       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1971
1972       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1973       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1974       array[(util-min)/step]++;
1975     }
1976   }
1977   if (allmin!=-1 || allmax!=-1)
1978     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1979 }
1980
1981 class StatsMessage {
1982   char core[CmiBlueGeneMsgHeaderSizeBytes];
1983 public:
1984   int processCount;
1985   int corrMsgCount;
1986   int realMsgCount;
1987   int maxTimelineLen, minTimelineLen;
1988 };
1989
1990 extern int processCount, corrMsgCount;
1991
1992 static void sendCorrectionStats()
1993 {
1994   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1995   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1996   statsMsg->processCount = processCount;
1997   statsMsg->corrMsgCount = corrMsgCount;
1998   int numMsgs=0;
1999   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
2000   int totalMem = 0;
2001   if (bgstats_flag) {
2002   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
2003     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
2004     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
2005         BgTimeLineRec &tlinerec = tlines[tID];
2006         int tlen = tlinerec.length();
2007         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
2008         if (tlen<minTimelineLen) minTimelineLen=tlen;
2009         totalMem = tlen*sizeof(BgTimeLog);
2010 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
2011 #if 0
2012         for (int i=0; i< tlinerec.length(); i++) {
2013           numMsgs += tlinerec[i]->msgs.length();
2014         }
2015 #endif
2016     }
2017   }
2018   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
2019   statsMsg->realMsgCount = numMsgs;
2020   statsMsg->maxTimelineLen = maxTimelineLen;
2021   statsMsg->minTimelineLen = minTimelineLen;
2022   }  // end if
2023
2024   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
2025   CmiSyncSendAndFree(0, msgSize, statsMsg);
2026 }
2027
2028 // Converse handler for collecting stats
2029 void statsCollectionHandlerFunc(void *msg)
2030 {
2031   static int count=0;
2032   static int pc=0, cc=0, realMsgCount=0;
2033   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
2034   static int *histArray = NULL;
2035   int i;
2036
2037   count++;
2038   if (histArray == NULL) {
2039     histArray = new int[HISTOGRAM_SIZE];
2040     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
2041   }
2042   StatsMessage *m = (StatsMessage *)msg;
2043   pc += m->processCount;
2044   cc += m->corrMsgCount;
2045   realMsgCount += m->realMsgCount;
2046   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
2047   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
2048   int *array = (int *)(m+1);
2049   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
2050   if (count == CmiNumPes()) {
2051     if (bgstats_flag) {
2052       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
2053       for (i=0; i<HISTOGRAM_SIZE; i++) {
2054         CmiPrintf("%d ", histArray[i]);
2055         if (i%20 == 19) CmiPrintf("\n");
2056       }
2057       CmiPrintf("\n");
2058     }
2059     CsdExitScheduler();
2060   }
2061   CmiFree(msg);
2062 }
2063
2064 // update arrival time from buffer messages
2065 // before start an entry, check message time against buffered timing
2066 // correction message to update to the correct time.
2067 void correctMsgTime(char *msg)
2068 {
2069    if (!correctTimeLog) return;
2070 //   if (CkMsgDoCorrect(msg) == 0) return;
2071
2072    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
2073    CmiInt2 tid = CmiBgMsgThreadID(msg);
2074
2075    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
2076    int len = cmsg.length();
2077    for (int i=0; i<len; i++) {
2078      bgCorrectionMsg* m = cmsg[i];
2079      if (msgId == m->msgId && tid == m->tID) {
2080         if (m->tAdjust < 0.0) return;
2081         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
2082         CmiBgMsgRecvTime(msg) = m->tAdjust;
2083         m->tAdjust = -1.0;       /* invalidate this msg */
2084 //      cmsg.update(i);
2085         stateCounters.corrMsgRCCnt++;
2086         break;
2087      }
2088    }
2089 }
2090
2091
2092 //TODO: write disk bgTraceFiles
2093 static void writeToDisk()
2094 {
2095
2096   char* d = new char[1025];
2097   //Num of simulated procs on this real pe
2098   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
2099
2100   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
2101
2102   // write summary file on PE0
2103   if(CmiMyPe()==0){
2104     
2105     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
2106     FILE *f2 = fopen(d,"wb");
2107     //Total emulating processors and total target BG processors
2108     int numEmulatingPes=CmiNumPes();
2109     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
2110
2111     if(f2==NULL) {
2112       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
2113       CmiAbort("BG> Abort");
2114     }
2115     PUP::toDisk p(f2);
2116     p((char *)&machInfo, sizeof(machInfo));
2117     p|totalWorkerProcs;
2118     p|cva(bgMach);
2119     p|numEmulatingPes;
2120     p|bglog_version;
2121     p|CpvAccess(CthResumeBigSimThreadIdx);
2122     
2123     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
2124     
2125     fclose(f2);
2126   }
2127   
2128   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
2129   FILE *f = fopen(d,"wb");
2130  
2131   if(f==NULL)
2132     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
2133   PUP::toDisk p(f);
2134   
2135   p((char *)&machInfo, sizeof(machInfo));       // machine info
2136   p|numLocalProcs;
2137
2138   // CmiPrintf("Timelines are: \n");
2139   int procTablePos = ftell(f);
2140
2141   int *procOffsets = new int[numLocalProcs];
2142   int procTableSize = (numLocalProcs)*sizeof(int);
2143   fseek(f,procTableSize,SEEK_CUR); 
2144
2145   for (int j=0; j<cva(numNodes); j++){
2146     for(int i=0;i<cva(bgMach).numWth;i++){
2147     #if BIGSIM_OUT_OF_CORE
2148         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2149         //through isomalloc. Therefore, the memory containing those events needs
2150         //to be brought back into memory from disk. --Chao Mei          
2151         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2152             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2153      #endif     
2154       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2155       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2156       t.pup(p);
2157     }
2158   }
2159   
2160   fseek(f,procTablePos,SEEK_SET);
2161   p(procOffsets,numLocalProcs);
2162   fclose(f);
2163
2164   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2165 }
2166
2167
2168 /*****************************************************************************
2169              application Converse thread hook
2170 *****************************************************************************/
2171
2172 CpvExtern(int      , CthResumeBigSimThreadIdx);
2173
2174 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2175                                    int pb,unsigned int *prio)
2176 {
2177 /*
2178   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2179   CsdEnqueueGeneral(token, s, pb, prio);
2180 */
2181 #if CMK_BLUEGENE_THREAD
2182   int x, y, z;
2183   BgGetMyXYZ(&x, &y, &z);
2184   int t = BgGetThreadID();
2185 #else
2186   #error "ERROR HERE"
2187 #endif
2188     // local message into queue
2189   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2190
2191   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2192   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2193
2194
2195 CthThread CthSuspendBigSimThread()
2196
2197   return  cta(threadinfo)->me;
2198 }
2199
2200 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2201 {
2202    // stop timer
2203    stopVTimer();
2204 }
2205
2206 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2207 {
2208    // start timer by reset it
2209    resetVTime();
2210 }
2211
2212
2213 void BgSetStrategyBigSimDefault(CthThread t)
2214
2215   CthSetStrategy(t,
2216                  CthEnqueueBigSimThread,
2217                  CthSuspendBigSimThread);
2218
2219   CthThreadListener *a = new CthThreadListener;
2220   a->suspend = bigsimThreadListener_suspend;
2221   a->resume = bigsimThreadListener_resume;
2222   a->free = NULL;
2223   CthAddListener(t, a);
2224 }
2225
2226 int BgIsMainthread()
2227 {
2228     return tMYNODE == NULL;
2229 }
2230
2231 int BgIsRecord()
2232 {
2233     return cva(bgMach).record == 1 || cva(bgMach).recordnode == 1;
2234 }
2235
2236 int BgIsReplay()
2237 {
2238     return cva(bgMach).replay != -1 || cva(bgMach).replaynode != -1;
2239 }
2240
2241 extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2242   ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
2243   //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
2244   int numLocal = 0, count = 0;
2245   for (int j=0; j<cva(numNodes); j++){
2246     for(int i=0;i<cva(bgMach).numWth;i++){
2247       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2248       if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
2249       numLocal ++;
2250     }
2251   }
2252   
2253   /* Since the current message is passed is as "local" to the merge function,
2254    * and it will not be nullified in the upcoming loop, make it NULL explicitely. */
2255   ((workThreadInfo*)cta(threadinfo))->reduceMsg = NULL;
2256   
2257   void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
2258   for (int j=0; j<cva(numNodes); j++){
2259     for(int i=0;i<cva(bgMach).numWth;i++){
2260       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2261       if (t == cta(threadinfo)) break;
2262       msgLocal[count++] = t->reduceMsg;
2263       t->reduceMsg = NULL;
2264     }
2265   }
2266   CmiAssert(count==numLocal-1);
2267   msg = mergeFn(&size, msg, msgLocal, numLocal-1);
2268   CmiReduce(msg, size, mergeFn);
2269   //CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
2270   for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
2271   free(msgLocal);
2272 }
2273
2274 // for record/replay, to fseek back
2275 void BgRewindRecord()
2276 {
2277   threadInfo *tinfo = cta(threadinfo);
2278   if (tinfo->watcher) tinfo->watcher->rewind();
2279 }
2280
2281
2282 void BgRegisterUserTracingFunction(BgTracingFn fn)
2283 {
2284   userTracingFn = fn;
2285 }
2286
2287