tweak pxshm to work for SMP build
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua
4  */
5 #if CMK_C_INLINE
6 #define INLINE_KEYWORD inline
7 #else
8 #define INLINE_KEYWORD
9 #endif
10
11 /******* broadcast related  */
12 #ifndef CMK_BROADCAST_SPANNING_TREE
13 #define CMK_BROADCAST_SPANNING_TREE    1
14 #endif
15
16 #ifndef CMK_BROADCAST_HYPERCUBE
17 #define CMK_BROADCAST_HYPERCUBE        0
18 #endif
19
20 #define BROADCAST_SPANNING_FACTOR      4
21 /* The number of children used when a msg is broadcast inside a node */
22 #define BROADCAST_SPANNING_INTRA_FACTOR  8
23
24 /* Root of broadcast:
25  * non-bcast msg: root = 0;
26  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
27  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
28  */
29 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
30 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
31
32 /**
33  * For some machine layers such as on Active Message framework,
34  * the receiver callback is usally executed on an internal
35  * thread (i.e. not the flow managed by ours). Therefore, for
36  * forwarding broadcast messages, we could have a choice whether
37  * to offload such function to the flow we manage such as the
38  * communication thread. -
39  */
40
41 #ifndef CMK_OFFLOAD_BCAST_PROCESS
42 #define CMK_OFFLOAD_BCAST_PROCESS 0
43 #endif
44
45 #if CMK_OFFLOAD_BCAST_PROCESS
46 CsvDeclare(PCQueue, procBcastQ);
47 #if CMK_NODE_QUEUE_AVAILABLE
48 CsvDeclare(PCQueue, nodeBcastQ);
49 #endif
50 #endif
51
52 static int  MSG_STATISTIC = 0;
53 int     msg_histogram[22];
54 static int _cmi_log2(int size)
55 {
56     int ret = 1;
57     size = size-1;
58     while( (size=size>>1)>0) ret++;
59     return ret;
60 }
61 #if CMK_BROADCAST_HYPERCUBE
62 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
63 static int CmiNodesDim;
64 #endif
65 /* ###End of Broadcast related definitions ### */
66
67
68 static void handleOneBcastMsg(int size, char *msg);
69 static void processBcastQs();
70
71 /* Utility functions for forwarding broadcast messages,
72  * should not be used in machine-specific implementations
73  * except in some special occasions.
74  */
75 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
76 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
77 static void SendSpanningChildrenProc(int size, char *msg);
78 static void SendHyperCubeProc(int size, char *msg);
79 #if CMK_NODE_QUEUE_AVAILABLE
80 static void SendSpanningChildrenNode(int size, char *msg);
81 static void SendHyperCubeNode(int size, char *msg);
82 #endif
83
84 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
85 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
86
87 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
88 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
89 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
90 #endif
91 #endif
92
93
94 void CmiSyncBroadcastFn(int size, char *msg);
95 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
96 void CmiFreeBroadcastFn(int size, char *msg);
97
98 void CmiSyncBroadcastAllFn(int size, char *msg);
99 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
100 void CmiFreeBroadcastAllFn(int size, char *msg);
101
102 #if CMK_NODE_QUEUE_AVAILABLE
103 void CmiSyncNodeBroadcastFn(int size, char *msg);
104 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
105 void CmiFreeNodeBroadcastFn(int size, char *msg);
106
107 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
108 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
109 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
110 #endif
111
112 /************** Done with Broadcast related */
113
114 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
115
116 #ifndef CMK_HAS_SIZE_IN_MSGHDR
117 #define CMK_HAS_SIZE_IN_MSGHDR 1
118 #endif
119 #if CMK_HAS_SIZE_IN_MSGHDR
120 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
121 #else
122 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
123 #endif
124
125 #if CMK_NODE_QUEUE_AVAILABLE
126 /* This value should be larger than the number of cores used
127  * per charm smp node. So it's currently set to such a large
128  * value.
129  */
130 #define DGRAM_NODEMESSAGE   (0x1FFB)
131 #endif
132
133 /* Node state structure */
134 int               _Cmi_mynode;    /* Which address space am I */
135 int               _Cmi_mynodesize;/* Number of processors in my address space */
136 int               _Cmi_numnodes;  /* Total number of address spaces */
137 int               _Cmi_numpes;    /* Total number of processors */
138
139 CpvDeclare(void*, CmiLocalQueue);
140
141 /* different modes for sending a message */
142 #define P2P_SYNC 0x1
143 #define P2P_ASYNC 0x2
144 #define BCAST_SYNC 0x3
145 #define BCAST_ASYNC 0x4
146
147 #if CMK_SMP
148 static volatile int commThdExit = 0;
149 static CmiNodeLock  commThdExitLock = 0;
150
151 /**
152  *  The macro defines whether to have a comm thd to offload some
153  *  work such as forwarding bcast messages etc. This macro
154  *  should be defined before including "machine-smp.c". Note
155  *  that whether a machine layer in SMP mode could run w/o comm
156  *  thread depends on the support of the underlying
157  *  communication library.
158  *
159  */
160 #ifndef CMK_SMP_NO_COMMTHD
161 #define CMK_SMP_NO_COMMTHD 0
162 #endif
163
164 #if CMK_SMP_NO_COMMTHD
165 int Cmi_commthread = 0;
166 #else
167 int Cmi_commthread = 1;
168 #endif
169
170 #endif
171
172 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
173 static int Cmi_nodestart;
174
175 /*
176  * Network progress utility variables. Period controls the rate at
177  * which the network poll is called
178  */
179 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
180 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
181 #endif
182
183 CpvDeclare(unsigned , networkProgressCount);
184 int networkProgressPeriod;
185
186
187 /* ===== Beginning of Common Function Declarations ===== */
188 void CmiAbort(const char *message);
189 static void PerrorExit(const char *msg);
190
191 /* This function handles the msg received as which queue to push into */
192 static void handleOneRecvedMsg(int size, char *msg);
193
194 /* Utility functions for forwarding broadcast messages,
195  * should not be used in machine-specific implementations
196  * except in some special occasions.
197  */
198 static void SendToPeers(int size, char *msg);
199
200
201 void CmiPushPE(int rank, void *msg);
202
203 #if CMK_NODE_QUEUE_AVAILABLE
204 void CmiPushNode(void *msg);
205 #endif
206
207 /* Functions regarding send ops declared in converse.h */
208
209 /* In default, using the common codes for msg sending */
210 #ifndef USE_COMMON_SYNC_P2P
211 #define USE_COMMON_SYNC_P2P 1
212 #endif
213 #ifndef USE_COMMON_ASYNC_P2P
214 #define USE_COMMON_ASYNC_P2P 1
215 #endif
216 #ifndef USE_COMMON_SYNC_BCAST
217 #define USE_COMMON_SYNC_BCAST 1
218 #endif
219 #ifndef USE_COMMON_ASYNC_BCAST
220 #define USE_COMMON_ASYNC_BCAST 1
221 #endif
222
223 static void CmiSendSelf(char *msg);
224
225 void CmiSyncSendFn(int destPE, int size, char *msg);
226 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
227 void CmiFreeSendFn(int destPE, int size, char *msg);
228
229 #if CMK_NODE_QUEUE_AVAILABLE
230 static void CmiSendNodeSelf(char *msg);
231
232 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
233 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
234 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
235
236 #endif
237
238 /* Functions and variables regarding machine startup */
239 static char     **Cmi_argv;
240 static char     **Cmi_argvcopy;
241 static CmiStartFn Cmi_startfn;   /* The start function */
242 static int        Cmi_usrsched;  /* Continue after start function finishes? */
243 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
244 static void ConverseRunPE(int everReturn);
245
246 /* Functions regarding machine running on every proc */
247 static void AdvanceCommunication();
248 static void CommunicationServer(int sleepTime);
249 static void CommunicationServerThread(int sleepTime);
250 void ConverseExit(void);
251
252 /* Functions providing incoming network messages */
253 void *CmiGetNonLocal(void);
254 #if CMK_NODE_QUEUE_AVAILABLE
255 void *CmiGetNonLocalNodeQ(void);
256 #endif
257 /* Utiltiy functions */
258 static char *CopyMsg(char *msg, int len);
259
260 /* ===== End of Common Function Declarations ===== */
261
262 #include "machine-smp.c"
263
264 /* ===== Beginning of Idle-state Related Declarations =====  */
265 typedef struct {
266     int sleepMs; /*Milliseconds to sleep while idle*/
267     int nIdles; /*Number of times we've been idle in a row*/
268     CmiState cs; /*Machine state*/
269 } CmiIdleState;
270
271 static CmiIdleState *CmiNotifyGetState(void);
272
273 /**
274  *  Generally,
275  *
276  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
277  *  When the proc is idle, AdvanceCommunication needs to be
278  *  called.
279  *
280  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
281  *
282  *  Different layers have choices of registering different callbacks for
283  *  idle state.
284  */
285 static void CmiNotifyBeginIdle(CmiIdleState *s);
286 static void CmiNotifyStillIdle(CmiIdleState *s);
287 void CmiNotifyIdle(void);
288 /* ===== End of Idle-state Related Declarations =====  */
289
290 /* ===== Beginning of Processor/Node State-related Stuff =====*/
291 #if !CMK_SMP
292 /************ non SMP **************/
293 static struct CmiStateStruct Cmi_state;
294 int _Cmi_mype;
295 int _Cmi_myrank;
296
297 void CmiMemLock() {}
298 void CmiMemUnlock() {}
299
300 #define CmiGetState() (&Cmi_state)
301 #define CmiGetStateN(n) (&Cmi_state)
302
303 void CmiYield(void) {
304     sleep(0);
305 }
306
307 static void CmiStartThreads(char **argv) {
308     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
309     _Cmi_mype = Cmi_nodestart;
310     _Cmi_myrank = 0;
311 }
312 #else
313 /************** SMP *******************/
314 INLINE_KEYWORD int CmiMyPe() {
315     return CmiGetState()->pe;
316 }
317 INLINE_KEYWORD int CmiMyRank() {
318     return CmiGetState()->rank;
319 }
320 INLINE_KEYWORD int CmiNodeFirst(int node) {
321     return node*_Cmi_mynodesize;
322 }
323 INLINE_KEYWORD int CmiNodeSize(int node) {
324     return _Cmi_mynodesize;
325 }
326 INLINE_KEYWORD int CmiNodeOf(int pe) {
327     return (pe/_Cmi_mynodesize);
328 }
329 INLINE_KEYWORD int CmiRankOf(int pe) {
330     return pe%_Cmi_mynodesize;
331 }
332 #endif
333 CsvDeclare(CmiNodeState, NodeState);
334 /* ===== End of Processor/Node State-related Stuff =====*/
335
336 #include "machine-broadcast.c"
337 #include "immediate.c"
338
339 /* ===== Beginning of Common Function Definitions ===== */
340 static void PerrorExit(const char *msg) {
341     perror(msg);
342     exit(1);
343 }
344
345 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
346 /*Add a message to this processor's receive queue, pe is a rank */
347 void CmiPushPE(int rank,void *msg) {
348     CmiState cs = CmiGetStateN(rank);
349     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
350 #if CMK_IMMEDIATE_MSG
351     if (CmiIsImmediate(msg)) {
352         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
353         CMI_DEST_RANK(msg) = rank;
354         CmiPushImmediateMsg(msg);
355         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
356         return;
357     }
358 #endif
359
360     PCQueuePush(cs->recv,msg);
361     CmiIdleLock_addMessage(&cs->idle);
362     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
363 }
364
365 #if CMK_NODE_QUEUE_AVAILABLE
366 /*Add a message to this processor's receive queue */
367 void CmiPushNode(void *msg) {
368     MACHSTATE(3,"Pushing message into NodeRecv queue");
369 #if CMK_IMMEDIATE_MSG
370     if (CmiIsImmediate(msg)) {
371         CMI_DEST_RANK(msg) = 0;
372         CmiPushImmediateMsg(msg);
373         return;
374     }
375 #endif
376     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
377     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
378     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
379     {
380         CmiState cs=CmiGetStateN(0);
381         CmiIdleLock_addMessage(&cs->idle);
382     }
383 }
384 #endif
385
386 /* This function handles the msg received as which queue to push into */
387 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
388     int isBcastMsg = 0;
389 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
390     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
391 #endif
392
393     if (isBcastMsg) {
394         handleOneBcastMsg(size, msg);
395         return;
396     }
397
398 #if CMK_NODE_QUEUE_AVAILABLE
399     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
400         CmiPushNode(msg);
401     else
402 #endif
403         CmiPushPE(CMI_DEST_RANK(msg), msg);
404
405 }
406
407
408 static void SendToPeers(int size, char *msg) {
409     /* FIXME: now it's just a flat p2p send!! When node size is large,
410     * it should also be sent in a tree
411     */
412     int exceptRank = CMI_DEST_RANK(msg);
413     int i;
414     for (i=0; i<exceptRank; i++) {
415         CmiPushPE(i, CopyMsg(msg, size));
416     }
417     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
418         CmiPushPE(i, CopyMsg(msg, size));
419     }
420 }
421
422
423 /* Functions regarding sending operations */
424 static void CmiSendSelf(char *msg) {
425 #if CMK_IMMEDIATE_MSG
426     if (CmiIsImmediate(msg)) {
427         /* CmiBecomeNonImmediate(msg); */
428         CmiPushImmediateMsg(msg);
429         CmiHandleImmediate();
430         return;
431     }
432 #endif
433     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
434 }
435
436 /* Functions regarding P2P send op */
437 #if USE_COMMON_SYNC_P2P
438 void CmiSyncSendFn(int destPE, int size, char *msg) {
439     char *dupmsg = CopyMsg(msg, size);
440     CmiFreeSendFn(destPE, size, dupmsg);
441 }
442
443 #if CMK_USE_PXSHM
444 inline int CmiValidPxshm(int dst, int size);
445 void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot);
446 void CmiInitPxshm(char **argv);
447 inline void CommunicationServerPxshm();
448 void CmiExitPxshm();
449 #endif
450
451 int refcount = 0;
452
453 void CmiFreeSendFn(int destPE, int size, char *msg) {
454     CMI_SET_BROADCAST_ROOT(msg, 0);
455     CQdCreate(CpvAccess(cQdState), 1);
456     if (CmiMyPe()==destPE) {
457         CmiSendSelf(msg);
458 #if CMK_PERSISTENT_COMM
459         if (phs) curphs++;
460 #endif
461     } else {
462 #if CMK_PERSISTENT_COMM
463         if (phs && size > 8192) {
464             CmiAssert(curphs < phsSize);
465             LrtsSendPersistentMsg(phs[curphs++], destPE, size, msg);
466             return;
467         }
468 #endif
469
470         int destNode = CmiNodeOf(destPE);
471 #if CMK_SMP
472         if (CmiMyNode()==destNode) {
473             CmiPushPE(CmiRankOf(destPE), msg);
474             return;
475         }
476 #endif
477         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
478 #if CMK_USE_PXSHM
479         int ret=CmiValidPxshm(destPE, size);
480         if (ret) {
481           CmiSendMessagePxshm(msg, size, destPE, &refcount, CmiRankOf(destPE), 0);
482           //for (int i=0; i<refcount; i++) CmiReference(msg);
483 #if CMK_PERSISTENT_COMM
484           if (phs) curphs++;
485 #endif
486           return;
487         }
488 #endif
489 if (  MSG_STATISTIC)
490 {
491     int ret_log = _cmi_log2(size);
492     if(ret_log >21) ret_log = 21;
493     msg_histogram[ret_log]++;
494 }
495         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
496     }
497 }
498 #endif
499
500 #if USE_COMMON_ASYNC_P2P
501 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
502     int destNode = CmiNodeOf(destPE);
503     if (destNode == CmiMyNode()) {
504         CmiSyncSendFn(destPE,size,msg);
505         return 0;
506     } else {
507 if (  MSG_STATISTIC)
508 {
509     int ret_log = _cmi_log2(size);
510         if(ret_log >21) ret_log = 21;
511         msg_histogram[ret_log]++;
512 }
513         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
514     }
515 }
516 #endif
517
518 #if CMK_NODE_QUEUE_AVAILABLE
519 static void CmiSendNodeSelf(char *msg) {
520 #if CMK_IMMEDIATE_MSG
521     if (CmiIsImmediate(msg)) {
522         CmiPushImmediateMsg(msg);
523         if (!_immRunning) CmiHandleImmediate();
524         return;
525     }
526 #endif
527     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
528     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
529     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
530 }
531
532 #if USE_COMMON_ASYNC_P2P
533 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
534     char *dupmsg = CopyMsg(msg, size);
535     CmiFreeNodeSendFn(destNode, size, dupmsg);
536 }
537
538 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
539     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
540     CQdCreate(CpvAccess(cQdState), 1);
541     CMI_SET_BROADCAST_ROOT(msg, 0);
542     if (destNode == CmiMyNode()) {
543         CmiSendNodeSelf(msg);
544     } else {
545 if (  MSG_STATISTIC)
546 {
547     int ret_log = _cmi_log2(size);
548     if(ret_log >21) ret_log = 21;
549     msg_histogram[ret_log]++;
550 }
551         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
552     }
553 }
554 #endif
555
556 #if USE_COMMON_ASYNC_P2P
557 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
558     if (destNode == CmiMyNode()) {
559         CmiSyncNodeSendFn(destNode, size, msg);
560         return 0;
561     } else {
562 if (  MSG_STATISTIC)
563 {
564         int ret_log = _cmi_log2(size);
565         if(ret_log >21) ret_log = 21;
566         msg_histogram[ret_log]++;
567 }
568         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
569     }
570 }
571 #endif
572 #endif
573
574 /* ##### Beginning of Functions Related with Machine Startup ##### */
575 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
576     int _ii;
577     int tmp;
578     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
579     /* processor per node */
580     _Cmi_mynodesize = 1;
581     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
582         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
583 #if ! CMK_SMP
584     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
585         CmiAbort("+ppn cannot be used in non SMP version!\n");
586 #endif
587
588     /* Network progress function is used to poll the network when for
589     messages. This flushes receive buffers on some  implementations*/
590     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
591     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
592
593     /* _Cmi_mynodesize has to be obtained before LrtsInit
594      * because it may be used inside LrtsInit
595      */
596     /* argv could be changed inside LrtsInit */
597     /* Inside this function, the number of nodes and my node id are obtained */
598 if (  MSG_STATISTIC)
599 {
600     for(_ii=0; _ii<22; _ii++)
601         msg_histogram[_ii] = 0;
602 }
603
604     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
605    
606 if (_Cmi_mynode==0) 
607 #if !CMK_SMP 
608     printf("Charm++> Running on Non-smp mode\n");
609 #else
610     printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
611 #endif
612
613     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
614     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
615     Cmi_argvcopy = CmiCopyArgs(argv);
616     Cmi_argv = argv;
617     Cmi_startfn = fn;
618     Cmi_usrsched = usched;
619
620 #if CMK_USE_PXSHM
621     CmiInitPxshm(argv);
622 #endif
623
624     /* CmiTimerInit(); */
625 #if CMK_BROADCAST_HYPERCUBE
626     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
627     tmp = CmiNumNodes()-1;
628     CmiNodesDim = 0;
629     while (tmp>0) {
630         CmiNodesDim++;
631         tmp = tmp >> 1;
632     }
633     if (CmiNumNodes()==1) CmiNodesDim=1;
634 #endif
635
636     CsvInitialize(CmiNodeState, NodeState);
637     CmiNodeStateInit(&CsvAccess(NodeState));
638 #if CMK_SMP
639     commThdExitLock = CmiCreateLock();
640 #endif
641
642 #if CMK_OFFLOAD_BCAST_PROCESS
643     /* the actual queues should be created on comm thread considering NUMA in SMP */
644     CsvInitialize(PCQueue, procBcastQ);
645 #if CMK_NODE_QUEUE_AVAILABLE
646     CsvInitialize(PCQueue, nodeBcastQ);
647 #endif
648 #endif
649
650     CmiStartThreads(argv);
651
652     ConverseRunPE(initret);
653 }
654
655 static void ConverseRunPE(int everReturn) {
656     CmiState cs;
657     char** CmiMyArgv;
658
659     LrtsPreCommonInit(everReturn);
660
661 #if CMK_OFFLOAD_BCAST_PROCESS
662     int createQueue = 1;
663 #if CMK_SMP
664 #if CMK_SMP_NO_COMMTHD
665     /* If there's no comm thread, then the queue is created on rank 0 */
666     if (CmiMyRank()) createQueue = 0;
667 #else
668     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
669 #endif
670 #endif
671
672     if (createQueue) {
673         CsvAccess(procBcastQ) = PCQueueCreate();
674 #if CMK_NODE_QUEUE_AVAILABLE
675         CsvAccess(nodeBcastQ) = PCQueueCreate();
676 #endif
677     }
678 #endif
679
680     CmiNodeAllBarrier();
681
682     cs = CmiGetState();
683     CpvInitialize(void *,CmiLocalQueue);
684     CpvAccess(CmiLocalQueue) = cs->localqueue;
685
686     if (CmiMyRank())
687         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
688     else
689         CmiMyArgv=Cmi_argv;
690
691     CthInit(CmiMyArgv);
692
693     /* initialize the network progress counter*/
694     /* Network progress function is used to poll the network when for
695        messages. This flushes receive buffers on some  implementations*/
696     CpvInitialize(unsigned , networkProgressCount);
697     CpvAccess(networkProgressCount) = 0;
698
699     ConverseCommonInit(CmiMyArgv);
700
701     LrtsPostCommonInit(everReturn);
702
703     /* Converse initialization finishes, immediate messages can be processed.
704        node barrier previously should take care of the node synchronization */
705     _immediateReady = 1;
706
707     /* communication thread */
708     if (CmiMyRank() == CmiMyNodeSize()) {
709         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
710         while (1) CommunicationServerThread(5);
711     } else { /* worker thread */
712         if (!everReturn) {
713             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
714             if (Cmi_usrsched==0) CsdScheduler(-1);
715             ConverseExit();
716         }
717     }
718 }
719 /* ##### End of Functions Related with Machine Startup ##### */
720
721 /* ##### Beginning of Functions Related with Machine Running ##### */
722 static INLINE_KEYWORD void AdvanceCommunication() {
723     int doProcessBcast = 1;
724
725 #if CMK_USE_PXSHM
726     CommunicationServerPxshm();
727 #endif
728
729     LrtsAdvanceCommunication();
730
731 #if CMK_OFFLOAD_BCAST_PROCESS
732 #if CMK_SMP_NO_COMMTHD
733     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
734     if (CmiMyRank()) doProcessBcast = 0;
735 #endif
736     if (doProcessBcast) processBcastQs();
737 #endif
738
739 #if CMK_IMMEDIATE_MSG
740 #if !CMK_SMP
741     CmiHandleImmediate();
742 #endif
743 #if CMK_SMP && CMK_SMP_NO_COMMTHD
744     if (CmiMyRank()==0) CmiHandleImmediate();
745 #endif
746 #endif
747
748 }
749
750 static void CommunicationServer(int sleepTime) {
751 #if CMK_SMP
752     AdvanceCommunication();
753
754     if (commThdExit == CmiMyNodeSize()) {
755         MACHSTATE(2, "CommunicationServer exiting {");
756         LrtsDrainResources();
757         MACHSTATE(2, "} CommunicationServer EXIT");
758
759         ConverseCommonExit();
760
761 #if CMK_USE_PXSHM
762         CmiExitPxshm();
763 #endif
764         LrtsExit();
765     }
766 #endif
767 }
768
769 static void CommunicationServerThread(int sleepTime) {
770     CommunicationServer(sleepTime);
771 #if CMK_IMMEDIATE_MSG
772     CmiHandleImmediate();
773 #endif
774 }
775
776 void ConverseExit(void) {
777     int i;
778 #if !CMK_SMP
779     LrtsDrainResources();
780 #endif
781
782     ConverseCommonExit();
783
784 if (MSG_STATISTIC)
785 {
786     for(i=0; i<22; i++)
787     {
788         CmiPrintf("[MSG PE:%d]", CmiMyPe());
789         if(msg_histogram[i] >0)
790             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
791     }
792     CmiPrintf("\n");
793 }
794 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
795     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
796 #endif
797
798 #if !CMK_SMP
799 #if CMK_USE_PXSHM
800     CmiExitPxshm();
801 #endif
802     LrtsExit();
803 #else
804     /* In SMP, the communication thread will exit */
805     /* atomic increment */
806     CmiLock(commThdExitLock);
807     commThdExit++;
808     CmiUnlock(commThdExitLock);
809     while (1) CmiYield();
810 #endif
811 }
812 /* ##### End of Functions Related with Machine Running ##### */
813
814
815 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
816 void *CmiGetNonLocal(void) {
817     CmiState cs = CmiGetState();
818     void *msg = NULL;
819
820 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
821     /**
822       * In SMP mode with comm thread, it's possible a normal
823       * msg is sent from an immediate msg which is executed
824       * on comm thread. In this case, the msg is sent to
825       * the network queue of the work thread. Therefore,
826       * even there's only one worker thread, the polling of
827       * network queue is still required.
828       */
829     if (CmiNumPes() == 1) return NULL;
830 #endif
831
832     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
833     CmiIdleLock_checkMessage(&cs->idle);
834     /* ?????although it seems that lock is not needed, I found it crashes very often
835        on mpi-smp without lock */
836     msg = PCQueuePop(cs->recv);
837 #if !CMK_SMP
838     if (!msg) {
839        AdvanceCommunication();
840        msg = PCQueuePop(cs->recv);
841     }
842 #else
843 //    LrtsPostNonLocal();
844 #endif
845
846     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
847
848     return msg;
849 }
850
851 #if CMK_NODE_QUEUE_AVAILABLE
852 void *CmiGetNonLocalNodeQ(void) {
853     CmiState cs = CmiGetState();
854     char *result = 0;
855     CmiIdleLock_checkMessage(&cs->idle);
856     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
857         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
858         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
859         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
860         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
861         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
862     }
863
864     return result;
865 }
866 #endif
867 /* ##### End of Functions Providing Incoming Network Messages ##### */
868
869 static CmiIdleState *CmiNotifyGetState(void) {
870     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
871     s->sleepMs=0;
872     s->nIdles=0;
873     s->cs=CmiGetState();
874     return s;
875 }
876
877 static void CmiNotifyBeginIdle(CmiIdleState *s) {
878     s->sleepMs=0;
879     s->nIdles=0;
880 }
881
882 /*Number of times to spin before sleeping*/
883 #define SPINS_BEFORE_SLEEP 20
884 static void CmiNotifyStillIdle(CmiIdleState *s) {
885     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
886     s->nIdles++;
887     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
888         s->sleepMs+=2;
889         if (s->sleepMs>10) s->sleepMs=10;
890     }
891
892     if (s->sleepMs>0) {
893         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
894         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
895         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
896     }
897
898 #if !CMK_SMP
899     AdvanceCommunication();
900 #else
901     LrtsPostNonLocal();
902 #endif
903
904     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
905 }
906
907 /* usually called in non-smp mode */
908 void CmiNotifyIdle(void) {
909     AdvanceCommunication();
910     CmiYield();
911 }
912
913 /* Utiltiy functions */
914 static char *CopyMsg(char *msg, int len) {
915     char *copy = (char *)CmiAlloc(len);
916 #if CMK_ERROR_CHECKING
917     if (!copy) {
918         CmiAbort("Error: out of memory in machine layer\n");
919     }
920 #endif
921     memcpy(copy, msg, len);
922     return copy;
923 }
924
925
926 #if CMK_USE_PXSHM
927 #include "machine-pxshm.c"
928 #endif
929
930