No need to be too verbose
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82
83 /****************************************************************************
84      little utility functions
85 ****************************************************************************/
86
87 char **BgGetArgv() { return arg_argv; }
88 int    BgGetArgc() { return arg_argc; }
89
90 /***************************************************************************
91      Implementation of the same counter interface currently used for the
92      Origin2000 using PAPI in the underlying layer.
93
94      Because of the difference in counter numbering, it is assumed that
95      the two counters desired are CYCLES and FLOPS and hence the numbers
96      e0 and e1 are ignored.
97
98      start_counters is also not implemented because PAPI does things in
99      a different manner.
100 ****************************************************************************/
101
102 #if CMK_HAS_COUNTER_PAPI
103 /*
104 CmiUInt8 total_ins = 0;
105 CmiUInt8 total_fps = 0;
106 CmiUInt8 total_l1_dcm = 0;
107 CmiUInt8 total_mem_rcy = 0;
108 */
109 int init_counters()
110 {
111     int retval = PAPI_library_init(PAPI_VER_CURRENT);
112
113     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
114
115         //a temporary
116         const char *eventDesc[MAX_TOTAL_EVENTS];
117         int lpapiEvents[MAX_TOTAL_EVENTS];
118         
119     numPapiEvents = 0;
120         
121 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
122                 if(PAPI_query_event(e) == PAPI_OK){ \
123                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
124                         eventDesc[numPapiEvents] = edesc;       \
125                         lpapiEvents[numPapiEvents++] = e; \
126                 }
127                 
128     // PAPI high level API does not require explicit library intialization
129         eventDesc[numPapiEvents] ="cycles";
130     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
131         
132         
133     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
134       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
135           eventDesc[numPapiEvents] = "floating point instructions";     
136       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
137     } else {
138       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
139           eventDesc[numPapiEvents] = "total instructions";      
140       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
141     }
142     */
143         
144         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
145         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
146         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
147         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
148         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
149         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
150         
151         if(numPapiEvents == 0){
152                 CmiAbort("No papi events are defined!\n");
153         }
154         
155         if(numPapiEvents >= MAX_TOTAL_EVENTS){
156                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
157         }
158         
159         papiEvents = new int[numPapiEvents];
160         papiValues = new long_long[numPapiEvents];
161         total_papi_counters = new CmiUInt8[numPapiEvents];
162         papi_counters_desc = new char *[numPapiEvents];
163         for(int i=0; i<numPapiEvents; i++){
164                 papiEvents[i] = lpapiEvents[i];
165                 total_papi_counters[i] = 0;
166                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
167                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
168                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
169                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
170         }
171         
172 /*
173     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
174       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
175       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
176     }
177     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
178       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
179       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
180     }
181     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
182       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
183       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
184     }
185 */
186
187     int status = PAPI_start_counters(papiEvents, numPapiEvents);
188     if (status != PAPI_OK) {
189       CmiPrintf("PAPI_start_counters error code: %d\n", status);
190       switch (status) {
191       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
192       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
193       }
194       CmiAbort("Unable to start PAPI counters!\n");
195     }
196 }
197
198 int read_counters(long_long *papiValues, int n) 
199 {
200   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
201   // code for Origin2000
202   int status;
203   status = PAPI_read_counters(papiValues, n);
204   if (status != PAPI_OK) {
205     CmiPrintf("PAPI_read_counters error: %d\n", status);
206     CmiAbort("Failed to read PAPI counters!\n");
207   }
208   
209   /*
210   total_ins += papiValues[0];
211   total_fps += papiValues[1];
212   total_l1_dcm += papiValues[2];
213   total_mem_rcy += papiValues[3];
214   */
215   
216   return 0;
217 }
218
219 inline void CountPapiEvents(){
220   for(int i=0 ;i<numPapiEvents; i++)
221         total_papi_counters[i] += papiValues[i];
222 }
223
224 inline double Count2Time(long_long *papiValues, int n) { 
225   return papiValues[1]*cva(bgMach).fpfactor; 
226 }
227 #endif
228
229 /*****************************************************************************
230      Handler Table, one per thread
231 ****************************************************************************/
232
233 extern "C" void defaultBgHandler(char *null, void *uPtr)
234 {
235   CmiAbort("BG> Invalid Handler called!\n");
236 }
237
238 HandlerTable::HandlerTable()
239 {
240     handlerTableCount = 1;
241     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
242     for (int i=0; i<MAX_HANDLERS; i++) {
243       handlerTable[i].fnPtr = defaultBgHandler;
244       handlerTable[i].userPtr = NULL;
245     }
246 }
247
248 inline int HandlerTable::registerHandler(BgHandler h)
249 {
250     ASSERT(!cva(simState).inEmulatorInit);
251     /* leave 0 as blank, so it can report error luckily */
252     int cur = handlerTableCount++;
253     if (cur >= MAX_HANDLERS)
254       CmiAbort("BG> HandlerID exceed the maximum.\n");
255     handlerTable[cur].fnPtr = (BgHandlerEx)h;
256     handlerTable[cur].userPtr = NULL;
257     return cur;
258 }
259
260 inline void HandlerTable::numberHandler(int idx, BgHandler h)
261 {
262     ASSERT(!cva(simState).inEmulatorInit);
263     if (idx >= handlerTableCount || idx < 1)
264       CmiAbort("BG> HandlerID exceed the maximum!\n");
265     handlerTable[idx].fnPtr = (BgHandlerEx)h;
266     handlerTable[idx].userPtr = NULL;
267 }
268
269 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
270 {
271     ASSERT(!cva(simState).inEmulatorInit);
272     if (idx >= handlerTableCount || idx < 1)
273       CmiAbort("BG> HandlerID exceed the maximum!\n");
274     handlerTable[idx].fnPtr = h;
275     handlerTable[idx].userPtr = uPtr;
276 }
277
278 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
279 {
280 #if 0
281     if (handler >= handlerTableCount) {
282       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
283       CmiAbort("Invalid handler!");
284     }
285 #endif
286     if (handler >= handlerTableCount || handler<0) return NULL;
287     return &handlerTable[handler];
288 }
289
290 /*****************************************************************************
291       low level API
292 *****************************************************************************/
293
294 int BgRegisterHandler(BgHandler h)
295 {
296   ASSERT(!cva(simState).inEmulatorInit);
297   int cur;
298 #if CMK_BLUEGENE_NODE
299   return tMYNODE->handlerTable.registerHandler(h);
300 #else
301   if (tTHREADTYPE == COMM_THREAD) {
302     return tMYNODE->handlerTable.registerHandler(h);
303   }
304   else {
305     return tHANDLETAB.registerHandler(h);
306   }
307 #endif
308 }
309
310 void BgNumberHandler(int idx, BgHandler h)
311 {
312   ASSERT(!cva(simState).inEmulatorInit);
313 #if CMK_BLUEGENE_NODE
314   tMYNODE->handlerTable.numberHandler(idx,h);
315 #else
316   if (tTHREADTYPE == COMM_THREAD) {
317     tMYNODE->handlerTable.numberHandler(idx, h);
318   }
319   else {
320     tHANDLETAB.numberHandler(idx, h);
321   }
322 #endif
323 }
324
325 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
326 {
327   ASSERT(!cva(simState).inEmulatorInit);
328 #if CMK_BLUEGENE_NODE
329   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
330 #else
331   if (tTHREADTYPE == COMM_THREAD) {
332     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
333   }
334   else {
335     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
336   }
337 #endif
338 }
339
340 /*****************************************************************************
341       BG Timing Functions
342 *****************************************************************************/
343
344 void resetVTime()
345 {
346   /* reset start time */
347   int timingMethod = cva(bgMach).timingMethod;
348   if (timingMethod == BG_WALLTIME) {
349     double ct = BG_TIMER();
350     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
351     tSTARTTIME = ct;
352   }
353   else if (timingMethod == BG_ELAPSE)
354     tSTARTTIME = tCURRTIME;
355 #ifdef CMK_ORIGIN2000
356   else if (timingMethod == BG_COUNTER) {
357     if (start_counters(0, 21) <0) {
358       perror("start_counters");;
359     }
360   }
361 #elif CMK_HAS_COUNTER_PAPI
362   else if (timingMethod == BG_COUNTER) {
363     // do a fake read to reset the counters. It would be more efficient
364     // to use the low level API, but that would be a lot more code to
365     // write for now.
366     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
367   }
368 #endif
369 }
370
371 void startVTimer()
372 {
373   CmiAssert(tTIMERON == 0);
374   resetVTime();
375   tTIMERON = 1;
376 }
377
378 // should be used only when BG_WALLTIME
379 static inline void advanceTime(double inc)
380 {
381   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
382   if (inc < 0.0) return;
383   CmiAssert(inc>=0.0);
384   inc *= cva(bgMach).cpufactor;
385   tCURRTIME += inc;
386   CmiAssert(tTIMERON==1);
387 }
388
389 void stopVTimer()
390 {
391   int k;
392 #if 0
393   if (tTIMERON != 1) {
394     CmiAbort("stopVTimer called without startVTimer!\n");
395   }
396   CmiAssert(tTIMERON == 1);
397 #else
398   if (tTIMERON == 0) return;         // already stopped
399 #endif
400   const int timingMethod = cva(bgMach).timingMethod;
401   if (timingMethod == BG_WALLTIME) {
402     const double tp = BG_TIMER();
403     double inc = tp-tSTARTTIME;
404     advanceTime(inc-cva(bgMach).timercost);
405 //    tSTARTTIME = BG_TIMER();  // skip the above time
406   }
407   else if (timingMethod == BG_ELAPSE) {
408     // if no bgelapse called, assume it takes 1us
409     if (tCURRTIME-tSTARTTIME < 1E-9) {
410 //      tCURRTIME += 1e-6;
411     }
412   }
413   else if (timingMethod == BG_COUNTER)  {
414 #if CMK_ORIGIN2000
415     long long c0, c1;
416     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
417     tCURRTIME += Count2Time(c1);
418 #elif CMK_HAS_COUNTER_PAPI
419     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
420     CountPapiEvents();
421     tCURRTIME += Count2Time(papiValues, numPapiEvents);
422 #endif
423   }
424   tTIMERON = 0;
425 }
426
427 double BgGetTime()
428 {
429 #if 1
430   const int timingMethod = cva(bgMach).timingMethod;
431   if (timingMethod == BG_WALLTIME) {
432     /* accumulate time since last starttime, and reset starttime */
433     if (tTIMERON) {
434       const double tp2= BG_TIMER();
435       double &startTime = tSTARTTIME;
436       double inc = tp2 - startTime;
437       advanceTime(inc-cva(bgMach).timercost);
438       startTime = BG_TIMER();
439     }
440     return tCURRTIME;
441   }
442   else if (timingMethod == BG_ELAPSE) {
443     return tCURRTIME;
444   }
445   else if (timingMethod == BG_COUNTER) {
446     if (tTIMERON) {
447 #if CMK_ORIGIN2000
448       long long c0, c1;
449       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
450       tCURRTIME += Count2Time(c1);
451       if (start_counters(0, 21)<0) perror("start_counters");;
452 #elif CMK_HAS_COUNTER_PAPI
453     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
454     tCURRTIME += Count2Time(papiValues, numPapiEvents);
455 #endif
456     }
457     return tCURRTIME;
458   }
459   else 
460     CmiAbort("Unknown Timing Method.");
461   return -1;
462 #else
463   /* sometime I am interested in real wall time */
464   tCURRTIME = CmiWallTimer();
465   return tCURRTIME;
466 #endif
467 }
468
469 // moved to blue_logs.C
470 double BgGetCurTime()
471 {
472   ASSERT(tTHREADTYPE == WORK_THREAD);
473   return tCURRTIME;
474 }
475
476 extern "C" 
477 void BgElapse(double t)
478 {
479 //  ASSERT(tTHREADTYPE == WORK_THREAD);
480   if (cva(bgMach).timingMethod == BG_ELAPSE)
481     tCURRTIME += t;
482 }
483
484 // advance virtual timer no matter what scheme is used
485 extern "C" 
486 void BgAdvance(double t)
487 {
488 //  ASSERT(tTHREADTYPE == WORK_THREAD);
489   tCURRTIME += t;
490 }
491
492 /* BG API Func
493  * called by a communication thread to test if poll data 
494  * in the node's INBUFFER for its own queue 
495  */
496 char * getFullBuffer()
497 {
498   /* I must be a communication thread */
499   if (tTHREADTYPE != COMM_THREAD) 
500     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
501
502   return tMYNODE->getFullBuffer();
503 }
504
505 /**  BG API Func
506  * called by a Converse handler or sendPacket()
507  * add message msgPtr to a bluegene node's inbuffer queue 
508  */
509 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
510 {
511 #ifndef CMK_OPTIMIZE
512   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
513 #endif
514   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
515
516   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
517         
518   nInfo.addBgNodeInbuffer(msgPtr);
519 }
520
521 /** BG API Func 
522  *  called by a comm thread
523  *  add a message to a thread's affinity queue in same node 
524  */
525 void addBgThreadMessage(char *msgPtr, int threadID)
526 {
527 #ifndef CMK_OPTIMIZE
528   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
529 #endif
530   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
531   tInfo->addAffMessage(msgPtr);
532 }
533
534 /** BG API Func 
535  *  called by a comm thread, add a message to a node's non-affinity queue 
536  */
537 void addBgNodeMessage(char *msgPtr)
538 {
539   tMYNODE->addBgNodeMessage(msgPtr);
540 }
541
542 void BgEnqueue(char *msg)
543 {
544 #if 0
545   ASSERT(tTHREADTYPE == WORK_THREAD);
546   workThreadInfo *tinfo = (workThreadInfo *)cta(threadinfo);
547   tinfo->addAffMessage(msg);
548 #else
549   nodeInfo *myNode = cta(threadinfo)->myNode;
550   addBgNodeInbuffer(msg, myNode->id);
551 #endif
552 }
553
554 /** BG API Func 
555  *  check if inBuffer on this node has msg available
556  */
557 int checkReady()
558 {
559   if (tTHREADTYPE != COMM_THREAD)
560     CmiAbort("checkReady called by a non-communication thread!\n");
561   return !tINBUFFER.isEmpty();
562 }
563
564
565 /* handler to process the msg */
566 void msgHandlerFunc(char *msg)
567 {
568   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
569   int gnodeID = CmiBgMsgNodeID(msg);
570   if (gnodeID >= 0) {
571 #ifndef CMK_OPTIMIZE
572     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
573       CmiAbort("msgHandlerFunc received wrong message!");
574 #endif
575     int lnodeID = nodeInfo::Global2Local(gnodeID);
576     if (cva(bgMach).inReplayMode()) {
577       int x, y, z;
578       BgGetXYZ(cva(bgMach).replay, &x, &y, &z);
579       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
580       else lnodeID = 0;
581     }
582     addBgNodeInbuffer(msg, lnodeID);
583   }
584   else {
585     CmiAbort("Invalid message!");
586   }
587 }
588
589 /* Converse handler for node level broadcast message */
590 void nodeBCastMsgHandlerFunc(char *msg)
591 {
592   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
593   int gnodeID = CmiBgMsgNodeID(msg);
594   CmiInt2 threadID = CmiBgMsgThreadID(msg);
595   int lnodeID;
596
597   if (gnodeID < -1) {
598     gnodeID = - (gnodeID+100);
599     if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
600       lnodeID = nodeInfo::Global2Local(gnodeID);
601     else
602       lnodeID = -1;
603   }
604   else {
605     ASSERT(gnodeID == -1);
606     lnodeID = gnodeID;
607   }
608   // broadcast except lnodeID:threadId
609   int len = CmiBgMsgLength(msg);
610   int count = 0;
611   for (int i=0; i<cva(numNodes); i++)
612   {
613     if (i==lnodeID) continue;
614     char *dupmsg;
615     if (count == 0) dupmsg = msg;
616     else dupmsg = CmiCopyMsg(msg, len);
617     DEBUGF(("addBgNodeInbuffer to %d\n", i));
618     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
619     addBgNodeInbuffer(dupmsg, i);
620     count ++;
621   }
622   if (count == 0) CmiFree(msg);
623 }
624
625 // clone a msg, only has a valid header, plus a pointer to the real msg
626 char *BgCloneMsg(char *msg)
627 {
628   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
629   char *dupmsg = (char *)CmiAlloc(size);
630   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
631   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
632   CmiBgMsgRefCount(msg) ++;
633   CmiBgMsgFlag(dupmsg) = BG_CLONE;
634   return dupmsg;
635 }
636
637 // expand the cloned msg to the full size msg
638 char *BgExpandMsg(char *msg)
639 {
640   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
641   int size = CmiBgMsgLength(origmsg);
642   char *dupmsg = (char *)CmiAlloc(size);
643   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
644   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
645   CmiFree(msg);
646   CmiBgMsgRefCount(origmsg) --;
647   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
648   return dupmsg;
649 }
650
651 /* Converse handler for thread level broadcast message */
652 void threadBCastMsgHandlerFunc(char *msg)
653 {
654   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
655   int gnodeID = CmiBgMsgNodeID(msg);
656   CmiInt2 threadID = CmiBgMsgThreadID(msg);
657   int lnodeID;
658   if (gnodeID < -1) {
659       gnodeID = - (gnodeID+100);
660       if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
661         lnodeID = nodeInfo::Global2Local(gnodeID);
662       else
663         lnodeID = -1;
664       CmiAssert(threadID != ANYTHREAD);
665   }
666   else {
667     ASSERT(gnodeID == -1);
668     lnodeID = gnodeID;
669   }
670   // broadcast except nodeID:threadId
671   int len = CmiBgMsgLength(msg);
672   // optimization needed if the message size is big
673   // making duplications can easily run out of memory
674   int bigOpt = (len > 4096);
675   for (int i=0; i<cva(numNodes); i++)
676   {
677       for (int j=0; j<cva(bgMach).numWth; j++) {
678         if (i==lnodeID && j==threadID) continue;
679         // for big message, clone a message token instead of a real msg
680         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
681         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
682         CmiBgMsgThreadID(dupmsg) = j;
683         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
684         addBgNodeInbuffer(dupmsg, i);
685       }
686   }
687   // for big message, will free after all tokens are done
688   if (!bigOpt) CmiFree(msg);
689 }
690
691 /**
692  *              BG Messaging Functions
693  */
694
695 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
696 {
697   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
698 }
699
700 /**
701  *   a simple message streaming on demand and special purpose
702  *   user call  BgStartStreaming() and BgEndStreaming()
703  *   each worker thread call one send, and all sends are sent
704  *   via multiplesend at the end
705  */
706
707 static int bg_streaming = 0;
708
709 class BgStreaming {
710 public:
711   char **streamingMsgs;
712   int  *streamingMsgSizes;
713   int count;
714   int totalWorker;
715   int pe;
716 public:
717   BgStreaming() {
718     streamingMsgs = NULL;
719     streamingMsgSizes = NULL;
720     count = 0;
721     totalWorker = 0;
722     pe = -1;
723   }
724   ~BgStreaming() {
725     if (streamingMsgs) {
726       delete [] streamingMsgs;
727       delete [] streamingMsgSizes;
728     }
729   }
730   void init(int nNodes) {
731     totalWorker = nNodes * BgGetNumWorkThread();
732     streamingMsgs = new char *[totalWorker];
733     streamingMsgSizes = new int [totalWorker];
734   }
735   void depositMsg(int p, int size, char *m) {
736     streamingMsgs[count] = m;
737     streamingMsgSizes[count] = size;
738     count ++;
739     if (pe == -1) pe = p;
740     else CmiAssert(pe == p);
741     if (count == totalWorker) {
742       // CkPrintf("streaming send\n");
743       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
744       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
745       pe = -1;
746       count = 0;
747     }
748   }
749 };
750
751 BgStreaming bgstreaming;
752
753 void BgStartStreaming()
754 {
755   bg_streaming = 1;
756 }
757
758 void BgEndStreaming()
759 {
760   bg_streaming = 0;
761 }
762
763 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
764 {
765   if (streaming && pe != CmiMyPe())
766     bgstreaming.depositMsg(pe, msgSize, msg);
767   else
768     CmiSyncSendAndFree(pe, msgSize, msg);
769 }
770
771
772 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
773 {
774 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
775 #if !DELAY_SEND
776   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
777   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
778 #else
779   if (!correctTimeLog) {
780     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
781     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
782   }
783   // else messages are kept in the log (MsgEntry), and only will be sent
784   // after timing correction has done on that log.
785   // TODO: streaming has no effect if time correction is on.
786 #endif
787 }
788
789 /* send will copy data to msg buffer */
790 /* user data is not free'd in this routine, user can reuse the data ! */
791 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
792 {
793   //CmiPrintStackTrace(0);
794   double sendT = BgGetTime();
795
796   double latency;
797   CmiSetHandler(sendmsg, cva(simState).msgHandler);
798   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
799   CmiBgMsgThreadID(sendmsg) = threadID;
800   CmiBgMsgHandle(sendmsg) = handlerID;
801   CmiBgMsgType(sendmsg) = type;
802   CmiBgMsgLength(sendmsg) = numbytes;
803   CmiBgMsgFlag(sendmsg) = 0;
804   CmiBgMsgRefCount(sendmsg) = 0;
805   if (local) {
806     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
807     latency = 0.0;
808   }
809   else {
810     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
811     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
812     CmiAssert(latency >= 0);
813   }
814   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
815   
816   // timing
817   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
818
819   //static int addCnt=1; //for debugging only
820   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
821   //addCnt++;
822
823   if (local){
824       /* Here local refers to the fact that msg is sent to the processor itself
825        * therefore, we just add this msg to the thread itself
826        */
827       //addBgThreadMessage(sendmsg,threadID);
828       addBgNodeInbuffer(sendmsg, myNode->id);
829   }    
830   else
831     CmiSendPacket(x, y, z, numbytes, sendmsg);
832
833   // bypassing send time
834   resetVTime();
835 }
836
837 /* broadcast will copy data to msg buffer */
838 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
839 {
840   double sendT = BgGetTime();
841
842   nodeInfo *myNode = cta(threadinfo)->myNode;
843   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
844   if (node >= 0)
845     CmiBgMsgNodeID(sendmsg) = -node-100;
846   else
847     CmiBgMsgNodeID(sendmsg) = node;
848   CmiBgMsgThreadID(sendmsg) = threadID; 
849   CmiBgMsgHandle(sendmsg) = handlerID;  
850   CmiBgMsgType(sendmsg) = type; 
851   CmiBgMsgLength(sendmsg) = numbytes;
852   CmiBgMsgFlag(sendmsg) = 0;
853   CmiBgMsgRefCount(sendmsg) = 0;
854   /* FIXME */
855   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
856
857   // timing
858   // FIXME
859   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
860
861   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
862 #if DELAY_SEND
863   if (!correctTimeLog)
864 #endif
865   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
866
867   resetVTime();
868 }
869
870 /* broadcast will copy data to msg buffer */
871 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
872 {
873   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
874   if (node >= 0)
875     CmiBgMsgNodeID(sendmsg) = -node-100;
876   else
877     CmiBgMsgNodeID(sendmsg) = node;
878   CmiBgMsgThreadID(sendmsg) = threadID; 
879   CmiBgMsgHandle(sendmsg) = handlerID;  
880   CmiBgMsgType(sendmsg) = type; 
881   CmiBgMsgLength(sendmsg) = numbytes;
882   CmiBgMsgFlag(sendmsg) = 0;
883   CmiBgMsgRefCount(sendmsg) = 0;
884   /* FIXME */
885   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
886   double sendT = BgGetTime();
887   CmiBgMsgRecvTime(sendmsg) = sendT;    
888
889   // timing
890 #if 0
891   if (node == BG_BROADCASTALL) {
892     for (int i=0; i<_bgSize; i++) {
893       for (int j=0; j<cva(numWth); j++) {
894         BG_ADDMSG(sendmsg, node);
895       }
896     }
897   }
898   else {
899     CmiAssert(node >= 0);
900     BG_ADDMSG(sendmsg, (node+100));
901   }
902 #else
903   // FIXME
904   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
905 #endif
906
907   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
908 #if DELAY_SEND
909   if (!correctTimeLog)
910 #endif
911   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
912
913   resetVTime();
914 }
915
916
917 /* sendPacket to route */
918 /* this function can be called by any thread */
919 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
920 {
921   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
922
923 #ifndef CMK_OPTIMIZE
924   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
925     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
926     CmiAbort("Abort!\n");
927   }
928 #endif
929
930   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
931 }
932
933 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
934 {
935   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
936 }
937
938 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
939                        int numbytes, char* data)
940 {
941   nodeInfo *myNode = cta(threadinfo)->myNode;
942
943   if (cva(bgMach).inReplayMode()) threadID = 0;    // replay mode
944
945   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
946 }
947
948 /* wrapper of the previous two functions */
949 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
950 {
951   nodeInfo *myNode = cta(threadinfo)->myNode;
952   if (myNode->x == x && myNode->y == y && myNode->z == z)
953     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
954   else
955     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
956 }
957
958 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
959 {
960   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
961 }
962
963 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
964 {
965   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
966 }
967
968 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
969 {
970   if (cva(bgMach).inReplayMode()) return;    // replay mode
971   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
972 }
973
974 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
975 {
976   if (cva(bgMach).inReplayMode()) {      // replay mode, send only to itself
977     BgSendLocalPacket(ANYTHREAD, handlerID, type, numbytes, data);
978     return;
979   }
980   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
981 }
982
983 /**
984  send a msg to a list of processors (processors represented in global seq #
985 */
986 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
987 {
988   nodeInfo *myNode = cta(threadinfo)->myNode;
989
990   CmiSetHandler(msg, cva(simState).msgHandler);
991   CmiBgMsgHandle(msg) = handlerID;
992   CmiBgMsgType(msg) = type;
993   CmiBgMsgLength(msg) = numbytes;
994   CmiBgMsgFlag(msg) = 0;
995   CmiBgMsgRefCount(msg) = 0;
996
997   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
998
999   double now = BgGetTime();
1000
1001   // send one by one
1002   for (int i=0; i<npes; i++)
1003   {
1004     int local = 0;
1005     int x,y,z,t;
1006     int pe = pes[i];
1007 #if CMK_BLUEGENE_NODE
1008     CmiAbort("Not implemented yet!");
1009 #else
1010     t = pe%BgGetNumWorkThread();
1011     pe = pe/BgGetNumWorkThread();
1012     BgGetXYZ(pe, &x, &y, &z);
1013 #endif
1014
1015     char *sendmsg = CmiCopyMsg(msg, numbytes);
1016     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1017     CmiBgMsgThreadID(sendmsg) = t;
1018     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1019     CmiAssert(latency >= 0);
1020     CmiBgMsgRecvTime(sendmsg) = latency + now;
1021
1022     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1023
1024     // timing and make sure all msgID are the same
1025     if (i!=0) CpvAccess(msgCounter) --;
1026     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1027
1028     if (myNode->x == x && myNode->y == y && myNode->z == z)
1029       addBgNodeInbuffer(sendmsg, myNode->id);
1030     else
1031       CmiSendPacket(x, y, z, numbytes, sendmsg);
1032   }
1033
1034   CmiFree(msg);
1035
1036   resetVTime();
1037 }
1038
1039 /*****************************************************************************
1040       BG node level API - utilities
1041 *****************************************************************************/
1042
1043 /* must be called in a communication or worker thread */
1044 void BgGetMyXYZ(int *x, int *y, int *z)
1045 {
1046   ASSERT(!cva(simState).inEmulatorInit);
1047   *x = tMYX; *y = tMYY; *z = tMYZ;
1048 }
1049
1050 void BgGetXYZ(int seq, int *x, int *y, int *z)
1051 {
1052   nodeInfo::Global2XYZ(seq, x, y, z);
1053 }
1054
1055 void BgGetSize(int *sx, int *sy, int *sz)
1056 {
1057   cva(bgMach).getSize(sx, sy, sz);
1058 }
1059
1060 int BgTraceProjectionOn(int pe)
1061 {
1062   return cva(bgMach).traceProejctions(pe);
1063 }
1064
1065 /* return the total number of Blue gene nodes */
1066 int BgNumNodes()
1067 {
1068   return _bgSize;
1069 }
1070
1071 void BgSetNumNodes(int x)
1072 {
1073   _bgSize = x;
1074 }
1075
1076 /* can only called in emulatorinit */
1077 void BgSetSize(int sx, int sy, int sz)
1078 {
1079   ASSERT(cva(simState).inEmulatorInit);
1080   cva(bgMach).setSize(sx, sy, sz);
1081 }
1082
1083 /* return number of bg nodes on this emulator node */
1084 int BgNodeSize()
1085 {
1086   ASSERT(!cva(simState).inEmulatorInit);
1087   return cva(numNodes);
1088 }
1089
1090 /* return the bg node ID (local array index) */
1091 int BgMyRank()
1092 {
1093 #ifndef CMK_OPTIMIZE
1094   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1095 #endif
1096   ASSERT(!cva(simState).inEmulatorInit);
1097   return tMYNODEID;
1098 }
1099
1100 /* return my serialed blue gene node number */
1101 int BgMyNode()
1102 {
1103 #ifndef CMK_OPTIMIZE
1104   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1105 #endif
1106   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1107 }
1108
1109 /* return a real processor number from a bg node */
1110 int BgNodeToRealPE(int node)
1111 {
1112   return nodeInfo::Global2PE(node);
1113 }
1114
1115 // thread ID on a BG node
1116 int BgGetThreadID()
1117 {
1118   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1119 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1120   return tMYID;
1121 }
1122
1123 void BgSetThreadID(int x)
1124 {
1125   tMYID = x;
1126 }
1127
1128 int BgGetGlobalThreadID()
1129 {
1130   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1131   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1132   //return tMYGLOBALID;
1133 }
1134
1135 int BgGetGlobalWorkerThreadID()
1136 {
1137   ASSERT(tTHREADTYPE == WORK_THREAD);
1138 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1139   return tMYGLOBALID;
1140 }
1141
1142 void BgSetGlobalWorkerThreadID(int pe)
1143 {
1144   ASSERT(tTHREADTYPE == WORK_THREAD);
1145 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1146   tMYGLOBALID = pe;
1147 }
1148
1149 char *BgGetNodeData()
1150 {
1151   return tUSERDATA;
1152 }
1153
1154 void BgSetNodeData(char *data)
1155 {
1156   ASSERT(!cva(simState).inEmulatorInit);
1157   tUSERDATA = data;
1158 }
1159
1160 int BgGetNumWorkThread()
1161 {
1162   return cva(bgMach).numWth;
1163 }
1164
1165 void BgSetNumWorkThread(int num)
1166 {
1167   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1168   cva(bgMach).numWth = num;
1169 }
1170
1171 int BgGetNumCommThread()
1172 {
1173   return cva(bgMach).numCth;
1174 }
1175
1176 void BgSetNumCommThread(int num)
1177 {
1178   ASSERT(cva(simState).inEmulatorInit);
1179   cva(bgMach).numCth = num;
1180 }
1181
1182 /*****************************************************************************
1183       Communication and Worker threads
1184 *****************************************************************************/
1185
1186 BgStartHandler  workStartFunc = NULL;
1187
1188 void BgSetWorkerThreadStart(BgStartHandler f)
1189 {
1190   workStartFunc = f;
1191 }
1192
1193 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1194
1195 // kernel function for processing a bluegene message
1196 void BgProcessMessage(threadInfo *tinfo, char *msg)
1197 {
1198   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1199   int handler = CmiBgMsgHandle(msg);
1200   DEBUGF(("[%d] call handler %d\n", BgMyNode(), handler));
1201
1202   BgHandlerInfo *handInfo;
1203 #if  CMK_BLUEGENE_NODE
1204   HandlerTable hdlTbl = tMYNODE->handlerTable;
1205   handInfo = hdlTbl.getHandle(handler);
1206 #else
1207   HandlerTable hdlTbl = tHANDLETAB;
1208   handInfo = hdlTbl.getHandle(handler);
1209   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1210 #endif
1211
1212   if (handInfo == NULL) {
1213     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1214     CmiAbort("BgProcessMessage Failed!");
1215   }
1216   BgHandlerEx entryFunc = handInfo->fnPtr;
1217
1218   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1219
1220   // optimization for broadcast messages:
1221   // if the msg is a broadcast token msg, expand it to a real msg
1222   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1223     msg = BgExpandMsg(msg);
1224   }
1225
1226   if (tinfo->watcher) tinfo->watcher->record(msg);
1227
1228   // don't count thread overhead and timing overhead
1229   startVTimer();
1230
1231   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1232
1233   entryFunc(msg, handInfo->userPtr);
1234
1235   stopVTimer();
1236
1237   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1238 }
1239
1240
1241 void scheduleWorkerThread(char *msg)
1242 {
1243   CthThread tid = (CthThread)msg;
1244 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1245   CthAwaken(tid);
1246 }
1247
1248 // thread entry
1249 // for both comm and work thread, virtual function
1250 void run_thread(threadInfo *tinfo)
1251 {
1252   /* set the thread-private threadinfo */
1253   cta(threadinfo) = tinfo;
1254   tinfo->run();
1255 }
1256
1257 /* should be done only once per bg node */
1258 void BgNodeInitialize(nodeInfo *ninfo)
1259 {
1260   CthThread t;
1261   int i;
1262
1263   /* this is here because I will put a message to node inbuffer */
1264   tCURRTIME = 0.0;
1265   tSTARTTIME = CmiWallTimer();
1266
1267   /* creat work threads */
1268   for (i=0; i< cva(bgMach).numWth; i++)
1269   {
1270     threadInfo *tinfo = ninfo->threadinfo[i];
1271     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1272     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1273     tinfo->setThread(t);
1274     /* put to thread table */
1275     tTHREADTABLE[tinfo->id] = t;
1276 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1277     //initial scheduling points for workthreads
1278     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1279 #endif
1280     CthAwaken(t);
1281   }
1282
1283   /* creat communication thread */
1284   for (i=0; i< cva(bgMach).numCth; i++)
1285   {
1286     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1287     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1288     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1289     tinfo->setThread(t);
1290     /* put to thread table */
1291     tTHREADTABLE[tinfo->id] = t;
1292     CthAwaken(t);
1293   }
1294
1295 }
1296
1297 static void beginExitHandlerFunc(void *msg);
1298 static void writeToDisk();
1299 static void sendCorrectionStats();
1300
1301 static CmiHandler exitHandlerFunc(char *msg)
1302 {
1303   // TODO: free memory before exit
1304   int i,j;
1305
1306   programExit = 2;
1307 #if BLUEGENE_TIMING
1308   // timing
1309   if (0)        // detail
1310   if (genTimeLog) {
1311     for (j=0; j<cva(numNodes); j++)
1312     for (i=0; i<cva(bgMach).numWth; i++) {
1313       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1314 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1315       int x,y,z;
1316       nodeInfo::Local2XYZ(j, &x, &y, &z);
1317       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1318     }
1319
1320   }
1321 #endif
1322   if (genTimeLog) sendCorrectionStats();
1323
1324   if (genTimeLog) writeToDisk();
1325
1326 //  if (tTHREADTYPE == WORK_THREAD)
1327   {
1328   int origPe = -2;
1329   // close all tracing modules
1330   for (j=0; j<cva(numNodes); j++)
1331     for (i=0; i<cva(bgMach).numWth; i++) {
1332       int oldPe = CmiSwitchToPE(nodeInfo::Local2Global(j)*cva(bgMach).numWth+i);
1333       if (origPe == -2) origPe = oldPe;
1334       traceCharmClose();
1335 //      CmiSwitchToPE(oldPe);
1336       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1337       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1338     }
1339     if (origPe!=-2) CmiSwitchToPE(origPe);
1340   }
1341
1342 #if 0
1343   delete [] cva(nodeinfo);
1344   delete [] cva(inBuffer);
1345   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1346   delete [] cva(msgBuffer);
1347 #endif
1348
1349 #if CMK_HAS_COUNTER_PAPI
1350   if (cva(bgMach).timingMethod == BG_COUNTER) {
1351 /*        
1352   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1353   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1354   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1355   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1356  */
1357         for(int i=0; i<numPapiEvents; i++){
1358           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1359            delete papi_counters_desc[i];
1360         }
1361         delete papiEvents;
1362         delete papiValues;
1363         delete papi_counters_desc;
1364         delete total_papi_counters;
1365   }
1366 #endif
1367
1368   //ConverseExit();
1369   if (genTimeLog)
1370     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1371   else
1372     CsdExitScheduler();
1373
1374   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1375
1376   return 0;
1377 }
1378
1379 static void sanityCheck()
1380 {
1381   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1382     if (CmiMyPe() == 0)
1383       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1384     BgShutdown(); 
1385   } 
1386   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1387 #if 1
1388     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1389     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1390 #else
1391     if (CmiMyPe() == 0)
1392       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1393     BgShutdown(); 
1394 #endif
1395   }
1396   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1397     CmiAbort("\nToo few BlueGene nodes!\n");
1398   }
1399 }
1400
1401 #undef CmiSwitchToPE
1402 extern "C" int CmiSwitchToPEFn(int pe);
1403
1404 // main
1405 CmiStartFn bgMain(int argc, char **argv)
1406 {
1407   int i;
1408   char *configFile = NULL;
1409
1410 #if CMK_CONDS_USE_SPECIAL_CODE
1411   // overwrite possible implementation in machine.c
1412   CmiSwitchToPE = CmiSwitchToPEFn;
1413 #endif
1414
1415   /* initialize all processor level data */
1416   CpvInitialize(BGMach,bgMach);
1417   cva(bgMach).nullify();
1418
1419   CmiArgGroup("Charm++","BlueGene Simulator");
1420   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1421    cva(bgMach). read(configFile);
1422   }
1423   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1424                 "The x size of the grid of nodes");
1425   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1426                 "The y size of the grid of nodes");
1427   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1428                 "The z size of the grid of nodes");
1429   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1430                 "The number of simulated communication threads per node");
1431   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1432                 "The number of simulated worker threads per node");
1433
1434   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1435                 "Blue Gene thread stack size");
1436
1437   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1438      genTimeLog = 1;
1439   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1440     correctTimeLog = 1;
1441   schedule_flag = 0;
1442   if (correctTimeLog) {
1443     genTimeLog = 1;
1444     schedule_flag = 1;
1445   }
1446
1447   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1448     bgverbose = 1;
1449
1450   // for timing method, default using elapse calls.
1451   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1452                        "Use user provided BgElapse for time prediction")) 
1453       cva(bgMach).timingMethod = BG_ELAPSE;
1454   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1455                        "Use walltime method for time prediction")) 
1456       cva(bgMach).timingMethod = BG_WALLTIME;
1457 #ifdef CMK_ORIGIN2000
1458   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1459       cva(bgMach).timingMethod = BG_COUNTER;
1460 #elif CMK_HAS_COUNTER_PAPI
1461   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1462     cva(bgMach).timingMethod = BG_COUNTER;
1463   }
1464   if (cva(bgMach).timingMethod == BG_COUNTER) {
1465     init_counters();
1466   }
1467 #endif
1468   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1469                       "floating point to time factor");
1470   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1471                       "scale factor for wallclock time measured");
1472   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1473                       "timer cost");
1474   #if 0
1475   if(cva(bgMach).timingMethod == BG_WALLTIME)
1476   {
1477       int count = 1e6;
1478       double start, stop, diff, cost, dummy;
1479
1480       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1481
1482       start = BG_TIMER();
1483       for (int i = 0; i < count; ++i)
1484           dummy = BG_TIMER();
1485       stop = BG_TIMER();
1486
1487       diff = stop - start;
1488       cost = diff / count;
1489
1490       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1491                 cost, cva(bgMach).timercost);
1492       cva(bgMach).timercost = cost;
1493   }
1494   #endif
1495   
1496   char *networkModel;
1497   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1498    cva(bgMach).setNetworkModel(networkModel);
1499   }
1500
1501   bgcorroff = 0;
1502   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1503     bgcorroff = 1;
1504
1505   bgstats_flag=0;
1506   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1507     bgstats_flag = 1;
1508
1509   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1510   {
1511     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1512     sprintf(root, "%s/", cva(bgMach).traceroot);
1513     cva(bgMach).traceroot = root;
1514   }
1515
1516   // record/replay
1517   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim")) {
1518     cva(bgMach).record = 1;
1519     if (CmiMyPe() == 0)
1520       CmiPrintf("BG info> full record mode. \n");
1521   }
1522   int replaype;
1523   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1524     cva(bgMach).replay = replaype;
1525   }
1526   else {
1527     if (CmiGetArgFlagDesc(argv,"+bgreplay","Record message processing order for BigSim"))
1528     cva(bgMach).replay = 0;    // default to 0
1529   }
1530   if (cva(bgMach).replay >= 0) {
1531     if (CmiNumPes()>1)
1532       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1533     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1 ||
1534          cva(bgMach).numWth!=1 || cva(bgMach).numCth!=1)
1535       CmiAbort("BG> bgreplay mode must run on one target processor.");
1536     CmiPrintf("BG info> replay mode for target processor %d.\n", cva(bgMach).replay);
1537   }
1538   char *procs = NULL;
1539   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1540     cva(bgMach).recordprocs.set(procs);
1541   }
1542
1543
1544   /* parameters related with out-of-core execution */
1545   int tmpcap=0;
1546   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1547      TBLCAPACITY = tmpcap;
1548     }
1549   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1550       bgUseOutOfCore = 1;
1551       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1552
1553       double curFreeMem = bgGetSysFreeMemSize();
1554       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1555         //using the memory available of the system right now
1556         //assuming no other programs will run after this program runs
1557         bgOOCMaxMemSize = curFreeMem;
1558         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1559       }
1560       if(bgOOCMaxMemSize > curFreeMem){
1561         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1562         bgOOCMaxMemSize = curFreeMem;
1563       }
1564       DEBUGF(("out-of-core turned on!\n"));
1565   }      
1566
1567 #if BLUEGENE_DEBUG_LOG
1568   {
1569     char ln[200];
1570     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1571     bgDebugLog=fopen(ln,"w");
1572   }
1573 #endif
1574
1575   arg_argv = argv;
1576   arg_argc = CmiGetArgc(argv);
1577
1578   /* msg handler */
1579   CpvInitialize(SimState, simState);
1580   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1581   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1582   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1583   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1584
1585   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1586   cva(simState).inEmulatorInit = 1;
1587   /* call user defined BgEmulatorInit */
1588   BgEmulatorInit(arg_argc, arg_argv);
1589   cva(simState).inEmulatorInit = 0;
1590
1591   /* check if all bluegene node size and thread information are set */
1592   sanityCheck();
1593
1594   _bgSize = cva(bgMach).getNodeSize(); 
1595
1596   timerFunc = BgGetTime;
1597
1598   BgInitTiming();               // timing module
1599
1600   if (CmiMyPe() == 0) {
1601     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1602     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1603     cva(bgMach).network->print();
1604     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1605     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1606     if (cva(bgMach).stacksize)
1607       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1608     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1609       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1610     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1611       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1612     else if (cva(bgMach).timingMethod == BG_COUNTER)
1613       CmiPrintf("BG info> Using performance counter for timing method. \n");
1614     if (genTimeLog)
1615       CmiPrintf("BG info> Generating timing log. \n");
1616     if (correctTimeLog)
1617       CmiPrintf("BG info> Perform timestamp correction. \n");
1618     if (cva(bgMach).traceroot)
1619       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1620   }
1621
1622   CtvInitialize(threadInfo *, threadinfo);
1623
1624   /* number of bg nodes on this PE */
1625   CpvInitialize(int, numNodes);
1626   cva(numNodes) = nodeInfo::numLocalNodes();
1627
1628   if (CmiMyRank() == 0)
1629     initLock = CmiCreateLock();     // used for BnvInitialize
1630
1631   bgstreaming.init(cva(numNodes));
1632
1633   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1634 if(bgUseOutOfCore){
1635       initTblThreadInMem();
1636           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1637       //init prefetch status      
1638       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1639       oocPrefetchSpace = new oocPrefetchBufSpace();
1640       schedWorkThds = new oocWorkThreadQueue();
1641           #endif
1642   }
1643
1644 #if BIGSIM_OUT_OF_CORE
1645   //initialize variables related to get precise
1646   //physical memory usage info for a process
1647   bgMemPageSize = getpagesize();
1648   memset(bgMemStsFile, 0, 25); 
1649   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1650 #endif
1651
1652
1653   /* create BG nodes */
1654   CpvInitialize(nodeInfo *, nodeinfo);
1655   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1656   _MEMCHECK(cva(nodeinfo));
1657
1658   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1659   _MEMCHECK(cta(threadinfo));
1660
1661   /* create BG processors for each node */
1662   for (i=0; i<cva(numNodes); i++)
1663   {
1664     nodeInfo *ninfo = cva(nodeinfo) + i;
1665     // create threads
1666     ninfo->initThreads(i);
1667
1668     /* pretend that I am a thread */
1669     cta(threadinfo)->myNode = ninfo;
1670
1671     /* initialize a BG node and fire all threads */
1672     BgNodeInitialize(ninfo);
1673   }
1674   // clear main thread.
1675   cta(threadinfo)->myNode = NULL;
1676   CpvInitialize(CthThread, mainThread);
1677   cva(mainThread) = CthSelf();
1678
1679   CpvInitialize(int, CthResumeBigSimThreadIdx);
1680
1681   cva(simState).simStartTime = CmiWallTimer();    
1682   return 0;
1683 }
1684
1685 // for conv-conds:
1686 // if -2 untouch
1687 // if -1 main thread
1688 #if CMK_BLUEGENE_THREAD
1689 extern "C" int CmiSwitchToPEFn(int pe)
1690 {
1691   if (pe == -2) return -2;
1692   int oldpe;
1693 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1694   if (tMYNODE == NULL) oldpe = -1;
1695   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1696   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1697   else oldpe = -1;
1698 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1699   if (pe == -1) {
1700     CthSwitchThread(cva(mainThread));
1701   }
1702   else if (pe < 0) {
1703   }
1704   else {
1705     if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1706     int t = pe%cva(bgMach).numWth;
1707     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1708     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1709     threadInfo *tinfo = ninfo->threadinfo[t];
1710     CthSwitchThread(tinfo->me);
1711   }
1712   return oldpe;
1713 }
1714 #else
1715 extern "C" int CmiSwitchToPEFn(int pe)
1716 {
1717   if (pe == -2) return -2;
1718   int oldpe;
1719   if (tMYNODE == NULL) oldpe = -1;
1720   else oldpe = BgMyNode();
1721   if (pe == -1) {
1722     cta(threadinfo)->myNode = NULL;
1723   }
1724   else {
1725     int newpe = nodeInfo::Global2Local(pe);
1726     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1727   }
1728   return oldpe;
1729 }
1730 #endif
1731
1732
1733 /*****************************************************************************
1734                         TimeLog correction
1735 *****************************************************************************/
1736
1737 extern void processCorrectionMsg(int nodeidx);
1738
1739 // return the msg pointer, and the index of the message in the affinity queue.
1740 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1741 {
1742   CmiAssert(tID != ANYTHREAD);
1743   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1744   for (int i=0; i<affinityQ.length(); i++)  {
1745       char *msg = affinityQ[i];
1746       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1747       if (msgId == md) {
1748         index = i;
1749         return msg;
1750       }
1751   }
1752   return NULL;
1753 }
1754
1755 // return the msg pointer, thread id and the index of the message in the affinity queue.
1756 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1757 {
1758   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1759     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1760     if (msg) return msg;
1761   }
1762   return NULL;
1763 }
1764
1765 StateCounters stateCounters;
1766
1767 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1768 {
1769   char *msg;
1770   CmiInt2 tID = cm->tID;
1771   int index;
1772   if (tID == ANYTHREAD) {
1773     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1774   }
1775   else {
1776     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1777   }
1778   if (msg == NULL) return 0;
1779
1780   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1781   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1782   affinityQ.update(index);
1783   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1784   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1785   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1786   stateCounters.corrMsgCRCnt++;
1787   return 1;       /* invalidate this msg */
1788 }
1789
1790 extern void processBufferCorrectionMsgs(void *ignored);
1791
1792 // Coverse handler for begin exit
1793 // flush and process all correction messages
1794 static void beginExitHandlerFunc(void *msg)
1795 {
1796   CmiFree(msg);
1797   delayCheckFlag = 0;
1798 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1799   programExit = 1;
1800 #if LIMITED_SEND
1801   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1802 #endif
1803
1804 #if 1
1805   for (int i=0; i<BgNodeSize(); i++) 
1806     processCorrectionMsg(i); 
1807 #if USE_MULTISEND
1808   BgSendBufferedCorrMsgs();
1809 #endif
1810 #else
1811   // the msg queue should be empty now.
1812   // don't do insert adjustment, but start to do all timing correction from here
1813   int nodeidx, tID;
1814   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1815     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1816     for (tID=0; tID<cva(numWth); tID++) {
1817         BgTimeLineRec &tlinerec = tlines[tID];
1818         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1819     }
1820   }
1821
1822 #endif
1823
1824 #if !THROTTLE_WORK
1825 #if DELAY_CHECK
1826   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1827 #endif
1828 #endif
1829 }
1830
1831 #define HISTOGRAM_SIZE  100
1832 // compute total CPU utilization for each timeline and 
1833 // return the number of real msgs
1834 void computeUtilForAll(int* array, int *nReal)
1835 {
1836   double scale = 1.0e3;         // scale to ms
1837
1838   //We measure from 1ms to 5001 ms in steps of 100 ms
1839   int min = 0, step = 1;
1840   int max = min + HISTOGRAM_SIZE*step;
1841   // [min, max: step]  HISTOGRAM_SIZE slots
1842
1843   if (CmiMyPe()==0)
1844     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1845   int size = (max-min)/step;
1846   CmiAssert(size == HISTOGRAM_SIZE);
1847   int allmin = -1, allmax = -1;
1848   for(int i=0;i<size;i++) array[i] = 0;
1849
1850   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1851     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1852     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1853       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1854
1855       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1856       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1857       array[(util-min)/step]++;
1858     }
1859   }
1860   if (allmin!=-1 || allmax!=-1)
1861     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1862 }
1863
1864 class StatsMessage {
1865   char core[CmiBlueGeneMsgHeaderSizeBytes];
1866 public:
1867   int processCount;
1868   int corrMsgCount;
1869   int realMsgCount;
1870   int maxTimelineLen, minTimelineLen;
1871 };
1872
1873 extern int processCount, corrMsgCount;
1874
1875 static void sendCorrectionStats()
1876 {
1877   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1878   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1879   statsMsg->processCount = processCount;
1880   statsMsg->corrMsgCount = corrMsgCount;
1881   int numMsgs=0;
1882   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
1883   int totalMem = 0;
1884   if (bgstats_flag) {
1885   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1886     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1887     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1888         BgTimeLineRec &tlinerec = tlines[tID];
1889         int tlen = tlinerec.length();
1890         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
1891         if (tlen<minTimelineLen) minTimelineLen=tlen;
1892         totalMem = tlen*sizeof(BgTimeLog);
1893 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
1894 #if 0
1895         for (int i=0; i< tlinerec.length(); i++) {
1896           numMsgs += tlinerec[i]->msgs.length();
1897         }
1898 #endif
1899     }
1900   }
1901   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
1902   statsMsg->realMsgCount = numMsgs;
1903   statsMsg->maxTimelineLen = maxTimelineLen;
1904   statsMsg->minTimelineLen = minTimelineLen;
1905   }  // end if
1906
1907   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
1908   CmiSyncSendAndFree(0, msgSize, statsMsg);
1909 }
1910
1911 // Converse handler for collecting stats
1912 void statsCollectionHandlerFunc(void *msg)
1913 {
1914   static int count=0;
1915   static int pc=0, cc=0, realMsgCount=0;
1916   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
1917   static int *histArray = NULL;
1918   int i;
1919
1920   count++;
1921   if (histArray == NULL) {
1922     histArray = new int[HISTOGRAM_SIZE];
1923     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
1924   }
1925   StatsMessage *m = (StatsMessage *)msg;
1926   pc += m->processCount;
1927   cc += m->corrMsgCount;
1928   realMsgCount += m->realMsgCount;
1929   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
1930   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
1931   int *array = (int *)(m+1);
1932   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
1933   if (count == CmiNumPes()) {
1934     if (bgstats_flag) {
1935       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
1936       for (i=0; i<HISTOGRAM_SIZE; i++) {
1937         CmiPrintf("%d ", histArray[i]);
1938         if (i%20 == 19) CmiPrintf("\n");
1939       }
1940       CmiPrintf("\n");
1941     }
1942     CsdExitScheduler();
1943   }
1944   CmiFree(msg);
1945 }
1946
1947 // update arrival time from buffer messages
1948 // before start an entry, check message time against buffered timing
1949 // correction message to update to the correct time.
1950 void correctMsgTime(char *msg)
1951 {
1952    if (!correctTimeLog) return;
1953 //   if (CkMsgDoCorrect(msg) == 0) return;
1954
1955    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1956    CmiInt2 tid = CmiBgMsgThreadID(msg);
1957
1958    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
1959    int len = cmsg.length();
1960    for (int i=0; i<len; i++) {
1961      bgCorrectionMsg* m = cmsg[i];
1962      if (msgId == m->msgId && tid == m->tID) {
1963         if (m->tAdjust < 0.0) return;
1964         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
1965         CmiBgMsgRecvTime(msg) = m->tAdjust;
1966         m->tAdjust = -1.0;       /* invalidate this msg */
1967 //      cmsg.update(i);
1968         stateCounters.corrMsgRCCnt++;
1969         break;
1970      }
1971    }
1972 }
1973
1974
1975 //TODO: write disk bgTraceFiles
1976 static void writeToDisk()
1977 {
1978
1979   char* d = new char[1025];
1980   //Num of simulated procs on this real pe
1981   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
1982
1983   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1984
1985   // write summary file on PE0
1986   if(CmiMyPe()==0){
1987     
1988     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
1989     FILE *f2 = fopen(d,"wb");
1990     //Total emulating processors and total target BG processors
1991     int numEmulatingPes=CmiNumPes();
1992     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
1993
1994     if(f2==NULL) {
1995       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
1996       CmiAbort("BG> Abort");
1997     }
1998     PUP::toDisk p(f2);
1999     p((char *)&machInfo, sizeof(machInfo));
2000     p|totalWorkerProcs;
2001     p|cva(bgMach);
2002     p|numEmulatingPes;
2003     p|bglog_version;
2004     p|CpvAccess(CthResumeBigSimThreadIdx);
2005     
2006     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
2007     
2008     fclose(f2);
2009   }
2010   
2011   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
2012   FILE *f = fopen(d,"wb");
2013  
2014   if(f==NULL)
2015     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
2016   PUP::toDisk p(f);
2017   
2018   p((char *)&machInfo, sizeof(machInfo));       // machine info
2019   p|numLocalProcs;
2020
2021   // CmiPrintf("Timelines are: \n");
2022   int procTablePos = ftell(f);
2023
2024   int *procOffsets = new int[numLocalProcs];
2025   int procTableSize = (numLocalProcs)*sizeof(int);
2026   fseek(f,procTableSize,SEEK_CUR); 
2027
2028   for (int j=0; j<cva(numNodes); j++){
2029     for(int i=0;i<cva(bgMach).numWth;i++){
2030     #if BIGSIM_OUT_OF_CORE
2031         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2032         //through isomalloc. Therefore, the memory containing those events needs
2033         //to be brought back into memory from disk. --Chao Mei          
2034         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2035             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2036      #endif     
2037       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2038       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2039       t.pup(p);
2040     }
2041   }
2042   
2043   fseek(f,procTablePos,SEEK_SET);
2044   p(procOffsets,numLocalProcs);
2045   fclose(f);
2046
2047   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2048 }
2049
2050
2051 /*****************************************************************************
2052              application Converse thread hook
2053 *****************************************************************************/
2054
2055 CpvExtern(int      , CthResumeBigSimThreadIdx);
2056
2057 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2058                                    int pb,unsigned int *prio)
2059 {
2060 /*
2061   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2062   CsdEnqueueGeneral(token, s, pb, prio);
2063 */
2064 #if CMK_BLUEGENE_THREAD
2065   int x, y, z;
2066   BgGetMyXYZ(&x, &y, &z);
2067   int t = BgGetThreadID();
2068 #else
2069   #error "ERROR HERE"
2070 #endif
2071     // local message into queue
2072   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2073
2074   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2075   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2076
2077
2078 CthThread CthSuspendBigSimThread()
2079
2080   return  cta(threadinfo)->me;
2081 }
2082
2083 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2084 {
2085    // stop timer
2086    stopVTimer();
2087 }
2088
2089 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2090 {
2091    // start timer by reset it
2092    resetVTime();
2093 }
2094
2095
2096 void BgSetStrategyBigSimDefault(CthThread t)
2097
2098   CthSetStrategy(t,
2099                  CthEnqueueBigSimThread,
2100                  CthSuspendBigSimThread);
2101
2102   CthThreadListener *a = new CthThreadListener;
2103   a->suspend = bigsimThreadListener_suspend;
2104   a->resume = bigsimThreadListener_resume;
2105   a->free = NULL;
2106   CthAddListener(t, a);
2107 }
2108
2109 int BgIsMainthread()
2110 {
2111     return tMYNODE == NULL;
2112 }
2113
2114 int BgIsRecord()
2115 {
2116     return cva(bgMach).record == 1;
2117 }
2118
2119 int BgIsReplay()
2120 {
2121     return cva(bgMach).replay != -1;
2122 }
2123
2124 // for record/replay, to fseek back
2125 void BgRewindRecord()
2126 {
2127   threadInfo *tinfo = cta(threadinfo);
2128   if (tinfo->watcher) tinfo->watcher->rewind();
2129 }
2130