restructure machine-common, split broadcast related
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua
4  */
5 #if CMK_C_INLINE
6 #define INLINE_KEYWORD inline
7 #else
8 #define INLINE_KEYWORD
9 #endif
10
11 /******* broadcast related  */
12 #ifndef CMK_BROADCAST_SPANNING_TREE
13 #define CMK_BROADCAST_SPANNING_TREE    1
14 #endif
15
16 #ifndef CMK_BROADCAST_HYPERCUBE
17 #define CMK_BROADCAST_HYPERCUBE        0
18 #endif
19
20 #define BROADCAST_SPANNING_FACTOR      4
21 /* The number of children used when a msg is broadcast inside a node */
22 #define BROADCAST_SPANNING_INTRA_FACTOR  8
23
24 /* Root of broadcast:
25  * non-bcast msg: root = 0;
26  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
27  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
28  */
29 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
30 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
31
32 /**
33  * For some machine layers such as on Active Message framework,
34  * the receiver callback is usally executed on an internal
35  * thread (i.e. not the flow managed by ours). Therefore, for
36  * forwarding broadcast messages, we could have a choice whether
37  * to offload such function to the flow we manage such as the
38  * communication thread. -
39  */
40
41 #ifndef CMK_OFFLOAD_BCAST_PROCESS
42 #define CMK_OFFLOAD_BCAST_PROCESS 0
43 #endif
44
45 #if CMK_OFFLOAD_BCAST_PROCESS
46 CsvDeclare(PCQueue, procBcastQ);
47 #if CMK_NODE_QUEUE_AVAILABLE
48 CsvDeclare(PCQueue, nodeBcastQ);
49 #endif
50 #endif
51
52 #if CMK_BROADCAST_HYPERCUBE
53 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
54 static int CmiNodesDim;
55 #endif
56 /* ###End of Broadcast related definitions ### */
57
58
59 static void handleOneBcastMsg(int size, char *msg);
60 static void processBcastQs();
61
62 /* Utility functions for forwarding broadcast messages,
63  * should not be used in machine-specific implementations
64  * except in some special occasions.
65  */
66 static void processProcBcastMsg(int size, char *msg);
67 static void processNodeBcastMsg(int size, char *msg);
68 static void SendSpanningChildrenProc(int size, char *msg);
69 static void SendHyperCubeProc(int size, char *msg);
70 #if CMK_NODE_QUEUE_AVAILABLE
71 static void SendSpanningChildrenNode(int size, char *msg);
72 static void SendHyperCubeNode(int size, char *msg);
73 #endif
74
75 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
76 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
77
78 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
79 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
80 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
81 #endif
82 #endif
83
84
85 void CmiSyncBroadcastFn(int size, char *msg);
86 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
87 void CmiFreeBroadcastFn(int size, char *msg);
88
89 void CmiSyncBroadcastAllFn(int size, char *msg);
90 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
91 void CmiFreeBroadcastAllFn(int size, char *msg);
92
93 #if CMK_NODE_QUEUE_AVAILABLE
94 void CmiSyncNodeBroadcastFn(int size, char *msg);
95 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg);
96 void CmiFreeNodeBroadcastFn(int size, char *msg);
97
98 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
99 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
100 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
101 #endif
102
103 /************** Done with Broadcast related */
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106
107 #ifndef CMK_HAS_SIZE_IN_MSGHDR
108 #define CMK_HAS_SIZE_IN_MSGHDR 1
109 #endif
110 #if CMK_HAS_SIZE_IN_MSGHDR
111 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
112 #else
113 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
114 #endif
115
116 #if CMK_NODE_QUEUE_AVAILABLE
117 /* This value should be larger than the number of cores used
118  * per charm smp node. So it's currently set to such a large
119  * value.
120  */
121 #define DGRAM_NODEMESSAGE   (0x1FFB)
122 #endif
123
124 /* Node state structure */
125 int               _Cmi_mynode;    /* Which address space am I */
126 int               _Cmi_mynodesize;/* Number of processors in my address space */
127 int               _Cmi_numnodes;  /* Total number of address spaces */
128 int               _Cmi_numpes;    /* Total number of processors */
129
130 CpvDeclare(void*, CmiLocalQueue);
131
132 /* different modes for sending a message */
133 #define P2P_SYNC 0x1
134 #define P2P_ASYNC 0x2
135 #define BCAST_SYNC 0x3
136 #define BCAST_ASYNC 0x4
137
138 #if CMK_SMP
139 static volatile int commThdExit = 0;
140 static CmiNodeLock  commThdExitLock = 0;
141
142 /**
143  *  The macro defines whether to have a comm thd to offload some
144  *  work such as forwarding bcast messages etc. This macro
145  *  should be defined before including "machine-smp.c". Note
146  *  that whether a machine layer in SMP mode could run w/o comm
147  *  thread depends on the support of the underlying
148  *  communication library.
149  *
150  */
151 #ifndef CMK_SMP_NO_COMMTHD
152 #define CMK_SMP_NO_COMMTHD 0
153 #endif
154
155 #if CMK_SMP_NO_COMMTHD
156 int Cmi_commthread = 0;
157 #else
158 int Cmi_commthread = 1;
159 #endif
160
161 #endif
162
163 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
164 static int Cmi_nodestart;
165
166 /*
167  * Network progress utility variables. Period controls the rate at
168  * which the network poll is called
169  */
170 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
171 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
172 #endif
173
174 CpvDeclare(unsigned , networkProgressCount);
175 int networkProgressPeriod;
176
177
178 /* ===== Beginning of Common Function Declarations ===== */
179 void CmiAbort(const char *message);
180 static void PerrorExit(const char *msg);
181
182 /* This function handles the msg received as which queue to push into */
183 static void handleOneRecvedMsg(int size, char *msg);
184
185 /* Utility functions for forwarding broadcast messages,
186  * should not be used in machine-specific implementations
187  * except in some special occasions.
188  */
189 static void SendToPeers(int size, char *msg);
190
191
192 void CmiPushPE(int rank, void *msg);
193
194 #if CMK_NODE_QUEUE_AVAILABLE
195 void CmiPushNode(void *msg);
196 #endif
197
198 /* Functions regarding send ops declared in converse.h */
199
200 /* In default, using the common codes for msg sending */
201 #ifndef USE_COMMON_SYNC_P2P
202 #define USE_COMMON_SYNC_P2P 1
203 #endif
204 #ifndef USE_COMMON_ASYNC_P2P
205 #define USE_COMMON_ASYNC_P2P 1
206 #endif
207 #ifndef USE_COMMON_SYNC_BCAST
208 #define USE_COMMON_SYNC_BCAST 1
209 #endif
210 #ifndef USE_COMMON_ASYNC_BCAST
211 #define USE_COMMON_ASYNC_BCAST 1
212 #endif
213
214 static void CmiSendSelf(char *msg);
215
216 void CmiSyncSendFn(int destPE, int size, char *msg);
217 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
218 void CmiFreeSendFn(int destPE, int size, char *msg);
219
220 #if CMK_NODE_QUEUE_AVAILABLE
221 static void CmiSendNodeSelf(char *msg);
222
223 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
224 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
225 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
226
227 #endif
228
229 /* Functions and variables regarding machine startup */
230 static char     **Cmi_argv;
231 static char     **Cmi_argvcopy;
232 static CmiStartFn Cmi_startfn;   /* The start function */
233 static int        Cmi_usrsched;  /* Continue after start function finishes? */
234 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
235 static void ConverseRunPE(int everReturn);
236
237 /* Functions regarding machine running on every proc */
238 static void AdvanceCommunication();
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241 void ConverseExit(void);
242
243 /* Functions providing incoming network messages */
244 void *CmiGetNonLocal(void);
245 #if CMK_NODE_QUEUE_AVAILABLE
246 void *CmiGetNonLocalNodeQ(void);
247 #endif
248 /* Utiltiy functions */
249 static char *CopyMsg(char *msg, int len);
250
251 /* ===== End of Common Function Declarations ===== */
252
253 #include "machine-smp.c"
254
255 /* ===== Beginning of Idle-state Related Declarations =====  */
256 typedef struct {
257     int sleepMs; /*Milliseconds to sleep while idle*/
258     int nIdles; /*Number of times we've been idle in a row*/
259     CmiState cs; /*Machine state*/
260 } CmiIdleState;
261
262 static CmiIdleState *CmiNotifyGetState(void);
263
264 /**
265  *  Generally,
266  *
267  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
268  *  When the proc is idle, AdvanceCommunication needs to be
269  *  called.
270  *
271  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
272  *
273  *  Different layers have choices of registering different callbacks for
274  *  idle state.
275  */
276 static void CmiNotifyBeginIdle(CmiIdleState *s);
277 static void CmiNotifyStillIdle(CmiIdleState *s);
278 void CmiNotifyIdle(void);
279 /* ===== End of Idle-state Related Declarations =====  */
280
281 /* ===== Beginning of Processor/Node State-related Stuff =====*/
282 #if !CMK_SMP
283 /************ non SMP **************/
284 static struct CmiStateStruct Cmi_state;
285 int _Cmi_mype;
286 int _Cmi_myrank;
287
288 void CmiMemLock() {}
289 void CmiMemUnlock() {}
290
291 #define CmiGetState() (&Cmi_state)
292 #define CmiGetStateN(n) (&Cmi_state)
293
294 void CmiYield(void) {
295     sleep(0);
296 }
297
298 static void CmiStartThreads(char **argv) {
299     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
300     _Cmi_mype = Cmi_nodestart;
301     _Cmi_myrank = 0;
302 }
303 #else
304 /************** SMP *******************/
305 INLINE_KEYWORD int CmiMyPe(void) {
306     return CmiGetState()->pe;
307 }
308 INLINE_KEYWORD int CmiMyRank(void) {
309     return CmiGetState()->rank;
310 }
311 INLINE_KEYWORD int CmiNodeFirst(int node) {
312     return node*_Cmi_mynodesize;
313 }
314 INLINE_KEYWORD int CmiNodeSize(int node) {
315     return _Cmi_mynodesize;
316 }
317 INLINE_KEYWORD int CmiNodeOf(int pe) {
318     return (pe/_Cmi_mynodesize);
319 }
320 INLINE_KEYWORD int CmiRankOf(int pe) {
321     return pe%_Cmi_mynodesize;
322 }
323 #endif
324 CsvDeclare(CmiNodeState, NodeState);
325 /* ===== End of Processor/Node State-related Stuff =====*/
326
327 #include "machine-broadcast.c"
328 #include "immediate.c"
329
330 /* ===== Beginning of Common Function Definitions ===== */
331 static void PerrorExit(const char *msg) {
332     perror(msg);
333     exit(1);
334 }
335
336 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
337 /*Add a message to this processor's receive queue, pe is a rank */
338 void CmiPushPE(int rank,void *msg) {
339     CmiState cs = CmiGetStateN(rank);
340     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
341 #if CMK_IMMEDIATE_MSG
342     if (CmiIsImmediate(msg)) {
343         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
344         CMI_DEST_RANK(msg) = rank;
345         CmiPushImmediateMsg(msg);
346         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
347         return;
348     }
349 #endif
350
351     PCQueuePush(cs->recv,msg);
352     CmiIdleLock_addMessage(&cs->idle);
353     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
354 }
355
356 #if CMK_NODE_QUEUE_AVAILABLE
357 /*Add a message to this processor's receive queue */
358 void CmiPushNode(void *msg) {
359     MACHSTATE(3,"Pushing message into NodeRecv queue");
360 #if CMK_IMMEDIATE_MSG
361     if (CmiIsImmediate(msg)) {
362         CMI_DEST_RANK(msg) = 0;
363         CmiPushImmediateMsg(msg);
364         return;
365     }
366 #endif
367     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
368     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
369     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
370     {
371         CmiState cs=CmiGetStateN(0);
372         CmiIdleLock_addMessage(&cs->idle);
373     }
374 }
375 #endif
376
377 /* This function handles the msg received as which queue to push into */
378 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
379     int isBcastMsg = 0;
380 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
381     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
382 #endif
383
384     if (isBcastMsg) {
385         handleOneBcastMsg(size, msg);
386         return;
387     }
388
389 #if CMK_NODE_QUEUE_AVAILABLE
390     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
391         CmiPushNode(msg);
392     else
393 #endif
394         CmiPushPE(CMI_DEST_RANK(msg), msg);
395
396 }
397
398
399 static void SendToPeers(int size, char *msg) {
400     /* FIXME: now it's just a flat p2p send!! When node size is large,
401     * it should also be sent in a tree
402     */
403     int exceptRank = CMI_DEST_RANK(msg);
404     int i;
405     for (i=0; i<exceptRank; i++) {
406         CmiPushPE(i, CopyMsg(msg, size));
407     }
408     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
409         CmiPushPE(i, CopyMsg(msg, size));
410     }
411 }
412
413
414 /* Functions regarding sending operations */
415 static void CmiSendSelf(char *msg) {
416 #if CMK_IMMEDIATE_MSG
417     if (CmiIsImmediate(msg)) {
418         /* CmiBecomeNonImmediate(msg); */
419         CmiPushImmediateMsg(msg);
420         CmiHandleImmediate();
421         return;
422     }
423 #endif
424     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
425 }
426
427 /* Functions regarding P2P send op */
428 #if USE_COMMON_SYNC_P2P
429 void CmiSyncSendFn(int destPE, int size, char *msg) {
430     char *dupmsg = CopyMsg(msg, size);
431     CmiFreeSendFn(destPE, size, dupmsg);
432 }
433
434 void CmiFreeSendFn(int destPE, int size, char *msg) {
435     CMI_SET_BROADCAST_ROOT(msg, 0);
436     CQdCreate(CpvAccess(cQdState), 1);
437     if (CmiMyPe()==destPE) {
438         CmiSendSelf(msg);
439     } else {
440         int destNode = CmiNodeOf(destPE);
441 #if CMK_SMP
442         if (CmiMyNode()==destNode) {
443             CmiPushPE(CmiRankOf(destPE), msg);
444             return;
445         }
446 #endif
447         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
448         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
449     }
450 }
451 #endif
452
453 #if USE_COMMON_ASYNC_P2P
454 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
455     int destNode = CmiNodeOf(destPE);
456     if (destNode == CmiMyNode()) {
457         CmiSyncSendFn(destPE,size,msg);
458         return 0;
459     } else {
460         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
461     }
462 }
463 #endif
464
465 #if CMK_NODE_QUEUE_AVAILABLE
466 static void CmiSendNodeSelf(char *msg) {
467 #if CMK_IMMEDIATE_MSG
468     if (CmiIsImmediate(msg)) {
469         CmiPushImmediateMsg(msg);
470         if (!_immRunning) CmiHandleImmediate();
471         return;
472     }
473 #endif
474     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
475     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
476     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
477 }
478
479 #if USE_COMMON_ASYNC_P2P
480 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
481     char *dupmsg = CopyMsg(msg, size);
482     CmiFreeNodeSendFn(destNode, size, dupmsg);
483 }
484
485 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
486     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
487     CQdCreate(CpvAccess(cQdState), 1);
488     CMI_SET_BROADCAST_ROOT(msg, 0);
489     if (destNode == CmiMyNode()) {
490         CmiSendNodeSelf(msg);
491     } else {
492         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
493     }
494 }
495 #endif
496
497 #if USE_COMMON_ASYNC_P2P
498 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
499     if (destNode == CmiMyNode()) {
500         CmiSyncNodeSendFn(destNode, size, msg);
501         return 0;
502     } else {
503         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
504     }
505 }
506 #endif
507
508 /* ##### Beginning of Functions Related with Machine Startup ##### */
509 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
510     int tmp;
511     /* processor per node */
512     _Cmi_mynodesize = 1;
513     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
514         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
515 #if ! CMK_SMP
516     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
517         CmiAbort("+ppn cannot be used in non SMP version!\n");
518 #endif
519
520     /* Network progress function is used to poll the network when for
521     messages. This flushes receive buffers on some  implementations*/
522     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
523     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
524
525     /* _Cmi_mynodesize has to be obtained before LrtsInit
526      * because it may be used inside LrtsInit
527      */
528     /* argv could be changed inside LrtsInit */
529     /* Inside this function, the number of nodes and my node id are obtained */
530     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
531
532     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
533     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
534     Cmi_argvcopy = CmiCopyArgs(argv);
535     Cmi_argv = argv;
536     Cmi_startfn = fn;
537     Cmi_usrsched = usched;
538
539     /* CmiTimerInit(); */
540 #if CMK_BROADCAST_HYPERCUBE
541     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
542     tmp = CmiNumNodes()-1;
543     CmiNodesDim = 0;
544     while (tmp>0) {
545         CmiNodesDim++;
546         tmp = tmp >> 1;
547     }
548     if (CmiNumNodes()==1) CmiNodesDim=1;
549 #endif
550
551     CsvInitialize(CmiNodeState, NodeState);
552     CmiNodeStateInit(&CsvAccess(NodeState));
553 #if CMK_SMP
554     commThdExitLock = CmiCreateLock();
555 #endif
556
557 #if CMK_OFFLOAD_BCAST_PROCESS
558     /* the actual queues should be created on comm thread considering NUMA in SMP */
559     CsvInitialize(PCQueue, procBcastQ);
560 #if CMK_NODE_QUEUE_AVAILABLE
561     CsvInitialize(PCQueue, nodeBcastQ);
562 #endif
563 #endif
564
565     CmiStartThreads(argv);
566     ConverseRunPE(initret);
567 }
568
569 static void ConverseRunPE(int everReturn) {
570     CmiState cs;
571     char** CmiMyArgv;
572
573     LrtsPreCommonInit(everReturn);
574
575 #if CMK_OFFLOAD_BCAST_PROCESS
576     int createQueue = 1;
577 #if CMK_SMP
578 #if CMK_SMP_NO_COMMTHD
579     /* If there's no comm thread, then the queue is created on rank 0 */
580     if (CmiMyRank()) createQueue = 0;
581 #else
582     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
583 #endif
584 #endif
585
586     if (createQueue) {
587         CsvAccess(procBcastQ) = PCQueueCreate();
588 #if CMK_NODE_QUEUE_AVAILABLE
589         CsvAccess(nodeBcastQ) = PCQueueCreate();
590 #endif
591     }
592 #endif
593
594     CmiNodeAllBarrier();
595
596     cs = CmiGetState();
597     CpvInitialize(void *,CmiLocalQueue);
598     CpvAccess(CmiLocalQueue) = cs->localqueue;
599
600     if (CmiMyRank())
601         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
602     else
603         CmiMyArgv=Cmi_argv;
604
605     CthInit(CmiMyArgv);
606
607     /* initialize the network progress counter*/
608     /* Network progress function is used to poll the network when for
609        messages. This flushes receive buffers on some  implementations*/
610     CpvInitialize(unsigned , networkProgressCount);
611     CpvAccess(networkProgressCount) = 0;
612
613     ConverseCommonInit(CmiMyArgv);
614
615     LrtsPostCommonInit(everReturn);
616
617     /* Converse initialization finishes, immediate messages can be processed.
618        node barrier previously should take care of the node synchronization */
619     _immediateReady = 1;
620
621     /* communication thread */
622     if (CmiMyRank() == CmiMyNodeSize()) {
623         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
624         while (1) CommunicationServerThread(5);
625     } else { /* worker thread */
626         if (!everReturn) {
627             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
628             if (Cmi_usrsched==0) CsdScheduler(-1);
629             ConverseExit();
630         }
631     }
632 }
633 /* ##### End of Functions Related with Machine Startup ##### */
634
635 /* ##### Beginning of Functions Related with Machine Running ##### */
636 static INLINE_KEYWORD void AdvanceCommunication() {
637     int doProcessBcast = 1;
638
639     LrtsAdvanceCommunication();
640
641 #if CMK_OFFLOAD_BCAST_PROCESS
642 #if CMK_SMP_NO_COMMTHD
643     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
644     if (CmiMyRank()) doProcessBcast = 0;
645 #endif
646     if (doProcessBcast) processBcastQs();
647 #endif
648
649 #if CMK_IMMEDIATE_MSG
650 #if !CMK_SMP
651     CmiHandleImmediate();
652 #endif
653 #if CMK_SMP && CMK_SMP_NO_COMMTHD
654     if (CmiMyRank()==0) CmiHandleImmediate();
655 #endif
656 #endif
657
658 }
659
660 static void CommunicationServer(int sleepTime) {
661 #if CMK_SMP
662     AdvanceCommunication();
663
664     if (commThdExit == CmiMyNodeSize()) {
665         MACHSTATE(2, "CommunicationServer exiting {");
666         LrtsDrainResources();
667         MACHSTATE(2, "} CommunicationServer EXIT");
668
669         ConverseCommonExit();
670
671         LrtsExit();
672     }
673 #endif
674 }
675
676 static void CommunicationServerThread(int sleepTime) {
677     CommunicationServer(sleepTime);
678 #if CMK_IMMEDIATE_MSG
679     CmiHandleImmediate();
680 #endif
681 }
682
683 void ConverseExit(void) {
684 #if !CMK_SMP
685     LrtsDrainResources();
686 #endif
687
688     ConverseCommonExit();
689
690 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
691     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
692 #endif
693
694 #if !CMK_SMP
695     LrtsExit();
696 #else
697     /* In SMP, the communication thread will exit */
698     /* atomic increment */
699     CmiLock(commThdExitLock);
700     commThdExit++;
701     CmiUnlock(commThdExitLock);
702     while (1) CmiYield();
703 #endif
704 }
705 /* ##### End of Functions Related with Machine Running ##### */
706
707
708 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
709 void *CmiGetNonLocal(void) {
710     CmiState cs = CmiGetState();
711     void *msg = NULL;
712
713 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
714     /**
715       * In SMP mode with comm thread, it's possible a normal
716       * msg is sent from an immediate msg which is executed
717       * on comm thread. In this case, the msg is sent to
718       * the network queue of the work thread. Therefore,
719       * even there's only one worker thread, the polling of
720       * network queue is still required.
721       */
722     if (CmiNumPes() == 1) return NULL;
723 #endif
724
725     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
726     CmiIdleLock_checkMessage(&cs->idle);
727     /* ?????although it seems that lock is not needed, I found it crashes very often
728        on mpi-smp without lock */
729 #if !CMK_SMP
730     AdvanceCommunication();
731 #endif
732
733     msg = PCQueuePop(cs->recv);
734
735 #if !CMK_SMP
736     LrtsPostNonLocal();
737 #endif
738
739     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
740
741     return msg;
742 }
743 #if CMK_NODE_QUEUE_AVAILABLE
744 void *CmiGetNonLocalNodeQ(void) {
745     CmiState cs = CmiGetState();
746     char *result = 0;
747     CmiIdleLock_checkMessage(&cs->idle);
748     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
749         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
750         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
751         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
752         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
753         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
754     }
755
756     return result;
757 }
758 #endif
759 /* ##### End of Functions Providing Incoming Network Messages ##### */
760
761 static CmiIdleState *CmiNotifyGetState(void) {
762     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
763     s->sleepMs=0;
764     s->nIdles=0;
765     s->cs=CmiGetState();
766     return s;
767 }
768
769 static void CmiNotifyBeginIdle(CmiIdleState *s) {
770     s->sleepMs=0;
771     s->nIdles=0;
772 }
773
774 /*Number of times to spin before sleeping*/
775 #define SPINS_BEFORE_SLEEP 20
776 static void CmiNotifyStillIdle(CmiIdleState *s) {
777     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
778     s->nIdles++;
779     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
780         s->sleepMs+=2;
781         if (s->sleepMs>10) s->sleepMs=10;
782     }
783
784     if (s->sleepMs>0) {
785         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
786         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
787         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
788     }
789
790 #if !CMK_SMP
791     AdvanceCommunication();
792 #endif
793
794     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
795 }
796
797 /* usually called in non-smp mode */
798 void CmiNotifyIdle(void) {
799     AdvanceCommunication();
800     CmiYield();
801 }
802
803 /* Utiltiy functions */
804 static char *CopyMsg(char *msg, int len) {
805     char *copy = (char *)CmiAlloc(len);
806 #if CMK_ERROR_CHECKING
807     if (!copy) {
808         CmiAbort("Error: out of memory in machine layer\n");
809     }
810 #endif
811     memcpy(copy, msg, len);
812     return copy;
813 }