commit a lot of unwanted changes.
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82
83 /****************************************************************************
84      little utility functions
85 ****************************************************************************/
86
87 char **BgGetArgv() { return arg_argv; }
88 int    BgGetArgc() { return arg_argc; }
89
90 /***************************************************************************
91      Implementation of the same counter interface currently used for the
92      Origin2000 using PAPI in the underlying layer.
93
94      Because of the difference in counter numbering, it is assumed that
95      the two counters desired are CYCLES and FLOPS and hence the numbers
96      e0 and e1 are ignored.
97
98      start_counters is also not implemented because PAPI does things in
99      a different manner.
100 ****************************************************************************/
101
102 #if CMK_HAS_COUNTER_PAPI
103 /*
104 CmiUInt8 total_ins = 0;
105 CmiUInt8 total_fps = 0;
106 CmiUInt8 total_l1_dcm = 0;
107 CmiUInt8 total_mem_rcy = 0;
108 */
109 int init_counters()
110 {
111     int retval = PAPI_library_init(PAPI_VER_CURRENT);
112
113     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
114
115         //a temporary
116         const char *eventDesc[MAX_TOTAL_EVENTS];
117         int lpapiEvents[MAX_TOTAL_EVENTS];
118         
119     numPapiEvents = 0;
120         
121 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
122                 if(PAPI_query_event(e) == PAPI_OK){ \
123                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
124                         eventDesc[numPapiEvents] = edesc;       \
125                         lpapiEvents[numPapiEvents++] = e; \
126                 }
127                 
128     // PAPI high level API does not require explicit library intialization
129         eventDesc[numPapiEvents] ="cycles";
130     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
131         
132         
133     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
134       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
135           eventDesc[numPapiEvents] = "floating point instructions";     
136       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
137     } else {
138       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
139           eventDesc[numPapiEvents] = "total instructions";      
140       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
141     }
142     */
143         
144         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
145         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
146         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
147         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
148         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
149         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
150         
151         if(numPapiEvents == 0){
152                 CmiAbort("No papi events are defined!\n");
153         }
154         
155         if(numPapiEvents >= MAX_TOTAL_EVENTS){
156                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
157         }
158         
159         papiEvents = new int[numPapiEvents];
160         papiValues = new long_long[numPapiEvents];
161         total_papi_counters = new CmiUInt8[numPapiEvents];
162         papi_counters_desc = new char *[numPapiEvents];
163         for(int i=0; i<numPapiEvents; i++){
164                 papiEvents[i] = lpapiEvents[i];
165                 total_papi_counters[i] = 0;
166                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
167                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
168                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
169                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
170         }
171         
172 /*
173     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
174       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
175       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
176     }
177     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
178       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
179       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
180     }
181     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
182       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
183       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
184     }
185 */
186
187     int status = PAPI_start_counters(papiEvents, numPapiEvents);
188     if (status != PAPI_OK) {
189       CmiPrintf("PAPI_start_counters error code: %d\n", status);
190       switch (status) {
191       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
192       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
193       }
194       CmiAbort("Unable to start PAPI counters!\n");
195     }
196 }
197
198 int read_counters(long_long *papiValues, int n) 
199 {
200   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
201   // code for Origin2000
202   int status;
203   status = PAPI_read_counters(papiValues, n);
204   if (status != PAPI_OK) {
205     CmiPrintf("PAPI_read_counters error: %d\n", status);
206     CmiAbort("Failed to read PAPI counters!\n");
207   }
208   
209   /*
210   total_ins += papiValues[0];
211   total_fps += papiValues[1];
212   total_l1_dcm += papiValues[2];
213   total_mem_rcy += papiValues[3];
214   */
215   
216   return 0;
217 }
218
219 inline void CountPapiEvents(){
220   for(int i=0 ;i<numPapiEvents; i++)
221         total_papi_counters[i] += papiValues[i];
222 }
223
224 inline double Count2Time(long_long *papiValues, int n) { 
225   return papiValues[1]*cva(bgMach).fpfactor; 
226 }
227 #endif
228
229 /*****************************************************************************
230      Handler Table, one per thread
231 ****************************************************************************/
232
233 extern "C" void defaultBgHandler(char *null, void *uPtr)
234 {
235   CmiAbort("BG> Invalid Handler called!\n");
236 }
237
238 HandlerTable::HandlerTable()
239 {
240     handlerTableCount = 1;
241     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
242     for (int i=0; i<MAX_HANDLERS; i++) {
243       handlerTable[i].fnPtr = defaultBgHandler;
244       handlerTable[i].userPtr = NULL;
245     }
246 }
247
248 inline int HandlerTable::registerHandler(BgHandler h)
249 {
250     ASSERT(!cva(simState).inEmulatorInit);
251     /* leave 0 as blank, so it can report error luckily */
252     int cur = handlerTableCount++;
253     if (cur >= MAX_HANDLERS)
254       CmiAbort("BG> HandlerID exceed the maximum.\n");
255     handlerTable[cur].fnPtr = (BgHandlerEx)h;
256     handlerTable[cur].userPtr = NULL;
257     return cur;
258 }
259
260 inline void HandlerTable::numberHandler(int idx, BgHandler h)
261 {
262     ASSERT(!cva(simState).inEmulatorInit);
263     if (idx >= handlerTableCount || idx < 1)
264       CmiAbort("BG> HandlerID exceed the maximum!\n");
265     handlerTable[idx].fnPtr = (BgHandlerEx)h;
266     handlerTable[idx].userPtr = NULL;
267 }
268
269 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
270 {
271     ASSERT(!cva(simState).inEmulatorInit);
272     if (idx >= handlerTableCount || idx < 1)
273       CmiAbort("BG> HandlerID exceed the maximum!\n");
274     handlerTable[idx].fnPtr = h;
275     handlerTable[idx].userPtr = uPtr;
276 }
277
278 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
279 {
280 #if 0
281     if (handler >= handlerTableCount) {
282       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
283       CmiAbort("Invalid handler!");
284     }
285 #endif
286     if (handler >= handlerTableCount || handler<0) return NULL;
287     return &handlerTable[handler];
288 }
289
290 /*****************************************************************************
291       low level API
292 *****************************************************************************/
293
294 int BgRegisterHandler(BgHandler h)
295 {
296   ASSERT(!cva(simState).inEmulatorInit);
297   int cur;
298 #if CMK_BLUEGENE_NODE
299   return tMYNODE->handlerTable.registerHandler(h);
300 #else
301   if (tTHREADTYPE == COMM_THREAD) {
302     return tMYNODE->handlerTable.registerHandler(h);
303   }
304   else {
305     return tHANDLETAB.registerHandler(h);
306   }
307 #endif
308 }
309
310 void BgNumberHandler(int idx, BgHandler h)
311 {
312   ASSERT(!cva(simState).inEmulatorInit);
313 #if CMK_BLUEGENE_NODE
314   tMYNODE->handlerTable.numberHandler(idx,h);
315 #else
316   if (tTHREADTYPE == COMM_THREAD) {
317     tMYNODE->handlerTable.numberHandler(idx, h);
318   }
319   else {
320     tHANDLETAB.numberHandler(idx, h);
321   }
322 #endif
323 }
324
325 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
326 {
327   ASSERT(!cva(simState).inEmulatorInit);
328 #if CMK_BLUEGENE_NODE
329   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
330 #else
331   if (tTHREADTYPE == COMM_THREAD) {
332     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
333   }
334   else {
335     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
336   }
337 #endif
338 }
339
340 /*****************************************************************************
341       BG Timing Functions
342 *****************************************************************************/
343
344 void resetVTime()
345 {
346   /* reset start time */
347   int timingMethod = cva(bgMach).timingMethod;
348   if (timingMethod == BG_WALLTIME) {
349     double ct = BG_TIMER();
350     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
351     tSTARTTIME = ct;
352   }
353   else if (timingMethod == BG_ELAPSE)
354     tSTARTTIME = tCURRTIME;
355 #ifdef CMK_ORIGIN2000
356   else if (timingMethod == BG_COUNTER) {
357     if (start_counters(0, 21) <0) {
358       perror("start_counters");;
359     }
360   }
361 #elif CMK_HAS_COUNTER_PAPI
362   else if (timingMethod == BG_COUNTER) {
363     // do a fake read to reset the counters. It would be more efficient
364     // to use the low level API, but that would be a lot more code to
365     // write for now.
366     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
367   }
368 #endif
369 }
370
371 void startVTimer()
372 {
373   CmiAssert(tTIMERON == 0);
374   resetVTime();
375   tTIMERON = 1;
376 }
377
378 // should be used only when BG_WALLTIME
379 static inline void advanceTime(double inc)
380 {
381   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
382   if (inc < 0.0) return;
383   CmiAssert(inc>=0.0);
384   inc *= cva(bgMach).cpufactor;
385   tCURRTIME += inc;
386   CmiAssert(tTIMERON==1);
387 }
388
389 void stopVTimer()
390 {
391   int k;
392 #if 0
393   if (tTIMERON != 1) {
394     CmiAbort("stopVTimer called without startVTimer!\n");
395   }
396   CmiAssert(tTIMERON == 1);
397 #else
398   if (tTIMERON == 0) return;         // already stopped
399 #endif
400   const int timingMethod = cva(bgMach).timingMethod;
401   if (timingMethod == BG_WALLTIME) {
402     const double tp = BG_TIMER();
403     double inc = tp-tSTARTTIME;
404     advanceTime(inc-cva(bgMach).timercost);
405 //    tSTARTTIME = BG_TIMER();  // skip the above time
406   }
407   else if (timingMethod == BG_ELAPSE) {
408     // if no bgelapse called, assume it takes 1us
409     if (tCURRTIME-tSTARTTIME < 1E-9) {
410 //      tCURRTIME += 1e-6;
411     }
412   }
413   else if (timingMethod == BG_COUNTER)  {
414 #if CMK_ORIGIN2000
415     long long c0, c1;
416     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
417     tCURRTIME += Count2Time(c1);
418 #elif CMK_HAS_COUNTER_PAPI
419     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
420     CountPapiEvents();
421     tCURRTIME += Count2Time(papiValues, numPapiEvents);
422 #endif
423   }
424   tTIMERON = 0;
425 }
426
427 double BgGetTime()
428 {
429 #if 1
430   const int timingMethod = cva(bgMach).timingMethod;
431   if (timingMethod == BG_WALLTIME) {
432     /* accumulate time since last starttime, and reset starttime */
433     if (tTIMERON) {
434       const double tp2= BG_TIMER();
435       double &startTime = tSTARTTIME;
436       double inc = tp2 - startTime;
437       advanceTime(inc-cva(bgMach).timercost);
438       startTime = BG_TIMER();
439     }
440     return tCURRTIME;
441   }
442   else if (timingMethod == BG_ELAPSE) {
443     return tCURRTIME;
444   }
445   else if (timingMethod == BG_COUNTER) {
446     if (tTIMERON) {
447 #if CMK_ORIGIN2000
448       long long c0, c1;
449       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
450       tCURRTIME += Count2Time(c1);
451       if (start_counters(0, 21)<0) perror("start_counters");;
452 #elif CMK_HAS_COUNTER_PAPI
453     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
454     tCURRTIME += Count2Time(papiValues, numPapiEvents);
455 #endif
456     }
457     return tCURRTIME;
458   }
459   else 
460     CmiAbort("Unknown Timing Method.");
461   return -1;
462 #else
463   /* sometime I am interested in real wall time */
464   tCURRTIME = CmiWallTimer();
465   return tCURRTIME;
466 #endif
467 }
468
469 // moved to blue_logs.C
470 double BgGetCurTime()
471 {
472   ASSERT(tTHREADTYPE == WORK_THREAD);
473   return tCURRTIME;
474 }
475
476 extern "C" 
477 void BgElapse(double t)
478 {
479 //  ASSERT(tTHREADTYPE == WORK_THREAD);
480   if (cva(bgMach).timingMethod == BG_ELAPSE)
481     tCURRTIME += t;
482 }
483
484 // advance virtual timer no matter what scheme is used
485 extern "C" 
486 void BgAdvance(double t)
487 {
488 //  ASSERT(tTHREADTYPE == WORK_THREAD);
489   tCURRTIME += t;
490 }
491
492 /* BG API Func
493  * called by a communication thread to test if poll data 
494  * in the node's INBUFFER for its own queue 
495  */
496 char * getFullBuffer()
497 {
498   /* I must be a communication thread */
499   if (tTHREADTYPE != COMM_THREAD) 
500     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
501
502   return tMYNODE->getFullBuffer();
503 }
504
505 /**  BG API Func
506  * called by a Converse handler or sendPacket()
507  * add message msgPtr to a bluegene node's inbuffer queue 
508  */
509 extern "C"
510 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
511 {
512 #ifndef CMK_OPTIMIZE
513   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
514 #endif
515   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
516
517   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
518         
519   nInfo.addBgNodeInbuffer(msgPtr);
520 }
521
522 /** BG API Func 
523  *  called by a comm thread
524  *  add a message to a thread's affinity queue in same node 
525  */
526 void addBgThreadMessage(char *msgPtr, int threadID)
527 {
528 #ifndef CMK_OPTIMIZE
529   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
530 #endif
531   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
532   tInfo->addAffMessage(msgPtr);
533 }
534
535 /** BG API Func 
536  *  called by a comm thread, add a message to a node's non-affinity queue 
537  */
538 void addBgNodeMessage(char *msgPtr)
539 {
540   tMYNODE->addBgNodeMessage(msgPtr);
541 }
542
543 void BgEnqueue(char *msg)
544 {
545 #if 0
546   ASSERT(tTHREADTYPE == WORK_THREAD);
547   workThreadInfo *tinfo = (workThreadInfo *)cta(threadinfo);
548   tinfo->addAffMessage(msg);
549 #else
550   nodeInfo *myNode = cta(threadinfo)->myNode;
551   addBgNodeInbuffer(msg, myNode->id);
552 #endif
553 }
554
555 /** BG API Func 
556  *  check if inBuffer on this node has msg available
557  */
558 int checkReady()
559 {
560   if (tTHREADTYPE != COMM_THREAD)
561     CmiAbort("checkReady called by a non-communication thread!\n");
562   return !tINBUFFER.isEmpty();
563 }
564
565
566 /* handler to process the msg */
567 void msgHandlerFunc(char *msg)
568 {
569   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
570   int gnodeID = CmiBgMsgNodeID(msg);
571   if (gnodeID >= 0) {
572 #ifndef CMK_OPTIMIZE
573     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
574       CmiAbort("msgHandlerFunc received wrong message!");
575 #endif
576     int lnodeID = nodeInfo::Global2Local(gnodeID);
577     if (cva(bgMach).inReplayMode()) {
578       int x, y, z;
579       int node;
580       if (cva(bgMach).replay != -1) {
581         node = cva(bgMach).replay;
582         node = node / cva(bgMach).numWth;
583       }
584       if (cva(bgMach).replaynode != -1) {
585         node = cva(bgMach).replaynode;
586       }
587       BgGetXYZ(node, &x, &y, &z);
588       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
589       else lnodeID = 0;
590     }
591     addBgNodeInbuffer(msg, lnodeID);
592   }
593   else {
594     CmiAbort("Invalid message!");
595   }
596 }
597
598 /* Converse handler for node level broadcast message */
599 void nodeBCastMsgHandlerFunc(char *msg)
600 {
601   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
602   int gnodeID = CmiBgMsgNodeID(msg);
603   CmiInt2 threadID = CmiBgMsgThreadID(msg);
604   int lnodeID;
605
606   if (gnodeID < -1) {
607     gnodeID = - (gnodeID+100);
608     if (cva(bgMach).replaynode != -1) {
609       if (gnodeID == cva(bgMach).replaynode)
610           lnodeID = 0;
611       else
612           lnodeID = -1;
613     }
614     else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
615       lnodeID = nodeInfo::Global2Local(gnodeID);
616     else
617       lnodeID = -1;
618   }
619   else {
620     ASSERT(gnodeID == -1);
621     lnodeID = gnodeID;
622   }
623   // broadcast except lnodeID:threadId
624   int len = CmiBgMsgLength(msg);
625   int count = 0;
626   for (int i=0; i<cva(numNodes); i++)
627   {
628     if (i==lnodeID) continue;
629     char *dupmsg;
630     if (count == 0) dupmsg = msg;
631     else dupmsg = CmiCopyMsg(msg, len);
632     DEBUGF(("addBgNodeInbuffer to %d\n", i));
633     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
634     addBgNodeInbuffer(dupmsg, i);
635     count ++;
636   }
637   if (count == 0) CmiFree(msg);
638 }
639
640 // clone a msg, only has a valid header, plus a pointer to the real msg
641 char *BgCloneMsg(char *msg)
642 {
643   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
644   char *dupmsg = (char *)CmiAlloc(size);
645   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
646   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
647   CmiBgMsgRefCount(msg) ++;
648   CmiBgMsgFlag(dupmsg) = BG_CLONE;
649   return dupmsg;
650 }
651
652 // expand the cloned msg to the full size msg
653 char *BgExpandMsg(char *msg)
654 {
655   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
656   int size = CmiBgMsgLength(origmsg);
657   char *dupmsg = (char *)CmiAlloc(size);
658   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
659   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
660   CmiFree(msg);
661   CmiBgMsgRefCount(origmsg) --;
662   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
663   return dupmsg;
664 }
665
666 /* Converse handler for thread level broadcast message */
667 void threadBCastMsgHandlerFunc(char *msg)
668 {
669   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
670   int gnodeID = CmiBgMsgNodeID(msg);
671   CmiInt2 threadID = CmiBgMsgThreadID(msg);
672   if (cva(bgMach).replay != -1) {
673     if (gnodeID < -1) {
674       gnodeID = - (gnodeID+100);
675       if (gnodeID == cva(bgMach).replay/cva(bgMach).numWth && threadID == cva(bgMach).replay%cva(bgMach).numWth)
676         return;
677     }
678     CmiBgMsgThreadID(msg) = 0;
679     DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
680     addBgNodeInbuffer(msg, 0);
681     return;
682   }
683   int lnodeID;
684   if (gnodeID < -1) {
685       gnodeID = - (gnodeID+100);
686       if (cva(bgMach).replaynode != -1) {
687         if (gnodeID == cva(bgMach).replaynode)
688           lnodeID = 0;
689         else
690           lnodeID = -1;
691       }
692       else if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
693         lnodeID = nodeInfo::Global2Local(gnodeID);
694       else
695         lnodeID = -1;
696       CmiAssert(threadID != ANYTHREAD);
697   }
698   else {
699     ASSERT(gnodeID == -1);
700     lnodeID = gnodeID;
701   }
702   // broadcast except nodeID:threadId
703   int len = CmiBgMsgLength(msg);
704   // optimization needed if the message size is big
705   // making duplications can easily run out of memory
706   int bigOpt = (len > 4096);
707   for (int i=0; i<cva(numNodes); i++)
708   {
709       for (int j=0; j<cva(bgMach).numWth; j++) {
710         if (i==lnodeID && j==threadID) continue;
711         // for big message, clone a message token instead of a real msg
712         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
713         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
714         CmiBgMsgThreadID(dupmsg) = j;
715         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
716         addBgNodeInbuffer(dupmsg, i);
717       }
718   }
719   // for big message, will free after all tokens are done
720   if (!bigOpt) CmiFree(msg);
721 }
722
723 /**
724  *              BG Messaging Functions
725  */
726
727 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
728 {
729   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
730 }
731
732 /**
733  *   a simple message streaming on demand and special purpose
734  *   user call  BgStartStreaming() and BgEndStreaming()
735  *   each worker thread call one send, and all sends are sent
736  *   via multiplesend at the end
737  */
738
739 static int bg_streaming = 0;
740
741 class BgStreaming {
742 public:
743   char **streamingMsgs;
744   int  *streamingMsgSizes;
745   int count;
746   int totalWorker;
747   int pe;
748 public:
749   BgStreaming() {
750     streamingMsgs = NULL;
751     streamingMsgSizes = NULL;
752     count = 0;
753     totalWorker = 0;
754     pe = -1;
755   }
756   ~BgStreaming() {
757     if (streamingMsgs) {
758       delete [] streamingMsgs;
759       delete [] streamingMsgSizes;
760     }
761   }
762   void init(int nNodes) {
763     totalWorker = nNodes * BgGetNumWorkThread();
764     streamingMsgs = new char *[totalWorker];
765     streamingMsgSizes = new int [totalWorker];
766   }
767   void depositMsg(int p, int size, char *m) {
768     streamingMsgs[count] = m;
769     streamingMsgSizes[count] = size;
770     count ++;
771     if (pe == -1) pe = p;
772     else CmiAssert(pe == p);
773     if (count == totalWorker) {
774       // CkPrintf("streaming send\n");
775       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
776       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
777       pe = -1;
778       count = 0;
779     }
780   }
781 };
782
783 BgStreaming bgstreaming;
784
785 void BgStartStreaming()
786 {
787   bg_streaming = 1;
788 }
789
790 void BgEndStreaming()
791 {
792   bg_streaming = 0;
793 }
794
795 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
796 {
797   if (streaming && pe != CmiMyPe())
798     bgstreaming.depositMsg(pe, msgSize, msg);
799   else
800     CmiSyncSendAndFree(pe, msgSize, msg);
801 }
802
803
804 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
805 {
806 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
807 #if !DELAY_SEND
808   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
809   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
810 #else
811   if (!correctTimeLog) {
812     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
813     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
814   }
815   // else messages are kept in the log (MsgEntry), and only will be sent
816   // after timing correction has done on that log.
817   // TODO: streaming has no effect if time correction is on.
818 #endif
819 }
820
821 /* send will copy data to msg buffer */
822 /* user data is not free'd in this routine, user can reuse the data ! */
823 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
824 {
825   //CmiPrintStackTrace(0);
826   double sendT = BgGetTime();
827
828   double latency;
829   CmiSetHandler(sendmsg, cva(simState).msgHandler);
830   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
831   CmiBgMsgThreadID(sendmsg) = threadID;
832   CmiBgMsgHandle(sendmsg) = handlerID;
833   CmiBgMsgType(sendmsg) = type;
834   CmiBgMsgLength(sendmsg) = numbytes;
835   CmiBgMsgFlag(sendmsg) = 0;
836   CmiBgMsgRefCount(sendmsg) = 0;
837   if (local) {
838     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
839     latency = 0.0;
840   }
841   else {
842     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
843     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
844     CmiAssert(latency >= 0);
845   }
846   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
847   
848   // timing
849   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
850
851   //static int addCnt=1; //for debugging only
852   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
853   //addCnt++;
854
855   if (local){
856       /* Here local refers to the fact that msg is sent to the processor itself
857        * therefore, we just add this msg to the thread itself
858        */
859       //addBgThreadMessage(sendmsg,threadID);
860       addBgNodeInbuffer(sendmsg, myNode->id);
861   }    
862   else
863     CmiSendPacket(x, y, z, numbytes, sendmsg);
864
865   // bypassing send time
866   resetVTime();
867 }
868
869 /* broadcast will copy data to msg buffer */
870 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
871 {
872   double sendT = BgGetTime();
873
874   nodeInfo *myNode = cta(threadinfo)->myNode;
875   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
876   if (node >= 0)
877     CmiBgMsgNodeID(sendmsg) = -node-100;
878   else
879     CmiBgMsgNodeID(sendmsg) = node;
880   CmiBgMsgThreadID(sendmsg) = threadID; 
881   CmiBgMsgHandle(sendmsg) = handlerID;  
882   CmiBgMsgType(sendmsg) = type; 
883   CmiBgMsgLength(sendmsg) = numbytes;
884   CmiBgMsgFlag(sendmsg) = 0;
885   CmiBgMsgRefCount(sendmsg) = 0;
886   /* FIXME */
887   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
888
889   // timing
890   // FIXME
891   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
892
893   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
894 #if DELAY_SEND
895   if (!correctTimeLog)
896 #endif
897   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
898
899   resetVTime();
900 }
901
902 /* broadcast will copy data to msg buffer */
903 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
904 {
905   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
906   if (node >= 0)
907     CmiBgMsgNodeID(sendmsg) = -node-100;
908   else
909     CmiBgMsgNodeID(sendmsg) = node;
910   CmiBgMsgThreadID(sendmsg) = threadID; 
911   CmiBgMsgHandle(sendmsg) = handlerID;  
912   CmiBgMsgType(sendmsg) = type; 
913   CmiBgMsgLength(sendmsg) = numbytes;
914   CmiBgMsgFlag(sendmsg) = 0;
915   CmiBgMsgRefCount(sendmsg) = 0;
916   /* FIXME */
917   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
918   double sendT = BgGetTime();
919   CmiBgMsgRecvTime(sendmsg) = sendT;    
920
921   // timing
922 #if 0
923   if (node == BG_BROADCASTALL) {
924     for (int i=0; i<_bgSize; i++) {
925       for (int j=0; j<cva(numWth); j++) {
926         BG_ADDMSG(sendmsg, node);
927       }
928     }
929   }
930   else {
931     CmiAssert(node >= 0);
932     BG_ADDMSG(sendmsg, (node+100));
933   }
934 #else
935   // FIXME
936   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
937 #endif
938
939   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
940 #if DELAY_SEND
941   if (!correctTimeLog)
942 #endif
943   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
944
945   resetVTime();
946 }
947
948
949 /* sendPacket to route */
950 /* this function can be called by any thread */
951 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
952 {
953   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
954
955 #ifndef CMK_OPTIMIZE
956   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
957     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
958     CmiAbort("Abort!\n");
959   }
960 #endif
961
962   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
963 }
964
965 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
966 {
967   if (cva(bgMach).replay!=-1) { // replay mode
968     int t = cva(bgMach).replay%BgGetNumWorkThread();
969     if (t == threadID) threadID = 0;
970     else return;
971   }
972
973   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
974 }
975
976 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
977                        int numbytes, char* data)
978 {
979   nodeInfo *myNode = cta(threadinfo)->myNode;
980
981   if (cva(bgMach).replay!=-1) {     // replay mode
982     threadID = 0;
983     CmiAssert(threadID != -1);
984   }
985
986   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
987 }
988
989 /* wrapper of the previous two functions */
990 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
991 {
992   nodeInfo *myNode = cta(threadinfo)->myNode;
993   if (myNode->x == x && myNode->y == y && myNode->z == z)
994     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
995   else
996     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
997 }
998
999 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1000 {
1001   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1002 }
1003
1004 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1005 {
1006   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1007 }
1008
1009 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
1010 {
1011   if (cva(bgMach).replay!=-1) return;    // replay mode
1012   else if (cva(bgMach).replaynode!=-1) {    // replay mode
1013     //if (node!=-1 && node == cva(bgMach).replaynode/cva(bgMach).numWth)
1014     //  return;
1015   }
1016   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
1017 }
1018
1019 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
1020 {
1021   if (cva(bgMach).replay!=-1) {      // replay mode, send only to itself
1022     int t = cva(bgMach).replay%BgGetNumWorkThread();
1023     BgSendLocalPacket(t, handlerID, type, numbytes, data);
1024     return;
1025   }
1026   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
1027 }
1028
1029 /**
1030  send a msg to a list of processors (processors represented in global seq #
1031 */
1032 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
1033 {
1034   nodeInfo *myNode = cta(threadinfo)->myNode;
1035
1036   CmiSetHandler(msg, cva(simState).msgHandler);
1037   CmiBgMsgHandle(msg) = handlerID;
1038   CmiBgMsgType(msg) = type;
1039   CmiBgMsgLength(msg) = numbytes;
1040   CmiBgMsgFlag(msg) = 0;
1041   CmiBgMsgRefCount(msg) = 0;
1042
1043   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
1044
1045   double now = BgGetTime();
1046
1047   // send one by one
1048   for (int i=0; i<npes; i++)
1049   {
1050     int local = 0;
1051     int x,y,z,t;
1052     int pe = pes[i];
1053     int node;
1054 #if CMK_BLUEGENE_NODE
1055     CmiAbort("Not implemented yet!");
1056 #else
1057     t = pe%BgGetNumWorkThread();
1058     node = pe/BgGetNumWorkThread();
1059     BgGetXYZ(node, &x, &y, &z);
1060 #endif
1061
1062     char *sendmsg = CmiCopyMsg(msg, numbytes);
1063     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1064     CmiBgMsgThreadID(sendmsg) = t;
1065     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1066     CmiAssert(latency >= 0);
1067     CmiBgMsgRecvTime(sendmsg) = latency + now;
1068
1069     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1070
1071     // timing and make sure all msgID are the same
1072     if (i!=0) CpvAccess(msgCounter) --;
1073     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1074
1075     BgSendPacket(x, y, z, t, handlerID, type, numbytes, sendmsg);
1076 /*
1077     if (myNode->x == x && myNode->y == y && myNode->z == z)
1078       addBgNodeInbuffer(sendmsg, myNode->id);
1079     else
1080       CmiSendPacket(x, y, z, numbytes, sendmsg);
1081 */
1082   }
1083
1084   CmiFree(msg);
1085
1086   resetVTime();
1087 }
1088
1089 /*****************************************************************************
1090       BG node level API - utilities
1091 *****************************************************************************/
1092
1093 /* must be called in a communication or worker thread */
1094 void BgGetMyXYZ(int *x, int *y, int *z)
1095 {
1096   ASSERT(!cva(simState).inEmulatorInit);
1097   *x = tMYX; *y = tMYY; *z = tMYZ;
1098 }
1099
1100 void BgGetXYZ(int seq, int *x, int *y, int *z)
1101 {
1102   nodeInfo::Global2XYZ(seq, x, y, z);
1103 }
1104
1105 void BgGetSize(int *sx, int *sy, int *sz)
1106 {
1107   cva(bgMach).getSize(sx, sy, sz);
1108 }
1109
1110 int BgTraceProjectionOn(int pe)
1111 {
1112   return cva(bgMach).traceProejctions(pe);
1113 }
1114
1115 /* return the total number of Blue gene nodes */
1116 int BgNumNodes()
1117 {
1118   return _bgSize;
1119 }
1120
1121 void BgSetNumNodes(int x)
1122 {
1123   _bgSize = x;
1124 }
1125
1126 /* can only called in emulatorinit */
1127 void BgSetSize(int sx, int sy, int sz)
1128 {
1129   ASSERT(cva(simState).inEmulatorInit);
1130   cva(bgMach).setSize(sx, sy, sz);
1131 }
1132
1133 /* return number of bg nodes on this emulator node */
1134 int BgNodeSize()
1135 {
1136   ASSERT(!cva(simState).inEmulatorInit);
1137   return cva(numNodes);
1138 }
1139
1140 /* return the bg node ID (local array index) */
1141 int BgMyRank()
1142 {
1143 #ifndef CMK_OPTIMIZE
1144   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1145 #endif
1146   ASSERT(!cva(simState).inEmulatorInit);
1147   return tMYNODEID;
1148 }
1149
1150 /* return my serialed blue gene node number */
1151 int BgMyNode()
1152 {
1153 #ifndef CMK_OPTIMIZE
1154   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1155 #endif
1156   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1157 }
1158
1159 /* return a real processor number from a bg node */
1160 int BgNodeToRealPE(int node)
1161 {
1162   return nodeInfo::Global2PE(node);
1163 }
1164
1165 // thread ID on a BG node
1166 int BgGetThreadID()
1167 {
1168   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1169 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1170   return tMYID;
1171 }
1172
1173 void BgSetThreadID(int x)
1174 {
1175   tMYID = x;
1176 }
1177
1178 int BgGetGlobalThreadID()
1179 {
1180   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1181   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1182   //return tMYGLOBALID;
1183 }
1184
1185 int BgGetGlobalWorkerThreadID()
1186 {
1187   ASSERT(tTHREADTYPE == WORK_THREAD);
1188 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1189   return tMYGLOBALID;
1190 }
1191
1192 void BgSetGlobalWorkerThreadID(int pe)
1193 {
1194   ASSERT(tTHREADTYPE == WORK_THREAD);
1195 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1196   tMYGLOBALID = pe;
1197 }
1198
1199 char *BgGetNodeData()
1200 {
1201   return tUSERDATA;
1202 }
1203
1204 void BgSetNodeData(char *data)
1205 {
1206   ASSERT(!cva(simState).inEmulatorInit);
1207   tUSERDATA = data;
1208 }
1209
1210 int BgGetNumWorkThread()
1211 {
1212   return cva(bgMach).numWth;
1213 }
1214
1215 void BgSetNumWorkThread(int num)
1216 {
1217   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1218   cva(bgMach).numWth = num;
1219 }
1220
1221 int BgGetNumCommThread()
1222 {
1223   return cva(bgMach).numCth;
1224 }
1225
1226 void BgSetNumCommThread(int num)
1227 {
1228   ASSERT(cva(simState).inEmulatorInit);
1229   cva(bgMach).numCth = num;
1230 }
1231
1232 /*****************************************************************************
1233       Communication and Worker threads
1234 *****************************************************************************/
1235
1236 BgStartHandler  workStartFunc = NULL;
1237
1238 void BgSetWorkerThreadStart(BgStartHandler f)
1239 {
1240   workStartFunc = f;
1241 }
1242
1243 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1244
1245 // kernel function for processing a bluegene message
1246 void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
1247 {
1248   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1249   int handler = CmiBgMsgHandle(msg);
1250   //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
1251   CmiAssert(handler < 1000);
1252
1253   BgHandlerInfo *handInfo;
1254 #if  CMK_BLUEGENE_NODE
1255   HandlerTable hdlTbl = tMYNODE->handlerTable;
1256   handInfo = hdlTbl.getHandle(handler);
1257 #else
1258   HandlerTable hdlTbl = tHANDLETAB;
1259   handInfo = hdlTbl.getHandle(handler);
1260   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1261 #endif
1262
1263   if (handInfo == NULL) {
1264     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1265     CmiAbort("BgProcessMessage Failed!");
1266   }
1267   BgHandlerEx entryFunc = handInfo->fnPtr;
1268
1269   if (programExit == 2) return;    // program exit already
1270
1271   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1272
1273   // optimization for broadcast messages:
1274   // if the msg is a broadcast token msg, expand it to a real msg
1275   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1276     msg = BgExpandMsg(msg);
1277   }
1278
1279   if (tinfo->watcher) tinfo->watcher->record(msg);
1280
1281   // don't count thread overhead and timing overhead
1282   startVTimer();
1283
1284   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1285
1286   entryFunc(msg, handInfo->userPtr);
1287
1288   stopVTimer();
1289
1290   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1291 }
1292
1293 void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
1294
1295 void scheduleWorkerThread(char *msg)
1296 {
1297   CthThread tid = (CthThread)msg;
1298 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1299   CthAwaken(tid);
1300 }
1301
1302 // thread entry
1303 // for both comm and work thread, virtual function
1304 void run_thread(threadInfo *tinfo)
1305 {
1306   /* set the thread-private threadinfo */
1307   cta(threadinfo) = tinfo;
1308   tinfo->run();
1309 }
1310
1311 /* should be done only once per bg node */
1312 void BgNodeInitialize(nodeInfo *ninfo)
1313 {
1314   CthThread t;
1315   int i;
1316
1317   /* this is here because I will put a message to node inbuffer */
1318   tCURRTIME = 0.0;
1319   tSTARTTIME = CmiWallTimer();
1320
1321   /* creat work threads */
1322   for (i=0; i< cva(bgMach).numWth; i++)
1323   {
1324     threadInfo *tinfo = ninfo->threadinfo[i];
1325     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1326     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1327     tinfo->setThread(t);
1328     /* put to thread table */
1329     tTHREADTABLE[tinfo->id] = t;
1330 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1331     //initial scheduling points for workthreads
1332     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1333 #endif
1334     CthAwaken(t);
1335   }
1336
1337   /* creat communication thread */
1338   for (i=0; i< cva(bgMach).numCth; i++)
1339   {
1340     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1341     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1342     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1343     tinfo->setThread(t);
1344     /* put to thread table */
1345     tTHREADTABLE[tinfo->id] = t;
1346     CthAwaken(t);
1347   }
1348
1349 }
1350
1351 static void beginExitHandlerFunc(void *msg);
1352 static void writeToDisk();
1353 static void sendCorrectionStats();
1354
1355 static CmiHandler exitHandlerFunc(char *msg)
1356 {
1357   // TODO: free memory before exit
1358   int i,j;
1359
1360   programExit = 2;
1361 #if BLUEGENE_TIMING
1362   // timing
1363   if (0)        // detail
1364   if (genTimeLog) {
1365     for (j=0; j<cva(numNodes); j++)
1366     for (i=0; i<cva(bgMach).numWth; i++) {
1367       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1368 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1369       int x,y,z;
1370       nodeInfo::Local2XYZ(j, &x, &y, &z);
1371       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1372     }
1373
1374   }
1375 #endif
1376   if (genTimeLog) sendCorrectionStats();
1377
1378   if (genTimeLog) writeToDisk();
1379
1380 //  if (tTHREADTYPE == WORK_THREAD)
1381   {
1382   int origPe = -2;
1383   // close all tracing modules
1384   for (j=0; j<cva(numNodes); j++)
1385     for (i=0; i<cva(bgMach).numWth; i++) {
1386       int pe = nodeInfo::Local2Global(j)*cva(bgMach).numWth+i;
1387       int oldPe = CmiSwitchToPE(pe);
1388       if (cva(bgMach).replay != -1)
1389         if ( pe != cva(bgMach).replay ) continue;
1390       if (origPe == -2) origPe = oldPe;
1391       traceCharmClose();
1392 //      CmiSwitchToPE(oldPe);
1393       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1394       cva(nodeinfo)[j].threadinfo[i]->watcher = NULL;
1395     }
1396     if (origPe!=-2) CmiSwitchToPE(origPe);
1397   }
1398
1399 #if 0
1400   delete [] cva(nodeinfo);
1401   delete [] cva(inBuffer);
1402   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1403   delete [] cva(msgBuffer);
1404 #endif
1405
1406 #if CMK_HAS_COUNTER_PAPI
1407   if (cva(bgMach).timingMethod == BG_COUNTER) {
1408 /*        
1409   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1410   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1411   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1412   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1413  */
1414         for(int i=0; i<numPapiEvents; i++){
1415           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1416            delete papi_counters_desc[i];
1417         }
1418         delete papiEvents;
1419         delete papiValues;
1420         delete papi_counters_desc;
1421         delete total_papi_counters;
1422   }
1423 #endif
1424
1425   //ConverseExit();
1426   if (genTimeLog)
1427     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1428   else
1429     CsdExitScheduler();
1430
1431   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1432
1433   return 0;
1434 }
1435
1436 static void sanityCheck()
1437 {
1438   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1439     if (CmiMyPe() == 0)
1440       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1441     BgShutdown(); 
1442   } 
1443   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1444 #if 1
1445     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1446     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1447 #else
1448     if (CmiMyPe() == 0)
1449       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1450     BgShutdown(); 
1451 #endif
1452   }
1453   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1454     CmiAbort("\nToo few BlueGene nodes!\n");
1455   }
1456 }
1457
1458 #undef CmiSwitchToPE
1459 extern "C" int CmiSwitchToPEFn(int pe);
1460
1461 // main
1462 CmiStartFn bgMain(int argc, char **argv)
1463 {
1464   int i;
1465   char *configFile = NULL;
1466
1467   BgProcessMessage = BgProcessMessageDefault;
1468 #if CMK_CONDS_USE_SPECIAL_CODE
1469   // overwrite possible implementation in machine.c
1470   CmiSwitchToPE = CmiSwitchToPEFn;
1471 #endif
1472
1473   /* initialize all processor level data */
1474   CpvInitialize(BGMach,bgMach);
1475   cva(bgMach).nullify();
1476
1477   CmiArgGroup("Charm++","BlueGene Simulator");
1478   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1479    cva(bgMach). read(configFile);
1480   }
1481   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1482                 "The x size of the grid of nodes");
1483   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1484                 "The y size of the grid of nodes");
1485   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1486                 "The z size of the grid of nodes");
1487   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1488                 "The number of simulated communication threads per node");
1489   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1490                 "The number of simulated worker threads per node");
1491
1492   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1493                 "Blue Gene thread stack size");
1494
1495   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1496      genTimeLog = 1;
1497   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1498     correctTimeLog = 1;
1499   schedule_flag = 0;
1500   if (correctTimeLog) {
1501     genTimeLog = 1;
1502     schedule_flag = 1;
1503   }
1504
1505   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1506     bgverbose = 1;
1507
1508   // for timing method, default using elapse calls.
1509   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1510                        "Use user provided BgElapse for time prediction")) 
1511       cva(bgMach).timingMethod = BG_ELAPSE;
1512   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1513                        "Use walltime method for time prediction")) 
1514       cva(bgMach).timingMethod = BG_WALLTIME;
1515 #ifdef CMK_ORIGIN2000
1516   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1517       cva(bgMach).timingMethod = BG_COUNTER;
1518 #elif CMK_HAS_COUNTER_PAPI
1519   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1520     cva(bgMach).timingMethod = BG_COUNTER;
1521   }
1522   if (cva(bgMach).timingMethod == BG_COUNTER) {
1523     init_counters();
1524   }
1525 #endif
1526   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1527                       "floating point to time factor");
1528   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1529                       "scale factor for wallclock time measured");
1530   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1531                       "timer cost");
1532   #if 0
1533   if(cva(bgMach).timingMethod == BG_WALLTIME)
1534   {
1535       int count = 1e6;
1536       double start, stop, diff, cost, dummy;
1537
1538       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1539
1540       start = BG_TIMER();
1541       for (int i = 0; i < count; ++i)
1542           dummy = BG_TIMER();
1543       stop = BG_TIMER();
1544
1545       diff = stop - start;
1546       cost = diff / count;
1547
1548       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1549                 cost, cva(bgMach).timercost);
1550       cva(bgMach).timercost = cost;
1551   }
1552   #endif
1553   
1554   char *networkModel;
1555   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1556    cva(bgMach).setNetworkModel(networkModel);
1557   }
1558
1559   bgcorroff = 0;
1560   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1561     bgcorroff = 1;
1562
1563   bgstats_flag=0;
1564   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1565     bgstats_flag = 1;
1566
1567   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1568   {
1569     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1570     sprintf(root, "%s/", cva(bgMach).traceroot);
1571     cva(bgMach).traceroot = root;
1572   }
1573
1574   // record/replay
1575   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim")) {
1576     cva(bgMach).record = 1;
1577     if (CmiMyPe() == 0)
1578       CmiPrintf("BG info> full record mode. \n");
1579   }
1580   if (CmiGetArgFlagDesc(argv,"+bgrecordnode","Record message processing order for BigSim")) {
1581     cva(bgMach).recordnode = 1;
1582     if (CmiMyPe() == 0)
1583       CmiPrintf("BG info> full record mode on node. \n");
1584   }
1585   int replaype;
1586   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1587     cva(bgMach).replay = replaype;
1588   }
1589   else {
1590     if (CmiGetArgFlagDesc(argv,"+bgreplay","Record message processing order for BigSim"))
1591     cva(bgMach).replay = 0;    // default to 0
1592   }
1593   if (cva(bgMach).replay >= 0) {
1594     if (CmiNumPes()>1)
1595       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1596     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1 ||
1597          cva(bgMach).numWth!=1 || cva(bgMach).numCth!=1)
1598       CmiAbort("BG> bgreplay mode must run on one target processor.");
1599     CmiPrintf("BG info> replay mode for target processor %d.\n", cva(bgMach).replay);
1600   }
1601   char *procs = NULL;
1602   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1603     cva(bgMach).recordprocs.set(procs);
1604   }
1605
1606     // record/replay at node level
1607   char *nodes = NULL;
1608   if (CmiGetArgStringDesc(argv, "+bgrecordnodes", &nodes, "A list of nodes to record, e.g. 0,10,20-30")) {
1609     cva(bgMach).recordnodes.set(nodes);
1610   }
1611   int replaynode;
1612   if (CmiGetArgIntDesc(argv,"+bgreplaynode", &replaynode, "Re-play message processing order for BigSim")) {      
1613     cva(bgMach).replaynode = replaynode; 
1614   }
1615   else {
1616     if (CmiGetArgFlagDesc(argv,"+bgreplaynode","Record message processing order for BigSim"))
1617     cva(bgMach).replaynode = 0;    // default to 0
1618   }
1619   if (cva(bgMach).replaynode >= 0) {
1620     int startpe, endpe;
1621     BgRead_nodeinfo(replaynode, startpe, endpe);
1622     if (cva(bgMach).numWth != endpe-startpe+1) {
1623       cva(bgMach).numWth = endpe-startpe+1;     // update wth
1624       CmiPrintf("BG info> numWth is changed to %d.\n", cva(bgMach).numWth);
1625     }
1626     if (CmiNumPes()>1)
1627       CmiAbort("BG> bgreplay mode must run on one physical processor.");
1628     if (cva(bgMach).x!=1 || cva(bgMach).y!=1 || cva(bgMach).z!=1)
1629       CmiAbort("BG> bgreplay mode must run on one target processor.");
1630     CmiPrintf("BG info> replay mode for target node %d.\n", cva(bgMach).replaynode);   
1631   }     
1632   CmiAssert(!(cva(bgMach).replaynode != -1 && cva(bgMach).replay != -1));
1633
1634
1635   /* parameters related with out-of-core execution */
1636   int tmpcap=0;
1637   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1638      TBLCAPACITY = tmpcap;
1639     }
1640   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1641       bgUseOutOfCore = 1;
1642       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1643
1644       double curFreeMem = bgGetSysFreeMemSize();
1645       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1646         //using the memory available of the system right now
1647         //assuming no other programs will run after this program runs
1648         bgOOCMaxMemSize = curFreeMem;
1649         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1650       }
1651       if(bgOOCMaxMemSize > curFreeMem){
1652         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1653         bgOOCMaxMemSize = curFreeMem;
1654       }
1655       DEBUGF(("out-of-core turned on!\n"));
1656   }      
1657
1658 #if BLUEGENE_DEBUG_LOG
1659   {
1660     char ln[200];
1661     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1662     bgDebugLog=fopen(ln,"w");
1663   }
1664 #endif
1665
1666   arg_argv = argv;
1667   arg_argc = CmiGetArgc(argv);
1668
1669   /* msg handler */
1670   CpvInitialize(SimState, simState);
1671   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1672   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1673   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1674   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1675
1676   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1677   cva(simState).inEmulatorInit = 1;
1678   /* call user defined BgEmulatorInit */
1679   BgEmulatorInit(arg_argc, arg_argv);
1680   cva(simState).inEmulatorInit = 0;
1681
1682   /* check if all bluegene node size and thread information are set */
1683   sanityCheck();
1684
1685   _bgSize = cva(bgMach).getNodeSize(); 
1686
1687   timerFunc = BgGetTime;
1688
1689   BgInitTiming();               // timing module
1690
1691   if (CmiMyPe() == 0) {
1692     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1693     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1694     cva(bgMach).network->print();
1695     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1696     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1697     if (cva(bgMach).stacksize)
1698       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1699     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1700       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1701     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1702       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1703     else if (cva(bgMach).timingMethod == BG_COUNTER)
1704       CmiPrintf("BG info> Using performance counter for timing method. \n");
1705     if (genTimeLog)
1706       CmiPrintf("BG info> Generating timing log. \n");
1707     if (correctTimeLog)
1708       CmiPrintf("BG info> Perform timestamp correction. \n");
1709     if (cva(bgMach).traceroot)
1710       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1711   }
1712
1713   CtvInitialize(threadInfo *, threadinfo);
1714
1715   /* number of bg nodes on this PE */
1716   CpvInitialize(int, numNodes);
1717   cva(numNodes) = nodeInfo::numLocalNodes();
1718
1719   if (CmiMyRank() == 0)
1720     initLock = CmiCreateLock();     // used for BnvInitialize
1721
1722   bgstreaming.init(cva(numNodes));
1723
1724   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1725 if(bgUseOutOfCore){
1726       initTblThreadInMem();
1727           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1728       //init prefetch status      
1729       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1730       oocPrefetchSpace = new oocPrefetchBufSpace();
1731       schedWorkThds = new oocWorkThreadQueue();
1732           #endif
1733   }
1734
1735 #if BIGSIM_OUT_OF_CORE
1736   //initialize variables related to get precise
1737   //physical memory usage info for a process
1738   bgMemPageSize = getpagesize();
1739   memset(bgMemStsFile, 0, 25); 
1740   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1741 #endif
1742
1743
1744   /* create BG nodes */
1745   CpvInitialize(nodeInfo *, nodeinfo);
1746   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1747   _MEMCHECK(cva(nodeinfo));
1748
1749   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1750   _MEMCHECK(cta(threadinfo));
1751
1752   /* create BG processors for each node */
1753   for (i=0; i<cva(numNodes); i++)
1754   {
1755     nodeInfo *ninfo = cva(nodeinfo) + i;
1756     // create threads
1757     ninfo->initThreads(i);
1758
1759     /* pretend that I am a thread */
1760     cta(threadinfo)->myNode = ninfo;
1761
1762     /* initialize a BG node and fire all threads */
1763     BgNodeInitialize(ninfo);
1764   }
1765   // clear main thread.
1766   cta(threadinfo)->myNode = NULL;
1767   CpvInitialize(CthThread, mainThread);
1768   cva(mainThread) = CthSelf();
1769
1770   CpvInitialize(int, CthResumeBigSimThreadIdx);
1771
1772   cva(simState).simStartTime = CmiWallTimer();    
1773   return 0;
1774 }
1775
1776 // for conv-conds:
1777 // if -2 untouch
1778 // if -1 main thread
1779 #if CMK_BLUEGENE_THREAD
1780 extern "C" int CmiSwitchToPEFn(int pe)
1781 {
1782   if (pe == -2) return -2;
1783   int oldpe;
1784 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1785   if (tMYNODE == NULL) oldpe = -1;
1786   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1787   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1788   else oldpe = -1;
1789 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1790   if (pe == -1) {
1791     CthSwitchThread(cva(mainThread));
1792   }
1793   else if (pe < 0) {
1794   }
1795   else {
1796     if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1797     int t = pe%cva(bgMach).numWth;
1798     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1799     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1800     threadInfo *tinfo = ninfo->threadinfo[t];
1801     CthSwitchThread(tinfo->me);
1802   }
1803   return oldpe;
1804 }
1805 #else
1806 extern "C" int CmiSwitchToPEFn(int pe)
1807 {
1808   if (pe == -2) return -2;
1809   int oldpe;
1810   if (tMYNODE == NULL) oldpe = -1;
1811   else oldpe = BgMyNode();
1812   if (pe == -1) {
1813     cta(threadinfo)->myNode = NULL;
1814   }
1815   else {
1816     int newpe = nodeInfo::Global2Local(pe);
1817     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1818   }
1819   return oldpe;
1820 }
1821 #endif
1822
1823
1824 /*****************************************************************************
1825                         TimeLog correction
1826 *****************************************************************************/
1827
1828 extern void processCorrectionMsg(int nodeidx);
1829
1830 // return the msg pointer, and the index of the message in the affinity queue.
1831 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1832 {
1833   CmiAssert(tID != ANYTHREAD);
1834   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1835   for (int i=0; i<affinityQ.length(); i++)  {
1836       char *msg = affinityQ[i];
1837       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1838       if (msgId == md) {
1839         index = i;
1840         return msg;
1841       }
1842   }
1843   return NULL;
1844 }
1845
1846 // return the msg pointer, thread id and the index of the message in the affinity queue.
1847 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1848 {
1849   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1850     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1851     if (msg) return msg;
1852   }
1853   return NULL;
1854 }
1855
1856 StateCounters stateCounters;
1857
1858 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1859 {
1860   char *msg;
1861   CmiInt2 tID = cm->tID;
1862   int index;
1863   if (tID == ANYTHREAD) {
1864     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1865   }
1866   else {
1867     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1868   }
1869   if (msg == NULL) return 0;
1870
1871   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1872   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1873   affinityQ.update(index);
1874   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1875   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1876   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1877   stateCounters.corrMsgCRCnt++;
1878   return 1;       /* invalidate this msg */
1879 }
1880
1881 extern void processBufferCorrectionMsgs(void *ignored);
1882
1883 // Coverse handler for begin exit
1884 // flush and process all correction messages
1885 static void beginExitHandlerFunc(void *msg)
1886 {
1887   CmiFree(msg);
1888   delayCheckFlag = 0;
1889 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1890   programExit = 1;
1891 #if LIMITED_SEND
1892   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1893 #endif
1894
1895 #if 1
1896   for (int i=0; i<BgNodeSize(); i++) 
1897     processCorrectionMsg(i); 
1898 #if USE_MULTISEND
1899   BgSendBufferedCorrMsgs();
1900 #endif
1901 #else
1902   // the msg queue should be empty now.
1903   // don't do insert adjustment, but start to do all timing correction from here
1904   int nodeidx, tID;
1905   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1906     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1907     for (tID=0; tID<cva(numWth); tID++) {
1908         BgTimeLineRec &tlinerec = tlines[tID];
1909         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1910     }
1911   }
1912
1913 #endif
1914
1915 #if !THROTTLE_WORK
1916 #if DELAY_CHECK
1917   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1918 #endif
1919 #endif
1920 }
1921
1922 #define HISTOGRAM_SIZE  100
1923 // compute total CPU utilization for each timeline and 
1924 // return the number of real msgs
1925 void computeUtilForAll(int* array, int *nReal)
1926 {
1927   double scale = 1.0e3;         // scale to ms
1928
1929   //We measure from 1ms to 5001 ms in steps of 100 ms
1930   int min = 0, step = 1;
1931   int max = min + HISTOGRAM_SIZE*step;
1932   // [min, max: step]  HISTOGRAM_SIZE slots
1933
1934   if (CmiMyPe()==0)
1935     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1936   int size = (max-min)/step;
1937   CmiAssert(size == HISTOGRAM_SIZE);
1938   int allmin = -1, allmax = -1;
1939   for(int i=0;i<size;i++) array[i] = 0;
1940
1941   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1942     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1943     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1944       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1945
1946       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1947       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1948       array[(util-min)/step]++;
1949     }
1950   }
1951   if (allmin!=-1 || allmax!=-1)
1952     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1953 }
1954
1955 class StatsMessage {
1956   char core[CmiBlueGeneMsgHeaderSizeBytes];
1957 public:
1958   int processCount;
1959   int corrMsgCount;
1960   int realMsgCount;
1961   int maxTimelineLen, minTimelineLen;
1962 };
1963
1964 extern int processCount, corrMsgCount;
1965
1966 static void sendCorrectionStats()
1967 {
1968   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1969   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1970   statsMsg->processCount = processCount;
1971   statsMsg->corrMsgCount = corrMsgCount;
1972   int numMsgs=0;
1973   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
1974   int totalMem = 0;
1975   if (bgstats_flag) {
1976   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1977     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1978     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1979         BgTimeLineRec &tlinerec = tlines[tID];
1980         int tlen = tlinerec.length();
1981         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
1982         if (tlen<minTimelineLen) minTimelineLen=tlen;
1983         totalMem = tlen*sizeof(BgTimeLog);
1984 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
1985 #if 0
1986         for (int i=0; i< tlinerec.length(); i++) {
1987           numMsgs += tlinerec[i]->msgs.length();
1988         }
1989 #endif
1990     }
1991   }
1992   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
1993   statsMsg->realMsgCount = numMsgs;
1994   statsMsg->maxTimelineLen = maxTimelineLen;
1995   statsMsg->minTimelineLen = minTimelineLen;
1996   }  // end if
1997
1998   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
1999   CmiSyncSendAndFree(0, msgSize, statsMsg);
2000 }
2001
2002 // Converse handler for collecting stats
2003 void statsCollectionHandlerFunc(void *msg)
2004 {
2005   static int count=0;
2006   static int pc=0, cc=0, realMsgCount=0;
2007   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
2008   static int *histArray = NULL;
2009   int i;
2010
2011   count++;
2012   if (histArray == NULL) {
2013     histArray = new int[HISTOGRAM_SIZE];
2014     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
2015   }
2016   StatsMessage *m = (StatsMessage *)msg;
2017   pc += m->processCount;
2018   cc += m->corrMsgCount;
2019   realMsgCount += m->realMsgCount;
2020   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
2021   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
2022   int *array = (int *)(m+1);
2023   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
2024   if (count == CmiNumPes()) {
2025     if (bgstats_flag) {
2026       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
2027       for (i=0; i<HISTOGRAM_SIZE; i++) {
2028         CmiPrintf("%d ", histArray[i]);
2029         if (i%20 == 19) CmiPrintf("\n");
2030       }
2031       CmiPrintf("\n");
2032     }
2033     CsdExitScheduler();
2034   }
2035   CmiFree(msg);
2036 }
2037
2038 // update arrival time from buffer messages
2039 // before start an entry, check message time against buffered timing
2040 // correction message to update to the correct time.
2041 void correctMsgTime(char *msg)
2042 {
2043    if (!correctTimeLog) return;
2044 //   if (CkMsgDoCorrect(msg) == 0) return;
2045
2046    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
2047    CmiInt2 tid = CmiBgMsgThreadID(msg);
2048
2049    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
2050    int len = cmsg.length();
2051    for (int i=0; i<len; i++) {
2052      bgCorrectionMsg* m = cmsg[i];
2053      if (msgId == m->msgId && tid == m->tID) {
2054         if (m->tAdjust < 0.0) return;
2055         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
2056         CmiBgMsgRecvTime(msg) = m->tAdjust;
2057         m->tAdjust = -1.0;       /* invalidate this msg */
2058 //      cmsg.update(i);
2059         stateCounters.corrMsgRCCnt++;
2060         break;
2061      }
2062    }
2063 }
2064
2065
2066 //TODO: write disk bgTraceFiles
2067 static void writeToDisk()
2068 {
2069
2070   char* d = new char[1025];
2071   //Num of simulated procs on this real pe
2072   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
2073
2074   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
2075
2076   // write summary file on PE0
2077   if(CmiMyPe()==0){
2078     
2079     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
2080     FILE *f2 = fopen(d,"wb");
2081     //Total emulating processors and total target BG processors
2082     int numEmulatingPes=CmiNumPes();
2083     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
2084
2085     if(f2==NULL) {
2086       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
2087       CmiAbort("BG> Abort");
2088     }
2089     PUP::toDisk p(f2);
2090     p((char *)&machInfo, sizeof(machInfo));
2091     p|totalWorkerProcs;
2092     p|cva(bgMach);
2093     p|numEmulatingPes;
2094     p|bglog_version;
2095     p|CpvAccess(CthResumeBigSimThreadIdx);
2096     
2097     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
2098     
2099     fclose(f2);
2100   }
2101   
2102   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
2103   FILE *f = fopen(d,"wb");
2104  
2105   if(f==NULL)
2106     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
2107   PUP::toDisk p(f);
2108   
2109   p((char *)&machInfo, sizeof(machInfo));       // machine info
2110   p|numLocalProcs;
2111
2112   // CmiPrintf("Timelines are: \n");
2113   int procTablePos = ftell(f);
2114
2115   int *procOffsets = new int[numLocalProcs];
2116   int procTableSize = (numLocalProcs)*sizeof(int);
2117   fseek(f,procTableSize,SEEK_CUR); 
2118
2119   for (int j=0; j<cva(numNodes); j++){
2120     for(int i=0;i<cva(bgMach).numWth;i++){
2121     #if BIGSIM_OUT_OF_CORE
2122         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2123         //through isomalloc. Therefore, the memory containing those events needs
2124         //to be brought back into memory from disk. --Chao Mei          
2125         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2126             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2127      #endif     
2128       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2129       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2130       t.pup(p);
2131     }
2132   }
2133   
2134   fseek(f,procTablePos,SEEK_SET);
2135   p(procOffsets,numLocalProcs);
2136   fclose(f);
2137
2138   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2139 }
2140
2141
2142 /*****************************************************************************
2143              application Converse thread hook
2144 *****************************************************************************/
2145
2146 CpvExtern(int      , CthResumeBigSimThreadIdx);
2147
2148 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2149                                    int pb,unsigned int *prio)
2150 {
2151 /*
2152   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2153   CsdEnqueueGeneral(token, s, pb, prio);
2154 */
2155 #if CMK_BLUEGENE_THREAD
2156   int x, y, z;
2157   BgGetMyXYZ(&x, &y, &z);
2158   int t = BgGetThreadID();
2159 #else
2160   #error "ERROR HERE"
2161 #endif
2162     // local message into queue
2163   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2164
2165   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2166   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2167
2168
2169 CthThread CthSuspendBigSimThread()
2170
2171   return  cta(threadinfo)->me;
2172 }
2173
2174 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2175 {
2176    // stop timer
2177    stopVTimer();
2178 }
2179
2180 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2181 {
2182    // start timer by reset it
2183    resetVTime();
2184 }
2185
2186
2187 void BgSetStrategyBigSimDefault(CthThread t)
2188
2189   CthSetStrategy(t,
2190                  CthEnqueueBigSimThread,
2191                  CthSuspendBigSimThread);
2192
2193   CthThreadListener *a = new CthThreadListener;
2194   a->suspend = bigsimThreadListener_suspend;
2195   a->resume = bigsimThreadListener_resume;
2196   a->free = NULL;
2197   CthAddListener(t, a);
2198 }
2199
2200 int BgIsMainthread()
2201 {
2202     return tMYNODE == NULL;
2203 }
2204
2205 int BgIsRecord()
2206 {
2207     return cva(bgMach).record == 1;
2208 }
2209
2210 int BgIsReplay()
2211 {
2212     return cva(bgMach).replay != -1 || cva(bgMach).replaynode != -1;
2213 }
2214
2215 extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2216   ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
2217   //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
2218   int numLocal = 0, count = 0;
2219   for (int j=0; j<cva(numNodes); j++){
2220     for(int i=0;i<cva(bgMach).numWth;i++){
2221       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2222       if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
2223       numLocal ++;
2224     }
2225   }
2226   void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
2227   for (int j=0; j<cva(numNodes); j++){
2228     for(int i=0;i<cva(bgMach).numWth;i++){
2229       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2230       if (t == cta(threadinfo)) break;
2231       msgLocal[count++] = t->reduceMsg;
2232       t->reduceMsg = NULL;
2233     }
2234   }
2235   CmiAssert(count==numLocal-1);
2236   msg = mergeFn(&size, msg, msgLocal, numLocal-1);
2237   CmiReduce(msg, size, mergeFn);
2238   CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
2239   for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
2240   free(msgLocal);
2241 }
2242
2243 // for record/replay, to fseek back
2244 void BgRewindRecord()
2245 {
2246   threadInfo *tinfo = cta(threadinfo);
2247   if (tinfo->watcher) tinfo->watcher->rewind();
2248 }
2249