Change CmiUInt2 msgType to CmiUInt1 cmaMsgType:2 in conv-common.h
[charm.git] / src / arch / util / machine-common-core.C
1 /*
2  *created by Chao Mei
3  *revised by Yanhua, Gengbin
4  */
5
6 #if CMK_C_INLINE
7 #define INLINE_KEYWORD inline
8 #else
9 #define INLINE_KEYWORD
10 #endif
11
12 #if MACHINE_DEBUG_LOG
13 FILE *debugLog = NULL;
14 #endif
15
16 // Macro for message type
17 #define CMI_CMA_MSGTYPE(msg)         ((CmiMsgHeaderBasic *)msg)->cmaMsgType
18
19 /******* broadcast related  */
20 #ifndef CMK_BROADCAST_SPANNING_TREE
21 #define CMK_BROADCAST_SPANNING_TREE    1
22 #endif
23
24 #ifndef CMK_BROADCAST_HYPERCUBE
25 #define CMK_BROADCAST_HYPERCUBE        0
26 #endif
27
28 #define BROADCAST_SPANNING_FACTOR      4
29 /* The number of children used when a msg is broadcast inside a node */
30 #define BROADCAST_SPANNING_INTRA_FACTOR  8
31
32 /* Root of broadcast:
33  * non-bcast msg: root = 0;
34  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
35  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
36  */
37 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
38 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
39
40 /**
41  * For some machine layers such as on Active Message framework,
42  * the receiver callback is usally executed on an internal
43  * thread (i.e. not the flow managed by ours). Therefore, for
44  * forwarding broadcast messages, we could have a choice whether
45  * to offload such function to the flow we manage such as the
46  * communication thread. -
47  */
48
49 #ifndef CMK_OFFLOAD_BCAST_PROCESS
50 #define CMK_OFFLOAD_BCAST_PROCESS 0
51 #endif
52
53 #if CMK_OFFLOAD_BCAST_PROCESS
54 CsvDeclare(CMIQueue, procBcastQ);
55 #if CMK_NODE_QUEUE_AVAILABLE
56 CsvDeclare(CMIQueue, nodeBcastQ);
57 #endif
58 #endif
59
60 static int _exitcode;
61
62 #if CMK_WITH_STATS
63 static int  MSG_STATISTIC = 0;
64 int     msg_histogram[22];
65 static int _cmi_log2(int size)
66 {
67     int ret = 1;
68     size = size-1;
69     while( (size=size>>1)>0) ret++;
70     return ret;
71 }
72 #endif
73
74 #if CMK_BROADCAST_HYPERCUBE
75 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
76 static int CmiNodesDim;
77 #endif
78 /* ###End of Broadcast related definitions ### */
79
80 #if CMK_SMP_TRACE_COMMTHREAD
81 CMI_EXTERNC
82 double TraceTimerCommon(void);
83 #endif
84
85 static void handleOneBcastMsg(int size, char *msg);
86 static void processBcastQs(void);
87
88 /* Utility functions for forwarding broadcast messages,
89  * should not be used in machine-specific implementations
90  * except in some special occasions.
91  */
92 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
93 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
94 static void SendSpanningChildrenProc(int size, char *msg);
95 static void SendHyperCubeProc(int size, char *msg);
96 #if CMK_NODE_QUEUE_AVAILABLE
97 static void SendSpanningChildrenNode(int size, char *msg);
98 static void SendHyperCubeNode(int size, char *msg);
99 #endif
100
101 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
102 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
103
104 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
105 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
106 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
107 #endif
108 #endif
109
110 #include <assert.h>
111
112 void CmiSyncBroadcastFn(int size, char *msg);
113 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
114 void CmiFreeBroadcastFn(int size, char *msg);
115
116 void CmiSyncBroadcastAllFn(int size, char *msg);
117 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
118 void CmiFreeBroadcastAllFn(int size, char *msg);
119
120 #if CMK_NODE_QUEUE_AVAILABLE
121 void CmiSyncNodeBroadcastFn(int size, char *msg);
122 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
123 void CmiFreeNodeBroadcastFn(int size, char *msg);
124
125 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
126 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
127 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
128 #endif
129
130 /************** Done with Broadcast related */
131
132 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
133
134 #ifndef CMK_HAS_SIZE_IN_MSGHDR
135 #define CMK_HAS_SIZE_IN_MSGHDR 1
136 #endif
137 #if CMK_HAS_SIZE_IN_MSGHDR
138 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
139 #else
140 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
141 #endif
142
143 #if CMK_NODE_QUEUE_AVAILABLE
144 /* This value should be larger than the number of cores used
145  * per charm smp node. So it's currently set to such a large
146  * value.
147  */
148 #define DGRAM_NODEMESSAGE   (0x1FFB)
149 #endif
150
151 // global state, equals local if running one partition
152 PartitionInfo _partitionInfo;
153 int _Cmi_mype_global;
154 int _Cmi_numpes_global;
155 int _Cmi_mynode_global;
156 int _Cmi_numnodes_global;
157 static int _writeToStdout = 1;
158
159 // Node state structure, local information for the partition
160 int               _Cmi_myphysnode_numprocesses;  /* Total number of processes within this node */
161 int               _Cmi_mynodesize;/* Number of processors in my address space */
162 int               _Cmi_mynode;    /* Which address space am I */
163 int               _Cmi_numnodes;  /* Total number of address spaces */
164 int               _Cmi_numpes;    /* Total number of processors */
165 CMI_EXTERNC_VARIABLE int userDrivenMode;
166 int               userDrivenMode; /* Set by CharmInit for interop in user driven mode */
167 extern int CharmLibInterOperate;
168
169 CpvDeclare(void*, CmiLocalQueue);
170
171 /* different modes for sending a message */
172 #define P2P_SYNC      0x1
173 #define P2P_ASYNC     0x2
174 #define BCAST_SYNC    0x4
175 #define BCAST_ASYNC   0x8
176 #define OUT_OF_BAND   0x10
177
178 enum MACHINE_SMP_MODE {
179     INVALID_MODE,
180 #if CMK_BLUEGENEQ
181     COMM_THREAD_SEND_RECV = 0,
182 #else 
183     COMM_THREAD_SEND_RECV = 0,
184 #endif
185     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
186     COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
187     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
188 };
189 /* The default mode of smp charm runtime */
190 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
191
192 /* Machine layer dependent specific modes of the smp charm runtime are set in individual machine layers */
193
194 #if CMK_OMP
195 /* Suspended tasks should be in a separate queue 
196  * They are pushed by the master thread who generated the tasks. When they are stolen by other PEs and suspended on the other PEs, they should be reinserted to the PEs local Queue so that the locality of resumed tasks can be maintained 
197  * */
198 void CmiSuspendedTaskEnqueue(int targetRank, void *data);
199 void* CmiSuspendedTaskPop();
200 #endif
201
202 #if CMK_SMP
203 #include <atomic>
204 std::atomic<int> commThdExit {0};
205
206 /**
207  *  The macro defines whether to have a comm thd to offload some
208  *  work such as forwarding bcast messages etc. This macro
209  *  should be defined before including "machine-smp.c". Note
210  *  that whether a machine layer in SMP mode could run w/o comm
211  *  thread depends on the support of the underlying
212  *  communication library.
213  *
214  */
215 #ifndef CMK_SMP_NO_COMMTHD
216 #define CMK_SMP_NO_COMMTHD CMK_MULTICORE
217 #endif
218
219 #if CMK_SMP_NO_COMMTHD
220 int Cmi_commthread = 0;
221 #else
222 int Cmi_commthread = 1;
223 #endif
224
225 #endif //CMK_SMP
226
227 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
228 int Cmi_nodestart = -1;
229 static int Cmi_nodestartGlobal = -1;
230
231 /*
232  * Network progress utility variables. Period controls the rate at
233  * which the network poll is called
234  */
235 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
236 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
237 #endif
238
239 CpvDeclare(unsigned , networkProgressCount);
240 int networkProgressPeriod;
241
242 #if CMK_CCS_AVAILABLE
243 CMI_EXTERNC_VARIABLE int ccsRunning;
244 #endif
245
246 /* ===== Beginning of Common Function Declarations ===== */
247 CMK_NORETURN void CmiAbort(const char *message);
248 static void PerrorExit(const char *msg);
249
250 /* This function handles the msg received as which queue to push into */
251 static void handleOneRecvedMsg(int size, char *msg);
252
253 /* Utility functions for forwarding broadcast messages,
254  * should not be used in machine-specific implementations
255  * except in some special occasions.
256  */
257 static void SendToPeers(int size, char *msg);
258
259
260 void CmiPushPE(int rank, void *msg);
261
262 #if CMK_NODE_QUEUE_AVAILABLE
263 void CmiPushNode(void *msg);
264 #endif
265
266 /* Functions regarding send ops declared in converse.h */
267
268 /* In default, using the common codes for msg sending */
269 #ifndef USE_COMMON_SYNC_P2P
270 #define USE_COMMON_SYNC_P2P 1
271 #endif
272 #ifndef USE_COMMON_ASYNC_P2P
273 #define USE_COMMON_ASYNC_P2P 1
274 #endif
275 #ifndef USE_COMMON_SYNC_BCAST
276 #define USE_COMMON_SYNC_BCAST 1
277 #endif
278 #ifndef USE_COMMON_ASYNC_BCAST
279 #define USE_COMMON_ASYNC_BCAST 1
280 #endif
281
282 static void CmiSendSelf(char *msg);
283
284 void CmiSyncSendFn(int destPE, int size, char *msg);
285 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
286 void CmiFreeSendFn(int destPE, int size, char *msg);
287
288 #if CMK_NODE_QUEUE_AVAILABLE
289 static void CmiSendNodeSelf(char *msg);
290
291 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
292 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
293 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
294
295 #endif
296
297 /* Functions and variables regarding machine startup */
298 static char     **Cmi_argv;
299 static char     **Cmi_argvcopy;
300 static CmiStartFn Cmi_startfn;   /* The start function */
301 static int        Cmi_usrsched;  /* Continue after start function finishes? */
302 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
303 static void ConverseRunPE(int everReturn);
304
305 /* Functions regarding machine running on every proc */
306 static void AdvanceCommunication(int whenidle);
307 static void CommunicationServer(int sleepTime);
308 CMI_EXTERNC
309 void CommunicationServerThread(int sleepTime);
310
311 /* Functions providing incoming network messages */
312 void *CmiGetNonLocal(void);
313 #if CMK_NODE_QUEUE_AVAILABLE
314 CMI_EXTERNC
315 void *CmiGetNonLocalNodeQ(void);
316 #endif
317 /* Utiltiy functions */
318 static char *CopyMsg(char *msg, int len);
319
320 /* ===== End of Common Function Declarations ===== */
321
322 #include "machine-smp.c"
323
324 /* ===== Beginning of Idle-state Related Declarations =====  */
325 typedef struct {
326     int sleepMs; /*Milliseconds to sleep while idle*/
327     int nIdles; /*Number of times we've been idle in a row*/
328     CmiState cs; /*Machine state*/
329 } CmiIdleState;
330
331 static CmiIdleState *CmiNotifyGetState(void);
332
333 /**
334  *  Generally,
335  *
336  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
337  *  When the proc is idle, AdvanceCommunication needs to be
338  *  called.
339  *
340  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
341  *
342  *  Different layers have choices of registering different callbacks for
343  *  idle state.
344  */
345 static void CmiNotifyBeginIdle(CmiIdleState *s);
346 static void CmiNotifyStillIdle(CmiIdleState *s);
347 void CmiNotifyIdle(void);
348 /* ===== End of Idle-state Related Declarations =====  */
349
350 CsvDeclare(CmiNodeState, NodeState);
351 /* ===== Beginning of Processor/Node State-related Stuff =====*/
352 #if !CMK_SMP
353 /************ non SMP **************/
354 static struct CmiStateStruct Cmi_state;
355 int _Cmi_mype;
356 int _Cmi_myrank;
357
358 void CmiMemLock(void) {}
359 void CmiMemUnlock(void) {}
360
361 #define CmiGetState() (&Cmi_state)
362 #define CmiGetStateN(n) (&Cmi_state)
363
364 void CmiYield(void) {
365     sleep(0);
366 }
367
368 static void CmiStartThreads(char **argv) {
369     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
370     _Cmi_mype = Cmi_nodestart;
371     _Cmi_myrank = 0;
372     _Cmi_mype_global = _Cmi_mynode_global;
373 }
374
375 INLINE_KEYWORD int CmiNodeSpan(void) {
376   return 1;
377 }
378 #else
379 /************** SMP *******************/
380 INLINE_KEYWORD CMIQueue CmiMyRecvQueue(void) {
381     return CmiGetState()->recv;
382 }
383
384 #if CMK_NODE_QUEUE_AVAILABLE
385 INLINE_KEYWORD
386 #if CMK_LOCKLESS_QUEUE
387 MPMCQueue
388 #else
389 CMIQueue
390 #endif
391 CmiMyNodeQueue(void) {
392     return CsvAccess(NodeState).NodeRecv;
393 }
394 #endif
395
396 CMI_EXTERNC
397 int CmiMyPe(void) {
398     return CmiGetState()->pe;
399 }
400 CMI_EXTERNC
401 int CmiNodeSpan(void) {
402   return (CmiMyNodeSize() + 1);
403 }
404 CMI_EXTERNC
405 int CmiMyPeGlobal(void) {
406     return CmiGetPeGlobal(CmiGetState()->pe,CmiMyPartition());
407 }
408 CMI_EXTERNC
409 int CmiMyRank(void) {
410     return CmiGetState()->rank;
411 }
412 CMI_EXTERNC
413 int CmiNodeSize(int node) {
414     return _Cmi_mynodesize;
415 }
416 #if !CMK_MULTICORE // these are defined in converse.h
417 CMI_EXTERNC
418 int CmiNodeFirst(int node) {
419     return node*_Cmi_mynodesize;
420 }
421 CMI_EXTERNC
422 int CmiNodeOf(int pe) {
423     return (pe/_Cmi_mynodesize);
424 }
425 CMI_EXTERNC
426 int CmiRankOf(int pe) {
427     return pe%_Cmi_mynodesize;
428 }
429 #endif // end of !CMK_MULTICORE
430 #endif // end of CMK_SMP
431
432 static int CmiState_hasMessage(void) {
433   CmiState cs = CmiGetState();
434   return CmiIdleLock_hasMessage(cs);
435 }
436
437 // Function declaration
438 CmiCommHandle CmiInterSendNetworkFunc(int destPE, int partition, int size, char *msg, int mode);
439
440 /* ===== End of Processor/Node State-related Stuff =====*/
441
442 #include "machine-broadcast.c"
443 #include "immediate.c"
444 #include "machine-commthd-util.c"
445 #if CMK_USE_CMA
446 // cma_min_thresold and cma_max_threshold specify the range of sizes between which CMA will be used for SHM messaging
447 int cma_works, cma_reg_msg, cma_min_threshold, cma_max_threshold;
448 #include "machine-cma.c"
449 int CmiDoesCMAWork() {
450   return cma_works;
451 }
452 #endif
453
454 /* ===== Beginning of Common Function Definitions ===== */
455 static void PerrorExit(const char *msg) {
456     perror(msg);
457     exit(1);
458 }
459
460 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
461 /*Add a message to this processor's receive queue, pe is a rank */
462 void CmiPushPE(int rank,void *msg) {
463     CmiState cs = CmiGetStateN(rank);
464     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
465 #if CMK_IMMEDIATE_MSG
466     if (CmiIsImmediate(msg)) {
467         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
468         CMI_DEST_RANK(msg) = rank;
469         CmiPushImmediateMsg(msg);
470         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
471         return;
472     }
473 #endif
474
475 #if CMK_MACH_SPECIALIZED_QUEUE
476     LrtsSpecializedQueuePush(rank, msg);
477 #elif CMK_SMP_MULTIQ
478     CMIQueuePush(cs->recv[CmiGetState()->myGrpIdx], (char *)msg);
479 #else
480     CMIQueuePush(cs->recv,(char*)msg);
481 #endif
482
483 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
484   if (_Cmi_sleepOnIdle)
485 #endif
486     CmiIdleLock_addMessage(&cs->idle);
487     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
488 }
489
490 #if CMK_OMP
491 void CmiSuspendedTaskEnqueue(int targetRank, void *msg) {
492   CMIQueuePush((PCQueue)CpvAccessOther(CmiSuspendedTaskQueue, targetRank), (char *)msg);
493 }
494
495 void* CmiSuspendedTaskPop() {
496   return CMIQueuePop((CMIQueue)CpvAccess(CmiSuspendedTaskQueue));
497 }
498 #endif
499
500 #if CMK_NODE_QUEUE_AVAILABLE
501 /*Add a message to this processor's receive queue */
502 void CmiPushNode(void *msg) {
503     MACHSTATE(3,"Pushing message into NodeRecv queue");
504 #if CMK_IMMEDIATE_MSG
505     if (CmiIsImmediate(msg)) {
506         CMI_DEST_RANK(msg) = 0;
507         CmiPushImmediateMsg(msg);
508         return;
509     }
510 #endif
511
512 #if CMK_MACH_SPECIALIZED_QUEUE
513     LrtsSpecializedNodeQueuePush((char *)msg);
514 #elif CMK_LOCKLESS_QUEUE
515     MPMCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
516 #else
517     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
518     CMIQueuePush(CsvAccess(NodeState).NodeRecv, (char *)msg);
519     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
520 #endif
521
522 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
523     if (_Cmi_sleepOnIdle)
524 #endif
525     {
526         CmiState cs=CmiGetStateN(0);
527         CmiIdleLock_addMessage(&cs->idle);
528     }
529 }
530 #endif
531
532 /* This function handles the msg received as which queue to push into */
533 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
534
535 #if CMK_SMP_TRACE_COMMTHREAD
536     TRACE_COMM_CREATION(TraceTimerCommon(), msg);
537 #endif
538
539 #if CMK_USE_CMA
540     // If CMA message, perform CMA read to get the payload message
541     if(cma_reg_msg && CMI_CMA_MSGTYPE(msg) == CMK_CMA_MD_MSG) {
542       handleOneCmaMdMsg(&size, &msg);  // size & msg are modififed
543     } else if(cma_reg_msg && CMI_CMA_MSGTYPE(msg) == CMK_CMA_ACK_MSG) {
544       handleOneCmaAckMsg(size, msg);
545       return;
546     }
547 #endif
548
549     int isBcastMsg = 0;
550 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
551     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
552 #endif
553
554     if (isBcastMsg) {
555         handleOneBcastMsg(size, msg);
556         return;
557     }
558
559 #if CMK_NODE_QUEUE_AVAILABLE
560     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
561         CmiPushNode(msg);
562         return;
563     }
564 #endif
565     CmiPushPE(CMI_DEST_RANK(msg), msg);
566
567 }
568
569
570 static void SendToPeers(int size, char *msg) {
571     /* FIXME: now it's just a flat p2p send!! When node size is large,
572     * it should also be sent in a tree
573     */
574     int exceptRank = CMI_DEST_RANK(msg);
575     int i;
576     for (i=0; i<exceptRank; i++) {
577         CmiPushPE(i, CopyMsg(msg, size));
578     }
579     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
580         CmiPushPE(i, CopyMsg(msg, size));
581     }
582 }
583
584
585 /* Functions regarding sending operations */
586 static void CmiSendSelf(char *msg) {
587 #if CMK_IMMEDIATE_MSG
588     if (CmiIsImmediate(msg)) {
589         /* CmiBecomeNonImmediate(msg); */
590         CmiPushImmediateMsg(msg);
591         CmiHandleImmediate();
592         return;
593     }
594 #endif
595     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
596 }
597
598 /* Functions regarding P2P send op */
599 #if USE_COMMON_SYNC_P2P
600 void CmiSyncSendFn(int destPE, int size, char *msg) {
601     char *dupmsg = CopyMsg(msg, size);
602     CmiFreeSendFn(destPE, size, dupmsg);
603 }
604 //inter-partition send
605 void CmiInterSyncSendFn(int destPE, int partition, int size, char *msg) {
606     char *dupmsg = CopyMsg(msg, size);
607     CmiInterFreeSendFn(destPE, partition, size, dupmsg);
608 }
609
610 #if CMK_USE_PXSHM
611 #include "machine-pxshm.c"
612 #endif
613 #if CMK_USE_XPMEM
614 #include "machine-xpmem.c"
615 #endif
616
617 static int refcount = 0;
618
619 #if CMK_USE_OOB
620 CpvExtern(int, _urgentSend);
621 #endif
622
623 //I am changing this function to offload task to a generic function - the one
624 //that handles sending to any partition
625 INLINE_KEYWORD CmiCommHandle CmiSendNetworkFunc(int destPE, int size, char *msg, int mode) {
626   // Set the message as a regular message (defined in lrts-common.h)
627   CMI_CMA_MSGTYPE(msg) = CMK_REG_NO_CMA_MSG;
628   return CmiInterSendNetworkFunc(destPE, CmiMyPartition(), size, msg, mode);
629 }
630 //the generic function that replaces the older one
631 CmiCommHandle CmiInterSendNetworkFunc(int destPE, int partition, int size, char *msg, int mode)
632 {
633         int rank;
634         int destLocalNode = CmiNodeOf(destPE); 
635         int destNode = CmiGetNodeGlobal(destLocalNode,partition); 
636
637 #if CMK_USE_CMA
638         if(cma_reg_msg && partition == CmiMyPartition() && CmiPeOnSamePhysicalNode(CmiMyPe(), destPE)) {
639           if(CMI_CMA_MSGTYPE(msg) == CMK_REG_NO_CMA_MSG && cma_min_threshold <= size && size <= cma_max_threshold) {
640             CmiSendMessageCma(&msg, &size); // size & msg are modififed
641           }
642         }
643 #endif
644
645 #if CMK_USE_PXSHM      
646         if ((partition == CmiMyPartition()) && CmiValidPxshm(destLocalNode, size)) {
647           CmiSendMessagePxshm(msg, size, destLocalNode, &refcount);
648           //for (int i=0; i<refcount; i++) CmiReference(msg);
649           return 0;
650         }
651 #endif
652 #if CMK_USE_XPMEM     
653         if ((partition == CmiMyPartition()) && CmiValidXpmem(destLocalNode, size)) {
654           CmiSendMessageXpmem(msg, size, destLocalNode, &refcount);
655           //for (int i=0; i<refcount; i++) CmiReference(msg);
656           return 0;
657         }
658 #endif
659 #if CMK_PERSISTENT_COMM
660         if (CpvAccess(phs)) {
661           if (size > PERSIST_MIN_SIZE) {
662             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
663             PersistentSendsTable *slot = (PersistentSendsTable *)CpvAccess(phs)[CpvAccess(curphs)];
664             CmiAssert(CmiNodeOf(slot->destPE) == destLocalNode);
665             LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)], destNode, size, msg);
666             return 0;
667           }
668         }
669 #endif
670
671 #if CMK_WITH_STATS
672 if (MSG_STATISTIC)
673 {
674     int ret_log = _cmi_log2(size);
675     if(ret_log >21) ret_log = 21;
676     msg_histogram[ret_log]++;
677 }
678 #endif
679 #if CMK_USE_OOB
680     if (CpvAccess(_urgentSend)) mode |= OUT_OF_BAND;
681 #endif
682     return LrtsSendFunc(destNode, CmiGetPeGlobal(destPE,partition), size, msg, mode);
683 }
684
685 //I am changing this function to offload task to a generic function - the one
686 //that handles sending to any partition
687 void CmiFreeSendFn(int destPE, int size, char *msg) {
688     CmiInterFreeSendFn(destPE, CmiMyPartition(), size, msg);
689 }
690 //and the generic implementation - I may be in danger of making the frequent
691 //case slower - two extra comparisons may happen
692 void CmiInterFreeSendFn(int destPE, int partition, int size, char *msg) {
693     CMI_SET_BROADCAST_ROOT(msg, 0);
694
695     // Set the message as a regular message (defined in lrts-common.h)
696     CMI_CMA_MSGTYPE(msg) = CMK_REG_NO_CMA_MSG;
697 #if CMI_QD
698     CQdCreate(CpvAccess(cQdState), 1);
699 #endif
700     if (CmiMyPe()==destPE && partition == CmiMyPartition()) {
701         CmiSendSelf(msg);
702 #if CMK_PERSISTENT_COMM
703         if (CpvAccess(phs)) CpvAccess(curphs)++;
704 #endif
705     } 
706     else {
707         int destNode = CmiNodeOf(destPE);
708         int destRank = CmiRankOf(destPE);
709 #if CMK_SMP
710         if (CmiMyNode()==destNode && partition == CmiMyPartition()) {
711             CmiPushPE(destRank, msg);
712 #if CMK_PERSISTENT_COMM
713             if (CpvAccess(phs)) CpvAccess(curphs)++;
714 #endif
715             return;
716         }
717 #endif
718         CMI_DEST_RANK(msg) = destRank;
719         CmiInterSendNetworkFunc(destPE, partition, size, msg, P2P_SYNC);
720
721 #if CMK_PERSISTENT_COMM
722         if (CpvAccess(phs)) CpvAccess(curphs)++;
723 #endif
724     }
725 }
726 #endif
727
728 #if USE_COMMON_ASYNC_P2P
729 //not implementing it for partition
730 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
731     int destNode = CmiNodeOf(destPE);
732     if (destNode == CmiMyNode()) {
733         CmiSyncSendFn(destPE,size,msg);
734         return 0;
735     } else {
736 #if CMK_WITH_STATS
737 if (  MSG_STATISTIC)
738 {
739     int ret_log = _cmi_log2(size);
740         if(ret_log >21) ret_log = 21;
741         msg_histogram[ret_log]++;
742 }
743 #endif
744         return CmiSendNetworkFunc(destPE, size, msg, P2P_ASYNC);
745     }
746 }
747 #endif
748
749 #if CMK_NODE_QUEUE_AVAILABLE
750 static void CmiSendNodeSelf(char *msg) {
751 #if CMK_IMMEDIATE_MSG
752     if (CmiIsImmediate(msg)) {
753         CmiPushImmediateMsg(msg);
754         if (!_immRunning) CmiHandleImmediate();
755         return;
756     }
757 #endif
758 #if CMK_MACH_SPECIALIZED_QUEUE
759     LrtsSpecializedNodeQueuePush(msg);
760 #elif CMK_LOCKLESS_QUEUE
761     MPMCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
762 #else
763     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
764     CMIQueuePush(CsvAccess(NodeState).NodeRecv, msg);
765     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
766 #endif
767 }
768
769 //I think this #if is incorrect - should be SYNC_P2P
770 #if USE_COMMON_SYNC_P2P
771 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
772     char *dupmsg = CopyMsg(msg, size);
773     CmiFreeNodeSendFn(destNode, size, dupmsg);
774 }
775 //inter-partition send
776 void CmiInterSyncNodeSendFn(int destNode, int partition, int size, char *msg) {
777     char *dupmsg = CopyMsg(msg, size);
778     CmiInterFreeNodeSendFn(destNode, partition, size, dupmsg);
779 }
780
781 //again, offloading the task to a generic function
782 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
783   CmiInterFreeNodeSendFn(destNode, CmiMyPartition(), size, msg);
784 }
785 //and the inter-partition function
786 void CmiInterFreeNodeSendFn(int destNode, int partition, int size, char *msg) {
787     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
788 #if CMI_QD
789     CQdCreate(CpvAccess(cQdState), 1);
790 #endif
791     CMI_SET_BROADCAST_ROOT(msg, 0);
792     // Set the message as a regular message (defined in lrts-common.h)
793     CMI_CMA_MSGTYPE(msg) = CMK_REG_NO_CMA_MSG;
794     if (destNode == CmiMyNode() && CmiMyPartition() == partition) {
795         CmiSendNodeSelf(msg);
796     } else {
797 #if CMK_WITH_STATS
798 if (  MSG_STATISTIC)
799 {
800     int ret_log = _cmi_log2(size);
801     if(ret_log >21) ret_log = 21;
802     msg_histogram[ret_log]++;
803 }
804 #endif
805         CmiInterSendNetworkFunc(CmiNodeFirst(destNode), partition, size, msg, P2P_SYNC);
806     }
807 #if CMK_PERSISTENT_COMM
808     if (CpvAccess(phs)) CpvAccess(curphs)++;
809 #endif
810 }
811 #endif
812
813 #if USE_COMMON_ASYNC_P2P
814 //not implementing it for partition
815 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
816     if (destNode == CmiMyNode()) {
817         CmiSyncNodeSendFn(destNode, size, msg);
818         return 0;
819     } else {
820 #if CMK_WITH_STATS
821 if (  MSG_STATISTIC)
822 {
823         int ret_log = _cmi_log2(size);
824         if(ret_log >21) ret_log = 21;
825         msg_histogram[ret_log]++;
826 }
827 #endif
828         return CmiSendNetworkFunc(CmiNodeFirst(destNode), size, msg, P2P_ASYNC);
829     }
830 }
831 #endif
832 #endif
833
834 // functions related to partition
835 #if defined(_WIN32)
836   /* strtok is thread safe in VC++ */
837 #define strtok_r(x,y,z) strtok(x,y)
838 #endif
839
840 #include "TopoManager.h"
841 CMI_EXTERNC
842 void createCustomPartitions(int numparts, int *partitionSize, int *nodeMap);
843 CMI_EXTERNC
844 void setDefaultPartitionParams(void);
845
846 void create_topoaware_partitions(void) {
847   int i, j, numparts_bak;
848   Partition_Type type_bak;
849   int *testMap;
850
851   _partitionInfo.nodeMap = (int*)malloc(CmiNumNodesGlobal()*sizeof(int));
852   _MEMCHECK(_partitionInfo.nodeMap);
853
854   type_bak = _partitionInfo.type;
855   _partitionInfo.type = PARTITION_SINGLETON;
856
857   numparts_bak = _partitionInfo.numPartitions;
858   _partitionInfo.numPartitions = 1;
859
860   _partitionInfo.myPartition = 0;
861
862   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
863   
864   TopoManager_init();
865   if(_partitionInfo.scheme == 100) {
866     createCustomPartitions(numparts_bak, _partitionInfo.partitionSize, _partitionInfo.nodeMap);       
867   } else {
868     TopoManager_createPartitions(_partitionInfo.scheme, numparts_bak, _partitionInfo.nodeMap);
869   }
870   TopoManager_free();
871   
872   _partitionInfo.type = type_bak;
873   _partitionInfo.numPartitions = numparts_bak;
874
875 #if CMK_ERROR_CHECKING
876   testMap = (int*)calloc(CmiNumNodesGlobal(), sizeof(int));
877   for(i = 0; i < CmiNumNodesGlobal(); i++) {
878     assert(_partitionInfo.nodeMap[i] >= 0);
879     assert(_partitionInfo.nodeMap[i] < CmiNumNodesGlobal());
880     assert(testMap[_partitionInfo.nodeMap[i]] == 0);
881     testMap[_partitionInfo.nodeMap[i]] = 1;
882   }
883   free(testMap);
884 #endif
885
886   for(i = 0; i < _partitionInfo.numPartitions; i++) {
887     int endnode = _partitionInfo.partitionPrefix[i] + _partitionInfo.partitionSize[i];
888     for(j = _partitionInfo.partitionPrefix[i]; j < endnode; j++) {
889       if(_partitionInfo.nodeMap[j] == CmiMyNodeGlobal()) {
890         _Cmi_mynode = j - _partitionInfo.partitionPrefix[i];
891         _partitionInfo.myPartition = i;
892       }
893     }
894   }
895 }
896
897 void CmiSetNumPartitions(int nump) {
898   _partitionInfo.numPartitions = nump;
899 }
900
901 void CmiSetMasterPartition(void) {
902   if(!CmiMyNodeGlobal() && _partitionInfo.type != PARTITION_DEFAULT) {
903     CmiAbort("setMasterPartition used with incompatible option\n");
904   }
905   _partitionInfo.type = PARTITION_MASTER;
906
907
908 void CmiSetPartitionSizes(char *sizes) {
909   int length = strlen(sizes);
910   _partitionInfo.partsizes = (char*)malloc((length+1)*sizeof(char));
911
912   if(!CmiMyNodeGlobal() && _partitionInfo.type != PARTITION_DEFAULT) {
913     CmiAbort("setPartitionSizes used with incompatible option\n");
914   }
915
916   memcpy(_partitionInfo.partsizes, sizes, length*sizeof(char));
917   _partitionInfo.partsizes[length] = '\0';
918   _partitionInfo.type = PARTITION_PREFIX;
919 }
920
921 void CmiSetPartitionScheme(int scheme) {
922   _partitionInfo.scheme = scheme;
923   _partitionInfo.isTopoaware = 1;
924 }
925
926 void CmiSetCustomPartitioning(void) {
927   _partitionInfo.scheme = 100;
928   _partitionInfo.isTopoaware = 1;
929 }
930
931 static void create_partition_map( char **argv)
932 {
933   char* token, *tptr;
934   int i, flag;
935   
936   _partitionInfo.numPartitions = 1; 
937   _partitionInfo.type = PARTITION_DEFAULT;
938   _partitionInfo.partsizes = NULL;
939   _partitionInfo.scheme = 0;
940   _partitionInfo.isTopoaware = 0;
941
942   setDefaultPartitionParams();
943
944   if(!CmiGetArgIntDesc(argv,"+partitions", &_partitionInfo.numPartitions,"number of partitions")) {
945     CmiGetArgIntDesc(argv,"+replicas", &_partitionInfo.numPartitions,"number of partitions");
946   }
947
948 #if CMK_MULTICORE
949   if(_partitionInfo.numPartitions != 1) {
950     CmiAbort("+partitions other than 1 is not allowed for multicore build\n");
951   }
952 #endif
953
954   _partitionInfo.partitionSize = (int*)calloc(_partitionInfo.numPartitions,sizeof(int));
955   _partitionInfo.partitionPrefix = (int*)calloc(_partitionInfo.numPartitions,sizeof(int));
956   
957   if (CmiGetArgFlagDesc(argv,"+master_partition","assign a process as master partition")) {
958     _partitionInfo.type = PARTITION_MASTER;
959   }
960  
961   if (CmiGetArgStringDesc(argv, "+partition_sizes", &_partitionInfo.partsizes, "size of partitions")) {
962     if(!CmiMyNodeGlobal() && _partitionInfo.type != PARTITION_DEFAULT) {
963       CmiAbort("+partition_sizes used with incompatible option, possibly +master_partition\n");
964     }
965     _partitionInfo.type = PARTITION_PREFIX;
966   }
967
968   if (CmiGetArgFlagDesc(argv,"+partition_topology","topology aware partitions")) {
969     _partitionInfo.isTopoaware = 1;
970     _partitionInfo.scheme = 1;
971   }  
972
973   if (CmiGetArgIntDesc(argv,"+partition_topology_scheme", &_partitionInfo.scheme, "topology aware partitioning scheme")) {
974     _partitionInfo.isTopoaware = 1;
975   }
976
977   if (CmiGetArgFlagDesc(argv,"+use_custom_partition", "custom partitioning scheme")) {
978     _partitionInfo.scheme = 100;
979     _partitionInfo.isTopoaware = 1;
980   }
981
982   if(_partitionInfo.type == PARTITION_DEFAULT) {
983     if((_Cmi_numnodes_global % _partitionInfo.numPartitions) != 0) {
984       CmiAbort("Number of partitions does not evenly divide number of processes. Aborting\n");
985     }
986     _partitionInfo.partitionPrefix[0] = 0;
987     _partitionInfo.partitionSize[0] = _Cmi_numnodes_global / _partitionInfo.numPartitions;
988     for(i = 1; i < _partitionInfo.numPartitions; i++) {
989       _partitionInfo.partitionSize[i] = _partitionInfo.partitionSize[i-1];
990       _partitionInfo.partitionPrefix[i] = _partitionInfo.partitionPrefix[i-1] + _partitionInfo.partitionSize[i-1];
991     } 
992     _partitionInfo.myPartition = _Cmi_mynode_global / _partitionInfo.partitionSize[0];
993   } else if(_partitionInfo.type == PARTITION_MASTER) {
994     if(((_Cmi_numnodes_global-1) % (_partitionInfo.numPartitions-1)) != 0) {
995       CmiAbort("Number of non-master partitions does not evenly divide number of processes minus one. Aborting\n");
996     }
997     _partitionInfo.partitionSize[0] = 1;
998     _partitionInfo.partitionPrefix[0] = 0;
999     _partitionInfo.partitionSize[1] = (_Cmi_numnodes_global-1) / (_partitionInfo.numPartitions-1);
1000     _partitionInfo.partitionPrefix[1] = 1;
1001     for(i = 2; i < _partitionInfo.numPartitions; i++) {
1002       _partitionInfo.partitionSize[i] = _partitionInfo.partitionSize[i-1];
1003       _partitionInfo.partitionPrefix[i] = _partitionInfo.partitionPrefix[i-1] + _partitionInfo.partitionSize[i-1];
1004     } 
1005     _partitionInfo.myPartition = 1 + (_Cmi_mynode_global-1) / _partitionInfo.partitionSize[1];
1006     if(!_Cmi_mynode_global) 
1007       _partitionInfo.myPartition = 0;
1008   } else if(_partitionInfo.type == PARTITION_PREFIX) {
1009     token = strtok_r(_partitionInfo.partsizes, ",", &tptr);
1010     while (token)
1011     {
1012       int i,j;
1013       int hasdash=0, hascolon=0, hasdot=0;
1014       int start, end, stride = 1, block = 1, size;
1015       for (i = 0; i < strlen(token); i++) {
1016         if (token[i] == '-') hasdash=1;
1017         else if (token[i] == ':') hascolon=1;
1018         else if (token[i] == '.') hasdot=1;
1019       }
1020       if (hasdash) {
1021         if (hascolon) {
1022           if (hasdot) {
1023             if (sscanf(token, "%d-%d:%d.%d#%d", &start, &end, &stride, &block, &size) != 5)
1024               printf("Warning: Check the format of \"%s\".\n", token);
1025           }
1026           else {
1027             if (sscanf(token, "%d-%d:%d#%d", &start, &end, &stride, &size) != 4)
1028               printf("Warning: Check the format of \"%s\".\n", token);
1029           }
1030         }
1031         else {
1032           if (sscanf(token, "%d-%d#%d", &start, &end, &size) != 3)
1033             printf("Warning: Check the format of \"%s\".\n", token);
1034         }
1035       }
1036       else {
1037         if (sscanf(token, "%d#%d", &start, &size) != 2) {
1038           printf("Warning: Check the format of \"%s\".\n", token);
1039         }
1040         end = start;
1041       }
1042       if (block > stride) {
1043         printf("Warning: invalid block size in \"%s\" ignored.\n", token);
1044         block = 1;
1045       }
1046       for (i = start; i <= end; i += stride) {
1047         for (j = 0; j < block; j++) {
1048           if (i + j > end) break;
1049           _partitionInfo.partitionSize[i+j] = size;
1050         }
1051       }
1052       token = strtok_r(NULL, ",", &tptr);
1053     }
1054     _partitionInfo.partitionPrefix[0] = 0;
1055     _partitionInfo.myPartition = 0;
1056     for(i = 1; i < _partitionInfo.numPartitions; i++) {
1057       if(_partitionInfo.partitionSize[i-1] <= 0) {
1058         CmiAbort("Partition size has to be greater than zero.\n");
1059       }
1060       _partitionInfo.partitionPrefix[i] = _partitionInfo.partitionPrefix[i-1] + _partitionInfo.partitionSize[i-1];
1061       if((_Cmi_mynode_global >= _partitionInfo.partitionPrefix[i]) && (_Cmi_mynode_global < (_partitionInfo.partitionPrefix[i] + _partitionInfo.partitionSize[i]))) {
1062         _partitionInfo.myPartition = i;
1063       }
1064     } 
1065     if(_partitionInfo.partitionSize[i-1] <= 0) {
1066       CmiAbort("Partition size has to be greater than zero.\n");
1067     }
1068   }
1069   _Cmi_mynode = _Cmi_mynode - _partitionInfo.partitionPrefix[_partitionInfo.myPartition];
1070
1071   if(_partitionInfo.isTopoaware) {
1072     create_topoaware_partitions();
1073   }
1074 }
1075
1076 void CmiCreatePartitions(char **argv) {
1077   _Cmi_numnodes_global = _Cmi_numnodes;
1078   _Cmi_mynode_global = _Cmi_mynode;
1079   _Cmi_numpes_global = _Cmi_numnodes_global * _Cmi_mynodesize;
1080
1081   if(Cmi_nodestart != -1) {
1082     Cmi_nodestartGlobal = Cmi_nodestart;
1083   } else {
1084     Cmi_nodestartGlobal =  _Cmi_mynode_global * _Cmi_mynodesize;
1085   }
1086
1087   //creates partitions, reset _Cmi_mynode to be the new local rank
1088   if(_partitionInfo.numPartitions > _Cmi_numnodes_global) {
1089     CmiAbort("Number of partitions requested is greater than the number of nodes\n");
1090   }
1091   create_partition_map(argv);
1092   
1093   //reset other local variables
1094   _Cmi_numnodes = CmiMyPartitionSize();
1095   //mype and numpes will be set following this
1096 }
1097
1098 int node_lToGTranslate(int node, int partition) {
1099   int rank;
1100   if(_partitionInfo.type == PARTITION_SINGLETON) { 
1101     return node;
1102   } else if(_partitionInfo.type == PARTITION_DEFAULT) { 
1103     rank =  (partition * _partitionInfo.partitionSize[0]) + node;
1104   } else if(_partitionInfo.type == PARTITION_MASTER) {
1105     if(partition == 0) {
1106       CmiAssert(node == 0);
1107       rank =  0;
1108     } else {
1109       rank = 1 + ((partition - 1) * _partitionInfo.partitionSize[1]) + node;
1110     }
1111   } else if(_partitionInfo.type == PARTITION_PREFIX) {
1112     rank = _partitionInfo.partitionPrefix[partition] + node;
1113   } else {
1114     CmiAbort("Partition type did not match any of the supported types\n");
1115   }
1116   if(_partitionInfo.isTopoaware) {
1117     return _partitionInfo.nodeMap[rank];
1118   } else {
1119     return rank;
1120   }
1121 }
1122
1123 int pe_lToGTranslate(int pe, int partition) {
1124   if(_partitionInfo.type == PARTITION_SINGLETON) 
1125     return pe;
1126
1127   if(pe < CmiPartitionSize(partition)*CmiMyNodeSize()) {
1128     return node_lToGTranslate(CmiNodeOf(pe),partition)*CmiMyNodeSize() + CmiRankOf(pe);
1129   }
1130
1131   return CmiNumPesGlobal() + node_lToGTranslate(pe - CmiPartitionSize(partition)*CmiMyNodeSize(), partition);
1132 }
1133
1134 INLINE_KEYWORD int node_gToLTranslate(int node) {
1135   CmiAbort("Conversion from global rank to local rank is not supported. Please contact Charm++ developers with the use case.\n");
1136   return -1;
1137 }
1138
1139 INLINE_KEYWORD int pe_gToLTranslate(int pe) {
1140   CmiAbort("Conversion from global rank to local rank is not supported. Please contact Charm++ developers with the use case.\n");
1141   return -1;
1142 }
1143 //end of functions related to partition
1144
1145 CMI_EXTERNC_VARIABLE int quietMode;
1146 CMI_EXTERNC_VARIABLE int quietModeRequested;
1147
1148 #if defined(_WIN32)
1149 #include <windows.h> /* for SetEnvironmentVariable() and routines for CMK_SHARED_VARS_NT_THREADS */
1150 #define SET_ENV_VAR(key, value) SetEnvironmentVariable(key, value)
1151 #else
1152 #define SET_ENV_VAR(key, value) setenv(key, value, 0)
1153 #endif
1154 // For INT_MAX
1155 #include <limits.h>
1156
1157 #if CMK_LOCKLESS_QUEUE
1158 #define DefaultDataNodeSize 2048
1159 #define DefaultMaxDataNodes 2048
1160 extern int DataNodeSize;
1161 extern int MaxDataNodes;
1162 extern int QueueUpperBound;
1163 extern int DataNodeWrap;
1164 extern int QueueWrap;
1165 extern int messageQueueOverflow;
1166
1167 int power_of_two_check(int n)
1168 {
1169   return (n > 0 && (n & (n - 1)) == 0);
1170 }
1171
1172
1173 void check_and_set_queue_parameters()
1174 {
1175   if(DataNodeSize <= 0 || MaxDataNodes <= 0 || !power_of_two_check(DataNodeSize) || !power_of_two_check(MaxDataNodes))
1176   {
1177     CmiPrintf("MessageQueues: MessageQueueNodeSize: %d,  Check that this value is > 0 and a power of 2.\n", DataNodeSize);
1178     CmiPrintf("MessageQueues: MessageQueueNodes: %d, Check that this value is > 0 and a power of 2.\n", MaxDataNodes);
1179     CmiAbort("Invalid MessageQueue Parameters");
1180   }
1181   QueueUpperBound = DataNodeSize * MaxDataNodes;
1182   DataNodeWrap = DataNodeSize - 1;
1183   QueueWrap = QueueUpperBound - 1;
1184 }
1185 #endif
1186
1187 /* ##### Beginning of Functions Related with Machine Startup ##### */
1188 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
1189     int _ii;
1190     int tmp;
1191     //handle output to files for partition if requested
1192     char *stdoutbase,*stdoutpath;
1193
1194
1195 #if CMK_WITH_STATS
1196     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
1197 #endif
1198
1199   if (CmiGetArgFlagDesc(argv,"++quiet","Omit non-error runtime messages")) {
1200     quietModeRequested = quietMode = 1;
1201   }
1202
1203     CmiInitHwlocTopology();
1204
1205     /* processor per node */
1206     _Cmi_mynodesize = 1;
1207
1208     int ppnSet = 0;
1209     if (!(ppnSet = CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize)))
1210         ppnSet = CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1211
1212     int npes = 1;
1213     int plusPSet = CmiGetArgInt(argv,"+p",&npes);
1214
1215     int auto_provision = CmiGetArgFlagDesc(argv, "+auto-provision", "fully utilize available resources");
1216     auto_provision |= CmiGetArgFlagDesc(argv, "+autoProvision", "fully utilize available resources");
1217     int onewth_per_host = CmiGetArgFlagDesc(argv, "+oneWthPerHost", "assign one worker thread per host");
1218     int onewth_per_socket = CmiGetArgFlagDesc(argv, "+oneWthPerSocket", "assign one worker thread per socket");
1219     int onewth_per_core = CmiGetArgFlagDesc(argv, "+oneWthPerCore", "assign one worker thread per core");
1220     int onewth_per_pu = CmiGetArgFlagDesc(argv, "+oneWthPerPU", "assign one worker thread per PU");
1221     int onewth_active = (onewth_per_host > 0) + (onewth_per_socket > 0) + (onewth_per_core > 0) + (onewth_per_pu > 0);
1222     if (onewth_active > 1 || onewth_active + (plusPSet || ppnSet) + auto_provision > 1)
1223     {
1224       CmiError("Error: Only one of +auto-provision, +oneWthPer(Host|Socket|Core|PU), or +p/++ppn is allowed.\n");
1225       exit(1);
1226     }
1227
1228 #if ! CMK_SMP
1229     if (plusPSet && npes != 1)
1230     {
1231       fprintf(stderr,
1232         "To use multiple processors, you must run this program as:\n"
1233         " > charmrun +p%d %s <args>\n"
1234         "or build the %s-smp version of Charm++.\n",
1235         npes,argv[0],CMK_MACHINE_NAME);
1236       exit(1);
1237     }
1238
1239     if ((_Cmi_mynodesize > 1 && _Cmi_mynode == 0) || onewth_per_socket || onewth_per_core || onewth_per_pu)
1240     {
1241       CmiError("Error: +oneWthPer(Socket|Core|PU) and +ppn can only be used in SMP mode.\n");
1242       exit(1);
1243     }
1244 #else
1245     if (onewth_active || auto_provision)
1246     {
1247       SET_ENV_VAR("CmiProcessPerHost", "1");
1248
1249       int ppn;
1250       if (onewth_per_host)
1251       {
1252         ppn = 1;
1253         SET_ENV_VAR("CmiOneWthPerHost", "1");
1254       }
1255       else if (onewth_per_socket)
1256       {
1257         ppn = CmiHwlocTopologyLocal.num_sockets;
1258         SET_ENV_VAR("CmiOneWthPerSocket", "1");
1259       }
1260       else if (onewth_per_core)
1261       {
1262         ppn = CmiHwlocTopologyLocal.num_cores;
1263         SET_ENV_VAR("CmiOneWthPerCore", "1");
1264       }
1265       else // if (onewth_per_pu || auto_provision)
1266       {
1267         ppn = CmiHwlocTopologyLocal.num_pus;
1268         SET_ENV_VAR("CmiOneWthPerPU", "1");
1269       }
1270 # if !CMK_MULTICORE
1271       // account for comm thread, if necessary to avoid oversubscribing PUs
1272       if (ppn + 1 > CmiHwlocTopologyLocal.num_pus)
1273         --ppn;
1274
1275       if (ppn == 0)
1276         ppn = 1;
1277 # endif
1278
1279       if (ppn <= 0)
1280       {
1281         CmiError("Error: Invalid request for %d PEs\n", ppn);
1282         exit(1);
1283       }
1284
1285       _Cmi_mynodesize = ppn;
1286     }
1287     else if (plusPSet)
1288     {
1289       if (ppnSet && _Cmi_mynodesize != npes)
1290       {
1291         // if you want to use redundant arguments they need to be consistent
1292         CmiError("Error: p != ppn, must not have inconsistent values. (%d != %d)\n"
1293           "Standalone invocation should use only one of [+p, +ppn, ++ppn].\n", npes, _Cmi_mynodesize);
1294         exit(1);
1295       }
1296       else
1297       { // +ppn wasn't set, make it the same as +p
1298         _Cmi_mynodesize = npes;
1299       }
1300     }
1301 # if CMK_NET_VERSION || CMK_IBVERBS || CMK_MULTICORE
1302 #  if CMK_MULTICORE
1303     else if (!ppnSet)
1304 #  else
1305     else if (!ppnSet && Cmi_charmrun_fd == -1)
1306 #  endif
1307     {
1308       if (!quietMode)
1309       {
1310         printf("Charm++> No provisioning arguments specified. Running with a single PE.\n"
1311                "         Use +auto-provision to fully subscribe resources or +p1 to silence this message.\n");
1312       }
1313     }
1314 # endif
1315 #endif
1316
1317     /* Network progress function is used to poll the network when for
1318     messages. This flushes receive buffers on some  implementations*/
1319     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
1320     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
1321
1322     /* _Cmi_mynodesize has to be obtained before LrtsInit
1323      * because it may be used inside LrtsInit
1324      */
1325     /* argv could be changed inside LrtsInit */
1326     /* Inside this function, the number of nodes and my node id are obtained */
1327 #if CMK_WITH_STATS
1328 if (  MSG_STATISTIC)
1329 {
1330     for(_ii=0; _ii<22; _ii++)
1331         msg_histogram[_ii] = 0;
1332 }
1333 #endif
1334 #if CMK_LOCKLESS_QUEUE
1335     /* Lockfree queue initialization */
1336     if (!CmiGetArgIntDesc(argv,"+MessageQueueNodes",&MaxDataNodes, "The size of the message queue static arrays")) {
1337       MaxDataNodes = DefaultMaxDataNodes;
1338     }
1339     if (!CmiGetArgIntDesc(argv,"+MessageQueueNodeSize",&DataNodeSize, "The size of the message queue data nodes")) {
1340       DataNodeSize = DefaultDataNodeSize;
1341     }
1342     check_and_set_queue_parameters();
1343 #endif
1344
1345     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
1346
1347 #if MACHINE_DEBUG_LOG
1348     char ln[200];
1349     sprintf(ln,"debugLog.%d", _Cmi_mynode);
1350     debugLog=fopen(ln,"w");
1351     if (debugLog == NULL)
1352     {
1353         CmiAbort("debug file not open\n");
1354     }
1355 #endif
1356
1357
1358     if (_Cmi_mynode==0) {
1359 #if !CMK_SMP 
1360       if (!quietMode) printf("Charm++> Running in non-SMP mode: %d processes (PEs)\n", _Cmi_numnodes);
1361       MACHSTATE1(4,"running nonsmp %d", _Cmi_mynode)
1362 #else
1363
1364 #if !CMK_MULTICORE
1365       if (!quietMode) {
1366         // NOTE: calling CmiNumPes() here it sometimes returns zero
1367         int commThdsPerProcess = (Cmi_smp_mode_setting != COMM_THREAD_NOT_EXIST);
1368         int totalPes = CmiNumNodes() * _Cmi_mynodesize;
1369         printf("Charm++> Running in SMP mode: %d processes, %d worker threads (PEs) + %d comm threads per process, %d PEs total\n",
1370                CmiNumNodes(), _Cmi_mynodesize, commThdsPerProcess, totalPes);
1371       }
1372       if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
1373         if (!quietMode) printf("Charm++> The comm. thread both sends and receives messages\n");
1374       } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1375         if (!quietMode) printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
1376       } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
1377         if (!quietMode) printf("Charm++> Both  comm. thread and worker thread send and messages\n");
1378       } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
1379         if (!quietMode) printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
1380       } else {
1381         CmiAbort("Charm++> Invalid SMP mode setting\n");
1382       }
1383 #else
1384       if (!quietMode) printf("Charm++> Running in Multicore mode: %d threads (PEs)\n", _Cmi_mynodesize);
1385 #endif
1386       
1387 #endif
1388     }
1389
1390     CmiCreatePartitions(argv);
1391
1392     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1393     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1394     Cmi_argvcopy = CmiCopyArgs(argv);
1395     Cmi_argv = argv;
1396     Cmi_startfn = fn;
1397     Cmi_usrsched = usched;
1398
1399     if ( CmiGetArgStringDesc(argv,"+stdout",&stdoutbase,"base filename to redirect partition stdout to") ) {
1400       stdoutpath = (char *)malloc(strlen(stdoutbase) + 30);
1401       sprintf(stdoutpath, stdoutbase, CmiMyPartition(), CmiMyPartition(), CmiMyPartition());
1402       if ( ! strcmp(stdoutpath, stdoutbase) ) {
1403         sprintf(stdoutpath, "%s.%d", stdoutbase, CmiMyPartition());
1404       }
1405       if ( CmiMyPartition() == 0 && CmiMyNode() == 0 && !quietMode) {
1406         printf("Redirecting stdout to files %s through %d\n",stdoutpath,CmiNumPartitions()-1);
1407       }
1408       if ( ! freopen(stdoutpath, "a", stdout) ) {
1409         fprintf(stderr,"Rank %d failed redirecting stdout to file %s: %s\n", CmiMyNodeGlobal(), stdoutpath,
1410             strerror(errno));
1411         CmiAbort("Error redirecting stdout to file.");
1412       }
1413       _writeToStdout = 0;
1414       free(stdoutpath);
1415     }
1416 #if CMK_SMP
1417     comm_mutex=CmiCreateLock();
1418 #endif
1419
1420 #if CMK_USE_PXSHM
1421     CmiInitPxshm(argv);
1422 #endif
1423 #if CMK_USE_XPMEM
1424     CmiInitXpmem(argv);
1425 #endif
1426 #if CMK_USE_CMA
1427     cma_works = 1;
1428     if (CmiGetArgFlagDesc(argv,"+cma_enable_all","Ignore default thresholds & Enable use of CMA for all intranode SHM transport")) {
1429       cma_min_threshold = 0U;
1430       cma_max_threshold = INT_MAX;
1431     } else {
1432       // CMA Message size lower bound
1433       if(!CmiGetArgIntDesc(argv,"+cma_min_threshold", &cma_min_threshold,"CMA Message size (Bytes) lower bound")) {
1434         // Default cma_min_threshold
1435         cma_min_threshold = CMK_CMA_MIN;
1436       }
1437       // CMA Message size upper bound
1438       if(!CmiGetArgIntDesc(argv,"+cma_max_threshold", &cma_max_threshold,"CMA Message size (Bytes) upper bound")) {
1439         // Default cma_max_threshold
1440         cma_max_threshold = CMK_CMA_MAX;
1441       }
1442       // Disable CMA
1443       if (CmiGetArgFlagDesc(argv,"+cma_disable","Disable use of CMA for SHM transport")) {
1444         cma_works = 0;
1445         cma_reg_msg = false;
1446       }
1447     }
1448
1449     if(cma_works) {
1450       // Initialize CMA if it is not disable to check if the OS supports it
1451       cma_works = CmiInitCma();
1452       /* To use CMA, check that the global variable cma_works flag is set to 1.
1453        * If it is 0, it indicates that CMA calls are supported but operations do
1454        * not have the right permissions for usage. If cma_works is 0, CMA cannot be
1455        * used by the RTS. CMA_HAS_CMA being disabled indicates that CMA calls are not
1456        * even supported by the OS.
1457        */
1458
1459       // Enable/disable disable regular messaging
1460       // TODO: Unset this flag to get CMA working for regular messages
1461       cma_reg_msg = 0;
1462
1463       // Display CMA thresholds for regular messages if it is enabled
1464       if(cma_reg_msg)
1465         CmiDisplayCMAThresholds(cma_min_threshold, cma_max_threshold);
1466     }
1467 #endif
1468
1469     /* CmiTimerInit(); */
1470 #if CMK_BROADCAST_HYPERCUBE
1471     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1472     tmp = CmiNumNodes()-1;
1473     CmiNodesDim = 0;
1474     while (tmp>0) {
1475         CmiNodesDim++;
1476         tmp = tmp >> 1;
1477     }
1478     if (CmiNumNodes()==1) CmiNodesDim=1;
1479 #endif
1480
1481     CsvInitialize(CmiNodeState, NodeState);
1482     CmiNodeStateInit(&CsvAccess(NodeState));
1483
1484 #if CMK_OFFLOAD_BCAST_PROCESS
1485     /* the actual queues should be created on comm thread considering NUMA in SMP */
1486     CsvInitialize(CMIQueue, procBcastQ);
1487 #if CMK_NODE_QUEUE_AVAILABLE
1488     CsvInitialize(CMIQueue, nodeBcastQ);
1489 #endif
1490 #endif
1491
1492 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
1493     CsvInitialize(CMIQueue, notifyCommThdMsgBuffer);
1494 #endif
1495
1496     CmiStartThreads(argv);
1497
1498     ConverseRunPE(initret);
1499 }
1500
1501 CMI_EXTERNC
1502 void ConverseCommonInit(char **argv);
1503 CMI_EXTERNC
1504 void CthInit(char **argv);
1505 static void ConverseRunPE(int everReturn) {
1506     CmiState cs;
1507     char** CmiMyArgv;
1508
1509 #if CMK_CCS_AVAILABLE
1510 /**
1511  * The reason to initialize this variable here:
1512  * cmiArgDebugFlag is possibly accessed in CmiPrintf/CmiError etc.,
1513  * therefore, we have to initialize this variable before any calls
1514  * to those functions (such as CmiPrintf). Otherwise, we may encounter
1515  * a memory segmentation fault (bad memory access). Note, even
1516  * testing CpvInitialized(cmiArgDebugFlag) doesn't help to solve
1517  * this problem because the variable indicating whether cmiArgDebugFlag is
1518  * initialized or not is not initialized, thus possibly causing another
1519  * bad memory access. --Chao Mei
1520  */
1521   CpvInitialize(int, cmiArgDebugFlag);
1522   CpvAccess(cmiArgDebugFlag) = 0;
1523 #endif
1524
1525     LrtsPreCommonInit(everReturn);
1526
1527 #if CMK_OFFLOAD_BCAST_PROCESS
1528     int createQueue = 1;
1529 #if CMK_SMP
1530 #if CMK_SMP_NO_COMMTHD
1531     /* If there's no comm thread, then the queue is created on rank 0 */
1532     if (CmiMyRank()) createQueue = 0;
1533 #else
1534     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
1535 #endif
1536 #endif
1537
1538     if (createQueue) {
1539         CsvAccess(procBcastQ) = CMIQueueCreate();
1540 #if CMK_NODE_QUEUE_AVAILABLE
1541         CsvAccess(nodeBcastQ) = CMIQueueCreate();
1542 #endif
1543     }
1544 #endif
1545
1546     CmiNodeAllBarrier();
1547
1548     cs = CmiGetState();
1549     CpvInitialize(void *,CmiLocalQueue);
1550     CpvAccess(CmiLocalQueue) = cs->localqueue;
1551
1552     if (CmiMyRank())
1553         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1554     else
1555         CmiMyArgv=Cmi_argv;
1556
1557     CthInit(CmiMyArgv);
1558 #if CMK_OMP
1559     CmiNodeAllBarrier();
1560 #endif
1561     /* initialize the network progress counter*/
1562     /* Network progress function is used to poll the network when for
1563        messages. This flushes receive buffers on some  implementations*/
1564     CpvInitialize(unsigned , networkProgressCount);
1565     CpvAccess(networkProgressCount) = 0;
1566
1567     ConverseCommonInit(CmiMyArgv);
1568 #if CMK_OMP
1569     CpvAccess(CmiSuspendedTaskQueue) = (void *)CMIQueueCreate();
1570     CmiNodeAllBarrier();
1571 #endif
1572    
1573     // register idle events
1574
1575 #if CMK_SMP
1576     {
1577       CmiIdleState *sidle=CmiNotifyGetState();
1578       CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)sidle);
1579       CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)sidle);
1580     }
1581 #else
1582     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle, NULL);
1583     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle, NULL);
1584 #endif
1585
1586
1587     LrtsPostCommonInit(everReturn);
1588
1589 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
1590     CmiInitNotifyCommThdScheme();
1591 #endif
1592     /* Converse initialization finishes, immediate messages can be processed.
1593        node barrier previously should take care of the node synchronization */
1594     _immediateReady = 1;
1595
1596     /* communication thread */
1597     if (CmiMyRank() == CmiMyNodeSize()) {
1598       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1599       if(!CharmLibInterOperate) {
1600         while (1) CommunicationServerThread(5);
1601       }
1602     } else { /* worker thread */
1603       if (!everReturn) {
1604         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1605         if (Cmi_usrsched==0) CsdScheduler(-1);
1606         if(!CharmLibInterOperate) {
1607           ConverseExit();
1608         } 
1609       }
1610     }
1611 }
1612 /* ##### End of Functions Related with Machine Startup ##### */
1613
1614 /* ##### Beginning of Functions Related with Machine Running ##### */
1615 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
1616     int doProcessBcast = 1;
1617
1618 #if CMK_USE_PXSHM
1619     CommunicationServerPxshm();
1620 #endif
1621 #if CMK_USE_XPMEM
1622     CommunicationServerXpmem();
1623 #endif
1624
1625     LrtsAdvanceCommunication(whenidle);
1626
1627 #if CMK_OFFLOAD_BCAST_PROCESS
1628 #if CMK_SMP_NO_COMMTHD
1629     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
1630     if (CmiMyRank()) doProcessBcast = 0;
1631 #endif
1632     if (doProcessBcast) processBcastQs();
1633 #endif
1634
1635 #if CMK_IMMEDIATE_MSG
1636 #if !CMK_SMP
1637     CmiHandleImmediate();
1638 #endif
1639 #if CMK_SMP && CMK_SMP_NO_COMMTHD
1640     if (CmiMyRank()==0) CmiHandleImmediate();
1641 #endif
1642 #endif
1643 }
1644
1645 CMI_EXTERNC
1646 void ConverseCommonExit(void);
1647
1648 static void CommunicationServer(int sleepTime) {
1649 #if CMK_SMP 
1650     AdvanceCommunication(1);
1651
1652     if (std::atomic_load_explicit(&commThdExit, std::memory_order_acquire) == CmiMyNodeSize()) {
1653         MACHSTATE(2, "CommunicationServer exiting {");
1654         LrtsDrainResources();
1655         MACHSTATE(2, "} CommunicationServer EXIT");
1656
1657         ConverseCommonExit();
1658   
1659 #if CMK_USE_XPMEM
1660         CmiExitXpmem();
1661 #endif
1662         CmiNodeAllBarrier();
1663         LrtsExit(_exitcode);
1664     }
1665 #endif
1666 }
1667
1668 void CommunicationServerThread(int sleepTime) {
1669     CommunicationServer(sleepTime);
1670 #if CMK_IMMEDIATE_MSG
1671     CmiHandleImmediate();
1672 #endif
1673 }
1674
1675 void ConverseExit(int exitcode) {
1676     int i;
1677     if (quietModeRequested) quietMode = 1;
1678 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
1679     LrtsDrainResources();
1680 #else
1681         if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
1682            || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
1683                 LrtsDrainResources();
1684 #endif
1685
1686     ConverseCommonExit();
1687
1688 #if CMK_WITH_STATS
1689 if (MSG_STATISTIC)
1690 {
1691     for(i=0; i<22; i++)
1692     {
1693         CmiPrintf("[MSG PE:%d]", CmiMyPe());
1694         if(msg_histogram[i] >0)
1695             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
1696     }
1697     CmiPrintf("\n");
1698 }
1699 #endif
1700
1701 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1702     if (CmiMyPe() == 0) 
1703       CmiPrintf("[Partition %d][Node %d] End of program\n",CmiMyPartition(),CmiMyNode());
1704 #endif
1705
1706 #if !CMK_SMP || CMK_BLUEGENEQ || CMK_PAMI_LINUX_PPC8
1707 #if CMK_USE_PXSHM
1708     CmiExitPxshm();
1709 #endif
1710 #if CMK_USE_XPMEM
1711     CmiExitXpmem();
1712 #endif
1713     LrtsExit(exitcode);
1714 #else
1715     /* In SMP, the communication thread will exit */
1716     if (CmiMyRank() == 0)
1717         _exitcode = exitcode;
1718     std::atomic_fetch_add_explicit(&commThdExit, 1, std::memory_order_release); /* atomic increment */
1719     CmiNodeAllBarrier();
1720 #if CMK_SMP_NO_COMMTHD
1721 #if CMK_USE_XPMEM
1722     if (CmiMyRank() == 0) CmiExitXpmem();
1723     CmiNodeAllBarrier();
1724 #endif
1725     if (CmiMyRank() == 0) LrtsExit();
1726 #endif
1727     CmiYield();
1728     if (!CharmLibInterOperate || userDrivenMode) {
1729       while (1) {
1730         CmiYield();
1731       }
1732     }
1733 #endif
1734 }
1735 /* ##### End of Functions Related with Machine Running ##### */
1736
1737 void CmiAbortHelper(const char *source, const char *message, const char *suggestion,
1738                     int tellDebugger, int framesToSkip) {
1739   if (tellDebugger)
1740     CpdAborting(message);
1741
1742   if (CmiNumPartitions() == 1) {
1743     CmiError("------------- Processor %d Exiting: %s ------------\n"
1744              "Reason: %s\n", CmiMyPe(), source, message);
1745   } else {
1746     CmiError("------- Partition %d Processor %d Exiting: %s ------\n"
1747              "Reason: %s\n", CmiMyPartition(), CmiMyPe(), source, message);
1748   }
1749
1750   if (suggestion && suggestion[0])
1751     CmiError("Suggestion: %s\n", suggestion);
1752
1753   CmiPrintStackTrace(framesToSkip);
1754
1755 #if CMK_USE_PXSHM
1756   CmiExitPxshm();
1757 #endif
1758 #if CMK_USE_XPMEM
1759   CmiExitXpmem();
1760 #endif
1761   LrtsAbort(message);
1762 }
1763
1764 void CmiAbort(const char *message) {
1765   CmiAbortHelper("Called CmiAbort", message, NULL, 1, 0);
1766   CMI_NORETURN_FUNCTION_END
1767 }
1768
1769 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
1770 void *CmiGetNonLocal(void) {
1771     CmiState cs = CmiGetState();
1772     void *msg = NULL;
1773 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
1774     /**
1775       * In SMP mode with comm thread, it's possible a normal
1776       * msg is sent from an immediate msg which is executed
1777       * on comm thread. In this case, the msg is sent to
1778       * the network queue of the work thread. Therefore,
1779       * even there's only one worker thread, the polling of
1780       * network queue is still required.
1781       */
1782 #if CMK_CCS_AVAILABLE
1783     if (CmiNumPes() == 1 && CmiNumPartitions() == 1 && ccsRunning != 1) return NULL;
1784 #else
1785     if (CmiNumPes() == 1 && CmiNumPartitions() == 1) return NULL;
1786 #endif
1787 #endif
1788
1789     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
1790
1791 #if CMK_MACH_SPECIALIZED_QUEUE
1792     msg = LrtsSpecializedQueuePop();
1793 #else
1794     CmiIdleLock_checkMessage(&cs->idle);
1795     /* ?????although it seems that lock is not needed, I found it crashes very often
1796        on mpi-smp without lock */
1797     msg = CMIQueuePop(cs->recv);
1798 #endif
1799 #if (!CMK_SMP || CMK_SMP_NO_COMMTHD) && !CMK_MULTICORE
1800     if (!msg) {
1801        AdvanceCommunication(0);
1802 #if CMK_MACH_SPECIALIZED_QUEUE
1803        msg = LrtsSpecializedQueuePop();
1804 #else
1805        msg = CMIQueuePop(cs->recv);
1806 #endif
1807     }
1808 #else
1809 //    LrtsPostNonLocal();
1810 #endif
1811 #if CMK_CCS_AVAILABLE
1812     if(msg != NULL && CmiNumPes() == 1 && CmiNumPartitions() == 1 )
1813     {
1814       ccsRunning = 0;
1815     }
1816 #endif
1817
1818     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
1819
1820     return msg;
1821 }
1822
1823 #if CMK_NODE_QUEUE_AVAILABLE
1824 void *CmiGetNonLocalNodeQ(void) {
1825     char *result = 0;
1826
1827 #if CMK_MACH_SPECIALIZED_QUEUE && CMK_MACH_SPECIALIZED_MUTEX
1828     if (!LrtsSpecializedNodeQueueEmpty()) {
1829       if (LrtsSpecializedMutexTryAcquire() == 0) {
1830         result = LrtsSpecializedNodeQueuePop();
1831         LrtsSpecializedMutexRelease();
1832       }
1833     }
1834 #elif CMK_MACH_SPECIALIZED_QUEUE
1835     if(!LrtsSpecializedNodeQueueEmpty()) {
1836       if(CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1837         result = LrtsSpecializedNodeQueuePop();
1838         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1839       }
1840
1841     }
1842 #else
1843     CmiState cs = CmiGetState();
1844     CmiIdleLock_checkMessage(&cs->idle);
1845 #if CMK_LOCKLESS_QUEUE
1846     if (!MPMCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1847 #else
1848     if (!CMIQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1849 #endif
1850         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1851 #if CMK_LOCKLESS_QUEUE
1852         result = (char *) MPMCQueuePop(CsvAccess(NodeState).NodeRecv);
1853 #else
1854         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1855         result = (char *) CMIQueuePop(CsvAccess(NodeState).NodeRecv);
1856         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1857 #endif
1858         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1859     }
1860 #endif
1861     return result;
1862
1863 }
1864 #endif
1865 /* ##### End of Functions Providing Incoming Network Messages ##### */
1866
1867 static CmiIdleState *CmiNotifyGetState(void) {
1868     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1869     s->sleepMs=0;
1870     s->nIdles=0;
1871     s->cs=CmiGetState();
1872     return s;
1873 }
1874
1875 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1876     if(s!= NULL){
1877         s->sleepMs=0;
1878         s->nIdles=0;
1879     }
1880     LrtsBeginIdle();
1881 }
1882
1883 /*Number of times to spin before sleeping*/
1884 #define SPINS_BEFORE_SLEEP 20
1885 static void CmiNotifyStillIdle(CmiIdleState *s) {
1886     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1887 #if (!CMK_SMP || CMK_SMP_NO_COMMTHD) && !CMK_MULTICORE
1888     AdvanceCommunication(1);
1889 #else
1890     LrtsPostNonLocal();
1891
1892 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1893     if (_Cmi_sleepOnIdle)
1894 #endif
1895     {
1896     s->nIdles++;
1897     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1898         s->sleepMs+=2;
1899         if (s->sleepMs>10) s->sleepMs=10;
1900     }
1901
1902     if (s->sleepMs>0) {
1903         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1904         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1905         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1906     }
1907     }
1908 #endif
1909     LrtsStillIdle();
1910     CsdResetPeriodic();
1911     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1912 }
1913
1914 /* usually called in non-smp mode */
1915 void CmiNotifyIdle(void) {
1916     AdvanceCommunication(1);
1917     CmiYield();
1918     LrtsNotifyIdle();
1919 }
1920
1921 /* Utiltiy functions */
1922 static char *CopyMsg(char *msg, int len) {
1923     char *copy = (char *)CmiAlloc(len);
1924 #if CMK_ERROR_CHECKING
1925     if (!copy) {
1926         CmiAbort("Error: out of memory in machine layer\n");
1927     }
1928 #endif
1929     memcpy(copy, msg, len);
1930     return copy;
1931 }
1932
1933 /************Barrier Related Functions****************/
1934 /* must be called on all ranks including comm thread in SMP */
1935 int CmiBarrier(void) {
1936 #if CMK_SMP
1937     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1938     CmiNodeAllBarrier();
1939 #endif
1940 #if ( CMK_SMP && !CMK_SMP_NO_COMMTHD)
1941     if (CmiMyRank() == CmiMyNodeSize())
1942     {
1943 #else
1944     if (CmiMyRank() == 0)
1945     {
1946 #endif
1947         LrtsBarrier();
1948     }
1949 #if CMK_SMP
1950     CmiNodeAllBarrier();
1951 #endif
1952     return 0;
1953 }
1954
1955 /**********Lock Related Functions**********/
1956 #if CMK_USE_COMMON_LOCK
1957 #if CMK_SHARED_VARS_UNAVAILABLE /*Non-smp version of locks*/
1958
1959 LrtsNodeLock LrtsCreateLock(void){ return 0; }
1960 void LrtsLock(LrtsNodeLock lock){ (lock)++; }
1961 void LrtsUnlock(LrtsNodeLock lock){ (lock)--; }
1962 int LrtsTryLock(LrtsNodeLock lock){ return ((lock)?1:((lock)=1,0)); }
1963 void LrtsDestroyLock(LrtsNodeLock lock){ /* empty */ }
1964
1965 #else /*smp version*/
1966 #if CMK_SHARED_VARS_NT_THREADS /*Used only by win versions*/
1967
1968 LrtsNodeLock LrtsCreateLock(void){
1969     HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
1970     return hMutex;
1971 }
1972 void LrtsLock(LrtsNodeLock lock){
1973     WaitForSingleObject(lock, INFINITE);
1974 }
1975 void LrtsUnlock(LrtsNodeLock lock){
1976     ReleaseMutex(lock);
1977 }
1978 int LrtsTryLock(LrtsNodeLock lock){
1979     return !!WaitForSingleObject(lock, 0);
1980 }
1981 void LrtsDestroyLock(LrtsNodeLock lock){
1982     CloseHandle(lock);
1983 }
1984
1985 #else /*other smp versions uses pthread mutex by default*/
1986 LrtsNodeLock LrtsCreateLock(void){
1987     void *l = malloc(sizeof(pthread_mutex_t));
1988     pthread_mutex_init((pthread_mutex_t *)l,(pthread_mutexattr_t *)0);
1989     return (LrtsNodeLock)l;
1990 }
1991 void LrtsLock(LrtsNodeLock lock){
1992     pthread_mutex_lock((pthread_mutex_t*)lock);
1993 }
1994 void LrtsUnlock(LrtsNodeLock lock){
1995     pthread_mutex_unlock((pthread_mutex_t*)lock);
1996 }
1997 int LrtsTryLock(LrtsNodeLock lock){
1998     return pthread_mutex_trylock((pthread_mutex_t*)lock);
1999 }
2000 void LrtsDestroyLock(LrtsNodeLock lock){
2001     pthread_mutex_destroy((pthread_mutex_t*)lock);
2002     free(lock);
2003 }
2004 #endif
2005
2006 #endif //CMK_SHARED_VARS_UNAVAILABLE
2007 #endif //CMK_USE_COMMON_LOCK