Implemented CkReduce to mimic CmiReduce but within the virtualized BigSim.
[charm.git] / src / langs / bluegene / blue.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** \file: blue.C -- Converse BlueGene Emulator Code
9  *  Emulator written by Gengbin Zheng, gzheng@uiuc.edu on 2/20/2001
10  */
11  
12 #include <stdio.h>
13 #include <string.h>
14 #include <stdlib.h>
15 #include <math.h>
16 #include <string.h>
17 #include <unistd.h>
18
19 #include "cklists.h"
20 #include "queueing.h"
21 #include "blue.h"
22 #include "blue_impl.h"          // implementation header file
23 //#include "blue_timing.h"      // timing module
24 #include "bigsim_record.h"
25
26 #include "bigsim_ooc.h" //out-of-core module
27 #include "bigsim_debug.h"
28
29 //#define  DEBUGF(x)      //CmiPrintf x;
30
31 #undef DEBUGLEVEL
32 #define DEBUGLEVEL 10
33
34 const double CHARM_OVERHEAD = 0.5E-6;
35
36 /* node level variables */
37 CpvDeclare(nodeInfo*, nodeinfo);                /* represent a bluegene node */
38
39 /* thread level variables */
40 CtvDeclare(threadInfo *, threadinfo);   /* represent a bluegene thread */
41
42 CpvStaticDeclare(CthThread, mainThread);
43
44 /* BG machine parameter */
45 CpvDeclare(BGMach, bgMach);     /* BG machine size description */
46 CpvDeclare(int, numNodes);        /* number of bg nodes on this PE */
47
48 /* emulator node level variables */
49 CpvDeclare(SimState, simState);
50
51 CpvDeclare(int      , CthResumeBigSimThreadIdx);
52
53 static int arg_argc;
54 static char **arg_argv;
55
56 CmiNodeLock initLock;     // used for BnvInitialize
57
58 int _bgSize = 0;                        // short cut of blue gene node size
59 int delayCheckFlag = 1;          // when enabled, only check correction 
60                                         // messages after some interval
61 int programExit = 0;
62
63 static int bgstats_flag = 0;            // flag print stats at end of simulation
64
65 // for debugging log
66 FILE *bgDebugLog;                       // for debugging
67
68 #ifdef CMK_ORIGIN2000
69 extern "C" int start_counters(int e0, int e1);
70 extern "C" int read_counters(int e0, long long *c0, int e1, long long *c1);
71 inline double Count2Time(long long c) { return c*5.e-7; }
72 #elif CMK_HAS_COUNTER_PAPI
73 #include <papi.h>
74 int *papiEvents = NULL;
75 int numPapiEvents;
76 long_long *papiValues = NULL;
77 CmiUInt8 *total_papi_counters = NULL;
78 char **papi_counters_desc = NULL;
79 #define MAX_TOTAL_EVENTS 10
80 #endif
81
82
83 /****************************************************************************
84      little utility functions
85 ****************************************************************************/
86
87 char **BgGetArgv() { return arg_argv; }
88 int    BgGetArgc() { return arg_argc; }
89
90 /***************************************************************************
91      Implementation of the same counter interface currently used for the
92      Origin2000 using PAPI in the underlying layer.
93
94      Because of the difference in counter numbering, it is assumed that
95      the two counters desired are CYCLES and FLOPS and hence the numbers
96      e0 and e1 are ignored.
97
98      start_counters is also not implemented because PAPI does things in
99      a different manner.
100 ****************************************************************************/
101
102 #if CMK_HAS_COUNTER_PAPI
103 /*
104 CmiUInt8 total_ins = 0;
105 CmiUInt8 total_fps = 0;
106 CmiUInt8 total_l1_dcm = 0;
107 CmiUInt8 total_mem_rcy = 0;
108 */
109 int init_counters()
110 {
111     int retval = PAPI_library_init(PAPI_VER_CURRENT);
112
113     if (retval != PAPI_VER_CURRENT) { CmiAbort("PAPI library init error!"); } 
114
115         //a temporary
116         const char *eventDesc[MAX_TOTAL_EVENTS];
117         int lpapiEvents[MAX_TOTAL_EVENTS];
118         
119     numPapiEvents = 0;
120         
121 #define ADD_LOW_LEVEL_EVENT(e, edesc) \
122                 if(PAPI_query_event(e) == PAPI_OK){ \
123                         if(CmiMyPe()==0) printf("PAPI Info:> Event for %s added\n", edesc); \
124                         eventDesc[numPapiEvents] = edesc;       \
125                         lpapiEvents[numPapiEvents++] = e; \
126                 }
127                 
128     // PAPI high level API does not require explicit library intialization
129         eventDesc[numPapiEvents] ="cycles";
130     lpapiEvents[numPapiEvents++] = PAPI_TOT_CYC;
131         
132         
133     /*if (PAPI_query_event(PAPI_FP_INS) == PAPI_OK) {
134       if (CmiMyPe()== 0) printf("PAPI_FP_INS used\n");
135           eventDesc[numPapiEvents] = "floating point instructions";     
136       lpapiEvents[numPapiEvents++] = PAPI_FP_INS;         
137     } else {
138       if (CmiMyPe()== 0) printf("PAPI_TOT_INS used\n");
139           eventDesc[numPapiEvents] = "total instructions";      
140       lpapiEvents[numPapiEvents++] = PAPI_TOT_INS;
141     }
142     */
143         
144         //ADD_LOW_LEVEL_EVENT(PAPI_FP_INS, "floating point instructions");
145         ADD_LOW_LEVEL_EVENT(PAPI_TOT_INS, "total instructions");
146         //ADD_LOW_LEVEL_EVENT(PAPI_L1_DCM, "L1 cache misses");
147         ADD_LOW_LEVEL_EVENT(PAPI_L2_DCM, "L2 data cache misses");       
148         //ADD_LOW_LEVEL_EVENT(PAPI_MEM_RCY, "idle cycles waiting for memory reads");    
149         //ADD_LOW_LEVEL_EVENT(PAPI_TLB_DM, "Data TLB misses");
150         
151         if(numPapiEvents == 0){
152                 CmiAbort("No papi events are defined!\n");
153         }
154         
155         if(numPapiEvents >= MAX_TOTAL_EVENTS){
156                 CmiAbort("Exceed the pre-defined max number of papi events allowed!\n");
157         }
158         
159         papiEvents = new int[numPapiEvents];
160         papiValues = new long_long[numPapiEvents];
161         total_papi_counters = new CmiUInt8[numPapiEvents];
162         papi_counters_desc = new char *[numPapiEvents];
163         for(int i=0; i<numPapiEvents; i++){
164                 papiEvents[i] = lpapiEvents[i];
165                 total_papi_counters[i] = 0;
166                 CmiPrintf("%d: %s\n", i, eventDesc[i]);
167                 papi_counters_desc[i] = new char[strlen(eventDesc[i])+1];
168                 memcpy(papi_counters_desc[i], eventDesc[i], strlen(eventDesc[i]));
169                 papi_counters_desc[i][strlen(eventDesc[i])] = 0;
170         }
171         
172 /*
173     if (PAPI_query_event(PAPI_L1_DCM) == PAPI_OK) {
174       if (CmiMyPe()== 0) printf("PAPI_L1_DCM used\n");
175       papiEvents[numPapiEvents++] = PAPI_L1_DCM;   // L1 cache miss
176     }
177     if (PAPI_query_event(PAPI_TLB_DM) == PAPI_OK) {
178       if (CmiMyPe()== 0) printf("PAPI_TLB_DM used\n");
179       papiEvents[numPapiEvents++] = PAPI_TLB_DM;   // data TLB misses
180     }
181     if (PAPI_query_event(PAPI_MEM_RCY) == PAPI_OK) {
182       if (CmiMyPe()== 0) printf("PAPI_MEM_RCY used\n");
183       papiEvents[numPapiEvents++] = PAPI_MEM_RCY;  // idle cycle waiting for reads
184     }
185 */
186
187     int status = PAPI_start_counters(papiEvents, numPapiEvents);
188     if (status != PAPI_OK) {
189       CmiPrintf("PAPI_start_counters error code: %d\n", status);
190       switch (status) {
191       case PAPI_ENOEVNT:  CmiPrintf("PAPI Error> Hardware Event does not exist\n"); break;
192       case PAPI_ECNFLCT:  CmiPrintf("PAPI Error> Hardware Event exists, but cannot be counted due to counter resource limitations\n"); break;
193       }
194       CmiAbort("Unable to start PAPI counters!\n");
195     }
196 }
197
198 int read_counters(long_long *papiValues, int n) 
199 {
200   // PAPI_read_counters resets the counter, hence it behaves like the perfctr
201   // code for Origin2000
202   int status;
203   status = PAPI_read_counters(papiValues, n);
204   if (status != PAPI_OK) {
205     CmiPrintf("PAPI_read_counters error: %d\n", status);
206     CmiAbort("Failed to read PAPI counters!\n");
207   }
208   
209   /*
210   total_ins += papiValues[0];
211   total_fps += papiValues[1];
212   total_l1_dcm += papiValues[2];
213   total_mem_rcy += papiValues[3];
214   */
215   
216   return 0;
217 }
218
219 inline void CountPapiEvents(){
220   for(int i=0 ;i<numPapiEvents; i++)
221         total_papi_counters[i] += papiValues[i];
222 }
223
224 inline double Count2Time(long_long *papiValues, int n) { 
225   return papiValues[1]*cva(bgMach).fpfactor; 
226 }
227 #endif
228
229 /*****************************************************************************
230      Handler Table, one per thread
231 ****************************************************************************/
232
233 extern "C" void defaultBgHandler(char *null, void *uPtr)
234 {
235   CmiAbort("BG> Invalid Handler called!\n");
236 }
237
238 HandlerTable::HandlerTable()
239 {
240     handlerTableCount = 1;
241     handlerTable = new BgHandlerInfo [MAX_HANDLERS];
242     for (int i=0; i<MAX_HANDLERS; i++) {
243       handlerTable[i].fnPtr = defaultBgHandler;
244       handlerTable[i].userPtr = NULL;
245     }
246 }
247
248 inline int HandlerTable::registerHandler(BgHandler h)
249 {
250     ASSERT(!cva(simState).inEmulatorInit);
251     /* leave 0 as blank, so it can report error luckily */
252     int cur = handlerTableCount++;
253     if (cur >= MAX_HANDLERS)
254       CmiAbort("BG> HandlerID exceed the maximum.\n");
255     handlerTable[cur].fnPtr = (BgHandlerEx)h;
256     handlerTable[cur].userPtr = NULL;
257     return cur;
258 }
259
260 inline void HandlerTable::numberHandler(int idx, BgHandler h)
261 {
262     ASSERT(!cva(simState).inEmulatorInit);
263     if (idx >= handlerTableCount || idx < 1)
264       CmiAbort("BG> HandlerID exceed the maximum!\n");
265     handlerTable[idx].fnPtr = (BgHandlerEx)h;
266     handlerTable[idx].userPtr = NULL;
267 }
268
269 inline void HandlerTable::numberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
270 {
271     ASSERT(!cva(simState).inEmulatorInit);
272     if (idx >= handlerTableCount || idx < 1)
273       CmiAbort("BG> HandlerID exceed the maximum!\n");
274     handlerTable[idx].fnPtr = h;
275     handlerTable[idx].userPtr = uPtr;
276 }
277
278 inline BgHandlerInfo * HandlerTable::getHandle(int handler)
279 {
280 #if 0
281     if (handler >= handlerTableCount) {
282       CmiPrintf("[%d] handler: %d handlerTableCount:%d. \n", tMYNODEID, handler, handlerTableCount);
283       CmiAbort("Invalid handler!");
284     }
285 #endif
286     if (handler >= handlerTableCount || handler<0) return NULL;
287     return &handlerTable[handler];
288 }
289
290 /*****************************************************************************
291       low level API
292 *****************************************************************************/
293
294 int BgRegisterHandler(BgHandler h)
295 {
296   ASSERT(!cva(simState).inEmulatorInit);
297   int cur;
298 #if CMK_BLUEGENE_NODE
299   return tMYNODE->handlerTable.registerHandler(h);
300 #else
301   if (tTHREADTYPE == COMM_THREAD) {
302     return tMYNODE->handlerTable.registerHandler(h);
303   }
304   else {
305     return tHANDLETAB.registerHandler(h);
306   }
307 #endif
308 }
309
310 void BgNumberHandler(int idx, BgHandler h)
311 {
312   ASSERT(!cva(simState).inEmulatorInit);
313 #if CMK_BLUEGENE_NODE
314   tMYNODE->handlerTable.numberHandler(idx,h);
315 #else
316   if (tTHREADTYPE == COMM_THREAD) {
317     tMYNODE->handlerTable.numberHandler(idx, h);
318   }
319   else {
320     tHANDLETAB.numberHandler(idx, h);
321   }
322 #endif
323 }
324
325 void BgNumberHandlerEx(int idx, BgHandlerEx h, void *uPtr)
326 {
327   ASSERT(!cva(simState).inEmulatorInit);
328 #if CMK_BLUEGENE_NODE
329   tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
330 #else
331   if (tTHREADTYPE == COMM_THREAD) {
332     tMYNODE->handlerTable.numberHandlerEx(idx,h,uPtr);
333   }
334   else {
335     tHANDLETAB.numberHandlerEx(idx,h,uPtr);
336   }
337 #endif
338 }
339
340 /*****************************************************************************
341       BG Timing Functions
342 *****************************************************************************/
343
344 void resetVTime()
345 {
346   /* reset start time */
347   int timingMethod = cva(bgMach).timingMethod;
348   if (timingMethod == BG_WALLTIME) {
349     double ct = BG_TIMER();
350     if (tTIMERON) CmiAssert(ct >= tSTARTTIME);
351     tSTARTTIME = ct;
352   }
353   else if (timingMethod == BG_ELAPSE)
354     tSTARTTIME = tCURRTIME;
355 #ifdef CMK_ORIGIN2000
356   else if (timingMethod == BG_COUNTER) {
357     if (start_counters(0, 21) <0) {
358       perror("start_counters");;
359     }
360   }
361 #elif CMK_HAS_COUNTER_PAPI
362   else if (timingMethod == BG_COUNTER) {
363     // do a fake read to reset the counters. It would be more efficient
364     // to use the low level API, but that would be a lot more code to
365     // write for now.
366     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
367   }
368 #endif
369 }
370
371 void startVTimer()
372 {
373   CmiAssert(tTIMERON == 0);
374   resetVTime();
375   tTIMERON = 1;
376 }
377
378 // should be used only when BG_WALLTIME
379 static inline void advanceTime(double inc)
380 {
381   if (BG_ABS(inc) < 1e-10) inc = 0.0;    // ignore floating point errors
382   if (inc < 0.0) return;
383   CmiAssert(inc>=0.0);
384   inc *= cva(bgMach).cpufactor;
385   tCURRTIME += inc;
386   CmiAssert(tTIMERON==1);
387 }
388
389 void stopVTimer()
390 {
391   int k;
392 #if 0
393   if (tTIMERON != 1) {
394     CmiAbort("stopVTimer called without startVTimer!\n");
395   }
396   CmiAssert(tTIMERON == 1);
397 #else
398   if (tTIMERON == 0) return;         // already stopped
399 #endif
400   const int timingMethod = cva(bgMach).timingMethod;
401   if (timingMethod == BG_WALLTIME) {
402     const double tp = BG_TIMER();
403     double inc = tp-tSTARTTIME;
404     advanceTime(inc-cva(bgMach).timercost);
405 //    tSTARTTIME = BG_TIMER();  // skip the above time
406   }
407   else if (timingMethod == BG_ELAPSE) {
408     // if no bgelapse called, assume it takes 1us
409     if (tCURRTIME-tSTARTTIME < 1E-9) {
410 //      tCURRTIME += 1e-6;
411     }
412   }
413   else if (timingMethod == BG_COUNTER)  {
414 #if CMK_ORIGIN2000
415     long long c0, c1;
416     if (read_counters(0, &c0, 21, &c1) < 0) perror("read_counters");
417     tCURRTIME += Count2Time(c1);
418 #elif CMK_HAS_COUNTER_PAPI
419     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
420     CountPapiEvents();
421     tCURRTIME += Count2Time(papiValues, numPapiEvents);
422 #endif
423   }
424   tTIMERON = 0;
425 }
426
427 double BgGetTime()
428 {
429 #if 1
430   const int timingMethod = cva(bgMach).timingMethod;
431   if (timingMethod == BG_WALLTIME) {
432     /* accumulate time since last starttime, and reset starttime */
433     if (tTIMERON) {
434       const double tp2= BG_TIMER();
435       double &startTime = tSTARTTIME;
436       double inc = tp2 - startTime;
437       advanceTime(inc-cva(bgMach).timercost);
438       startTime = BG_TIMER();
439     }
440     return tCURRTIME;
441   }
442   else if (timingMethod == BG_ELAPSE) {
443     return tCURRTIME;
444   }
445   else if (timingMethod == BG_COUNTER) {
446     if (tTIMERON) {
447 #if CMK_ORIGIN2000
448       long long c0, c1;
449       if (read_counters(0, &c0, 21, &c1) <0) perror("read_counters");;
450       tCURRTIME += Count2Time(c1);
451       if (start_counters(0, 21)<0) perror("start_counters");;
452 #elif CMK_HAS_COUNTER_PAPI
453     if (read_counters(papiValues, numPapiEvents) < 0) perror("read_counters");
454     tCURRTIME += Count2Time(papiValues, numPapiEvents);
455 #endif
456     }
457     return tCURRTIME;
458   }
459   else 
460     CmiAbort("Unknown Timing Method.");
461   return -1;
462 #else
463   /* sometime I am interested in real wall time */
464   tCURRTIME = CmiWallTimer();
465   return tCURRTIME;
466 #endif
467 }
468
469 // moved to blue_logs.C
470 double BgGetCurTime()
471 {
472   ASSERT(tTHREADTYPE == WORK_THREAD);
473   return tCURRTIME;
474 }
475
476 extern "C" 
477 void BgElapse(double t)
478 {
479 //  ASSERT(tTHREADTYPE == WORK_THREAD);
480   if (cva(bgMach).timingMethod == BG_ELAPSE)
481     tCURRTIME += t;
482 }
483
484 // advance virtual timer no matter what scheme is used
485 extern "C" 
486 void BgAdvance(double t)
487 {
488 //  ASSERT(tTHREADTYPE == WORK_THREAD);
489   tCURRTIME += t;
490 }
491
492 /* BG API Func
493  * called by a communication thread to test if poll data 
494  * in the node's INBUFFER for its own queue 
495  */
496 char * getFullBuffer()
497 {
498   /* I must be a communication thread */
499   if (tTHREADTYPE != COMM_THREAD) 
500     CmiAbort("GetFullBuffer called by a non-communication thread!\n");
501
502   return tMYNODE->getFullBuffer();
503 }
504
505 /**  BG API Func
506  * called by a Converse handler or sendPacket()
507  * add message msgPtr to a bluegene node's inbuffer queue 
508  */
509 void addBgNodeInbuffer(char *msgPtr, int lnodeID)
510 {
511 #ifndef CMK_OPTIMIZE
512   if (lnodeID >= cva(numNodes)) CmiAbort("NodeID is out of range!");
513 #endif
514   nodeInfo &nInfo = cva(nodeinfo)[lnodeID];
515
516   //printf("Adding a msg %p to local node %d and its thread %d\n", msgPtr, lnodeID, CmiBgMsgThreadID(msgPtr));  
517         
518   nInfo.addBgNodeInbuffer(msgPtr);
519 }
520 extern "C" void addBgNodeInbuffer_c(char *msgPtr, int lnodeID) {
521   addBgNodeInbuffer(msgPtr, lnodeID);
522 }
523
524 /** BG API Func 
525  *  called by a comm thread
526  *  add a message to a thread's affinity queue in same node 
527  */
528 void addBgThreadMessage(char *msgPtr, int threadID)
529 {
530 #ifndef CMK_OPTIMIZE
531   if (!cva(bgMach).isWorkThread(threadID)) CmiAbort("ThreadID is out of range!");
532 #endif
533   workThreadInfo *tInfo = (workThreadInfo *)tMYNODE->threadinfo[threadID];
534   tInfo->addAffMessage(msgPtr);
535 }
536
537 /** BG API Func 
538  *  called by a comm thread, add a message to a node's non-affinity queue 
539  */
540 void addBgNodeMessage(char *msgPtr)
541 {
542   tMYNODE->addBgNodeMessage(msgPtr);
543 }
544
545 /** BG API Func 
546  *  check if inBuffer on this node has msg available
547  */
548 int checkReady()
549 {
550   if (tTHREADTYPE != COMM_THREAD)
551     CmiAbort("checkReady called by a non-communication thread!\n");
552   return !tINBUFFER.isEmpty();
553 }
554
555
556 /* handler to process the msg */
557 void msgHandlerFunc(char *msg)
558 {
559   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
560   int gnodeID = CmiBgMsgNodeID(msg);
561   if (gnodeID >= 0) {
562 #ifndef CMK_OPTIMIZE
563     if (nodeInfo::Global2PE(gnodeID) != CmiMyPe())
564       CmiAbort("msgHandlerFunc received wrong message!");
565 #endif
566     int lnodeID = nodeInfo::Global2Local(gnodeID);
567     if (cva(bgMach).inReplayMode()) {
568       int x, y, z;
569       BgGetXYZ(cva(bgMach).replay, &x, &y, &z);
570       if (nodeInfo::XYZ2Local(x,y,z) != lnodeID) return;
571       else lnodeID = 0;
572     }
573     addBgNodeInbuffer(msg, lnodeID);
574   }
575   else {
576     CmiAbort("Invalid message!");
577   }
578 }
579
580 /* Converse handler for node level broadcast message */
581 void nodeBCastMsgHandlerFunc(char *msg)
582 {
583   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
584   int gnodeID = CmiBgMsgNodeID(msg);
585   CmiInt2 threadID = CmiBgMsgThreadID(msg);
586   int lnodeID;
587
588   if (gnodeID < -1) {
589     gnodeID = - (gnodeID+100);
590     if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
591       lnodeID = nodeInfo::Global2Local(gnodeID);
592     else
593       lnodeID = -1;
594   }
595   else {
596     ASSERT(gnodeID == -1);
597     lnodeID = gnodeID;
598   }
599   // broadcast except lnodeID:threadId
600   int len = CmiBgMsgLength(msg);
601   int count = 0;
602   for (int i=0; i<cva(numNodes); i++)
603   {
604     if (i==lnodeID) continue;
605     char *dupmsg;
606     if (count == 0) dupmsg = msg;
607     else dupmsg = CmiCopyMsg(msg, len);
608     DEBUGF(("addBgNodeInbuffer to %d\n", i));
609     CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);         // updated
610     addBgNodeInbuffer(dupmsg, i);
611     count ++;
612   }
613   if (count == 0) CmiFree(msg);
614 }
615
616 // clone a msg, only has a valid header, plus a pointer to the real msg
617 char *BgCloneMsg(char *msg)
618 {
619   int size = CmiBlueGeneMsgHeaderSizeBytes + sizeof(char *);
620   char *dupmsg = (char *)CmiAlloc(size);
621   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
622   *(char **)(dupmsg + CmiBlueGeneMsgHeaderSizeBytes) = msg;
623   CmiBgMsgRefCount(msg) ++;
624   CmiBgMsgFlag(dupmsg) = BG_CLONE;
625   return dupmsg;
626 }
627
628 // expand the cloned msg to the full size msg
629 char *BgExpandMsg(char *msg)
630 {
631   char *origmsg = *(char **)(msg + CmiBlueGeneMsgHeaderSizeBytes);
632   int size = CmiBgMsgLength(origmsg);
633   char *dupmsg = (char *)CmiAlloc(size);
634   memcpy(dupmsg, msg, CmiBlueGeneMsgHeaderSizeBytes);
635   memcpy(dupmsg+CmiBlueGeneMsgHeaderSizeBytes, origmsg+CmiBlueGeneMsgHeaderSizeBytes, size-CmiBlueGeneMsgHeaderSizeBytes);
636   CmiFree(msg);
637   CmiBgMsgRefCount(origmsg) --;
638   if (CmiBgMsgRefCount(origmsg) == 0) CmiFree(origmsg);
639   return dupmsg;
640 }
641
642 /* Converse handler for thread level broadcast message */
643 void threadBCastMsgHandlerFunc(char *msg)
644 {
645   /* bgmsg is CmiMsgHeaderSizeBytes offset of original message pointer */
646   int gnodeID = CmiBgMsgNodeID(msg);
647   CmiInt2 threadID = CmiBgMsgThreadID(msg);
648   int lnodeID;
649   if (gnodeID < -1) {
650       gnodeID = - (gnodeID+100);
651       if (nodeInfo::Global2PE(gnodeID) == CmiMyPe())
652         lnodeID = nodeInfo::Global2Local(gnodeID);
653       else
654         lnodeID = -1;
655       CmiAssert(threadID != ANYTHREAD);
656   }
657   else {
658     ASSERT(gnodeID == -1);
659     lnodeID = gnodeID;
660   }
661   // broadcast except nodeID:threadId
662   int len = CmiBgMsgLength(msg);
663   // optimization needed if the message size is big
664   // making duplications can easily run out of memory
665   int bigOpt = (len > 4096);
666   for (int i=0; i<cva(numNodes); i++)
667   {
668       for (int j=0; j<cva(bgMach).numWth; j++) {
669         if (i==lnodeID && j==threadID) continue;
670         // for big message, clone a message token instead of a real msg
671         char *dupmsg = bigOpt? BgCloneMsg(msg) : CmiCopyMsg(msg, len);
672         CmiBgMsgNodeID(dupmsg) = nodeInfo::Local2Global(i);
673         CmiBgMsgThreadID(dupmsg) = j;
674         DEBUGF(("[%d] addBgNodeInbuffer to %d tid:%d\n", CmiMyPe(), i, j));
675         addBgNodeInbuffer(dupmsg, i);
676       }
677   }
678   // for big message, will free after all tokens are done
679   if (!bigOpt) CmiFree(msg);
680 }
681
682 /**
683  *              BG Messaging Functions
684  */
685
686 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz, int bytes)
687 {
688   return cva(bgMach).network->latency(ox, oy, oz, nx, ny, nz, bytes);
689 }
690
691 /**
692  *   a simple message streaming on demand and special purpose
693  *   user call  BgStartStreaming() and BgEndStreaming()
694  *   each worker thread call one send, and all sends are sent
695  *   via multiplesend at the end
696  */
697
698 static int bg_streaming = 0;
699
700 class BgStreaming {
701 public:
702   char **streamingMsgs;
703   int  *streamingMsgSizes;
704   int count;
705   int totalWorker;
706   int pe;
707 public:
708   BgStreaming() {
709     streamingMsgs = NULL;
710     streamingMsgSizes = NULL;
711     count = 0;
712     totalWorker = 0;
713     pe = -1;
714   }
715   ~BgStreaming() {
716     if (streamingMsgs) {
717       delete [] streamingMsgs;
718       delete [] streamingMsgSizes;
719     }
720   }
721   void init(int nNodes) {
722     totalWorker = nNodes * BgGetNumWorkThread();
723     streamingMsgs = new char *[totalWorker];
724     streamingMsgSizes = new int [totalWorker];
725   }
726   void depositMsg(int p, int size, char *m) {
727     streamingMsgs[count] = m;
728     streamingMsgSizes[count] = size;
729     count ++;
730     if (pe == -1) pe = p;
731     else CmiAssert(pe == p);
732     if (count == totalWorker) {
733       // CkPrintf("streaming send\n");
734       CmiMultipleSend(pe, count, streamingMsgSizes, streamingMsgs);
735       for (int i=0; i<count; i++) CmiFree(streamingMsgs[i]);
736       pe = -1;
737       count = 0;
738     }
739   }
740 };
741
742 BgStreaming bgstreaming;
743
744 void BgStartStreaming()
745 {
746   bg_streaming = 1;
747 }
748
749 void BgEndStreaming()
750 {
751   bg_streaming = 0;
752 }
753
754 void CmiSendPacketWrapper(int pe, int msgSize,char *msg, int streaming)
755 {
756   if (streaming && pe != CmiMyPe())
757     bgstreaming.depositMsg(pe, msgSize, msg);
758   else
759     CmiSyncSendAndFree(pe, msgSize, msg);
760 }
761
762
763 void CmiSendPacket(int x, int y, int z, int msgSize,char *msg)
764 {
765 //  CmiSyncSendAndFree(nodeInfo::XYZ2RealPE(x,y,z),msgSize,(char *)msg);
766 #if !DELAY_SEND
767   const int pe = nodeInfo::XYZ2RealPE(x,y,z);
768   CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
769 #else
770   if (!correctTimeLog) {
771     const int pe = nodeInfo::XYZ2RealPE(x,y,z);
772     CmiSendPacketWrapper(pe, msgSize, msg, bg_streaming);
773   }
774   // else messages are kept in the log (MsgEntry), and only will be sent
775   // after timing correction has done on that log.
776   // TODO: streaming has no effect if time correction is on.
777 #endif
778 }
779
780 /* send will copy data to msg buffer */
781 /* user data is not free'd in this routine, user can reuse the data ! */
782 void sendPacket_(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char* sendmsg, int local)
783 {
784   //CmiPrintStackTrace(0);
785   double sendT = BgGetTime();
786
787   double latency;
788   CmiSetHandler(sendmsg, cva(simState).msgHandler);
789   CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
790   CmiBgMsgThreadID(sendmsg) = threadID;
791   CmiBgMsgHandle(sendmsg) = handlerID;
792   CmiBgMsgType(sendmsg) = type;
793   CmiBgMsgLength(sendmsg) = numbytes;
794   CmiBgMsgFlag(sendmsg) = 0;
795   CmiBgMsgRefCount(sendmsg) = 0;
796   if (local) {
797     if (correctTimeLog) BgAdvance(CHARM_OVERHEAD);
798     latency = 0.0;
799   }
800   else {
801     if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
802     latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
803     CmiAssert(latency >= 0);
804   }
805   CmiBgMsgRecvTime(sendmsg) = latency + sendT;
806   
807   // timing
808   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, local, 1);
809
810   //static int addCnt=1; //for debugging only
811   //DEBUGM(4, ("N[%d] add a msg (handler=%d | cnt=%d | len=%d | type=%d | node id:%d\n", BgMyNode(), handlerID, addCnt, numbytes, type, CmiBgMsgNodeID(sendmsg)));  
812   //addCnt++;
813
814   if (local){
815       /* Here local refers to the fact that msg is sent to the processor itself
816        * therefore, we just add this msg to the thread itself
817        */
818       //addBgThreadMessage(sendmsg,threadID);
819       addBgNodeInbuffer(sendmsg, myNode->id);
820   }    
821   else
822     CmiSendPacket(x, y, z, numbytes, sendmsg);
823
824   // bypassing send time
825   resetVTime();
826 }
827
828 /* broadcast will copy data to msg buffer */
829 static inline void nodeBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
830 {
831   double sendT = BgGetTime();
832
833   nodeInfo *myNode = cta(threadinfo)->myNode;
834   CmiSetHandler(sendmsg, cva(simState).nBcastMsgHandler);
835   if (node >= 0)
836     CmiBgMsgNodeID(sendmsg) = -node-100;
837   else
838     CmiBgMsgNodeID(sendmsg) = node;
839   CmiBgMsgThreadID(sendmsg) = threadID; 
840   CmiBgMsgHandle(sendmsg) = handlerID;  
841   CmiBgMsgType(sendmsg) = type; 
842   CmiBgMsgLength(sendmsg) = numbytes;
843   CmiBgMsgFlag(sendmsg) = 0;
844   CmiBgMsgRefCount(sendmsg) = 0;
845   /* FIXME */
846   CmiBgMsgRecvTime(sendmsg) = MSGTIME(myNode->x, myNode->y, myNode->z, 0,0,0, numbytes) + sendT;
847
848   // timing
849   // FIXME
850   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
851
852   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d\n", BgMyNode(), node));
853 #if DELAY_SEND
854   if (!correctTimeLog)
855 #endif
856   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
857
858   resetVTime();
859 }
860
861 /* broadcast will copy data to msg buffer */
862 static inline void threadBroadcastPacketExcept_(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char* sendmsg)
863 {
864   CmiSetHandler(sendmsg, cva(simState).tBcastMsgHandler);       
865   if (node >= 0)
866     CmiBgMsgNodeID(sendmsg) = -node-100;
867   else
868     CmiBgMsgNodeID(sendmsg) = node;
869   CmiBgMsgThreadID(sendmsg) = threadID; 
870   CmiBgMsgHandle(sendmsg) = handlerID;  
871   CmiBgMsgType(sendmsg) = type; 
872   CmiBgMsgLength(sendmsg) = numbytes;
873   CmiBgMsgFlag(sendmsg) = 0;
874   CmiBgMsgRefCount(sendmsg) = 0;
875   /* FIXME */
876   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
877   double sendT = BgGetTime();
878   CmiBgMsgRecvTime(sendmsg) = sendT;    
879
880   // timing
881 #if 0
882   if (node == BG_BROADCASTALL) {
883     for (int i=0; i<_bgSize; i++) {
884       for (int j=0; j<cva(numWth); j++) {
885         BG_ADDMSG(sendmsg, node);
886       }
887     }
888   }
889   else {
890     CmiAssert(node >= 0);
891     BG_ADDMSG(sendmsg, (node+100));
892   }
893 #else
894   // FIXME
895   BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), threadID, sendT, 0, 1);
896 #endif
897
898   DEBUGF(("[%d]CmiSyncBroadcastAllAndFree node: %d tid:%d recvT:%f\n", BgMyNode(), node, threadID, CmiBgMsgRecvTime(sendmsg)));
899 #if DELAY_SEND
900   if (!correctTimeLog)
901 #endif
902   CmiSyncBroadcastAllAndFree(numbytes,sendmsg);
903
904   resetVTime();
905 }
906
907
908 /* sendPacket to route */
909 /* this function can be called by any thread */
910 void BgSendNonLocalPacket(nodeInfo *myNode, int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
911 {
912   if (cva(bgMach).inReplayMode()) return;     // replay mode, no outgoing msg
913
914 #ifndef CMK_OPTIMIZE
915   if (x<0 || y<0 || z<0 || x>=cva(bgMach).x || y>=cva(bgMach).y || z>=cva(bgMach).z) {
916     CmiPrintf("Trying to send packet to a nonexisting node: (%d %d %d)!\n", x,y,z);
917     CmiAbort("Abort!\n");
918   }
919 #endif
920
921   sendPacket_(myNode, x, y, z, threadID, handlerID, type, numbytes, data, 0);
922 }
923
924 static void _BgSendLocalPacket(nodeInfo *myNode, int threadID, int handlerID, WorkType type, int numbytes, char * data)
925 {
926   sendPacket_(myNode, myNode->x, myNode->y, myNode->z, threadID, handlerID, type, numbytes, data, 1);
927 }
928
929 void BgSendLocalPacket(int threadID, int handlerID, WorkType type,
930                        int numbytes, char* data)
931 {
932   nodeInfo *myNode = cta(threadinfo)->myNode;
933
934   if (cva(bgMach).inReplayMode()) threadID = 0;    // replay mode
935
936   _BgSendLocalPacket(myNode, threadID, handlerID, type, numbytes, data);
937 }
938
939 /* wrapper of the previous two functions */
940 void BgSendPacket(int x, int y, int z, int threadID, int handlerID, WorkType type, int numbytes, char * data)
941 {
942   nodeInfo *myNode = cta(threadinfo)->myNode;
943   if (myNode->x == x && myNode->y == y && myNode->z == z)
944     _BgSendLocalPacket(myNode,threadID, handlerID, type, numbytes, data);
945   else
946     BgSendNonLocalPacket(myNode,x,y,z,threadID,handlerID, type, numbytes, data);
947 }
948
949 void BgBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
950 {
951   nodeBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
952 }
953
954 void BgBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
955 {
956   nodeBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
957 }
958
959 void BgThreadBroadcastPacketExcept(int node, CmiInt2 threadID, int handlerID, WorkType type, int numbytes, char * data)
960 {
961   if (cva(bgMach).inReplayMode()) return;    // replay mode
962   threadBroadcastPacketExcept_(node, threadID, handlerID, type, numbytes, data);
963 }
964
965 void BgThreadBroadcastAllPacket(int handlerID, WorkType type, int numbytes, char * data)
966 {
967   if (cva(bgMach).inReplayMode()) {      // replay mode, send only to itself
968     BgSendLocalPacket(ANYTHREAD, handlerID, type, numbytes, data);
969     return;
970   }
971   threadBroadcastPacketExcept_(BG_BROADCASTALL, ANYTHREAD, handlerID, type, numbytes, data);
972 }
973
974 /**
975  send a msg to a list of processors (processors represented in global seq #
976 */
977 void BgSyncListSend(int npes, int *pes, int handlerID, WorkType type, int numbytes, char *msg)
978 {
979   nodeInfo *myNode = cta(threadinfo)->myNode;
980
981   CmiSetHandler(msg, cva(simState).msgHandler);
982   CmiBgMsgHandle(msg) = handlerID;
983   CmiBgMsgType(msg) = type;
984   CmiBgMsgLength(msg) = numbytes;
985   CmiBgMsgFlag(msg) = 0;
986   CmiBgMsgRefCount(msg) = 0;
987
988   if (correctTimeLog) BgAdvance(cva(bgMach).network->alphacost());
989
990   double now = BgGetTime();
991
992   // send one by one
993   for (int i=0; i<npes; i++)
994   {
995     int local = 0;
996     int x,y,z,t;
997     int pe = pes[i];
998 #if CMK_BLUEGENE_NODE
999     CmiAbort("Not implemented yet!");
1000 #else
1001     t = pe%BgGetNumWorkThread();
1002     pe = pe/BgGetNumWorkThread();
1003     BgGetXYZ(pe, &x, &y, &z);
1004 #endif
1005
1006     char *sendmsg = CmiCopyMsg(msg, numbytes);
1007     CmiBgMsgNodeID(sendmsg) = nodeInfo::XYZ2Global(x,y,z);
1008     CmiBgMsgThreadID(sendmsg) = t;
1009     double latency = MSGTIME(myNode->x, myNode->y, myNode->z, x,y,z, numbytes);
1010     CmiAssert(latency >= 0);
1011     CmiBgMsgRecvTime(sendmsg) = latency + now;
1012
1013     if (myNode->x == x && myNode->y == y && myNode->z == z) local = 1;
1014
1015     // timing and make sure all msgID are the same
1016     if (i!=0) CpvAccess(msgCounter) --;
1017     BG_ADDMSG(sendmsg, CmiBgMsgNodeID(sendmsg), t, now, local, i==0?npes:-1);
1018
1019     if (myNode->x == x && myNode->y == y && myNode->z == z)
1020       addBgNodeInbuffer(sendmsg, myNode->id);
1021     else
1022       CmiSendPacket(x, y, z, numbytes, sendmsg);
1023   }
1024
1025   CmiFree(msg);
1026
1027   resetVTime();
1028 }
1029
1030 /*****************************************************************************
1031       BG node level API - utilities
1032 *****************************************************************************/
1033
1034 /* must be called in a communication or worker thread */
1035 void BgGetMyXYZ(int *x, int *y, int *z)
1036 {
1037   ASSERT(!cva(simState).inEmulatorInit);
1038   *x = tMYX; *y = tMYY; *z = tMYZ;
1039 }
1040
1041 void BgGetXYZ(int seq, int *x, int *y, int *z)
1042 {
1043   nodeInfo::Global2XYZ(seq, x, y, z);
1044 }
1045
1046 void BgGetSize(int *sx, int *sy, int *sz)
1047 {
1048   cva(bgMach).getSize(sx, sy, sz);
1049 }
1050
1051 int BgTraceProjectionOn(int pe)
1052 {
1053   return cva(bgMach).traceProejctions(pe);
1054 }
1055
1056 /* return the total number of Blue gene nodes */
1057 int BgNumNodes()
1058 {
1059   return _bgSize;
1060 }
1061
1062 void BgSetNumNodes(int x)
1063 {
1064   _bgSize = x;
1065 }
1066
1067 /* can only called in emulatorinit */
1068 void BgSetSize(int sx, int sy, int sz)
1069 {
1070   ASSERT(cva(simState).inEmulatorInit);
1071   cva(bgMach).setSize(sx, sy, sz);
1072 }
1073
1074 /* return number of bg nodes on this emulator node */
1075 int BgNodeSize()
1076 {
1077   ASSERT(!cva(simState).inEmulatorInit);
1078   return cva(numNodes);
1079 }
1080
1081 /* return the bg node ID (local array index) */
1082 int BgMyRank()
1083 {
1084 #ifndef CMK_OPTIMIZE
1085   if (tMYNODE == NULL) CmiAbort("Calling BgMyRank in the main thread!");
1086 #endif
1087   ASSERT(!cva(simState).inEmulatorInit);
1088   return tMYNODEID;
1089 }
1090
1091 /* return my serialed blue gene node number */
1092 int BgMyNode()
1093 {
1094 #ifndef CMK_OPTIMIZE
1095   if (tMYNODE == NULL) CmiAbort("Calling BgMyNode in the main thread!");
1096 #endif
1097   return nodeInfo::XYZ2Global(tMYX, tMYY, tMYZ);
1098 }
1099
1100 /* return a real processor number from a bg node */
1101 int BgNodeToRealPE(int node)
1102 {
1103   return nodeInfo::Global2PE(node);
1104 }
1105
1106 // thread ID on a BG node
1107 int BgGetThreadID()
1108 {
1109   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1110 //  if (cva(bgMach).numWth == 1) return 0;   // accessing ctv is expensive
1111   return tMYID;
1112 }
1113
1114 void BgSetThreadID(int x)
1115 {
1116   tMYID = x;
1117 }
1118
1119 int BgGetGlobalThreadID()
1120 {
1121   ASSERT(tTHREADTYPE == WORK_THREAD || tTHREADTYPE == COMM_THREAD);
1122   return nodeInfo::Local2Global(tMYNODEID)*(cva(bgMach).numTh())+tMYID;
1123   //return tMYGLOBALID;
1124 }
1125
1126 int BgGetGlobalWorkerThreadID()
1127 {
1128   ASSERT(tTHREADTYPE == WORK_THREAD);
1129 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1130   return tMYGLOBALID;
1131 }
1132
1133 void BgSetGlobalWorkerThreadID(int pe)
1134 {
1135   ASSERT(tTHREADTYPE == WORK_THREAD);
1136 //  return nodeInfo::Local2Global(tMYNODEID)*cva(bgMach).numWth+tMYID;
1137   tMYGLOBALID = pe;
1138 }
1139
1140 char *BgGetNodeData()
1141 {
1142   return tUSERDATA;
1143 }
1144
1145 void BgSetNodeData(char *data)
1146 {
1147   ASSERT(!cva(simState).inEmulatorInit);
1148   tUSERDATA = data;
1149 }
1150
1151 int BgGetNumWorkThread()
1152 {
1153   return cva(bgMach).numWth;
1154 }
1155
1156 void BgSetNumWorkThread(int num)
1157 {
1158   if (!cva(bgMach).inReplayMode()) ASSERT(cva(simState).inEmulatorInit);
1159   cva(bgMach).numWth = num;
1160 }
1161
1162 int BgGetNumCommThread()
1163 {
1164   return cva(bgMach).numCth;
1165 }
1166
1167 void BgSetNumCommThread(int num)
1168 {
1169   ASSERT(cva(simState).inEmulatorInit);
1170   cva(bgMach).numCth = num;
1171 }
1172
1173 /*****************************************************************************
1174       Communication and Worker threads
1175 *****************************************************************************/
1176
1177 BgStartHandler  workStartFunc = NULL;
1178
1179 void BgSetWorkerThreadStart(BgStartHandler f)
1180 {
1181   workStartFunc = f;
1182 }
1183
1184 extern "C" void CthResumeNormalThread(CthThreadToken* token);
1185
1186 // kernel function for processing a bluegene message
1187 void BgProcessMessageDefault(threadInfo *tinfo, char *msg)
1188 {
1189   DEBUGM(5, ("=====Begin of BgProcessing a msg on node[%d]=====\n", BgMyNode()));
1190   int handler = CmiBgMsgHandle(msg);
1191   //CmiPrintf("[%d] call handler %d\n", BgMyNode(), handler);
1192   CmiAssert(handler < 1000);
1193
1194   BgHandlerInfo *handInfo;
1195 #if  CMK_BLUEGENE_NODE
1196   HandlerTable hdlTbl = tMYNODE->handlerTable;
1197   handInfo = hdlTbl.getHandle(handler);
1198 #else
1199   HandlerTable hdlTbl = tHANDLETAB;
1200   handInfo = hdlTbl.getHandle(handler);
1201   if (handInfo == NULL) handInfo = tMYNODE->handlerTable.getHandle(handler);
1202 #endif
1203
1204   if (handInfo == NULL) {
1205     CmiPrintf("[%d] invalid handler: %d. \n", tMYNODEID, handler);
1206     CmiAbort("BgProcessMessage Failed!");
1207   }
1208   BgHandlerEx entryFunc = handInfo->fnPtr;
1209
1210   CmiSetHandler(msg, CmiBgMsgHandle(msg));
1211
1212   // optimization for broadcast messages:
1213   // if the msg is a broadcast token msg, expand it to a real msg
1214   if (CmiBgMsgFlag(msg) == BG_CLONE) {
1215     msg = BgExpandMsg(msg);
1216   }
1217
1218   if (tinfo->watcher) tinfo->watcher->record(msg);
1219
1220   // don't count thread overhead and timing overhead
1221   startVTimer();
1222
1223   DEBUGM(5, ("Executing function %p\n", entryFunc));    
1224
1225   entryFunc(msg, handInfo->userPtr);
1226
1227   stopVTimer();
1228
1229   DEBUGM(5, ("=====End of BgProcessing a msg on node[%d]=====\n\n", BgMyNode()));
1230 }
1231
1232 void  (*BgProcessMessage)(threadInfo *t, char *msg) = BgProcessMessageDefault;
1233
1234 void scheduleWorkerThread(char *msg)
1235 {
1236   CthThread tid = (CthThread)msg;
1237 //CmiPrintf("scheduleWorkerThread %p\n", tid);
1238   CthAwaken(tid);
1239 }
1240
1241 // thread entry
1242 // for both comm and work thread, virtual function
1243 void run_thread(threadInfo *tinfo)
1244 {
1245   /* set the thread-private threadinfo */
1246   cta(threadinfo) = tinfo;
1247   tinfo->run();
1248 }
1249
1250 /* should be done only once per bg node */
1251 void BgNodeInitialize(nodeInfo *ninfo)
1252 {
1253   CthThread t;
1254   int i;
1255
1256   /* this is here because I will put a message to node inbuffer */
1257   tCURRTIME = 0.0;
1258   tSTARTTIME = CmiWallTimer();
1259
1260   /* creat work threads */
1261   for (i=0; i< cva(bgMach).numWth; i++)
1262   {
1263     threadInfo *tinfo = ninfo->threadinfo[i];
1264     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1265     if (t == NULL) CmiAbort("BG> Failed to create worker thread. \n");
1266     tinfo->setThread(t);
1267     /* put to thread table */
1268     tTHREADTABLE[tinfo->id] = t;
1269 #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1270     //initial scheduling points for workthreads
1271     if(bgUseOutOfCore) schedWorkThds->push((workThreadInfo *)tinfo);
1272 #endif
1273     CthAwaken(t);
1274   }
1275
1276   /* creat communication thread */
1277   for (i=0; i< cva(bgMach).numCth; i++)
1278   {
1279     threadInfo *tinfo = ninfo->threadinfo[i+cva(bgMach).numWth];
1280     t = CthCreate((CthVoidFn)run_thread, tinfo, cva(bgMach).stacksize);
1281     if (t == NULL) CmiAbort("BG> Failed to create communication thread. \n");
1282     tinfo->setThread(t);
1283     /* put to thread table */
1284     tTHREADTABLE[tinfo->id] = t;
1285     CthAwaken(t);
1286   }
1287
1288 }
1289
1290 static void beginExitHandlerFunc(void *msg);
1291 static void writeToDisk();
1292 static void sendCorrectionStats();
1293
1294 static CmiHandler exitHandlerFunc(char *msg)
1295 {
1296   // TODO: free memory before exit
1297   int i,j;
1298
1299   programExit = 2;
1300 #if BLUEGENE_TIMING
1301   // timing
1302   if (0)        // detail
1303   if (genTimeLog) {
1304     for (j=0; j<cva(numNodes); j++)
1305     for (i=0; i<cva(bgMach).numWth; i++) {
1306       BgTimeLine &log = cva(nodeinfo)[j].timelines[i].timeline; 
1307 //      BgPrintThreadTimeLine(nodeInfo::Local2Global(j), i, log);
1308       int x,y,z;
1309       nodeInfo::Local2XYZ(j, &x, &y, &z);
1310       BgWriteThreadTimeLine(arg_argv[0], x, y, z, i, log);
1311     }
1312
1313   }
1314 #endif
1315   if (genTimeLog) sendCorrectionStats();
1316
1317   if (genTimeLog) writeToDisk();
1318
1319 //  if (tTHREADTYPE == WORK_THREAD)
1320   {
1321   int origPe = -2;
1322   // close all tracing modules
1323   for (j=0; j<cva(numNodes); j++)
1324     for (i=0; i<cva(bgMach).numWth; i++) {
1325       int oldPe = CmiSwitchToPE(nodeInfo::Local2Global(j)*cva(bgMach).numWth+i);
1326       if (origPe == -2) origPe = oldPe;
1327       traceCharmClose();
1328 //      CmiSwitchToPE(oldPe);
1329       delete cva(nodeinfo)[j].threadinfo[i]->watcher;   // force dump watcher
1330     }
1331     if (origPe!=-2) CmiSwitchToPE(origPe);
1332   }
1333
1334 #if 0
1335   delete [] cva(nodeinfo);
1336   delete [] cva(inBuffer);
1337   for (i=0; i<cva(numNodes); i++) CmmFree(cva(msgBuffer)[i]);
1338   delete [] cva(msgBuffer);
1339 #endif
1340
1341 #if CMK_HAS_COUNTER_PAPI
1342   if (cva(bgMach).timingMethod == BG_COUNTER) {
1343 /*        
1344   CmiPrintf("BG[PE %d]> cycles: %lld\n", CmiMyPe(), total_ins);
1345   CmiPrintf("BG[PE %d]> floating point instructions: %lld\n", CmiMyPe(), total_fps);
1346   CmiPrintf("BG[PE %d]> L1 cache misses: %lld\n", CmiMyPe(), total_l1_dcm);
1347   //CmiPrintf("BG[PE %d]> cycles stalled waiting for memory access: %lld\n", CmiMyPe(), total_mem_rcy);
1348  */
1349         for(int i=0; i<numPapiEvents; i++){
1350           CmiPrintf("BG[PE %d]> %s: %lld\n", CmiMyPe(), papi_counters_desc[i], total_papi_counters[i]);
1351            delete papi_counters_desc[i];
1352         }
1353         delete papiEvents;
1354         delete papiValues;
1355         delete papi_counters_desc;
1356         delete total_papi_counters;
1357   }
1358 #endif
1359
1360   //ConverseExit();
1361   if (genTimeLog)
1362     { if (CmiMyPe() != 0) CsdExitScheduler(); }
1363   else
1364     CsdExitScheduler();
1365
1366   //if (CmiMyPe() == 0) CmiPrintf("\nBG> BlueGene emulator shutdown gracefully!\n");
1367
1368   return 0;
1369 }
1370
1371 static void sanityCheck()
1372 {
1373   if (cva(bgMach).x==0 || cva(bgMach).y==0 || cva(bgMach).z==0)  {
1374     if (CmiMyPe() == 0)
1375       CmiPrintf("\nMissing parameters for BlueGene machine size!\n<tip> use command line options: +x, +y, or +z.\n");
1376     BgShutdown(); 
1377   } 
1378   else if (cva(bgMach).numCth==0 || cva(bgMach).numWth==0) { 
1379 #if 1
1380     if (cva(bgMach).numCth==0) cva(bgMach).numCth=1;
1381     if (cva(bgMach).numWth==0) cva(bgMach).numWth=1;
1382 #else
1383     if (CmiMyPe() == 0)
1384       CmiPrintf("\nMissing parameters for number of communication/worker threads!\n<tip> use command line options: +cth or +wth.\n");
1385     BgShutdown(); 
1386 #endif
1387   }
1388   if (cva(bgMach).getNodeSize()<CmiNumPes()) {
1389     CmiAbort("\nToo few BlueGene nodes!\n");
1390   }
1391 }
1392
1393 #undef CmiSwitchToPE
1394 extern "C" int CmiSwitchToPEFn(int pe);
1395
1396 // main
1397 CmiStartFn bgMain(int argc, char **argv)
1398 {
1399   int i;
1400   char *configFile = NULL;
1401
1402 #if CMK_CONDS_USE_SPECIAL_CODE
1403   // overwrite possible implementation in machine.c
1404   CmiSwitchToPE = CmiSwitchToPEFn;
1405 #endif
1406
1407   /* initialize all processor level data */
1408   CpvInitialize(BGMach,bgMach);
1409   cva(bgMach).nullify();
1410
1411   CmiArgGroup("Charm++","BlueGene Simulator");
1412   if (CmiGetArgStringDesc(argv, "+bgconfig", &configFile, "BlueGene machine config file")) {
1413    cva(bgMach). read(configFile);
1414   }
1415   CmiGetArgIntDesc(argv, "+x", &cva(bgMach).x, 
1416                 "The x size of the grid of nodes");
1417   CmiGetArgIntDesc(argv, "+y", &cva(bgMach).y, 
1418                 "The y size of the grid of nodes");
1419   CmiGetArgIntDesc(argv, "+z", &cva(bgMach).z, 
1420                 "The z size of the grid of nodes");
1421   CmiGetArgIntDesc(argv, "+cth", &cva(bgMach).numCth, 
1422                 "The number of simulated communication threads per node");
1423   CmiGetArgIntDesc(argv, "+wth", &cva(bgMach).numWth, 
1424                 "The number of simulated worker threads per node");
1425
1426   CmiGetArgIntDesc(argv, "+bgstacksize", &cva(bgMach).stacksize, 
1427                 "Blue Gene thread stack size");
1428
1429   if (CmiGetArgFlagDesc(argv, "+bglog", "Write events to log file"))
1430      genTimeLog = 1;
1431   if (CmiGetArgFlagDesc(argv, "+bgcorrect", "Apply timestamp correction to logs"))
1432     correctTimeLog = 1;
1433   schedule_flag = 0;
1434   if (correctTimeLog) {
1435     genTimeLog = 1;
1436     schedule_flag = 1;
1437   }
1438
1439   if (CmiGetArgFlagDesc(argv, "+bgverbose", "Print debug info verbosely"))
1440     bgverbose = 1;
1441
1442   // for timing method, default using elapse calls.
1443   if(CmiGetArgFlagDesc(argv, "+bgelapse", 
1444                        "Use user provided BgElapse for time prediction")) 
1445       cva(bgMach).timingMethod = BG_ELAPSE;
1446   if(CmiGetArgFlagDesc(argv, "+bgwalltime", 
1447                        "Use walltime method for time prediction")) 
1448       cva(bgMach).timingMethod = BG_WALLTIME;
1449 #ifdef CMK_ORIGIN2000
1450   if(CmiGetArgFlagDesc(argv, "+bgcounter", "Use performance counter")) 
1451       cva(bgMach).timingMethod = BG_COUNTER;
1452 #elif CMK_HAS_COUNTER_PAPI
1453   if (CmiGetArgFlagDesc(argv, "+bgpapi", "Use PAPI Performance counters")) {
1454     cva(bgMach).timingMethod = BG_COUNTER;
1455   }
1456   if (cva(bgMach).timingMethod == BG_COUNTER) {
1457     init_counters();
1458   }
1459 #endif
1460   CmiGetArgDoubleDesc(argv,"+bgfpfactor", &cva(bgMach).fpfactor, 
1461                       "floating point to time factor");
1462   CmiGetArgDoubleDesc(argv,"+bgcpufactor", &cva(bgMach).cpufactor, 
1463                       "scale factor for wallclock time measured");
1464   CmiGetArgDoubleDesc(argv,"+bgtimercost", &cva(bgMach).timercost, 
1465                       "timer cost");
1466   #if 0
1467   if(cva(bgMach).timingMethod == BG_WALLTIME)
1468   {
1469       int count = 1e6;
1470       double start, stop, diff, cost, dummy;
1471
1472       dummy = BG_TIMER(); // In case there's an initialization delay somewhere
1473
1474       start = BG_TIMER();
1475       for (int i = 0; i < count; ++i)
1476           dummy = BG_TIMER();
1477       stop = BG_TIMER();
1478
1479       diff = stop - start;
1480       cost = diff / count;
1481
1482       CmiPrintf("Measured timer cost: %g Actual: %g\n", 
1483                 cost, cva(bgMach).timercost);
1484       cva(bgMach).timercost = cost;
1485   }
1486   #endif
1487   
1488   char *networkModel;
1489   if (CmiGetArgStringDesc(argv, "+bgnetwork", &networkModel, "Network model")) {
1490    cva(bgMach).setNetworkModel(networkModel);
1491   }
1492
1493   bgcorroff = 0;
1494   if(CmiGetArgFlagDesc(argv, "+bgcorroff", "Start with correction off")) 
1495     bgcorroff = 1;
1496
1497   bgstats_flag=0;
1498   if(CmiGetArgFlagDesc(argv, "+bgstats", "Print correction statistics")) 
1499     bgstats_flag = 1;
1500
1501   if (CmiGetArgStringDesc(argv, "+bgtraceroot", &cva(bgMach).traceroot, "Directory to write bgTrace files to"))
1502   {
1503     char *root = (char*)malloc(strlen(cva(bgMach).traceroot) + 10);
1504     sprintf(root, "%s/", cva(bgMach).traceroot);
1505     cva(bgMach).traceroot = root;
1506   }
1507
1508   // record/replay
1509   if (CmiGetArgFlagDesc(argv,"+bgrecord","Record message processing order for BigSim"))
1510     cva(bgMach).record = 1;
1511   int replaype;
1512   if (CmiGetArgIntDesc(argv,"+bgreplay", &replaype, "Re-play message processing order for BigSim")) {
1513     cva(bgMach).replay = replaype;
1514   }
1515   char *procs = NULL;
1516   if (CmiGetArgStringDesc(argv, "+bgrecordprocessors", &procs, "A list of processors to record, e.g. 0,10,20-30")) {
1517     cva(bgMach).recordprocs.set(procs);
1518   }
1519
1520
1521   /* parameters related with out-of-core execution */
1522   int tmpcap=0;
1523   if (CmiGetArgIntDesc(argv, "+bgooccap", &tmpcap, "Simulate with out-of-core support and the number of target processors allowed in memory")){
1524      TBLCAPACITY = tmpcap;
1525     }
1526   if (CmiGetArgDoubleDesc(argv, "+bgooc", &bgOOCMaxMemSize, "Simulate with out-of-core support and the threshhold of memory size")){
1527       bgUseOutOfCore = 1;
1528       _BgInOutOfCoreMode = 1; //the global (the whole converse layer) out-of-core flag
1529
1530       double curFreeMem = bgGetSysFreeMemSize();
1531       if(fabs(bgOOCMaxMemSize - 0.0)<=1e-6){
1532         //using the memory available of the system right now
1533         //assuming no other programs will run after this program runs
1534         bgOOCMaxMemSize = curFreeMem;
1535         CmiPrintf("Using the system's current memory available: %.3fMB\n", bgOOCMaxMemSize);
1536       }
1537       if(bgOOCMaxMemSize > curFreeMem){
1538         CmiPrintf("Warning: not enough memory for the specified memory size, now use the current available memory %3.fMB.\n", curFreeMem);
1539         bgOOCMaxMemSize = curFreeMem;
1540       }
1541       DEBUGF(("out-of-core turned on!\n"));
1542   }      
1543
1544 #if BLUEGENE_DEBUG_LOG
1545   {
1546     char ln[200];
1547     sprintf(ln,"bgdebugLog.%d",CmiMyPe());
1548     bgDebugLog=fopen(ln,"w");
1549   }
1550 #endif
1551
1552   arg_argv = argv;
1553   arg_argc = CmiGetArgc(argv);
1554
1555   /* msg handler */
1556   CpvInitialize(SimState, simState);
1557   cva(simState).msgHandler = CmiRegisterHandler((CmiHandler) msgHandlerFunc);
1558   cva(simState).nBcastMsgHandler = CmiRegisterHandler((CmiHandler)nodeBCastMsgHandlerFunc);
1559   cva(simState).tBcastMsgHandler = CmiRegisterHandler((CmiHandler)threadBCastMsgHandlerFunc);
1560   cva(simState).exitHandler = CmiRegisterHandler((CmiHandler) exitHandlerFunc);
1561
1562   cva(simState).beginExitHandler = CmiRegisterHandler((CmiHandler) beginExitHandlerFunc);
1563   cva(simState).inEmulatorInit = 1;
1564   /* call user defined BgEmulatorInit */
1565   BgEmulatorInit(arg_argc, arg_argv);
1566   cva(simState).inEmulatorInit = 0;
1567
1568   /* check if all bluegene node size and thread information are set */
1569   sanityCheck();
1570
1571   _bgSize = cva(bgMach).getNodeSize(); 
1572
1573   timerFunc = BgGetTime;
1574
1575   BgInitTiming();               // timing module
1576
1577   if (CmiMyPe() == 0) {
1578     CmiPrintf("BG info> Simulating %dx%dx%d nodes with %d comm + %d work threads each.\n", cva(bgMach).x, cva(bgMach).y, cva(bgMach).z, cva(bgMach).numCth, cva(bgMach).numWth);
1579     CmiPrintf("BG info> Network type: %s.\n", cva(bgMach).network->name());
1580     cva(bgMach).network->print();
1581     CmiPrintf("BG info> cpufactor is %f.\n", cva(bgMach).cpufactor);
1582     CmiPrintf("BG info> floating point factor is %f.\n", cva(bgMach).fpfactor);
1583     if (cva(bgMach).stacksize)
1584       CmiPrintf("BG info> BG stack size: %d bytes. \n", cva(bgMach).stacksize);
1585     if (cva(bgMach).timingMethod == BG_ELAPSE) 
1586       CmiPrintf("BG info> Using BgElapse calls for timing method. \n");
1587     else if (cva(bgMach).timingMethod == BG_WALLTIME)
1588       CmiPrintf("BG info> Using WallTimer for timing method. \n");
1589     else if (cva(bgMach).timingMethod == BG_COUNTER)
1590       CmiPrintf("BG info> Using performance counter for timing method. \n");
1591     if (genTimeLog)
1592       CmiPrintf("BG info> Generating timing log. \n");
1593     if (correctTimeLog)
1594       CmiPrintf("BG info> Perform timestamp correction. \n");
1595     if (cva(bgMach).traceroot)
1596       CmiPrintf("BG info> bgTrace root is '%s'. \n", cva(bgMach).traceroot);
1597   }
1598
1599   CtvInitialize(threadInfo *, threadinfo);
1600
1601   /* number of bg nodes on this PE */
1602   CpvInitialize(int, numNodes);
1603   cva(numNodes) = nodeInfo::numLocalNodes();
1604
1605   if (CmiMyRank() == 0)
1606     initLock = CmiCreateLock();     // used for BnvInitialize
1607
1608   bgstreaming.init(cva(numNodes));
1609
1610   //Must initialize out-of-core related data structures before creating any BG nodes and procs
1611 if(bgUseOutOfCore){
1612       initTblThreadInMem();
1613           #if BIGSIM_OUT_OF_CORE && BIGSIM_OOC_PREFETCH
1614       //init prefetch status      
1615       thdsOOCPreStatus = new oocPrefetchStatus[cva(numNodes)*cva(bgMach).numWth];
1616       oocPrefetchSpace = new oocPrefetchBufSpace();
1617       schedWorkThds = new oocWorkThreadQueue();
1618           #endif
1619   }
1620
1621 #if BIGSIM_OUT_OF_CORE
1622   //initialize variables related to get precise
1623   //physical memory usage info for a process
1624   bgMemPageSize = getpagesize();
1625   memset(bgMemStsFile, 0, 25); 
1626   sprintf(bgMemStsFile, "/proc/%d/statm", getpid());
1627 #endif
1628
1629
1630   /* create BG nodes */
1631   CpvInitialize(nodeInfo *, nodeinfo);
1632   cva(nodeinfo) = new nodeInfo[cva(numNodes)];
1633   _MEMCHECK(cva(nodeinfo));
1634
1635   cta(threadinfo) = new threadInfo(-1, UNKNOWN_THREAD, NULL);
1636   _MEMCHECK(cta(threadinfo));
1637
1638   /* create BG processors for each node */
1639   for (i=0; i<cva(numNodes); i++)
1640   {
1641     nodeInfo *ninfo = cva(nodeinfo) + i;
1642     // create threads
1643     ninfo->initThreads(i);
1644
1645     /* pretend that I am a thread */
1646     cta(threadinfo)->myNode = ninfo;
1647
1648     /* initialize a BG node and fire all threads */
1649     BgNodeInitialize(ninfo);
1650   }
1651   // clear main thread.
1652   cta(threadinfo)->myNode = NULL;
1653   CpvInitialize(CthThread, mainThread);
1654   cva(mainThread) = CthSelf();
1655
1656   CpvInitialize(int, CthResumeBigSimThreadIdx);
1657
1658   cva(simState).simStartTime = CmiWallTimer();    
1659   return 0;
1660 }
1661
1662 // for conv-conds:
1663 // if -2 untouch
1664 // if -1 main thread
1665 #if CMK_BLUEGENE_THREAD
1666 extern "C" int CmiSwitchToPEFn(int pe)
1667 {
1668   if (pe == -2) return -2;
1669   int oldpe;
1670 //  ASSERT(tTHREADTYPE != COMM_THREAD);
1671   if (tMYNODE == NULL) oldpe = -1;
1672   else if (tTHREADTYPE == COMM_THREAD) oldpe = -BgGetThreadID();
1673   else if (tTHREADTYPE == WORK_THREAD) oldpe = BgGetGlobalWorkerThreadID();
1674   else oldpe = -1;
1675 //CmiPrintf("CmiSwitchToPE from %d to %d\n", oldpe, pe);
1676   if (pe == -1) {
1677     CthSwitchThread(cva(mainThread));
1678   }
1679   else if (pe < 0) {
1680   }
1681   else {
1682     if (cva(bgMach).inReplayMode()) pe = 0;         /* replay mode */
1683     int t = pe%cva(bgMach).numWth;
1684     int newpe = nodeInfo::Global2Local(pe/cva(bgMach).numWth);
1685     nodeInfo *ninfo = cva(nodeinfo) + newpe;;
1686     threadInfo *tinfo = ninfo->threadinfo[t];
1687     CthSwitchThread(tinfo->me);
1688   }
1689   return oldpe;
1690 }
1691 #else
1692 extern "C" int CmiSwitchToPEFn(int pe)
1693 {
1694   if (pe == -2) return -2;
1695   int oldpe;
1696   if (tMYNODE == NULL) oldpe = -1;
1697   else oldpe = BgMyNode();
1698   if (pe == -1) {
1699     cta(threadinfo)->myNode = NULL;
1700   }
1701   else {
1702     int newpe = nodeInfo::Global2Local(pe);
1703     cta(threadinfo)->myNode = cva(nodeinfo) + newpe;;
1704   }
1705   return oldpe;
1706 }
1707 #endif
1708
1709
1710 /*****************************************************************************
1711                         TimeLog correction
1712 *****************************************************************************/
1713
1714 extern void processCorrectionMsg(int nodeidx);
1715
1716 // return the msg pointer, and the index of the message in the affinity queue.
1717 static inline char* searchInAffinityQueue(int nodeidx, BgMsgID &msgId, CmiInt2 tID, int &index)
1718 {
1719   CmiAssert(tID != ANYTHREAD);
1720   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1721   for (int i=0; i<affinityQ.length(); i++)  {
1722       char *msg = affinityQ[i];
1723       BgMsgID md = BgMsgID(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1724       if (msgId == md) {
1725         index = i;
1726         return msg;
1727       }
1728   }
1729   return NULL;
1730 }
1731
1732 // return the msg pointer, thread id and the index of the message in the affinity queue.
1733 static char* searchInAffinityQueueInNode(int nodeidx, BgMsgID &msgId, CmiInt2 &tID, int &index)
1734 {
1735   for (tID=0; tID<cva(bgMach).numWth; tID++) {
1736     char *msg = searchInAffinityQueue(nodeidx, msgId, tID, index);
1737     if (msg) return msg;
1738   }
1739   return NULL;
1740 }
1741
1742 StateCounters stateCounters;
1743
1744 int updateRealMsgs(bgCorrectionMsg *cm, int nodeidx)
1745 {
1746   char *msg;
1747   CmiInt2 tID = cm->tID;
1748   int index;
1749   if (tID == ANYTHREAD) {
1750     msg = searchInAffinityQueueInNode(nodeidx, cm->msgId, tID, index);
1751   }
1752   else {
1753     msg = searchInAffinityQueue(nodeidx, cm->msgId, tID, index);
1754   }
1755   if (msg == NULL) return 0;
1756
1757   ckMsgQueue &affinityQ = cva(nodeinfo)[nodeidx].affinityQ[tID];
1758   CmiBgMsgRecvTime(msg) = cm->tAdjust;
1759   affinityQ.update(index);
1760   CthThread tid = cva(nodeinfo)[nodeidx].threadTable[tID];
1761   unsigned int prio = (unsigned int)(cm->tAdjust*PRIO_FACTOR)+1;
1762   CthAwakenPrio(tid, CQS_QUEUEING_IFIFO, sizeof(int), &prio);
1763   stateCounters.corrMsgCRCnt++;
1764   return 1;       /* invalidate this msg */
1765 }
1766
1767 extern void processBufferCorrectionMsgs(void *ignored);
1768
1769 // Coverse handler for begin exit
1770 // flush and process all correction messages
1771 static void beginExitHandlerFunc(void *msg)
1772 {
1773   CmiFree(msg);
1774   delayCheckFlag = 0;
1775 //CmiPrintf("\n\n\nbeginExitHandlerFunc called on %d\n", CmiMyPe());
1776   programExit = 1;
1777 #if LIMITED_SEND
1778   CQdCreate(CpvAccess(cQdState), BgNodeSize());
1779 #endif
1780
1781 #if 1
1782   for (int i=0; i<BgNodeSize(); i++) 
1783     processCorrectionMsg(i); 
1784 #if USE_MULTISEND
1785   BgSendBufferedCorrMsgs();
1786 #endif
1787 #else
1788   // the msg queue should be empty now.
1789   // don't do insert adjustment, but start to do all timing correction from here
1790   int nodeidx, tID;
1791   for (nodeidx=0; nodeidx<BgNodeSize(); nodeidx++) {
1792     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1793     for (tID=0; tID<cva(numWth); tID++) {
1794         BgTimeLineRec &tlinerec = tlines[tID];
1795         BgAdjustTimeLine(tlinerec, nodeidx, tID);
1796     }
1797   }
1798
1799 #endif
1800
1801 #if !THROTTLE_WORK
1802 #if DELAY_CHECK
1803   CcdCallFnAfter(processBufferCorrectionMsgs,NULL,CHECK_INTERVAL);
1804 #endif
1805 #endif
1806 }
1807
1808 #define HISTOGRAM_SIZE  100
1809 // compute total CPU utilization for each timeline and 
1810 // return the number of real msgs
1811 void computeUtilForAll(int* array, int *nReal)
1812 {
1813   double scale = 1.0e3;         // scale to ms
1814
1815   //We measure from 1ms to 5001 ms in steps of 100 ms
1816   int min = 0, step = 1;
1817   int max = min + HISTOGRAM_SIZE*step;
1818   // [min, max: step]  HISTOGRAM_SIZE slots
1819
1820   if (CmiMyPe()==0)
1821     CmiPrintf("computeUtilForAll: min:%d max:%d step:%d scale:%f.\n", min, max, step, scale);
1822   int size = (max-min)/step;
1823   CmiAssert(size == HISTOGRAM_SIZE);
1824   int allmin = -1, allmax = -1;
1825   for(int i=0;i<size;i++) array[i] = 0;
1826
1827   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1828     BgTimeLineRec *tlinerec = cva(nodeinfo)[nodeidx].timelines;
1829     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1830       int util = (int)(scale*(tlinerec[tID].computeUtil(nReal)));
1831
1832       if (util >= max) { if (util>allmax||allmax==-1) allmax=util; util=max-1;}
1833       if (util < min) { if (util<allmin||allmin==-1) allmin=util; util=min; }
1834       array[(util-min)/step]++;
1835     }
1836   }
1837   if (allmin!=-1 || allmax!=-1)
1838     CmiPrintf("[%d] Warning: computeUtilForAll out of range %f - %f.\n", CmiMyPe(), (allmin==-1)?-1:allmin/scale, (allmax==-1)?-1:allmax/scale);
1839 }
1840
1841 class StatsMessage {
1842   char core[CmiBlueGeneMsgHeaderSizeBytes];
1843 public:
1844   int processCount;
1845   int corrMsgCount;
1846   int realMsgCount;
1847   int maxTimelineLen, minTimelineLen;
1848 };
1849
1850 extern int processCount, corrMsgCount;
1851
1852 static void sendCorrectionStats()
1853 {
1854   int msgSize = sizeof(StatsMessage)+sizeof(int)*HISTOGRAM_SIZE;
1855   StatsMessage *statsMsg = (StatsMessage *)CmiAlloc(msgSize);
1856   statsMsg->processCount = processCount;
1857   statsMsg->corrMsgCount = corrMsgCount;
1858   int numMsgs=0;
1859   int maxTimelineLen=-1, minTimelineLen=CMK_MAXINT;
1860   int totalMem = 0;
1861   if (bgstats_flag) {
1862   for (int nodeidx=0; nodeidx<cva(numNodes); nodeidx++) {
1863     BgTimeLineRec *tlines = cva(nodeinfo)[nodeidx].timelines;
1864     for (int tID=0; tID<cva(bgMach).numWth; tID++) {
1865         BgTimeLineRec &tlinerec = tlines[tID];
1866         int tlen = tlinerec.length();
1867         if (tlen>maxTimelineLen) maxTimelineLen=tlen;
1868         if (tlen<minTimelineLen) minTimelineLen=tlen;
1869         totalMem = tlen*sizeof(BgTimeLog);
1870 //CmiPrintf("[%d node:%d] BgTimeLog: %dK len:%d size of bglog: %d bytes\n", CmiMyPe(), nodeidx, totalMem/1000, tlen, sizeof(BgTimeLog));
1871 #if 0
1872         for (int i=0; i< tlinerec.length(); i++) {
1873           numMsgs += tlinerec[i]->msgs.length();
1874         }
1875 #endif
1876     }
1877   }
1878   computeUtilForAll((int*)(statsMsg+1), &numMsgs);
1879   statsMsg->realMsgCount = numMsgs;
1880   statsMsg->maxTimelineLen = maxTimelineLen;
1881   statsMsg->minTimelineLen = minTimelineLen;
1882   }  // end if
1883
1884   CmiSetHandler(statsMsg, cva(simState).bgStatCollectHandler);
1885   CmiSyncSendAndFree(0, msgSize, statsMsg);
1886 }
1887
1888 // Converse handler for collecting stats
1889 void statsCollectionHandlerFunc(void *msg)
1890 {
1891   static int count=0;
1892   static int pc=0, cc=0, realMsgCount=0;
1893   static int maxTimelineLen=0, minTimelineLen=CMK_MAXINT;
1894   static int *histArray = NULL;
1895   int i;
1896
1897   count++;
1898   if (histArray == NULL) {
1899     histArray = new int[HISTOGRAM_SIZE];
1900     for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i]=0;
1901   }
1902   StatsMessage *m = (StatsMessage *)msg;
1903   pc += m->processCount;
1904   cc += m->corrMsgCount;
1905   realMsgCount += m->realMsgCount;
1906   if (minTimelineLen> m->minTimelineLen) minTimelineLen=m->minTimelineLen;
1907   if (maxTimelineLen< m->maxTimelineLen) maxTimelineLen=m->maxTimelineLen;
1908   int *array = (int *)(m+1);
1909   for (i=0; i<HISTOGRAM_SIZE; i++) histArray[i] += array[i];
1910   if (count == CmiNumPes()) {
1911     if (bgstats_flag) {
1912       CmiPrintf("Total procCount:%d corrMsgCount:%d realMsg:%d timeline:%d-%d\n", pc, cc, realMsgCount, minTimelineLen, maxTimelineLen);
1913       for (i=0; i<HISTOGRAM_SIZE; i++) {
1914         CmiPrintf("%d ", histArray[i]);
1915         if (i%20 == 19) CmiPrintf("\n");
1916       }
1917       CmiPrintf("\n");
1918     }
1919     CsdExitScheduler();
1920   }
1921   CmiFree(msg);
1922 }
1923
1924 // update arrival time from buffer messages
1925 // before start an entry, check message time against buffered timing
1926 // correction message to update to the correct time.
1927 void correctMsgTime(char *msg)
1928 {
1929    if (!correctTimeLog) return;
1930 //   if (CkMsgDoCorrect(msg) == 0) return;
1931
1932    BgMsgID msgId(CmiBgMsgSrcPe(msg), CmiBgMsgID(msg));
1933    CmiInt2 tid = CmiBgMsgThreadID(msg);
1934
1935    bgCorrectionQ &cmsg = cva(nodeinfo)[tMYNODEID].cmsg;
1936    int len = cmsg.length();
1937    for (int i=0; i<len; i++) {
1938      bgCorrectionMsg* m = cmsg[i];
1939      if (msgId == m->msgId && tid == m->tID) {
1940         if (m->tAdjust < 0.0) return;
1941         //CmiPrintf("correctMsgTime from %e to %e\n", CmiBgMsgRecvTime(msg), m->tAdjust);
1942         CmiBgMsgRecvTime(msg) = m->tAdjust;
1943         m->tAdjust = -1.0;       /* invalidate this msg */
1944 //      cmsg.update(i);
1945         stateCounters.corrMsgRCCnt++;
1946         break;
1947      }
1948    }
1949 }
1950
1951
1952 //TODO: write disk bgTraceFiles
1953 static void writeToDisk()
1954 {
1955
1956   char* d = new char[1025];
1957   //Num of simulated procs on this real pe
1958   int numLocalProcs = cva(numNodes)*cva(bgMach).numWth;
1959
1960   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1961
1962   // write summary file on PE0
1963   if(CmiMyPe()==0){
1964     
1965     sprintf(d, "%sbgTrace", cva(bgMach).traceroot?cva(bgMach).traceroot:""); 
1966     FILE *f2 = fopen(d,"wb");
1967     //Total emulating processors and total target BG processors
1968     int numEmulatingPes=CmiNumPes();
1969     int totalWorkerProcs = BgNumNodes()*cva(bgMach).numWth;
1970
1971     if(f2==NULL) {
1972       CmiPrintf("[%d] Creating trace file %s  failed\n", CmiMyPe(), d);
1973       CmiAbort("BG> Abort");
1974     }
1975     PUP::toDisk p(f2);
1976     p((char *)&machInfo, sizeof(machInfo));
1977     p|totalWorkerProcs;
1978     p|cva(bgMach);
1979     p|numEmulatingPes;
1980     p|bglog_version;
1981     p|CpvAccess(CthResumeBigSimThreadIdx);
1982     
1983     CmiPrintf("[0] Number is numX:%d numY:%d numZ:%d numCth:%d numWth:%d numEmulatingPes:%d totalWorkerProcs:%d bglog_ver:%d\n",cva(bgMach).x,cva(bgMach).y,cva(bgMach).z,cva(bgMach).numCth,cva(bgMach).numWth,numEmulatingPes,totalWorkerProcs,bglog_version);
1984     
1985     fclose(f2);
1986   }
1987   
1988   sprintf(d, "%sbgTrace%d", cva(bgMach).traceroot?cva(bgMach).traceroot:"", CmiMyPe()); 
1989   FILE *f = fopen(d,"wb");
1990  
1991   if(f==NULL)
1992     CmiPrintf("Creating bgTrace%d failed\n",CmiMyPe());
1993   PUP::toDisk p(f);
1994   
1995   p((char *)&machInfo, sizeof(machInfo));       // machine info
1996   p|numLocalProcs;
1997
1998   // CmiPrintf("Timelines are: \n");
1999   int procTablePos = ftell(f);
2000
2001   int *procOffsets = new int[numLocalProcs];
2002   int procTableSize = (numLocalProcs)*sizeof(int);
2003   fseek(f,procTableSize,SEEK_CUR); 
2004
2005   for (int j=0; j<cva(numNodes); j++){
2006     for(int i=0;i<cva(bgMach).numWth;i++){
2007     #if BIGSIM_OUT_OF_CORE
2008         //When isomalloc is used, some events inside BgTimeLineRec are allocated
2009         //through isomalloc. Therefore, the memory containing those events needs
2010         //to be brought back into memory from disk. --Chao Mei          
2011         if(bgUseOutOfCore && CmiMemoryIs(CMI_MEMORY_IS_ISOMALLOC))
2012             bgOutOfCoreSchedule(cva(nodeinfo)[j].threadinfo[i]);
2013      #endif     
2014       BgTimeLineRec &t = cva(nodeinfo)[j].timelines[i];
2015       procOffsets[j*cva(bgMach).numWth + i] = ftell(f);
2016       t.pup(p);
2017     }
2018   }
2019   
2020   fseek(f,procTablePos,SEEK_SET);
2021   p(procOffsets,numLocalProcs);
2022   fclose(f);
2023
2024   CmiPrintf("[%d] Wrote to disk for %d BG nodes. \n", CmiMyPe(), cva(numNodes));
2025 }
2026
2027
2028 // application Converse thread hook
2029
2030 CpvExtern(int      , CthResumeBigSimThreadIdx);
2031
2032 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2033                                    int pb,unsigned int *prio)
2034 {
2035 /*
2036   CmiSetHandler(token, CpvAccess(CthResumeNormalThreadIdx));
2037   CsdEnqueueGeneral(token, s, pb, prio);
2038 */
2039 #if CMK_BLUEGENE_THREAD
2040   int x, y, z;
2041   BgGetMyXYZ(&x, &y, &z);
2042   int t = BgGetThreadID();
2043 #else
2044   #error "ERROR HERE"
2045 #endif
2046     // local message into queue
2047   DEBUGM(4, ("In EnqueueBigSimThread method!\n"));
2048
2049   DEBUGM(4, ("token [%p] is added to queue pointing to thread[%p]\n", token, token->thread));
2050   BgSendPacket(x,y,z, t, CpvAccess(CthResumeBigSimThreadIdx), LARGE_WORK, sizeof(CthThreadToken), (char *)token);
2051
2052
2053 CthThread CthSuspendBigSimThread()
2054
2055   return  cta(threadinfo)->me;
2056 }
2057
2058 static void bigsimThreadListener_suspend(struct CthThreadListener *l)
2059 {
2060    // stop timer
2061    stopVTimer();
2062 }
2063
2064 static void bigsimThreadListener_resume(struct CthThreadListener *l)
2065 {
2066    // start timer by reset it
2067    resetVTime();
2068 }
2069
2070
2071 void BgSetStrategyBigSimDefault(CthThread t)
2072
2073   CthSetStrategy(t,
2074                  CthEnqueueBigSimThread,
2075                  CthSuspendBigSimThread);
2076
2077   CthThreadListener *a = new CthThreadListener;
2078   a->suspend = bigsimThreadListener_suspend;
2079   a->resume = bigsimThreadListener_resume;
2080   a->free = NULL;
2081   CthAddListener(t, a);
2082 }
2083
2084
2085 int BgIsRecord()
2086 {
2087     return cva(bgMach).record == 1;
2088 }
2089
2090 int BgIsReplay()
2091 {
2092     return cva(bgMach).replay != -1;
2093 }
2094
2095 extern "C" void CkReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
2096   ((workThreadInfo*)cta(threadinfo))->reduceMsg = msg;
2097   //CmiPrintf("Called CkReduce from %d %hd\n",CmiMyPe(),cta(threadinfo)->globalId);
2098   int numLocal = 0, count = 0;
2099   for (int j=0; j<cva(numNodes); j++){
2100     for(int i=0;i<cva(bgMach).numWth;i++){
2101       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2102       if (t->reduceMsg == NULL) return; /* we are not yet ready to reduce */
2103       numLocal ++;
2104     }
2105   }
2106   void **msgLocal = (void**)malloc(sizeof(void*)*(numLocal-1));
2107   for (int j=0; j<cva(numNodes); j++){
2108     for(int i=0;i<cva(bgMach).numWth;i++){
2109       workThreadInfo *t = (workThreadInfo*)cva(nodeinfo)[j].threadinfo[i];
2110       if (t == cta(threadinfo)) break;
2111       msgLocal[count++] = t->reduceMsg;
2112       t->reduceMsg = NULL;
2113     }
2114   }
2115   CmiAssert(count==numLocal-1);
2116   msg = mergeFn(&size, msg, msgLocal, numLocal-1);
2117   CmiReduce(msg, size, mergeFn);
2118   CmiPrintf("Called CmiReduce %d\n",CmiMyPe());
2119   for (int i=0; i<numLocal-1; ++i) CmiFree(msgLocal[i]);
2120   free(msgLocal);
2121 }