59cfbd6fd4cfdb1edac87e9da80b1a2d045faa20
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua, Gengbin
4  */
5
6 #if CMK_C_INLINE
7 #define INLINE_KEYWORD inline
8 #else
9 #define INLINE_KEYWORD
10 #endif
11
12 /******* broadcast related  */
13 #ifndef CMK_BROADCAST_SPANNING_TREE
14 #define CMK_BROADCAST_SPANNING_TREE    1
15 #endif
16
17 #ifndef CMK_BROADCAST_HYPERCUBE
18 #define CMK_BROADCAST_HYPERCUBE        0
19 #endif
20
21 #define BROADCAST_SPANNING_FACTOR      4
22 /* The number of children used when a msg is broadcast inside a node */
23 #define BROADCAST_SPANNING_INTRA_FACTOR  8
24
25 /* Root of broadcast:
26  * non-bcast msg: root = 0;
27  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
28  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
29  */
30 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
31 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
32
33 /**
34  * For some machine layers such as on Active Message framework,
35  * the receiver callback is usally executed on an internal
36  * thread (i.e. not the flow managed by ours). Therefore, for
37  * forwarding broadcast messages, we could have a choice whether
38  * to offload such function to the flow we manage such as the
39  * communication thread. -
40  */
41
42 #ifndef CMK_OFFLOAD_BCAST_PROCESS
43 #define CMK_OFFLOAD_BCAST_PROCESS 0
44 #endif
45
46 #if CMK_OFFLOAD_BCAST_PROCESS
47 CsvDeclare(PCQueue, procBcastQ);
48 #if CMK_NODE_QUEUE_AVAILABLE
49 CsvDeclare(PCQueue, nodeBcastQ);
50 #endif
51 #endif
52
53 #if CMK_WITH_STATS
54 static int  MSG_STATISTIC = 0;
55 int     msg_histogram[22];
56 static int _cmi_log2(int size)
57 {
58     int ret = 1;
59     size = size-1;
60     while( (size=size>>1)>0) ret++;
61     return ret;
62 }
63 #endif
64
65 #if CMK_BROADCAST_HYPERCUBE
66 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
67 static int CmiNodesDim;
68 #endif
69 /* ###End of Broadcast related definitions ### */
70
71
72 static void handleOneBcastMsg(int size, char *msg);
73 static void processBcastQs();
74
75 /* Utility functions for forwarding broadcast messages,
76  * should not be used in machine-specific implementations
77  * except in some special occasions.
78  */
79 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
80 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
81 static void SendSpanningChildrenProc(int size, char *msg);
82 static void SendHyperCubeProc(int size, char *msg);
83 #if CMK_NODE_QUEUE_AVAILABLE
84 static void SendSpanningChildrenNode(int size, char *msg);
85 static void SendHyperCubeNode(int size, char *msg);
86 #endif
87
88 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
89 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
90
91 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
92 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
93 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
94 #endif
95 #endif
96
97
98 void CmiSyncBroadcastFn(int size, char *msg);
99 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
100 void CmiFreeBroadcastFn(int size, char *msg);
101
102 void CmiSyncBroadcastAllFn(int size, char *msg);
103 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
104 void CmiFreeBroadcastAllFn(int size, char *msg);
105
106 #if CMK_NODE_QUEUE_AVAILABLE
107 void CmiSyncNodeBroadcastFn(int size, char *msg);
108 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
109 void CmiFreeNodeBroadcastFn(int size, char *msg);
110
111 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
112 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
113 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
114 #endif
115
116 /************** Done with Broadcast related */
117
118 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
119
120 #ifndef CMK_HAS_SIZE_IN_MSGHDR
121 #define CMK_HAS_SIZE_IN_MSGHDR 1
122 #endif
123 #if CMK_HAS_SIZE_IN_MSGHDR
124 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
125 #else
126 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
127 #endif
128
129 #if CMK_NODE_QUEUE_AVAILABLE
130 /* This value should be larger than the number of cores used
131  * per charm smp node. So it's currently set to such a large
132  * value.
133  */
134 #define DGRAM_NODEMESSAGE   (0x1FFB)
135 #endif
136
137 /* Node state structure */
138 int               _Cmi_mynode;    /* Which address space am I */
139 int               _Cmi_mynodesize;/* Number of processors in my address space */
140 int               _Cmi_numnodes;  /* Total number of address spaces */
141 int               _Cmi_numpes;    /* Total number of processors */
142
143 CpvDeclare(void*, CmiLocalQueue);
144
145 /* different modes for sending a message */
146 #define P2P_SYNC      0x1
147 #define P2P_ASYNC     0x2
148 #define BCAST_SYNC    0x4
149 #define BCAST_ASYNC   0x8
150 #define OUT_OF_BAND   0x10
151
152 enum MACHINE_SMP_MODE {
153     INVALID_MODE,
154 #if CMK_BLUEGENEQ
155     COMM_THREAD_SEND_RECV = 1,
156 #else 
157     COMM_THREAD_SEND_RECV = 0,
158 #endif
159     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
160     COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
161     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
162 };
163 /* The default mode of smp charm runtime */
164 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
165
166
167 #if CMK_SMP
168 static volatile int commThdExit = 0;
169 static CmiNodeLock  commThdExitLock = 0;
170
171 /**
172  *  The macro defines whether to have a comm thd to offload some
173  *  work such as forwarding bcast messages etc. This macro
174  *  should be defined before including "machine-smp.c". Note
175  *  that whether a machine layer in SMP mode could run w/o comm
176  *  thread depends on the support of the underlying
177  *  communication library.
178  *
179  */
180 #ifndef CMK_SMP_NO_COMMTHD
181 #define CMK_SMP_NO_COMMTHD 0
182 #endif
183
184 #if CMK_SMP_NO_COMMTHD
185 int Cmi_commthread = 0;
186 #else
187 int Cmi_commthread = 1;
188 #endif
189
190 #endif
191
192 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
193 static int Cmi_nodestart;
194
195 /*
196  * Network progress utility variables. Period controls the rate at
197  * which the network poll is called
198  */
199 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
200 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
201 #endif
202
203 CpvDeclare(unsigned , networkProgressCount);
204 int networkProgressPeriod;
205
206
207 /* ===== Beginning of Common Function Declarations ===== */
208 void CmiAbort(const char *message);
209 static void PerrorExit(const char *msg);
210
211 /* This function handles the msg received as which queue to push into */
212 static void handleOneRecvedMsg(int size, char *msg);
213
214 /* Utility functions for forwarding broadcast messages,
215  * should not be used in machine-specific implementations
216  * except in some special occasions.
217  */
218 static void SendToPeers(int size, char *msg);
219
220
221 void CmiPushPE(int rank, void *msg);
222
223 #if CMK_NODE_QUEUE_AVAILABLE
224 void CmiPushNode(void *msg);
225 #endif
226
227 /* Functions regarding send ops declared in converse.h */
228
229 /* In default, using the common codes for msg sending */
230 #ifndef USE_COMMON_SYNC_P2P
231 #define USE_COMMON_SYNC_P2P 1
232 #endif
233 #ifndef USE_COMMON_ASYNC_P2P
234 #define USE_COMMON_ASYNC_P2P 1
235 #endif
236 #ifndef USE_COMMON_SYNC_BCAST
237 #define USE_COMMON_SYNC_BCAST 1
238 #endif
239 #ifndef USE_COMMON_ASYNC_BCAST
240 #define USE_COMMON_ASYNC_BCAST 1
241 #endif
242
243 static void CmiSendSelf(char *msg);
244
245 void CmiSyncSendFn(int destPE, int size, char *msg);
246 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
247 void CmiFreeSendFn(int destPE, int size, char *msg);
248
249 #if CMK_NODE_QUEUE_AVAILABLE
250 static void CmiSendNodeSelf(char *msg);
251
252 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
253 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
254 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
255
256 #endif
257
258 /* Functions and variables regarding machine startup */
259 static char     **Cmi_argv;
260 static char     **Cmi_argvcopy;
261 static CmiStartFn Cmi_startfn;   /* The start function */
262 static int        Cmi_usrsched;  /* Continue after start function finishes? */
263 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
264 static void ConverseRunPE(int everReturn);
265
266 /* Functions regarding machine running on every proc */
267 static void AdvanceCommunication(int whenidle);
268 static void CommunicationServer(int sleepTime);
269 static void CommunicationServerThread(int sleepTime);
270 void ConverseExit(void);
271
272 /* Functions providing incoming network messages */
273 void *CmiGetNonLocal(void);
274 #if CMK_NODE_QUEUE_AVAILABLE
275 void *CmiGetNonLocalNodeQ(void);
276 #endif
277 /* Utiltiy functions */
278 static char *CopyMsg(char *msg, int len);
279
280 /* ===== End of Common Function Declarations ===== */
281
282 #include "machine-smp.c"
283
284 /* ===== Beginning of Idle-state Related Declarations =====  */
285 typedef struct {
286     int sleepMs; /*Milliseconds to sleep while idle*/
287     int nIdles; /*Number of times we've been idle in a row*/
288     CmiState cs; /*Machine state*/
289 } CmiIdleState;
290
291 static CmiIdleState *CmiNotifyGetState(void);
292
293 /**
294  *  Generally,
295  *
296  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
297  *  When the proc is idle, AdvanceCommunication needs to be
298  *  called.
299  *
300  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
301  *
302  *  Different layers have choices of registering different callbacks for
303  *  idle state.
304  */
305 static void CmiNotifyBeginIdle(CmiIdleState *s);
306 static void CmiNotifyStillIdle(CmiIdleState *s);
307 void CmiNotifyIdle(void);
308 /* ===== End of Idle-state Related Declarations =====  */
309
310 /* ===== Beginning of Processor/Node State-related Stuff =====*/
311 #if !CMK_SMP
312 /************ non SMP **************/
313 static struct CmiStateStruct Cmi_state;
314 int _Cmi_mype;
315 int _Cmi_myrank;
316
317 void CmiMemLock() {}
318 void CmiMemUnlock() {}
319
320 #define CmiGetState() (&Cmi_state)
321 #define CmiGetStateN(n) (&Cmi_state)
322
323 void CmiYield(void) {
324     sleep(0);
325 }
326
327 static void CmiStartThreads(char **argv) {
328     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
329     _Cmi_mype = Cmi_nodestart;
330     _Cmi_myrank = 0;
331 }
332 #else
333 /************** SMP *******************/
334 INLINE_KEYWORD int CmiMyPe() {
335     return CmiGetState()->pe;
336 }
337 INLINE_KEYWORD int CmiMyRank() {
338     return CmiGetState()->rank;
339 }
340 INLINE_KEYWORD int CmiNodeFirst(int node) {
341     return node*_Cmi_mynodesize;
342 }
343 INLINE_KEYWORD int CmiNodeSize(int node) {
344     return _Cmi_mynodesize;
345 }
346 INLINE_KEYWORD int CmiNodeOf(int pe) {
347     return (pe/_Cmi_mynodesize);
348 }
349 INLINE_KEYWORD int CmiRankOf(int pe) {
350     return pe%_Cmi_mynodesize;
351 }
352 #endif
353 CsvDeclare(CmiNodeState, NodeState);
354 /* ===== End of Processor/Node State-related Stuff =====*/
355
356 #include "machine-broadcast.c"
357 #include "immediate.c"
358 #include "machine-commthd-util.c"
359
360 /* ===== Beginning of Common Function Definitions ===== */
361 static void PerrorExit(const char *msg) {
362     perror(msg);
363     exit(1);
364 }
365
366 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
367 /*Add a message to this processor's receive queue, pe is a rank */
368 void CmiPushPE(int rank,void *msg) {
369     CmiState cs = CmiGetStateN(rank);
370     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
371 #if CMK_IMMEDIATE_MSG
372     if (CmiIsImmediate(msg)) {
373         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
374         CMI_DEST_RANK(msg) = rank;
375         CmiPushImmediateMsg(msg);
376         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
377         return;
378     }
379 #endif
380
381     PCQueuePush(cs->recv,(char*)msg);
382
383 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
384   if (_Cmi_noprocforcommthread)
385 #endif
386     CmiIdleLock_addMessage(&cs->idle);
387     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
388 }
389
390 #if CMK_NODE_QUEUE_AVAILABLE
391 /*Add a message to this processor's receive queue */
392 void CmiPushNode(void *msg) {
393     MACHSTATE(3,"Pushing message into NodeRecv queue");
394 #if CMK_IMMEDIATE_MSG
395     if (CmiIsImmediate(msg)) {
396         CMI_DEST_RANK(msg) = 0;
397         CmiPushImmediateMsg(msg);
398         return;
399     }
400 #endif
401     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
402     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
403     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
404
405 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
406     if (_Cmi_noprocforcommthread)
407 #endif
408     {
409         CmiState cs=CmiGetStateN(0);
410         CmiIdleLock_addMessage(&cs->idle);
411     }
412 }
413 #endif
414
415 /* This function handles the msg received as which queue to push into */
416 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
417     int isBcastMsg = 0;
418 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
419     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
420 #endif
421
422     if (isBcastMsg) {
423         handleOneBcastMsg(size, msg);
424         return;
425     }
426
427 #if CMK_NODE_QUEUE_AVAILABLE
428     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
429         CmiPushNode(msg);
430         return;
431     }
432 #endif
433     CmiPushPE(CMI_DEST_RANK(msg), msg);
434
435 }
436
437
438 static void SendToPeers(int size, char *msg) {
439     /* FIXME: now it's just a flat p2p send!! When node size is large,
440     * it should also be sent in a tree
441     */
442     int exceptRank = CMI_DEST_RANK(msg);
443     int i;
444     for (i=0; i<exceptRank; i++) {
445         CmiPushPE(i, CopyMsg(msg, size));
446     }
447     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
448         CmiPushPE(i, CopyMsg(msg, size));
449     }
450 }
451
452
453 /* Functions regarding sending operations */
454 static void CmiSendSelf(char *msg) {
455 #if CMK_IMMEDIATE_MSG
456     if (CmiIsImmediate(msg)) {
457         /* CmiBecomeNonImmediate(msg); */
458         CmiPushImmediateMsg(msg);
459         CmiHandleImmediate();
460         return;
461     }
462 #endif
463     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
464 }
465
466 /* Functions regarding P2P send op */
467 #if USE_COMMON_SYNC_P2P
468 void CmiSyncSendFn(int destPE, int size, char *msg) {
469     char *dupmsg = CopyMsg(msg, size);
470     CmiFreeSendFn(destPE, size, dupmsg);
471 }
472
473 #if CMK_USE_PXSHM
474 #include "machine-pxshm.c"
475 #endif
476 #if CMK_USE_XPMEM
477 #include "machine-xpmem.c"
478 #endif
479
480 static int refcount = 0;
481
482 #if CMK_USE_OOB
483 CpvExtern(int, _urgentSend);
484 #endif
485
486 /* a wrapper of LrtsSendFunc */
487 #if CMK_C_INLINE
488 inline 
489 #endif
490 CmiCommHandle CmiSendNetworkFunc(int destNode, int size, char *msg, int mode)
491 {
492         int rank;
493 #if CMK_USE_PXSHM
494         if (CmiValidPxshm(destNode, size)) {
495           CmiSendMessagePxshm(msg, size, destNode, &refcount);
496           //for (int i=0; i<refcount; i++) CmiReference(msg);
497           return 0;
498         }
499 #endif
500 #if CMK_USE_XPMEM
501         if (CmiValidXpmem(destNode, size)) {
502           CmiSendMessageXpmem(msg, size, destNode, &refcount);
503           //for (int i=0; i<refcount; i++) CmiReference(msg);
504           return 0;
505         }
506 #endif
507 #if CMK_PERSISTENT_COMM
508         if (CpvAccess(phs)) {
509           if (size > PERSIST_MIN_SIZE) {
510             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
511             LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)], destNode, size, msg);
512             return 0;
513           }
514         }
515 #endif
516
517 #if CMK_WITH_STATS
518 if (MSG_STATISTIC)
519 {
520     int ret_log = _cmi_log2(size);
521     if(ret_log >21) ret_log = 21;
522     msg_histogram[ret_log]++;
523 }
524 #endif
525 #if CMK_USE_OOB
526     if (CpvAccess(_urgentSend)) mode |= OUT_OF_BAND;
527 #endif
528     return LrtsSendFunc(destNode, size, msg, mode);
529 }
530
531 void CmiFreeSendFn(int destPE, int size, char *msg) {
532     CMI_SET_BROADCAST_ROOT(msg, 0);
533     CQdCreate(CpvAccess(cQdState), 1);
534     if (CmiMyPe()==destPE) {
535         CmiSendSelf(msg);
536 #if CMK_PERSISTENT_COMM
537         if (CpvAccess(phs)) CpvAccess(curphs)++;
538 #endif
539     } 
540     else {
541         int destNode = CmiNodeOf(destPE);
542         int destRank = CmiRankOf(destPE);
543 #if CMK_SMP
544         if (CmiMyNode()==destNode) {
545             CmiPushPE(destRank, msg);
546 #if CMK_PERSISTENT_COMM
547             if (CpvAccess(phs)) CpvAccess(curphs)++;
548 #endif
549             return;
550         }
551 #endif
552         CMI_DEST_RANK(msg) = destRank;
553         CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
554 #if CMK_PERSISTENT_COMM
555         if (CpvAccess(phs)) CpvAccess(curphs)++;
556 #endif
557     }
558 }
559 #endif
560
561 #if USE_COMMON_ASYNC_P2P
562 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
563     int destNode = CmiNodeOf(destPE);
564     if (destNode == CmiMyNode()) {
565         CmiSyncSendFn(destPE,size,msg);
566         return 0;
567     } else {
568 #if CMK_WITH_STATS
569 if (  MSG_STATISTIC)
570 {
571     int ret_log = _cmi_log2(size);
572         if(ret_log >21) ret_log = 21;
573         msg_histogram[ret_log]++;
574 }
575 #endif
576         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
577     }
578 }
579 #endif
580
581 #if CMK_NODE_QUEUE_AVAILABLE
582 static void CmiSendNodeSelf(char *msg) {
583 #if CMK_IMMEDIATE_MSG
584     if (CmiIsImmediate(msg)) {
585         CmiPushImmediateMsg(msg);
586         if (!_immRunning) CmiHandleImmediate();
587         return;
588     }
589 #endif
590     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
591     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
592     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
593 }
594
595 #if USE_COMMON_ASYNC_P2P
596 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
597     char *dupmsg = CopyMsg(msg, size);
598     CmiFreeNodeSendFn(destNode, size, dupmsg);
599 }
600
601 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
602     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
603     CQdCreate(CpvAccess(cQdState), 1);
604     CMI_SET_BROADCAST_ROOT(msg, 0);
605     if (destNode == CmiMyNode()) {
606         CmiSendNodeSelf(msg);
607     } else {
608 #if CMK_WITH_STATS
609 if (  MSG_STATISTIC)
610 {
611     int ret_log = _cmi_log2(size);
612     if(ret_log >21) ret_log = 21;
613     msg_histogram[ret_log]++;
614 }
615 #endif
616         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
617     }
618 }
619 #endif
620
621 #if USE_COMMON_ASYNC_P2P
622 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
623     if (destNode == CmiMyNode()) {
624         CmiSyncNodeSendFn(destNode, size, msg);
625         return 0;
626     } else {
627 #if CMK_WITH_STATS
628 if (  MSG_STATISTIC)
629 {
630         int ret_log = _cmi_log2(size);
631         if(ret_log >21) ret_log = 21;
632         msg_histogram[ret_log]++;
633 }
634 #endif
635         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
636     }
637 }
638 #endif
639 #endif
640
641 /* ##### Beginning of Functions Related with Machine Startup ##### */
642 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
643     int _ii;
644     int tmp;
645 #if CMK_WITH_STATS
646     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
647 #endif
648     /* processor per node */
649     _Cmi_mynodesize = 1;
650     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
651         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
652 #if ! CMK_SMP
653     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
654         CmiAbort("+ppn cannot be used in non SMP version!\n");
655 #endif
656
657     /* Network progress function is used to poll the network when for
658     messages. This flushes receive buffers on some  implementations*/
659     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
660     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
661
662     /* _Cmi_mynodesize has to be obtained before LrtsInit
663      * because it may be used inside LrtsInit
664      */
665     /* argv could be changed inside LrtsInit */
666     /* Inside this function, the number of nodes and my node id are obtained */
667 #if CMK_WITH_STATS
668 if (  MSG_STATISTIC)
669 {
670     for(_ii=0; _ii<22; _ii++)
671         msg_histogram[_ii] = 0;
672 }
673 #endif
674
675     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
676    
677         if (_Cmi_mynode==0) {
678 #if !CMK_SMP 
679                 printf("Charm++> Running on non-SMP mode\n");
680 #else
681                 printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
682                 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
683                         printf("Charm++> The comm. thread both sends and receives messages\n");
684                 } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
685                         printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
686                 } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
687                         printf("Charm++> Both  comm. thread and worker thread send and messages\n");
688                 } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
689                         printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
690                 } else {
691                         CmiAbort("Charm++> Invalid SMP mode setting\n");
692                 }
693 #endif
694         }
695
696     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
697     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
698     Cmi_argvcopy = CmiCopyArgs(argv);
699     Cmi_argv = argv;
700     Cmi_startfn = fn;
701     Cmi_usrsched = usched;
702
703 #if CMK_USE_PXSHM
704     CmiInitPxshm(argv);
705 #endif
706 #if CMK_USE_XPMEM
707     CmiInitXpmem(argv);
708 #endif
709
710     /* CmiTimerInit(); */
711 #if CMK_BROADCAST_HYPERCUBE
712     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
713     tmp = CmiNumNodes()-1;
714     CmiNodesDim = 0;
715     while (tmp>0) {
716         CmiNodesDim++;
717         tmp = tmp >> 1;
718     }
719     if (CmiNumNodes()==1) CmiNodesDim=1;
720 #endif
721
722     CsvInitialize(CmiNodeState, NodeState);
723     CmiNodeStateInit(&CsvAccess(NodeState));
724 #if CMK_SMP
725     commThdExitLock = CmiCreateLock();
726 #endif
727
728 #if CMK_OFFLOAD_BCAST_PROCESS
729     /* the actual queues should be created on comm thread considering NUMA in SMP */
730     CsvInitialize(PCQueue, procBcastQ);
731 #if CMK_NODE_QUEUE_AVAILABLE
732     CsvInitialize(PCQueue, nodeBcastQ);
733 #endif
734 #endif
735
736 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
737     CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
738 #endif
739
740     CmiStartThreads(argv);
741
742     ConverseRunPE(initret);
743 }
744
745 extern void ConverseCommonInit(char **argv);
746 extern void CthInit(char **argv);
747
748 static void ConverseRunPE(int everReturn) {
749     CmiState cs;
750     char** CmiMyArgv;
751
752     LrtsPreCommonInit(everReturn);
753
754 #if CMK_OFFLOAD_BCAST_PROCESS
755     int createQueue = 1;
756 #if CMK_SMP
757 #if CMK_SMP_NO_COMMTHD
758     /* If there's no comm thread, then the queue is created on rank 0 */
759     if (CmiMyRank()) createQueue = 0;
760 #else
761     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
762 #endif
763 #endif
764
765     if (createQueue) {
766         CsvAccess(procBcastQ) = PCQueueCreate();
767 #if CMK_NODE_QUEUE_AVAILABLE
768         CsvAccess(nodeBcastQ) = PCQueueCreate();
769 #endif
770     }
771 #endif
772
773     CmiNodeAllBarrier();
774
775     cs = CmiGetState();
776     CpvInitialize(void *,CmiLocalQueue);
777     CpvAccess(CmiLocalQueue) = cs->localqueue;
778
779     if (CmiMyRank())
780         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
781     else
782         CmiMyArgv=Cmi_argv;
783
784     CthInit(CmiMyArgv);
785
786     /* initialize the network progress counter*/
787     /* Network progress function is used to poll the network when for
788        messages. This flushes receive buffers on some  implementations*/
789     CpvInitialize(unsigned , networkProgressCount);
790     CpvAccess(networkProgressCount) = 0;
791
792     ConverseCommonInit(CmiMyArgv);
793
794     LrtsPostCommonInit(everReturn);
795
796 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
797     CmiInitNotifyCommThdScheme();
798 #endif
799     /* Converse initialization finishes, immediate messages can be processed.
800        node barrier previously should take care of the node synchronization */
801     _immediateReady = 1;
802
803     if(CharmLibInterOperate) {
804         /* !!! Not considering SMP mode now */
805         /* TODO: make interoperability working in SMP!!! */
806         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
807         CsdScheduler(-1);
808     } else {
809       /* communication thread */
810       if (CmiMyRank() == CmiMyNodeSize()) {
811         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
812         while (1) CommunicationServerThread(5);
813       } else { /* worker thread */
814         if (!everReturn) {
815           Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
816           if (Cmi_usrsched==0) CsdScheduler(-1);
817           ConverseExit();
818         }
819       }
820     }
821 }
822 /* ##### End of Functions Related with Machine Startup ##### */
823
824 /* ##### Beginning of Functions Related with Machine Running ##### */
825 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
826     int doProcessBcast = 1;
827
828 #if CMK_USE_PXSHM
829     CommunicationServerPxshm();
830 #endif
831 #if CMK_USE_XPMEM
832     CommunicationServerXpmem();
833 #endif
834
835     LrtsAdvanceCommunication(whenidle);
836
837 #if CMK_OFFLOAD_BCAST_PROCESS
838 #if CMK_SMP_NO_COMMTHD
839     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
840     if (CmiMyRank()) doProcessBcast = 0;
841 #endif
842     if (doProcessBcast) processBcastQs();
843 #endif
844
845 #if CMK_IMMEDIATE_MSG
846 #if !CMK_SMP
847     CmiHandleImmediate();
848 #endif
849 #if CMK_SMP && CMK_SMP_NO_COMMTHD
850     if (CmiMyRank()==0) CmiHandleImmediate();
851 #endif
852 #endif
853 }
854
855 extern void ConverseCommonExit();
856
857 static void CommunicationServer(int sleepTime) {
858 #if CMK_SMP
859     AdvanceCommunication(1);
860
861     if (commThdExit == CmiMyNodeSize()) {
862         MACHSTATE(2, "CommunicationServer exiting {");
863         LrtsDrainResources();
864         MACHSTATE(2, "} CommunicationServer EXIT");
865
866         ConverseCommonExit();
867
868 #if CMK_USE_PXSHM
869         CmiExitPxshm();
870 #endif
871 #if CMK_USE_XPMEM
872         CmiExitXpmem();
873 #endif
874         LrtsExit();
875     }
876 #endif
877 }
878
879 static void CommunicationServerThread(int sleepTime) {
880     CommunicationServer(sleepTime);
881 #if CMK_IMMEDIATE_MSG
882     CmiHandleImmediate();
883 #endif
884 }
885
886 void ConverseExit(void) {
887     int i;
888 #if !CMK_SMP
889     LrtsDrainResources();
890 #else
891         if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
892            || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
893                 LrtsDrainResources();
894 #endif
895
896     ConverseCommonExit();
897
898 #if CMK_WITH_STATS
899 if (MSG_STATISTIC)
900 {
901     for(i=0; i<22; i++)
902     {
903         CmiPrintf("[MSG PE:%d]", CmiMyPe());
904         if(msg_histogram[i] >0)
905             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
906     }
907     CmiPrintf("\n");
908 }
909 #endif
910
911 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
912     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
913 #endif
914
915 #if !CMK_SMP
916 #if CMK_USE_PXSHM
917     CmiExitPxshm();
918 #endif
919 #if CMK_USE_XPMEM
920     CmiExitXpmem();
921 #endif
922     LrtsExit();
923 #else
924     /* In SMP, the communication thread will exit */
925     /* atomic increment */
926     CmiLock(commThdExitLock);
927     commThdExit++;
928     CmiUnlock(commThdExitLock);
929     while (1) CmiYield();
930 #endif
931 }
932 /* ##### End of Functions Related with Machine Running ##### */
933
934 void CmiAbort(const char *message) {
935 #if CMK_USE_PXSHM
936     CmiExitPxshm();
937 #endif
938 #if CMK_USE_XPMEM
939     CmiExitXpmem();
940 #endif
941     LrtsAbort(message);
942 }
943
944 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
945 void *CmiGetNonLocal(void) {
946     CmiState cs = CmiGetState();
947     void *msg = NULL;
948
949 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
950     /**
951       * In SMP mode with comm thread, it's possible a normal
952       * msg is sent from an immediate msg which is executed
953       * on comm thread. In this case, the msg is sent to
954       * the network queue of the work thread. Therefore,
955       * even there's only one worker thread, the polling of
956       * network queue is still required.
957       */
958     if (CmiNumPes() == 1) return NULL;
959 #endif
960
961     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
962     CmiIdleLock_checkMessage(&cs->idle);
963     /* ?????although it seems that lock is not needed, I found it crashes very often
964        on mpi-smp without lock */
965     msg = PCQueuePop(cs->recv);
966 #if !CMK_SMP
967     if (!msg) {
968        AdvanceCommunication(0);
969        msg = PCQueuePop(cs->recv);
970     }
971 #else
972 //    LrtsPostNonLocal();
973 #endif
974
975     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
976
977     return msg;
978 }
979
980 #if CMK_NODE_QUEUE_AVAILABLE
981 void *CmiGetNonLocalNodeQ(void) {
982     CmiState cs = CmiGetState();
983     char *result = 0;
984     CmiIdleLock_checkMessage(&cs->idle);
985     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
986         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
987         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
988         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
989         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
990         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
991     }
992
993     return result;
994 }
995 #endif
996 /* ##### End of Functions Providing Incoming Network Messages ##### */
997
998 static CmiIdleState *CmiNotifyGetState(void) {
999     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1000     s->sleepMs=0;
1001     s->nIdles=0;
1002     s->cs=CmiGetState();
1003     return s;
1004 }
1005
1006 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1007     s->sleepMs=0;
1008     s->nIdles=0;
1009 }
1010
1011 /*Number of times to spin before sleeping*/
1012 #define SPINS_BEFORE_SLEEP 20
1013 static void CmiNotifyStillIdle(CmiIdleState *s) {
1014     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1015 #if !CMK_SMP
1016     AdvanceCommunication(1);
1017 #else
1018     LrtsPostNonLocal();
1019
1020     if (_Cmi_noprocforcommthread) {
1021     s->nIdles++;
1022     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1023         s->sleepMs+=2;
1024         if (s->sleepMs>10) s->sleepMs=10;
1025     }
1026
1027     if (s->sleepMs>0) {
1028         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1029         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1030         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1031     }
1032     }
1033 #endif
1034
1035     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1036 }
1037
1038 /* usually called in non-smp mode */
1039 void CmiNotifyIdle(void) {
1040     AdvanceCommunication(1);
1041     CmiYield();
1042 }
1043
1044 /* Utiltiy functions */
1045 static char *CopyMsg(char *msg, int len) {
1046     char *copy = (char *)CmiAlloc(len);
1047 #if CMK_ERROR_CHECKING
1048     if (!copy) {
1049         CmiAbort("Error: out of memory in machine layer\n");
1050     }
1051 #endif
1052     memcpy(copy, msg, len);
1053     return copy;
1054 }
1055
1056
1057