use Ckpv on currentChareIdx
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82
83 /****************************************************************************
84      little utility functions
85 ****************************************************************************/
86
87 char **BgGetArgv() { return arg_argv; }
88 int    BgGetArgc() { return arg_argc; }
89
90 /***************************************************************************
91      Implementation of the same counter interface currently used for the
92      Origin2000 using PAPI in the underlying layer.
93
94      Because of the difference in counter numbering, it is assumed that
95      the two counters desired are CYCLES and FLOPS and hence the numbers
96      e0 and e1 are ignored.
97
98      start_counters is also not implemented because PAPI does things in
99      a different manner.
100 ****************************************************************************/
101
102 #if CMK_HAS_COUNTER_PAPI
103 /*
104 CmiUInt8 total_ins = 0;
105 CmiUInt8 total_fps = 0;
106 CmiUInt8 total_l1_dcm = 0;
107 CmiUInt8 total_mem_rcy = 0;
108 */
109 int init_counters()
110 {
111     int retval = PAPI_library_init(PAPI_VER_CURRENT);
112
113     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
114
115         //a temporary
116         const char *eventDesc[MAX_TOTAL_EVENTS];
117         int lpapiEvents[MAX_TOTAL_EVENTS];
118         
119     numPapiEvents = 0;
120         
121 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
122                 if(PAPI_query_event(e) == PAPI_OK){ \
123                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
124                         eventDesc[numPapiEvents] = edesc;       \
125                         lpapiEvents[numPapiEvents++] = e; \
126                 }
127                 
128     // PAPI high level API does not require explicit library intialization
129         eventDesc[numPapiEvents] ="cycles";
130     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
131         
132         
133     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
134       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
135           eventDesc[numPapiEvents] = "floating point instructions";     
136       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
137     } else {
138       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
139           eventDesc[numPapiEvents] = "total instructions";      
140       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
141     }
142     */
143         
144         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
145         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
146         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
147         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
148         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
149         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
150         
151         if(numPapiEvents == 0){
152                 CmiAbort("No papi events are defined!\n");
153         }
154         
155         if(numPapiEvents >= MAX_TOTAL_EVENTS){
156                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
157         }
158         
159         papiEvents = new int[numPapiEvents];
160         papiValues = new long_long[numPapiEvents];
161         total_papi_counters = new CmiUInt8[numPapiEvents];
162         papi_counters_desc = new char *[numPapiEvents];
163         for(int i=0; i<numPapiEvents; i++){
164                 papiEvents[i] = lpapiEvents[i];
165                 total_papi_counters[i] = 0;
166                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
167                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
168                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
169                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
170         }
171         
172 /*
173     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
174       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
175       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
176     }
177     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
178       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
179       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
180     }
181     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
182       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
183       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
184     }
185 */
186
187     int status = PAPI_start_counters(papiEvents, numPapiEvents);
188     if (status != PAPI_OK) {
189       CmiPrintf("PAPI_start_counters error code: %d\n", status);
190       switch (status) {
191       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
192       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
193       }
194       CmiAbort("Unable to start PAPI counters!\n");
195     }
196 }
197
198 int read_counters(long_long *papiValues, int n) 
199 {
200   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
201   // code for Origin2000
202   int status;
203   status = PAPI_read_counters(papiValues, n);
204   if (status != PAPI_OK) {
205     CmiPrintf("PAPI_read_counters error: %d\n", status);
206     CmiAbort("Failed to read PAPI counters!\n");
207   }
208   
209   /*
210   total_ins += papiValues[0];
211   total_fps += papiValues[1];
212   total_l1_dcm += papiValues[2];
213   total_mem_rcy += papiValues[3];
214   */
215   
216   return 0;
217 }
218
219 inline void CountPapiEvents(){
220   for(int i=0 ;i<numPapiEvents; i++)
221         total_papi_counters[i] += papiValues[i];
222 }
223
224 inline double Count2Time(long_long *papiValues, int n) { 
225   return papiValues[1]*cva(bgMach).fpfactor; 
226 }
227 #endif
228
229 /*****************************************************************************
230      Handler Table, one per thread
231 ****************************************************************************/
232
233 extern "C" void defaultBgHandler(char *null, void *uPtr)
234 {
235   CmiAbort("BG> Invalid Handler called!\n");
236 }
237
238 HandlerTable::HandlerTable()
239 {
240     handlerTableCount = 1;
241     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
242     for (int i=0; i<MAX_HANDLERS; i++) {
243       handlerTable[i].fnPtr = defaultBgHandler;
244       handlerTable[i].userPtr = NULL;
245     }
246 }
247
248 inline int HandlerTable::registerHandler(BgHandler h)
249 {
250     ASSERT(!cva(simState).inEmulatorInit);
251     /* leave 0 as blank, so it can report error luckily */
252     int cur = handlerTableCount++;
253     if (cur >= MAX_HANDLERS)
254       CmiAbort("BG> HandlerID exceed the maximum.\n");
255     handlerTable[cur].fnPtr = (BgHandlerEx)h;
256     handlerTable[cur].userPtr = NULL;
257     return cur;
258 }
259
260 inline void HandlerTable::numberHandler(int idx, BgHandler h)
261 {
262     ASSERT(!cva(simState).inEmulatorInit);
263     if (idx >= handlerTableCount || idx < 1)
264       CmiAbort("BG> HandlerID exceed the maximum!\n");
265     handlerTable[idx].fnPtr = (BgHandlerEx)h;
266     handlerTable[idx].userPtr = NULL;
267 }
268
269 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
270 {
271     ASSERT(!cva(simState).inEmulatorInit);
272     if (idx >= handlerTableCount || idx < 1)
273       CmiAbort("BG> HandlerID exceed the maximum!\n");
274     handlerTable[idx].fnPtr = h;
275     handlerTable[idx].userPtr = uPtr;
276 }
277
278 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
279 {
280 #if 0
281     if (handler >= handlerTableCount) {
282       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
283       CmiAbort("Invalid handler!");
284     }
285 #endif
286     if (handler >= handlerTableCount || handler<0) return NULL;
287     return &handlerTable[handler];
288 }
289
290 /*****************************************************************************
291       low level API
292 *****************************************************************************/
293
294 int BgRegisterHandler(BgHandler h)
295 {
296   ASSERT(!cva(simState).inEmulatorInit);
297   int cur;
298 #if CMK_BLUEGENE_NODE
299   return tMYNODE->handlerTable.registerHandler(h);
300 #else
301   if (tTHREADTYPE == COMM_THREAD) {
302     return tMYNODE->handlerTable.registerHandler(h);
303   }
304   else {
305     return tHANDLETAB.registerHandler(h);
306   }
307 #endif
308 }
309
310 void BgNumberHandler(int idx, BgHandler h)
311 {
312   ASSERT(!cva(simState).inEmulatorInit);
313 #if CMK_BLUEGENE_NODE
314   tMYNODE->handlerTable.numberHandler(idx,h);
315 #else
316   if (tTHREADTYPE == COMM_THREAD) {
317     tMYNODE->handlerTable.numberHandler(idx, h);
318   }
319   else {
320     tHANDLETAB.numberHandler(idx, h);
321   }
322 #endif
323 }
324
325 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
326 {
327   ASSERT(!cva(simState).inEmulatorInit);
328 #if CMK_BLUEGENE_NODE
329   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
330 #else
331   if (tTHREADTYPE == COMM_THREAD) {
332     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
333   }
334   else {
335     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
336   }
337 #endif
338 }
339
340 /*****************************************************************************
341       BG Timing Functions
342 *****************************************************************************/
343
344 void resetVTime()
345 {
346   /* reset start time */
347   int timingMethod = cva(bgMach).timingMethod;
348   if (timingMethod == BG_WALLTIME) {
349     double ct = BG_TIMER();
350     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
351     tSTARTTIME = ct;
352   }
353   else if (timingMethod == BG_ELAPSE)
354     tSTARTTIME = tCURRTIME;
355 #ifdef CMK_ORIGIN2000
356   else if (timingMethod == BG_COUNTER) {
357     if (start_counters(0, 21) <0) {
358       perror("start_counters");;
359     }
360   }
361 #elif CMK_HAS_COUNTER_PAPI
362   else if (timingMethod == BG_COUNTER) {
363     // do a fake read to reset the counters. It would be more efficient
364     // to use the low level API, but that would be a lot more code to
365     // write for now.
366     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
367   }
368 #endif
369 }
370
371 void startVTimer()
372 {
373   CmiAssert(tTIMERON == 0);
374   resetVTime();
375   tTIMERON = 1;
376 }
377
378 // should be used only when BG_WALLTIME
379 static inline void advanceTime(double inc)
380 {
381   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
382   if (inc < 0.0) return;
383   CmiAssert(inc>=0.0);
384   inc *= cva(bgMach).cpufactor;
385   tCURRTIME += inc;
386   CmiAssert(tTIMERON==1);
387 }
388
389 void stopVTimer()
390 {
391   int k;
392 #if 0
393   if (tTIMERON != 1) {
394     CmiAbort("stopVTimer called without startVTimer!\n");
395   }
396   CmiAssert(tTIMERON == 1);
397 #else
398   if (tTIMERON == 0) return;         // already stopped
399 #endif
400   const int timingMethod = cva(bgMach).timingMethod;
401   if (timingMethod == BG_WALLTIME) {
402     const double tp = BG_TIMER();
403     double inc = tp-tSTARTTIME;
404     advanceTime(inc-cva(bgMach).timercost);
405 //    tSTARTTIME = BG_TIMER();  // skip the above time
406   }
407   else if (timingMethod == BG_ELAPSE) {
408     // if no bgelapse called, assume it takes 1us
409     if (tCURRTIME-tSTARTTIME < 1E-9) {
410 //      tCURRTIME += 1e-6;
411     }
412   }
413   else if (timingMethod == BG_COUNTER)  {
414 #if CMK_ORIGIN2000
415     long long c0, c1;
416     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
417     tCURRTIME += Count2Time(c1);
418 #elif CMK_HAS_COUNTER_PAPI
419     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
420     CountPapiEvents();
421     tCURRTIME += Count2Time(papiValues, numPapiEvents);
422 #endif
423   }
424   tTIMERON = 0;
425 }
426
427 double BgGetTime()
428 {
429 #if 1
430   const int timingMethod = cva(bgMach).timingMethod;
431   if (timingMethod == BG_WALLTIME) {
432     /* accumulate time since last starttime, and reset starttime */
433     if (tTIMERON) {
434       const double tp2= BG_TIMER();
435       double &startTime = tSTARTTIME;
436       double inc = tp2 - startTime;
437       advanceTime(inc-cva(bgMach).timercost);
438       startTime = BG_TIMER();
439     }
440     return tCURRTIME;
441   }
442   else if (timingMethod == BG_ELAPSE) {
443     return tCURRTIME;
444   }
445   else if (timingMethod == BG_COUNTER) {
446     if (tTIMERON) {
447 #if CMK_ORIGIN2000
448       long long c0, c1;
449       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
450       tCURRTIME += Count2Time(c1);
451       if (start_counters(0, 21)<0) perror("start_counters");;
452 #elif CMK_HAS_COUNTER_PAPI
453     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
454     tCURRTIME += Count2Time(papiValues, numPapiEvents);
455 #endif
456     }
457     return tCURRTIME;
458   }
459   else 
460     CmiAbort("Unknown Timing Method.");
461   return -1;
462 #else
463   /* sometime I am interested in real wall time */
464   tCURRTIME = CmiWallTimer();
465   return tCURRTIME;
466 #endif
467 }
468
469 // moved to blue_logs.C
470 double BgGetCurTime()
471 {
472   ASSERT(tTHREADTYPE == WORK_THREAD);
473   return tCURRTIME;
474 }
475
476 extern "C" 
477 void BgElapse(double t)
478 {
479 //  ASSERT(tTHREADTYPE == WORK_THREAD);
480 #if CMK_ERROR_CHECKING
481   if (tTHREADTYPE == WORK_THREAD)
482   CthCheckMagic(CthSelf());
483 #endif
484   if (cva(bgMach).timingMethod == BG_ELAPSE)
485     tCURRTIME += t;
486 }
487
488 // advance virtual timer no matter what scheme is used
489 extern "C" 
490 void BgAdvance(double t)
491 {
492 //  ASSERT(tTHREADTYPE == WORK_THREAD);
493   tCURRTIME += t;
494 }
495
496 /* BG API Func
497  * called by a communication thread to test if poll data 
498  * in the node's INBUFFER for its own queue 
499  */
500 char * getFullBuffer()
501 {
502   /* I must be a communication thread */
503   if (tTHREADTYPE != COMM_THREAD) 
504     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
505
506   return tMYNODE->getFullBuffer();
507 }
508
509 /**  BG API Func
510  * called by a Converse handler or sendPacket()
511  * add message msgPtr to a bluegene node's inbuffer queue 
512  */
513 extern "C"
514 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
515 {
516 #ifndef CMK_OPTIMIZE
517   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
518 #endif
519   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
520
521   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
522         
523   nInfo.addBgNodeInbuffer(msgPtr);
524 }
525
526 /** BG API Func 
527  *  called by a comm thread
528  *  add a message to a thread's affinity queue in same node 
529  */
530 void addBgThreadMessage(char *msgPtr, int threadID)
531 {
532 #ifndef CMK_OPTIMIZE
533   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
534 #endif
535   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
536   tInfo->addAffMessage(msgPtr);
537 }
538
539 /** BG API Func 
540  *  called by a comm thread, add a message to a node's non-affinity queue 
541  */
542 void addBgNodeMessage(char *msgPtr)
543 {
544   tMYNODE->addBgNodeMessage(msgPtr);
545 }
546
547 void BgEnqueue(char *msg)
548 {
549 #if 0
550   ASSERT(tTHREADTYPE == WORK_THREAD);
551   workThreadInfo *tinfo = (workThreadInfo *)cta(threadinfo);
552   tinfo->addAffMessage(msg);
553 #else
554   nodeInfo *myNode = cta(threadinfo)->myNode;
555   addBgNodeInbuffer(msg, myNode->id);
556 #endif
557 }
558
559 /** BG API Func 
560  *  check if inBuffer on this node has msg available
561  */
562 int checkReady()
563 {
564   if (tTHREADTYPE != COMM_THREAD)
565     CmiAbort("checkReady called by a non-communication thread!\n");
566   return !tINBUFFER.isEmpty();
567 }
568
569
570 /* handler to process the msg */
571 void msgHandlerFunc(char *msg)
572 {
573   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
574   int gnodeID = CmiBgMsgNodeID(msg);
575   if (gnodeID >= 0) {
576 #ifndef CMK_OPTIMIZE
577     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
578       CmiAbort("msgHandlerFunc received wrong message!");
579 #endif
580     int lnodeID = nodeInfo::Global2Local(gnodeID);
581     if (cva(bgMach).inReplayMode()) {
582       int x, y, z;
583       int node;
584       if (cva(bgMach).replay != -1) {
585         node = cva(bgMach).replay;
586         node = node / cva(bgMach).numWth;
587       }
588       if (cva(bgMach).replaynode != -1) {
589         node = cva(bgMach).replaynode;
590       }
591       BgGetXYZ(node, &x, &y, &z);
592       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
593       else lnodeID = 0;
594     }
595     addBgNodeInbuffer(msg, lnodeID);
596   }
597   else {
598     CmiAbort("Invalid message!");
599   }
600 }
601
602 /* Converse handler for node level broadcast message */
603 void nodeBCastMsgHandlerFunc(char *msg)
604 {
605   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
606   int gnodeID = CmiBgMsgNodeID(msg);
607   CmiInt2 threadID = CmiBgMsgThreadID(msg);
608   int lnodeID;
609
610   if (gnodeID < -1) {
611     gnodeID = - (gnodeID+100);
612     if (cva(bgMach).replaynode != -1) {
613       if (gnodeID == cva(bgMach).replaynode)
614           lnodeID = 0;
615       else
616           lnodeID = -1;
617     }
618     else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
619       lnodeID = nodeInfo::Global2Local(gnodeID);
620     else
621       lnodeID = -1;
622   }
623   else {
624     ASSERT(gnodeID == -1);
625     lnodeID = gnodeID;
626   }
627   // broadcast except lnodeID:threadId
628   int len = CmiBgMsgLength(msg);
629   int count = 0;
630   for (int i=0; i<cva(numNodes); i++)
631   {
632     if (i==lnodeID) continue;
633     char *dupmsg;
634     if (count == 0) dupmsg = msg;
635     else dupmsg = CmiCopyMsg(msg, len);
636     DEBUGF(("addBgNodeInbuffer to %d\n", i));
637     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
638     addBgNodeInbuffer(dupmsg, i);
639     count ++;
640   }
641   if (count == 0) CmiFree(msg);
642 }
643
644 // clone a msg, only has a valid header, plus a pointer to the real msg
645 char *BgCloneMsg(char *msg)
646 {
647   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
648   char *dupmsg = (char *)CmiAlloc(size);
649   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
650   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
651   CmiBgMsgRefCount(msg) ++;
652   CmiBgMsgFlag(dupmsg) = BG_CLONE;
653   return dupmsg;
654 }
655
656 // expand the cloned msg to the full size msg
657 char *BgExpandMsg(char *msg)
658 {
659   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
660   int size = CmiBgMsgLength(origmsg);
661   char *dupmsg = (char *)CmiAlloc(size);
662   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
663   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
664   CmiFree(msg);
665   CmiBgMsgRefCount(origmsg) --;
666   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
667   return dupmsg;
668 }
669
670 /* Converse handler for thread level broadcast message */
671 void threadBCastMsgHandlerFunc(char *msg)
672 {
673   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
674   int gnodeID = CmiBgMsgNodeID(msg);
675   CmiInt2 threadID = CmiBgMsgThreadID(msg);
676   if (cva(bgMach).replay != -1) {
677     if (gnodeID < -1) {
678       gnodeID = - (gnodeID+100);
679       if (gnodeID == cva(bgMach).replay/cva(bgMach).numWth && threadID == cva(bgMach).replay%cva(bgMach).numWth)
680         return;
681     }
682     CmiBgMsgThreadID(msg) = 0;
683     DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
684     addBgNodeInbuffer(msg, 0);
685     return;
686   }
687   int lnodeID;
688   if (gnodeID < -1) {
689       gnodeID = - (gnodeID+100);
690       if (cva(bgMach).replaynode != -1) {
691         if (gnodeID == cva(bgMach).replaynode)
692           lnodeID = 0;
693         else
694           lnodeID = -1;
695       }
696       else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
697         lnodeID = nodeInfo::Global2Local(gnodeID);
698       else
699         lnodeID = -1;
700       CmiAssert(threadID != ANYTHREAD);
701   }
702   else {
703     ASSERT(gnodeID == -1);
704     lnodeID = gnodeID;
705   }
706   // broadcast except nodeID:threadId
707   int len = CmiBgMsgLength(msg);
708   // optimization needed if the message size is big
709   // making duplications can easily run out of memory
710   int bigOpt = (len > 4096);
711   for (int i=0; i<cva(numNodes); i++)
712   {
713       for (int j=0; j<cva(bgMach).numWth; j++) {
714         if (i==lnodeID && j==threadID) continue;
715         // for big message, clone a message token instead of a real msg
716         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
717         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
718         CmiBgMsgThreadID(dupmsg) = j;
719         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
720         addBgNodeInbuffer(dupmsg, i);
721       }
722   }
723   // for big message, will free after all tokens are done
724   if (!bigOpt) CmiFree(msg);
725 }
726
727 /**
728  *              BG Messaging Functions
729  */
730
731 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
732 {
733   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
734 }
735
736 /**
737  *   a simple message streaming on demand and special purpose
738  *   user call  BgStartStreaming() and BgEndStreaming()
739  *   each worker thread call one send, and all sends are sent
740  *   via multiplesend at the end
741  */
742
743 static int bg_streaming = 0;
744
745 class BgStreaming {
746 public:
747   char **streamingMsgs;
748   int  *streamingMsgSizes;
749   int count;
750   int totalWorker;
751   int pe;
752 public:
753   BgStreaming() {
754     streamingMsgs = NULL;
755     streamingMsgSizes = NULL;
756     count = 0;
757     totalWorker = 0;
758     pe = -1;
759   }
760   ~BgStreaming() {
761     if (streamingMsgs) {
762       delete [] streamingMsgs;
763       delete [] streamingMsgSizes;
764     }
765   }
766   void init(int nNodes) {
767     totalWorker = nNodes * BgGetNumWorkThread();
768     streamingMsgs = new char *[totalWorker];
769     streamingMsgSizes = new int [totalWorker];
770   }
771   void depositMsg(int p, int size, char *m) {
772     streamingMsgs[count] = m;
773     streamingMsgSizes[count] = size;
774     count ++;
775     if (pe == -1) pe = p;
776     else CmiAssert(pe == p);
777     if (count == totalWorker) {
778       // CkPrintf("streaming send\n");
779       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
780       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
781       pe = -1;
782       count = 0;
783     }
784   }
785 };
786
787 BgStreaming bgstreaming;
788
789 void BgStartStreaming()
790 {
791   bg_streaming = 1;
792 }
793
794 void BgEndStreaming()
795 {
796   bg_streaming = 0;
797 }
798
799 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
800 {
801   if (streaming && pe != CmiMyPe())
802     bgstreaming.depositMsg(pe, msgSize, msg);
803   else
804     CmiSyncSendAndFree(pe, msgSize, msg);
805 }
806
807
808 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
809 {
810 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
811 #if !DELAY_SEND
812   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
813   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
814 #else
815   if (!correctTimeLog) {
816     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
817     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
818   }
819   // else messages are kept in the log (MsgEntry), and only will be sent
820   // after timing correction has done on that log.
821   // TODO: streaming has no effect if time correction is on.
822 #endif
823 }
824
825 /* send will copy data to msg buffer */
826 /* user data is not free'd in this routine, user can reuse the data ! */
827 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
828 {
829   //CmiPrintStackTrace(0);
830   double sendT = BgGetTime();
831
832   double latency;
833   CmiSetHandler(sendmsg, cva(simState).msgHandler);
834   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
835   CmiBgMsgThreadID(sendmsg) = threadID;
836   CmiBgMsgHandle(sendmsg) = handlerID;
837   CmiBgMsgType(sendmsg) = type;
838   CmiBgMsgLength(sendmsg) = numbytes;
839   CmiBgMsgFlag(sendmsg) = 0;
840   CmiBgMsgRefCount(sendmsg) = 0;
841   if (local) {
842     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
843     latency = 0.0;
844   }
845   else {
846     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
847     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
848     CmiAssert(latency >= 0);
849   }
850   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
851   
852   // timing
853   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
854
855   //static int addCnt=1; //for debugging only
856   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
857   //addCnt++;
858
859   if (local){
860       /* Here local refers to the fact that msg is sent to the processor itself
861        * therefore, we just add this msg to the thread itself
862        */
863       //addBgThreadMessage(sendmsg,threadID);
864       addBgNodeInbuffer(sendmsg, myNode->id);
865   }    
866   else
867     CmiSendPacket(x, y, z, numbytes, sendmsg);
868
869   // bypassing send time
870   resetVTime();
871 }
872
873 /* broadcast will copy data to msg buffer */
874 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
875 {
876   double sendT = BgGetTime();
877
878   nodeInfo *myNode = cta(threadinfo)->myNode;
879   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
880   if (node >= 0)
881     CmiBgMsgNodeID(sendmsg) = -node-100;
882   else
883     CmiBgMsgNodeID(sendmsg) = node;
884   CmiBgMsgThreadID(sendmsg) = threadID; 
885   CmiBgMsgHandle(sendmsg) = handlerID;  
886   CmiBgMsgType(sendmsg) = type; 
887   CmiBgMsgLength(sendmsg) = numbytes;
888   CmiBgMsgFlag(sendmsg) = 0;
889   CmiBgMsgRefCount(sendmsg) = 0;
890   /* FIXME */
891   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
892
893   // timing
894   // FIXME
895   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
896
897   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
898 #if DELAY_SEND
899   if (!correctTimeLog)
900 #endif
901   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
902
903   resetVTime();
904 }
905
906 /* broadcast will copy data to msg buffer */
907 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
908 {
909   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
910   if (node >= 0)
911     CmiBgMsgNodeID(sendmsg) = -node-100;
912   else
913     CmiBgMsgNodeID(sendmsg) = node;
914   CmiBgMsgThreadID(sendmsg) = threadID; 
915   CmiBgMsgHandle(sendmsg) = handlerID;  
916   CmiBgMsgType(sendmsg) = type; 
917   CmiBgMsgLength(sendmsg) = numbytes;
918   CmiBgMsgFlag(sendmsg) = 0;
919   CmiBgMsgRefCount(sendmsg) = 0;
920   /* FIXME */
921   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
922   double sendT = BgGetTime();
923   CmiBgMsgRecvTime(sendmsg) = sendT;    
924
925   // timing
926 #if 0
927   if (node == BG_BROADCASTALL) {
928     for (int i=0; i<_bgSize; i++) {
929       for (int j=0; j<cva(numWth); j++) {
930         BG_ADDMSG(sendmsg, node);
931       }
932     }
933   }
934   else {
935     CmiAssert(node >= 0);
936     BG_ADDMSG(sendmsg, (node+100));
937   }
938 #else
939   // FIXME
940   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
941 #endif
942
943   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
944 #if DELAY_SEND
945   if (!correctTimeLog)
946 #endif
947   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
948
949   resetVTime();
950 }
951
952
953 /* sendPacket to route */
954 /* this function can be called by any thread */
955 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
956 {
957   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
958
959 #ifndef CMK_OPTIMIZE
960   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
961     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
962     CmiAbort("Abort!\n");
963   }
964 #endif
965
966   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
967 }
968
969 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
970 {
971   if (cva(bgMach).replay!=-1) { // replay mode
972     int t = cva(bgMach).replay%BgGetNumWorkThread();
973     if (t == threadID) threadID = 0;
974     else return;
975   }
976
977   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
978 }
979
980 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
981                        int numbytes, char* data)
982 {
983   nodeInfo *myNode = cta(threadinfo)->myNode;
984
985   if (cva(bgMach).replay!=-1) {     // replay mode
986     threadID = 0;
987     CmiAssert(threadID != -1);
988   }
989
990   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
991 }
992
993 /* wrapper of the previous two functions */
994 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
995 {
996   nodeInfo *myNode = cta(threadinfo)->myNode;
997   if (myNode->x == x && myNode->y == y && myNode->z == z)
998     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
999   else
1000     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
1001 }
1002
1003 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1004 {
1005   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1006 }
1007
1008 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1009 {
1010   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1011 }
1012
1013 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1014 {
1015   if (cva(bgMach).replay!=-1) return;    // replay mode
1016   else if (cva(bgMach).replaynode!=-1) {    // replay mode
1017     //if (node!=-1 && node == cva(bgMach).replaynode/cva(bgMach).numWth)
1018     //  return;
1019   }
1020   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1021 }
1022
1023 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1024 {
1025   if (cva(bgMach).replay!=-1) {      // replay mode, send only to itself
1026     int t = cva(bgMach).replay%BgGetNumWorkThread();
1027     BgSendLocalPacket(t, handlerID, type, numbytes, data);
1028     return;
1029   }
1030   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1031 }
1032
1033 /**
1034  send a msg to a list of processors (processors represented in global seq #
1035 */
1036 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
1037 {
1038   nodeInfo *myNode = cta(threadinfo)->myNode;
1039
1040   CmiSetHandler(msg, cva(simState).msgHandler);
1041   CmiBgMsgHandle(msg) = handlerID;
1042   CmiBgMsgType(msg) = type;
1043   CmiBgMsgLength(msg) = numbytes;
1044   CmiBgMsgFlag(msg) = 0;
1045   CmiBgMsgRefCount(msg) = 0;
1046
1047   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
1048
1049   double now = BgGetTime();
1050
1051   // send one by one
1052   for (int i=0; i<npes; i++)
1053   {
1054     int local = 0;
1055     int x,y,z,t;
1056     int pe = pes[i];
1057     int node;
1058 #if CMK_BLUEGENE_NODE
1059     CmiAbort("Not implemented yet!");
1060 #else
1061     t = pe%BgGetNumWorkThread();
1062     node = pe/BgGetNumWorkThread();
1063     BgGetXYZ(node, &x, &y, &z);
1064 #endif
1065
1066     char *sendmsg = CmiCopyMsg(msg, numbytes);
1067     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1068     CmiBgMsgThreadID(sendmsg) = t;
1069     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1070     CmiAssert(latency >= 0);
1071     CmiBgMsgRecvTime(sendmsg) = latency + now;
1072
1073     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1074
1075     // timing and make sure all msgID are the same
1076     if (i!=0) CpvAccess(msgCounter) --;
1077     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1078
1079     BgSendPacket(x, y, z, t, handlerID, type, numbytes, sendmsg);
1080 /*
1081     if (myNode->x == x && myNode->y == y && myNode->z == z)
1082       addBgNodeInbuffer(sendmsg, myNode->id);
1083     else
1084       CmiSendPacket(x, y, z, numbytes, sendmsg);
1085 */
1086   }
1087
1088   CmiFree(msg);
1089
1090   resetVTime();
1091 }
1092
1093 /*****************************************************************************
1094       BG node level API - utilities
1095 *****************************************************************************/
1096
1097 /* must be called in a communication or worker thread */
1098 void BgGetMyXYZ(int *x, int *y, int *z)
1099 {
1100   ASSERT(!cva(simState).inEmulatorInit);
1101   *x = tMYX; *y = tMYY; *z = tMYZ;
1102 }
1103
1104 void BgGetXYZ(int seq, int *x, int *y, int *z)
1105 {
1106   nodeInfo::Global2XYZ(seq, x, y, z);
1107 }
1108
1109 void BgGetSize(int *sx, int *sy, int *sz)
1110 {
1111   cva(bgMach).getSize(sx, sy, sz);
1112 }
1113
1114 int BgTraceProjectionOn(int pe)
1115 {
1116   return cva(bgMach).traceProejctions(pe);
1117 }
1118
1119 /* return the total number of Blue gene nodes */
1120 int BgNumNodes()
1121 {
1122   return _bgSize;
1123 }
1124
1125 void BgSetNumNodes(int x)
1126 {
1127   _bgSize = x;
1128 }
1129
1130 /* can only called in emulatorinit */
1131 void BgSetSize(int sx, int sy, int sz)
1132 {
1133   ASSERT(cva(simState).inEmulatorInit);
1134   cva(bgMach).setSize(sx, sy, sz);
1135 }
1136
1137 /* return number of bg nodes on this emulator node */
1138 int BgNodeSize()
1139 {
1140   ASSERT(!cva(simState).inEmulatorInit);
1141   return cva(numNodes);
1142 }
1143
1144 /* return the bg node ID (local array index) */
1145 int BgMyRank()
1146 {
1147 #ifndef CMK_OPTIMIZE
1148   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1149 #endif
1150   ASSERT(!cva(simState).inEmulatorInit);
1151   return tMYNODEID;
1152 }
1153
1154 /* return my serialed blue gene node number */
1155 int BgMyNode()
1156 {
1157 #ifndef CMK_OPTIMIZE
1158   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1159 #endif
1160   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1161 }
1162
1163 /* return a real processor number from a bg node */
1164 int BgNodeToRealPE(int node)
1165 {
1166   return nodeInfo::Global2PE(node);
1167 }
1168
1169 // thread ID on a BG node
1170 int BgGetThreadID()
1171 {
1172   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1173 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1174   return tMYID;
1175 }
1176
1177 void BgSetThreadID(int x)
1178 {
1179   tMYID = x;
1180 }
1181
1182 int BgGetGlobalThreadID()
1183 {
1184   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1185   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1186   //return tMYGLOBALID;
1187 }
1188
1189 int BgGetGlobalWorkerThreadID()
1190 {
1191   ASSERT(tTHREADTYPE == WORK_THREAD);
1192 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1193   return tMYGLOBALID;
1194 }
1195
1196 void BgSetGlobalWorkerThreadID(int pe)
1197 {
1198   ASSERT(tTHREADTYPE == WORK_THREAD);
1199 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1200   tMYGLOBALID = pe;
1201 }
1202
1203 char *BgGetNodeData()
1204 {
1205   return tUSERDATA;
1206 }
1207
1208 void BgSetNodeData(char *data)
1209 {
1210   ASSERT(!cva(simState).inEmulatorInit);
1211   tUSERDATA = data;
1212 }
1213
1214 int BgGetNumWorkThread()
1215 {
1216   return cva(bgMach).numWth;
1217 }
1218
1219 void BgSetNumWorkThread(int num)
1220 {
1221   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1222   cva(bgMach).numWth = num;
1223 }
1224
1225 int BgGetNumCommThread()
1226 {
1227   return cva(bgMach).numCth;
1228 }
1229
1230 void BgSetNumCommThread(int num)
1231 {
1232   ASSERT(cva(simState).inEmulatorInit);
1233   cva(bgMach).numCth = num;
1234 }
1235
1236 /*****************************************************************************
1237       Communication and Worker threads
1238 *****************************************************************************/
1239
1240 BgStartHandler  workStartFunc = NULL;
1241
1242 void BgSetWorkerThreadStart(BgStartHandler f)
1243 {
1244   workStartFunc = f;
1245 }
1246
1247 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1248
1249 // kernel function for processing a bluegene message
1250 void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
1251 {
1252   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1253   int handler = CmiBgMsgHandle(msg);
1254   //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
1255   CmiAssert(handler < 1000);
1256
1257   BgHandlerInfo *handInfo;
1258 #if  CMK_BLUEGENE_NODE
1259   HandlerTable hdlTbl = tMYNODE->handlerTable;
1260   handInfo = hdlTbl.getHandle(handler);
1261 #else
1262   HandlerTable hdlTbl = tHANDLETAB;
1263   handInfo = hdlTbl.getHandle(handler);
1264   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1265 #endif
1266
1267   if (handInfo == NULL) {
1268     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1269     CmiAbort("BgProcessMessage Failed!");
1270   }
1271   BgHandlerEx entryFunc = handInfo->fnPtr;
1272
1273   if (programExit == 2) return;    // program exit already
1274
1275   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1276
1277   // optimization for broadcast messages:
1278   // if the msg is a broadcast token msg, expand it to a real msg
1279   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1280     msg = BgExpandMsg(msg);
1281   }
1282
1283   if (tinfo->watcher) tinfo->watcher->record(msg);
1284
1285   // don't count thread overhead and timing overhead
1286   startVTimer();
1287
1288   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1289
1290   entryFunc(msg, handInfo->userPtr);
1291
1292 #if CMK_ERROR_CHECKING
1293   CthCheckMagic(CthSelf());
1294 #endif
1295
1296   stopVTimer();
1297
1298   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1299 }
1300
1301 void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
1302
1303 void scheduleWorkerThread(char *msg)
1304 {
1305   CthThread tid = (CthThread)msg;
1306 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1307   CthAwaken(tid);
1308 }
1309
1310 // thread entry
1311 // for both comm and work thread, virtual function
1312 void run_thread(threadInfo *tinfo)
1313 {
1314   /* set the thread-private threadinfo */
1315   cta(threadinfo) = tinfo;
1316   tinfo->run();
1317 }
1318
1319 /* should be done only once per bg node */
1320 void BgNodeInitialize(nodeInfo *ninfo)
1321 {
1322   CthThread t;
1323   int i;
1324
1325   /* this is here because I will put a message to node inbuffer */
1326   tCURRTIME = 0.0;
1327   tSTARTTIME = CmiWallTimer();
1328
1329   /* creat work threads */
1330   for (i=0; i< cva(bgMach).numWth; i++)
1331   {
1332     threadInfo *tinfo = ninfo->threadinfo[i];
1333     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1334     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1335 #if CMK_ERROR_CHECKING
1336     CthSetMagic(t);
1337 #endif
1338     tinfo->setThread(t);
1339     /* put to thread table */
1340     tTHREADTABLE[tinfo->id] = t;
1341 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1342     //initial scheduling points for workthreads
1343     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1344 #endif
1345     CthAwaken(t);
1346   }
1347
1348   /* creat communication thread */
1349   for (i=0; i< cva(bgMach).numCth; i++)
1350   {
1351     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1352     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1353     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1354     tinfo->setThread(t);
1355     /* put to thread table */
1356     tTHREADTABLE[tinfo->id] = t;
1357     CthAwaken(t);
1358   }
1359
1360 }
1361
1362 static void beginExitHandlerFunc(void *msg);
1363 static void writeToDisk();
1364 static void sendCorrectionStats();
1365
1366 static CmiHandler exitHandlerFunc(char *msg)
1367 {
1368   // TODO: free memory before exit
1369   int i,j;
1370
1371   programExit = 2;
1372 #if BLUEGENE_TIMING
1373   // timing
1374   if (0)        // detail
1375   if (genTimeLog) {
1376     for (j=0; j<cva(numNodes); j++)
1377     for (i=0; i<cva(bgMach).numWth; i++) {
1378       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1379 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1380       int x,y,z;
1381       nodeInfo::Local2XYZ(j, &x, &y, &z);
1382       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1383     }
1384
1385   }
1386 #endif
1387   if (genTimeLog) sendCorrectionStats();
1388
1389   if (genTimeLog) writeToDisk();
1390
1391 //  if (tTHREADTYPE == WORK_THREAD)
1392   {
1393   int origPe = -2;
1394   // close all tracing modules
1395   for (j=0; j<cva(numNodes); j++)
1396     for (i=0; i<cva(bgMach).numWth; i++) {
1397       int pe = nodeInfo::Local2Global(j)*cva(bgMach).numWth+i;
1398       int oldPe = CmiSwitchToPE(pe);
1399       if (cva(bgMach).replay != -1)
1400         if ( pe != cva(bgMach).replay ) continue;
1401       if (origPe == -2) origPe = oldPe;
1402       traceCharmClose();
1403 //      CmiSwitchToPE(oldPe);
1404       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1405       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1406     }
1407     if (origPe!=-2) CmiSwitchToPE(origPe);
1408   }
1409
1410 #if 0
1411   delete [] cva(nodeinfo);
1412   delete [] cva(inBuffer);
1413   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1414   delete [] cva(msgBuffer);
1415 #endif
1416
1417 #if CMK_HAS_COUNTER_PAPI
1418   if (cva(bgMach).timingMethod == BG_COUNTER) {
1419 /*        
1420   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1421   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1422   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1423   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1424  */
1425         for(int i=0; i<numPapiEvents; i++){
1426           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1427            delete papi_counters_desc[i];
1428         }
1429         delete papiEvents;
1430         delete papiValues;
1431         delete papi_counters_desc;
1432         delete total_papi_counters;
1433   }
1434 #endif
1435
1436   //ConverseExit();
1437   if (genTimeLog)
1438     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1439   else
1440     CsdExitScheduler();
1441
1442   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1443
1444   return 0;
1445 }
1446
1447 static void sanityCheck()
1448 {
1449   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1450     if (CmiMyPe() == 0)
1451       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1452     BgShutdown(); 
1453   } 
1454   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1455 #if 1
1456     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1457     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1458 #else
1459     if (CmiMyPe() == 0)
1460       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1461     BgShutdown(); 
1462 #endif
1463   }
1464   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1465     CmiAbort("\nToo few BlueGene nodes!\n");
1466   }
1467 }
1468
1469 #undef CmiSwitchToPE
1470 extern "C" int CmiSwitchToPEFn(int pe);
1471
1472 // main
1473 CmiStartFn bgMain(int argc, char **argv)
1474 {
1475   int i;
1476   char *configFile = NULL;
1477
1478   BgProcessMessage = BgProcessMessageDefault;
1479 #if CMK_CONDS_USE_SPECIAL_CODE
1480   // overwrite possible implementation in machine.c
1481   CmiSwitchToPE = CmiSwitchToPEFn;
1482 #endif
1483
1484   /* initialize all processor level data */
1485   CpvInitialize(BGMach,bgMach);
1486   cva(bgMach).nullify();
1487
1488   CmiArgGroup("Charm++","BlueGene Simulator");
1489   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1490    cva(bgMach). read(configFile);
1491   }
1492   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1493                 "The x size of the grid of nodes");
1494   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1495                 "The y size of the grid of nodes");
1496   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1497                 "The z size of the grid of nodes");
1498   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1499                 "The number of simulated communication threads per node");
1500   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1501                 "The number of simulated worker threads per node");
1502
1503   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1504                 "Blue Gene thread stack size");
1505
1506   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1507      genTimeLog = 1;
1508   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1509     correctTimeLog = 1;
1510   schedule_flag = 0;
1511   if (correctTimeLog) {
1512     genTimeLog = 1;
1513     schedule_flag = 1;
1514   }
1515
1516   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1517     bgverbose = 1;
1518
1519   // for timing method, default using elapse calls.
1520   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1521                        "Use user provided BgElapse for time prediction")) 
1522       cva(bgMach).timingMethod = BG_ELAPSE;
1523   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1524                        "Use walltime method for time prediction")) 
1525       cva(bgMach).timingMethod = BG_WALLTIME;
1526 #ifdef CMK_ORIGIN2000
1527   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1528       cva(bgMach).timingMethod = BG_COUNTER;
1529 #elif CMK_HAS_COUNTER_PAPI
1530   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1531     cva(bgMach).timingMethod = BG_COUNTER;
1532   }
1533   if (cva(bgMach).timingMethod == BG_COUNTER) {
1534     init_counters();
1535   }
1536 #endif
1537   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1538                       "floating point to time factor");
1539   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1540                       "scale factor for wallclock time measured");
1541   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1542                       "timer cost");
1543   #if 0
1544   if(cva(bgMach).timingMethod == BG_WALLTIME)
1545   {
1546       int count = 1e6;
1547       double start, stop, diff, cost, dummy;
1548
1549       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1550
1551       start = BG_TIMER();
1552       for (int i = 0; i < count; ++i)
1553           dummy = BG_TIMER();
1554       stop = BG_TIMER();
1555
1556       diff = stop - start;
1557       cost = diff / count;
1558
1559       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1560                 cost, cva(bgMach).timercost);
1561       cva(bgMach).timercost = cost;
1562   }
1563   #endif
1564   
1565   char *networkModel;
1566   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1567    cva(bgMach).setNetworkModel(networkModel);
1568   }
1569
1570   bgcorroff = 0;
1571   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1572     bgcorroff = 1;
1573
1574   bgstats_flag=0;
1575   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1576     bgstats_flag = 1;
1577
1578   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1579   {
1580     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1581     sprintf(root, "%s/", cva(bgMach).traceroot);
1582     cva(bgMach).traceroot = root;
1583   }
1584
1585   // record/replay
1586   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim")) {
1587     cva(bgMach).record = 1;
1588     if (CmiMyPe() == 0)
1589       CmiPrintf("BG info> full record mode. \n");
1590   }
1591   if (CmiGetArgFlagDesc(argv,"+bgrecordnode","Record message processing order for BigSim")) {
1592     cva(bgMach).recordnode = 1;
1593     if (CmiMyPe() == 0)
1594       CmiPrintf("BG info> full record mode on node. \n");
1595   }
1596   int replaype;
1597   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1598     cva(bgMach).replay = replaype;
1599   }
1600   else {
1601     if (CmiGetArgFlagDesc(argv,"+bgreplay","Record message processing order for BigSim"))
1602     cva(bgMach).replay = 0;    // default to 0
1603   }
1604   if (cva(bgMach).replay >= 0) {
1605     if (CmiNumPes()>1)
1606       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1607     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1 ||
1608          cva(bgMach).numWth!=1 || cva(bgMach).numCth!=1)
1609       CmiAbort("BG> bgreplay mode must run on one target processor.");
1610     CmiPrintf("BG info> replay mode for target processor %d.\n", cva(bgMach).replay);
1611   }
1612   char *procs = NULL;
1613   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1614     cva(bgMach).recordprocs.set(procs);
1615   }
1616
1617     // record/replay at node level
1618   char *nodes = NULL;
1619   if (CmiGetArgStringDesc(argv, "+bgrecordnodes", &nodes, "A list of nodes to record, e.g. 0,10,20-30")) {
1620     cva(bgMach).recordnodes.set(nodes);
1621   }
1622   int replaynode;
1623   if (CmiGetArgIntDesc(argv,"+bgreplaynode", &replaynode, "Re-play message processing order for BigSim")) {      
1624     cva(bgMach).replaynode = replaynode; 
1625   }
1626   else {
1627     if (CmiGetArgFlagDesc(argv,"+bgreplaynode","Record message processing order for BigSim"))
1628     cva(bgMach).replaynode = 0;    // default to 0
1629   }
1630   if (cva(bgMach).replaynode >= 0) {
1631     int startpe, endpe;
1632     BgRead_nodeinfo(replaynode, startpe, endpe);
1633     if (cva(bgMach).numWth != endpe-startpe+1) {
1634       cva(bgMach).numWth = endpe-startpe+1;     // update wth
1635       CmiPrintf("BG info> numWth is changed to %d.\n", cva(bgMach).numWth);
1636     }
1637     if (CmiNumPes()>1)
1638       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1639     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1)
1640       CmiAbort("BG> bgreplay mode must run on one target processor.");
1641     CmiPrintf("BG info> replay mode for target node %d.\n", cva(bgMach).replaynode);   
1642   }     
1643   CmiAssert(!(cva(bgMach).replaynode != -1 && cva(bgMach).replay != -1));
1644
1645
1646   /* parameters related with out-of-core execution */
1647   int tmpcap=0;
1648   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1649      TBLCAPACITY = tmpcap;
1650     }
1651   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1652       bgUseOutOfCore = 1;
1653       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1654
1655       double curFreeMem = bgGetSysFreeMemSize();
1656       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1657         //using the memory available of the system right now
1658         //assuming no other programs will run after this program runs
1659         bgOOCMaxMemSize = curFreeMem;
1660         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1661       }
1662       if(bgOOCMaxMemSize > curFreeMem){
1663         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1664         bgOOCMaxMemSize = curFreeMem;
1665       }
1666       DEBUGF(("out-of-core turned on!\n"));
1667   }      
1668
1669 #if BLUEGENE_DEBUG_LOG
1670   {
1671     char ln[200];
1672     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1673     bgDebugLog=fopen(ln,"w");
1674   }
1675 #endif
1676
1677   arg_argv = argv;
1678   arg_argc = CmiGetArgc(argv);
1679
1680   /* msg handler */
1681   CpvInitialize(SimState, simState);
1682   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1683   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1684   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1685   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1686
1687   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1688   cva(simState).inEmulatorInit = 1;
1689   /* call user defined BgEmulatorInit */
1690   BgEmulatorInit(arg_argc, arg_argv);
1691   cva(simState).inEmulatorInit = 0;
1692
1693   /* check if all bluegene node size and thread information are set */
1694   sanityCheck();
1695
1696   _bgSize = cva(bgMach).getNodeSize(); 
1697
1698   timerFunc = BgGetTime;
1699
1700   BgInitTiming();               // timing module
1701
1702   if (CmiMyPe() == 0) {
1703     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1704     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1705     cva(bgMach).network->print();
1706     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1707     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1708     if (cva(bgMach).stacksize)
1709       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1710     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1711       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1712     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1713       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1714     else if (cva(bgMach).timingMethod == BG_COUNTER)
1715       CmiPrintf("BG info> Using performance counter for timing method. \n");
1716     if (genTimeLog)
1717       CmiPrintf("BG info> Generating timing log. \n");
1718     if (correctTimeLog)
1719       CmiPrintf("BG info> Perform timestamp correction. \n");
1720     if (cva(bgMach).traceroot)
1721       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1722   }
1723
1724   CtvInitialize(threadInfo *, threadinfo);
1725
1726   /* number of bg nodes on this PE */
1727   CpvInitialize(int, numNodes);
1728   cva(numNodes) = nodeInfo::numLocalNodes();
1729
1730   if (CmiMyRank() == 0)
1731     initLock = CmiCreateLock();     // used for BnvInitialize
1732
1733   bgstreaming.init(cva(numNodes));
1734
1735   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1736 if(bgUseOutOfCore){
1737       initTblThreadInMem();
1738           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1739       //init prefetch status      
1740       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1741       oocPrefetchSpace = new oocPrefetchBufSpace();
1742       schedWorkThds = new oocWorkThreadQueue();
1743           #endif
1744   }
1745
1746 #if BIGSIM_OUT_OF_CORE
1747   //initialize variables related to get precise
1748   //physical memory usage info for a process
1749   bgMemPageSize = getpagesize();
1750   memset(bgMemStsFile, 0, 25); 
1751   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1752 #endif
1753
1754
1755   /* create BG nodes */
1756   CpvInitialize(nodeInfo *, nodeinfo);
1757   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1758   _MEMCHECK(cva(nodeinfo));
1759
1760   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1761   _MEMCHECK(cta(threadinfo));
1762
1763   /* create BG processors for each node */
1764   for (i=0; i<cva(numNodes); i++)
1765   {
1766     nodeInfo *ninfo = cva(nodeinfo) + i;
1767     // create threads
1768     ninfo->initThreads(i);
1769
1770     /* pretend that I am a thread */
1771     cta(threadinfo)->myNode = ninfo;
1772
1773     /* initialize a BG node and fire all threads */
1774     BgNodeInitialize(ninfo);
1775   }
1776   // clear main thread.
1777   cta(threadinfo)->myNode = NULL;
1778   CpvInitialize(CthThread, mainThread);
1779   cva(mainThread) = CthSelf();
1780
1781   CpvInitialize(int, CthResumeBigSimThreadIdx);
1782
1783   cva(simState).simStartTime = CmiWallTimer();    
1784   return 0;
1785 }
1786
1787 // for conv-conds:
1788 // if -2 untouch
1789 // if -1 main thread
1790 #if CMK_BLUEGENE_THREAD
1791 extern "C" int CmiSwitchToPEFn(int pe)
1792 {
1793   if (pe == -2) return -2;
1794   int oldpe;
1795 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1796   if (tMYNODE == NULL) oldpe = -1;
1797   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1798   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1799   else oldpe = -1;
1800 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1801   if (pe == -1) {
1802     CthSwitchThread(cva(mainThread));
1803   }
1804   else if (pe < 0) {
1805   }
1806   else {
1807     if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1808     int t = pe%cva(bgMach).numWth;
1809     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1810     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1811     threadInfo *tinfo = ninfo->threadinfo[t];
1812     CthSwitchThread(tinfo->me);
1813   }
1814   return oldpe;
1815 }
1816 #else
1817 extern "C" int CmiSwitchToPEFn(int pe)
1818 {
1819   if (pe == -2) return -2;
1820   int oldpe;
1821   if (tMYNODE == NULL) oldpe = -1;
1822   else oldpe = BgMyNode();
1823   if (pe == -1) {
1824     cta(threadinfo)->myNode = NULL;
1825   }
1826   else {
1827     int newpe = nodeInfo::Global2Local(pe);
1828     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1829   }
1830   return oldpe;
1831 }
1832 #endif
1833
1834
1835 /*****************************************************************************
1836                         TimeLog correction
1837 *****************************************************************************/
1838
1839 extern void processCorrectionMsg(int nodeidx);
1840
1841 // return the msg pointer, and the index of the message in the affinity queue.
1842 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1843 {
1844   CmiAssert(tID != ANYTHREAD);
1845   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1846   for (int i=0; i<affinityQ.length(); i++)  {
1847       char *msg = affinityQ[i];
1848       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1849       if (msgId == md) {
1850         index = i;
1851         return msg;
1852       }
1853   }
1854   return NULL;
1855 }
1856
1857 // return the msg pointer, thread id and the index of the message in the affinity queue.
1858 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1859 {
1860   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1861     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1862     if (msg) return msg;
1863   }
1864   return NULL;
1865 }
1866
1867 StateCounters stateCounters;
1868
1869 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1870 {
1871   char *msg;
1872   CmiInt2 tID = cm->tID;
1873   int index;
1874   if (tID == ANYTHREAD) {
1875     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1876   }
1877   else {
1878     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1879   }
1880   if (msg == NULL) return 0;
1881
1882   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1883   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1884   affinityQ.update(index);
1885   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1886   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1887   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1888   stateCounters.corrMsgCRCnt++;
1889   return 1;       /* invalidate this msg */
1890 }
1891
1892 extern void processBufferCorrectionMsgs(void *ignored);
1893
1894 // Coverse handler for begin exit
1895 // flush and process all correction messages
1896 static void beginExitHandlerFunc(void *msg)
1897 {
1898   CmiFree(msg);
1899   delayCheckFlag = 0;
1900 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1901   programExit = 1;
1902 #if LIMITED_SEND
1903   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1904 #endif
1905
1906 #if 1
1907   for (int i=0; i<BgNodeSize(); i++) 
1908     processCorrectionMsg(i); 
1909 #if USE_MULTISEND
1910   BgSendBufferedCorrMsgs();
1911 #endif
1912 #else
1913   // the msg queue should be empty now.
1914   // don't do insert adjustment, but start to do all timing correction from here
1915   int nodeidx, tID;
1916   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1917     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1918     for (tID=0; tID<cva(numWth); tID++) {
1919         BgTimeLineRec &tlinerec = tlines[tID];
1920         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1921     }
1922   }
1923
1924 #endif
1925
1926 #if !THROTTLE_WORK
1927 #if DELAY_CHECK
1928   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1929 #endif
1930 #endif
1931 }
1932
1933 #define HISTOGRAM_SIZE  100
1934 // compute total CPU utilization for each timeline and 
1935 // return the number of real msgs
1936 void computeUtilForAll(int* array, int *nReal)
1937 {
1938   double scale = 1.0e3;         // scale to ms
1939
1940   //We measure from 1ms to 5001 ms in steps of 100 ms
1941   int min = 0, step = 1;
1942   int max = min + HISTOGRAM_SIZE*step;
1943   // [min, max: step]  HISTOGRAM_SIZE slots
1944
1945   if (CmiMyPe()==0)
1946     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1947   int size = (max-min)/step;
1948   CmiAssert(size == HISTOGRAM_SIZE);
1949   int allmin = -1, allmax = -1;
1950   for(int i=0;i<size;i++) array[i] = 0;
1951
1952   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1953     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1954     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1955       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1956
1957       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1958       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1959       array[(util-min)/step]++;
1960     }
1961   }
1962   if (allmin!=-1 || allmax!=-1)
1963     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1964 }
1965
1966 class StatsMessage {
1967   char core[CmiBlueGeneMsgHeaderSizeBytes];
1968 public:
1969   int processCount;
1970   int corrMsgCount;
1971   int realMsgCount;
1972   int maxTimelineLen, minTimelineLen;
1973 };
1974
1975 extern int processCount, corrMsgCount;
1976
1977 static void sendCorrectionStats()
1978 {
1979   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1980   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1981   statsMsg->processCount = processCount;
1982   statsMsg->corrMsgCount = corrMsgCount;
1983   int numMsgs=0;
1984   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
1985   int totalMem = 0;
1986   if (bgstats_flag) {
1987   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1988     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1989     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1990         BgTimeLineRec &tlinerec = tlines[tID];
1991         int tlen = tlinerec.length();
1992         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
1993         if (tlen<minTimelineLen) minTimelineLen=tlen;
1994         totalMem = tlen*sizeof(BgTimeLog);
1995 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
1996 #if 0
1997         for (int i=0; i< tlinerec.length(); i++) {
1998           numMsgs += tlinerec[i]->msgs.length();
1999         }
2000 #endif
2001     }
2002   }
2003   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
2004   statsMsg->realMsgCount = numMsgs;
2005   statsMsg->maxTimelineLen = maxTimelineLen;
2006   statsMsg->minTimelineLen = minTimelineLen;
2007   }  // end if
2008
2009   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
2010   CmiSyncSendAndFree(0, msgSize, statsMsg);
2011 }
2012
2013 // Converse handler for collecting stats
2014 void statsCollectionHandlerFunc(void *msg)
2015 {
2016   static int count=0;
2017   static int pc=0, cc=0, realMsgCount=0;
2018   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
2019   static int *histArray = NULL;
2020   int i;
2021
2022   count++;
2023   if (histArray == NULL) {
2024     histArray = new int[HISTOGRAM_SIZE];
2025     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
2026   }
2027   StatsMessage *m = (StatsMessage *)msg;
2028   pc += m->processCount;
2029   cc += m->corrMsgCount;
2030   realMsgCount += m->realMsgCount;
2031   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
2032   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
2033   int *array = (int *)(m+1);
2034   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
2035   if (count == CmiNumPes()) {
2036     if (bgstats_flag) {
2037       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
2038       for (i=0; i<HISTOGRAM_SIZE; i++) {
2039         CmiPrintf("%d ", histArray[i]);
2040         if (i%20 == 19) CmiPrintf("\n");
2041       }
2042       CmiPrintf("\n");
2043     }
2044     CsdExitScheduler();
2045   }
2046   CmiFree(msg);
2047 }
2048
2049 // update arrival time from buffer messages
2050 // before start an entry, check message time against buffered timing
2051 // correction message to update to the correct time.
2052 void correctMsgTime(char *msg)
2053 {
2054    if (!correctTimeLog) return;
2055 //   if (CkMsgDoCorrect(msg) == 0) return;
2056
2057    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
2058    CmiInt2 tid = CmiBgMsgThreadID(msg);
2059
2060    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
2061    int len = cmsg.length();
2062    for (int i=0; i<len; i++) {
2063      bgCorrectionMsg* m = cmsg[i];
2064      if (msgId == m->msgId && tid == m->tID) {
2065         if (m->tAdjust < 0.0) return;
2066         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
2067         CmiBgMsgRecvTime(msg) = m->tAdjust;
2068         m->tAdjust = -1.0;       /* invalidate this msg */
2069 //      cmsg.update(i);
2070         stateCounters.corrMsgRCCnt++;
2071         break;
2072      }
2073    }
2074 }
2075
2076
2077 //TODO: write disk bgTraceFiles
2078 static void writeToDisk()
2079 {
2080
2081   char* d = new char[1025];
2082   //Num of simulated procs on this real pe
2083   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
2084
2085   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
2086
2087   // write summary file on PE0
2088   if(CmiMyPe()==0){
2089     
2090     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
2091     FILE *f2 = fopen(d,"wb");
2092     //Total emulating processors and total target BG processors
2093     int numEmulatingPes=CmiNumPes();
2094     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
2095
2096     if(f2==NULL) {
2097       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
2098       CmiAbort("BG> Abort");
2099     }
2100     PUP::toDisk p(f2);
2101     p((char *)&machInfo, sizeof(machInfo));
2102     p|totalWorkerProcs;
2103     p|cva(bgMach);
2104     p|numEmulatingPes;
2105     p|bglog_version;
2106     p|CpvAccess(CthResumeBigSimThreadIdx);
2107     
2108     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
2109     
2110     fclose(f2);
2111   }
2112   
2113   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
2114   FILE *f = fopen(d,"wb");
2115  
2116   if(f==NULL)
2117     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
2118   PUP::toDisk p(f);
2119   
2120   p((char *)&machInfo, sizeof(machInfo));       // machine info
2121   p|numLocalProcs;
2122
2123   // CmiPrintf("Timelines are: \n");
2124   int procTablePos = ftell(f);
2125
2126   int *procOffsets = new int[numLocalProcs];
2127   int procTableSize = (numLocalProcs)*sizeof(int);
2128   fseek(f,procTableSize,SEEK_CUR); 
2129
2130   for (int j=0; j<cva(numNodes); j++){
2131     for(int i=0;i<cva(bgMach).numWth;i++){
2132     #if BIGSIM_OUT_OF_CORE
2133         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2134         //through isomalloc. Therefore, the memory containing those events needs
2135         //to be brought back into memory from disk. --Chao Mei          
2136         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2137             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2138      #endif     
2139       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2140       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2141       t.pup(p);
2142     }
2143   }
2144   
2145   fseek(f,procTablePos,SEEK_SET);
2146   p(procOffsets,numLocalProcs);
2147   fclose(f);
2148
2149   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2150 }
2151
2152
2153 /*****************************************************************************
2154              application Converse thread hook
2155 *****************************************************************************/
2156
2157 CpvExtern(int      , CthResumeBigSimThreadIdx);
2158
2159 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2160                                    int pb,unsigned int *prio)
2161 {
2162 /*
2163   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2164   CsdEnqueueGeneral(token, s, pb, prio);
2165 */
2166 #if CMK_BLUEGENE_THREAD
2167   int x, y, z;
2168   BgGetMyXYZ(&x, &y, &z);
2169   int t = BgGetThreadID();
2170 #else
2171   #error "ERROR HERE"
2172 #endif
2173     // local message into queue
2174   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2175
2176   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2177   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2178
2179
2180 CthThread CthSuspendBigSimThread()
2181
2182   return  cta(threadinfo)->me;
2183 }
2184
2185 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2186 {
2187    // stop timer
2188    stopVTimer();
2189 }
2190
2191 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2192 {
2193    // start timer by reset it
2194    resetVTime();
2195 }
2196
2197
2198 void BgSetStrategyBigSimDefault(CthThread t)
2199
2200   CthSetStrategy(t,
2201                  CthEnqueueBigSimThread,
2202                  CthSuspendBigSimThread);
2203
2204   CthThreadListener *a = new CthThreadListener;
2205   a->suspend = bigsimThreadListener_suspend;
2206   a->resume = bigsimThreadListener_resume;
2207   a->free = NULL;
2208   CthAddListener(t, a);
2209 }
2210
2211 int BgIsMainthread()
2212 {
2213     return tMYNODE == NULL;
2214 }
2215
2216 int BgIsRecord()
2217 {
2218     return cva(bgMach).record == 1;
2219 }
2220
2221 int BgIsReplay()
2222 {
2223     return cva(bgMach).replay != -1 || cva(bgMach).replaynode != -1;
2224 }
2225
2226 extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2227   ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
2228   //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
2229   int numLocal = 0, count = 0;
2230   for (int j=0; j<cva(numNodes); j++){
2231     for(int i=0;i<cva(bgMach).numWth;i++){
2232       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2233       if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
2234       numLocal ++;
2235     }
2236   }
2237   void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
2238   for (int j=0; j<cva(numNodes); j++){
2239     for(int i=0;i<cva(bgMach).numWth;i++){
2240       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2241       if (t == cta(threadinfo)) break;
2242       msgLocal[count++] = t->reduceMsg;
2243       t->reduceMsg = NULL;
2244     }
2245   }
2246   CmiAssert(count==numLocal-1);
2247   msg = mergeFn(&size, msg, msgLocal, numLocal-1);
2248   CmiReduce(msg, size, mergeFn);
2249   CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
2250   for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
2251   free(msgLocal);
2252 }
2253
2254 // for record/replay, to fseek back
2255 void BgRewindRecord()
2256 {
2257   threadInfo *tinfo = cta(threadinfo);
2258   if (tinfo->watcher) tinfo->watcher->rewind();
2259 }
2260