a new mode that both worker and comm thread send and receive messages.
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua, Gengbin
4  */
5
6 #if CMK_C_INLINE
7 #define INLINE_KEYWORD inline
8 #else
9 #define INLINE_KEYWORD
10 #endif
11
12 /******* broadcast related  */
13 #ifndef CMK_BROADCAST_SPANNING_TREE
14 #define CMK_BROADCAST_SPANNING_TREE    1
15 #endif
16
17 #ifndef CMK_BROADCAST_HYPERCUBE
18 #define CMK_BROADCAST_HYPERCUBE        0
19 #endif
20
21 #define BROADCAST_SPANNING_FACTOR      4
22 /* The number of children used when a msg is broadcast inside a node */
23 #define BROADCAST_SPANNING_INTRA_FACTOR  8
24
25 /* Root of broadcast:
26  * non-bcast msg: root = 0;
27  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
28  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
29  */
30 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
31 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
32
33 /**
34  * For some machine layers such as on Active Message framework,
35  * the receiver callback is usally executed on an internal
36  * thread (i.e. not the flow managed by ours). Therefore, for
37  * forwarding broadcast messages, we could have a choice whether
38  * to offload such function to the flow we manage such as the
39  * communication thread. -
40  */
41
42 #ifndef CMK_OFFLOAD_BCAST_PROCESS
43 #define CMK_OFFLOAD_BCAST_PROCESS 0
44 #endif
45
46 #if CMK_OFFLOAD_BCAST_PROCESS
47 CsvDeclare(PCQueue, procBcastQ);
48 #if CMK_NODE_QUEUE_AVAILABLE
49 CsvDeclare(PCQueue, nodeBcastQ);
50 #endif
51 #endif
52
53 #if CMK_WITH_STATS
54 static int  MSG_STATISTIC = 0;
55 int     msg_histogram[22];
56 static int _cmi_log2(int size)
57 {
58     int ret = 1;
59     size = size-1;
60     while( (size=size>>1)>0) ret++;
61     return ret;
62 }
63 #endif
64
65 #if CMK_BROADCAST_HYPERCUBE
66 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
67 static int CmiNodesDim;
68 #endif
69 /* ###End of Broadcast related definitions ### */
70
71
72 static void handleOneBcastMsg(int size, char *msg);
73 static void processBcastQs();
74
75 /* Utility functions for forwarding broadcast messages,
76  * should not be used in machine-specific implementations
77  * except in some special occasions.
78  */
79 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
80 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
81 static void SendSpanningChildrenProc(int size, char *msg);
82 static void SendHyperCubeProc(int size, char *msg);
83 #if CMK_NODE_QUEUE_AVAILABLE
84 static void SendSpanningChildrenNode(int size, char *msg);
85 static void SendHyperCubeNode(int size, char *msg);
86 #endif
87
88 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
89 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
90
91 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
92 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
93 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
94 #endif
95 #endif
96
97
98 void CmiSyncBroadcastFn(int size, char *msg);
99 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
100 void CmiFreeBroadcastFn(int size, char *msg);
101
102 void CmiSyncBroadcastAllFn(int size, char *msg);
103 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
104 void CmiFreeBroadcastAllFn(int size, char *msg);
105
106 #if CMK_NODE_QUEUE_AVAILABLE
107 void CmiSyncNodeBroadcastFn(int size, char *msg);
108 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
109 void CmiFreeNodeBroadcastFn(int size, char *msg);
110
111 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
112 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
113 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
114 #endif
115
116 /************** Done with Broadcast related */
117
118 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
119
120 #ifndef CMK_HAS_SIZE_IN_MSGHDR
121 #define CMK_HAS_SIZE_IN_MSGHDR 1
122 #endif
123 #if CMK_HAS_SIZE_IN_MSGHDR
124 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
125 #else
126 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
127 #endif
128
129 #if CMK_NODE_QUEUE_AVAILABLE
130 /* This value should be larger than the number of cores used
131  * per charm smp node. So it's currently set to such a large
132  * value.
133  */
134 #define DGRAM_NODEMESSAGE   (0x1FFB)
135 #endif
136
137 /* Node state structure */
138 int               _Cmi_mynode;    /* Which address space am I */
139 int               _Cmi_mynodesize;/* Number of processors in my address space */
140 int               _Cmi_numnodes;  /* Total number of address spaces */
141 int               _Cmi_numpes;    /* Total number of processors */
142
143 CpvDeclare(void*, CmiLocalQueue);
144
145 /* different modes for sending a message */
146 #define P2P_SYNC      0x1
147 #define P2P_ASYNC     0x2
148 #define BCAST_SYNC    0x4
149 #define BCAST_ASYNC   0x8
150 #define OUT_OF_BAND   0x10
151
152 enum MACHINE_SMP_MODE {
153     INVALID_MODE,
154     COMM_THREAD_SEND_RECV = 0,
155     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
156     COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
157     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
158 };
159 /* The default mode of smp charm runtime */
160 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
161
162
163 #if CMK_SMP
164 static volatile int commThdExit = 0;
165 static CmiNodeLock  commThdExitLock = 0;
166
167 /**
168  *  The macro defines whether to have a comm thd to offload some
169  *  work such as forwarding bcast messages etc. This macro
170  *  should be defined before including "machine-smp.c". Note
171  *  that whether a machine layer in SMP mode could run w/o comm
172  *  thread depends on the support of the underlying
173  *  communication library.
174  *
175  */
176 #ifndef CMK_SMP_NO_COMMTHD
177 #define CMK_SMP_NO_COMMTHD 0
178 #endif
179
180 #if CMK_SMP_NO_COMMTHD
181 int Cmi_commthread = 0;
182 #else
183 int Cmi_commthread = 1;
184 #endif
185
186 #endif
187
188 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
189 static int Cmi_nodestart;
190
191 /*
192  * Network progress utility variables. Period controls the rate at
193  * which the network poll is called
194  */
195 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
196 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
197 #endif
198
199 CpvDeclare(unsigned , networkProgressCount);
200 int networkProgressPeriod;
201
202
203 /* ===== Beginning of Common Function Declarations ===== */
204 void CmiAbort(const char *message);
205 static void PerrorExit(const char *msg);
206
207 /* This function handles the msg received as which queue to push into */
208 static void handleOneRecvedMsg(int size, char *msg);
209
210 /* Utility functions for forwarding broadcast messages,
211  * should not be used in machine-specific implementations
212  * except in some special occasions.
213  */
214 static void SendToPeers(int size, char *msg);
215
216
217 void CmiPushPE(int rank, void *msg);
218
219 #if CMK_NODE_QUEUE_AVAILABLE
220 void CmiPushNode(void *msg);
221 #endif
222
223 /* Functions regarding send ops declared in converse.h */
224
225 /* In default, using the common codes for msg sending */
226 #ifndef USE_COMMON_SYNC_P2P
227 #define USE_COMMON_SYNC_P2P 1
228 #endif
229 #ifndef USE_COMMON_ASYNC_P2P
230 #define USE_COMMON_ASYNC_P2P 1
231 #endif
232 #ifndef USE_COMMON_SYNC_BCAST
233 #define USE_COMMON_SYNC_BCAST 1
234 #endif
235 #ifndef USE_COMMON_ASYNC_BCAST
236 #define USE_COMMON_ASYNC_BCAST 1
237 #endif
238
239 static void CmiSendSelf(char *msg);
240
241 void CmiSyncSendFn(int destPE, int size, char *msg);
242 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
243 void CmiFreeSendFn(int destPE, int size, char *msg);
244
245 #if CMK_NODE_QUEUE_AVAILABLE
246 static void CmiSendNodeSelf(char *msg);
247
248 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
249 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
250 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
251
252 #endif
253
254 /* Functions and variables regarding machine startup */
255 static char     **Cmi_argv;
256 static char     **Cmi_argvcopy;
257 static CmiStartFn Cmi_startfn;   /* The start function */
258 static int        Cmi_usrsched;  /* Continue after start function finishes? */
259 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
260 static void ConverseRunPE(int everReturn);
261
262 /* Functions regarding machine running on every proc */
263 static void AdvanceCommunication(int whenidle);
264 static void CommunicationServer(int sleepTime);
265 static void CommunicationServerThread(int sleepTime);
266 void ConverseExit(void);
267
268 /* Functions providing incoming network messages */
269 void *CmiGetNonLocal(void);
270 #if CMK_NODE_QUEUE_AVAILABLE
271 void *CmiGetNonLocalNodeQ(void);
272 #endif
273 /* Utiltiy functions */
274 static char *CopyMsg(char *msg, int len);
275
276 /* ===== End of Common Function Declarations ===== */
277
278 #include "machine-smp.c"
279
280 /* ===== Beginning of Idle-state Related Declarations =====  */
281 typedef struct {
282     int sleepMs; /*Milliseconds to sleep while idle*/
283     int nIdles; /*Number of times we've been idle in a row*/
284     CmiState cs; /*Machine state*/
285 } CmiIdleState;
286
287 static CmiIdleState *CmiNotifyGetState(void);
288
289 /**
290  *  Generally,
291  *
292  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
293  *  When the proc is idle, AdvanceCommunication needs to be
294  *  called.
295  *
296  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
297  *
298  *  Different layers have choices of registering different callbacks for
299  *  idle state.
300  */
301 static void CmiNotifyBeginIdle(CmiIdleState *s);
302 static void CmiNotifyStillIdle(CmiIdleState *s);
303 void CmiNotifyIdle(void);
304 /* ===== End of Idle-state Related Declarations =====  */
305
306 /* ===== Beginning of Processor/Node State-related Stuff =====*/
307 #if !CMK_SMP
308 /************ non SMP **************/
309 static struct CmiStateStruct Cmi_state;
310 int _Cmi_mype;
311 int _Cmi_myrank;
312
313 void CmiMemLock() {}
314 void CmiMemUnlock() {}
315
316 #define CmiGetState() (&Cmi_state)
317 #define CmiGetStateN(n) (&Cmi_state)
318
319 void CmiYield(void) {
320     sleep(0);
321 }
322
323 static void CmiStartThreads(char **argv) {
324     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
325     _Cmi_mype = Cmi_nodestart;
326     _Cmi_myrank = 0;
327 }
328 #else
329 /************** SMP *******************/
330 INLINE_KEYWORD int CmiMyPe() {
331     return CmiGetState()->pe;
332 }
333 INLINE_KEYWORD int CmiMyRank() {
334     return CmiGetState()->rank;
335 }
336 INLINE_KEYWORD int CmiNodeFirst(int node) {
337     return node*_Cmi_mynodesize;
338 }
339 INLINE_KEYWORD int CmiNodeSize(int node) {
340     return _Cmi_mynodesize;
341 }
342 INLINE_KEYWORD int CmiNodeOf(int pe) {
343     return (pe/_Cmi_mynodesize);
344 }
345 INLINE_KEYWORD int CmiRankOf(int pe) {
346     return pe%_Cmi_mynodesize;
347 }
348 #endif
349 CsvDeclare(CmiNodeState, NodeState);
350 /* ===== End of Processor/Node State-related Stuff =====*/
351
352 #include "machine-broadcast.c"
353 #include "immediate.c"
354 #include "machine-commthd-util.c"
355
356 /* ===== Beginning of Common Function Definitions ===== */
357 static void PerrorExit(const char *msg) {
358     perror(msg);
359     exit(1);
360 }
361
362 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
363 /*Add a message to this processor's receive queue, pe is a rank */
364 void CmiPushPE(int rank,void *msg) {
365     CmiState cs = CmiGetStateN(rank);
366     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
367 #if CMK_IMMEDIATE_MSG
368     if (CmiIsImmediate(msg)) {
369         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
370         CMI_DEST_RANK(msg) = rank;
371         CmiPushImmediateMsg(msg);
372         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
373         return;
374     }
375 #endif
376
377     PCQueuePush(cs->recv,msg);
378
379 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
380   if (_Cmi_noprocforcommthread)
381 #endif
382     CmiIdleLock_addMessage(&cs->idle);
383     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
384 }
385
386 #if CMK_NODE_QUEUE_AVAILABLE
387 /*Add a message to this processor's receive queue */
388 void CmiPushNode(void *msg) {
389     MACHSTATE(3,"Pushing message into NodeRecv queue");
390 #if CMK_IMMEDIATE_MSG
391     if (CmiIsImmediate(msg)) {
392         CMI_DEST_RANK(msg) = 0;
393         CmiPushImmediateMsg(msg);
394         return;
395     }
396 #endif
397     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
398     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
399     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
400
401 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
402     if (_Cmi_noprocforcommthread)
403 #endif
404     {
405         CmiState cs=CmiGetStateN(0);
406         CmiIdleLock_addMessage(&cs->idle);
407     }
408 }
409 #endif
410
411 /* This function handles the msg received as which queue to push into */
412 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
413     int isBcastMsg = 0;
414 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
415     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
416 #endif
417
418     if (isBcastMsg) {
419         handleOneBcastMsg(size, msg);
420         return;
421     }
422
423 #if CMK_NODE_QUEUE_AVAILABLE
424     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
425         CmiPushNode(msg);
426         return;
427     }
428 #endif
429     CmiPushPE(CMI_DEST_RANK(msg), msg);
430
431 }
432
433
434 static void SendToPeers(int size, char *msg) {
435     /* FIXME: now it's just a flat p2p send!! When node size is large,
436     * it should also be sent in a tree
437     */
438     int exceptRank = CMI_DEST_RANK(msg);
439     int i;
440     for (i=0; i<exceptRank; i++) {
441         CmiPushPE(i, CopyMsg(msg, size));
442     }
443     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
444         CmiPushPE(i, CopyMsg(msg, size));
445     }
446 }
447
448
449 /* Functions regarding sending operations */
450 static void CmiSendSelf(char *msg) {
451 #if CMK_IMMEDIATE_MSG
452     if (CmiIsImmediate(msg)) {
453         /* CmiBecomeNonImmediate(msg); */
454         CmiPushImmediateMsg(msg);
455         CmiHandleImmediate();
456         return;
457     }
458 #endif
459     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
460 }
461
462 /* Functions regarding P2P send op */
463 #if USE_COMMON_SYNC_P2P
464 void CmiSyncSendFn(int destPE, int size, char *msg) {
465     char *dupmsg = CopyMsg(msg, size);
466     CmiFreeSendFn(destPE, size, dupmsg);
467 }
468
469 #if CMK_USE_PXSHM
470 #include "machine-pxshm.c"
471 #endif
472 #if CMK_USE_XPMEM
473 #include "machine-xpmem.c"
474 #endif
475
476 int refcount = 0;
477
478 #if CMK_USE_OOB
479 CpvExtern(int, _urgentSend);
480 #endif
481
482 /* a wrapper of LrtsSendFunc */
483 #if CMK_C_INLINE
484 inline 
485 #endif
486 CmiCommHandle LrtsSendNetworkFunc(int destNode, int size, char *msg, int mode)
487 {
488         int rank;
489 #if CMK_USE_PXSHM
490         if (CmiValidPxshm(destNode, size)) {
491           CmiSendMessagePxshm(msg, size, destNode, &refcount);
492           //for (int i=0; i<refcount; i++) CmiReference(msg);
493           return 0;
494         }
495 #endif
496 #if CMK_USE_XPMEM
497         if (CmiValidXpmem(destNode, size)) {
498           CmiSendMessageXpmem(msg, size, destNode, &refcount);
499           //for (int i=0; i<refcount; i++) CmiReference(msg);
500           return 0;
501         }
502 #endif
503
504 #if CMK_WITH_STATS
505 if (MSG_STATISTIC)
506 {
507     int ret_log = _cmi_log2(size);
508     if(ret_log >21) ret_log = 21;
509     msg_histogram[ret_log]++;
510 }
511 #endif
512 #if CMK_USE_OOB
513     if (CpvAccess(_urgentSend)) mode |= OUT_OF_BAND;
514 #endif
515     return LrtsSendFunc(destNode, size, msg, mode);
516 }
517
518 void CmiFreeSendFn(int destPE, int size, char *msg) {
519     CMI_SET_BROADCAST_ROOT(msg, 0);
520     CQdCreate(CpvAccess(cQdState), 1);
521     if (CmiMyPe()==destPE) {
522         CmiSendSelf(msg);
523 #if CMK_PERSISTENT_COMM
524         if (phs) curphs++;
525 #endif
526     } else {
527 #if CMK_PERSISTENT_COMM
528         if (phs) {
529           if (size > 8192) {
530             CmiAssert(curphs < phsSize);
531             LrtsSendPersistentMsg(phs[curphs++], destPE, size, msg);
532             return;
533           }
534           else
535             curphs++;
536         }
537 #endif
538         int destNode = CmiNodeOf(destPE);
539 #if CMK_SMP
540         if (CmiMyNode()==destNode) {
541             CmiPushPE(CmiRankOf(destPE), msg);
542             return;
543         }
544 #endif
545         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
546         LrtsSendNetworkFunc(destNode, size, msg, P2P_SYNC);
547     }
548 }
549 #endif
550
551 #if USE_COMMON_ASYNC_P2P
552 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
553     int destNode = CmiNodeOf(destPE);
554     if (destNode == CmiMyNode()) {
555         CmiSyncSendFn(destPE,size,msg);
556         return 0;
557     } else {
558 #if CMK_WITH_STATS
559 if (  MSG_STATISTIC)
560 {
561     int ret_log = _cmi_log2(size);
562         if(ret_log >21) ret_log = 21;
563         msg_histogram[ret_log]++;
564 }
565 #endif
566         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
567     }
568 }
569 #endif
570
571 #if CMK_NODE_QUEUE_AVAILABLE
572 static void CmiSendNodeSelf(char *msg) {
573 #if CMK_IMMEDIATE_MSG
574     if (CmiIsImmediate(msg)) {
575         CmiPushImmediateMsg(msg);
576         if (!_immRunning) CmiHandleImmediate();
577         return;
578     }
579 #endif
580     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
581     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
582     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
583 }
584
585 #if USE_COMMON_ASYNC_P2P
586 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
587     char *dupmsg = CopyMsg(msg, size);
588     CmiFreeNodeSendFn(destNode, size, dupmsg);
589 }
590
591 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
592     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
593     CQdCreate(CpvAccess(cQdState), 1);
594     CMI_SET_BROADCAST_ROOT(msg, 0);
595     if (destNode == CmiMyNode()) {
596         CmiSendNodeSelf(msg);
597     } else {
598 #if CMK_WITH_STATS
599 if (  MSG_STATISTIC)
600 {
601     int ret_log = _cmi_log2(size);
602     if(ret_log >21) ret_log = 21;
603     msg_histogram[ret_log]++;
604 }
605 #endif
606         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
607     }
608 }
609 #endif
610
611 #if USE_COMMON_ASYNC_P2P
612 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
613     if (destNode == CmiMyNode()) {
614         CmiSyncNodeSendFn(destNode, size, msg);
615         return 0;
616     } else {
617 #if CMK_WITH_STATS
618 if (  MSG_STATISTIC)
619 {
620         int ret_log = _cmi_log2(size);
621         if(ret_log >21) ret_log = 21;
622         msg_histogram[ret_log]++;
623 }
624 #endif
625         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
626     }
627 }
628 #endif
629 #endif
630
631 /* ##### Beginning of Functions Related with Machine Startup ##### */
632 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
633     int _ii;
634     int tmp;
635 #if CMK_WITH_STATS
636     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
637 #endif
638     /* processor per node */
639     _Cmi_mynodesize = 1;
640     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
641         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
642 #if ! CMK_SMP
643     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
644         CmiAbort("+ppn cannot be used in non SMP version!\n");
645 #endif
646
647     /* Network progress function is used to poll the network when for
648     messages. This flushes receive buffers on some  implementations*/
649     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
650     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
651
652     /* _Cmi_mynodesize has to be obtained before LrtsInit
653      * because it may be used inside LrtsInit
654      */
655     /* argv could be changed inside LrtsInit */
656     /* Inside this function, the number of nodes and my node id are obtained */
657 #if CMK_WITH_STATS
658 if (  MSG_STATISTIC)
659 {
660     for(_ii=0; _ii<22; _ii++)
661         msg_histogram[_ii] = 0;
662 }
663 #endif
664
665     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
666    
667         if (_Cmi_mynode==0) {
668 #if !CMK_SMP 
669                 printf("Charm++> Running on non-SMP mode\n");
670 #else
671                 printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
672                 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
673                         printf("Charm++> The comm. thread both sends and receives messages\n");
674                 } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
675                         printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
676                 } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
677                         printf("Charm++> Both  comm. thread and worker thread send and messages\n");
678                 } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
679                         printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
680                 } else {
681                         CmiAbort("Charm++> Invalid SMP mode setting\n");
682                 }
683 #endif
684         }
685
686     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
687     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
688     Cmi_argvcopy = CmiCopyArgs(argv);
689     Cmi_argv = argv;
690     Cmi_startfn = fn;
691     Cmi_usrsched = usched;
692
693 #if CMK_USE_PXSHM
694     CmiInitPxshm(argv);
695 #endif
696 #if CMK_USE_XPMEM
697     CmiInitXpmem(argv);
698 #endif
699
700     /* CmiTimerInit(); */
701 #if CMK_BROADCAST_HYPERCUBE
702     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
703     tmp = CmiNumNodes()-1;
704     CmiNodesDim = 0;
705     while (tmp>0) {
706         CmiNodesDim++;
707         tmp = tmp >> 1;
708     }
709     if (CmiNumNodes()==1) CmiNodesDim=1;
710 #endif
711
712     CsvInitialize(CmiNodeState, NodeState);
713     CmiNodeStateInit(&CsvAccess(NodeState));
714 #if CMK_SMP
715     commThdExitLock = CmiCreateLock();
716 #endif
717
718 #if CMK_OFFLOAD_BCAST_PROCESS
719     /* the actual queues should be created on comm thread considering NUMA in SMP */
720     CsvInitialize(PCQueue, procBcastQ);
721 #if CMK_NODE_QUEUE_AVAILABLE
722     CsvInitialize(PCQueue, nodeBcastQ);
723 #endif
724 #endif
725
726 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
727     CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
728 #endif
729
730     CmiStartThreads(argv);
731
732     ConverseRunPE(initret);
733 }
734
735 extern void ConverseCommonInit(char **argv);
736 extern void CthInit(char **argv);
737
738 static void ConverseRunPE(int everReturn) {
739     CmiState cs;
740     char** CmiMyArgv;
741
742     LrtsPreCommonInit(everReturn);
743
744 #if CMK_OFFLOAD_BCAST_PROCESS
745     int createQueue = 1;
746 #if CMK_SMP
747 #if CMK_SMP_NO_COMMTHD
748     /* If there's no comm thread, then the queue is created on rank 0 */
749     if (CmiMyRank()) createQueue = 0;
750 #else
751     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
752 #endif
753 #endif
754
755     if (createQueue) {
756         CsvAccess(procBcastQ) = PCQueueCreate();
757 #if CMK_NODE_QUEUE_AVAILABLE
758         CsvAccess(nodeBcastQ) = PCQueueCreate();
759 #endif
760     }
761 #endif
762
763     CmiNodeAllBarrier();
764
765     cs = CmiGetState();
766     CpvInitialize(void *,CmiLocalQueue);
767     CpvAccess(CmiLocalQueue) = cs->localqueue;
768
769     if (CmiMyRank())
770         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
771     else
772         CmiMyArgv=Cmi_argv;
773
774     CthInit(CmiMyArgv);
775
776     /* initialize the network progress counter*/
777     /* Network progress function is used to poll the network when for
778        messages. This flushes receive buffers on some  implementations*/
779     CpvInitialize(unsigned , networkProgressCount);
780     CpvAccess(networkProgressCount) = 0;
781
782     ConverseCommonInit(CmiMyArgv);
783
784     LrtsPostCommonInit(everReturn);
785
786 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
787     CmiInitNotifyCommThdScheme();
788 #endif
789     /* Converse initialization finishes, immediate messages can be processed.
790        node barrier previously should take care of the node synchronization */
791     _immediateReady = 1;
792
793     /* communication thread */
794     if (CmiMyRank() == CmiMyNodeSize()) {
795         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
796         while (1) CommunicationServerThread(5);
797     } else { /* worker thread */
798         if (!everReturn) {
799             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
800             if (Cmi_usrsched==0) CsdScheduler(-1);
801             ConverseExit();
802         }
803     }
804 }
805 /* ##### End of Functions Related with Machine Startup ##### */
806
807 /* ##### Beginning of Functions Related with Machine Running ##### */
808 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
809     int doProcessBcast = 1;
810
811 #if CMK_USE_PXSHM
812     CommunicationServerPxshm();
813 #endif
814 #if CMK_USE_XPMEM
815     CommunicationServerXpmem();
816 #endif
817
818     LrtsAdvanceCommunication(whenidle);
819
820 #if CMK_OFFLOAD_BCAST_PROCESS
821 #if CMK_SMP_NO_COMMTHD
822     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
823     if (CmiMyRank()) doProcessBcast = 0;
824 #endif
825     if (doProcessBcast) processBcastQs();
826 #endif
827
828 #if CMK_IMMEDIATE_MSG
829 #if !CMK_SMP
830     CmiHandleImmediate();
831 #endif
832 #if CMK_SMP && CMK_SMP_NO_COMMTHD
833     if (CmiMyRank()==0) CmiHandleImmediate();
834 #endif
835 #endif
836 }
837
838 extern void ConverseCommonExit();
839
840 static void CommunicationServer(int sleepTime) {
841 #if CMK_SMP
842     AdvanceCommunication(0);
843
844     if (commThdExit == CmiMyNodeSize()) {
845         MACHSTATE(2, "CommunicationServer exiting {");
846         LrtsDrainResources();
847         MACHSTATE(2, "} CommunicationServer EXIT");
848
849         ConverseCommonExit();
850
851 #if CMK_USE_PXSHM
852         CmiExitPxshm();
853 #endif
854 #if CMK_USE_XPMEM
855         CmiExitXpmem();
856 #endif
857         LrtsExit();
858     }
859 #endif
860 }
861
862 static void CommunicationServerThread(int sleepTime) {
863     CommunicationServer(sleepTime);
864 #if CMK_IMMEDIATE_MSG
865     CmiHandleImmediate();
866 #endif
867 }
868
869 void ConverseExit(void) {
870     int i;
871 #if !CMK_SMP
872     LrtsDrainResources();
873 #else
874         if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
875            || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
876                 LrtsDrainResources();
877 #endif
878
879     ConverseCommonExit();
880
881 #if CMK_WITH_STATS
882 if (MSG_STATISTIC)
883 {
884     for(i=0; i<22; i++)
885     {
886         CmiPrintf("[MSG PE:%d]", CmiMyPe());
887         if(msg_histogram[i] >0)
888             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
889     }
890     CmiPrintf("\n");
891 }
892 #endif
893
894 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
895     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
896 #endif
897
898 #if !CMK_SMP
899 #if CMK_USE_PXSHM
900     CmiExitPxshm();
901 #endif
902 #if CMK_USE_XPMEM
903     CmiExitXpmem();
904 #endif
905     LrtsExit();
906 #else
907     /* In SMP, the communication thread will exit */
908     /* atomic increment */
909     CmiLock(commThdExitLock);
910     commThdExit++;
911     CmiUnlock(commThdExitLock);
912     while (1) CmiYield();
913 #endif
914 }
915 /* ##### End of Functions Related with Machine Running ##### */
916
917 void CmiAbort(const char *message) {
918 #if CMK_USE_PXSHM
919     CmiExitPxshm();
920 #endif
921 #if CMK_USE_XPMEM
922     CmiExitXpmem();
923 #endif
924     LrtsAbort(message);
925 }
926
927 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
928 void *CmiGetNonLocal(void) {
929     CmiState cs = CmiGetState();
930     void *msg = NULL;
931
932 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
933     /**
934       * In SMP mode with comm thread, it's possible a normal
935       * msg is sent from an immediate msg which is executed
936       * on comm thread. In this case, the msg is sent to
937       * the network queue of the work thread. Therefore,
938       * even there's only one worker thread, the polling of
939       * network queue is still required.
940       */
941     if (CmiNumPes() == 1) return NULL;
942 #endif
943
944     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
945     CmiIdleLock_checkMessage(&cs->idle);
946     /* ?????although it seems that lock is not needed, I found it crashes very often
947        on mpi-smp without lock */
948     msg = PCQueuePop(cs->recv);
949 #if !CMK_SMP
950     if (!msg) {
951        AdvanceCommunication(0);
952        msg = PCQueuePop(cs->recv);
953     }
954 #else
955 //    LrtsPostNonLocal();
956 #endif
957
958     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
959
960     return msg;
961 }
962
963 #if CMK_NODE_QUEUE_AVAILABLE
964 void *CmiGetNonLocalNodeQ(void) {
965     CmiState cs = CmiGetState();
966     char *result = 0;
967     CmiIdleLock_checkMessage(&cs->idle);
968     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
969         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
970         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
971         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
972         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
973         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
974     }
975
976     return result;
977 }
978 #endif
979 /* ##### End of Functions Providing Incoming Network Messages ##### */
980
981 static CmiIdleState *CmiNotifyGetState(void) {
982     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
983     s->sleepMs=0;
984     s->nIdles=0;
985     s->cs=CmiGetState();
986     return s;
987 }
988
989 static void CmiNotifyBeginIdle(CmiIdleState *s) {
990     s->sleepMs=0;
991     s->nIdles=0;
992 }
993
994 /*Number of times to spin before sleeping*/
995 #define SPINS_BEFORE_SLEEP 20
996 static void CmiNotifyStillIdle(CmiIdleState *s) {
997     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
998 #if !CMK_SMP
999     AdvanceCommunication(1);
1000 #else
1001     LrtsPostNonLocal();
1002
1003     if (_Cmi_noprocforcommthread) {
1004     s->nIdles++;
1005     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1006         s->sleepMs+=2;
1007         if (s->sleepMs>10) s->sleepMs=10;
1008     }
1009
1010     if (s->sleepMs>0) {
1011         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1012         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1013         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1014     }
1015     }
1016 #endif
1017
1018     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1019 }
1020
1021 /* usually called in non-smp mode */
1022 void CmiNotifyIdle(void) {
1023     AdvanceCommunication(1);
1024     CmiYield();
1025 }
1026
1027 /* Utiltiy functions */
1028 static char *CopyMsg(char *msg, int len) {
1029     char *copy = (char *)CmiAlloc(len);
1030 #if CMK_ERROR_CHECKING
1031     if (!copy) {
1032         CmiAbort("Error: out of memory in machine layer\n");
1033     }
1034 #endif
1035     memcpy(copy, msg, len);
1036     return copy;
1037 }
1038
1039
1040