passing destNode to LrtsSendNetworkFunc.
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua
4  */
5 #if CMK_C_INLINE
6 #define INLINE_KEYWORD inline
7 #else
8 #define INLINE_KEYWORD
9 #endif
10
11 /******* broadcast related  */
12 #ifndef CMK_BROADCAST_SPANNING_TREE
13 #define CMK_BROADCAST_SPANNING_TREE    1
14 #endif
15
16 #ifndef CMK_BROADCAST_HYPERCUBE
17 #define CMK_BROADCAST_HYPERCUBE        0
18 #endif
19
20 #define BROADCAST_SPANNING_FACTOR      4
21 /* The number of children used when a msg is broadcast inside a node */
22 #define BROADCAST_SPANNING_INTRA_FACTOR  8
23
24 /* Root of broadcast:
25  * non-bcast msg: root = 0;
26  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
27  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
28  */
29 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
30 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
31
32 /**
33  * For some machine layers such as on Active Message framework,
34  * the receiver callback is usally executed on an internal
35  * thread (i.e. not the flow managed by ours). Therefore, for
36  * forwarding broadcast messages, we could have a choice whether
37  * to offload such function to the flow we manage such as the
38  * communication thread. -
39  */
40
41 #ifndef CMK_OFFLOAD_BCAST_PROCESS
42 #define CMK_OFFLOAD_BCAST_PROCESS 0
43 #endif
44
45 #if CMK_OFFLOAD_BCAST_PROCESS
46 CsvDeclare(PCQueue, procBcastQ);
47 #if CMK_NODE_QUEUE_AVAILABLE
48 CsvDeclare(PCQueue, nodeBcastQ);
49 #endif
50 #endif
51
52 static int  MSG_STATISTIC = 0;
53 int     msg_histogram[22];
54 static int _cmi_log2(int size)
55 {
56     int ret = 1;
57     size = size-1;
58     while( (size=size>>1)>0) ret++;
59     return ret;
60 }
61 #if CMK_BROADCAST_HYPERCUBE
62 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
63 static int CmiNodesDim;
64 #endif
65 /* ###End of Broadcast related definitions ### */
66
67
68 static void handleOneBcastMsg(int size, char *msg);
69 static void processBcastQs();
70
71 /* Utility functions for forwarding broadcast messages,
72  * should not be used in machine-specific implementations
73  * except in some special occasions.
74  */
75 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
76 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
77 static void SendSpanningChildrenProc(int size, char *msg);
78 static void SendHyperCubeProc(int size, char *msg);
79 #if CMK_NODE_QUEUE_AVAILABLE
80 static void SendSpanningChildrenNode(int size, char *msg);
81 static void SendHyperCubeNode(int size, char *msg);
82 #endif
83
84 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
85 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
86
87 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
88 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
89 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
90 #endif
91 #endif
92
93
94 void CmiSyncBroadcastFn(int size, char *msg);
95 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
96 void CmiFreeBroadcastFn(int size, char *msg);
97
98 void CmiSyncBroadcastAllFn(int size, char *msg);
99 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
100 void CmiFreeBroadcastAllFn(int size, char *msg);
101
102 #if CMK_NODE_QUEUE_AVAILABLE
103 void CmiSyncNodeBroadcastFn(int size, char *msg);
104 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
105 void CmiFreeNodeBroadcastFn(int size, char *msg);
106
107 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
108 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
109 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
110 #endif
111
112 /************** Done with Broadcast related */
113
114 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
115
116 #ifndef CMK_HAS_SIZE_IN_MSGHDR
117 #define CMK_HAS_SIZE_IN_MSGHDR 1
118 #endif
119 #if CMK_HAS_SIZE_IN_MSGHDR
120 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
121 #else
122 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
123 #endif
124
125 #if CMK_NODE_QUEUE_AVAILABLE
126 /* This value should be larger than the number of cores used
127  * per charm smp node. So it's currently set to such a large
128  * value.
129  */
130 #define DGRAM_NODEMESSAGE   (0x1FFB)
131 #endif
132
133 /* Node state structure */
134 int               _Cmi_mynode;    /* Which address space am I */
135 int               _Cmi_mynodesize;/* Number of processors in my address space */
136 int               _Cmi_numnodes;  /* Total number of address spaces */
137 int               _Cmi_numpes;    /* Total number of processors */
138
139 CpvDeclare(void*, CmiLocalQueue);
140
141 /* different modes for sending a message */
142 #define P2P_SYNC 0x1
143 #define P2P_ASYNC 0x2
144 #define BCAST_SYNC 0x3
145 #define BCAST_ASYNC 0x4
146
147 enum MACHINE_SMP_MODE {
148     INVALID_MODE,
149     COMM_THREAD_SEND_RECV = 0,
150     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
151     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
152 };
153 /* The default mode of smp charm runtime */
154 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
155
156
157 #if CMK_SMP
158 static volatile int commThdExit = 0;
159 static CmiNodeLock  commThdExitLock = 0;
160
161 /**
162  *  The macro defines whether to have a comm thd to offload some
163  *  work such as forwarding bcast messages etc. This macro
164  *  should be defined before including "machine-smp.c". Note
165  *  that whether a machine layer in SMP mode could run w/o comm
166  *  thread depends on the support of the underlying
167  *  communication library.
168  *
169  */
170 #ifndef CMK_SMP_NO_COMMTHD
171 #define CMK_SMP_NO_COMMTHD 0
172 #endif
173
174 #if CMK_SMP_NO_COMMTHD
175 int Cmi_commthread = 0;
176 #else
177 int Cmi_commthread = 1;
178 #endif
179
180 #endif
181
182 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
183 static int Cmi_nodestart;
184
185 /*
186  * Network progress utility variables. Period controls the rate at
187  * which the network poll is called
188  */
189 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
190 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
191 #endif
192
193 CpvDeclare(unsigned , networkProgressCount);
194 int networkProgressPeriod;
195
196
197 /* ===== Beginning of Common Function Declarations ===== */
198 void CmiAbort(const char *message);
199 static void PerrorExit(const char *msg);
200
201 /* This function handles the msg received as which queue to push into */
202 static void handleOneRecvedMsg(int size, char *msg);
203
204 /* Utility functions for forwarding broadcast messages,
205  * should not be used in machine-specific implementations
206  * except in some special occasions.
207  */
208 static void SendToPeers(int size, char *msg);
209
210
211 void CmiPushPE(int rank, void *msg);
212
213 #if CMK_NODE_QUEUE_AVAILABLE
214 void CmiPushNode(void *msg);
215 #endif
216
217 /* Functions regarding send ops declared in converse.h */
218
219 /* In default, using the common codes for msg sending */
220 #ifndef USE_COMMON_SYNC_P2P
221 #define USE_COMMON_SYNC_P2P 1
222 #endif
223 #ifndef USE_COMMON_ASYNC_P2P
224 #define USE_COMMON_ASYNC_P2P 1
225 #endif
226 #ifndef USE_COMMON_SYNC_BCAST
227 #define USE_COMMON_SYNC_BCAST 1
228 #endif
229 #ifndef USE_COMMON_ASYNC_BCAST
230 #define USE_COMMON_ASYNC_BCAST 1
231 #endif
232
233 static void CmiSendSelf(char *msg);
234
235 void CmiSyncSendFn(int destPE, int size, char *msg);
236 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
237 void CmiFreeSendFn(int destPE, int size, char *msg);
238
239 #if CMK_NODE_QUEUE_AVAILABLE
240 static void CmiSendNodeSelf(char *msg);
241
242 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
243 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
244 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
245
246 #endif
247
248 /* Functions and variables regarding machine startup */
249 static char     **Cmi_argv;
250 static char     **Cmi_argvcopy;
251 static CmiStartFn Cmi_startfn;   /* The start function */
252 static int        Cmi_usrsched;  /* Continue after start function finishes? */
253 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
254 static void ConverseRunPE(int everReturn);
255
256 /* Functions regarding machine running on every proc */
257 static void AdvanceCommunication(int whenidle);
258 static void CommunicationServer(int sleepTime);
259 static void CommunicationServerThread(int sleepTime);
260 void ConverseExit(void);
261
262 /* Functions providing incoming network messages */
263 void *CmiGetNonLocal(void);
264 #if CMK_NODE_QUEUE_AVAILABLE
265 void *CmiGetNonLocalNodeQ(void);
266 #endif
267 /* Utiltiy functions */
268 static char *CopyMsg(char *msg, int len);
269
270 /* ===== End of Common Function Declarations ===== */
271
272 #include "machine-smp.c"
273
274 /* ===== Beginning of Idle-state Related Declarations =====  */
275 typedef struct {
276     int sleepMs; /*Milliseconds to sleep while idle*/
277     int nIdles; /*Number of times we've been idle in a row*/
278     CmiState cs; /*Machine state*/
279 } CmiIdleState;
280
281 static CmiIdleState *CmiNotifyGetState(void);
282
283 /**
284  *  Generally,
285  *
286  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
287  *  When the proc is idle, AdvanceCommunication needs to be
288  *  called.
289  *
290  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
291  *
292  *  Different layers have choices of registering different callbacks for
293  *  idle state.
294  */
295 static void CmiNotifyBeginIdle(CmiIdleState *s);
296 static void CmiNotifyStillIdle(CmiIdleState *s);
297 void CmiNotifyIdle(void);
298 /* ===== End of Idle-state Related Declarations =====  */
299
300 /* ===== Beginning of Processor/Node State-related Stuff =====*/
301 #if !CMK_SMP
302 /************ non SMP **************/
303 static struct CmiStateStruct Cmi_state;
304 int _Cmi_mype;
305 int _Cmi_myrank;
306
307 void CmiMemLock() {}
308 void CmiMemUnlock() {}
309
310 #define CmiGetState() (&Cmi_state)
311 #define CmiGetStateN(n) (&Cmi_state)
312
313 void CmiYield(void) {
314     sleep(0);
315 }
316
317 static void CmiStartThreads(char **argv) {
318     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
319     _Cmi_mype = Cmi_nodestart;
320     _Cmi_myrank = 0;
321 }
322 #else
323 /************** SMP *******************/
324 INLINE_KEYWORD int CmiMyPe() {
325     return CmiGetState()->pe;
326 }
327 INLINE_KEYWORD int CmiMyRank() {
328     return CmiGetState()->rank;
329 }
330 INLINE_KEYWORD int CmiNodeFirst(int node) {
331     return node*_Cmi_mynodesize;
332 }
333 INLINE_KEYWORD int CmiNodeSize(int node) {
334     return _Cmi_mynodesize;
335 }
336 INLINE_KEYWORD int CmiNodeOf(int pe) {
337     return (pe/_Cmi_mynodesize);
338 }
339 INLINE_KEYWORD int CmiRankOf(int pe) {
340     return pe%_Cmi_mynodesize;
341 }
342 #endif
343 CsvDeclare(CmiNodeState, NodeState);
344 /* ===== End of Processor/Node State-related Stuff =====*/
345
346 #include "machine-broadcast.c"
347 #include "immediate.c"
348 #include "machine-commthd-util.c"
349
350 /* ===== Beginning of Common Function Definitions ===== */
351 static void PerrorExit(const char *msg) {
352     perror(msg);
353     exit(1);
354 }
355
356 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
357 /*Add a message to this processor's receive queue, pe is a rank */
358 void CmiPushPE(int rank,void *msg) {
359     CmiState cs = CmiGetStateN(rank);
360     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
361 #if CMK_IMMEDIATE_MSG
362     if (CmiIsImmediate(msg)) {
363         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
364         CMI_DEST_RANK(msg) = rank;
365         CmiPushImmediateMsg(msg);
366         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
367         return;
368     }
369 #endif
370
371     PCQueuePush(cs->recv,msg);
372
373 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
374   if (_Cmi_noprocforcommthread)
375 #endif
376     CmiIdleLock_addMessage(&cs->idle);
377     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
378 }
379
380 #if CMK_NODE_QUEUE_AVAILABLE
381 /*Add a message to this processor's receive queue */
382 void CmiPushNode(void *msg) {
383     MACHSTATE(3,"Pushing message into NodeRecv queue");
384 #if CMK_IMMEDIATE_MSG
385     if (CmiIsImmediate(msg)) {
386         CMI_DEST_RANK(msg) = 0;
387         CmiPushImmediateMsg(msg);
388         return;
389     }
390 #endif
391     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
392     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
393     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
394
395 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
396     if (_Cmi_noprocforcommthread)
397 #endif
398     {
399         CmiState cs=CmiGetStateN(0);
400         CmiIdleLock_addMessage(&cs->idle);
401     }
402 }
403 #endif
404
405 /* This function handles the msg received as which queue to push into */
406 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
407     int isBcastMsg = 0;
408 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
409     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
410 #endif
411
412     if (isBcastMsg) {
413         handleOneBcastMsg(size, msg);
414         return;
415     }
416
417 #if CMK_NODE_QUEUE_AVAILABLE
418     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
419         CmiPushNode(msg);
420         return;
421     }
422 #endif
423     CmiPushPE(CMI_DEST_RANK(msg), msg);
424
425 }
426
427
428 static void SendToPeers(int size, char *msg) {
429     /* FIXME: now it's just a flat p2p send!! When node size is large,
430     * it should also be sent in a tree
431     */
432     int exceptRank = CMI_DEST_RANK(msg);
433     int i;
434     for (i=0; i<exceptRank; i++) {
435         CmiPushPE(i, CopyMsg(msg, size));
436     }
437     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
438         CmiPushPE(i, CopyMsg(msg, size));
439     }
440 }
441
442
443 /* Functions regarding sending operations */
444 static void CmiSendSelf(char *msg) {
445 #if CMK_IMMEDIATE_MSG
446     if (CmiIsImmediate(msg)) {
447         /* CmiBecomeNonImmediate(msg); */
448         CmiPushImmediateMsg(msg);
449         CmiHandleImmediate();
450         return;
451     }
452 #endif
453     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
454 }
455
456 /* Functions regarding P2P send op */
457 #if USE_COMMON_SYNC_P2P
458 void CmiSyncSendFn(int destPE, int size, char *msg) {
459     char *dupmsg = CopyMsg(msg, size);
460     CmiFreeSendFn(destPE, size, dupmsg);
461 }
462
463 #if CMK_USE_PXSHM
464 inline int CmiValidPxshm(int dst, int size);
465 void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount);
466 void CmiInitPxshm(char **argv);
467 inline void CommunicationServerPxshm();
468 void CmiExitPxshm();
469 #endif
470
471 #if CMK_USE_XPMEM
472 inline int CmiValidXpmem(int dst, int size);
473 void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount);
474 void CmiInitXpmem(char **argv);
475 inline void CommunicationServerXpmem();
476 void CmiExitXpmem();
477 #endif
478
479 int refcount = 0;
480
481 /* a wrapper of LrtsSendFunc */
482 #if CMK_C_INLINE
483 inline 
484 #endif
485 CmiCommHandle LrtsSendNetworkFunc(int destNode, int size, char *msg, int mode)
486 {
487         int rank;
488 #if CMK_USE_PXSHM
489         if (CmiValidPxshm(destNode, size)) {
490           CmiSendMessagePxshm(msg, size, destNode, &refcount);
491           //for (int i=0; i<refcount; i++) CmiReference(msg);
492           return 0;
493         }
494 #endif
495 #if CMK_USE_XPMEM
496         if (CmiValidXpmem(destNode, size)) {
497           CmiSendMessageXpmem(msg, size, destNode, &refcount);
498           //for (int i=0; i<refcount; i++) CmiReference(msg);
499           return 0;
500         }
501 #endif
502
503 if (MSG_STATISTIC)
504 {
505     int ret_log = _cmi_log2(size);
506     if(ret_log >21) ret_log = 21;
507     msg_histogram[ret_log]++;
508 }
509     return LrtsSendFunc(destNode, size, msg, mode);
510 }
511
512 void CmiFreeSendFn(int destPE, int size, char *msg) {
513     CMI_SET_BROADCAST_ROOT(msg, 0);
514     CQdCreate(CpvAccess(cQdState), 1);
515     if (CmiMyPe()==destPE) {
516         CmiSendSelf(msg);
517 #if CMK_PERSISTENT_COMM
518         if (phs) curphs++;
519 #endif
520     } else {
521 #if CMK_PERSISTENT_COMM
522         if (phs) {
523           if (size > 8192) {
524             CmiAssert(curphs < phsSize);
525             LrtsSendPersistentMsg(phs[curphs++], destPE, size, msg);
526             return;
527           }
528           else
529             curphs++;
530         }
531 #endif
532         int destNode = CmiNodeOf(destPE);
533 #if CMK_SMP
534         if (CmiMyNode()==destNode) {
535             CmiPushPE(CmiRankOf(destPE), msg);
536             return;
537         }
538 #endif
539         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
540         LrtsSendNetworkFunc(destNode, size, msg, P2P_SYNC);
541     }
542 }
543 #endif
544
545 #if USE_COMMON_ASYNC_P2P
546 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
547     int destNode = CmiNodeOf(destPE);
548     if (destNode == CmiMyNode()) {
549         CmiSyncSendFn(destPE,size,msg);
550         return 0;
551     } else {
552 if (  MSG_STATISTIC)
553 {
554     int ret_log = _cmi_log2(size);
555         if(ret_log >21) ret_log = 21;
556         msg_histogram[ret_log]++;
557 }
558         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
559     }
560 }
561 #endif
562
563 #if CMK_NODE_QUEUE_AVAILABLE
564 static void CmiSendNodeSelf(char *msg) {
565 #if CMK_IMMEDIATE_MSG
566     if (CmiIsImmediate(msg)) {
567         CmiPushImmediateMsg(msg);
568         if (!_immRunning) CmiHandleImmediate();
569         return;
570     }
571 #endif
572     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
573     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
574     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
575 }
576
577 #if USE_COMMON_ASYNC_P2P
578 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
579     char *dupmsg = CopyMsg(msg, size);
580     CmiFreeNodeSendFn(destNode, size, dupmsg);
581 }
582
583 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
584     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
585     CQdCreate(CpvAccess(cQdState), 1);
586     CMI_SET_BROADCAST_ROOT(msg, 0);
587     if (destNode == CmiMyNode()) {
588         CmiSendNodeSelf(msg);
589     } else {
590 if (  MSG_STATISTIC)
591 {
592     int ret_log = _cmi_log2(size);
593     if(ret_log >21) ret_log = 21;
594     msg_histogram[ret_log]++;
595 }
596         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
597     }
598 }
599 #endif
600
601 #if USE_COMMON_ASYNC_P2P
602 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
603     if (destNode == CmiMyNode()) {
604         CmiSyncNodeSendFn(destNode, size, msg);
605         return 0;
606     } else {
607 if (  MSG_STATISTIC)
608 {
609         int ret_log = _cmi_log2(size);
610         if(ret_log >21) ret_log = 21;
611         msg_histogram[ret_log]++;
612 }
613         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
614     }
615 }
616 #endif
617 #endif
618
619 /* ##### Beginning of Functions Related with Machine Startup ##### */
620 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
621     int _ii;
622     int tmp;
623     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
624     /* processor per node */
625     _Cmi_mynodesize = 1;
626     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
627         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
628 #if ! CMK_SMP
629     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
630         CmiAbort("+ppn cannot be used in non SMP version!\n");
631 #endif
632
633     /* Network progress function is used to poll the network when for
634     messages. This flushes receive buffers on some  implementations*/
635     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
636     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
637
638     /* _Cmi_mynodesize has to be obtained before LrtsInit
639      * because it may be used inside LrtsInit
640      */
641     /* argv could be changed inside LrtsInit */
642     /* Inside this function, the number of nodes and my node id are obtained */
643 if (  MSG_STATISTIC)
644 {
645     for(_ii=0; _ii<22; _ii++)
646         msg_histogram[_ii] = 0;
647 }
648
649     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
650    
651         if (_Cmi_mynode==0) {
652 #if !CMK_SMP 
653                 printf("Charm++> Running on non-SMP mode\n");
654 #else
655                 printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
656                 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
657                         printf("Charm++> The comm. thread will both send and receive messages\n");
658                 } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
659                         printf("Charm++> The comm. thread will only receive messages, while work threads will send messages\n");
660                 } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
661                         printf("Charm++> There's no comm. thread. Work threads will both send and receive messages\n");
662                 } else {
663                         CmiAbort("Charm++> Invalid SMP mode setting\n");
664                 }
665 #endif
666         }
667
668     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
669     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
670     Cmi_argvcopy = CmiCopyArgs(argv);
671     Cmi_argv = argv;
672     Cmi_startfn = fn;
673     Cmi_usrsched = usched;
674
675 #if CMK_USE_PXSHM
676     CmiInitPxshm(argv);
677 #endif
678 #if CMK_USE_XPMEM
679     CmiInitXpmem(argv);
680 #endif
681
682     /* CmiTimerInit(); */
683 #if CMK_BROADCAST_HYPERCUBE
684     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
685     tmp = CmiNumNodes()-1;
686     CmiNodesDim = 0;
687     while (tmp>0) {
688         CmiNodesDim++;
689         tmp = tmp >> 1;
690     }
691     if (CmiNumNodes()==1) CmiNodesDim=1;
692 #endif
693
694     CsvInitialize(CmiNodeState, NodeState);
695     CmiNodeStateInit(&CsvAccess(NodeState));
696 #if CMK_SMP
697     commThdExitLock = CmiCreateLock();
698 #endif
699
700 #if CMK_OFFLOAD_BCAST_PROCESS
701     /* the actual queues should be created on comm thread considering NUMA in SMP */
702     CsvInitialize(PCQueue, procBcastQ);
703 #if CMK_NODE_QUEUE_AVAILABLE
704     CsvInitialize(PCQueue, nodeBcastQ);
705 #endif
706 #endif
707
708 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
709     CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
710 #endif
711
712     CmiStartThreads(argv);
713
714     ConverseRunPE(initret);
715 }
716
717 static void ConverseRunPE(int everReturn) {
718     CmiState cs;
719     char** CmiMyArgv;
720
721     LrtsPreCommonInit(everReturn);
722
723 #if CMK_OFFLOAD_BCAST_PROCESS
724     int createQueue = 1;
725 #if CMK_SMP
726 #if CMK_SMP_NO_COMMTHD
727     /* If there's no comm thread, then the queue is created on rank 0 */
728     if (CmiMyRank()) createQueue = 0;
729 #else
730     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
731 #endif
732 #endif
733
734     if (createQueue) {
735         CsvAccess(procBcastQ) = PCQueueCreate();
736 #if CMK_NODE_QUEUE_AVAILABLE
737         CsvAccess(nodeBcastQ) = PCQueueCreate();
738 #endif
739     }
740 #endif
741
742     CmiNodeAllBarrier();
743
744     cs = CmiGetState();
745     CpvInitialize(void *,CmiLocalQueue);
746     CpvAccess(CmiLocalQueue) = cs->localqueue;
747
748     if (CmiMyRank())
749         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
750     else
751         CmiMyArgv=Cmi_argv;
752
753     CthInit(CmiMyArgv);
754
755     /* initialize the network progress counter*/
756     /* Network progress function is used to poll the network when for
757        messages. This flushes receive buffers on some  implementations*/
758     CpvInitialize(unsigned , networkProgressCount);
759     CpvAccess(networkProgressCount) = 0;
760
761     ConverseCommonInit(CmiMyArgv);
762
763     LrtsPostCommonInit(everReturn);
764
765 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
766     CmiInitNotifyCommThdScheme();
767 #endif
768     /* Converse initialization finishes, immediate messages can be processed.
769        node barrier previously should take care of the node synchronization */
770     _immediateReady = 1;
771
772     /* communication thread */
773     if (CmiMyRank() == CmiMyNodeSize()) {
774         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
775         while (1) CommunicationServerThread(5);
776     } else { /* worker thread */
777         if (!everReturn) {
778             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
779             if (Cmi_usrsched==0) CsdScheduler(-1);
780             ConverseExit();
781         }
782     }
783 }
784 /* ##### End of Functions Related with Machine Startup ##### */
785
786 /* ##### Beginning of Functions Related with Machine Running ##### */
787 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
788     int doProcessBcast = 1;
789
790 #if CMK_USE_PXSHM
791     CommunicationServerPxshm();
792 #endif
793 #if CMK_USE_XPMEM
794     CommunicationServerXpmem();
795 #endif
796
797     LrtsAdvanceCommunication(whenidle);
798
799 #if CMK_OFFLOAD_BCAST_PROCESS
800 #if CMK_SMP_NO_COMMTHD
801     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
802     if (CmiMyRank()) doProcessBcast = 0;
803 #endif
804     if (doProcessBcast) processBcastQs();
805 #endif
806
807 #if CMK_IMMEDIATE_MSG
808 #if !CMK_SMP
809     CmiHandleImmediate();
810 #endif
811 #if CMK_SMP && CMK_SMP_NO_COMMTHD
812     if (CmiMyRank()==0) CmiHandleImmediate();
813 #endif
814 #endif
815 }
816
817 static void CommunicationServer(int sleepTime) {
818 #if CMK_SMP
819     AdvanceCommunication(0);
820
821     if (commThdExit == CmiMyNodeSize()) {
822         MACHSTATE(2, "CommunicationServer exiting {");
823         LrtsDrainResources();
824         MACHSTATE(2, "} CommunicationServer EXIT");
825
826         ConverseCommonExit();
827
828 #if CMK_USE_PXSHM
829         CmiExitPxshm();
830 #endif
831 #if CMK_USE_XPMEM
832         CmiExitXpmem();
833 #endif
834         LrtsExit();
835     }
836 #endif
837 }
838
839 static void CommunicationServerThread(int sleepTime) {
840     CommunicationServer(sleepTime);
841 #if CMK_IMMEDIATE_MSG
842     CmiHandleImmediate();
843 #endif
844 }
845
846 void ConverseExit(void) {
847     int i;
848 #if !CMK_SMP
849     LrtsDrainResources();
850 #else
851         if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
852            || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
853                 LrtsDrainResources();
854 #endif
855
856     ConverseCommonExit();
857
858 if (MSG_STATISTIC)
859 {
860     for(i=0; i<22; i++)
861     {
862         CmiPrintf("[MSG PE:%d]", CmiMyPe());
863         if(msg_histogram[i] >0)
864             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
865     }
866     CmiPrintf("\n");
867 }
868 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
869     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
870 #endif
871
872 #if !CMK_SMP
873 #if CMK_USE_PXSHM
874     CmiExitPxshm();
875 #endif
876 #if CMK_USE_XPMEM
877     CmiExitXpmem();
878 #endif
879     LrtsExit();
880 #else
881     /* In SMP, the communication thread will exit */
882     /* atomic increment */
883     CmiLock(commThdExitLock);
884     commThdExit++;
885     CmiUnlock(commThdExitLock);
886     while (1) CmiYield();
887 #endif
888 }
889 /* ##### End of Functions Related with Machine Running ##### */
890
891 void CmiAbort(const char *message) {
892 #if CMK_USE_PXSHM
893     CmiExitPxshm();
894 #endif
895 #if CMK_USE_XPMEM
896     CmiExitXpmem();
897 #endif
898     LrtsAbort(message);
899 }
900
901 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
902 void *CmiGetNonLocal(void) {
903     CmiState cs = CmiGetState();
904     void *msg = NULL;
905
906 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
907     /**
908       * In SMP mode with comm thread, it's possible a normal
909       * msg is sent from an immediate msg which is executed
910       * on comm thread. In this case, the msg is sent to
911       * the network queue of the work thread. Therefore,
912       * even there's only one worker thread, the polling of
913       * network queue is still required.
914       */
915     if (CmiNumPes() == 1) return NULL;
916 #endif
917
918     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
919     CmiIdleLock_checkMessage(&cs->idle);
920     /* ?????although it seems that lock is not needed, I found it crashes very often
921        on mpi-smp without lock */
922     msg = PCQueuePop(cs->recv);
923 #if !CMK_SMP
924     if (!msg) {
925        AdvanceCommunication(0);
926        msg = PCQueuePop(cs->recv);
927     }
928 #else
929 //    LrtsPostNonLocal();
930 #endif
931
932     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
933
934     return msg;
935 }
936
937 #if CMK_NODE_QUEUE_AVAILABLE
938 void *CmiGetNonLocalNodeQ(void) {
939     CmiState cs = CmiGetState();
940     char *result = 0;
941     CmiIdleLock_checkMessage(&cs->idle);
942     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
943         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
944         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
945         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
946         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
947         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
948     }
949
950     return result;
951 }
952 #endif
953 /* ##### End of Functions Providing Incoming Network Messages ##### */
954
955 static CmiIdleState *CmiNotifyGetState(void) {
956     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
957     s->sleepMs=0;
958     s->nIdles=0;
959     s->cs=CmiGetState();
960     return s;
961 }
962
963 static void CmiNotifyBeginIdle(CmiIdleState *s) {
964     s->sleepMs=0;
965     s->nIdles=0;
966 }
967
968 /*Number of times to spin before sleeping*/
969 #define SPINS_BEFORE_SLEEP 20
970 static void CmiNotifyStillIdle(CmiIdleState *s) {
971     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
972 #if !CMK_SMP
973     AdvanceCommunication(1);
974 #else
975     LrtsPostNonLocal();
976
977     if (_Cmi_noprocforcommthread) {
978     s->nIdles++;
979     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
980         s->sleepMs+=2;
981         if (s->sleepMs>10) s->sleepMs=10;
982     }
983
984     if (s->sleepMs>0) {
985         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
986         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
987         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
988     }
989     }
990 #endif
991
992     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
993 }
994
995 /* usually called in non-smp mode */
996 void CmiNotifyIdle(void) {
997     AdvanceCommunication(1);
998     CmiYield();
999 }
1000
1001 /* Utiltiy functions */
1002 static char *CopyMsg(char *msg, int len) {
1003     char *copy = (char *)CmiAlloc(len);
1004 #if CMK_ERROR_CHECKING
1005     if (!copy) {
1006         CmiAbort("Error: out of memory in machine layer\n");
1007     }
1008 #endif
1009     memcpy(copy, msg, len);
1010     return copy;
1011 }
1012
1013
1014 #if CMK_USE_PXSHM
1015 #include "machine-pxshm.c"
1016 #endif
1017 #if CMK_USE_XPMEM
1018 #include "machine-xpmem.c"
1019 #endif
1020
1021